drbd: Pass struct packet_info down to the receive functions
drivers/block/drbd/drbd_receiver.c
1/*
2 drbd_receiver.c
3
4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
13 any later version.
14
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 */
24
25
26#include <linux/module.h>
27
28#include <asm/uaccess.h>
29#include <net/sock.h>
30
31#include <linux/drbd.h>
32#include <linux/fs.h>
33#include <linux/file.h>
34#include <linux/in.h>
35#include <linux/mm.h>
36#include <linux/memcontrol.h>
37#include <linux/mm_inline.h>
38#include <linux/slab.h>
39#include <linux/pkt_sched.h>
40#define __KERNEL_SYSCALLS__
41#include <linux/unistd.h>
42#include <linux/vmalloc.h>
43#include <linux/random.h>
44#include <linux/string.h>
45#include <linux/scatterlist.h>
46#include "drbd_int.h"
47#include "drbd_req.h"
48
49#include "drbd_vli.h"
50
51struct packet_info {
52 enum drbd_packet cmd;
53 unsigned int size;
54 unsigned int vnr;
55};
56
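/*
 * pi->cmd, pi->size and pi->vnr are filled in from the on-wire header by
 * decode_header() and then handed down to the per-packet receive functions.
 * Sketch of a typical consumer (see receive_DataReply() further below):
 *
 *	static int receive_DataReply(struct drbd_conf *mdev, struct packet_info *pi)
 *	{
 *		...
 *		err = recv_dless_read(mdev, req, sector, pi->size);
 *		...
 *	}
 */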
57enum finish_epoch {
58 FE_STILL_LIVE,
59 FE_DESTROYED,
60 FE_RECYCLED,
61};
62
63enum mdev_or_conn {
64 MDEV,
65 CONN,
66};
67
65d11ed6 68static int drbd_do_handshake(struct drbd_tconn *tconn);
13e6037d 69static int drbd_do_auth(struct drbd_tconn *tconn);
360cc740 70static int drbd_disconnected(int vnr, void *p, void *data);
71
72static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
99920dc5 73static int e_end_block(struct drbd_work *, int);
b411b363 74
75
76#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
77
78/*
79 * some helper functions to deal with singly linked page lists,
80 * page->private being our "next" pointer.
81 */
82
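/*
 * Illustrative sketch only (not part of the driver): counting the pages in
 * such a chain.  This assumes page_chain_next(p), as used below, simply
 * reinterprets page_private(p) as the next struct page pointer, with 0
 * terminating the chain.
 *
 *	static int page_chain_count(struct page *p)
 *	{
 *		int n = 0;
 *		for (; p; p = page_chain_next(p))
 *			n++;
 *		return n;
 *	}
 */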
83/* If at least n pages are linked at head, get n pages off.
84 * Otherwise, don't modify head, and return NULL.
85 * Locking is the responsibility of the caller.
86 */
87static struct page *page_chain_del(struct page **head, int n)
88{
89 struct page *page;
90 struct page *tmp;
91
92 BUG_ON(!n);
93 BUG_ON(!head);
94
95 page = *head;
96
97 if (!page)
98 return NULL;
99
100 while (page) {
101 tmp = page_chain_next(page);
102 if (--n == 0)
103 break; /* found sufficient pages */
104 if (tmp == NULL)
105 /* insufficient pages, don't use any of them. */
106 return NULL;
107 page = tmp;
108 }
109
110 /* add end of list marker for the returned list */
111 set_page_private(page, 0);
112 /* actual return value, and adjustment of head */
113 page = *head;
114 *head = tmp;
115 return page;
116}
117
118/* may be used outside of locks to find the tail of a (usually short)
119 * "private" page chain, before adding it back to a global chain head
120 * with page_chain_add() under a spinlock. */
121static struct page *page_chain_tail(struct page *page, int *len)
122{
123 struct page *tmp;
124 int i = 1;
125 while ((tmp = page_chain_next(page)))
126 ++i, page = tmp;
127 if (len)
128 *len = i;
129 return page;
130}
131
132static int page_chain_free(struct page *page)
133{
134 struct page *tmp;
135 int i = 0;
136 page_chain_for_each_safe(page, tmp) {
137 put_page(page);
138 ++i;
139 }
140 return i;
141}
142
143static void page_chain_add(struct page **head,
144 struct page *chain_first, struct page *chain_last)
145{
146#if 1
147 struct page *tmp;
148 tmp = page_chain_tail(chain_first, NULL);
149 BUG_ON(tmp != chain_last);
150#endif
151
152 /* add chain to head */
153 set_page_private(chain_last, (unsigned long)*head);
154 *head = chain_first;
155}
156
157static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
158{
159 struct page *page = NULL;
160 struct page *tmp = NULL;
161 int i = 0;
162
163 /* Yes, testing drbd_pp_vacant outside the lock is racy.
164 * So what. It saves a spin_lock. */
45bb912b 165 if (drbd_pp_vacant >= number) {
b411b363 166 spin_lock(&drbd_pp_lock);
167 page = page_chain_del(&drbd_pp_pool, number);
168 if (page)
169 drbd_pp_vacant -= number;
b411b363 170 spin_unlock(&drbd_pp_lock);
171 if (page)
172 return page;
b411b363 173 }
45bb912b 174
175 /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
176 * "criss-cross" setup, that might cause write-out on some other DRBD,
177 * which in turn might block on the other node at this very place. */
178 for (i = 0; i < number; i++) {
179 tmp = alloc_page(GFP_TRY);
180 if (!tmp)
181 break;
182 set_page_private(tmp, (unsigned long)page);
183 page = tmp;
184 }
185
186 if (i == number)
187 return page;
188
189 /* Not enough pages immediately available this time.
190 * No need to jump around here, drbd_pp_alloc will retry this
191 * function "soon". */
192 if (page) {
193 tmp = page_chain_tail(page, NULL);
194 spin_lock(&drbd_pp_lock);
195 page_chain_add(&drbd_pp_pool, page, tmp);
196 drbd_pp_vacant += i;
197 spin_unlock(&drbd_pp_lock);
198 }
199 return NULL;
200}
201
202static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
203{
db830c46 204 struct drbd_peer_request *peer_req;
205 struct list_head *le, *tle;
206
207 /* The EEs are always appended to the end of the list. Since
208 they are sent in order over the wire, they have to finish
209 in order. As soon as we see the first one that has not finished,
210 we can stop examining the list... */
211
212 list_for_each_safe(le, tle, &mdev->net_ee) {
213 peer_req = list_entry(le, struct drbd_peer_request, w.list);
214 if (drbd_ee_has_active_page(peer_req))
215 break;
216 list_move(le, to_be_freed);
217 }
218}
219
220static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
221{
222 LIST_HEAD(reclaimed);
db830c46 223 struct drbd_peer_request *peer_req, *t;
b411b363 224
87eeee41 225 spin_lock_irq(&mdev->tconn->req_lock);
b411b363 226 reclaim_net_ee(mdev, &reclaimed);
87eeee41 227 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 228
229 list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
230 drbd_free_net_ee(mdev, peer_req);
231}
232
233/**
45bb912b 234 * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
b411b363 235 * @mdev: DRBD device.
236 * @number: number of pages requested
237 * @retry: whether to retry, if not enough pages are available right now
238 *
239 * Tries to allocate number pages, first from our own page pool, then from
240 * the kernel, unless this allocation would exceed the max_buffers setting.
241 * Possibly retry until DRBD frees sufficient pages somewhere else.
b411b363 242 *
45bb912b 243 * Returns a page chain linked via page->private.
b411b363 244 */
45bb912b 245static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
246{
247 struct page *page = NULL;
248 DEFINE_WAIT(wait);
249
250 /* Yes, we may run up to @number over max_buffers. If we
251 * follow it strictly, the admin will get it wrong anyways. */
89e58e75 252 if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers)
45bb912b 253 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
b411b363 254
45bb912b 255 while (page == NULL) {
256 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
257
258 drbd_kick_lo_and_reclaim_net(mdev);
259
89e58e75 260 if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers) {
45bb912b 261 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
262 if (page)
263 break;
264 }
265
266 if (!retry)
267 break;
268
269 if (signal_pending(current)) {
270 dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
271 break;
272 }
273
274 schedule();
275 }
276 finish_wait(&drbd_pp_wait, &wait);
277
278 if (page)
279 atomic_add(number, &mdev->pp_in_use);
280 return page;
281}
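/*
 * Typical caller pattern (sketch only, mirroring drbd_alloc_ee() and
 * drbd_drain_block() further down): allocate a chain, use it, then hand it
 * back via drbd_pp_free().  A NULL return is only possible when retry is
 * false or the wait was interrupted by a signal.
 *
 *	page = drbd_pp_alloc(mdev, nr_pages, true);
 *	if (!page)
 *		goto fail;
 *	...
 *	drbd_pp_free(mdev, page, 0);
 */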
282
283/* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
284 * Is also used from inside another spin_lock_irq(&mdev->tconn->req_lock);
285 * Either links the page chain back to the global pool,
286 * or returns all pages to the system. */
435f0740 287static void drbd_pp_free(struct drbd_conf *mdev, struct page *page, int is_net)
b411b363 288{
435f0740 289 atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
b411b363 290 int i;
435f0740 291
81a5d60e 292 if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
293 i = page_chain_free(page);
294 else {
295 struct page *tmp;
296 tmp = page_chain_tail(page, &i);
297 spin_lock(&drbd_pp_lock);
298 page_chain_add(&drbd_pp_pool, page, tmp);
299 drbd_pp_vacant += i;
300 spin_unlock(&drbd_pp_lock);
b411b363 301 }
435f0740 302 i = atomic_sub_return(i, a);
45bb912b 303 if (i < 0)
304 dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
305 is_net ? "pp_in_use_by_net" : "pp_in_use", i);
306 wake_up(&drbd_pp_wait);
307}
308
309/*
310You need to hold the req_lock:
311 _drbd_wait_ee_list_empty()
312
313You must not have the req_lock:
314 drbd_free_ee()
315 drbd_alloc_ee()
316 drbd_init_ee()
317 drbd_release_ee()
318 drbd_ee_fix_bhs()
319 drbd_process_done_ee()
320 drbd_clear_done_ee()
321 drbd_wait_ee_list_empty()
322*/
323
324struct drbd_peer_request *
325drbd_alloc_ee(struct drbd_conf *mdev, u64 id, sector_t sector,
326 unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
b411b363 327{
db830c46 328 struct drbd_peer_request *peer_req;
b411b363 329 struct page *page;
45bb912b 330 unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
b411b363 331
0cf9d27e 332 if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
333 return NULL;
334
335 peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
336 if (!peer_req) {
337 if (!(gfp_mask & __GFP_NOWARN))
338 dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
339 return NULL;
340 }
341
342 page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
343 if (!page)
344 goto fail;
b411b363 345
346 drbd_clear_interval(&peer_req->i);
347 peer_req->i.size = data_size;
348 peer_req->i.sector = sector;
349 peer_req->i.local = false;
350 peer_req->i.waiting = false;
351
352 peer_req->epoch = NULL;
a21e9298 353 peer_req->w.mdev = mdev;
354 peer_req->pages = page;
355 atomic_set(&peer_req->pending_bios, 0);
356 peer_req->flags = 0;
357 /*
358 * The block_id is opaque to the receiver. It is not endianness
359 * converted, and sent back to the sender unchanged.
360 */
db830c46 361 peer_req->block_id = id;
b411b363 362
db830c46 363 return peer_req;
b411b363 364
45bb912b 365 fail:
db830c46 366 mempool_free(peer_req, drbd_ee_mempool);
367 return NULL;
368}
369
db830c46 370void drbd_free_some_ee(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
f6ffca9f 371 int is_net)
b411b363 372{
373 if (peer_req->flags & EE_HAS_DIGEST)
374 kfree(peer_req->digest);
375 drbd_pp_free(mdev, peer_req->pages, is_net);
376 D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
377 D_ASSERT(drbd_interval_empty(&peer_req->i));
378 mempool_free(peer_req, drbd_ee_mempool);
379}
380
381int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
382{
383 LIST_HEAD(work_list);
db830c46 384 struct drbd_peer_request *peer_req, *t;
b411b363 385 int count = 0;
435f0740 386 int is_net = list == &mdev->net_ee;
b411b363 387
87eeee41 388 spin_lock_irq(&mdev->tconn->req_lock);
b411b363 389 list_splice_init(list, &work_list);
87eeee41 390 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 391
392 list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
393 drbd_free_some_ee(mdev, peer_req, is_net);
394 count++;
395 }
396 return count;
397}
398
399
32862ec7 400/* See also comments in _req_mod(,BARRIER_ACKED)
401 * and receive_Barrier.
402 *
403 * Move entries from net_ee to done_ee, if ready.
404 * Grab done_ee, call all callbacks, free the entries.
405 * The callbacks typically send out ACKs.
406 */
407static int drbd_process_done_ee(struct drbd_conf *mdev)
408{
409 LIST_HEAD(work_list);
410 LIST_HEAD(reclaimed);
db830c46 411 struct drbd_peer_request *peer_req, *t;
e2b3032b 412 int err = 0;
b411b363 413
87eeee41 414 spin_lock_irq(&mdev->tconn->req_lock);
415 reclaim_net_ee(mdev, &reclaimed);
416 list_splice_init(&mdev->done_ee, &work_list);
87eeee41 417 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 418
419 list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
420 drbd_free_net_ee(mdev, peer_req);
421
422 /* possible callbacks here:
7be8da07 423 * e_end_block, and e_end_resync_block, e_send_discard_write.
b411b363
PR
424 * all ignore the last argument.
425 */
db830c46 426 list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
427 int err2;
428
b411b363 429 /* list_del not necessary, next/prev members not touched */
430 err2 = peer_req->w.cb(&peer_req->w, !!err);
431 if (!err)
432 err = err2;
db830c46 433 drbd_free_ee(mdev, peer_req);
434 }
435 wake_up(&mdev->ee_wait);
436
e2b3032b 437 return err;
438}
439
440void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
441{
442 DEFINE_WAIT(wait);
443
444 /* avoids spin_lock/unlock
445 * and calling prepare_to_wait in the fast path */
446 while (!list_empty(head)) {
447 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
87eeee41 448 spin_unlock_irq(&mdev->tconn->req_lock);
7eaceacc 449 io_schedule();
b411b363 450 finish_wait(&mdev->ee_wait, &wait);
87eeee41 451 spin_lock_irq(&mdev->tconn->req_lock);
452 }
453}
454
455void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
456{
87eeee41 457 spin_lock_irq(&mdev->tconn->req_lock);
b411b363 458 _drbd_wait_ee_list_empty(mdev, head);
87eeee41 459 spin_unlock_irq(&mdev->tconn->req_lock);
460}
461
462/* see also kernel_accept(), which is only present since 2.6.18.
463 * also, we want to log exactly which part of it failed */
7653620d 464static int drbd_accept(const char **what, struct socket *sock, struct socket **newsock)
465{
466 struct sock *sk = sock->sk;
467 int err = 0;
468
469 *what = "listen";
470 err = sock->ops->listen(sock, 5);
471 if (err < 0)
472 goto out;
473
474 *what = "sock_create_lite";
475 err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
476 newsock);
477 if (err < 0)
478 goto out;
479
480 *what = "accept";
481 err = sock->ops->accept(sock, *newsock, 0);
482 if (err < 0) {
483 sock_release(*newsock);
484 *newsock = NULL;
485 goto out;
486 }
487 (*newsock)->ops = sock->ops;
488
489out:
490 return err;
491}
492
dbd9eea0 493static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
494{
495 mm_segment_t oldfs;
496 struct kvec iov = {
497 .iov_base = buf,
498 .iov_len = size,
499 };
500 struct msghdr msg = {
501 .msg_iovlen = 1,
502 .msg_iov = (struct iovec *)&iov,
503 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
504 };
505 int rv;
506
507 oldfs = get_fs();
508 set_fs(KERNEL_DS);
509 rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
510 set_fs(oldfs);
511
512 return rv;
513}
514
de0ff338 515static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
516{
517 mm_segment_t oldfs;
518 struct kvec iov = {
519 .iov_base = buf,
520 .iov_len = size,
521 };
522 struct msghdr msg = {
523 .msg_iovlen = 1,
524 .msg_iov = (struct iovec *)&iov,
525 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
526 };
527 int rv;
528
529 oldfs = get_fs();
530 set_fs(KERNEL_DS);
531
532 for (;;) {
de0ff338 533 rv = sock_recvmsg(tconn->data.socket, &msg, size, msg.msg_flags);
534 if (rv == size)
535 break;
536
537 /* Note:
538 * ECONNRESET other side closed the connection
539 * ERESTARTSYS (on sock) we got a signal
540 */
541
542 if (rv < 0) {
543 if (rv == -ECONNRESET)
de0ff338 544 conn_info(tconn, "sock was reset by peer\n");
b411b363 545 else if (rv != -ERESTARTSYS)
de0ff338 546 conn_err(tconn, "sock_recvmsg returned %d\n", rv);
547 break;
548 } else if (rv == 0) {
de0ff338 549 conn_info(tconn, "sock was shut down by peer\n");
550 break;
551 } else {
552 /* signal came in, or peer/link went down,
553 * after we read a partial message
554 */
555 /* D_ASSERT(signal_pending(current)); */
556 break;
557 }
558 };
559
560 set_fs(oldfs);
561
562 if (rv != size)
bbeb641c 563 conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
564
565 return rv;
566}
567
568static int drbd_recv_all(struct drbd_tconn *tconn, void *buf, size_t size)
569{
570 int err;
571
572 err = drbd_recv(tconn, buf, size);
573 if (err != size) {
574 if (err >= 0)
575 err = -EIO;
576 } else
577 err = 0;
578 return err;
579}
580
581static int drbd_recv_all_warn(struct drbd_tconn *tconn, void *buf, size_t size)
582{
583 int err;
584
585 err = drbd_recv_all(tconn, buf, size);
586 if (err && !signal_pending(current))
587 conn_warn(tconn, "short read (expected size %d)\n", (int)size);
588 return err;
589}
590
591/* quoting tcp(7):
592 * On individual connections, the socket buffer size must be set prior to the
593 * listen(2) or connect(2) calls in order to have it take effect.
594 * This is our wrapper to do so.
595 */
596static void drbd_setbufsize(struct socket *sock, unsigned int snd,
597 unsigned int rcv)
598{
599 /* open coded SO_SNDBUF, SO_RCVBUF */
600 if (snd) {
601 sock->sk->sk_sndbuf = snd;
602 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
603 }
604 if (rcv) {
605 sock->sk->sk_rcvbuf = rcv;
606 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
607 }
608}
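/*
 * Per the tcp(7) quote above, the call order matters.  Sketch of how the
 * connect path below uses this (see drbd_try_connect()): the buffer sizes
 * are applied right after socket creation, before bind() and connect().
 *
 *	sock_create_kern(..., &sock);
 *	drbd_setbufsize(sock, tconn->net_conf->sndbuf_size,
 *			tconn->net_conf->rcvbuf_size);
 *	sock->ops->bind(...);
 *	sock->ops->connect(...);
 */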
609
eac3e990 610static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
611{
612 const char *what;
613 struct socket *sock;
614 struct sockaddr_in6 src_in6;
615 int err;
616 int disconnect_on_error = 1;
617
eac3e990 618 if (!get_net_conf(tconn))
619 return NULL;
620
621 what = "sock_create_kern";
eac3e990 622 err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
623 SOCK_STREAM, IPPROTO_TCP, &sock);
624 if (err < 0) {
625 sock = NULL;
626 goto out;
627 }
628
629 sock->sk->sk_rcvtimeo =
630 sock->sk->sk_sndtimeo = tconn->net_conf->try_connect_int*HZ;
631 drbd_setbufsize(sock, tconn->net_conf->sndbuf_size,
632 tconn->net_conf->rcvbuf_size);
633
634 /* explicitly bind to the configured IP as source IP
635 * for the outgoing connections.
636 * This is needed for multihomed hosts and to be
637 * able to use lo: interfaces for drbd.
638 * Make sure to use 0 as port number, so linux selects
639 * a free one dynamically.
640 */
641 memcpy(&src_in6, tconn->net_conf->my_addr,
642 min_t(int, tconn->net_conf->my_addr_len, sizeof(src_in6)));
643 if (((struct sockaddr *)tconn->net_conf->my_addr)->sa_family == AF_INET6)
644 src_in6.sin6_port = 0;
645 else
646 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
647
648 what = "bind before connect";
649 err = sock->ops->bind(sock,
650 (struct sockaddr *) &src_in6,
eac3e990 651 tconn->net_conf->my_addr_len);
652 if (err < 0)
653 goto out;
654
655 /* connect may fail, peer not yet available.
656 * stay C_WF_CONNECTION, don't go Disconnecting! */
657 disconnect_on_error = 0;
658 what = "connect";
659 err = sock->ops->connect(sock,
660 (struct sockaddr *)tconn->net_conf->peer_addr,
661 tconn->net_conf->peer_addr_len, 0);
662
663out:
664 if (err < 0) {
665 if (sock) {
666 sock_release(sock);
667 sock = NULL;
668 }
669 switch (-err) {
670 /* timeout, busy, signal pending */
671 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
672 case EINTR: case ERESTARTSYS:
673 /* peer not (yet) available, network problem */
674 case ECONNREFUSED: case ENETUNREACH:
675 case EHOSTDOWN: case EHOSTUNREACH:
676 disconnect_on_error = 0;
677 break;
678 default:
eac3e990 679 conn_err(tconn, "%s failed, err = %d\n", what, err);
680 }
681 if (disconnect_on_error)
bbeb641c 682 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
b411b363 683 }
eac3e990 684 put_net_conf(tconn);
685 return sock;
686}
687
7653620d 688static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn)
689{
690 int timeo, err;
691 struct socket *s_estab = NULL, *s_listen;
692 const char *what;
693
7653620d 694 if (!get_net_conf(tconn))
695 return NULL;
696
697 what = "sock_create_kern";
7653620d 698 err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
699 SOCK_STREAM, IPPROTO_TCP, &s_listen);
700 if (err) {
701 s_listen = NULL;
702 goto out;
703 }
704
7653620d 705 timeo = tconn->net_conf->try_connect_int * HZ;
706 timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
707
708 s_listen->sk->sk_reuse = 1; /* SO_REUSEADDR */
709 s_listen->sk->sk_rcvtimeo = timeo;
710 s_listen->sk->sk_sndtimeo = timeo;
711 drbd_setbufsize(s_listen, tconn->net_conf->sndbuf_size,
712 tconn->net_conf->rcvbuf_size);
713
714 what = "bind before listen";
715 err = s_listen->ops->bind(s_listen,
716 (struct sockaddr *) tconn->net_conf->my_addr,
717 tconn->net_conf->my_addr_len);
718 if (err < 0)
719 goto out;
720
7653620d 721 err = drbd_accept(&what, s_listen, &s_estab);
722
723out:
724 if (s_listen)
725 sock_release(s_listen);
726 if (err < 0) {
727 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
7653620d 728 conn_err(tconn, "%s failed, err = %d\n", what, err);
bbeb641c 729 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
730 }
731 }
7653620d 732 put_net_conf(tconn);
733
734 return s_estab;
735}
736
d38e787e 737static int drbd_send_fp(struct drbd_tconn *tconn, struct socket *sock, enum drbd_packet cmd)
b411b363 738{
5a87d920 739 struct p_header *h = tconn->data.sbuf;
b411b363 740
ecf2363c 741 return !_conn_send_cmd(tconn, 0, sock, cmd, h, sizeof(*h), 0);
742}
743
a25b63f1 744static enum drbd_packet drbd_recv_fp(struct drbd_tconn *tconn, struct socket *sock)
b411b363 745{
e6ef8a5c 746 struct p_header80 *h = tconn->data.rbuf;
747 int rr;
748
dbd9eea0 749 rr = drbd_recv_short(sock, h, sizeof(*h), 0);
b411b363 750
ca9bc12b 751 if (rr == sizeof(*h) && h->magic == cpu_to_be32(DRBD_MAGIC))
752 return be16_to_cpu(h->command);
753
754 return 0xffff;
755}
756
757/**
758 * drbd_socket_okay() - Free the socket if its connection is not okay
759 * @sock: pointer to the pointer to the socket.
760 */
dbd9eea0 761static int drbd_socket_okay(struct socket **sock)
762{
763 int rr;
764 char tb[4];
765
766 if (!*sock)
81e84650 767 return false;
b411b363 768
dbd9eea0 769 rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
770
771 if (rr > 0 || rr == -EAGAIN) {
81e84650 772 return true;
773 } else {
774 sock_release(*sock);
775 *sock = NULL;
81e84650 776 return false;
777 }
778}
779/* Gets called if a connection is established, or if a new minor gets created
780 in a connection */
781int drbd_connected(int vnr, void *p, void *data)
782{
783 struct drbd_conf *mdev = (struct drbd_conf *)p;
0829f5ed 784 int err;
785
786 atomic_set(&mdev->packet_seq, 0);
787 mdev->peer_seq = 0;
788
789 mdev->state_mutex = mdev->tconn->agreed_pro_version < 100 ?
790 &mdev->tconn->cstate_mutex :
791 &mdev->own_state_mutex;
792
793 err = drbd_send_sync_param(mdev);
794 if (!err)
795 err = drbd_send_sizes(mdev, 0, 0);
796 if (!err)
797 err = drbd_send_uuids(mdev);
798 if (!err)
799 err = drbd_send_state(mdev);
800 clear_bit(USE_DEGR_WFC_T, &mdev->flags);
801 clear_bit(RESIZE_PENDING, &mdev->flags);
0829f5ed 802 return err;
803}
804
805/*
806 * return values:
807 * 1 yes, we have a valid connection
808 * 0 oops, did not work out, please try again
809 * -1 peer talks different language,
810 * no point in trying again, please go standalone.
811 * -2 We do not have a network config...
812 */
907599e0 813static int drbd_connect(struct drbd_tconn *tconn)
814{
815 struct socket *s, *sock, *msock;
816 int try, h, ok;
817
bbeb641c 818 if (conn_request_state(tconn, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
819 return -2;
820
907599e0 821 clear_bit(DISCARD_CONCURRENT, &tconn->flags);
822
823 /* Assume that the peer only understands protocol 80 until we know better. */
824 tconn->agreed_pro_version = 80;
825
826 sock = NULL;
827 msock = NULL;
828
829 do {
830 for (try = 0;;) {
831 /* 3 tries, this should take less than a second! */
907599e0 832 s = drbd_try_connect(tconn);
833 if (s || ++try >= 3)
834 break;
835 /* give the other side time to call bind() & listen() */
20ee6390 836 schedule_timeout_interruptible(HZ / 10);
837 }
838
839 if (s) {
840 if (!sock) {
907599e0 841 drbd_send_fp(tconn, s, P_HAND_SHAKE_S);
842 sock = s;
843 s = NULL;
844 } else if (!msock) {
907599e0 845 drbd_send_fp(tconn, s, P_HAND_SHAKE_M);
846 msock = s;
847 s = NULL;
848 } else {
907599e0 849 conn_err(tconn, "Logic error in drbd_connect()\n");
850 goto out_release_sockets;
851 }
852 }
853
854 if (sock && msock) {
907599e0 855 schedule_timeout_interruptible(tconn->net_conf->ping_timeo*HZ/10);
856 ok = drbd_socket_okay(&sock);
857 ok = drbd_socket_okay(&msock) && ok;
858 if (ok)
859 break;
860 }
861
862retry:
907599e0 863 s = drbd_wait_for_connect(tconn);
b411b363 864 if (s) {
907599e0 865 try = drbd_recv_fp(tconn, s);
866 drbd_socket_okay(&sock);
867 drbd_socket_okay(&msock);
868 switch (try) {
869 case P_HAND_SHAKE_S:
870 if (sock) {
907599e0 871 conn_warn(tconn, "initial packet S crossed\n");
872 sock_release(sock);
873 }
874 sock = s;
875 break;
876 case P_HAND_SHAKE_M:
877 if (msock) {
907599e0 878 conn_warn(tconn, "initial packet M crossed\n");
879 sock_release(msock);
880 }
881 msock = s;
907599e0 882 set_bit(DISCARD_CONCURRENT, &tconn->flags);
883 break;
884 default:
907599e0 885 conn_warn(tconn, "Error receiving initial packet\n");
886 sock_release(s);
887 if (random32() & 1)
888 goto retry;
889 }
890 }
891
bbeb641c 892 if (tconn->cstate <= C_DISCONNECTING)
893 goto out_release_sockets;
894 if (signal_pending(current)) {
895 flush_signals(current);
896 smp_rmb();
907599e0 897 if (get_t_state(&tconn->receiver) == EXITING)
898 goto out_release_sockets;
899 }
900
901 if (sock && msock) {
902 ok = drbd_socket_okay(&sock);
903 ok = drbd_socket_okay(&msock) && ok;
904 if (ok)
905 break;
906 }
907 } while (1);
908
909 msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
910 sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
911
912 sock->sk->sk_allocation = GFP_NOIO;
913 msock->sk->sk_allocation = GFP_NOIO;
914
915 sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
916 msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
917
b411b363 918 /* NOT YET ...
907599e0 919 * sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
920 * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
921 * first set it to the P_HAND_SHAKE timeout,
922 * which we set to 4x the configured ping_timeout. */
923 sock->sk->sk_sndtimeo =
907599e0 924 sock->sk->sk_rcvtimeo = tconn->net_conf->ping_timeo*4*HZ/10;
b411b363 925
926 msock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
927 msock->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
928
929 /* we don't want delays.
25985edc 930 * we use TCP_CORK where appropriate, though */
931 drbd_tcp_nodelay(sock);
932 drbd_tcp_nodelay(msock);
933
934 tconn->data.socket = sock;
935 tconn->meta.socket = msock;
936 tconn->last_received = jiffies;
b411b363 937
907599e0 938 h = drbd_do_handshake(tconn);
939 if (h <= 0)
940 return h;
941
907599e0 942 if (tconn->cram_hmac_tfm) {
b411b363 943 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
907599e0 944 switch (drbd_do_auth(tconn)) {
b10d96cb 945 case -1:
907599e0 946 conn_err(tconn, "Authentication of peer failed\n");
b411b363 947 return -1;
b10d96cb 948 case 0:
907599e0 949 conn_err(tconn, "Authentication of peer failed, trying again.\n");
b10d96cb 950 return 0;
951 }
952 }
953
bbeb641c 954 if (conn_request_state(tconn, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE) < SS_SUCCESS)
955 return 0;
956
907599e0 957 sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
958 sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
959
907599e0 960 drbd_thread_start(&tconn->asender);
b411b363 961
387eb308 962 if (drbd_send_protocol(tconn) == -EOPNOTSUPP)
7e2455c1 963 return -1;
b411b363 964
907599e0 965 return !idr_for_each(&tconn->volumes, drbd_connected, tconn);
966
967out_release_sockets:
968 if (sock)
969 sock_release(sock);
970 if (msock)
971 sock_release(msock);
972 return -1;
973}
974
8172f3e9 975static int decode_header(struct drbd_tconn *tconn, struct p_header *h, struct packet_info *pi)
b411b363 976{
fd340c12 977 if (h->h80.magic == cpu_to_be32(DRBD_MAGIC)) {
978 pi->cmd = be16_to_cpu(h->h80.command);
979 pi->size = be16_to_cpu(h->h80.length);
eefc2f7d 980 pi->vnr = 0;
ca9bc12b 981 } else if (h->h95.magic == cpu_to_be16(DRBD_MAGIC_BIG)) {
982 pi->cmd = be16_to_cpu(h->h95.command);
983 pi->size = be32_to_cpu(h->h95.length) & 0x00ffffff;
984 pi->vnr = 0;
02918be2 985 } else {
ce243853 986 conn_err(tconn, "magic?? on data m: 0x%08x c: %d l: %d\n",
987 be32_to_cpu(h->h80.magic),
988 be16_to_cpu(h->h80.command),
989 be16_to_cpu(h->h80.length));
8172f3e9 990 return -EINVAL;
b411b363 991 }
8172f3e9 992 return 0;
993}
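/*
 * Wire format summary (as handled above): an h80 header carries a 32-bit
 * DRBD_MAGIC plus 16-bit command and length fields, while an h95 header
 * carries the 16-bit DRBD_MAGIC_BIG and a length field of which only the
 * low 24 bits are used (hence the & 0x00ffffff).  In both cases the decoded
 * values end up in the same struct packet_info.
 */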
994
9ba7aa00 995static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
257d0af6 996{
e6ef8a5c 997 struct p_header *h = tconn->data.rbuf;
69bc7bc3 998 int err;
257d0af6 999
1000 err = drbd_recv_all_warn(tconn, h, sizeof(*h));
1001 if (err)
69bc7bc3 1002 return err;
257d0af6 1003
69bc7bc3 1004 err = decode_header(tconn, h, pi);
9ba7aa00 1005 tconn->last_received = jiffies;
b411b363 1006
69bc7bc3 1007 return err;
1008}
1009
2451fc3b 1010static void drbd_flush(struct drbd_conf *mdev)
1011{
1012 int rv;
1013
1014 if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
fbd9b09a 1015 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
dd3932ed 1016 NULL);
1017 if (rv) {
1018 dev_err(DEV, "local disk flush failed with status %d\n", rv);
1019 /* would rather check on EOPNOTSUPP, but that is not reliable.
1020 * don't try again for ANY return value != 0
1021 * if (rv == -EOPNOTSUPP) */
1022 drbd_bump_write_ordering(mdev, WO_drain_io);
1023 }
1024 put_ldev(mdev);
1025 }
1026}
1027
1028/**
1029 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, possibly finishes it.
1030 * @mdev: DRBD device.
1031 * @epoch: Epoch object.
1032 * @ev: Epoch event.
1033 */
1034static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
1035 struct drbd_epoch *epoch,
1036 enum epoch_event ev)
1037{
2451fc3b 1038 int epoch_size;
b411b363 1039 struct drbd_epoch *next_epoch;
1040 enum finish_epoch rv = FE_STILL_LIVE;
1041
1042 spin_lock(&mdev->epoch_lock);
1043 do {
1044 next_epoch = NULL;
1045
1046 epoch_size = atomic_read(&epoch->epoch_size);
1047
1048 switch (ev & ~EV_CLEANUP) {
1049 case EV_PUT:
1050 atomic_dec(&epoch->active);
1051 break;
1052 case EV_GOT_BARRIER_NR:
1053 set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1054 break;
1055 case EV_BECAME_LAST:
1056 /* nothing to do*/
1057 break;
1058 }
1059
1060 if (epoch_size != 0 &&
1061 atomic_read(&epoch->active) == 0 &&
2451fc3b 1062 test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) {
1063 if (!(ev & EV_CLEANUP)) {
1064 spin_unlock(&mdev->epoch_lock);
1065 drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1066 spin_lock(&mdev->epoch_lock);
1067 }
1068 dec_unacked(mdev);
1069
1070 if (mdev->current_epoch != epoch) {
1071 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1072 list_del(&epoch->list);
1073 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1074 mdev->epochs--;
1075 kfree(epoch);
1076
1077 if (rv == FE_STILL_LIVE)
1078 rv = FE_DESTROYED;
1079 } else {
1080 epoch->flags = 0;
1081 atomic_set(&epoch->epoch_size, 0);
698f9315 1082 /* atomic_set(&epoch->active, 0); is already zero */
1083 if (rv == FE_STILL_LIVE)
1084 rv = FE_RECYCLED;
2451fc3b 1085 wake_up(&mdev->ee_wait);
1086 }
1087 }
1088
1089 if (!next_epoch)
1090 break;
1091
1092 epoch = next_epoch;
1093 } while (1);
1094
1095 spin_unlock(&mdev->epoch_lock);
1096
1097 return rv;
1098}
1099
1100/**
1101 * drbd_bump_write_ordering() - Fall back to another write ordering method
1102 * @mdev: DRBD device.
1103 * @wo: Write ordering method to try.
1104 */
1105void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1106{
1107 enum write_ordering_e pwo;
1108 static char *write_ordering_str[] = {
1109 [WO_none] = "none",
1110 [WO_drain_io] = "drain",
1111 [WO_bdev_flush] = "flush",
1112 };
1113
1114 pwo = mdev->write_ordering;
1115 wo = min(pwo, wo);
1116 if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1117 wo = WO_drain_io;
1118 if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1119 wo = WO_none;
1120 mdev->write_ordering = wo;
2451fc3b 1121 if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
1122 dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1123}
1124
45bb912b 1125/**
1126 * drbd_submit_peer_request() - submit the I/O for a peer request
45bb912b 1127 * @mdev: DRBD device.
db830c46 1128 * @peer_req: peer request
45bb912b 1129 * @rw: flag field, see bio->bi_rw
1130 *
1131 * May spread the pages to multiple bios,
1132 * depending on bio_add_page restrictions.
1133 *
1134 * Returns 0 if all bios have been submitted,
1135 * -ENOMEM if we could not allocate enough bios,
1136 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1137 * single page to an empty bio (which should never happen and likely indicates
1138 * that the lower level IO stack is in some way broken). This has been observed
1139 * on certain Xen deployments.
1140 */
1141/* TODO allocate from our own bio_set. */
1142int drbd_submit_peer_request(struct drbd_conf *mdev,
1143 struct drbd_peer_request *peer_req,
1144 const unsigned rw, const int fault_type)
1145{
1146 struct bio *bios = NULL;
1147 struct bio *bio;
1148 struct page *page = peer_req->pages;
1149 sector_t sector = peer_req->i.sector;
1150 unsigned ds = peer_req->i.size;
1151 unsigned n_bios = 0;
1152 unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
10f6d992 1153 int err = -ENOMEM;
1154
1155 /* In most cases, we will only need one bio. But in case the lower
1156 * level restrictions happen to be different at this offset on this
1157 * side than those of the sending peer, we may need to submit the
1158 * request in more than one bio.
1159 *
1160 * Plain bio_alloc is good enough here; this is not a DRBD-internally
1161 * generated bio, but a bio allocated on behalf of the peer.
1162 */
1163next_bio:
1164 bio = bio_alloc(GFP_NOIO, nr_pages);
1165 if (!bio) {
1166 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1167 goto fail;
1168 }
db830c46 1169 /* > peer_req->i.sector, unless this is the first bio */
1170 bio->bi_sector = sector;
1171 bio->bi_bdev = mdev->ldev->backing_bdev;
45bb912b 1172 bio->bi_rw = rw;
db830c46 1173 bio->bi_private = peer_req;
fcefa62e 1174 bio->bi_end_io = drbd_peer_request_endio;
1175
1176 bio->bi_next = bios;
1177 bios = bio;
1178 ++n_bios;
1179
1180 page_chain_for_each(page) {
1181 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1182 if (!bio_add_page(bio, page, len, 0)) {
1183 /* A single page must always be possible!
1184 * But in case it fails anyways,
1185 * we deal with it, and complain (below). */
1186 if (bio->bi_vcnt == 0) {
1187 dev_err(DEV,
1188 "bio_add_page failed for len=%u, "
1189 "bi_vcnt=0 (bi_sector=%llu)\n",
1190 len, (unsigned long long)bio->bi_sector);
1191 err = -ENOSPC;
1192 goto fail;
1193 }
1194 goto next_bio;
1195 }
1196 ds -= len;
1197 sector += len >> 9;
1198 --nr_pages;
1199 }
1200 D_ASSERT(page == NULL);
1201 D_ASSERT(ds == 0);
1202
db830c46 1203 atomic_set(&peer_req->pending_bios, n_bios);
1204 do {
1205 bio = bios;
1206 bios = bios->bi_next;
1207 bio->bi_next = NULL;
1208
45bb912b 1209 drbd_generic_make_request(mdev, fault_type, bio);
45bb912b 1210 } while (bios);
1211 return 0;
1212
1213fail:
1214 while (bios) {
1215 bio = bios;
1216 bios = bios->bi_next;
1217 bio_put(bio);
1218 }
10f6d992 1219 return err;
1220}
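/*
 * Caller pattern (sketch, see recv_resync_read() further below): a return
 * value of 0 means all bios were submitted; on any error the caller unhooks
 * the peer request from its list, frees it and triggers a re-connect.
 */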
1221
53840641 1222static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
db830c46 1223 struct drbd_peer_request *peer_req)
53840641 1224{
db830c46 1225 struct drbd_interval *i = &peer_req->i;
1226
1227 drbd_remove_interval(&mdev->write_requests, i);
1228 drbd_clear_interval(i);
1229
6c852bec 1230 /* Wake up any processes waiting for this peer request to complete. */
1231 if (i->waiting)
1232 wake_up(&mdev->misc_wait);
1233}
1234
1235static int receive_Barrier(struct drbd_conf *mdev, enum drbd_packet cmd,
1236 unsigned int data_size)
b411b363 1237{
2451fc3b 1238 int rv;
e6ef8a5c 1239 struct p_barrier *p = mdev->tconn->data.rbuf;
1240 struct drbd_epoch *epoch;
1241
1242 inc_unacked(mdev);
1243
1244 mdev->current_epoch->barrier_nr = p->barrier;
1245 rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1246
1247 /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1248 * the activity log, which means it would not be resynced in case the
1249 * R_PRIMARY crashes now.
1250 * Therefore we must send the barrier_ack after the barrier request was
1251 * completed. */
1252 switch (mdev->write_ordering) {
1253 case WO_none:
1254 if (rv == FE_RECYCLED)
82bc0194 1255 return 0;
1256
1257 /* receiver context, in the writeout path of the other node.
1258 * avoid potential distributed deadlock */
1259 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1260 if (epoch)
1261 break;
1262 else
1263 dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1264 /* Fall through */
1265
1266 case WO_bdev_flush:
1267 case WO_drain_io:
b411b363 1268 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1269 drbd_flush(mdev);
1270
1271 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1272 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1273 if (epoch)
1274 break;
1275 }
1276
1277 epoch = mdev->current_epoch;
1278 wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);
1279
1280 D_ASSERT(atomic_read(&epoch->active) == 0);
1281 D_ASSERT(epoch->flags == 0);
b411b363 1282
82bc0194 1283 return 0;
1284 default:
1285 dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
82bc0194 1286 return -EIO;
1287 }
1288
1289 epoch->flags = 0;
1290 atomic_set(&epoch->epoch_size, 0);
1291 atomic_set(&epoch->active, 0);
1292
1293 spin_lock(&mdev->epoch_lock);
1294 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1295 list_add(&epoch->list, &mdev->current_epoch->list);
1296 mdev->current_epoch = epoch;
1297 mdev->epochs++;
1298 } else {
1299 /* The current_epoch got recycled while we allocated this one... */
1300 kfree(epoch);
1301 }
1302 spin_unlock(&mdev->epoch_lock);
1303
82bc0194 1304 return 0;
1305}
1306
1307/* used from receive_RSDataReply (recv_resync_read)
1308 * and from receive_Data */
1309static struct drbd_peer_request *
1310read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
1311 int data_size) __must_hold(local)
b411b363 1312{
6666032a 1313 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
db830c46 1314 struct drbd_peer_request *peer_req;
b411b363 1315 struct page *page;
a5c31904 1316 int dgs, ds, err;
1317 void *dig_in = mdev->tconn->int_dig_in;
1318 void *dig_vv = mdev->tconn->int_dig_vv;
6b4388ac 1319 unsigned long *data;
b411b363 1320
1321 dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
1322 crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;
1323
1324 if (dgs) {
1325 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1326 if (err)
b411b363 1327 return NULL;
1328 }
1329
1330 data_size -= dgs;
1331
1332 if (!expect(data_size != 0))
1333 return NULL;
1334 if (!expect(IS_ALIGNED(data_size, 512)))
1335 return NULL;
1336 if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
1337 return NULL;
b411b363 1338
1339 /* even though we trust our peer,
1340 * we sometimes have to double check. */
1341 if (sector + (data_size>>9) > capacity) {
1342 dev_err(DEV, "request from peer beyond end of local disk: "
1343 "capacity: %llus < sector: %llus + size: %u\n",
1344 (unsigned long long)capacity,
1345 (unsigned long long)sector, data_size);
1346 return NULL;
1347 }
1348
1349 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1350 * "criss-cross" setup, that might cause write-out on some other DRBD,
1351 * which in turn might block on the other node at this very place. */
1352 peer_req = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
1353 if (!peer_req)
b411b363 1354 return NULL;
45bb912b 1355
b411b363 1356 ds = data_size;
db830c46 1357 page = peer_req->pages;
1358 page_chain_for_each(page) {
1359 unsigned len = min_t(int, ds, PAGE_SIZE);
6b4388ac 1360 data = kmap(page);
a5c31904 1361 err = drbd_recv_all_warn(mdev->tconn, data, len);
0cf9d27e 1362 if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
1363 dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1364 data[0] = data[0] ^ (unsigned long)-1;
1365 }
b411b363 1366 kunmap(page);
a5c31904 1367 if (err) {
db830c46 1368 drbd_free_ee(mdev, peer_req);
1369 return NULL;
1370 }
a5c31904 1371 ds -= len;
1372 }
1373
1374 if (dgs) {
db830c46 1375 drbd_csum_ee(mdev, mdev->tconn->integrity_r_tfm, peer_req, dig_vv);
b411b363 1376 if (memcmp(dig_in, dig_vv, dgs)) {
1377 dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1378 (unsigned long long)sector, data_size);
db830c46 1379 drbd_free_ee(mdev, peer_req);
1380 return NULL;
1381 }
1382 }
1383 mdev->recv_cnt += data_size>>9;
db830c46 1384 return peer_req;
1385}
1386
1387/* drbd_drain_block() just takes a data block
1388 * out of the socket input buffer, and discards it.
1389 */
1390static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1391{
1392 struct page *page;
a5c31904 1393 int err = 0;
1394 void *data;
1395
c3470cde 1396 if (!data_size)
fc5be839 1397 return 0;
c3470cde 1398
45bb912b 1399 page = drbd_pp_alloc(mdev, 1, 1);
1400
1401 data = kmap(page);
1402 while (data_size) {
1403 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1404
1405 err = drbd_recv_all_warn(mdev->tconn, data, len);
1406 if (err)
b411b363 1407 break;
a5c31904 1408 data_size -= len;
1409 }
1410 kunmap(page);
435f0740 1411 drbd_pp_free(mdev, page, 0);
fc5be839 1412 return err;
1413}
1414
1415static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1416 sector_t sector, int data_size)
1417{
1418 struct bio_vec *bvec;
1419 struct bio *bio;
a5c31904 1420 int dgs, err, i, expect;
1421 void *dig_in = mdev->tconn->int_dig_in;
1422 void *dig_vv = mdev->tconn->int_dig_vv;
b411b363 1423
1424 dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
1425 crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;
1426
1427 if (dgs) {
1428 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1429 if (err)
1430 return err;
1431 }
1432
1433 data_size -= dgs;
1434
1435 /* optimistically update recv_cnt. if receiving fails below,
1436 * we disconnect anyways, and counters will be reset. */
1437 mdev->recv_cnt += data_size>>9;
1438
1439 bio = req->master_bio;
1440 D_ASSERT(sector == bio->bi_sector);
1441
1442 bio_for_each_segment(bvec, bio, i) {
a5c31904 1443 void *mapped = kmap(bvec->bv_page) + bvec->bv_offset;
b411b363 1444 expect = min_t(int, data_size, bvec->bv_len);
a5c31904 1445 err = drbd_recv_all_warn(mdev->tconn, mapped, expect);
b411b363 1446 kunmap(bvec->bv_page);
1447 if (err)
1448 return err;
1449 data_size -= expect;
1450 }
1451
1452 if (dgs) {
a0638456 1453 drbd_csum_bio(mdev, mdev->tconn->integrity_r_tfm, bio, dig_vv);
1454 if (memcmp(dig_in, dig_vv, dgs)) {
1455 dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
28284cef 1456 return -EINVAL;
1457 }
1458 }
1459
1460 D_ASSERT(data_size == 0);
28284cef 1461 return 0;
1462}
1463
1464/* e_end_resync_block() is called via
1465 * drbd_process_done_ee() by asender only */
99920dc5 1466static int e_end_resync_block(struct drbd_work *w, int unused)
b411b363 1467{
1468 struct drbd_peer_request *peer_req =
1469 container_of(w, struct drbd_peer_request, w);
00d56944 1470 struct drbd_conf *mdev = w->mdev;
db830c46 1471 sector_t sector = peer_req->i.sector;
99920dc5 1472 int err;
b411b363 1473
db830c46 1474 D_ASSERT(drbd_interval_empty(&peer_req->i));
b411b363 1475
1476 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1477 drbd_set_in_sync(mdev, sector, peer_req->i.size);
99920dc5 1478 err = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
1479 } else {
1480 /* Record failure to sync */
db830c46 1481 drbd_rs_failed_io(mdev, sector, peer_req->i.size);
b411b363 1482
99920dc5 1483 err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1484 }
1485 dec_unacked(mdev);
1486
99920dc5 1487 return err;
1488}
1489
1490static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1491{
db830c46 1492 struct drbd_peer_request *peer_req;
b411b363 1493
1494 peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);
1495 if (!peer_req)
45bb912b 1496 goto fail;
1497
1498 dec_rs_pending(mdev);
1499
1500 inc_unacked(mdev);
1501 /* corresponding dec_unacked() in e_end_resync_block()
1502 * respective _drbd_clear_done_ee */
1503
db830c46 1504 peer_req->w.cb = e_end_resync_block;
45bb912b 1505
87eeee41 1506 spin_lock_irq(&mdev->tconn->req_lock);
db830c46 1507 list_add(&peer_req->w.list, &mdev->sync_ee);
87eeee41 1508 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 1509
0f0601f4 1510 atomic_add(data_size >> 9, &mdev->rs_sect_ev);
fbe29dec 1511 if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
e1c1b0fc 1512 return 0;
b411b363 1513
1514 /* don't care for the reason here */
1515 dev_err(DEV, "submit failed, triggering re-connect\n");
87eeee41 1516 spin_lock_irq(&mdev->tconn->req_lock);
db830c46 1517 list_del(&peer_req->w.list);
87eeee41 1518 spin_unlock_irq(&mdev->tconn->req_lock);
22cc37a9 1519
db830c46 1520 drbd_free_ee(mdev, peer_req);
1521fail:
1522 put_ldev(mdev);
e1c1b0fc 1523 return -EIO;
1524}
1525
668eebc6 1526static struct drbd_request *
1527find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
1528 sector_t sector, bool missing_ok, const char *func)
51624585 1529{
1530 struct drbd_request *req;
1531
1532 /* Request object according to our peer */
1533 req = (struct drbd_request *)(unsigned long)id;
5e472264 1534 if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
668eebc6 1535 return req;
1536 if (!missing_ok) {
1537 dev_err(DEV, "%s: failed to find request %lu, sector %llus\n", func,
1538 (unsigned long)id, (unsigned long long)sector);
1539 }
1540 return NULL;
1541}
1542
e2857216 1543static int receive_DataReply(struct drbd_conf *mdev, struct packet_info *pi)
1544{
1545 struct drbd_request *req;
1546 sector_t sector;
82bc0194 1547 int err;
e6ef8a5c 1548 struct p_data *p = mdev->tconn->data.rbuf;
1549
1550 sector = be64_to_cpu(p->sector);
1551
87eeee41 1552 spin_lock_irq(&mdev->tconn->req_lock);
bc9c5c41 1553 req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
87eeee41 1554 spin_unlock_irq(&mdev->tconn->req_lock);
c3afd8f5 1555 if (unlikely(!req))
82bc0194 1556 return -EIO;
b411b363 1557
24c4830c 1558 /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1559 * special casing it there for the various failure cases.
1560 * still no race with drbd_fail_pending_reads */
e2857216 1561 err = recv_dless_read(mdev, req, sector, pi->size);
82bc0194 1562 if (!err)
8554df1c 1563 req_mod(req, DATA_RECEIVED);
1564 /* else: nothing. handled from drbd_disconnect...
1565 * I don't think we may complete this just yet
1566 * in case we are "on-disconnect: freeze" */
1567
82bc0194 1568 return err;
1569}
1570
e2857216 1571static int receive_RSDataReply(struct drbd_conf *mdev, struct packet_info *pi)
1572{
1573 sector_t sector;
82bc0194 1574 int err;
e6ef8a5c 1575 struct p_data *p = mdev->tconn->data.rbuf;
1576
1577 sector = be64_to_cpu(p->sector);
1578 D_ASSERT(p->block_id == ID_SYNCER);
1579
1580 if (get_ldev(mdev)) {
1581 /* data is submitted to disk within recv_resync_read.
1582 * corresponding put_ldev done below on error,
fcefa62e 1583 * or in drbd_peer_request_endio. */
e2857216 1584 err = recv_resync_read(mdev, sector, pi->size);
1585 } else {
1586 if (__ratelimit(&drbd_ratelimit_state))
1587 dev_err(DEV, "Can not write resync data to local disk.\n");
1588
e2857216 1589 err = drbd_drain_block(mdev, pi->size);
b411b363 1590
e2857216 1591 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
1592 }
1593
e2857216 1594 atomic_add(pi->size >> 9, &mdev->rs_sect_in);
778f271d 1595
82bc0194 1596 return err;
1597}
1598
99920dc5 1599static int w_restart_write(struct drbd_work *w, int cancel)
1600{
1601 struct drbd_request *req = container_of(w, struct drbd_request, w);
1602 struct drbd_conf *mdev = w->mdev;
1603 struct bio *bio;
1604 unsigned long start_time;
1605 unsigned long flags;
1606
1607 spin_lock_irqsave(&mdev->tconn->req_lock, flags);
1608 if (!expect(req->rq_state & RQ_POSTPONED)) {
1609 spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
99920dc5 1610 return -EIO;
1611 }
1612 bio = req->master_bio;
1613 start_time = req->start_time;
1614 /* Postponed requests will not have their master_bio completed! */
1615 __req_mod(req, DISCARD_WRITE, NULL);
1616 spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1617
1618 while (__drbd_make_request(mdev, bio, start_time))
1619 /* retry */ ;
99920dc5 1620 return 0;
1621}
1622
1623static void restart_conflicting_writes(struct drbd_conf *mdev,
1624 sector_t sector, int size)
1625{
1626 struct drbd_interval *i;
1627 struct drbd_request *req;
1628
1629 drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1630 if (!i->local)
1631 continue;
1632 req = container_of(i, struct drbd_request, i);
1633 if (req->rq_state & RQ_LOCAL_PENDING ||
1634 !(req->rq_state & RQ_POSTPONED))
1635 continue;
1636 if (expect(list_empty(&req->w.list))) {
1637 req->w.mdev = mdev;
1638 req->w.cb = w_restart_write;
1639 drbd_queue_work(&mdev->tconn->data.work, &req->w);
1640 }
1641 }
1642}
1643
1644/* e_end_block() is called via drbd_process_done_ee().
1645 * this means this function only runs in the asender thread
1646 */
99920dc5 1647static int e_end_block(struct drbd_work *w, int cancel)
b411b363 1648{
1649 struct drbd_peer_request *peer_req =
1650 container_of(w, struct drbd_peer_request, w);
00d56944 1651 struct drbd_conf *mdev = w->mdev;
db830c46 1652 sector_t sector = peer_req->i.sector;
99920dc5 1653 int err = 0, pcmd;
b411b363 1654
89e58e75 1655 if (mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C) {
db830c46 1656 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1657 pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1658 mdev->state.conn <= C_PAUSED_SYNC_T &&
db830c46 1659 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
b411b363 1660 P_RS_WRITE_ACK : P_WRITE_ACK;
99920dc5 1661 err = drbd_send_ack(mdev, pcmd, peer_req);
b411b363 1662 if (pcmd == P_RS_WRITE_ACK)
db830c46 1663 drbd_set_in_sync(mdev, sector, peer_req->i.size);
b411b363 1664 } else {
99920dc5 1665 err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1666 /* we expect it to be marked out of sync anyways...
1667 * maybe assert this? */
1668 }
1669 dec_unacked(mdev);
1670 }
1671 /* we delete from the conflict detection hash _after_ we sent out the
1672 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
89e58e75 1673 if (mdev->tconn->net_conf->two_primaries) {
87eeee41 1674 spin_lock_irq(&mdev->tconn->req_lock);
1675 D_ASSERT(!drbd_interval_empty(&peer_req->i));
1676 drbd_remove_epoch_entry_interval(mdev, peer_req);
1677 if (peer_req->flags & EE_RESTART_REQUESTS)
1678 restart_conflicting_writes(mdev, sector, peer_req->i.size);
87eeee41 1679 spin_unlock_irq(&mdev->tconn->req_lock);
bb3bfe96 1680 } else
db830c46 1681 D_ASSERT(drbd_interval_empty(&peer_req->i));
b411b363 1682
db830c46 1683 drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
b411b363 1684
99920dc5 1685 return err;
1686}
1687
7be8da07 1688static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
b411b363 1689{
7be8da07 1690 struct drbd_conf *mdev = w->mdev;
1691 struct drbd_peer_request *peer_req =
1692 container_of(w, struct drbd_peer_request, w);
99920dc5 1693 int err;
b411b363 1694
99920dc5 1695 err = drbd_send_ack(mdev, ack, peer_req);
1696 dec_unacked(mdev);
1697
99920dc5 1698 return err;
1699}
1700
99920dc5 1701static int e_send_discard_write(struct drbd_work *w, int unused)
1702{
1703 return e_send_ack(w, P_DISCARD_WRITE);
1704}
1705
99920dc5 1706static int e_send_retry_write(struct drbd_work *w, int unused)
1707{
1708 struct drbd_tconn *tconn = w->mdev->tconn;
1709
1710 return e_send_ack(w, tconn->agreed_pro_version >= 100 ?
1711 P_RETRY_WRITE : P_DISCARD_WRITE);
1712}
1713
1714static bool seq_greater(u32 a, u32 b)
1715{
1716 /*
1717 * We assume 32-bit wrap-around here.
1718 * For 24-bit wrap-around, we would have to shift:
1719 * a <<= 8; b <<= 8;
1720 */
1721 return (s32)a - (s32)b > 0;
1722}
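/*
 * Worked example (sketch): with a == 5 and b == 0xfffffffe, the subtraction
 * (s32)a - (s32)b is 5 - (-2) == 7 > 0, so seq_greater() correctly treats a
 * sequence number that has just wrapped past 0xffffffff as "newer".
 */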
1723
1724static u32 seq_max(u32 a, u32 b)
1725{
1726 return seq_greater(a, b) ? a : b;
1727}
1728
1729static bool need_peer_seq(struct drbd_conf *mdev)
1730{
1731 struct drbd_tconn *tconn = mdev->tconn;
1732
1733 /*
1734 * We only need to keep track of the last packet_seq number of our peer
1735 * if we are in dual-primary mode and we have the discard flag set; see
1736 * handle_write_conflicts().
1737 */
1738 return tconn->net_conf->two_primaries &&
1739 test_bit(DISCARD_CONCURRENT, &tconn->flags);
1740}
1741
43ae077d 1742static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
3e394da1 1743{
3c13b680 1744 unsigned int newest_peer_seq;
3e394da1 1745
7be8da07
AG
1746 if (need_peer_seq(mdev)) {
1747 spin_lock(&mdev->peer_seq_lock);
3c13b680
LE
1748 newest_peer_seq = seq_max(mdev->peer_seq, peer_seq);
1749 mdev->peer_seq = newest_peer_seq;
7be8da07 1750 spin_unlock(&mdev->peer_seq_lock);
3c13b680
LE
1751 /* wake up only if we actually changed mdev->peer_seq */
1752 if (peer_seq == newest_peer_seq)
7be8da07
AG
1753 wake_up(&mdev->seq_wait);
1754 }
3e394da1
AG
1755}
1756
b411b363
PR
1757/* Called from receive_Data.
1758 * Synchronize packets on sock with packets on msock.
1759 *
1760 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1761 * packet traveling on msock, they are still processed in the order they have
1762 * been sent.
1763 *
1764 * Note: we don't care for Ack packets overtaking P_DATA packets.
1765 *
 1766 * In case peer_seq is larger than mdev->peer_seq, there are
1767 * outstanding packets on the msock. We wait for them to arrive.
1768 * In case we are the logically next packet, we update mdev->peer_seq
1769 * ourselves. Correctly handles 32bit wrap around.
1770 *
1771 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1772 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1773 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
 1774 * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
1775 *
1776 * returns 0 if we may process the packet,
1777 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
7be8da07 1778static int wait_for_and_update_peer_seq(struct drbd_conf *mdev, const u32 peer_seq)
b411b363
PR
1779{
1780 DEFINE_WAIT(wait);
b411b363 1781 long timeout;
7be8da07
AG
1782 int ret;
1783
1784 if (!need_peer_seq(mdev))
1785 return 0;
1786
b411b363
PR
1787 spin_lock(&mdev->peer_seq_lock);
1788 for (;;) {
7be8da07
AG
1789 if (!seq_greater(peer_seq - 1, mdev->peer_seq)) {
1790 mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
1791 ret = 0;
b411b363 1792 break;
7be8da07 1793 }
b411b363
PR
1794 if (signal_pending(current)) {
1795 ret = -ERESTARTSYS;
1796 break;
1797 }
7be8da07 1798 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
b411b363 1799 spin_unlock(&mdev->peer_seq_lock);
71b1c1eb
AG
1800 timeout = mdev->tconn->net_conf->ping_timeo*HZ/10;
1801 timeout = schedule_timeout(timeout);
b411b363 1802 spin_lock(&mdev->peer_seq_lock);
7be8da07 1803 if (!timeout) {
b411b363 1804 ret = -ETIMEDOUT;
71b1c1eb 1805 dev_err(DEV, "Timed out waiting for missing ack packets; disconnecting\n");
b411b363
PR
1806 break;
1807 }
1808 }
b411b363 1809 spin_unlock(&mdev->peer_seq_lock);
7be8da07 1810 finish_wait(&mdev->seq_wait, &wait);
b411b363
PR
1811 return ret;
1812}
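/*
 * Illustration (hypothetical sequence numbers): a short trace of the wait
 * condition above, !seq_greater(peer_seq - 1, mdev->peer_seq).
 *
 *   mdev->peer_seq == 5, packet arrives with peer_seq == 7
 *     -> seq_greater(6, 5) is true, we are not the logically next packet
 *        and sleep on seq_wait until sequence number 6 has been processed.
 *   mdev->peer_seq == 5, packet arrives with peer_seq == 6
 *     -> seq_greater(5, 5) is false, we may proceed and advance
 *        mdev->peer_seq to 6 ourselves.
 */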
1813
688593c5
LE
1814/* see also bio_flags_to_wire()
1815 * DRBD_REQ_*, because we need to semantically map the flags to data packet
 1816 * flags and back, since we may replicate to peers running other kernel versions. */
1817static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
76d2e7ec 1818{
688593c5
LE
1819 return (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1820 (dpf & DP_FUA ? REQ_FUA : 0) |
1821 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1822 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
76d2e7ec
PR
1823}
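/*
 * Illustration (hypothetical packet): a peer that sends
 * dp_flags == (DP_RW_SYNC | DP_FUA) in a P_DATA packet would have the
 * write submitted locally in receive_Data() with
 *
 *   rw = WRITE | REQ_SYNC | REQ_FUA;
 *
 * while DP_FLUSH and DP_DISCARD stay clear, so no REQ_FLUSH/REQ_DISCARD
 * bits are set.
 */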
1824
7be8da07
AG
1825static void fail_postponed_requests(struct drbd_conf *mdev, sector_t sector,
1826 unsigned int size)
1827{
1828 struct drbd_interval *i;
1829
1830 repeat:
1831 drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1832 struct drbd_request *req;
1833 struct bio_and_error m;
1834
1835 if (!i->local)
1836 continue;
1837 req = container_of(i, struct drbd_request, i);
1838 if (!(req->rq_state & RQ_POSTPONED))
1839 continue;
1840 req->rq_state &= ~RQ_POSTPONED;
1841 __req_mod(req, NEG_ACKED, &m);
1842 spin_unlock_irq(&mdev->tconn->req_lock);
1843 if (m.bio)
1844 complete_master_bio(mdev, &m);
1845 spin_lock_irq(&mdev->tconn->req_lock);
1846 goto repeat;
1847 }
1848}
1849
1850static int handle_write_conflicts(struct drbd_conf *mdev,
1851 struct drbd_peer_request *peer_req)
1852{
1853 struct drbd_tconn *tconn = mdev->tconn;
1854 bool resolve_conflicts = test_bit(DISCARD_CONCURRENT, &tconn->flags);
1855 sector_t sector = peer_req->i.sector;
1856 const unsigned int size = peer_req->i.size;
1857 struct drbd_interval *i;
1858 bool equal;
1859 int err;
1860
1861 /*
1862 * Inserting the peer request into the write_requests tree will prevent
1863 * new conflicting local requests from being added.
1864 */
1865 drbd_insert_interval(&mdev->write_requests, &peer_req->i);
1866
1867 repeat:
1868 drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1869 if (i == &peer_req->i)
1870 continue;
1871
1872 if (!i->local) {
1873 /*
1874 * Our peer has sent a conflicting remote request; this
1875 * should not happen in a two-node setup. Wait for the
1876 * earlier peer request to complete.
1877 */
1878 err = drbd_wait_misc(mdev, i);
1879 if (err)
1880 goto out;
1881 goto repeat;
1882 }
1883
1884 equal = i->sector == sector && i->size == size;
1885 if (resolve_conflicts) {
1886 /*
1887 * If the peer request is fully contained within the
1888 * overlapping request, it can be discarded; otherwise,
1889 * it will be retried once all overlapping requests
1890 * have completed.
1891 */
1892 bool discard = i->sector <= sector && i->sector +
1893 (i->size >> 9) >= sector + (size >> 9);
1894
1895 if (!equal)
1896 dev_alert(DEV, "Concurrent writes detected: "
1897 "local=%llus +%u, remote=%llus +%u, "
1898 "assuming %s came first\n",
1899 (unsigned long long)i->sector, i->size,
1900 (unsigned long long)sector, size,
1901 discard ? "local" : "remote");
1902
1903 inc_unacked(mdev);
1904 peer_req->w.cb = discard ? e_send_discard_write :
1905 e_send_retry_write;
1906 list_add_tail(&peer_req->w.list, &mdev->done_ee);
1907 wake_asender(mdev->tconn);
1908
1909 err = -ENOENT;
1910 goto out;
1911 } else {
1912 struct drbd_request *req =
1913 container_of(i, struct drbd_request, i);
1914
1915 if (!equal)
1916 dev_alert(DEV, "Concurrent writes detected: "
1917 "local=%llus +%u, remote=%llus +%u\n",
1918 (unsigned long long)i->sector, i->size,
1919 (unsigned long long)sector, size);
1920
1921 if (req->rq_state & RQ_LOCAL_PENDING ||
1922 !(req->rq_state & RQ_POSTPONED)) {
1923 /*
1924 * Wait for the node with the discard flag to
1925 * decide if this request will be discarded or
1926 * retried. Requests that are discarded will
1927 * disappear from the write_requests tree.
1928 *
1929 * In addition, wait for the conflicting
1930 * request to finish locally before submitting
1931 * the conflicting peer request.
1932 */
1933 err = drbd_wait_misc(mdev, &req->i);
1934 if (err) {
1935 _conn_request_state(mdev->tconn,
1936 NS(conn, C_TIMEOUT),
1937 CS_HARD);
1938 fail_postponed_requests(mdev, sector, size);
1939 goto out;
1940 }
1941 goto repeat;
1942 }
1943 /*
1944 * Remember to restart the conflicting requests after
1945 * the new peer request has completed.
1946 */
1947 peer_req->flags |= EE_RESTART_REQUESTS;
1948 }
1949 }
1950 err = 0;
1951
1952 out:
1953 if (err)
1954 drbd_remove_epoch_entry_interval(mdev, peer_req);
1955 return err;
1956}
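/*
 * Illustration (made-up sectors and sizes): the "discard" decision above,
 * as taken on the node that resolves concurrent writes (DISCARD_CONCURRENT
 * set). With a conflicting local request i = { sector 100, size 8192 }
 * covering sectors [100, 116) and an incoming peer request
 * { sector 104, size 4096 } covering sectors [104, 112):
 *
 *   i->sector (100) <= sector (104)                      -> true
 *   i->sector + (i->size >> 9) == 116 >= 104 + 8 == 112  -> true
 *
 * The peer request is fully contained in the local one and is answered
 * with e_send_discard_write; otherwise it would be answered with a retry.
 */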
1957
b411b363 1958/* mirrored write */
e2857216 1959static int receive_Data(struct drbd_conf *mdev, struct packet_info *pi)
b411b363
PR
1960{
1961 sector_t sector;
db830c46 1962 struct drbd_peer_request *peer_req;
e6ef8a5c 1963 struct p_data *p = mdev->tconn->data.rbuf;
7be8da07 1964 u32 peer_seq = be32_to_cpu(p->seq_num);
b411b363
PR
1965 int rw = WRITE;
1966 u32 dp_flags;
7be8da07 1967 int err;
b411b363 1968
7be8da07 1969 if (!get_ldev(mdev)) {
82bc0194
AG
1970 int err2;
1971
7be8da07 1972 err = wait_for_and_update_peer_seq(mdev, peer_seq);
e2857216 1973 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
b411b363 1974 atomic_inc(&mdev->current_epoch->epoch_size);
e2857216 1975 err2 = drbd_drain_block(mdev, pi->size);
82bc0194
AG
1976 if (!err)
1977 err = err2;
1978 return err;
b411b363
PR
1979 }
1980
fcefa62e
AG
1981 /*
1982 * Corresponding put_ldev done either below (on various errors), or in
1983 * drbd_peer_request_endio, if we successfully submit the data at the
1984 * end of this function.
1985 */
b411b363
PR
1986
1987 sector = be64_to_cpu(p->sector);
e2857216 1988 peer_req = read_in_block(mdev, p->block_id, sector, pi->size);
db830c46 1989 if (!peer_req) {
b411b363 1990 put_ldev(mdev);
82bc0194 1991 return -EIO;
b411b363
PR
1992 }
1993
db830c46 1994 peer_req->w.cb = e_end_block;
b411b363 1995
688593c5
LE
1996 dp_flags = be32_to_cpu(p->dp_flags);
1997 rw |= wire_flags_to_bio(mdev, dp_flags);
1998
1999 if (dp_flags & DP_MAY_SET_IN_SYNC)
db830c46 2000 peer_req->flags |= EE_MAY_SET_IN_SYNC;
688593c5 2001
b411b363 2002 spin_lock(&mdev->epoch_lock);
db830c46
AG
2003 peer_req->epoch = mdev->current_epoch;
2004 atomic_inc(&peer_req->epoch->epoch_size);
2005 atomic_inc(&peer_req->epoch->active);
b411b363
PR
2006 spin_unlock(&mdev->epoch_lock);
2007
7be8da07
AG
2008 if (mdev->tconn->net_conf->two_primaries) {
2009 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2010 if (err)
b411b363 2011 goto out_interrupted;
87eeee41 2012 spin_lock_irq(&mdev->tconn->req_lock);
7be8da07
AG
2013 err = handle_write_conflicts(mdev, peer_req);
2014 if (err) {
2015 spin_unlock_irq(&mdev->tconn->req_lock);
2016 if (err == -ENOENT) {
b411b363 2017 put_ldev(mdev);
82bc0194 2018 return 0;
b411b363 2019 }
7be8da07 2020 goto out_interrupted;
b411b363 2021 }
7be8da07
AG
2022 } else
2023 spin_lock_irq(&mdev->tconn->req_lock);
db830c46 2024 list_add(&peer_req->w.list, &mdev->active_ee);
87eeee41 2025 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 2026
89e58e75 2027 switch (mdev->tconn->net_conf->wire_protocol) {
b411b363
PR
2028 case DRBD_PROT_C:
2029 inc_unacked(mdev);
2030 /* corresponding dec_unacked() in e_end_block()
2031 * respective _drbd_clear_done_ee */
2032 break;
2033 case DRBD_PROT_B:
2034 /* I really don't like it that the receiver thread
2035 * sends on the msock, but anyways */
db830c46 2036 drbd_send_ack(mdev, P_RECV_ACK, peer_req);
b411b363
PR
2037 break;
2038 case DRBD_PROT_A:
2039 /* nothing to do */
2040 break;
2041 }
2042
6719fb03 2043 if (mdev->state.pdsk < D_INCONSISTENT) {
b411b363 2044 /* In case we have the only disk of the cluster, */
db830c46
AG
2045 drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
2046 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2047 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2048 drbd_al_begin_io(mdev, peer_req->i.sector);
b411b363
PR
2049 }
2050
82bc0194
AG
2051 err = drbd_submit_peer_request(mdev, peer_req, rw, DRBD_FAULT_DT_WR);
2052 if (!err)
2053 return 0;
b411b363 2054
10f6d992
LE
2055 /* don't care for the reason here */
2056 dev_err(DEV, "submit failed, triggering re-connect\n");
87eeee41 2057 spin_lock_irq(&mdev->tconn->req_lock);
db830c46
AG
2058 list_del(&peer_req->w.list);
2059 drbd_remove_epoch_entry_interval(mdev, peer_req);
87eeee41 2060 spin_unlock_irq(&mdev->tconn->req_lock);
db830c46
AG
2061 if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
2062 drbd_al_complete_io(mdev, peer_req->i.sector);
22cc37a9 2063
b411b363 2064out_interrupted:
db830c46 2065 drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + EV_CLEANUP);
b411b363 2066 put_ldev(mdev);
db830c46 2067 drbd_free_ee(mdev, peer_req);
82bc0194 2068 return err;
b411b363
PR
2069}
2070
0f0601f4
LE
2071/* We may throttle resync, if the lower device seems to be busy,
2072 * and current sync rate is above c_min_rate.
2073 *
2074 * To decide whether or not the lower device is busy, we use a scheme similar
2075 * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
 2076 * (more than 64 sectors) activity that we cannot account for with our own resync
2077 * activity, it obviously is "busy".
2078 *
2079 * The current sync rate used here uses only the most recent two step marks,
2080 * to have a short time average so we can react faster.
2081 */
e3555d85 2082int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
0f0601f4
LE
2083{
2084 struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
2085 unsigned long db, dt, dbdt;
e3555d85 2086 struct lc_element *tmp;
0f0601f4
LE
2087 int curr_events;
2088 int throttle = 0;
2089
2090 /* feature disabled? */
f399002e 2091 if (mdev->ldev->dc.c_min_rate == 0)
0f0601f4
LE
2092 return 0;
2093
e3555d85
PR
2094 spin_lock_irq(&mdev->al_lock);
2095 tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
2096 if (tmp) {
2097 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2098 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
2099 spin_unlock_irq(&mdev->al_lock);
2100 return 0;
2101 }
2102 /* Do not slow down if app IO is already waiting for this extent */
2103 }
2104 spin_unlock_irq(&mdev->al_lock);
2105
0f0601f4
LE
2106 curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2107 (int)part_stat_read(&disk->part0, sectors[1]) -
2108 atomic_read(&mdev->rs_sect_ev);
e3555d85 2109
0f0601f4
LE
2110 if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
2111 unsigned long rs_left;
2112 int i;
2113
2114 mdev->rs_last_events = curr_events;
2115
2116 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2117 * approx. */
2649f080
LE
2118 i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2119
2120 if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
2121 rs_left = mdev->ov_left;
2122 else
2123 rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
0f0601f4
LE
2124
2125 dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
2126 if (!dt)
2127 dt++;
2128 db = mdev->rs_mark_left[i] - rs_left;
2129 dbdt = Bit2KB(db/dt);
2130
f399002e 2131 if (dbdt > mdev->ldev->dc.c_min_rate)
0f0601f4
LE
2132 throttle = 1;
2133 }
2134 return throttle;
2135}
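/*
 * Illustration (made-up numbers): the throttle computation above. Suppose
 * the chosen sync mark is 10 seconds old (dt == 10) and 25600 bitmap bits
 * have been cleared since then (db == 25600). Assuming the usual 4 KiB
 * bitmap granularity per bit,
 *
 *   dbdt = Bit2KB(25600 / 10) = 2560 << 2 = 10240 KiB/s
 *
 * With a hypothetical c_min_rate of 4096 KiB/s this exceeds the configured
 * minimum, so together with "significant" unaccounted disk activity the
 * resync request would be throttled.
 */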
2136
2137
e2857216 2138static int receive_DataRequest(struct drbd_conf *mdev, struct packet_info *pi)
b411b363
PR
2139{
2140 sector_t sector;
2141 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
db830c46 2142 struct drbd_peer_request *peer_req;
b411b363 2143 struct digest_info *di = NULL;
b18b37be 2144 int size, verb;
b411b363 2145 unsigned int fault_type;
e6ef8a5c 2146 struct p_block_req *p = mdev->tconn->data.rbuf;
b411b363
PR
2147
2148 sector = be64_to_cpu(p->sector);
2149 size = be32_to_cpu(p->blksize);
2150
c670a398 2151 if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
b411b363
PR
2152 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2153 (unsigned long long)sector, size);
82bc0194 2154 return -EINVAL;
b411b363
PR
2155 }
2156 if (sector + (size>>9) > capacity) {
2157 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2158 (unsigned long long)sector, size);
82bc0194 2159 return -EINVAL;
b411b363
PR
2160 }
2161
2162 if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
b18b37be 2163 verb = 1;
e2857216 2164 switch (pi->cmd) {
b18b37be
PR
2165 case P_DATA_REQUEST:
2166 drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2167 break;
2168 case P_RS_DATA_REQUEST:
2169 case P_CSUM_RS_REQUEST:
2170 case P_OV_REQUEST:
2171 drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
2172 break;
2173 case P_OV_REPLY:
2174 verb = 0;
2175 dec_rs_pending(mdev);
2176 drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2177 break;
2178 default:
49ba9b1b 2179 BUG();
b18b37be
PR
2180 }
2181 if (verb && __ratelimit(&drbd_ratelimit_state))
b411b363
PR
2182 dev_err(DEV, "Can not satisfy peer's read request, "
2183 "no local data.\n");
b18b37be 2184
a821cc4a 2185 /* drain possible payload */
e2857216 2186 return drbd_drain_block(mdev, pi->size);
b411b363
PR
2187 }
2188
2189 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2190 * "criss-cross" setup, that might cause write-out on some other DRBD,
2191 * which in turn might block on the other node at this very place. */
db830c46
AG
2192 peer_req = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
2193 if (!peer_req) {
b411b363 2194 put_ldev(mdev);
82bc0194 2195 return -ENOMEM;
b411b363
PR
2196 }
2197
e2857216 2198 switch (pi->cmd) {
b411b363 2199 case P_DATA_REQUEST:
db830c46 2200 peer_req->w.cb = w_e_end_data_req;
b411b363 2201 fault_type = DRBD_FAULT_DT_RD;
80a40e43
LE
2202 /* application IO, don't drbd_rs_begin_io */
2203 goto submit;
2204
b411b363 2205 case P_RS_DATA_REQUEST:
db830c46 2206 peer_req->w.cb = w_e_end_rsdata_req;
b411b363 2207 fault_type = DRBD_FAULT_RS_RD;
5f9915bb
LE
2208 /* used in the sector offset progress display */
2209 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
b411b363
PR
2210 break;
2211
2212 case P_OV_REPLY:
2213 case P_CSUM_RS_REQUEST:
2214 fault_type = DRBD_FAULT_RS_RD;
e2857216 2215 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
b411b363
PR
2216 if (!di)
2217 goto out_free_e;
2218
e2857216 2219 di->digest_size = pi->size;
b411b363
PR
2220 di->digest = (((char *)di)+sizeof(struct digest_info));
2221
db830c46
AG
2222 peer_req->digest = di;
2223 peer_req->flags |= EE_HAS_DIGEST;
c36c3ced 2224
e2857216 2225 if (drbd_recv_all(mdev->tconn, di->digest, pi->size))
b411b363
PR
2226 goto out_free_e;
2227
e2857216 2228 if (pi->cmd == P_CSUM_RS_REQUEST) {
31890f4a 2229 D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
db830c46 2230 peer_req->w.cb = w_e_end_csum_rs_req;
5f9915bb
LE
2231 /* used in the sector offset progress display */
2232 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
e2857216 2233 } else if (pi->cmd == P_OV_REPLY) {
2649f080
LE
2234 /* track progress, we may need to throttle */
2235 atomic_add(size >> 9, &mdev->rs_sect_in);
db830c46 2236 peer_req->w.cb = w_e_end_ov_reply;
b411b363 2237 dec_rs_pending(mdev);
0f0601f4
LE
2238 /* drbd_rs_begin_io done when we sent this request,
2239 * but accounting still needs to be done. */
2240 goto submit_for_resync;
b411b363
PR
2241 }
2242 break;
2243
2244 case P_OV_REQUEST:
b411b363 2245 if (mdev->ov_start_sector == ~(sector_t)0 &&
31890f4a 2246 mdev->tconn->agreed_pro_version >= 90) {
de228bba
LE
2247 unsigned long now = jiffies;
2248 int i;
b411b363
PR
2249 mdev->ov_start_sector = sector;
2250 mdev->ov_position = sector;
30b743a2
LE
2251 mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2252 mdev->rs_total = mdev->ov_left;
de228bba
LE
2253 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2254 mdev->rs_mark_left[i] = mdev->ov_left;
2255 mdev->rs_mark_time[i] = now;
2256 }
b411b363
PR
2257 dev_info(DEV, "Online Verify start sector: %llu\n",
2258 (unsigned long long)sector);
2259 }
db830c46 2260 peer_req->w.cb = w_e_end_ov_req;
b411b363 2261 fault_type = DRBD_FAULT_RS_RD;
b411b363
PR
2262 break;
2263
b411b363 2264 default:
49ba9b1b 2265 BUG();
b411b363
PR
2266 }
2267
0f0601f4
LE
2268 /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2269 * wrt the receiver, but it is not as straightforward as it may seem.
2270 * Various places in the resync start and stop logic assume resync
2271 * requests are processed in order, requeuing this on the worker thread
2272 * introduces a bunch of new code for synchronization between threads.
2273 *
2274 * Unlimited throttling before drbd_rs_begin_io may stall the resync
2275 * "forever", throttling after drbd_rs_begin_io will lock that extent
2276 * for application writes for the same time. For now, just throttle
2277 * here, where the rest of the code expects the receiver to sleep for
2278 * a while, anyways.
2279 */
2280
2281 /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2282 * this defers syncer requests for some time, before letting at least
 2283 * one request through. The resync controller on the receiving side
2284 * will adapt to the incoming rate accordingly.
2285 *
2286 * We cannot throttle here if remote is Primary/SyncTarget:
2287 * we would also throttle its application reads.
2288 * In that case, throttling is done on the SyncTarget only.
2289 */
e3555d85
PR
2290 if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2291 schedule_timeout_uninterruptible(HZ/10);
2292 if (drbd_rs_begin_io(mdev, sector))
80a40e43 2293 goto out_free_e;
b411b363 2294
0f0601f4
LE
2295submit_for_resync:
2296 atomic_add(size >> 9, &mdev->rs_sect_ev);
2297
80a40e43 2298submit:
b411b363 2299 inc_unacked(mdev);
87eeee41 2300 spin_lock_irq(&mdev->tconn->req_lock);
db830c46 2301 list_add_tail(&peer_req->w.list, &mdev->read_ee);
87eeee41 2302 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 2303
fbe29dec 2304 if (drbd_submit_peer_request(mdev, peer_req, READ, fault_type) == 0)
82bc0194 2305 return 0;
b411b363 2306
10f6d992
LE
2307 /* don't care for the reason here */
2308 dev_err(DEV, "submit failed, triggering re-connect\n");
87eeee41 2309 spin_lock_irq(&mdev->tconn->req_lock);
db830c46 2310 list_del(&peer_req->w.list);
87eeee41 2311 spin_unlock_irq(&mdev->tconn->req_lock);
22cc37a9
LE
2312 /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2313
b411b363 2314out_free_e:
b411b363 2315 put_ldev(mdev);
db830c46 2316 drbd_free_ee(mdev, peer_req);
82bc0194 2317 return -EIO;
b411b363
PR
2318}
2319
2320static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2321{
2322 int self, peer, rv = -100;
2323 unsigned long ch_self, ch_peer;
2324
2325 self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2326 peer = mdev->p_uuid[UI_BITMAP] & 1;
2327
2328 ch_peer = mdev->p_uuid[UI_SIZE];
2329 ch_self = mdev->comm_bm_set;
2330
89e58e75 2331 switch (mdev->tconn->net_conf->after_sb_0p) {
b411b363
PR
2332 case ASB_CONSENSUS:
2333 case ASB_DISCARD_SECONDARY:
2334 case ASB_CALL_HELPER:
2335 dev_err(DEV, "Configuration error.\n");
2336 break;
2337 case ASB_DISCONNECT:
2338 break;
2339 case ASB_DISCARD_YOUNGER_PRI:
2340 if (self == 0 && peer == 1) {
2341 rv = -1;
2342 break;
2343 }
2344 if (self == 1 && peer == 0) {
2345 rv = 1;
2346 break;
2347 }
2348 /* Else fall through to one of the other strategies... */
2349 case ASB_DISCARD_OLDER_PRI:
2350 if (self == 0 && peer == 1) {
2351 rv = 1;
2352 break;
2353 }
2354 if (self == 1 && peer == 0) {
2355 rv = -1;
2356 break;
2357 }
2358 /* Else fall through to one of the other strategies... */
ad19bf6e 2359 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
b411b363
PR
2360 "Using discard-least-changes instead\n");
2361 case ASB_DISCARD_ZERO_CHG:
2362 if (ch_peer == 0 && ch_self == 0) {
25703f83 2363 rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
b411b363
PR
2364 ? -1 : 1;
2365 break;
2366 } else {
2367 if (ch_peer == 0) { rv = 1; break; }
2368 if (ch_self == 0) { rv = -1; break; }
2369 }
89e58e75 2370 if (mdev->tconn->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
b411b363
PR
2371 break;
2372 case ASB_DISCARD_LEAST_CHG:
2373 if (ch_self < ch_peer)
2374 rv = -1;
2375 else if (ch_self > ch_peer)
2376 rv = 1;
2377 else /* ( ch_self == ch_peer ) */
2378 /* Well, then use something else. */
25703f83 2379 rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
b411b363
PR
2380 ? -1 : 1;
2381 break;
2382 case ASB_DISCARD_LOCAL:
2383 rv = -1;
2384 break;
2385 case ASB_DISCARD_REMOTE:
2386 rv = 1;
2387 }
2388
2389 return rv;
2390}
2391
2392static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2393{
6184ea21 2394 int hg, rv = -100;
b411b363 2395
89e58e75 2396 switch (mdev->tconn->net_conf->after_sb_1p) {
b411b363
PR
2397 case ASB_DISCARD_YOUNGER_PRI:
2398 case ASB_DISCARD_OLDER_PRI:
2399 case ASB_DISCARD_LEAST_CHG:
2400 case ASB_DISCARD_LOCAL:
2401 case ASB_DISCARD_REMOTE:
2402 dev_err(DEV, "Configuration error.\n");
2403 break;
2404 case ASB_DISCONNECT:
2405 break;
2406 case ASB_CONSENSUS:
2407 hg = drbd_asb_recover_0p(mdev);
2408 if (hg == -1 && mdev->state.role == R_SECONDARY)
2409 rv = hg;
2410 if (hg == 1 && mdev->state.role == R_PRIMARY)
2411 rv = hg;
2412 break;
2413 case ASB_VIOLENTLY:
2414 rv = drbd_asb_recover_0p(mdev);
2415 break;
2416 case ASB_DISCARD_SECONDARY:
2417 return mdev->state.role == R_PRIMARY ? 1 : -1;
2418 case ASB_CALL_HELPER:
2419 hg = drbd_asb_recover_0p(mdev);
2420 if (hg == -1 && mdev->state.role == R_PRIMARY) {
bb437946
AG
2421 enum drbd_state_rv rv2;
2422
2423 drbd_set_role(mdev, R_SECONDARY, 0);
b411b363
PR
2424 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2425 * we might be here in C_WF_REPORT_PARAMS which is transient.
2426 * we do not need to wait for the after state change work either. */
bb437946
AG
2427 rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2428 if (rv2 != SS_SUCCESS) {
b411b363
PR
2429 drbd_khelper(mdev, "pri-lost-after-sb");
2430 } else {
2431 dev_warn(DEV, "Successfully gave up primary role.\n");
2432 rv = hg;
2433 }
2434 } else
2435 rv = hg;
2436 }
2437
2438 return rv;
2439}
2440
2441static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2442{
6184ea21 2443 int hg, rv = -100;
b411b363 2444
89e58e75 2445 switch (mdev->tconn->net_conf->after_sb_2p) {
b411b363
PR
2446 case ASB_DISCARD_YOUNGER_PRI:
2447 case ASB_DISCARD_OLDER_PRI:
2448 case ASB_DISCARD_LEAST_CHG:
2449 case ASB_DISCARD_LOCAL:
2450 case ASB_DISCARD_REMOTE:
2451 case ASB_CONSENSUS:
2452 case ASB_DISCARD_SECONDARY:
2453 dev_err(DEV, "Configuration error.\n");
2454 break;
2455 case ASB_VIOLENTLY:
2456 rv = drbd_asb_recover_0p(mdev);
2457 break;
2458 case ASB_DISCONNECT:
2459 break;
2460 case ASB_CALL_HELPER:
2461 hg = drbd_asb_recover_0p(mdev);
2462 if (hg == -1) {
bb437946
AG
2463 enum drbd_state_rv rv2;
2464
b411b363
PR
2465 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2466 * we might be here in C_WF_REPORT_PARAMS which is transient.
2467 * we do not need to wait for the after state change work either. */
bb437946
AG
2468 rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2469 if (rv2 != SS_SUCCESS) {
b411b363
PR
2470 drbd_khelper(mdev, "pri-lost-after-sb");
2471 } else {
2472 dev_warn(DEV, "Successfully gave up primary role.\n");
2473 rv = hg;
2474 }
2475 } else
2476 rv = hg;
2477 }
2478
2479 return rv;
2480}
2481
2482static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2483 u64 bits, u64 flags)
2484{
2485 if (!uuid) {
2486 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2487 return;
2488 }
2489 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2490 text,
2491 (unsigned long long)uuid[UI_CURRENT],
2492 (unsigned long long)uuid[UI_BITMAP],
2493 (unsigned long long)uuid[UI_HISTORY_START],
2494 (unsigned long long)uuid[UI_HISTORY_END],
2495 (unsigned long long)bits,
2496 (unsigned long long)flags);
2497}
2498
2499/*
2500 100 after split brain try auto recover
2501 2 C_SYNC_SOURCE set BitMap
2502 1 C_SYNC_SOURCE use BitMap
2503 0 no Sync
2504 -1 C_SYNC_TARGET use BitMap
2505 -2 C_SYNC_TARGET set BitMap
2506 -100 after split brain, disconnect
2507-1000 unrelated data
4a23f264
PR
2508-1091 requires proto 91
2509-1096 requires proto 96
b411b363
PR
2510 */
2511static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2512{
2513 u64 self, peer;
2514 int i, j;
2515
2516 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2517 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2518
2519 *rule_nr = 10;
2520 if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2521 return 0;
2522
2523 *rule_nr = 20;
2524 if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2525 peer != UUID_JUST_CREATED)
2526 return -2;
2527
2528 *rule_nr = 30;
2529 if (self != UUID_JUST_CREATED &&
2530 (peer == UUID_JUST_CREATED || peer == (u64)0))
2531 return 2;
2532
2533 if (self == peer) {
2534 int rct, dc; /* roles at crash time */
2535
2536 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2537
31890f4a 2538 if (mdev->tconn->agreed_pro_version < 91)
4a23f264 2539 return -1091;
b411b363
PR
2540
2541 if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2542 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2543 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2544 drbd_uuid_set_bm(mdev, 0UL);
2545
2546 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2547 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2548 *rule_nr = 34;
2549 } else {
2550 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2551 *rule_nr = 36;
2552 }
2553
2554 return 1;
2555 }
2556
2557 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2558
31890f4a 2559 if (mdev->tconn->agreed_pro_version < 91)
4a23f264 2560 return -1091;
b411b363
PR
2561
2562 if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2563 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2564 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2565
2566 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2567 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2568 mdev->p_uuid[UI_BITMAP] = 0UL;
2569
2570 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2571 *rule_nr = 35;
2572 } else {
2573 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2574 *rule_nr = 37;
2575 }
2576
2577 return -1;
2578 }
2579
2580 /* Common power [off|failure] */
2581 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2582 (mdev->p_uuid[UI_FLAGS] & 2);
2583 /* lowest bit is set when we were primary,
2584 * next bit (weight 2) is set when peer was primary */
2585 *rule_nr = 40;
2586
2587 switch (rct) {
2588 case 0: /* !self_pri && !peer_pri */ return 0;
2589 case 1: /* self_pri && !peer_pri */ return 1;
2590 case 2: /* !self_pri && peer_pri */ return -1;
2591 case 3: /* self_pri && peer_pri */
25703f83 2592 dc = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
b411b363
PR
2593 return dc ? -1 : 1;
2594 }
2595 }
2596
2597 *rule_nr = 50;
2598 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2599 if (self == peer)
2600 return -1;
2601
2602 *rule_nr = 51;
2603 peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2604 if (self == peer) {
31890f4a 2605 if (mdev->tconn->agreed_pro_version < 96 ?
4a23f264
PR
2606 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2607 (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2608 peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
b411b363
PR
 2609 /* The last P_SYNC_UUID did not get through. Undo the peer's UUID
 2610 modifications from its last start of resync as sync source. */
2611
31890f4a 2612 if (mdev->tconn->agreed_pro_version < 91)
4a23f264 2613 return -1091;
b411b363
PR
2614
2615 mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2616 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
4a23f264
PR
2617
 2618 dev_info(DEV, "Did not get last syncUUID packet, corrected:\n");
2619 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2620
b411b363
PR
2621 return -1;
2622 }
2623 }
2624
2625 *rule_nr = 60;
2626 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2627 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2628 peer = mdev->p_uuid[i] & ~((u64)1);
2629 if (self == peer)
2630 return -2;
2631 }
2632
2633 *rule_nr = 70;
2634 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2635 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2636 if (self == peer)
2637 return 1;
2638
2639 *rule_nr = 71;
2640 self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2641 if (self == peer) {
31890f4a 2642 if (mdev->tconn->agreed_pro_version < 96 ?
4a23f264
PR
2643 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2644 (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2645 self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
b411b363
PR
 2646 /* The last P_SYNC_UUID did not get through. Undo our own UUID
 2647 modifications from our last start of resync as sync source. */
2648
31890f4a 2649 if (mdev->tconn->agreed_pro_version < 91)
4a23f264 2650 return -1091;
b411b363
PR
2651
2652 _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2653 _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2654
4a23f264 2655 dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
b411b363
PR
2656 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2657 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2658
2659 return 1;
2660 }
2661 }
2662
2663
2664 *rule_nr = 80;
d8c2a36b 2665 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
b411b363
PR
2666 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2667 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2668 if (self == peer)
2669 return 2;
2670 }
2671
2672 *rule_nr = 90;
2673 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2674 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2675 if (self == peer && self != ((u64)0))
2676 return 100;
2677
2678 *rule_nr = 100;
2679 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2680 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2681 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2682 peer = mdev->p_uuid[j] & ~((u64)1);
2683 if (self == peer)
2684 return -100;
2685 }
2686 }
2687
2688 return -1000;
2689}
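/*
 * Illustration (hypothetical flags): rule 40 above (common power failure)
 * packs the crash-time roles into rct as
 *
 *   bit 0: this node was Primary when it crashed (CRASHED_PRIMARY)
 *   bit 1: the peer was Primary when it crashed (p_uuid[UI_FLAGS] & 2)
 *
 * so rct == 1 (only we were Primary) returns +1, i.e. we become sync
 * source using the bitmap, while rct == 3 (both were Primary) falls back
 * to the DISCARD_CONCURRENT tie-breaker.
 */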
2690
2691/* drbd_sync_handshake() returns the new conn state on success, or
2692 CONN_MASK (-1) on failure.
2693 */
2694static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2695 enum drbd_disk_state peer_disk) __must_hold(local)
2696{
2697 int hg, rule_nr;
2698 enum drbd_conns rv = C_MASK;
2699 enum drbd_disk_state mydisk;
2700
2701 mydisk = mdev->state.disk;
2702 if (mydisk == D_NEGOTIATING)
2703 mydisk = mdev->new_state_tmp.disk;
2704
2705 dev_info(DEV, "drbd_sync_handshake:\n");
2706 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2707 drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2708 mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2709
2710 hg = drbd_uuid_compare(mdev, &rule_nr);
2711
2712 dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2713
2714 if (hg == -1000) {
2715 dev_alert(DEV, "Unrelated data, aborting!\n");
2716 return C_MASK;
2717 }
4a23f264
PR
2718 if (hg < -1000) {
2719 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
b411b363
PR
2720 return C_MASK;
2721 }
2722
2723 if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2724 (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
2725 int f = (hg == -100) || abs(hg) == 2;
2726 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2727 if (f)
2728 hg = hg*2;
2729 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2730 hg > 0 ? "source" : "target");
2731 }
2732
3a11a487
AG
2733 if (abs(hg) == 100)
2734 drbd_khelper(mdev, "initial-split-brain");
2735
89e58e75 2736 if (hg == 100 || (hg == -100 && mdev->tconn->net_conf->always_asbp)) {
b411b363
PR
2737 int pcount = (mdev->state.role == R_PRIMARY)
2738 + (peer_role == R_PRIMARY);
2739 int forced = (hg == -100);
2740
2741 switch (pcount) {
2742 case 0:
2743 hg = drbd_asb_recover_0p(mdev);
2744 break;
2745 case 1:
2746 hg = drbd_asb_recover_1p(mdev);
2747 break;
2748 case 2:
2749 hg = drbd_asb_recover_2p(mdev);
2750 break;
2751 }
2752 if (abs(hg) < 100) {
2753 dev_warn(DEV, "Split-Brain detected, %d primaries, "
2754 "automatically solved. Sync from %s node\n",
2755 pcount, (hg < 0) ? "peer" : "this");
2756 if (forced) {
2757 dev_warn(DEV, "Doing a full sync, since"
2758 " UUIDs where ambiguous.\n");
2759 hg = hg*2;
2760 }
2761 }
2762 }
2763
2764 if (hg == -100) {
89e58e75 2765 if (mdev->tconn->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
b411b363 2766 hg = -1;
89e58e75 2767 if (!mdev->tconn->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
b411b363
PR
2768 hg = 1;
2769
2770 if (abs(hg) < 100)
2771 dev_warn(DEV, "Split-Brain detected, manually solved. "
2772 "Sync from %s node\n",
2773 (hg < 0) ? "peer" : "this");
2774 }
2775
2776 if (hg == -100) {
580b9767
LE
2777 /* FIXME this log message is not correct if we end up here
2778 * after an attempted attach on a diskless node.
2779 * We just refuse to attach -- well, we drop the "connection"
2780 * to that disk, in a way... */
3a11a487 2781 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
b411b363
PR
2782 drbd_khelper(mdev, "split-brain");
2783 return C_MASK;
2784 }
2785
2786 if (hg > 0 && mydisk <= D_INCONSISTENT) {
2787 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2788 return C_MASK;
2789 }
2790
2791 if (hg < 0 && /* by intention we do not use mydisk here. */
2792 mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
89e58e75 2793 switch (mdev->tconn->net_conf->rr_conflict) {
b411b363
PR
2794 case ASB_CALL_HELPER:
2795 drbd_khelper(mdev, "pri-lost");
2796 /* fall through */
2797 case ASB_DISCONNECT:
2798 dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2799 return C_MASK;
2800 case ASB_VIOLENTLY:
2801 dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2802 "assumption\n");
2803 }
2804 }
2805
8169e41b 2806 if (mdev->tconn->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->tconn->flags)) {
cf14c2e9
PR
2807 if (hg == 0)
2808 dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2809 else
2810 dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
2811 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2812 abs(hg) >= 2 ? "full" : "bit-map based");
2813 return C_MASK;
2814 }
2815
b411b363
PR
2816 if (abs(hg) >= 2) {
2817 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
20ceb2b2
LE
2818 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
2819 BM_LOCKED_SET_ALLOWED))
b411b363
PR
2820 return C_MASK;
2821 }
2822
2823 if (hg > 0) { /* become sync source. */
2824 rv = C_WF_BITMAP_S;
2825 } else if (hg < 0) { /* become sync target */
2826 rv = C_WF_BITMAP_T;
2827 } else {
2828 rv = C_CONNECTED;
2829 if (drbd_bm_total_weight(mdev)) {
2830 dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2831 drbd_bm_total_weight(mdev));
2832 }
2833 }
2834
2835 return rv;
2836}
2837
2838/* returns 1 if invalid */
2839static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2840{
2841 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2842 if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2843 (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2844 return 0;
2845
2846 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2847 if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2848 self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2849 return 1;
2850
2851 /* everything else is valid if they are equal on both sides. */
2852 if (peer == self)
2853 return 0;
2854
 2855 /* everything else is invalid. */
2856 return 1;
2857}
2858
e2857216 2859static int receive_protocol(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 2860{
e6ef8a5c 2861 struct p_protocol *p = tconn->data.rbuf;
b411b363 2862 int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
cf14c2e9 2863 int p_want_lose, p_two_primaries, cf;
b411b363
PR
2864 char p_integrity_alg[SHARED_SECRET_MAX] = "";
2865
b411b363
PR
2866 p_proto = be32_to_cpu(p->protocol);
2867 p_after_sb_0p = be32_to_cpu(p->after_sb_0p);
2868 p_after_sb_1p = be32_to_cpu(p->after_sb_1p);
2869 p_after_sb_2p = be32_to_cpu(p->after_sb_2p);
b411b363 2870 p_two_primaries = be32_to_cpu(p->two_primaries);
cf14c2e9
PR
2871 cf = be32_to_cpu(p->conn_flags);
2872 p_want_lose = cf & CF_WANT_LOSE;
2873
7204624c 2874 clear_bit(CONN_DRY_RUN, &tconn->flags);
cf14c2e9
PR
2875
2876 if (cf & CF_DRY_RUN)
7204624c 2877 set_bit(CONN_DRY_RUN, &tconn->flags);
b411b363 2878
7204624c
PR
2879 if (p_proto != tconn->net_conf->wire_protocol) {
2880 conn_err(tconn, "incompatible communication protocols\n");
b411b363
PR
2881 goto disconnect;
2882 }
2883
7204624c
PR
2884 if (cmp_after_sb(p_after_sb_0p, tconn->net_conf->after_sb_0p)) {
2885 conn_err(tconn, "incompatible after-sb-0pri settings\n");
b411b363
PR
2886 goto disconnect;
2887 }
2888
7204624c
PR
2889 if (cmp_after_sb(p_after_sb_1p, tconn->net_conf->after_sb_1p)) {
2890 conn_err(tconn, "incompatible after-sb-1pri settings\n");
b411b363
PR
2891 goto disconnect;
2892 }
2893
7204624c
PR
2894 if (cmp_after_sb(p_after_sb_2p, tconn->net_conf->after_sb_2p)) {
2895 conn_err(tconn, "incompatible after-sb-2pri settings\n");
b411b363
PR
2896 goto disconnect;
2897 }
2898
7204624c
PR
2899 if (p_want_lose && tconn->net_conf->want_lose) {
2900 conn_err(tconn, "both sides have the 'want_lose' flag set\n");
b411b363
PR
2901 goto disconnect;
2902 }
2903
7204624c
PR
2904 if (p_two_primaries != tconn->net_conf->two_primaries) {
2905 conn_err(tconn, "incompatible setting of the two-primaries options\n");
b411b363
PR
2906 goto disconnect;
2907 }
2908
7204624c
PR
2909 if (tconn->agreed_pro_version >= 87) {
2910 unsigned char *my_alg = tconn->net_conf->integrity_alg;
82bc0194 2911 int err;
b411b363 2912
e2857216 2913 err = drbd_recv_all(tconn, p_integrity_alg, pi->size);
82bc0194
AG
2914 if (err)
2915 return err;
b411b363
PR
2916
2917 p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2918 if (strcmp(p_integrity_alg, my_alg)) {
7204624c 2919 conn_err(tconn, "incompatible setting of the data-integrity-alg\n");
b411b363
PR
2920 goto disconnect;
2921 }
7204624c 2922 conn_info(tconn, "data-integrity-alg: %s\n",
b411b363
PR
2923 my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2924 }
2925
82bc0194 2926 return 0;
b411b363
PR
2927
2928disconnect:
7204624c 2929 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
82bc0194 2930 return -EIO;
b411b363
PR
2931}
2932
2933/* helper function
2934 * input: alg name, feature name
2935 * return: NULL (alg name was "")
2936 * ERR_PTR(error) if something goes wrong
2937 * or the crypto hash ptr, if it worked out ok. */
2938struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2939 const char *alg, const char *name)
2940{
2941 struct crypto_hash *tfm;
2942
2943 if (!alg[0])
2944 return NULL;
2945
2946 tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2947 if (IS_ERR(tfm)) {
2948 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2949 alg, name, PTR_ERR(tfm));
2950 return tfm;
2951 }
2952 if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2953 crypto_free_hash(tfm);
2954 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2955 return ERR_PTR(-EINVAL);
2956 }
2957 return tfm;
2958}
2959
e2857216 2960static int receive_SyncParam(struct drbd_conf *mdev, struct packet_info *pi)
b411b363 2961{
e6ef8a5c 2962 struct p_rs_param_95 *p = mdev->tconn->data.rbuf;
b411b363
PR
2963 unsigned int header_size, data_size, exp_max_sz;
2964 struct crypto_hash *verify_tfm = NULL;
2965 struct crypto_hash *csums_tfm = NULL;
31890f4a 2966 const int apv = mdev->tconn->agreed_pro_version;
778f271d
PR
2967 int *rs_plan_s = NULL;
2968 int fifo_size = 0;
82bc0194 2969 int err;
b411b363
PR
2970
2971 exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param)
2972 : apv == 88 ? sizeof(struct p_rs_param)
2973 + SHARED_SECRET_MAX
8e26f9cc
PR
2974 : apv <= 94 ? sizeof(struct p_rs_param_89)
2975 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
b411b363 2976
e2857216 2977 if (pi->size > exp_max_sz) {
b411b363 2978 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
e2857216 2979 pi->size, exp_max_sz);
82bc0194 2980 return -EIO;
b411b363
PR
2981 }
2982
2983 if (apv <= 88) {
257d0af6 2984 header_size = sizeof(struct p_rs_param) - sizeof(struct p_header);
e2857216 2985 data_size = pi->size - header_size;
8e26f9cc 2986 } else if (apv <= 94) {
257d0af6 2987 header_size = sizeof(struct p_rs_param_89) - sizeof(struct p_header);
e2857216 2988 data_size = pi->size - header_size;
b411b363 2989 D_ASSERT(data_size == 0);
8e26f9cc 2990 } else {
257d0af6 2991 header_size = sizeof(struct p_rs_param_95) - sizeof(struct p_header);
e2857216 2992 data_size = pi->size - header_size;
b411b363
PR
2993 D_ASSERT(data_size == 0);
2994 }
2995
2996 /* initialize verify_alg and csums_alg */
2997 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
2998
82bc0194
AG
2999 err = drbd_recv_all(mdev->tconn, &p->head.payload, header_size);
3000 if (err)
3001 return err;
b411b363 3002
f399002e
LE
3003 if (get_ldev(mdev)) {
3004 mdev->ldev->dc.resync_rate = be32_to_cpu(p->rate);
3005 put_ldev(mdev);
3006 }
b411b363
PR
3007
3008 if (apv >= 88) {
3009 if (apv == 88) {
3010 if (data_size > SHARED_SECRET_MAX) {
3011 dev_err(DEV, "verify-alg too long, "
3012 "peer wants %u, accepting only %u byte\n",
3013 data_size, SHARED_SECRET_MAX);
82bc0194 3014 return -EIO;
b411b363
PR
3015 }
3016
82bc0194
AG
3017 err = drbd_recv_all(mdev->tconn, p->verify_alg, data_size);
3018 if (err)
3019 return err;
b411b363
PR
3020
3021 /* we expect NUL terminated string */
3022 /* but just in case someone tries to be evil */
3023 D_ASSERT(p->verify_alg[data_size-1] == 0);
3024 p->verify_alg[data_size-1] = 0;
3025
3026 } else /* apv >= 89 */ {
3027 /* we still expect NUL terminated strings */
3028 /* but just in case someone tries to be evil */
3029 D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3030 D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3031 p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3032 p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3033 }
3034
f399002e 3035 if (strcmp(mdev->tconn->net_conf->verify_alg, p->verify_alg)) {
b411b363
PR
3036 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3037 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
f399002e 3038 mdev->tconn->net_conf->verify_alg, p->verify_alg);
b411b363
PR
3039 goto disconnect;
3040 }
3041 verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
3042 p->verify_alg, "verify-alg");
3043 if (IS_ERR(verify_tfm)) {
3044 verify_tfm = NULL;
3045 goto disconnect;
3046 }
3047 }
3048
f399002e 3049 if (apv >= 89 && strcmp(mdev->tconn->net_conf->csums_alg, p->csums_alg)) {
b411b363
PR
3050 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3051 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
f399002e 3052 mdev->tconn->net_conf->csums_alg, p->csums_alg);
b411b363
PR
3053 goto disconnect;
3054 }
3055 csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
3056 p->csums_alg, "csums-alg");
3057 if (IS_ERR(csums_tfm)) {
3058 csums_tfm = NULL;
3059 goto disconnect;
3060 }
3061 }
3062
f399002e
LE
3063 if (apv > 94 && get_ldev(mdev)) {
3064 mdev->ldev->dc.resync_rate = be32_to_cpu(p->rate);
3065 mdev->ldev->dc.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3066 mdev->ldev->dc.c_delay_target = be32_to_cpu(p->c_delay_target);
3067 mdev->ldev->dc.c_fill_target = be32_to_cpu(p->c_fill_target);
3068 mdev->ldev->dc.c_max_rate = be32_to_cpu(p->c_max_rate);
778f271d 3069
f399002e 3070 fifo_size = (mdev->ldev->dc.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
778f271d
PR
3071 if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
3072 rs_plan_s = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
3073 if (!rs_plan_s) {
 3074 dev_err(DEV, "kzalloc of fifo_buffer failed");
f399002e 3075 put_ldev(mdev);
778f271d
PR
3076 goto disconnect;
3077 }
3078 }
f399002e 3079 put_ldev(mdev);
8e26f9cc 3080 }
b411b363
PR
3081
3082 spin_lock(&mdev->peer_seq_lock);
3083 /* lock against drbd_nl_syncer_conf() */
3084 if (verify_tfm) {
f399002e
LE
3085 strcpy(mdev->tconn->net_conf->verify_alg, p->verify_alg);
3086 mdev->tconn->net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3087 crypto_free_hash(mdev->tconn->verify_tfm);
3088 mdev->tconn->verify_tfm = verify_tfm;
b411b363
PR
3089 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
3090 }
3091 if (csums_tfm) {
f399002e
LE
3092 strcpy(mdev->tconn->net_conf->csums_alg, p->csums_alg);
3093 mdev->tconn->net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3094 crypto_free_hash(mdev->tconn->csums_tfm);
3095 mdev->tconn->csums_tfm = csums_tfm;
b411b363
PR
3096 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
3097 }
778f271d
PR
3098 if (fifo_size != mdev->rs_plan_s.size) {
3099 kfree(mdev->rs_plan_s.values);
3100 mdev->rs_plan_s.values = rs_plan_s;
3101 mdev->rs_plan_s.size = fifo_size;
3102 mdev->rs_planed = 0;
3103 }
b411b363
PR
3104 spin_unlock(&mdev->peer_seq_lock);
3105 }
82bc0194 3106 return 0;
b411b363 3107
b411b363
PR
3108disconnect:
3109 /* just for completeness: actually not needed,
3110 * as this is not reached if csums_tfm was ok. */
3111 crypto_free_hash(csums_tfm);
3112 /* but free the verify_tfm again, if csums_tfm did not work out */
3113 crypto_free_hash(verify_tfm);
38fa9988 3114 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
82bc0194 3115 return -EIO;
b411b363
PR
3116}
3117
b411b363
PR
3118/* warn if the arguments differ by more than 12.5% */
3119static void warn_if_differ_considerably(struct drbd_conf *mdev,
3120 const char *s, sector_t a, sector_t b)
3121{
3122 sector_t d;
3123 if (a == 0 || b == 0)
3124 return;
3125 d = (a > b) ? (a - b) : (b - a);
3126 if (d > (a>>3) || d > (b>>3))
3127 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3128 (unsigned long long)a, (unsigned long long)b);
3129}
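/*
 * Illustration (made-up sizes): the ">> 3" check above implements the
 * 12.5% threshold mentioned in the comment. a == 1000 and b == 1200 give
 * d == 200, and since 200 > (1000 >> 3) == 125 the warning is printed;
 * a == 1000 and b == 1100 give d == 100, which is below both 1000 >> 3
 * and 1100 >> 3, so nothing is logged.
 */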
3130
e2857216 3131static int receive_sizes(struct drbd_conf *mdev, struct packet_info *pi)
b411b363 3132{
e6ef8a5c 3133 struct p_sizes *p = mdev->tconn->data.rbuf;
b411b363 3134 enum determine_dev_size dd = unchanged;
b411b363
PR
3135 sector_t p_size, p_usize, my_usize;
3136 int ldsc = 0; /* local disk size changed */
e89b591c 3137 enum dds_flags ddsf;
b411b363 3138
b411b363
PR
3139 p_size = be64_to_cpu(p->d_size);
3140 p_usize = be64_to_cpu(p->u_size);
3141
b411b363
PR
3142 /* just store the peer's disk size for now.
3143 * we still need to figure out whether we accept that. */
3144 mdev->p_size = p_size;
3145
b411b363
PR
3146 if (get_ldev(mdev)) {
3147 warn_if_differ_considerably(mdev, "lower level device sizes",
3148 p_size, drbd_get_max_capacity(mdev->ldev));
3149 warn_if_differ_considerably(mdev, "user requested size",
3150 p_usize, mdev->ldev->dc.disk_size);
3151
3152 /* if this is the first connect, or an otherwise expected
3153 * param exchange, choose the minimum */
3154 if (mdev->state.conn == C_WF_REPORT_PARAMS)
3155 p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
3156 p_usize);
3157
3158 my_usize = mdev->ldev->dc.disk_size;
3159
3160 if (mdev->ldev->dc.disk_size != p_usize) {
3161 mdev->ldev->dc.disk_size = p_usize;
3162 dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3163 (unsigned long)mdev->ldev->dc.disk_size);
3164 }
3165
3166 /* Never shrink a device with usable data during connect.
3167 But allow online shrinking if we are connected. */
a393db6f 3168 if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
b411b363
PR
3169 drbd_get_capacity(mdev->this_bdev) &&
3170 mdev->state.disk >= D_OUTDATED &&
3171 mdev->state.conn < C_CONNECTED) {
3172 dev_err(DEV, "The peer's disk size is too small!\n");
38fa9988 3173 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
b411b363
PR
3174 mdev->ldev->dc.disk_size = my_usize;
3175 put_ldev(mdev);
82bc0194 3176 return -EIO;
b411b363
PR
3177 }
3178 put_ldev(mdev);
3179 }
b411b363 3180
e89b591c 3181 ddsf = be16_to_cpu(p->dds_flags);
b411b363 3182 if (get_ldev(mdev)) {
24c4830c 3183 dd = drbd_determine_dev_size(mdev, ddsf);
b411b363
PR
3184 put_ldev(mdev);
3185 if (dd == dev_size_error)
82bc0194 3186 return -EIO;
b411b363
PR
3187 drbd_md_sync(mdev);
3188 } else {
3189 /* I am diskless, need to accept the peer's size. */
3190 drbd_set_my_capacity(mdev, p_size);
3191 }
3192
99432fcc
PR
3193 mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3194 drbd_reconsider_max_bio_size(mdev);
3195
b411b363
PR
3196 if (get_ldev(mdev)) {
3197 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3198 mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3199 ldsc = 1;
3200 }
3201
b411b363
PR
3202 put_ldev(mdev);
3203 }
3204
3205 if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3206 if (be64_to_cpu(p->c_size) !=
3207 drbd_get_capacity(mdev->this_bdev) || ldsc) {
3208 /* we have different sizes, probably peer
3209 * needs to know my new size... */
e89b591c 3210 drbd_send_sizes(mdev, 0, ddsf);
b411b363
PR
3211 }
3212 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3213 (dd == grew && mdev->state.conn == C_CONNECTED)) {
3214 if (mdev->state.pdsk >= D_INCONSISTENT &&
e89b591c
PR
3215 mdev->state.disk >= D_INCONSISTENT) {
3216 if (ddsf & DDSF_NO_RESYNC)
3217 dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3218 else
3219 resync_after_online_grow(mdev);
3220 } else
b411b363
PR
3221 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3222 }
3223 }
3224
82bc0194 3225 return 0;
b411b363
PR
3226}
3227
e2857216 3228static int receive_uuids(struct drbd_conf *mdev, struct packet_info *pi)
b411b363 3229{
e6ef8a5c 3230 struct p_uuids *p = mdev->tconn->data.rbuf;
b411b363 3231 u64 *p_uuid;
62b0da3a 3232 int i, updated_uuids = 0;
b411b363 3233
b411b363
PR
3234 p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3235
3236 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3237 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3238
3239 kfree(mdev->p_uuid);
3240 mdev->p_uuid = p_uuid;
3241
3242 if (mdev->state.conn < C_CONNECTED &&
3243 mdev->state.disk < D_INCONSISTENT &&
3244 mdev->state.role == R_PRIMARY &&
3245 (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3246 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3247 (unsigned long long)mdev->ed_uuid);
38fa9988 3248 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
82bc0194 3249 return -EIO;
b411b363
PR
3250 }
3251
3252 if (get_ldev(mdev)) {
3253 int skip_initial_sync =
3254 mdev->state.conn == C_CONNECTED &&
31890f4a 3255 mdev->tconn->agreed_pro_version >= 90 &&
b411b363
PR
3256 mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3257 (p_uuid[UI_FLAGS] & 8);
3258 if (skip_initial_sync) {
3259 dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3260 drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
20ceb2b2
LE
3261 "clear_n_write from receive_uuids",
3262 BM_LOCKED_TEST_ALLOWED);
b411b363
PR
3263 _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3264 _drbd_uuid_set(mdev, UI_BITMAP, 0);
3265 _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3266 CS_VERBOSE, NULL);
3267 drbd_md_sync(mdev);
62b0da3a 3268 updated_uuids = 1;
b411b363
PR
3269 }
3270 put_ldev(mdev);
18a50fa2
PR
3271 } else if (mdev->state.disk < D_INCONSISTENT &&
3272 mdev->state.role == R_PRIMARY) {
3273 /* I am a diskless primary, the peer just created a new current UUID
3274 for me. */
62b0da3a 3275 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
b411b363
PR
3276 }
3277
 3278 /* Before we test for the disk state, we should wait until a possibly
3279 ongoing cluster wide state change is finished. That is important if
3280 we are primary and are detaching from our disk. We need to see the
3281 new disk state... */
8410da8f
PR
3282 mutex_lock(mdev->state_mutex);
3283 mutex_unlock(mdev->state_mutex);
b411b363 3284 if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
62b0da3a
LE
3285 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3286
3287 if (updated_uuids)
3288 drbd_print_uuids(mdev, "receiver updated UUIDs to");
b411b363 3289
82bc0194 3290 return 0;
b411b363
PR
3291}
3292
3293/**
3294 * convert_state() - Converts the peer's view of the cluster state to our point of view
3295 * @ps: The state as seen by the peer.
3296 */
3297static union drbd_state convert_state(union drbd_state ps)
3298{
3299 union drbd_state ms;
3300
3301 static enum drbd_conns c_tab[] = {
3302 [C_CONNECTED] = C_CONNECTED,
3303
3304 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3305 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3306 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3307 [C_VERIFY_S] = C_VERIFY_T,
3308 [C_MASK] = C_MASK,
3309 };
3310
3311 ms.i = ps.i;
3312
3313 ms.conn = c_tab[ps.conn];
3314 ms.peer = ps.role;
3315 ms.role = ps.peer;
3316 ms.pdsk = ps.disk;
3317 ms.disk = ps.pdsk;
3318 ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3319
3320 return ms;
3321}
3322
e2857216 3323static int receive_req_state(struct drbd_conf *mdev, struct packet_info *pi)
b411b363 3324{
e6ef8a5c 3325 struct p_req_state *p = mdev->tconn->data.rbuf;
b411b363 3326 union drbd_state mask, val;
bf885f8a 3327 enum drbd_state_rv rv;
b411b363 3328
b411b363
PR
3329 mask.i = be32_to_cpu(p->mask);
3330 val.i = be32_to_cpu(p->val);
3331
25703f83 3332 if (test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags) &&
8410da8f 3333 mutex_is_locked(mdev->state_mutex)) {
b411b363 3334 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
82bc0194 3335 return 0;
b411b363
PR
3336 }
3337
3338 mask = convert_state(mask);
3339 val = convert_state(val);
3340
dfafcc8a
PR
3341 rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3342 drbd_send_sr_reply(mdev, rv);
b411b363 3343
b411b363
PR
3344 drbd_md_sync(mdev);
3345
82bc0194 3346 return 0;
b411b363
PR
3347}
3348
e2857216 3349static int receive_req_conn_state(struct drbd_tconn *tconn, struct packet_info *pi)
dfafcc8a 3350{
e6ef8a5c 3351 struct p_req_state *p = tconn->data.rbuf;
dfafcc8a
PR
3352 union drbd_state mask, val;
3353 enum drbd_state_rv rv;
3354
3355 mask.i = be32_to_cpu(p->mask);
3356 val.i = be32_to_cpu(p->val);
3357
3358 if (test_bit(DISCARD_CONCURRENT, &tconn->flags) &&
3359 mutex_is_locked(&tconn->cstate_mutex)) {
3360 conn_send_sr_reply(tconn, SS_CONCURRENT_ST_CHG);
82bc0194 3361 return 0;
dfafcc8a
PR
3362 }
3363
3364 mask = convert_state(mask);
3365 val = convert_state(val);
3366
3367 rv = conn_request_state(tconn, mask, val, CS_VERBOSE | CS_LOCAL_ONLY);
3368 conn_send_sr_reply(tconn, rv);
3369
82bc0194 3370 return 0;
dfafcc8a
PR
3371}
3372
e2857216 3373static int receive_state(struct drbd_conf *mdev, struct packet_info *pi)
b411b363 3374{
e6ef8a5c 3375 struct p_state *p = mdev->tconn->data.rbuf;
4ac4aada 3376 union drbd_state os, ns, peer_state;
b411b363 3377 enum drbd_disk_state real_peer_disk;
65d922c3 3378 enum chg_state_flags cs_flags;
b411b363
PR
3379 int rv;
3380
b411b363
PR
3381 peer_state.i = be32_to_cpu(p->state);
3382
3383 real_peer_disk = peer_state.disk;
3384 if (peer_state.disk == D_NEGOTIATING) {
3385 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3386 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3387 }
3388
87eeee41 3389 spin_lock_irq(&mdev->tconn->req_lock);
b411b363 3390 retry:
4ac4aada 3391 os = ns = mdev->state;
87eeee41 3392 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363 3393
e9ef7bb6
LE
3394 /* peer says his disk is uptodate, while we think it is inconsistent,
3395 * and this happens while we think we have a sync going on. */
3396 if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE &&
3397 os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3398 /* If we are (becoming) SyncSource, but peer is still in sync
3399 * preparation, ignore its uptodate-ness to avoid flapping, it
3400 * will change to inconsistent once the peer reaches active
3401 * syncing states.
3402 * It may have changed syncer-paused flags, however, so we
3403 * cannot ignore this completely. */
3404 if (peer_state.conn > C_CONNECTED &&
3405 peer_state.conn < C_SYNC_SOURCE)
3406 real_peer_disk = D_INCONSISTENT;
3407
3408 /* if peer_state changes to connected at the same time,
3409 * it explicitly notifies us that it finished resync.
3410 * Maybe we should finish it up, too? */
3411 else if (os.conn >= C_SYNC_SOURCE &&
3412 peer_state.conn == C_CONNECTED) {
3413 if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3414 drbd_resync_finished(mdev);
82bc0194 3415 return 0;
e9ef7bb6
LE
3416 }
3417 }
3418
3419 /* peer says his disk is inconsistent, while we think it is uptodate,
3420 * and this happens while the peer still thinks we have a sync going on,
3421 * but we think we are already done with the sync.
3422 * We ignore this to avoid flapping pdsk.
3423	 * This should not happen if the peer is a recent version of drbd. */
3424 if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3425 os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3426 real_peer_disk = D_UP_TO_DATE;
3427
4ac4aada
LE
3428 if (ns.conn == C_WF_REPORT_PARAMS)
3429 ns.conn = C_CONNECTED;
b411b363 3430
67531718
PR
3431 if (peer_state.conn == C_AHEAD)
3432 ns.conn = C_BEHIND;
3433
b411b363
PR
3434 if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3435 get_ldev_if_state(mdev, D_NEGOTIATING)) {
3436 int cr; /* consider resync */
3437
3438 /* if we established a new connection */
4ac4aada 3439 cr = (os.conn < C_CONNECTED);
b411b363
PR
3440 /* if we had an established connection
3441 * and one of the nodes newly attaches a disk */
4ac4aada 3442 cr |= (os.conn == C_CONNECTED &&
b411b363 3443 (peer_state.disk == D_NEGOTIATING ||
4ac4aada 3444 os.disk == D_NEGOTIATING));
b411b363
PR
3445 /* if we have both been inconsistent, and the peer has been
3446 * forced to be UpToDate with --overwrite-data */
3447 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3448 /* if we had been plain connected, and the admin requested to
3449 * start a sync by "invalidate" or "invalidate-remote" */
4ac4aada 3450 cr |= (os.conn == C_CONNECTED &&
b411b363
PR
3451 (peer_state.conn >= C_STARTING_SYNC_S &&
3452 peer_state.conn <= C_WF_BITMAP_T));
3453
3454 if (cr)
4ac4aada 3455 ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
b411b363
PR
3456
3457 put_ldev(mdev);
4ac4aada
LE
3458 if (ns.conn == C_MASK) {
3459 ns.conn = C_CONNECTED;
b411b363 3460 if (mdev->state.disk == D_NEGOTIATING) {
82f59cc6 3461 drbd_force_state(mdev, NS(disk, D_FAILED));
b411b363
PR
3462 } else if (peer_state.disk == D_NEGOTIATING) {
3463 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3464 peer_state.disk = D_DISKLESS;
580b9767 3465 real_peer_disk = D_DISKLESS;
b411b363 3466 } else {
8169e41b 3467 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->tconn->flags))
82bc0194 3468 return -EIO;
4ac4aada 3469 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
38fa9988 3470 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
82bc0194 3471 return -EIO;
b411b363
PR
3472 }
3473 }
3474 }
3475
87eeee41 3476 spin_lock_irq(&mdev->tconn->req_lock);
4ac4aada 3477 if (mdev->state.i != os.i)
b411b363
PR
3478 goto retry;
3479 clear_bit(CONSIDER_RESYNC, &mdev->flags);
b411b363
PR
3480 ns.peer = peer_state.role;
3481 ns.pdsk = real_peer_disk;
3482 ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4ac4aada 3483 if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
b411b363 3484 ns.disk = mdev->new_state_tmp.disk;
4ac4aada
LE
3485 cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3486 if (ns.pdsk == D_CONSISTENT && is_susp(ns) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
481c6f50 3487 test_bit(NEW_CUR_UUID, &mdev->flags)) {
8554df1c 3488 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
481c6f50 3489		   for temporary network outages! */
87eeee41 3490 spin_unlock_irq(&mdev->tconn->req_lock);
481c6f50 3491		dev_err(DEV, "Aborting Connect, cannot thaw IO with a peer that is only Consistent\n");
2f5cdd0b 3492 tl_clear(mdev->tconn);
481c6f50
PR
3493 drbd_uuid_new_current(mdev);
3494 clear_bit(NEW_CUR_UUID, &mdev->flags);
38fa9988 3495 conn_request_state(mdev->tconn, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
82bc0194 3496 return -EIO;
481c6f50 3497 }
65d922c3 3498 rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
b411b363 3499 ns = mdev->state;
87eeee41 3500 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
3501
3502 if (rv < SS_SUCCESS) {
38fa9988 3503 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
82bc0194 3504 return -EIO;
b411b363
PR
3505 }
3506
4ac4aada
LE
3507 if (os.conn > C_WF_REPORT_PARAMS) {
3508 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
b411b363
PR
3509 peer_state.disk != D_NEGOTIATING ) {
3510 /* we want resync, peer has not yet decided to sync... */
3511 /* Nowadays only used when forcing a node into primary role and
3512 setting its disk to UpToDate with that */
3513 drbd_send_uuids(mdev);
3514 drbd_send_state(mdev);
3515 }
3516 }
3517
89e58e75 3518 mdev->tconn->net_conf->want_lose = 0;
b411b363
PR
3519
3520 drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3521
82bc0194 3522 return 0;
b411b363
PR
3523}
3524
e2857216 3525static int receive_sync_uuid(struct drbd_conf *mdev, struct packet_info *pi)
b411b363 3526{
e6ef8a5c 3527 struct p_rs_uuid *p = mdev->tconn->data.rbuf;
b411b363
PR
3528
3529 wait_event(mdev->misc_wait,
3530 mdev->state.conn == C_WF_SYNC_UUID ||
c4752ef1 3531 mdev->state.conn == C_BEHIND ||
b411b363
PR
3532 mdev->state.conn < C_CONNECTED ||
3533 mdev->state.disk < D_NEGOTIATING);
3534
3535 /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3536
b411b363
PR
3537 /* Here the _drbd_uuid_ functions are right, current should
3538 _not_ be rotated into the history */
3539 if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3540 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3541 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3542
62b0da3a 3543 drbd_print_uuids(mdev, "updated sync uuid");
b411b363
PR
3544 drbd_start_resync(mdev, C_SYNC_TARGET);
3545
3546 put_ldev(mdev);
3547 } else
3548 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3549
82bc0194 3550 return 0;
b411b363
PR
3551}
3552
2c46407d
AG
3553/**
3554 * receive_bitmap_plain
3555 *
3556 * Return 0 when done, 1 when another iteration is needed, and a negative error
3557 * code upon failure.
3558 */
3559static int
02918be2 3560receive_bitmap_plain(struct drbd_conf *mdev, unsigned int data_size,
fc56815c 3561 struct p_header *h, struct bm_xfer_ctx *c)
b411b363 3562{
fc56815c 3563 unsigned long *buffer = (unsigned long *)h->payload;
b411b363
PR
3564 unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3565 unsigned want = num_words * sizeof(long);
2c46407d 3566 int err;
b411b363 3567
02918be2
PR
3568 if (want != data_size) {
3569 dev_err(DEV, "%s:want (%u) != data_size (%u)\n", __func__, want, data_size);
2c46407d 3570 return -EIO;
b411b363
PR
3571 }
3572 if (want == 0)
2c46407d 3573 return 0;
82bc0194
AG
3574 err = drbd_recv_all(mdev->tconn, buffer, want);
3575 if (err)
2c46407d 3576 return err;
b411b363
PR
3577
3578 drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3579
3580 c->word_offset += num_words;
3581 c->bit_offset = c->word_offset * BITS_PER_LONG;
3582 if (c->bit_offset > c->bm_bits)
3583 c->bit_offset = c->bm_bits;
3584
2c46407d 3585 return 1;
b411b363
PR
3586}
3587
a02d1240
AG
3588static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
3589{
3590 return (enum drbd_bitmap_code)(p->encoding & 0x0f);
3591}
3592
3593static int dcbp_get_start(struct p_compressed_bm *p)
3594{
3595 return (p->encoding & 0x80) != 0;
3596}
3597
3598static int dcbp_get_pad_bits(struct p_compressed_bm *p)
3599{
3600 return (p->encoding >> 4) & 0x7;
3601}
3602
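The three helpers above unpack a single encoding byte. As a rough aid, this standalone snippet (with a made-up example value) shows how the fields split: the low nibble carries the bitmap code, bit 7 the start toggle, and bits 4-6 the pad-bit count.

#include <stdio.h>

int main(void)
{
	unsigned char encoding = 0xa2;            /* made-up example byte */
	unsigned code  = encoding & 0x0f;         /* dcbp_get_code()     -> 2 */
	unsigned start = (encoding & 0x80) != 0;  /* dcbp_get_start()    -> 1 */
	unsigned pad   = (encoding >> 4) & 0x7;   /* dcbp_get_pad_bits() -> 2 */

	printf("code=%u start=%u pad_bits=%u\n", code, start, pad);
	return 0;
}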
2c46407d
AG
3603/**
3604 * recv_bm_rle_bits
3605 *
3606 * Return 0 when done, 1 when another iteration is needed, and a negative error
3607 * code upon failure.
3608 */
3609static int
b411b363
PR
3610recv_bm_rle_bits(struct drbd_conf *mdev,
3611 struct p_compressed_bm *p,
c6d25cfe
PR
3612 struct bm_xfer_ctx *c,
3613 unsigned int len)
b411b363
PR
3614{
3615 struct bitstream bs;
3616 u64 look_ahead;
3617 u64 rl;
3618 u64 tmp;
3619 unsigned long s = c->bit_offset;
3620 unsigned long e;
a02d1240 3621 int toggle = dcbp_get_start(p);
b411b363
PR
3622 int have;
3623 int bits;
3624
a02d1240 3625 bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
b411b363
PR
3626
3627 bits = bitstream_get_bits(&bs, &look_ahead, 64);
3628 if (bits < 0)
2c46407d 3629 return -EIO;
b411b363
PR
3630
3631 for (have = bits; have > 0; s += rl, toggle = !toggle) {
3632 bits = vli_decode_bits(&rl, look_ahead);
3633 if (bits <= 0)
2c46407d 3634 return -EIO;
b411b363
PR
3635
3636 if (toggle) {
3637 e = s + rl -1;
3638 if (e >= c->bm_bits) {
3639 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
2c46407d 3640 return -EIO;
b411b363
PR
3641 }
3642 _drbd_bm_set_bits(mdev, s, e);
3643 }
3644
3645 if (have < bits) {
3646 dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3647 have, bits, look_ahead,
3648 (unsigned int)(bs.cur.b - p->code),
3649 (unsigned int)bs.buf_len);
2c46407d 3650 return -EIO;
b411b363
PR
3651 }
3652 look_ahead >>= bits;
3653 have -= bits;
3654
3655 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3656 if (bits < 0)
2c46407d 3657 return -EIO;
b411b363
PR
3658 look_ahead |= tmp << have;
3659 have += bits;
3660 }
3661
3662 c->bit_offset = s;
3663 bm_xfer_ctx_bit_to_word_offset(c);
3664
2c46407d 3665 return (s != c->bm_bits);
b411b363
PR
3666}
3667
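As a rough illustration of the run-length/toggle scheme that recv_bm_rle_bits() decodes (ignoring the VLI bit packing entirely), the toy program below expands a made-up sequence of run lengths into alternating clear/set runs, starting from the toggle:

#include <stdio.h>

int main(void)
{
	unsigned long runs[] = { 3, 5, 2 };  /* made-up run lengths, not VLI-encoded */
	int toggle = 0;                      /* as returned by dcbp_get_start() */
	unsigned long bit = 0;
	size_t i;

	for (i = 0; i < sizeof(runs) / sizeof(runs[0]); i++, toggle = !toggle) {
		if (toggle)
			printf("bits %lu..%lu marked out of sync\n",
			       bit, bit + runs[i] - 1);
		bit += runs[i];
	}
	printf("runs covered %lu bits in total\n", bit);
	return 0;
}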
2c46407d
AG
3668/**
3669 * decode_bitmap_c
3670 *
3671 * Return 0 when done, 1 when another iteration is needed, and a negative error
3672 * code upon failure.
3673 */
3674static int
b411b363
PR
3675decode_bitmap_c(struct drbd_conf *mdev,
3676 struct p_compressed_bm *p,
c6d25cfe
PR
3677 struct bm_xfer_ctx *c,
3678 unsigned int len)
b411b363 3679{
a02d1240 3680 if (dcbp_get_code(p) == RLE_VLI_Bits)
c6d25cfe 3681 return recv_bm_rle_bits(mdev, p, c, len);
b411b363
PR
3682
3683 /* other variants had been implemented for evaluation,
3684 * but have been dropped as this one turned out to be "best"
3685 * during all our tests. */
3686
3687 dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
38fa9988 3688 conn_request_state(mdev->tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
2c46407d 3689 return -EIO;
b411b363
PR
3690}
3691
3692void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3693 const char *direction, struct bm_xfer_ctx *c)
3694{
3695 /* what would it take to transfer it "plaintext" */
c012949a 3696 unsigned plain = sizeof(struct p_header) *
b411b363
PR
3697 ((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3698 + c->bm_words * sizeof(long);
3699 unsigned total = c->bytes[0] + c->bytes[1];
3700 unsigned r;
3701
3702 /* total can not be zero. but just in case: */
3703 if (total == 0)
3704 return;
3705
3706 /* don't report if not compressed */
3707 if (total >= plain)
3708 return;
3709
3710 /* total < plain. check for overflow, still */
3711 r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3712 : (1000 * total / plain);
3713
3714 if (r > 1000)
3715 r = 1000;
3716
3717 r = 1000 - r;
3718 dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3719 "total %u; compression: %u.%u%%\n",
3720 direction,
3721 c->bytes[1], c->packets[1],
3722 c->bytes[0], c->packets[0],
3723 total, r/10, r % 10);
3724}
3725
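To make the ratio arithmetic above concrete, here is a small standalone check with made-up byte counts; it mirrors the overflow-safe scaling and prints the same per-mille-turned-into-percent format with one decimal place:

#include <stdio.h>
#include <limits.h>

int main(void)
{
	unsigned plain = 131200, total = 6560;   /* hypothetical byte counts */
	unsigned r;

	r = (total > UINT_MAX / 1000) ? total / (plain / 1000)
				      : 1000 * total / plain;
	if (r > 1000)
		r = 1000;
	r = 1000 - r;
	printf("compression: %u.%u%%\n", r / 10, r % 10);   /* -> 95.0% */
	return 0;
}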
3726/* Since we are processing the bitfield from lower addresses to higher,
3727   it does not matter if we process it in 32 bit chunks or 64 bit
3728   chunks as long as it is little endian. (Understand it as a byte stream,
3729   beginning with the lowest byte...) If we used big endian,
3730 we would need to process it from the highest address to the lowest,
3731 in order to be agnostic to the 32 vs 64 bits issue.
3732
3733 returns 0 on failure, 1 if we successfully received it. */
e2857216 3734static int receive_bitmap(struct drbd_conf *mdev, struct packet_info *pi)
b411b363
PR
3735{
3736 struct bm_xfer_ctx c;
2c46407d 3737 int err;
e6ef8a5c 3738 struct p_header *h = mdev->tconn->data.rbuf;
b411b363 3739
20ceb2b2
LE
3740 drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
3741 /* you are supposed to send additional out-of-sync information
3742 * if you actually set bits during this phase */
b411b363 3743
b411b363
PR
3744 c = (struct bm_xfer_ctx) {
3745 .bm_bits = drbd_bm_bits(mdev),
3746 .bm_words = drbd_bm_words(mdev),
3747 };
3748
2c46407d 3749 for(;;) {
e2857216
AG
3750 if (pi->cmd == P_BITMAP) {
3751 err = receive_bitmap_plain(mdev, pi->size, h, &c);
3752 } else if (pi->cmd == P_COMPRESSED_BITMAP) {
b411b363
PR
3753 /* MAYBE: sanity check that we speak proto >= 90,
3754 * and the feature is enabled! */
3755 struct p_compressed_bm *p;
3756
e2857216 3757 if (pi->size > BM_PACKET_PAYLOAD_BYTES) {
b411b363 3758 dev_err(DEV, "ReportCBitmap packet too large\n");
82bc0194 3759 err = -EIO;
b411b363
PR
3760 goto out;
3761 }
fc56815c
AG
3762
3763 p = mdev->tconn->data.rbuf;
e2857216 3764 err = drbd_recv_all(mdev->tconn, p->head.payload, pi->size);
82bc0194
AG
3765 if (err)
3766 goto out;
e2857216
AG
3767 if (pi->size <= (sizeof(*p) - sizeof(p->head))) {
3768 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", pi->size);
82bc0194 3769 err = -EIO;
78fcbdae 3770 goto out;
b411b363 3771 }
e2857216 3772 err = decode_bitmap_c(mdev, p, &c, pi->size);
b411b363 3773 } else {
e2857216 3774 dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
82bc0194 3775 err = -EIO;
b411b363
PR
3776 goto out;
3777 }
3778
e2857216
AG
3779 c.packets[pi->cmd == P_BITMAP]++;
3780 c.bytes[pi->cmd == P_BITMAP] += sizeof(struct p_header) + pi->size;
b411b363 3781
2c46407d
AG
3782 if (err <= 0) {
3783 if (err < 0)
3784 goto out;
b411b363 3785 break;
2c46407d 3786 }
e2857216 3787 err = drbd_recv_header(mdev->tconn, pi);
82bc0194 3788 if (err)
b411b363 3789 goto out;
2c46407d 3790 }
b411b363
PR
3791
3792 INFO_bm_xfer_stats(mdev, "receive", &c);
3793
3794 if (mdev->state.conn == C_WF_BITMAP_T) {
de1f8e4a
AG
3795 enum drbd_state_rv rv;
3796
82bc0194
AG
3797 err = drbd_send_bitmap(mdev);
3798 if (err)
b411b363
PR
3799 goto out;
3800 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
de1f8e4a
AG
3801 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3802 D_ASSERT(rv == SS_SUCCESS);
b411b363
PR
3803 } else if (mdev->state.conn != C_WF_BITMAP_S) {
3804 /* admin may have requested C_DISCONNECTING,
3805 * other threads may have noticed network errors */
3806 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3807 drbd_conn_str(mdev->state.conn));
3808 }
82bc0194 3809 err = 0;
b411b363 3810
b411b363 3811 out:
20ceb2b2 3812 drbd_bm_unlock(mdev);
82bc0194 3813 if (!err && mdev->state.conn == C_WF_BITMAP_S)
b411b363 3814 drbd_start_resync(mdev, C_SYNC_SOURCE);
82bc0194 3815 return err;
b411b363
PR
3816}
3817
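One small detail above is the c.packets[pi->cmd == P_BITMAP]++ bookkeeping: the boolean comparison picks the array slot, so index 1 counts plain bitmap packets and index 0 compressed ones. A tiny standalone illustration (the opcode values are made up):

#include <stdio.h>

enum { TOY_PLAIN = 0x04, TOY_COMPRESSED = 0x24 };   /* made-up opcodes */

int main(void)
{
	unsigned packets[2] = { 0, 0 };
	int stream[] = { TOY_PLAIN, TOY_COMPRESSED, TOY_COMPRESSED };
	size_t i;

	for (i = 0; i < sizeof(stream) / sizeof(stream[0]); i++)
		packets[stream[i] == TOY_PLAIN]++;   /* 1 = plain, 0 = compressed */

	printf("plain=%u compressed=%u\n", packets[1], packets[0]);   /* 1 2 */
	return 0;
}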
2de876ef 3818static int _tconn_receive_skip(struct drbd_tconn *tconn, unsigned int data_size)
b411b363
PR
3819{
3820 /* TODO zero copy sink :) */
3821 static char sink[128];
3822 int size, want, r;
3823
02918be2 3824 size = data_size;
b411b363
PR
3825 while (size > 0) {
3826 want = min_t(int, size, sizeof(sink));
2de876ef
PR
3827 r = drbd_recv(tconn, sink, want);
3828 if (r <= 0)
841ce241 3829 break;
b411b363
PR
3830 size -= r;
3831 }
82bc0194 3832 return size ? -EIO : 0;
b411b363
PR
3833}
3834
e2857216 3835static int receive_skip(struct drbd_conf *mdev, struct packet_info *pi)
2de876ef
PR
3836{
3837 dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
e2857216 3838 pi->cmd, pi->size);
2de876ef 3839
e2857216 3840 return _tconn_receive_skip(mdev->tconn, pi->size);
2de876ef
PR
3841}
3842
e2857216 3843static int tconn_receive_skip(struct drbd_tconn *tconn, struct packet_info *pi)
2de876ef
PR
3844{
3845	conn_warn(tconn, "skipping packet for non-existent volume, type %d, l: %d!\n",
e2857216 3846 pi->cmd, pi->size);
2de876ef 3847
e2857216 3848 return _tconn_receive_skip(tconn, pi->size);
2de876ef
PR
3849}
3850
e2857216 3851static int receive_UnplugRemote(struct drbd_conf *mdev, struct packet_info *pi)
0ced55a3 3852{
e7f52dfb
LE
3853 /* Make sure we've acked all the TCP data associated
3854 * with the data requests being unplugged */
e42325a5 3855 drbd_tcp_quickack(mdev->tconn->data.socket);
0ced55a3 3856
82bc0194 3857 return 0;
0ced55a3
PR
3858}
3859
e2857216 3860static int receive_out_of_sync(struct drbd_conf *mdev, struct packet_info *pi)
73a01a18 3861{
e6ef8a5c 3862 struct p_block_desc *p = mdev->tconn->data.rbuf;
73a01a18 3863
f735e363
LE
3864 switch (mdev->state.conn) {
3865 case C_WF_SYNC_UUID:
3866 case C_WF_BITMAP_T:
3867 case C_BEHIND:
3868 break;
3869 default:
3870 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
3871 drbd_conn_str(mdev->state.conn));
3872 }
3873
73a01a18
PR
3874 drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
3875
82bc0194 3876 return 0;
73a01a18
PR
3877}
3878
02918be2
PR
3879struct data_cmd {
3880 int expect_payload;
3881 size_t pkt_size;
a4fbda8e 3882 enum mdev_or_conn fa_type; /* first argument's type */
d9ae84e7 3883 union {
e2857216
AG
3884 int (*mdev_fn)(struct drbd_conf *, struct packet_info *);
3885 int (*conn_fn)(struct drbd_tconn *, struct packet_info *);
d9ae84e7 3886 };
02918be2
PR
3887};
3888
3889static struct data_cmd drbd_cmd_handler[] = {
d9ae84e7
PR
3890 [P_DATA] = { 1, sizeof(struct p_data), MDEV, { receive_Data } },
3891 [P_DATA_REPLY] = { 1, sizeof(struct p_data), MDEV, { receive_DataReply } },
3892 [P_RS_DATA_REPLY] = { 1, sizeof(struct p_data), MDEV, { receive_RSDataReply } } ,
3893 [P_BARRIER] = { 0, sizeof(struct p_barrier), MDEV, { receive_Barrier } } ,
3894 [P_BITMAP] = { 1, sizeof(struct p_header), MDEV, { receive_bitmap } } ,
3895 [P_COMPRESSED_BITMAP] = { 1, sizeof(struct p_header), MDEV, { receive_bitmap } } ,
3896 [P_UNPLUG_REMOTE] = { 0, sizeof(struct p_header), MDEV, { receive_UnplugRemote } },
3897 [P_DATA_REQUEST] = { 0, sizeof(struct p_block_req), MDEV, { receive_DataRequest } },
3898 [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), MDEV, { receive_DataRequest } },
3899 [P_SYNC_PARAM] = { 1, sizeof(struct p_header), MDEV, { receive_SyncParam } },
3900 [P_SYNC_PARAM89] = { 1, sizeof(struct p_header), MDEV, { receive_SyncParam } },
7204624c 3901 [P_PROTOCOL] = { 1, sizeof(struct p_protocol), CONN, { .conn_fn = receive_protocol } },
d9ae84e7
PR
3902 [P_UUIDS] = { 0, sizeof(struct p_uuids), MDEV, { receive_uuids } },
3903 [P_SIZES] = { 0, sizeof(struct p_sizes), MDEV, { receive_sizes } },
3904 [P_STATE] = { 0, sizeof(struct p_state), MDEV, { receive_state } },
3905 [P_STATE_CHG_REQ] = { 0, sizeof(struct p_req_state), MDEV, { receive_req_state } },
3906 [P_SYNC_UUID] = { 0, sizeof(struct p_rs_uuid), MDEV, { receive_sync_uuid } },
3907 [P_OV_REQUEST] = { 0, sizeof(struct p_block_req), MDEV, { receive_DataRequest } },
3908 [P_OV_REPLY] = { 1, sizeof(struct p_block_req), MDEV, { receive_DataRequest } },
3909 [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), MDEV, { receive_DataRequest } },
3910 [P_DELAY_PROBE] = { 0, sizeof(struct p_delay_probe93), MDEV, { receive_skip } },
3911 [P_OUT_OF_SYNC] = { 0, sizeof(struct p_block_desc), MDEV, { receive_out_of_sync } },
dfafcc8a 3912 [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), CONN, { .conn_fn = receive_req_conn_state } },
b411b363
PR
3913};
3914
eefc2f7d 3915static void drbdd(struct drbd_tconn *tconn)
b411b363 3916{
e6ef8a5c 3917 struct p_header *header = tconn->data.rbuf;
77351055 3918 struct packet_info pi;
02918be2 3919 size_t shs; /* sub header size */
82bc0194 3920 int err;
b411b363 3921
eefc2f7d 3922 while (get_t_state(&tconn->receiver) == RUNNING) {
deebe195
AG
3923 struct data_cmd *cmd;
3924
eefc2f7d 3925 drbd_thread_current_set_cpu(&tconn->receiver);
69bc7bc3 3926 if (drbd_recv_header(tconn, &pi))
02918be2 3927 goto err_out;
b411b363 3928
deebe195
AG
3929 cmd = &drbd_cmd_handler[pi.cmd];
3930 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->mdev_fn)) {
eefc2f7d 3931 conn_err(tconn, "unknown packet type %d, l: %d!\n", pi.cmd, pi.size);
02918be2 3932 goto err_out;
0b33a916 3933 }
b411b363 3934
deebe195
AG
3935 shs = cmd->pkt_size - sizeof(struct p_header);
3936 if (pi.size - shs > 0 && !cmd->expect_payload) {
eefc2f7d 3937 conn_err(tconn, "No payload expected %s l:%d\n", cmdname(pi.cmd), pi.size);
02918be2 3938 goto err_out;
b411b363 3939 }
b411b363 3940
c13f7e1a 3941 if (shs) {
a5c31904
AG
3942 err = drbd_recv_all_warn(tconn, &header->payload, shs);
3943 if (err)
c13f7e1a 3944 goto err_out;
e2857216 3945 pi.size -= shs;
c13f7e1a
LE
3946 }
3947
e2857216
AG
3948 if (cmd->fa_type == CONN)
3949 err = cmd->conn_fn(tconn, &pi);
3950 else {
d9ae84e7 3951 struct drbd_conf *mdev = vnr_to_mdev(tconn, pi.vnr);
82bc0194 3952 err = mdev ?
e2857216
AG
3953 cmd->mdev_fn(mdev, &pi) :
3954 tconn_receive_skip(tconn, &pi);
d9ae84e7 3955 }
b411b363 3956
82bc0194 3957 if (unlikely(err)) {
eefc2f7d 3958 conn_err(tconn, "error receiving %s, l: %d!\n",
77351055 3959 cmdname(pi.cmd), pi.size);
02918be2 3960 goto err_out;
b411b363
PR
3961 }
3962 }
82bc0194 3963 return;
b411b363 3964
82bc0194
AG
3965 err_out:
3966 conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
b411b363
PR
3967}
3968
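The drbd_cmd_handler[] / drbdd() pairing above keys a table by packet opcode, where each entry stores either a per-device or a per-connection handler in a union, selected by fa_type. The standalone sketch below (hypothetical toy types, not the driver's) shows the same dispatch pattern:

#include <stdio.h>

struct toy_conn { int id; };
struct toy_dev  { int vnr; };

enum toy_first_arg { TOY_DEV, TOY_CONN };

struct toy_cmd {
	enum toy_first_arg fa_type;
	union {
		int (*dev_fn)(struct toy_dev *);
		int (*conn_fn)(struct toy_conn *);
	} fn;
};

static int handle_dev(struct toy_dev *d)   { printf("device %d\n", d->vnr); return 0; }
static int handle_conn(struct toy_conn *c) { printf("connection %d\n", c->id); return 0; }

int main(void)
{
	struct toy_cmd table[] = {
		{ TOY_DEV,  { .dev_fn  = handle_dev  } },
		{ TOY_CONN, { .conn_fn = handle_conn } },
	};
	struct toy_dev d = { .vnr = 0 };
	struct toy_conn c = { .id = 1 };
	size_t i;

	for (i = 0; i < sizeof(table) / sizeof(table[0]); i++) {
		if (table[i].fa_type == TOY_CONN)
			table[i].fn.conn_fn(&c);
		else
			table[i].fn.dev_fn(&d);
	}
	return 0;
}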
0e29d163 3969void conn_flush_workqueue(struct drbd_tconn *tconn)
b411b363
PR
3970{
3971 struct drbd_wq_barrier barr;
3972
3973 barr.w.cb = w_prev_work_done;
0e29d163 3974 barr.w.tconn = tconn;
b411b363 3975 init_completion(&barr.done);
0e29d163 3976 drbd_queue_work(&tconn->data.work, &barr.w);
b411b363
PR
3977 wait_for_completion(&barr.done);
3978}
3979
360cc740 3980static void drbd_disconnect(struct drbd_tconn *tconn)
b411b363 3981{
bbeb641c 3982 enum drbd_conns oc;
b411b363 3983 int rv = SS_UNKNOWN_ERROR;
b411b363 3984
bbeb641c 3985 if (tconn->cstate == C_STANDALONE)
b411b363 3986 return;
b411b363
PR
3987
3988 /* asender does not clean up anything. it must not interfere, either */
360cc740
PR
3989 drbd_thread_stop(&tconn->asender);
3990 drbd_free_sock(tconn);
3991
3992 idr_for_each(&tconn->volumes, drbd_disconnected, tconn);
360cc740
PR
3993 conn_info(tconn, "Connection closed\n");
3994
cb703454
PR
3995 if (conn_highest_role(tconn) == R_PRIMARY && conn_highest_pdsk(tconn) >= D_UNKNOWN)
3996 conn_try_outdate_peer_async(tconn);
3997
360cc740 3998 spin_lock_irq(&tconn->req_lock);
bbeb641c
PR
3999 oc = tconn->cstate;
4000 if (oc >= C_UNCONNECTED)
4001 rv = _conn_request_state(tconn, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4002
360cc740
PR
4003 spin_unlock_irq(&tconn->req_lock);
4004
bbeb641c 4005 if (oc == C_DISCONNECTING) {
360cc740
PR
4006 wait_event(tconn->net_cnt_wait, atomic_read(&tconn->net_cnt) == 0);
4007
4008 crypto_free_hash(tconn->cram_hmac_tfm);
4009 tconn->cram_hmac_tfm = NULL;
4010
4011 kfree(tconn->net_conf);
4012 tconn->net_conf = NULL;
bbeb641c 4013 conn_request_state(tconn, NS(conn, C_STANDALONE), CS_VERBOSE);
360cc740
PR
4014 }
4015}
4016
4017static int drbd_disconnected(int vnr, void *p, void *data)
4018{
4019 struct drbd_conf *mdev = (struct drbd_conf *)p;
4020 enum drbd_fencing_p fp;
4021 unsigned int i;
b411b363 4022
85719573 4023 /* wait for current activity to cease. */
87eeee41 4024 spin_lock_irq(&mdev->tconn->req_lock);
b411b363
PR
4025 _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
4026 _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
4027 _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
87eeee41 4028 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
4029
4030 /* We do not have data structures that would allow us to
4031 * get the rs_pending_cnt down to 0 again.
4032 * * On C_SYNC_TARGET we do not have any data structures describing
4033 * the pending RSDataRequest's we have sent.
4034 * * On C_SYNC_SOURCE there is no data structure that tracks
4035 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4036 * And no, it is not the sum of the reference counts in the
4037 * resync_LRU. The resync_LRU tracks the whole operation including
4038 * the disk-IO, while the rs_pending_cnt only tracks the blocks
4039 * on the fly. */
4040 drbd_rs_cancel_all(mdev);
4041 mdev->rs_total = 0;
4042 mdev->rs_failed = 0;
4043 atomic_set(&mdev->rs_pending_cnt, 0);
4044 wake_up(&mdev->misc_wait);
4045
7fde2be9
PR
4046 del_timer(&mdev->request_timer);
4047
b411b363 4048 del_timer_sync(&mdev->resync_timer);
b411b363
PR
4049 resync_timer_fn((unsigned long)mdev);
4050
b411b363
PR
4051 /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4052 * w_make_resync_request etc. which may still be on the worker queue
4053 * to be "canceled" */
a21e9298 4054 drbd_flush_workqueue(mdev);
b411b363
PR
4055
4056 /* This also does reclaim_net_ee(). If we do this too early, we might
4057 * miss some resync ee and pages.*/
4058 drbd_process_done_ee(mdev);
4059
4060 kfree(mdev->p_uuid);
4061 mdev->p_uuid = NULL;
4062
fb22c402 4063 if (!is_susp(mdev->state))
2f5cdd0b 4064 tl_clear(mdev->tconn);
b411b363 4065
b411b363
PR
4066 drbd_md_sync(mdev);
4067
4068 fp = FP_DONT_CARE;
4069 if (get_ldev(mdev)) {
4070 fp = mdev->ldev->dc.fencing;
4071 put_ldev(mdev);
4072 }
4073
20ceb2b2
LE
4074 /* serialize with bitmap writeout triggered by the state change,
4075 * if any. */
4076 wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
4077
b411b363
PR
4078 /* tcp_close and release of sendpage pages can be deferred. I don't
4079 * want to use SO_LINGER, because apparently it can be deferred for
4080 * more than 20 seconds (longest time I checked).
4081 *
4082	 * Actually we don't care exactly when the network stack does its
4083 * put_page(), but release our reference on these pages right here.
4084 */
4085 i = drbd_release_ee(mdev, &mdev->net_ee);
4086 if (i)
4087 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
435f0740
LE
4088 i = atomic_read(&mdev->pp_in_use_by_net);
4089 if (i)
4090 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
b411b363
PR
4091 i = atomic_read(&mdev->pp_in_use);
4092 if (i)
45bb912b 4093 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
b411b363
PR
4094
4095 D_ASSERT(list_empty(&mdev->read_ee));
4096 D_ASSERT(list_empty(&mdev->active_ee));
4097 D_ASSERT(list_empty(&mdev->sync_ee));
4098 D_ASSERT(list_empty(&mdev->done_ee));
4099
4100 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4101 atomic_set(&mdev->current_epoch->epoch_size, 0);
4102 D_ASSERT(list_empty(&mdev->current_epoch->list));
360cc740
PR
4103
4104 return 0;
b411b363
PR
4105}
4106
4107/*
4108 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4109 * we can agree on is stored in agreed_pro_version.
4110 *
4111 * feature flags and the reserved array should be enough room for future
4112 * enhancements of the handshake protocol, and possible plugins...
4113 *
4114 * for now, they are expected to be zero, but ignored.
4115 */
8a22cccc 4116static int drbd_send_handshake(struct drbd_tconn *tconn)
b411b363 4117{
e6b3ea83 4118 /* ASSERT current == mdev->tconn->receiver ... */
5a87d920 4119 struct p_handshake *p = tconn->data.sbuf;
e8d17b01 4120 int err;
b411b363 4121
8a22cccc
PR
4122 if (mutex_lock_interruptible(&tconn->data.mutex)) {
4123 conn_err(tconn, "interrupted during initial handshake\n");
e8d17b01 4124 return -EINTR;
b411b363
PR
4125 }
4126
8a22cccc
PR
4127 if (tconn->data.socket == NULL) {
4128 mutex_unlock(&tconn->data.mutex);
e8d17b01 4129 return -EIO;
b411b363
PR
4130 }
4131
4132 memset(p, 0, sizeof(*p));
4133 p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4134 p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
e8d17b01 4135 err = _conn_send_cmd(tconn, 0, tconn->data.socket, P_HAND_SHAKE,
ecf2363c 4136 &p->head, sizeof(*p), 0);
8a22cccc 4137 mutex_unlock(&tconn->data.mutex);
e8d17b01 4138 return err;
b411b363
PR
4139}
4140
4141/*
4142 * return values:
4143 * 1 yes, we have a valid connection
4144 * 0 oops, did not work out, please try again
4145 * -1 peer talks different language,
4146 * no point in trying again, please go standalone.
4147 */
65d11ed6 4148static int drbd_do_handshake(struct drbd_tconn *tconn)
b411b363 4149{
65d11ed6 4150 /* ASSERT current == tconn->receiver ... */
e6ef8a5c 4151 struct p_handshake *p = tconn->data.rbuf;
02918be2 4152 const int expect = sizeof(struct p_handshake) - sizeof(struct p_header80);
77351055 4153 struct packet_info pi;
a5c31904 4154 int err;
b411b363 4155
e8d17b01
AG
4156 err = drbd_send_handshake(tconn);
4157 if (err)
b411b363
PR
4158 return 0;
4159
69bc7bc3
AG
4160 err = drbd_recv_header(tconn, &pi);
4161 if (err)
b411b363
PR
4162 return 0;
4163
77351055 4164 if (pi.cmd != P_HAND_SHAKE) {
65d11ed6 4165 conn_err(tconn, "expected HandShake packet, received: %s (0x%04x)\n",
77351055 4166 cmdname(pi.cmd), pi.cmd);
b411b363
PR
4167 return -1;
4168 }
4169
77351055 4170 if (pi.size != expect) {
65d11ed6 4171 conn_err(tconn, "expected HandShake length: %u, received: %u\n",
77351055 4172 expect, pi.size);
b411b363
PR
4173 return -1;
4174 }
4175
a5c31904
AG
4176 err = drbd_recv_all_warn(tconn, &p->head.payload, expect);
4177 if (err)
b411b363 4178 return 0;
b411b363 4179
b411b363
PR
4180 p->protocol_min = be32_to_cpu(p->protocol_min);
4181 p->protocol_max = be32_to_cpu(p->protocol_max);
4182 if (p->protocol_max == 0)
4183 p->protocol_max = p->protocol_min;
4184
4185 if (PRO_VERSION_MAX < p->protocol_min ||
4186 PRO_VERSION_MIN > p->protocol_max)
4187 goto incompat;
4188
65d11ed6 4189 tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
b411b363 4190
65d11ed6
PR
4191 conn_info(tconn, "Handshake successful: "
4192 "Agreed network protocol version %d\n", tconn->agreed_pro_version);
b411b363
PR
4193
4194 return 1;
4195
4196 incompat:
65d11ed6 4197 conn_err(tconn, "incompatible DRBD dialects: "
b411b363
PR
4198 "I support %d-%d, peer supports %d-%d\n",
4199 PRO_VERSION_MIN, PRO_VERSION_MAX,
4200 p->protocol_min, p->protocol_max);
4201 return -1;
4202}
4203
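The version negotiation above boils down to: treat a zero protocol_max as "only protocol_min supported", fail if the ranges do not overlap, and otherwise agree on the highest version both sides speak. A hedged standalone sketch follows; the MIN/MAX values are placeholders, not DRBD's real PRO_VERSION_MIN/MAX.

#include <stdio.h>

#define MY_MIN 86   /* placeholder, not the real PRO_VERSION_MIN */
#define MY_MAX 96   /* placeholder, not the real PRO_VERSION_MAX */

static int agree_version(int peer_min, int peer_max, int *agreed)
{
	if (peer_max == 0)        /* very old peers report only one version */
		peer_max = peer_min;
	if (MY_MAX < peer_min || MY_MIN > peer_max)
		return -1;        /* incompatible dialects */
	*agreed = peer_max < MY_MAX ? peer_max : MY_MAX;
	return 0;
}

int main(void)
{
	int v;

	if (agree_version(88, 100, &v) == 0)
		printf("agreed network protocol version %d\n", v);   /* -> 96 */
	else
		printf("incompatible\n");
	return 0;
}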
4204#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
13e6037d 4205static int drbd_do_auth(struct drbd_tconn *tconn)
b411b363
PR
4206{
4207	dev_err(DEV, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4208 dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
b10d96cb 4209 return -1;
b411b363
PR
4210}
4211#else
4212#define CHALLENGE_LEN 64
b10d96cb
JT
4213
4214/* Return value:
4215 1 - auth succeeded,
4216 0 - failed, try again (network error),
4217 -1 - auth failed, don't try again.
4218*/
4219
13e6037d 4220static int drbd_do_auth(struct drbd_tconn *tconn)
b411b363
PR
4221{
4222 char my_challenge[CHALLENGE_LEN]; /* 64 Bytes... */
4223 struct scatterlist sg;
4224 char *response = NULL;
4225 char *right_response = NULL;
4226 char *peers_ch = NULL;
13e6037d 4227 unsigned int key_len = strlen(tconn->net_conf->shared_secret);
b411b363
PR
4228 unsigned int resp_size;
4229 struct hash_desc desc;
77351055 4230 struct packet_info pi;
69bc7bc3 4231 int err, rv;
b411b363 4232
13e6037d 4233 desc.tfm = tconn->cram_hmac_tfm;
b411b363
PR
4234 desc.flags = 0;
4235
13e6037d
PR
4236 rv = crypto_hash_setkey(tconn->cram_hmac_tfm,
4237 (u8 *)tconn->net_conf->shared_secret, key_len);
b411b363 4238 if (rv) {
13e6037d 4239 conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
b10d96cb 4240 rv = -1;
b411b363
PR
4241 goto fail;
4242 }
4243
4244 get_random_bytes(my_challenge, CHALLENGE_LEN);
4245
ce9879cb 4246 rv = !conn_send_cmd2(tconn, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
b411b363
PR
4247 if (!rv)
4248 goto fail;
4249
69bc7bc3
AG
4250 err = drbd_recv_header(tconn, &pi);
4251 if (err) {
4252 rv = 0;
b411b363 4253 goto fail;
69bc7bc3 4254 }
b411b363 4255
77351055 4256 if (pi.cmd != P_AUTH_CHALLENGE) {
13e6037d 4257 conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
77351055 4258 cmdname(pi.cmd), pi.cmd);
b411b363
PR
4259 rv = 0;
4260 goto fail;
4261 }
4262
77351055 4263 if (pi.size > CHALLENGE_LEN * 2) {
13e6037d 4264 conn_err(tconn, "expected AuthChallenge payload too big.\n");
b10d96cb 4265 rv = -1;
b411b363
PR
4266 goto fail;
4267 }
4268
77351055 4269 peers_ch = kmalloc(pi.size, GFP_NOIO);
b411b363 4270 if (peers_ch == NULL) {
13e6037d 4271 conn_err(tconn, "kmalloc of peers_ch failed\n");
b10d96cb 4272 rv = -1;
b411b363
PR
4273 goto fail;
4274 }
4275
a5c31904
AG
4276 err = drbd_recv_all_warn(tconn, peers_ch, pi.size);
4277 if (err) {
b411b363
PR
4278 rv = 0;
4279 goto fail;
4280 }
4281
13e6037d 4282 resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
b411b363
PR
4283 response = kmalloc(resp_size, GFP_NOIO);
4284 if (response == NULL) {
13e6037d 4285 conn_err(tconn, "kmalloc of response failed\n");
b10d96cb 4286 rv = -1;
b411b363
PR
4287 goto fail;
4288 }
4289
4290 sg_init_table(&sg, 1);
77351055 4291 sg_set_buf(&sg, peers_ch, pi.size);
b411b363
PR
4292
4293 rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4294 if (rv) {
13e6037d 4295 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
b10d96cb 4296 rv = -1;
b411b363
PR
4297 goto fail;
4298 }
4299
ce9879cb 4300 rv = !conn_send_cmd2(tconn, P_AUTH_RESPONSE, response, resp_size);
b411b363
PR
4301 if (!rv)
4302 goto fail;
4303
69bc7bc3
AG
4304 err = drbd_recv_header(tconn, &pi);
4305 if (err) {
4306 rv = 0;
b411b363 4307 goto fail;
69bc7bc3 4308 }
b411b363 4309
77351055 4310 if (pi.cmd != P_AUTH_RESPONSE) {
13e6037d 4311 conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
77351055 4312 cmdname(pi.cmd), pi.cmd);
b411b363
PR
4313 rv = 0;
4314 goto fail;
4315 }
4316
77351055 4317 if (pi.size != resp_size) {
13e6037d 4318 conn_err(tconn, "expected AuthResponse payload of wrong size\n");
b411b363
PR
4319 rv = 0;
4320 goto fail;
4321 }
4322
a5c31904
AG
4323 err = drbd_recv_all_warn(tconn, response , resp_size);
4324 if (err) {
b411b363
PR
4325 rv = 0;
4326 goto fail;
4327 }
4328
4329 right_response = kmalloc(resp_size, GFP_NOIO);
2d1ee87d 4330 if (right_response == NULL) {
13e6037d 4331 conn_err(tconn, "kmalloc of right_response failed\n");
b10d96cb 4332 rv = -1;
b411b363
PR
4333 goto fail;
4334 }
4335
4336 sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4337
4338 rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4339 if (rv) {
13e6037d 4340 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
b10d96cb 4341 rv = -1;
b411b363
PR
4342 goto fail;
4343 }
4344
4345 rv = !memcmp(response, right_response, resp_size);
4346
4347 if (rv)
13e6037d
PR
4348 conn_info(tconn, "Peer authenticated using %d bytes of '%s' HMAC\n",
4349 resp_size, tconn->net_conf->cram_hmac_alg);
b10d96cb
JT
4350 else
4351 rv = -1;
b411b363
PR
4352
4353 fail:
4354 kfree(peers_ch);
4355 kfree(response);
4356 kfree(right_response);
4357
4358 return rv;
4359}
4360#endif
4361
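For orientation, the challenge/response exchange implemented by drbd_do_auth() follows a familiar pattern: each side sends a random challenge, the peer answers with an HMAC of that challenge keyed with the shared secret, and the receiver compares the answer against its own computation. The sketch below shows only that flow; a trivial keyed checksum stands in for the real HMAC, so it is not cryptographically meaningful.

#include <stdio.h>

/* Toy keyed checksum, a stand-in for crypto_hash_digest() with HMAC. */
static unsigned toy_mac(const char *key, const char *msg)
{
	unsigned h = 5381;
	size_t i;

	for (i = 0; key[i]; i++)
		h = h * 33 + (unsigned char)key[i];
	for (i = 0; msg[i]; i++)
		h = h * 33 + (unsigned char)msg[i];
	return h;
}

int main(void)
{
	const char *secret = "shared-secret";         /* net_conf->shared_secret in the driver */
	const char *my_challenge = "64-random-bytes"; /* CHALLENGE_LEN random bytes there */

	/* The peer would answer with MAC(secret, my_challenge); here both
	 * sides are computed locally and compared, like the memcmp() above. */
	unsigned response = toy_mac(secret, my_challenge);
	unsigned right_response = toy_mac(secret, my_challenge);

	printf("auth %s\n", response == right_response ? "succeeded" : "failed");
	return 0;
}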
4362int drbdd_init(struct drbd_thread *thi)
4363{
392c8801 4364 struct drbd_tconn *tconn = thi->tconn;
b411b363
PR
4365 int h;
4366
4d641dd7 4367 conn_info(tconn, "receiver (re)started\n");
b411b363
PR
4368
4369 do {
4d641dd7 4370 h = drbd_connect(tconn);
b411b363 4371 if (h == 0) {
4d641dd7 4372 drbd_disconnect(tconn);
20ee6390 4373 schedule_timeout_interruptible(HZ);
b411b363
PR
4374 }
4375 if (h == -1) {
4d641dd7 4376 conn_warn(tconn, "Discarding network configuration.\n");
bbeb641c 4377 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
b411b363
PR
4378 }
4379 } while (h == 0);
4380
4381 if (h > 0) {
4d641dd7
PR
4382 if (get_net_conf(tconn)) {
4383 drbdd(tconn);
4384 put_net_conf(tconn);
b411b363
PR
4385 }
4386 }
4387
4d641dd7 4388 drbd_disconnect(tconn);
b411b363 4389
4d641dd7 4390 conn_info(tconn, "receiver terminated\n");
b411b363
PR
4391 return 0;
4392}
4393
4394/* ********* acknowledge sender ******** */
4395
e4f78ede
PR
4396static int got_conn_RqSReply(struct drbd_tconn *tconn, enum drbd_packet cmd)
4397{
e6ef8a5c 4398 struct p_req_state_reply *p = tconn->meta.rbuf;
e4f78ede
PR
4399 int retcode = be32_to_cpu(p->retcode);
4400
4401 if (retcode >= SS_SUCCESS) {
4402 set_bit(CONN_WD_ST_CHG_OKAY, &tconn->flags);
4403 } else {
4404 set_bit(CONN_WD_ST_CHG_FAIL, &tconn->flags);
4405 conn_err(tconn, "Requested state change failed by peer: %s (%d)\n",
4406 drbd_set_st_err_str(retcode), retcode);
4407 }
4408 wake_up(&tconn->ping_wait);
4409
4410 return true;
4411}
4412
d8763023 4413static int got_RqSReply(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4414{
e6ef8a5c 4415 struct p_req_state_reply *p = mdev->tconn->meta.rbuf;
b411b363
PR
4416 int retcode = be32_to_cpu(p->retcode);
4417
e4f78ede
PR
4418 if (retcode >= SS_SUCCESS) {
4419 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4420 } else {
4421 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4422 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4423 drbd_set_st_err_str(retcode), retcode);
b411b363 4424 }
e4f78ede
PR
4425 wake_up(&mdev->state_wait);
4426
81e84650 4427 return true;
b411b363
PR
4428}
4429
f19e4f8b 4430static int got_Ping(struct drbd_tconn *tconn, enum drbd_packet cmd)
b411b363 4431{
f19e4f8b 4432 return drbd_send_ping_ack(tconn);
b411b363
PR
4433
4434}
4435
f19e4f8b 4436static int got_PingAck(struct drbd_tconn *tconn, enum drbd_packet cmd)
b411b363
PR
4437{
4438 /* restore idle timeout */
2a67d8b9
PR
4439 tconn->meta.socket->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
4440 if (!test_and_set_bit(GOT_PING_ACK, &tconn->flags))
4441 wake_up(&tconn->ping_wait);
b411b363 4442
81e84650 4443 return true;
b411b363
PR
4444}
4445
d8763023 4446static int got_IsInSync(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4447{
e6ef8a5c 4448 struct p_block_ack *p = mdev->tconn->meta.rbuf;
b411b363
PR
4449 sector_t sector = be64_to_cpu(p->sector);
4450 int blksize = be32_to_cpu(p->blksize);
4451
31890f4a 4452 D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
b411b363
PR
4453
4454 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4455
1d53f09e
LE
4456 if (get_ldev(mdev)) {
4457 drbd_rs_complete_io(mdev, sector);
4458 drbd_set_in_sync(mdev, sector, blksize);
4459 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4460 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4461 put_ldev(mdev);
4462 }
b411b363 4463 dec_rs_pending(mdev);
778f271d 4464 atomic_add(blksize >> 9, &mdev->rs_sect_in);
b411b363 4465
81e84650 4466 return true;
b411b363
PR
4467}
4468
bc9c5c41
AG
4469static int
4470validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
4471 struct rb_root *root, const char *func,
4472 enum drbd_req_event what, bool missing_ok)
b411b363
PR
4473{
4474 struct drbd_request *req;
4475 struct bio_and_error m;
4476
87eeee41 4477 spin_lock_irq(&mdev->tconn->req_lock);
bc9c5c41 4478 req = find_request(mdev, root, id, sector, missing_ok, func);
b411b363 4479 if (unlikely(!req)) {
87eeee41 4480 spin_unlock_irq(&mdev->tconn->req_lock);
81e84650 4481 return false;
b411b363
PR
4482 }
4483 __req_mod(req, what, &m);
87eeee41 4484 spin_unlock_irq(&mdev->tconn->req_lock);
b411b363
PR
4485
4486 if (m.bio)
4487 complete_master_bio(mdev, &m);
81e84650 4488 return true;
b411b363
PR
4489}
4490
d8763023 4491static int got_BlockAck(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4492{
e6ef8a5c 4493 struct p_block_ack *p = mdev->tconn->meta.rbuf;
b411b363
PR
4494 sector_t sector = be64_to_cpu(p->sector);
4495 int blksize = be32_to_cpu(p->blksize);
4496 enum drbd_req_event what;
4497
4498 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4499
579b57ed 4500 if (p->block_id == ID_SYNCER) {
b411b363
PR
4501 drbd_set_in_sync(mdev, sector, blksize);
4502 dec_rs_pending(mdev);
81e84650 4503 return true;
b411b363 4504 }
257d0af6 4505 switch (cmd) {
b411b363 4506 case P_RS_WRITE_ACK:
89e58e75 4507 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
8554df1c 4508 what = WRITE_ACKED_BY_PEER_AND_SIS;
b411b363
PR
4509 break;
4510 case P_WRITE_ACK:
89e58e75 4511 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
8554df1c 4512 what = WRITE_ACKED_BY_PEER;
b411b363
PR
4513 break;
4514 case P_RECV_ACK:
89e58e75 4515 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_B);
8554df1c 4516 what = RECV_ACKED_BY_PEER;
b411b363 4517 break;
7be8da07 4518 case P_DISCARD_WRITE:
89e58e75 4519 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
7be8da07
AG
4520 what = DISCARD_WRITE;
4521 break;
4522 case P_RETRY_WRITE:
4523 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
4524 what = POSTPONE_WRITE;
b411b363
PR
4525 break;
4526 default:
4527 D_ASSERT(0);
81e84650 4528 return false;
b411b363
PR
4529 }
4530
4531 return validate_req_change_req_state(mdev, p->block_id, sector,
bc9c5c41
AG
4532 &mdev->write_requests, __func__,
4533 what, false);
b411b363
PR
4534}
4535
d8763023 4536static int got_NegAck(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4537{
e6ef8a5c 4538 struct p_block_ack *p = mdev->tconn->meta.rbuf;
b411b363 4539 sector_t sector = be64_to_cpu(p->sector);
2deb8336 4540 int size = be32_to_cpu(p->blksize);
89e58e75
PR
4541 bool missing_ok = mdev->tconn->net_conf->wire_protocol == DRBD_PROT_A ||
4542 mdev->tconn->net_conf->wire_protocol == DRBD_PROT_B;
c3afd8f5 4543 bool found;
b411b363
PR
4544
4545 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4546
579b57ed 4547 if (p->block_id == ID_SYNCER) {
b411b363
PR
4548 dec_rs_pending(mdev);
4549 drbd_rs_failed_io(mdev, sector, size);
81e84650 4550 return true;
b411b363 4551 }
2deb8336 4552
c3afd8f5 4553 found = validate_req_change_req_state(mdev, p->block_id, sector,
bc9c5c41 4554 &mdev->write_requests, __func__,
8554df1c 4555 NEG_ACKED, missing_ok);
c3afd8f5
AG
4556 if (!found) {
4557 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4558 The master bio might already be completed, therefore the
4559 request is no longer in the collision hash. */
4560 /* In Protocol B we might already have got a P_RECV_ACK
4561 but then get a P_NEG_ACK afterwards. */
4562 if (!missing_ok)
2deb8336 4563 return false;
c3afd8f5 4564 drbd_set_out_of_sync(mdev, sector, size);
2deb8336 4565 }
2deb8336 4566 return true;
b411b363
PR
4567}
4568
d8763023 4569static int got_NegDReply(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4570{
e6ef8a5c 4571 struct p_block_ack *p = mdev->tconn->meta.rbuf;
b411b363
PR
4572 sector_t sector = be64_to_cpu(p->sector);
4573
4574 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
7be8da07 4575
b411b363
PR
4576 dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4577 (unsigned long long)sector, be32_to_cpu(p->blksize));
4578
4579 return validate_req_change_req_state(mdev, p->block_id, sector,
bc9c5c41 4580 &mdev->read_requests, __func__,
8554df1c 4581 NEG_ACKED, false);
b411b363
PR
4582}
4583
d8763023 4584static int got_NegRSDReply(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363
PR
4585{
4586 sector_t sector;
4587 int size;
e6ef8a5c 4588 struct p_block_ack *p = mdev->tconn->meta.rbuf;
b411b363
PR
4589
4590 sector = be64_to_cpu(p->sector);
4591 size = be32_to_cpu(p->blksize);
b411b363
PR
4592
4593 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4594
4595 dec_rs_pending(mdev);
4596
4597 if (get_ldev_if_state(mdev, D_FAILED)) {
4598 drbd_rs_complete_io(mdev, sector);
257d0af6 4599 switch (cmd) {
d612d309
PR
4600 case P_NEG_RS_DREPLY:
4601 drbd_rs_failed_io(mdev, sector, size);
4602 case P_RS_CANCEL:
4603 break;
4604 default:
4605 D_ASSERT(0);
4606 put_ldev(mdev);
4607 return false;
4608 }
b411b363
PR
4609 put_ldev(mdev);
4610 }
4611
81e84650 4612 return true;
b411b363
PR
4613}
4614
d8763023 4615static int got_BarrierAck(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4616{
e6ef8a5c 4617 struct p_barrier_ack *p = mdev->tconn->meta.rbuf;
b411b363 4618
2f5cdd0b 4619 tl_release(mdev->tconn, p->barrier, be32_to_cpu(p->set_size));
b411b363 4620
c4752ef1
PR
4621 if (mdev->state.conn == C_AHEAD &&
4622 atomic_read(&mdev->ap_in_flight) == 0 &&
370a43e7
PR
4623 !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags)) {
4624 mdev->start_resync_timer.expires = jiffies + HZ;
4625 add_timer(&mdev->start_resync_timer);
c4752ef1
PR
4626 }
4627
81e84650 4628 return true;
b411b363
PR
4629}
4630
d8763023 4631static int got_OVResult(struct drbd_conf *mdev, enum drbd_packet cmd)
b411b363 4632{
e6ef8a5c 4633 struct p_block_ack *p = mdev->tconn->meta.rbuf;
b411b363
PR
4634 struct drbd_work *w;
4635 sector_t sector;
4636 int size;
4637
4638 sector = be64_to_cpu(p->sector);
4639 size = be32_to_cpu(p->blksize);
4640
4641 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4642
4643 if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
8f7bed77 4644 drbd_ov_out_of_sync_found(mdev, sector, size);
b411b363 4645 else
8f7bed77 4646 ov_out_of_sync_print(mdev);
b411b363 4647
1d53f09e 4648 if (!get_ldev(mdev))
81e84650 4649 return true;
1d53f09e 4650
b411b363
PR
4651 drbd_rs_complete_io(mdev, sector);
4652 dec_rs_pending(mdev);
4653
ea5442af
LE
4654 --mdev->ov_left;
4655
4656 /* let's advance progress step marks only for every other megabyte */
4657 if ((mdev->ov_left & 0x200) == 0x200)
4658 drbd_advance_rs_marks(mdev, mdev->ov_left);
4659
4660 if (mdev->ov_left == 0) {
b411b363
PR
4661 w = kmalloc(sizeof(*w), GFP_NOIO);
4662 if (w) {
4663 w->cb = w_ov_finished;
a21e9298 4664 w->mdev = mdev;
e42325a5 4665 drbd_queue_work_front(&mdev->tconn->data.work, w);
b411b363
PR
4666 } else {
4667 dev_err(DEV, "kmalloc(w) failed.");
8f7bed77 4668 ov_out_of_sync_print(mdev);
b411b363
PR
4669 drbd_resync_finished(mdev);
4670 }
4671 }
1d53f09e 4672 put_ldev(mdev);
81e84650 4673 return true;
b411b363
PR
4674}
4675
d8763023 4676static int got_skip(struct drbd_conf *mdev, enum drbd_packet cmd)
0ced55a3 4677{
81e84650 4678 return true;
0ced55a3
PR
4679}
4680
32862ec7
PR
4681static int tconn_process_done_ee(struct drbd_tconn *tconn)
4682{
082a3439
PR
4683 struct drbd_conf *mdev;
4684 int i, not_empty = 0;
32862ec7
PR
4685
4686 do {
4687 clear_bit(SIGNAL_ASENDER, &tconn->flags);
4688 flush_signals(current);
082a3439 4689 idr_for_each_entry(&tconn->volumes, mdev, i) {
e2b3032b 4690 if (drbd_process_done_ee(mdev))
082a3439
PR
4691 return 1; /* error */
4692 }
32862ec7 4693 set_bit(SIGNAL_ASENDER, &tconn->flags);
082a3439
PR
4694
4695 spin_lock_irq(&tconn->req_lock);
4696 idr_for_each_entry(&tconn->volumes, mdev, i) {
4697 not_empty = !list_empty(&mdev->done_ee);
4698 if (not_empty)
4699 break;
4700 }
4701 spin_unlock_irq(&tconn->req_lock);
32862ec7
PR
4702 } while (not_empty);
4703
4704 return 0;
4705}
4706
7201b972
AG
4707struct asender_cmd {
4708 size_t pkt_size;
a4fbda8e
PR
4709 enum mdev_or_conn fa_type; /* first argument's type */
4710 union {
4711 int (*mdev_fn)(struct drbd_conf *mdev, enum drbd_packet cmd);
4712 int (*conn_fn)(struct drbd_tconn *tconn, enum drbd_packet cmd);
4713 };
7201b972
AG
4714};
4715
4716static struct asender_cmd asender_tbl[] = {
f19e4f8b
PR
4717 [P_PING] = { sizeof(struct p_header), CONN, { .conn_fn = got_Ping } },
4718 [P_PING_ACK] = { sizeof(struct p_header), CONN, { .conn_fn = got_PingAck } },
a4fbda8e
PR
4719 [P_RECV_ACK] = { sizeof(struct p_block_ack), MDEV, { got_BlockAck } },
4720 [P_WRITE_ACK] = { sizeof(struct p_block_ack), MDEV, { got_BlockAck } },
4721 [P_RS_WRITE_ACK] = { sizeof(struct p_block_ack), MDEV, { got_BlockAck } },
4722 [P_DISCARD_WRITE] = { sizeof(struct p_block_ack), MDEV, { got_BlockAck } },
4723 [P_NEG_ACK] = { sizeof(struct p_block_ack), MDEV, { got_NegAck } },
4724 [P_NEG_DREPLY] = { sizeof(struct p_block_ack), MDEV, { got_NegDReply } },
4725 [P_NEG_RS_DREPLY] = { sizeof(struct p_block_ack), MDEV, { got_NegRSDReply } },
4726 [P_OV_RESULT] = { sizeof(struct p_block_ack), MDEV, { got_OVResult } },
4727 [P_BARRIER_ACK] = { sizeof(struct p_barrier_ack), MDEV, { got_BarrierAck } },
4728 [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), MDEV, { got_RqSReply } },
4729 [P_RS_IS_IN_SYNC] = { sizeof(struct p_block_ack), MDEV, { got_IsInSync } },
4730 [P_DELAY_PROBE] = { sizeof(struct p_delay_probe93), MDEV, { got_skip } },
4731 [P_RS_CANCEL] = { sizeof(struct p_block_ack), MDEV, { got_NegRSDReply } },
e4f78ede 4732 [P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), CONN, {.conn_fn = got_conn_RqSReply}},
a4fbda8e 4733 [P_RETRY_WRITE] = { sizeof(struct p_block_ack), MDEV, { got_BlockAck } },
7201b972
AG
4734};
4735
b411b363
PR
4736int drbd_asender(struct drbd_thread *thi)
4737{
392c8801 4738 struct drbd_tconn *tconn = thi->tconn;
e6ef8a5c 4739 struct p_header *h = tconn->meta.rbuf;
b411b363 4740 struct asender_cmd *cmd = NULL;
77351055 4741 struct packet_info pi;
257d0af6 4742 int rv;
b411b363
PR
4743 void *buf = h;
4744 int received = 0;
257d0af6 4745 int expect = sizeof(struct p_header);
f36af18c 4746 int ping_timeout_active = 0;
b411b363 4747
b411b363
PR
4748 current->policy = SCHED_RR; /* Make this a realtime task! */
4749 current->rt_priority = 2; /* more important than all other tasks */
4750
e77a0a5c 4751 while (get_t_state(thi) == RUNNING) {
80822284 4752 drbd_thread_current_set_cpu(thi);
32862ec7 4753 if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
2a67d8b9 4754 if (!drbd_send_ping(tconn)) {
32862ec7 4755 conn_err(tconn, "drbd_send_ping has failed\n");
841ce241
AG
4756 goto reconnect;
4757 }
32862ec7
PR
4758 tconn->meta.socket->sk->sk_rcvtimeo =
4759 tconn->net_conf->ping_timeo*HZ/10;
f36af18c 4760 ping_timeout_active = 1;
b411b363
PR
4761 }
4762
32862ec7
PR
4763 /* TODO: conditionally cork; it may hurt latency if we cork without
4764 much to send */
4765 if (!tconn->net_conf->no_cork)
4766 drbd_tcp_cork(tconn->meta.socket);
082a3439
PR
4767 if (tconn_process_done_ee(tconn)) {
4768 conn_err(tconn, "tconn_process_done_ee() failed\n");
32862ec7 4769 goto reconnect;
082a3439 4770 }
b411b363 4771 /* but unconditionally uncork unless disabled */
32862ec7
PR
4772 if (!tconn->net_conf->no_cork)
4773 drbd_tcp_uncork(tconn->meta.socket);
b411b363
PR
4774
4775 /* short circuit, recv_msg would return EINTR anyways. */
4776 if (signal_pending(current))
4777 continue;
4778
32862ec7
PR
4779 rv = drbd_recv_short(tconn->meta.socket, buf, expect-received, 0);
4780 clear_bit(SIGNAL_ASENDER, &tconn->flags);
b411b363
PR
4781
4782 flush_signals(current);
4783
4784 /* Note:
4785 * -EINTR (on meta) we got a signal
4786 * -EAGAIN (on meta) rcvtimeo expired
4787 * -ECONNRESET other side closed the connection
4788 * -ERESTARTSYS (on data) we got a signal
4789 * rv < 0 other than above: unexpected error!
4790 * rv == expected: full header or command
4791 * rv < expected: "woken" by signal during receive
4792 * rv == 0 : "connection shut down by peer"
4793 */
4794 if (likely(rv > 0)) {
4795 received += rv;
4796 buf += rv;
4797 } else if (rv == 0) {
32862ec7 4798 conn_err(tconn, "meta connection shut down by peer.\n");
b411b363
PR
4799 goto reconnect;
4800 } else if (rv == -EAGAIN) {
cb6518cb
LE
4801 /* If the data socket received something meanwhile,
4802 * that is good enough: peer is still alive. */
32862ec7
PR
4803 if (time_after(tconn->last_received,
4804 jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
cb6518cb 4805 continue;
f36af18c 4806 if (ping_timeout_active) {
32862ec7 4807 conn_err(tconn, "PingAck did not arrive in time.\n");
b411b363
PR
4808 goto reconnect;
4809 }
32862ec7 4810 set_bit(SEND_PING, &tconn->flags);
b411b363
PR
4811 continue;
4812 } else if (rv == -EINTR) {
4813 continue;
4814 } else {
32862ec7 4815 conn_err(tconn, "sock_recvmsg returned %d\n", rv);
b411b363
PR
4816 goto reconnect;
4817 }
4818
4819 if (received == expect && cmd == NULL) {
8172f3e9 4820 if (decode_header(tconn, h, &pi))
b411b363 4821 goto reconnect;
7201b972
AG
4822 cmd = &asender_tbl[pi.cmd];
4823 if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd) {
32862ec7 4824 conn_err(tconn, "unknown command %d on meta (l: %d)\n",
77351055 4825 pi.cmd, pi.size);
b411b363
PR
4826 goto disconnect;
4827 }
4828 expect = cmd->pkt_size;
77351055 4829 if (pi.size != expect - sizeof(struct p_header)) {
32862ec7 4830 conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
77351055 4831 pi.cmd, pi.size);
b411b363 4832 goto reconnect;
257d0af6 4833 }
b411b363
PR
4834 }
4835 if (received == expect) {
a4fbda8e
PR
4836 bool rv;
4837
4838 if (cmd->fa_type == CONN) {
4839 rv = cmd->conn_fn(tconn, pi.cmd);
4840 } else {
4841 struct drbd_conf *mdev = vnr_to_mdev(tconn, pi.vnr);
4842 rv = cmd->mdev_fn(mdev, pi.cmd);
4843 }
4844
4845 if (!rv)
b411b363
PR
4846 goto reconnect;
4847
a4fbda8e
PR
4848 tconn->last_received = jiffies;
4849
f36af18c
LE
4850 /* the idle_timeout (ping-int)
4851 * has been restored in got_PingAck() */
7201b972 4852 if (cmd == &asender_tbl[P_PING_ACK])
f36af18c
LE
4853 ping_timeout_active = 0;
4854
b411b363
PR
4855 buf = h;
4856 received = 0;
257d0af6 4857 expect = sizeof(struct p_header);
b411b363
PR
4858 cmd = NULL;
4859 }
4860 }
4861
4862 if (0) {
4863reconnect:
bbeb641c 4864 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
b411b363
PR
4865 }
4866 if (0) {
4867disconnect:
bbeb641c 4868 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
b411b363 4869 }
32862ec7 4870 clear_bit(SIGNAL_ASENDER, &tconn->flags);
b411b363 4871
32862ec7 4872 conn_info(tconn, "asender terminated\n");
b411b363
PR
4873
4874 return 0;
4875}