1/*
2 drbd_receiver.c
3
4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
13 any later version.
14
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 */
24
25
26#include <linux/module.h>
27
28#include <asm/uaccess.h>
29#include <net/sock.h>
30
31#include <linux/drbd.h>
32#include <linux/fs.h>
33#include <linux/file.h>
34#include <linux/in.h>
35#include <linux/mm.h>
36#include <linux/memcontrol.h>
37#include <linux/mm_inline.h>
38#include <linux/slab.h>
39#include <linux/pkt_sched.h>
40#define __KERNEL_SYSCALLS__
41#include <linux/unistd.h>
42#include <linux/vmalloc.h>
43#include <linux/random.h>
44#include <linux/string.h>
45#include <linux/scatterlist.h>
46#include "drbd_int.h"
47#include "drbd_req.h"
48
49#include "drbd_vli.h"
50
51enum finish_epoch {
52 FE_STILL_LIVE,
53 FE_DESTROYED,
54 FE_RECYCLED,
55};
56
57static int drbd_do_handshake(struct drbd_conf *mdev);
58static int drbd_do_auth(struct drbd_conf *mdev);
59
60static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
61static int e_end_block(struct drbd_conf *, struct drbd_work *, int);
62
63
64#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
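/*
 * Note: GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN) deliberately omits __GFP_WAIT,
 * so an alloc_page(GFP_TRY) below fails fast instead of sleeping in reclaim or
 * triggering write-out -- see the "criss-cross" comment in
 * drbd_pp_first_pages_or_try_alloc() for why that matters here.
 */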
65
66/*
67 * some helper functions to deal with single linked page lists,
68 * page->private being our "next" pointer.
69 */
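/*
 * Illustration (for orientation): a chain p1 -> p2 -> p3 is encoded as
 *   page_private(p1) == (unsigned long)p2
 *   page_private(p2) == (unsigned long)p3
 *   page_private(p3) == 0            end-of-chain marker
 * page_chain_next() and page_chain_for_each*() (presumably in drbd_int.h)
 * walk such a chain.
 */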
70
71/* If at least n pages are linked at head, get n pages off.
72 * Otherwise, don't modify head, and return NULL.
73 * Locking is the responsibility of the caller.
74 */
75static struct page *page_chain_del(struct page **head, int n)
76{
77 struct page *page;
78 struct page *tmp;
79
80 BUG_ON(!n);
81 BUG_ON(!head);
82
83 page = *head;
84
85 if (!page)
86 return NULL;
87
88 while (page) {
89 tmp = page_chain_next(page);
90 if (--n == 0)
91 break; /* found sufficient pages */
92 if (tmp == NULL)
93 /* insufficient pages, don't use any of them. */
94 return NULL;
95 page = tmp;
96 }
97
98 /* add end of list marker for the returned list */
99 set_page_private(page, 0);
100 /* actual return value, and adjustment of head */
101 page = *head;
102 *head = tmp;
103 return page;
104}
105
106/* may be used outside of locks to find the tail of a (usually short)
107 * "private" page chain, before adding it back to a global chain head
108 * with page_chain_add() under a spinlock. */
109static struct page *page_chain_tail(struct page *page, int *len)
110{
111 struct page *tmp;
112 int i = 1;
113 while ((tmp = page_chain_next(page)))
114 ++i, page = tmp;
115 if (len)
116 *len = i;
117 return page;
118}
119
120static int page_chain_free(struct page *page)
121{
122 struct page *tmp;
123 int i = 0;
124 page_chain_for_each_safe(page, tmp) {
125 put_page(page);
126 ++i;
127 }
128 return i;
129}
130
131static void page_chain_add(struct page **head,
132 struct page *chain_first, struct page *chain_last)
133{
134#if 1
135 struct page *tmp;
136 tmp = page_chain_tail(chain_first, NULL);
137 BUG_ON(tmp != chain_last);
138#endif
139
140 /* add chain to head */
141 set_page_private(chain_last, (unsigned long)*head);
142 *head = chain_first;
143}
144
145static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
146{
147 struct page *page = NULL;
148 struct page *tmp = NULL;
149 int i = 0;
150
151 /* Yes, testing drbd_pp_vacant outside the lock is racy.
152 * So what. It saves a spin_lock. */
153 if (drbd_pp_vacant >= number) {
154 spin_lock(&drbd_pp_lock);
155 page = page_chain_del(&drbd_pp_pool, number);
156 if (page)
157 drbd_pp_vacant -= number;
158 spin_unlock(&drbd_pp_lock);
159 if (page)
160 return page;
161 }
162
163 /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
164 * "criss-cross" setup, that might cause write-out on some other DRBD,
165 * which in turn might block on the other node at this very place. */
166 for (i = 0; i < number; i++) {
167 tmp = alloc_page(GFP_TRY);
168 if (!tmp)
169 break;
170 set_page_private(tmp, (unsigned long)page);
171 page = tmp;
172 }
173
174 if (i == number)
175 return page;
176
177 /* Not enough pages immediately available this time.
178 * No need to jump around here, drbd_pp_alloc will retry this
179 * function "soon". */
180 if (page) {
181 tmp = page_chain_tail(page, NULL);
182 spin_lock(&drbd_pp_lock);
183 page_chain_add(&drbd_pp_pool, page, tmp);
184 drbd_pp_vacant += i;
185 spin_unlock(&drbd_pp_lock);
186 }
187 return NULL;
188}
189
190static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
191{
192 struct drbd_epoch_entry *e;
193 struct list_head *le, *tle;
194
195 /* The EEs are always appended to the end of the list. Since
196 they are sent in order over the wire, they have to finish
197 in order. As soon as we see the first one that is not finished, we can
198 stop examining the list... */
199
200 list_for_each_safe(le, tle, &mdev->net_ee) {
201 e = list_entry(le, struct drbd_epoch_entry, w.list);
202 if (drbd_ee_has_active_page(e))
203 break;
204 list_move(le, to_be_freed);
205 }
206}
207
208static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
209{
210 LIST_HEAD(reclaimed);
211 struct drbd_epoch_entry *e, *t;
212
213 spin_lock_irq(&mdev->req_lock);
214 reclaim_net_ee(mdev, &reclaimed);
215 spin_unlock_irq(&mdev->req_lock);
216
217 list_for_each_entry_safe(e, t, &reclaimed, w.list)
218 drbd_free_net_ee(mdev, e);
219}
220
221/**
222 * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
223 * @mdev: DRBD device.
224 * @number: number of pages requested
225 * @retry: whether to retry, if not enough pages are available right now
226 *
227 * Tries to allocate number pages, first from our own page pool, then from
228 * the kernel, unless this allocation would exceed the max_buffers setting.
229 * Possibly retry until DRBD frees sufficient pages somewhere else.
230 *
231 * Returns a page chain linked via page->private.
232 */
233 static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
234{
235 struct page *page = NULL;
236 DEFINE_WAIT(wait);
237
238 /* Yes, we may run up to @number over max_buffers. If we
239 * follow it strictly, the admin will get it wrong anyways. */
240 if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers)
241 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
242
243 while (page == NULL) {
244 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
245
246 drbd_kick_lo_and_reclaim_net(mdev);
247
248 if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
249 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
250 if (page)
251 break;
252 }
253
254 if (!retry)
255 break;
256
257 if (signal_pending(current)) {
258 dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
259 break;
260 }
261
262 schedule();
263 }
264 finish_wait(&drbd_pp_wait, &wait);
265
266 if (page)
267 atomic_add(number, &mdev->pp_in_use);
268 return page;
269}
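/*
 * Typical use: drbd_alloc_ee() below calls
 *   drbd_pp_alloc(mdev, nr_pages, gfp_mask & __GFP_WAIT);
 * with retry set, this waits (interruptibly) until other parts of DRBD free
 * pages; without it, a NULL return must be handled by the caller.
 */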
270
271/* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
272 * Is also used from inside another spin_lock_irq(&mdev->req_lock);
273 * Either links the page chain back to the global pool,
274 * or returns all pages to the system. */
275 static void drbd_pp_free(struct drbd_conf *mdev, struct page *page, int is_net)
276 {
277 atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
278 int i;
279
280 if (page == NULL)
281 return;
282
283 if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE)*minor_count)
284 i = page_chain_free(page);
285 else {
286 struct page *tmp;
287 tmp = page_chain_tail(page, &i);
288 spin_lock(&drbd_pp_lock);
289 page_chain_add(&drbd_pp_pool, page, tmp);
290 drbd_pp_vacant += i;
291 spin_unlock(&drbd_pp_lock);
292 }
293 i = atomic_sub_return(i, a);
294 if (i < 0)
295 dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
296 is_net ? "pp_in_use_by_net" : "pp_in_use", i);
297 wake_up(&drbd_pp_wait);
298}
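/*
 * Counterpart of drbd_pp_alloc(): returns the chain either to drbd_pp_pool or
 * to the system, fixes up pp_in_use(_by_net) accounting, and wakes
 * drbd_pp_wait so a waiting drbd_pp_alloc() can retry.
 */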
299
300/*
301You need to hold the req_lock:
302 _drbd_wait_ee_list_empty()
303
304You must not have the req_lock:
305 drbd_free_ee()
306 drbd_alloc_ee()
307 drbd_init_ee()
308 drbd_release_ee()
309 drbd_ee_fix_bhs()
310 drbd_process_done_ee()
311 drbd_clear_done_ee()
312 drbd_wait_ee_list_empty()
313*/
314
315struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
316 u64 id,
317 sector_t sector,
318 unsigned int data_size,
319 gfp_t gfp_mask) __must_hold(local)
320{
321 struct drbd_epoch_entry *e;
322 struct page *page = NULL;
323 unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
324
325 if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
326 return NULL;
327
328 e = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
329 if (!e) {
330 if (!(gfp_mask & __GFP_NOWARN))
331 dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
332 return NULL;
333 }
334
335 if (data_size) {
336 page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
337 if (!page)
338 goto fail;
339 }
340
341 INIT_HLIST_NODE(&e->collision);
342 e->epoch = NULL;
343 e->mdev = mdev;
344 e->pages = page;
345 atomic_set(&e->pending_bios, 0);
346 e->size = data_size;
347 e->flags = 0;
348 e->sector = sector;
349 e->block_id = id;
350
351 return e;
352
353 fail:
354 mempool_free(e, drbd_ee_mempool);
355 return NULL;
356}
357
358 void drbd_free_some_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e, int is_net)
359 {
360 if (e->flags & EE_HAS_DIGEST)
361 kfree(e->digest);
362 drbd_pp_free(mdev, e->pages, is_net);
363 D_ASSERT(atomic_read(&e->pending_bios) == 0);
364 D_ASSERT(hlist_unhashed(&e->collision));
365 mempool_free(e, drbd_ee_mempool);
366}
367
368int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
369{
370 LIST_HEAD(work_list);
371 struct drbd_epoch_entry *e, *t;
372 int count = 0;
373 int is_net = list == &mdev->net_ee;
374
375 spin_lock_irq(&mdev->req_lock);
376 list_splice_init(list, &work_list);
377 spin_unlock_irq(&mdev->req_lock);
378
379 list_for_each_entry_safe(e, t, &work_list, w.list) {
380 drbd_free_some_ee(mdev, e, is_net);
381 count++;
382 }
383 return count;
384}
385
386
387/*
388 * This function is called from _asender only_
389 * but see also comments in _req_mod(,barrier_acked)
390 * and receive_Barrier.
391 *
392 * Move entries from net_ee to done_ee, if ready.
393 * Grab done_ee, call all callbacks, free the entries.
394 * The callbacks typically send out ACKs.
395 */
396static int drbd_process_done_ee(struct drbd_conf *mdev)
397{
398 LIST_HEAD(work_list);
399 LIST_HEAD(reclaimed);
400 struct drbd_epoch_entry *e, *t;
401 int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS);
402
403 spin_lock_irq(&mdev->req_lock);
404 reclaim_net_ee(mdev, &reclaimed);
405 list_splice_init(&mdev->done_ee, &work_list);
406 spin_unlock_irq(&mdev->req_lock);
407
408 list_for_each_entry_safe(e, t, &reclaimed, w.list)
409 drbd_free_net_ee(mdev, e);
410
411 /* possible callbacks here:
412 * e_end_block, and e_end_resync_block, e_send_discard_ack.
413 * all ignore the last argument.
414 */
415 list_for_each_entry_safe(e, t, &work_list, w.list) {
416 /* list_del not necessary, next/prev members not touched */
417 ok = e->w.cb(mdev, &e->w, !ok) && ok;
418 drbd_free_ee(mdev, e);
419 }
420 wake_up(&mdev->ee_wait);
421
422 return ok;
423}
424
425void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
426{
427 DEFINE_WAIT(wait);
428
429 /* avoids spin_lock/unlock
430 * and calling prepare_to_wait in the fast path */
431 while (!list_empty(head)) {
432 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
433 spin_unlock_irq(&mdev->req_lock);
434 io_schedule();
435 finish_wait(&mdev->ee_wait, &wait);
436 spin_lock_irq(&mdev->req_lock);
437 }
438}
439
440void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
441{
442 spin_lock_irq(&mdev->req_lock);
443 _drbd_wait_ee_list_empty(mdev, head);
444 spin_unlock_irq(&mdev->req_lock);
445}
446
447/* see also kernel_accept; which is only present since 2.6.18.
448 * also we want to log which part of it failed, exactly */
449static int drbd_accept(struct drbd_conf *mdev, const char **what,
450 struct socket *sock, struct socket **newsock)
451{
452 struct sock *sk = sock->sk;
453 int err = 0;
454
455 *what = "listen";
456 err = sock->ops->listen(sock, 5);
457 if (err < 0)
458 goto out;
459
460 *what = "sock_create_lite";
461 err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
462 newsock);
463 if (err < 0)
464 goto out;
465
466 *what = "accept";
467 err = sock->ops->accept(sock, *newsock, 0);
468 if (err < 0) {
469 sock_release(*newsock);
470 *newsock = NULL;
471 goto out;
472 }
473 (*newsock)->ops = sock->ops;
474 __module_get((*newsock)->ops->owner);
475
476out:
477 return err;
478}
479
480static int drbd_recv_short(struct drbd_conf *mdev, struct socket *sock,
481 void *buf, size_t size, int flags)
482{
483 mm_segment_t oldfs;
484 struct kvec iov = {
485 .iov_base = buf,
486 .iov_len = size,
487 };
488 struct msghdr msg = {
489 .msg_iovlen = 1,
490 .msg_iov = (struct iovec *)&iov,
491 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
492 };
493 int rv;
494
495 oldfs = get_fs();
496 set_fs(KERNEL_DS);
497 rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
498 set_fs(oldfs);
499
500 return rv;
501}
502
503static int drbd_recv(struct drbd_conf *mdev, void *buf, size_t size)
504{
505 mm_segment_t oldfs;
506 struct kvec iov = {
507 .iov_base = buf,
508 .iov_len = size,
509 };
510 struct msghdr msg = {
511 .msg_iovlen = 1,
512 .msg_iov = (struct iovec *)&iov,
513 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
514 };
515 int rv;
516
517 oldfs = get_fs();
518 set_fs(KERNEL_DS);
519
520 for (;;) {
521 rv = sock_recvmsg(mdev->data.socket, &msg, size, msg.msg_flags);
522 if (rv == size)
523 break;
524
525 /* Note:
526 * ECONNRESET other side closed the connection
527 * ERESTARTSYS (on sock) we got a signal
528 */
529
530 if (rv < 0) {
531 if (rv == -ECONNRESET)
532 dev_info(DEV, "sock was reset by peer\n");
533 else if (rv != -ERESTARTSYS)
534 dev_err(DEV, "sock_recvmsg returned %d\n", rv);
535 break;
536 } else if (rv == 0) {
537 dev_info(DEV, "sock was shut down by peer\n");
538 break;
539 } else {
540 /* signal came in, or peer/link went down,
541 * after we read a partial message
542 */
543 /* D_ASSERT(signal_pending(current)); */
544 break;
545 }
546 };
547
548 set_fs(oldfs);
549
550 if (rv != size)
551 drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
552
553 return rv;
554}
555
556/* quoting tcp(7):
557 * On individual connections, the socket buffer size must be set prior to the
558 * listen(2) or connect(2) calls in order to have it take effect.
559 * This is our wrapper to do so.
560 */
561static void drbd_setbufsize(struct socket *sock, unsigned int snd,
562 unsigned int rcv)
563{
564 /* open coded SO_SNDBUF, SO_RCVBUF */
565 if (snd) {
566 sock->sk->sk_sndbuf = snd;
567 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
568 }
569 if (rcv) {
570 sock->sk->sk_rcvbuf = rcv;
571 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
572 }
573}
574
575static struct socket *drbd_try_connect(struct drbd_conf *mdev)
576{
577 const char *what;
578 struct socket *sock;
579 struct sockaddr_in6 src_in6;
580 int err;
581 int disconnect_on_error = 1;
582
583 if (!get_net_conf(mdev))
584 return NULL;
585
586 what = "sock_create_kern";
587 err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
588 SOCK_STREAM, IPPROTO_TCP, &sock);
589 if (err < 0) {
590 sock = NULL;
591 goto out;
592 }
593
594 sock->sk->sk_rcvtimeo =
595 sock->sk->sk_sndtimeo = mdev->net_conf->try_connect_int*HZ;
596 drbd_setbufsize(sock, mdev->net_conf->sndbuf_size,
597 mdev->net_conf->rcvbuf_size);
598
599 /* explicitly bind to the configured IP as source IP
600 * for the outgoing connections.
601 * This is needed for multihomed hosts and to be
602 * able to use lo: interfaces for drbd.
603 * Make sure to use 0 as port number, so linux selects
604 * a free one dynamically.
605 */
606 memcpy(&src_in6, mdev->net_conf->my_addr,
607 min_t(int, mdev->net_conf->my_addr_len, sizeof(src_in6)));
608 if (((struct sockaddr *)mdev->net_conf->my_addr)->sa_family == AF_INET6)
609 src_in6.sin6_port = 0;
610 else
611 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
612
613 what = "bind before connect";
614 err = sock->ops->bind(sock,
615 (struct sockaddr *) &src_in6,
616 mdev->net_conf->my_addr_len);
617 if (err < 0)
618 goto out;
619
620 /* connect may fail, peer not yet available.
621 * stay C_WF_CONNECTION, don't go Disconnecting! */
622 disconnect_on_error = 0;
623 what = "connect";
624 err = sock->ops->connect(sock,
625 (struct sockaddr *)mdev->net_conf->peer_addr,
626 mdev->net_conf->peer_addr_len, 0);
627
628out:
629 if (err < 0) {
630 if (sock) {
631 sock_release(sock);
632 sock = NULL;
633 }
634 switch (-err) {
635 /* timeout, busy, signal pending */
636 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
637 case EINTR: case ERESTARTSYS:
638 /* peer not (yet) available, network problem */
639 case ECONNREFUSED: case ENETUNREACH:
640 case EHOSTDOWN: case EHOSTUNREACH:
641 disconnect_on_error = 0;
642 break;
643 default:
644 dev_err(DEV, "%s failed, err = %d\n", what, err);
645 }
646 if (disconnect_on_error)
647 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
648 }
649 put_net_conf(mdev);
650 return sock;
651}
652
653static struct socket *drbd_wait_for_connect(struct drbd_conf *mdev)
654{
655 int timeo, err;
656 struct socket *s_estab = NULL, *s_listen;
657 const char *what;
658
659 if (!get_net_conf(mdev))
660 return NULL;
661
662 what = "sock_create_kern";
663 err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
664 SOCK_STREAM, IPPROTO_TCP, &s_listen);
665 if (err) {
666 s_listen = NULL;
667 goto out;
668 }
669
670 timeo = mdev->net_conf->try_connect_int * HZ;
671 timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
672
673 s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
674 s_listen->sk->sk_rcvtimeo = timeo;
675 s_listen->sk->sk_sndtimeo = timeo;
676 drbd_setbufsize(s_listen, mdev->net_conf->sndbuf_size,
677 mdev->net_conf->rcvbuf_size);
678
679 what = "bind before listen";
680 err = s_listen->ops->bind(s_listen,
681 (struct sockaddr *) mdev->net_conf->my_addr,
682 mdev->net_conf->my_addr_len);
683 if (err < 0)
684 goto out;
685
686 err = drbd_accept(mdev, &what, s_listen, &s_estab);
687
688out:
689 if (s_listen)
690 sock_release(s_listen);
691 if (err < 0) {
692 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
693 dev_err(DEV, "%s failed, err = %d\n", what, err);
694 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
695 }
696 }
697 put_net_conf(mdev);
698
699 return s_estab;
700}
701
702static int drbd_send_fp(struct drbd_conf *mdev,
703 struct socket *sock, enum drbd_packets cmd)
704{
705 struct p_header80 *h = &mdev->data.sbuf.header.h80;
706
707 return _drbd_send_cmd(mdev, sock, cmd, h, sizeof(*h), 0);
708}
709
710static enum drbd_packets drbd_recv_fp(struct drbd_conf *mdev, struct socket *sock)
711{
712 struct p_header80 *h = &mdev->data.rbuf.header.h80;
713 int rr;
714
715 rr = drbd_recv_short(mdev, sock, h, sizeof(*h), 0);
716
717 if (rr == sizeof(*h) && h->magic == BE_DRBD_MAGIC)
718 return be16_to_cpu(h->command);
719
720 return 0xffff;
721}
722
723/**
724 * drbd_socket_okay() - Free the socket if its connection is not okay
725 * @mdev: DRBD device.
726 * @sock: pointer to the pointer to the socket.
727 */
728static int drbd_socket_okay(struct drbd_conf *mdev, struct socket **sock)
729{
730 int rr;
731 char tb[4];
732
733 if (!*sock)
734 return false;
735
736 rr = drbd_recv_short(mdev, *sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
737
738 if (rr > 0 || rr == -EAGAIN) {
739 return true;
740 } else {
741 sock_release(*sock);
742 *sock = NULL;
743 return false;
744 }
745}
746
747/*
748 * return values:
749 * 1 yes, we have a valid connection
750 * 0 oops, did not work out, please try again
751 * -1 peer talks different language,
752 * no point in trying again, please go standalone.
753 * -2 We do not have a network config...
754 */
755static int drbd_connect(struct drbd_conf *mdev)
756{
757 struct socket *s, *sock, *msock;
758 int try, h, ok;
759 enum drbd_state_rv rv;
760
761 D_ASSERT(!mdev->data.socket);
762
763 if (drbd_request_state(mdev, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS)
764 return -2;
765
766 clear_bit(DISCARD_CONCURRENT, &mdev->flags);
767
768 sock = NULL;
769 msock = NULL;
770
771 do {
772 for (try = 0;;) {
773 /* 3 tries, this should take less than a second! */
774 s = drbd_try_connect(mdev);
775 if (s || ++try >= 3)
776 break;
777 /* give the other side time to call bind() & listen() */
778 schedule_timeout_interruptible(HZ / 10);
779 }
780
781 if (s) {
782 if (!sock) {
783 drbd_send_fp(mdev, s, P_HAND_SHAKE_S);
784 sock = s;
785 s = NULL;
786 } else if (!msock) {
787 drbd_send_fp(mdev, s, P_HAND_SHAKE_M);
788 msock = s;
789 s = NULL;
790 } else {
791 dev_err(DEV, "Logic error in drbd_connect()\n");
792 goto out_release_sockets;
793 }
794 }
795
796 if (sock && msock) {
797 schedule_timeout_interruptible(mdev->net_conf->ping_timeo*HZ/10);
798 ok = drbd_socket_okay(mdev, &sock);
799 ok = drbd_socket_okay(mdev, &msock) && ok;
800 if (ok)
801 break;
802 }
803
804retry:
805 s = drbd_wait_for_connect(mdev);
806 if (s) {
807 try = drbd_recv_fp(mdev, s);
808 drbd_socket_okay(mdev, &sock);
809 drbd_socket_okay(mdev, &msock);
810 switch (try) {
811 case P_HAND_SHAKE_S:
812 if (sock) {
813 dev_warn(DEV, "initial packet S crossed\n");
814 sock_release(sock);
815 }
816 sock = s;
817 break;
818 case P_HAND_SHAKE_M:
819 if (msock) {
820 dev_warn(DEV, "initial packet M crossed\n");
821 sock_release(msock);
822 }
823 msock = s;
824 set_bit(DISCARD_CONCURRENT, &mdev->flags);
825 break;
826 default:
827 dev_warn(DEV, "Error receiving initial packet\n");
828 sock_release(s);
829 if (random32() & 1)
830 goto retry;
831 }
832 }
833
834 if (mdev->state.conn <= C_DISCONNECTING)
835 goto out_release_sockets;
836 if (signal_pending(current)) {
837 flush_signals(current);
838 smp_rmb();
839 if (get_t_state(&mdev->receiver) == Exiting)
840 goto out_release_sockets;
841 }
842
843 if (sock && msock) {
844 ok = drbd_socket_okay(mdev, &sock);
845 ok = drbd_socket_okay(mdev, &msock) && ok;
846 if (ok)
847 break;
848 }
849 } while (1);
850
851 msock->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
852 sock->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
853
854 sock->sk->sk_allocation = GFP_NOIO;
855 msock->sk->sk_allocation = GFP_NOIO;
856
857 sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
858 msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
859
860 /* NOT YET ...
861 * sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
862 * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
863 * first set it to the P_HAND_SHAKE timeout,
864 * which we set to 4x the configured ping_timeout. */
865 sock->sk->sk_sndtimeo =
866 sock->sk->sk_rcvtimeo = mdev->net_conf->ping_timeo*4*HZ/10;
867
868 msock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
869 msock->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
870
871 /* we don't want delays.
872 * we use TCP_CORK where appropriate, though */
873 drbd_tcp_nodelay(sock);
874 drbd_tcp_nodelay(msock);
875
876 mdev->data.socket = sock;
877 mdev->meta.socket = msock;
878 mdev->last_received = jiffies;
879
880 D_ASSERT(mdev->asender.task == NULL);
881
882 h = drbd_do_handshake(mdev);
883 if (h <= 0)
884 return h;
885
886 if (mdev->cram_hmac_tfm) {
887 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
888 switch (drbd_do_auth(mdev)) {
889 case -1:
890 dev_err(DEV, "Authentication of peer failed\n");
891 return -1;
892 case 0:
893 dev_err(DEV, "Authentication of peer failed, trying again.\n");
894 return 0;
895 }
896 }
897
898 sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
899 sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
900
901 atomic_set(&mdev->packet_seq, 0);
902 mdev->peer_seq = 0;
903
904 if (drbd_send_protocol(mdev) == -1)
905 return -1;
906 set_bit(STATE_SENT, &mdev->flags);
907 drbd_send_sync_param(mdev, &mdev->sync_conf);
908 drbd_send_sizes(mdev, 0, 0);
909 drbd_send_uuids(mdev);
910 drbd_send_current_state(mdev);
911 clear_bit(USE_DEGR_WFC_T, &mdev->flags);
912 clear_bit(RESIZE_PENDING, &mdev->flags);
913
914 spin_lock_irq(&mdev->req_lock);
915 rv = _drbd_set_state(_NS(mdev, conn, C_WF_REPORT_PARAMS), CS_VERBOSE, NULL);
916 if (mdev->state.conn != C_WF_REPORT_PARAMS)
917 clear_bit(STATE_SENT, &mdev->flags);
918 spin_unlock_irq(&mdev->req_lock);
919
920 if (rv < SS_SUCCESS)
921 return 0;
922
923 drbd_thread_start(&mdev->asender);
924 mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
925
926 return 1;
927
928out_release_sockets:
929 if (sock)
930 sock_release(sock);
931 if (msock)
932 sock_release(msock);
933 return -1;
934}
935
936 static int drbd_recv_header(struct drbd_conf *mdev, enum drbd_packets *cmd, unsigned int *packet_size)
937 {
938 union p_header *h = &mdev->data.rbuf.header;
939 int r;
940
941 r = drbd_recv(mdev, h, sizeof(*h));
942 if (unlikely(r != sizeof(*h))) {
943 if (!signal_pending(current))
944 dev_warn(DEV, "short read expecting header on sock: r=%d\n", r);
945 return false;
946 }
947
948 if (likely(h->h80.magic == BE_DRBD_MAGIC)) {
949 *cmd = be16_to_cpu(h->h80.command);
950 *packet_size = be16_to_cpu(h->h80.length);
951 } else if (h->h95.magic == BE_DRBD_MAGIC_BIG) {
952 *cmd = be16_to_cpu(h->h95.command);
953 *packet_size = be32_to_cpu(h->h95.length);
954 } else {
955 dev_err(DEV, "magic?? on data m: 0x%08x c: %d l: %d\n",
956 be32_to_cpu(h->h80.magic),
957 be16_to_cpu(h->h80.command),
958 be16_to_cpu(h->h80.length));
959 return false;
960 }
961 mdev->last_received = jiffies;
962
963 return true;
964}
965
966 static void drbd_flush(struct drbd_conf *mdev)
967{
968 int rv;
969
970 if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
971 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
972 NULL);
973 if (rv) {
974 dev_info(DEV, "local disk flush failed with status %d\n", rv);
975 /* would rather check on EOPNOTSUPP, but that is not reliable.
976 * don't try again for ANY return value != 0
977 * if (rv == -EOPNOTSUPP) */
978 drbd_bump_write_ordering(mdev, WO_drain_io);
979 }
980 put_ldev(mdev);
981 }
982}
983
984/**
985 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
986 * @mdev: DRBD device.
987 * @epoch: Epoch object.
988 * @ev: Epoch event.
989 */
990static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
991 struct drbd_epoch *epoch,
992 enum epoch_event ev)
993{
994 int epoch_size;
995 struct drbd_epoch *next_epoch;
996 enum finish_epoch rv = FE_STILL_LIVE;
997
998 spin_lock(&mdev->epoch_lock);
999 do {
1000 next_epoch = NULL;
1001
1002 epoch_size = atomic_read(&epoch->epoch_size);
1003
1004 switch (ev & ~EV_CLEANUP) {
1005 case EV_PUT:
1006 atomic_dec(&epoch->active);
1007 break;
1008 case EV_GOT_BARRIER_NR:
1009 set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1010 break;
1011 case EV_BECAME_LAST:
1012 /* nothing to do*/
1013 break;
1014 }
1015
1016 if (epoch_size != 0 &&
1017 atomic_read(&epoch->active) == 0 &&
1018 (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1019 if (!(ev & EV_CLEANUP)) {
1020 spin_unlock(&mdev->epoch_lock);
1021 drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1022 spin_lock(&mdev->epoch_lock);
1023 }
1024 if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1025 dec_unacked(mdev);
1026
1027 if (mdev->current_epoch != epoch) {
1028 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1029 list_del(&epoch->list);
1030 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1031 mdev->epochs--;
1032 kfree(epoch);
1033
1034 if (rv == FE_STILL_LIVE)
1035 rv = FE_DESTROYED;
1036 } else {
1037 epoch->flags = 0;
1038 atomic_set(&epoch->epoch_size, 0);
1039 /* atomic_set(&epoch->active, 0); is already zero */
1040 if (rv == FE_STILL_LIVE)
1041 rv = FE_RECYCLED;
1042 wake_up(&mdev->ee_wait);
1043 }
1044 }
1045
1046 if (!next_epoch)
1047 break;
1048
1049 epoch = next_epoch;
1050 } while (1);
1051
1052 spin_unlock(&mdev->epoch_lock);
1053
1054 return rv;
1055}
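/*
 * Summary: an epoch is finished only once it has seen at least one write
 * (epoch_size != 0), has no active requests left, and either its barrier
 * number has arrived (DE_HAVE_BARRIER_NUMBER) or we are cleaning up
 * (EV_CLEANUP); finishing sends P_BARRIER_ACK unless we are cleaning up.
 */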
1056
1057/**
1058 * drbd_bump_write_ordering() - Fall back to an other write ordering method
1059 * @mdev: DRBD device.
1060 * @wo: Write ordering method to try.
1061 */
1062void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1063{
1064 enum write_ordering_e pwo;
1065 static char *write_ordering_str[] = {
1066 [WO_none] = "none",
1067 [WO_drain_io] = "drain",
1068 [WO_bdev_flush] = "flush",
1069 };
1070
1071 pwo = mdev->write_ordering;
1072 wo = min(pwo, wo);
1073 if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1074 wo = WO_drain_io;
1075 if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1076 wo = WO_none;
1077 mdev->write_ordering = wo;
1078 if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
1079 dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1080}
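/*
 * Note: wo = min(pwo, wo) means the ordering method can only move towards
 * weaker guarantees (flush -> drain -> none), never back; the no_disk_flush /
 * no_disk_drain settings force further degradation.
 */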
1081
1082/**
1083 * drbd_submit_ee()
1084 * @mdev: DRBD device.
1085 * @e: epoch entry
1086 * @rw: flag field, see bio->bi_rw
1087 *
1088 * May spread the pages to multiple bios,
1089 * depending on bio_add_page restrictions.
1090 *
1091 * Returns 0 if all bios have been submitted,
1092 * -ENOMEM if we could not allocate enough bios,
1093 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1094 * single page to an empty bio (which should never happen and likely indicates
1095 * that the lower level IO stack is in some way broken). This has been observed
1096 * on certain Xen deployments.
1097 */
1098/* TODO allocate from our own bio_set. */
1099int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e,
1100 const unsigned rw, const int fault_type)
1101{
1102 struct bio *bios = NULL;
1103 struct bio *bio;
1104 struct page *page = e->pages;
1105 sector_t sector = e->sector;
1106 unsigned ds = e->size;
1107 unsigned n_bios = 0;
1108 unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1109 int err = -ENOMEM;
1110
1111 /* In most cases, we will only need one bio. But in case the lower
1112 * level restrictions happen to be different at this offset on this
1113 * side than those of the sending peer, we may need to submit the
1114 * request in more than one bio.
1115 *
1116 * Plain bio_alloc is good enough here, this is no DRBD internally
1117 * generated bio, but a bio allocated on behalf of the peer.
1118 */
1119next_bio:
1120 bio = bio_alloc(GFP_NOIO, nr_pages);
1121 if (!bio) {
1122 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1123 goto fail;
1124 }
1125 /* > e->sector, unless this is the first bio */
1126 bio->bi_sector = sector;
1127 bio->bi_bdev = mdev->ldev->backing_bdev;
1128 bio->bi_rw = rw;
1129 bio->bi_private = e;
1130 bio->bi_end_io = drbd_endio_sec;
1131
1132 bio->bi_next = bios;
1133 bios = bio;
1134 ++n_bios;
1135
1136 page_chain_for_each(page) {
1137 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1138 if (!bio_add_page(bio, page, len, 0)) {
1139 /* A single page must always be possible!
1140 * But in case it fails anyways,
1141 * we deal with it, and complain (below). */
1142 if (bio->bi_vcnt == 0) {
1143 dev_err(DEV,
1144 "bio_add_page failed for len=%u, "
1145 "bi_vcnt=0 (bi_sector=%llu)\n",
1146 len, (unsigned long long)bio->bi_sector);
1147 err = -ENOSPC;
1148 goto fail;
1149 }
1150 goto next_bio;
1151 }
1152 ds -= len;
1153 sector += len >> 9;
1154 --nr_pages;
1155 }
1156 D_ASSERT(page == NULL);
1157 D_ASSERT(ds == 0);
1158
1159 atomic_set(&e->pending_bios, n_bios);
1160 do {
1161 bio = bios;
1162 bios = bios->bi_next;
1163 bio->bi_next = NULL;
1164
1165 drbd_generic_make_request(mdev, fault_type, bio);
1166 } while (bios);
1167 return 0;
1168
1169fail:
1170 while (bios) {
1171 bio = bios;
1172 bios = bios->bi_next;
1173 bio_put(bio);
1174 }
1175 return err;
1176}
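/*
 * Note for callers: a non-zero return is treated as fatal for the connection;
 * recv_resync_read() and receive_Data() below log "submit failed, triggering
 * re-connect" and unwind the epoch entry.
 */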
1177
1178 static int receive_Barrier(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1179 {
1180 int rv;
1181 struct p_barrier *p = &mdev->data.rbuf.barrier;
1182 struct drbd_epoch *epoch;
1183
1184 inc_unacked(mdev);
1185
1186 mdev->current_epoch->barrier_nr = p->barrier;
1187 rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1188
1189 /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1190 * the activity log, which means it would not be resynced in case the
1191 * R_PRIMARY crashes now.
1192 * Therefore we must send the barrier_ack after the barrier request was
1193 * completed. */
1194 switch (mdev->write_ordering) {
1195 case WO_none:
1196 if (rv == FE_RECYCLED)
1197 return true;
1198
1199 /* receiver context, in the writeout path of the other node.
1200 * avoid potential distributed deadlock */
1201 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1202 if (epoch)
1203 break;
1204 else
1205 dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1206 /* Fall through */
1207
1208 case WO_bdev_flush:
1209 case WO_drain_io:
1210 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1211 drbd_flush(mdev);
1212
1213 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1214 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1215 if (epoch)
1216 break;
1217 }
1218
1219 epoch = mdev->current_epoch;
1220 wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);
1221
1222 D_ASSERT(atomic_read(&epoch->active) == 0);
1223 D_ASSERT(epoch->flags == 0);
1224
1225 return true;
1226 default:
1227 dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
1228 return false;
1229 }
1230
1231 epoch->flags = 0;
1232 atomic_set(&epoch->epoch_size, 0);
1233 atomic_set(&epoch->active, 0);
1234
1235 spin_lock(&mdev->epoch_lock);
1236 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1237 list_add(&epoch->list, &mdev->current_epoch->list);
1238 mdev->current_epoch = epoch;
1239 mdev->epochs++;
b411b363
PR
1240 } else {
1241 /* The current_epoch got recycled while we allocated this one... */
1242 kfree(epoch);
1243 }
1244 spin_unlock(&mdev->epoch_lock);
1245
1246 return true;
1247}
1248
1249/* used from receive_RSDataReply (recv_resync_read)
1250 * and from receive_Data */
1251static struct drbd_epoch_entry *
1252read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __must_hold(local)
1253{
1254 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1255 struct drbd_epoch_entry *e;
1256 struct page *page;
1257 int dgs, ds, rr;
1258 void *dig_in = mdev->int_dig_in;
1259 void *dig_vv = mdev->int_dig_vv;
1260 unsigned long *data;
1261
1262 dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1263 crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1264
1265 if (dgs) {
1266 rr = drbd_recv(mdev, dig_in, dgs);
1267 if (rr != dgs) {
1268 if (!signal_pending(current))
1269 dev_warn(DEV,
1270 "short read receiving data digest: read %d expected %d\n",
1271 rr, dgs);
1272 return NULL;
1273 }
1274 }
1275
1276 data_size -= dgs;
1277
1278 ERR_IF(data_size & 0x1ff) return NULL;
1279 ERR_IF(data_size > DRBD_MAX_BIO_SIZE) return NULL;
1280
1281 /* even though we trust our peer,
1282 * we sometimes have to double check. */
1283 if (sector + (data_size>>9) > capacity) {
1284 dev_err(DEV, "request from peer beyond end of local disk: "
1285 "capacity: %llus < sector: %llus + size: %u\n",
1286 (unsigned long long)capacity,
1287 (unsigned long long)sector, data_size);
1288 return NULL;
1289 }
1290
1291 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1292 * "criss-cross" setup, that might cause write-out on some other DRBD,
1293 * which in turn might block on the other node at this very place. */
1294 e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
1295 if (!e)
1296 return NULL;
1297
1298 if (!data_size)
1299 return e;
1300
1301 ds = data_size;
1302 page = e->pages;
1303 page_chain_for_each(page) {
1304 unsigned len = min_t(int, ds, PAGE_SIZE);
1305 data = kmap(page);
1306 rr = drbd_recv(mdev, data, len);
1307 if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
1308 dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1309 data[0] = data[0] ^ (unsigned long)-1;
1310 }
1311 kunmap(page);
1312 if (rr != len) {
1313 drbd_free_ee(mdev, e);
1314 if (!signal_pending(current))
1315 dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1316 rr, len);
1317 return NULL;
1318 }
1319 ds -= rr;
1320 }
1321
1322 if (dgs) {
1323 drbd_csum_ee(mdev, mdev->integrity_r_tfm, e, dig_vv);
1324 if (memcmp(dig_in, dig_vv, dgs)) {
1325 dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1326 (unsigned long long)sector, data_size);
1327 drbd_bcast_ee(mdev, "digest failed",
1328 dgs, dig_in, dig_vv, e);
1329 drbd_free_ee(mdev, e);
1330 return NULL;
1331 }
1332 }
1333 mdev->recv_cnt += data_size>>9;
1334 return e;
1335}
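/*
 * Note: if a data integrity transform is configured (integrity_r_tfm), the
 * peer prepends a dgs-byte digest; it is read first, recomputed over the
 * received pages with drbd_csum_ee(), and the block is dropped (NULL return)
 * on mismatch.
 */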
1336
1337/* drbd_drain_block() just takes a data block
1338 * out of the socket input buffer, and discards it.
1339 */
1340static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1341{
1342 struct page *page;
1343 int rr, rv = 1;
1344 void *data;
1345
1346 if (!data_size)
1347 return true;
1348
1349 page = drbd_pp_alloc(mdev, 1, 1);
1350
1351 data = kmap(page);
1352 while (data_size) {
1353 rr = drbd_recv(mdev, data, min_t(int, data_size, PAGE_SIZE));
1354 if (rr != min_t(int, data_size, PAGE_SIZE)) {
1355 rv = 0;
1356 if (!signal_pending(current))
1357 dev_warn(DEV,
1358 "short read receiving data: read %d expected %d\n",
1359 rr, min_t(int, data_size, PAGE_SIZE));
1360 break;
1361 }
1362 data_size -= rr;
1363 }
1364 kunmap(page);
1365 drbd_pp_free(mdev, page, 0);
1366 return rv;
1367}
1368
1369static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1370 sector_t sector, int data_size)
1371{
1372 struct bio_vec *bvec;
1373 struct bio *bio;
1374 int dgs, rr, i, expect;
1375 void *dig_in = mdev->int_dig_in;
1376 void *dig_vv = mdev->int_dig_vv;
1377
1378 dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1379 crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1380
1381 if (dgs) {
1382 rr = drbd_recv(mdev, dig_in, dgs);
1383 if (rr != dgs) {
1384 if (!signal_pending(current))
1385 dev_warn(DEV,
1386 "short read receiving data reply digest: read %d expected %d\n",
1387 rr, dgs);
1388 return 0;
1389 }
1390 }
1391
1392 data_size -= dgs;
1393
1394 /* optimistically update recv_cnt. if receiving fails below,
1395 * we disconnect anyways, and counters will be reset. */
1396 mdev->recv_cnt += data_size>>9;
1397
1398 bio = req->master_bio;
1399 D_ASSERT(sector == bio->bi_sector);
1400
1401 bio_for_each_segment(bvec, bio, i) {
1402 expect = min_t(int, data_size, bvec->bv_len);
1403 rr = drbd_recv(mdev,
1404 kmap(bvec->bv_page)+bvec->bv_offset,
1405 expect);
1406 kunmap(bvec->bv_page);
1407 if (rr != expect) {
1408 if (!signal_pending(current))
1409 dev_warn(DEV, "short read receiving data reply: "
1410 "read %d expected %d\n",
1411 rr, expect);
1412 return 0;
1413 }
1414 data_size -= rr;
1415 }
1416
1417 if (dgs) {
1418 drbd_csum_bio(mdev, mdev->integrity_r_tfm, bio, dig_vv);
1419 if (memcmp(dig_in, dig_vv, dgs)) {
1420 dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1421 return 0;
1422 }
1423 }
1424
1425 D_ASSERT(data_size == 0);
1426 return 1;
1427}
1428
1429/* e_end_resync_block() is called via
1430 * drbd_process_done_ee() by asender only */
1431static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1432{
1433 struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1434 sector_t sector = e->sector;
1435 int ok;
1436
1437 D_ASSERT(hlist_unhashed(&e->collision));
1438
1439 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1440 drbd_set_in_sync(mdev, sector, e->size);
1441 ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e);
1442 } else {
1443 /* Record failure to sync */
1444 drbd_rs_failed_io(mdev, sector, e->size);
1445
1446 ok = drbd_send_ack(mdev, P_NEG_ACK, e);
1447 }
1448 dec_unacked(mdev);
1449
1450 return ok;
1451}
1452
1453static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1454{
1455 struct drbd_epoch_entry *e;
1456
1457 e = read_in_block(mdev, ID_SYNCER, sector, data_size);
1458 if (!e)
1459 goto fail;
1460
1461 dec_rs_pending(mdev);
1462
1463 inc_unacked(mdev);
1464 /* corresponding dec_unacked() in e_end_resync_block()
1465 * respective _drbd_clear_done_ee */
1466
1467 e->w.cb = e_end_resync_block;
1468
1469 spin_lock_irq(&mdev->req_lock);
1470 list_add(&e->w.list, &mdev->sync_ee);
1471 spin_unlock_irq(&mdev->req_lock);
1472
1473 atomic_add(data_size >> 9, &mdev->rs_sect_ev);
1474 if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_RS_WR) == 0)
1475 return true;
1476
1477 /* don't care for the reason here */
1478 dev_err(DEV, "submit failed, triggering re-connect\n");
1479 spin_lock_irq(&mdev->req_lock);
1480 list_del(&e->w.list);
1481 spin_unlock_irq(&mdev->req_lock);
1482
1483 drbd_free_ee(mdev, e);
1484fail:
1485 put_ldev(mdev);
1486 return false;
1487}
1488
1489 static int receive_DataReply(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1490{
1491 struct drbd_request *req;
1492 sector_t sector;
1493 int ok;
1494 struct p_data *p = &mdev->data.rbuf.data;
1495
1496 sector = be64_to_cpu(p->sector);
1497
1498 spin_lock_irq(&mdev->req_lock);
1499 req = _ar_id_to_req(mdev, p->block_id, sector);
1500 spin_unlock_irq(&mdev->req_lock);
1501 if (unlikely(!req)) {
1502 dev_err(DEV, "Got a corrupt block_id/sector pair(1).\n");
1503 return false;
1504 }
1505
1506 /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1507 * special casing it there for the various failure cases.
1508 * still no race with drbd_fail_pending_reads */
1509 ok = recv_dless_read(mdev, req, sector, data_size);
1510
1511 if (ok)
1512 req_mod(req, data_received);
1513 /* else: nothing. handled from drbd_disconnect...
1514 * I don't think we may complete this just yet
1515 * in case we are "on-disconnect: freeze" */
1516
1517 return ok;
1518}
1519
1520 static int receive_RSDataReply(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1521{
1522 sector_t sector;
1523 int ok;
1524 struct p_data *p = &mdev->data.rbuf.data;
1525
1526 sector = be64_to_cpu(p->sector);
1527 D_ASSERT(p->block_id == ID_SYNCER);
1528
1529 if (get_ldev(mdev)) {
1530 /* data is submitted to disk within recv_resync_read.
1531 * corresponding put_ldev done below on error,
1532 * or in drbd_endio_write_sec. */
1533 ok = recv_resync_read(mdev, sector, data_size);
1534 } else {
1535 if (__ratelimit(&drbd_ratelimit_state))
1536 dev_err(DEV, "Can not write resync data to local disk.\n");
1537
1538 ok = drbd_drain_block(mdev, data_size);
1539
1540 drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
1541 }
1542
1543 atomic_add(data_size >> 9, &mdev->rs_sect_in);
1544
1545 return ok;
1546}
1547
1548/* e_end_block() is called via drbd_process_done_ee().
1549 * this means this function only runs in the asender thread
1550 */
1551static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1552{
1553 struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1554 sector_t sector = e->sector;
1555 int ok = 1, pcmd;
1556
1557 if (mdev->net_conf->wire_protocol == DRBD_PROT_C) {
1558 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1559 pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1560 mdev->state.conn <= C_PAUSED_SYNC_T &&
1561 e->flags & EE_MAY_SET_IN_SYNC) ?
1562 P_RS_WRITE_ACK : P_WRITE_ACK;
1563 ok &= drbd_send_ack(mdev, pcmd, e);
1564 if (pcmd == P_RS_WRITE_ACK)
1565 drbd_set_in_sync(mdev, sector, e->size);
1566 } else {
1567 ok = drbd_send_ack(mdev, P_NEG_ACK, e);
1568 /* we expect it to be marked out of sync anyways...
1569 * maybe assert this? */
1570 }
1571 dec_unacked(mdev);
1572 }
1573 /* we delete from the conflict detection hash _after_ we sent out the
1574 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
1575 if (mdev->net_conf->two_primaries) {
1576 spin_lock_irq(&mdev->req_lock);
1577 D_ASSERT(!hlist_unhashed(&e->collision));
1578 hlist_del_init(&e->collision);
1579 spin_unlock_irq(&mdev->req_lock);
1580 } else {
1581 D_ASSERT(hlist_unhashed(&e->collision));
1582 }
1583
1584 drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1585
1586 return ok;
1587}
1588
1589static int e_send_discard_ack(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1590{
1591 struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1592 int ok = 1;
1593
1594 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1595 ok = drbd_send_ack(mdev, P_DISCARD_ACK, e);
1596
1597 spin_lock_irq(&mdev->req_lock);
1598 D_ASSERT(!hlist_unhashed(&e->collision));
1599 hlist_del_init(&e->collision);
1600 spin_unlock_irq(&mdev->req_lock);
1601
1602 dec_unacked(mdev);
1603
1604 return ok;
1605}
1606
1607static bool overlapping_resync_write(struct drbd_conf *mdev, struct drbd_epoch_entry *data_e)
1608{
1609
1610 struct drbd_epoch_entry *rs_e;
1611 bool rv = 0;
1612
1613 spin_lock_irq(&mdev->req_lock);
1614 list_for_each_entry(rs_e, &mdev->sync_ee, w.list) {
1615 if (overlaps(data_e->sector, data_e->size, rs_e->sector, rs_e->size)) {
1616 rv = 1;
1617 break;
1618 }
1619 }
1620 spin_unlock_irq(&mdev->req_lock);
1621
1622 return rv;
1623}
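/*
 * Used by receive_Data() below while we are C_SYNC_TARGET: an incoming
 * application write waits until no resync write on sync_ee overlaps with it.
 */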
1624
1625/* Called from receive_Data.
1626 * Synchronize packets on sock with packets on msock.
1627 *
1628 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1629 * packet traveling on msock, they are still processed in the order they have
1630 * been sent.
1631 *
1632 * Note: we don't care for Ack packets overtaking P_DATA packets.
1633 *
1634 * In case packet_seq is larger than mdev->peer_seq number, there are
1635 * outstanding packets on the msock. We wait for them to arrive.
1636 * In case we are the logically next packet, we update mdev->peer_seq
1637 * ourselves. Correctly handles 32bit wrap around.
1638 *
1639 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1640 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1641 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1642 * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
1643 *
1644 * returns 0 if we may process the packet,
1645 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1646static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)
1647{
1648 DEFINE_WAIT(wait);
1649 unsigned int p_seq;
1650 long timeout;
1651 int ret = 0;
1652 spin_lock(&mdev->peer_seq_lock);
1653 for (;;) {
1654 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1655 if (seq_le(packet_seq, mdev->peer_seq+1))
1656 break;
1657 if (signal_pending(current)) {
1658 ret = -ERESTARTSYS;
1659 break;
1660 }
1661 p_seq = mdev->peer_seq;
1662 spin_unlock(&mdev->peer_seq_lock);
1663 timeout = schedule_timeout(30*HZ);
1664 spin_lock(&mdev->peer_seq_lock);
1665 if (timeout == 0 && p_seq == mdev->peer_seq) {
1666 ret = -ETIMEDOUT;
1667 dev_err(DEV, "ASSERT FAILED waited 30 seconds for sequence update, forcing reconnect\n");
1668 break;
1669 }
1670 }
1671 finish_wait(&mdev->seq_wait, &wait);
1672 if (mdev->peer_seq+1 == packet_seq)
1673 mdev->peer_seq++;
1674 spin_unlock(&mdev->peer_seq_lock);
1675 return ret;
1676}
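/*
 * Note: receive_Data() calls this before taking req_lock precisely because it
 * may sleep here; any non-zero return (signal or timeout) aborts the receive
 * path via out_interrupted.
 */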
1677
1678/* see also bio_flags_to_wire()
1679 * DRBD_REQ_*, because we need to semantically map the flags to data packet
1680 * flags and back. We may replicate to other kernel versions. */
1681static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
1682 {
1683 return (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1684 (dpf & DP_FUA ? REQ_FUA : 0) |
1685 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1686 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
1687}
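/*
 * Mapping recap: DP_RW_SYNC -> REQ_SYNC, DP_FUA -> REQ_FUA,
 * DP_FLUSH -> REQ_FLUSH, DP_DISCARD -> REQ_DISCARD; bio_flags_to_wire()
 * on the sending side is assumed to perform the inverse translation.
 */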
1688
1689 /* mirrored write */
1690 static int receive_Data(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1691{
1692 sector_t sector;
1693 struct drbd_epoch_entry *e;
1694 struct p_data *p = &mdev->data.rbuf.data;
1695 int rw = WRITE;
1696 u32 dp_flags;
1697
1698 if (!get_ldev(mdev)) {
1699 spin_lock(&mdev->peer_seq_lock);
1700 if (mdev->peer_seq+1 == be32_to_cpu(p->seq_num))
1701 mdev->peer_seq++;
1702 spin_unlock(&mdev->peer_seq_lock);
1703
1704 drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
1705 atomic_inc(&mdev->current_epoch->epoch_size);
1706 return drbd_drain_block(mdev, data_size);
1707 }
1708
1709 /* get_ldev(mdev) successful.
1710 * Corresponding put_ldev done either below (on various errors),
1711 * or in drbd_endio_write_sec, if we successfully submit the data at
1712 * the end of this function. */
1713
1714 sector = be64_to_cpu(p->sector);
1715 e = read_in_block(mdev, p->block_id, sector, data_size);
1716 if (!e) {
1717 put_ldev(mdev);
1718 return false;
1719 }
1720
1721 e->w.cb = e_end_block;
1722
1723 dp_flags = be32_to_cpu(p->dp_flags);
1724 rw |= wire_flags_to_bio(mdev, dp_flags);
1725 if (e->pages == NULL) {
1726 D_ASSERT(e->size == 0);
1727 D_ASSERT(dp_flags & DP_FLUSH);
1728 }
1729
1730 if (dp_flags & DP_MAY_SET_IN_SYNC)
1731 e->flags |= EE_MAY_SET_IN_SYNC;
1732
1733 spin_lock(&mdev->epoch_lock);
1734 e->epoch = mdev->current_epoch;
1735 atomic_inc(&e->epoch->epoch_size);
1736 atomic_inc(&e->epoch->active);
1737 spin_unlock(&mdev->epoch_lock);
1738
1739 /* I'm the receiver, I do hold a net_cnt reference. */
1740 if (!mdev->net_conf->two_primaries) {
1741 spin_lock_irq(&mdev->req_lock);
1742 } else {
1743 /* don't get the req_lock yet,
1744 * we may sleep in drbd_wait_peer_seq */
1745 const int size = e->size;
1746 const int discard = test_bit(DISCARD_CONCURRENT, &mdev->flags);
1747 DEFINE_WAIT(wait);
1748 struct drbd_request *i;
1749 struct hlist_node *n;
1750 struct hlist_head *slot;
1751 int first;
1752
1753 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1754 BUG_ON(mdev->ee_hash == NULL);
1755 BUG_ON(mdev->tl_hash == NULL);
1756
1757 /* conflict detection and handling:
1758 * 1. wait on the sequence number,
1759 * in case this data packet overtook ACK packets.
1760 * 2. check our hash tables for conflicting requests.
1761 * we only need to walk the tl_hash, since an ee can not
1762 * have a conflict with an other ee: on the submitting
1763 * node, the corresponding req had already been conflicting,
1764 * and a conflicting req is never sent.
1765 *
1766 * Note: for two_primaries, we are protocol C,
1767 * so there cannot be any request that is DONE
1768 * but still on the transfer log.
1769 *
1770 * unconditionally add to the ee_hash.
1771 *
1772 * if no conflicting request is found:
1773 * submit.
1774 *
1775 * if any conflicting request is found
1776 * that has not yet been acked,
1777 * AND I have the "discard concurrent writes" flag:
1778 * queue (via done_ee) the P_DISCARD_ACK; OUT.
1779 *
1780 * if any conflicting request is found:
1781 * block the receiver, waiting on misc_wait
1782 * until no more conflicting requests are there,
1783 * or we get interrupted (disconnect).
1784 *
1785 * we do not just write after local io completion of those
1786 * requests, but only after req is done completely, i.e.
1787 * we wait for the P_DISCARD_ACK to arrive!
1788 *
1789 * then proceed normally, i.e. submit.
1790 */
1791 if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num)))
1792 goto out_interrupted;
1793
1794 spin_lock_irq(&mdev->req_lock);
1795
1796 hlist_add_head(&e->collision, ee_hash_slot(mdev, sector));
1797
1798#define OVERLAPS overlaps(i->sector, i->size, sector, size)
1799 slot = tl_hash_slot(mdev, sector);
1800 first = 1;
1801 for (;;) {
1802 int have_unacked = 0;
1803 int have_conflict = 0;
1804 prepare_to_wait(&mdev->misc_wait, &wait,
1805 TASK_INTERRUPTIBLE);
1806 hlist_for_each_entry(i, n, slot, collision) {
1807 if (OVERLAPS) {
1808 /* only ALERT on first iteration,
1809 * we may be woken up early... */
1810 if (first)
1811 dev_alert(DEV, "%s[%u] Concurrent local write detected!"
1812 " new: %llus +%u; pending: %llus +%u\n",
1813 current->comm, current->pid,
1814 (unsigned long long)sector, size,
1815 (unsigned long long)i->sector, i->size);
1816 if (i->rq_state & RQ_NET_PENDING)
1817 ++have_unacked;
1818 ++have_conflict;
1819 }
1820 }
1821#undef OVERLAPS
1822 if (!have_conflict)
1823 break;
1824
1825 /* Discard Ack only for the _first_ iteration */
1826 if (first && discard && have_unacked) {
1827 dev_alert(DEV, "Concurrent write! [DISCARD BY FLAG] sec=%llus\n",
1828 (unsigned long long)sector);
1829 inc_unacked(mdev);
1830 e->w.cb = e_send_discard_ack;
1831 list_add_tail(&e->w.list, &mdev->done_ee);
1832
1833 spin_unlock_irq(&mdev->req_lock);
1834
1835 /* we could probably send that P_DISCARD_ACK ourselves,
1836 * but I don't like the receiver using the msock */
1837
1838 put_ldev(mdev);
1839 wake_asender(mdev);
1840 finish_wait(&mdev->misc_wait, &wait);
81e84650 1841 return true;
b411b363
PR
1842 }
1843
1844 if (signal_pending(current)) {
24c4830c 1845 hlist_del_init(&e->collision);
b411b363
PR
1846
1847 spin_unlock_irq(&mdev->req_lock);
1848
1849 finish_wait(&mdev->misc_wait, &wait);
1850 goto out_interrupted;
1851 }
1852
1853 spin_unlock_irq(&mdev->req_lock);
1854 if (first) {
1855 first = 0;
1856 dev_alert(DEV, "Concurrent write! [W AFTERWARDS] "
1857 "sec=%llus\n", (unsigned long long)sector);
1858 } else if (discard) {
1859 /* we had none on the first iteration.
1860 * there must be none now. */
1861 D_ASSERT(have_unacked == 0);
1862 }
1863 schedule();
1864 spin_lock_irq(&mdev->req_lock);
1865 }
1866 finish_wait(&mdev->misc_wait, &wait);
1867 }
1868
1869 list_add(&e->w.list, &mdev->active_ee);
1870 spin_unlock_irq(&mdev->req_lock);
1871
b6a370ba
PR
1872 if (mdev->state.conn == C_SYNC_TARGET)
1873 wait_event(mdev->ee_wait, !overlapping_resync_write(mdev, e));
1874
b411b363
PR
1875 switch (mdev->net_conf->wire_protocol) {
1876 case DRBD_PROT_C:
1877 inc_unacked(mdev);
1878 /* corresponding dec_unacked() in e_end_block()
1879 * respective _drbd_clear_done_ee */
1880 break;
1881 case DRBD_PROT_B:
1882 /* I really don't like it that the receiver thread
1883 * sends on the msock, but anyways */
1884 drbd_send_ack(mdev, P_RECV_ACK, e);
1885 break;
1886 case DRBD_PROT_A:
1887 /* nothing to do */
1888 break;
1889 }
1890
6719fb03 1891 if (mdev->state.pdsk < D_INCONSISTENT) {
b411b363
PR
1892 /* In case we have the only disk of the cluster, */
1893 drbd_set_out_of_sync(mdev, e->sector, e->size);
1894 e->flags |= EE_CALL_AL_COMPLETE_IO;
6719fb03 1895 e->flags &= ~EE_MAY_SET_IN_SYNC;
b411b363
PR
1896 drbd_al_begin_io(mdev, e->sector);
1897 }
1898
45bb912b 1899 if (drbd_submit_ee(mdev, e, rw, DRBD_FAULT_DT_WR) == 0)
81e84650 1900 return true;
b411b363 1901
10f6d992
LE
1902 /* don't care for the reason here */
1903 dev_err(DEV, "submit failed, triggering re-connect\n");
22cc37a9
LE
1904 spin_lock_irq(&mdev->req_lock);
1905 list_del(&e->w.list);
24c4830c 1906 hlist_del_init(&e->collision);
22cc37a9
LE
1907 spin_unlock_irq(&mdev->req_lock);
1908 if (e->flags & EE_CALL_AL_COMPLETE_IO)
1909 drbd_al_complete_io(mdev, e->sector);
1910
b411b363 1911out_interrupted:
10f6d992 1912 drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + EV_CLEANUP);
b411b363
PR
1913 put_ldev(mdev);
1914 drbd_free_ee(mdev, e);
81e84650 1915 return false;
b411b363
PR
1916}
1917
0f0601f4
LE
1918/* We may throttle resync, if the lower device seems to be busy,
1919 * and current sync rate is above c_min_rate.
1920 *
1921 * To decide whether or not the lower device is busy, we use a scheme similar
 1922 * to MD RAID's is_mddev_idle(): if the partition stats reveal a "significant"
 1923 * amount (more than 64 sectors) of activity that we cannot account for with
 1924 * our own resync activity, it obviously is "busy".
1925 *
1926 * The current sync rate used here uses only the most recent two step marks,
1927 * to have a short time average so we can react faster.
1928 */
e3555d85 1929int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
0f0601f4
LE
1930{
1931 struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
1932 unsigned long db, dt, dbdt;
e3555d85 1933 struct lc_element *tmp;
0f0601f4
LE
1934 int curr_events;
1935 int throttle = 0;
1936
1937 /* feature disabled? */
1938 if (mdev->sync_conf.c_min_rate == 0)
1939 return 0;
1940
e3555d85
PR
1941 spin_lock_irq(&mdev->al_lock);
1942 tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
1943 if (tmp) {
1944 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
1945 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
1946 spin_unlock_irq(&mdev->al_lock);
1947 return 0;
1948 }
1949 /* Do not slow down if app IO is already waiting for this extent */
1950 }
1951 spin_unlock_irq(&mdev->al_lock);
1952
0f0601f4
LE
1953 curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
1954 (int)part_stat_read(&disk->part0, sectors[1]) -
1955 atomic_read(&mdev->rs_sect_ev);
e3555d85 1956
0f0601f4
LE
1957 if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
1958 unsigned long rs_left;
1959 int i;
1960
1961 mdev->rs_last_events = curr_events;
1962
1963 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
1964 * approx. */
2649f080
LE
1965 i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
1966
1967 if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
1968 rs_left = mdev->ov_left;
1969 else
1970 rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
0f0601f4
LE
1971
1972 dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
1973 if (!dt)
1974 dt++;
1975 db = mdev->rs_mark_left[i] - rs_left;
1976 dbdt = Bit2KB(db/dt);
1977
1978 if (dbdt > mdev->sync_conf.c_min_rate)
1979 throttle = 1;
1980 }
1981 return throttle;
1982}
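/* Worked example, not part of the original source, assuming the usual 4 KiB
 * of resync data per bitmap bit (Bit2KB(x) == 4 * x): if the chosen sync
 * marks are dt = 10 seconds apart and rs_left dropped by db = 25600 bits in
 * that window, dbdt = Bit2KB(25600 / 10) = 10240 KiB/s.  With c_min_rate
 * configured to, say, 4096 KiB/s, and more than 64 sectors of backing-device
 * activity we cannot attribute to our own resync, the function returns 1 and
 * the caller throttles the resync request. */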
1983
1984
02918be2 1985static int receive_DataRequest(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int digest_size)
b411b363
PR
1986{
1987 sector_t sector;
1988 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1989 struct drbd_epoch_entry *e;
1990 struct digest_info *di = NULL;
b18b37be 1991 int size, verb;
b411b363 1992 unsigned int fault_type;
02918be2 1993 struct p_block_req *p = &mdev->data.rbuf.block_req;
b411b363
PR
1994
1995 sector = be64_to_cpu(p->sector);
1996 size = be32_to_cpu(p->blksize);
1997
1816a2b4 1998 if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_BIO_SIZE) {
b411b363
PR
1999 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2000 (unsigned long long)sector, size);
81e84650 2001 return false;
b411b363
PR
2002 }
2003 if (sector + (size>>9) > capacity) {
2004 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2005 (unsigned long long)sector, size);
81e84650 2006 return false;
b411b363
PR
2007 }
2008
2009 if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
b18b37be
PR
2010 verb = 1;
2011 switch (cmd) {
2012 case P_DATA_REQUEST:
2013 drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2014 break;
2015 case P_RS_DATA_REQUEST:
2016 case P_CSUM_RS_REQUEST:
2017 case P_OV_REQUEST:
2018 drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
2019 break;
2020 case P_OV_REPLY:
2021 verb = 0;
2022 dec_rs_pending(mdev);
2023 drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2024 break;
2025 default:
2026 dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
2027 cmdname(cmd));
2028 }
2029 if (verb && __ratelimit(&drbd_ratelimit_state))
b411b363
PR
2030 dev_err(DEV, "Can not satisfy peer's read request, "
2031 "no local data.\n");
b18b37be 2032
a821cc4a
LE
 2033 /* drain possible payload */
2034 return drbd_drain_block(mdev, digest_size);
b411b363
PR
2035 }
2036
2037 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2038 * "criss-cross" setup, that might cause write-out on some other DRBD,
2039 * which in turn might block on the other node at this very place. */
2040 e = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
2041 if (!e) {
2042 put_ldev(mdev);
81e84650 2043 return false;
b411b363
PR
2044 }
2045
02918be2 2046 switch (cmd) {
b411b363
PR
2047 case P_DATA_REQUEST:
2048 e->w.cb = w_e_end_data_req;
2049 fault_type = DRBD_FAULT_DT_RD;
80a40e43
LE
2050 /* application IO, don't drbd_rs_begin_io */
2051 goto submit;
2052
b411b363
PR
2053 case P_RS_DATA_REQUEST:
2054 e->w.cb = w_e_end_rsdata_req;
2055 fault_type = DRBD_FAULT_RS_RD;
5f9915bb
LE
2056 /* used in the sector offset progress display */
2057 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
b411b363
PR
2058 break;
2059
2060 case P_OV_REPLY:
2061 case P_CSUM_RS_REQUEST:
2062 fault_type = DRBD_FAULT_RS_RD;
b411b363
PR
2063 di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
2064 if (!di)
2065 goto out_free_e;
2066
2067 di->digest_size = digest_size;
2068 di->digest = (((char *)di)+sizeof(struct digest_info));
2069
c36c3ced
LE
2070 e->digest = di;
2071 e->flags |= EE_HAS_DIGEST;
2072
b411b363
PR
2073 if (drbd_recv(mdev, di->digest, digest_size) != digest_size)
2074 goto out_free_e;
2075
02918be2 2076 if (cmd == P_CSUM_RS_REQUEST) {
b411b363
PR
2077 D_ASSERT(mdev->agreed_pro_version >= 89);
2078 e->w.cb = w_e_end_csum_rs_req;
5f9915bb
LE
2079 /* used in the sector offset progress display */
2080 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
02918be2 2081 } else if (cmd == P_OV_REPLY) {
2649f080
LE
2082 /* track progress, we may need to throttle */
2083 atomic_add(size >> 9, &mdev->rs_sect_in);
b411b363
PR
2084 e->w.cb = w_e_end_ov_reply;
2085 dec_rs_pending(mdev);
0f0601f4
LE
2086 /* drbd_rs_begin_io done when we sent this request,
2087 * but accounting still needs to be done. */
2088 goto submit_for_resync;
b411b363
PR
2089 }
2090 break;
2091
2092 case P_OV_REQUEST:
b411b363
PR
2093 if (mdev->ov_start_sector == ~(sector_t)0 &&
2094 mdev->agreed_pro_version >= 90) {
de228bba
LE
2095 unsigned long now = jiffies;
2096 int i;
b411b363
PR
2097 mdev->ov_start_sector = sector;
2098 mdev->ov_position = sector;
30b743a2
LE
2099 mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2100 mdev->rs_total = mdev->ov_left;
de228bba
LE
2101 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2102 mdev->rs_mark_left[i] = mdev->ov_left;
2103 mdev->rs_mark_time[i] = now;
2104 }
b411b363
PR
2105 dev_info(DEV, "Online Verify start sector: %llu\n",
2106 (unsigned long long)sector);
2107 }
2108 e->w.cb = w_e_end_ov_req;
2109 fault_type = DRBD_FAULT_RS_RD;
b411b363
PR
2110 break;
2111
b411b363
PR
2112 default:
2113 dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
02918be2 2114 cmdname(cmd));
b411b363 2115 fault_type = DRBD_FAULT_MAX;
80a40e43 2116 goto out_free_e;
b411b363
PR
2117 }
2118
0f0601f4
LE
2119 /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2120 * wrt the receiver, but it is not as straightforward as it may seem.
2121 * Various places in the resync start and stop logic assume resync
2122 * requests are processed in order, requeuing this on the worker thread
2123 * introduces a bunch of new code for synchronization between threads.
2124 *
2125 * Unlimited throttling before drbd_rs_begin_io may stall the resync
2126 * "forever", throttling after drbd_rs_begin_io will lock that extent
2127 * for application writes for the same time. For now, just throttle
2128 * here, where the rest of the code expects the receiver to sleep for
2129 * a while, anyways.
2130 */
2131
2132 /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2133 * this defers syncer requests for some time, before letting at least
 2134 * one request through. The resync controller on the receiving side
2135 * will adapt to the incoming rate accordingly.
2136 *
2137 * We cannot throttle here if remote is Primary/SyncTarget:
2138 * we would also throttle its application reads.
2139 * In that case, throttling is done on the SyncTarget only.
2140 */
e3555d85
PR
2141 if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2142 schedule_timeout_uninterruptible(HZ/10);
2143 if (drbd_rs_begin_io(mdev, sector))
80a40e43 2144 goto out_free_e;
b411b363 2145
0f0601f4
LE
2146submit_for_resync:
2147 atomic_add(size >> 9, &mdev->rs_sect_ev);
2148
80a40e43 2149submit:
b411b363 2150 inc_unacked(mdev);
80a40e43
LE
2151 spin_lock_irq(&mdev->req_lock);
2152 list_add_tail(&e->w.list, &mdev->read_ee);
2153 spin_unlock_irq(&mdev->req_lock);
b411b363 2154
45bb912b 2155 if (drbd_submit_ee(mdev, e, READ, fault_type) == 0)
81e84650 2156 return true;
b411b363 2157
10f6d992
LE
2158 /* don't care for the reason here */
2159 dev_err(DEV, "submit failed, triggering re-connect\n");
22cc37a9
LE
2160 spin_lock_irq(&mdev->req_lock);
2161 list_del(&e->w.list);
2162 spin_unlock_irq(&mdev->req_lock);
2163 /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2164
b411b363 2165out_free_e:
b411b363
PR
2166 put_ldev(mdev);
2167 drbd_free_ee(mdev, e);
81e84650 2168 return false;
b411b363
PR
2169}
2170
2171static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2172{
2173 int self, peer, rv = -100;
2174 unsigned long ch_self, ch_peer;
2175
2176 self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2177 peer = mdev->p_uuid[UI_BITMAP] & 1;
2178
2179 ch_peer = mdev->p_uuid[UI_SIZE];
2180 ch_self = mdev->comm_bm_set;
2181
2182 switch (mdev->net_conf->after_sb_0p) {
2183 case ASB_CONSENSUS:
2184 case ASB_DISCARD_SECONDARY:
2185 case ASB_CALL_HELPER:
2186 dev_err(DEV, "Configuration error.\n");
2187 break;
2188 case ASB_DISCONNECT:
2189 break;
2190 case ASB_DISCARD_YOUNGER_PRI:
2191 if (self == 0 && peer == 1) {
2192 rv = -1;
2193 break;
2194 }
2195 if (self == 1 && peer == 0) {
2196 rv = 1;
2197 break;
2198 }
2199 /* Else fall through to one of the other strategies... */
2200 case ASB_DISCARD_OLDER_PRI:
2201 if (self == 0 && peer == 1) {
2202 rv = 1;
2203 break;
2204 }
2205 if (self == 1 && peer == 0) {
2206 rv = -1;
2207 break;
2208 }
2209 /* Else fall through to one of the other strategies... */
ad19bf6e 2210 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
b411b363
PR
2211 "Using discard-least-changes instead\n");
2212 case ASB_DISCARD_ZERO_CHG:
2213 if (ch_peer == 0 && ch_self == 0) {
2214 rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2215 ? -1 : 1;
2216 break;
2217 } else {
2218 if (ch_peer == 0) { rv = 1; break; }
2219 if (ch_self == 0) { rv = -1; break; }
2220 }
2221 if (mdev->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
2222 break;
2223 case ASB_DISCARD_LEAST_CHG:
2224 if (ch_self < ch_peer)
2225 rv = -1;
2226 else if (ch_self > ch_peer)
2227 rv = 1;
2228 else /* ( ch_self == ch_peer ) */
2229 /* Well, then use something else. */
2230 rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2231 ? -1 : 1;
2232 break;
2233 case ASB_DISCARD_LOCAL:
2234 rv = -1;
2235 break;
2236 case ASB_DISCARD_REMOTE:
2237 rv = 1;
2238 }
2239
2240 return rv;
2241}
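/* Illustrative example, not part of the original source: with after-sb-0pri
 * set to discard-least-changes, ch_self = 100 and ch_peer = 5000 yields
 * rv = -1, i.e. the local node gives up its (fewer) changes and becomes
 * sync target; if both counts are equal, the DISCARD_CONCURRENT bit breaks
 * the tie. */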
2242
2243static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2244{
6184ea21 2245 int hg, rv = -100;
b411b363
PR
2246
2247 switch (mdev->net_conf->after_sb_1p) {
2248 case ASB_DISCARD_YOUNGER_PRI:
2249 case ASB_DISCARD_OLDER_PRI:
2250 case ASB_DISCARD_LEAST_CHG:
2251 case ASB_DISCARD_LOCAL:
2252 case ASB_DISCARD_REMOTE:
2253 dev_err(DEV, "Configuration error.\n");
2254 break;
2255 case ASB_DISCONNECT:
2256 break;
2257 case ASB_CONSENSUS:
2258 hg = drbd_asb_recover_0p(mdev);
2259 if (hg == -1 && mdev->state.role == R_SECONDARY)
2260 rv = hg;
2261 if (hg == 1 && mdev->state.role == R_PRIMARY)
2262 rv = hg;
2263 break;
2264 case ASB_VIOLENTLY:
2265 rv = drbd_asb_recover_0p(mdev);
2266 break;
2267 case ASB_DISCARD_SECONDARY:
2268 return mdev->state.role == R_PRIMARY ? 1 : -1;
2269 case ASB_CALL_HELPER:
2270 hg = drbd_asb_recover_0p(mdev);
2271 if (hg == -1 && mdev->state.role == R_PRIMARY) {
bb437946
AG
2272 enum drbd_state_rv rv2;
2273
2274 drbd_set_role(mdev, R_SECONDARY, 0);
b411b363
PR
2275 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2276 * we might be here in C_WF_REPORT_PARAMS which is transient.
2277 * we do not need to wait for the after state change work either. */
bb437946
AG
2278 rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2279 if (rv2 != SS_SUCCESS) {
b411b363
PR
2280 drbd_khelper(mdev, "pri-lost-after-sb");
2281 } else {
2282 dev_warn(DEV, "Successfully gave up primary role.\n");
2283 rv = hg;
2284 }
2285 } else
2286 rv = hg;
2287 }
2288
2289 return rv;
2290}
2291
2292static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2293{
6184ea21 2294 int hg, rv = -100;
b411b363
PR
2295
2296 switch (mdev->net_conf->after_sb_2p) {
2297 case ASB_DISCARD_YOUNGER_PRI:
2298 case ASB_DISCARD_OLDER_PRI:
2299 case ASB_DISCARD_LEAST_CHG:
2300 case ASB_DISCARD_LOCAL:
2301 case ASB_DISCARD_REMOTE:
2302 case ASB_CONSENSUS:
2303 case ASB_DISCARD_SECONDARY:
2304 dev_err(DEV, "Configuration error.\n");
2305 break;
2306 case ASB_VIOLENTLY:
2307 rv = drbd_asb_recover_0p(mdev);
2308 break;
2309 case ASB_DISCONNECT:
2310 break;
2311 case ASB_CALL_HELPER:
2312 hg = drbd_asb_recover_0p(mdev);
2313 if (hg == -1) {
bb437946
AG
2314 enum drbd_state_rv rv2;
2315
b411b363
PR
2316 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2317 * we might be here in C_WF_REPORT_PARAMS which is transient.
2318 * we do not need to wait for the after state change work either. */
bb437946
AG
2319 rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2320 if (rv2 != SS_SUCCESS) {
b411b363
PR
2321 drbd_khelper(mdev, "pri-lost-after-sb");
2322 } else {
2323 dev_warn(DEV, "Successfully gave up primary role.\n");
2324 rv = hg;
2325 }
2326 } else
2327 rv = hg;
2328 }
2329
2330 return rv;
2331}
2332
2333static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2334 u64 bits, u64 flags)
2335{
2336 if (!uuid) {
2337 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2338 return;
2339 }
2340 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2341 text,
2342 (unsigned long long)uuid[UI_CURRENT],
2343 (unsigned long long)uuid[UI_BITMAP],
2344 (unsigned long long)uuid[UI_HISTORY_START],
2345 (unsigned long long)uuid[UI_HISTORY_END],
2346 (unsigned long long)bits,
2347 (unsigned long long)flags);
2348}
2349
2350/*
2351 100 after split brain try auto recover
2352 2 C_SYNC_SOURCE set BitMap
2353 1 C_SYNC_SOURCE use BitMap
2354 0 no Sync
2355 -1 C_SYNC_TARGET use BitMap
2356 -2 C_SYNC_TARGET set BitMap
2357 -100 after split brain, disconnect
2358-1000 unrelated data
4a23f264
PR
2359-1091 requires proto 91
2360-1096 requires proto 96
b411b363
PR
2361 */
2362static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2363{
2364 u64 self, peer;
2365 int i, j;
2366
2367 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2368 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2369
2370 *rule_nr = 10;
2371 if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2372 return 0;
2373
2374 *rule_nr = 20;
2375 if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2376 peer != UUID_JUST_CREATED)
2377 return -2;
2378
2379 *rule_nr = 30;
2380 if (self != UUID_JUST_CREATED &&
2381 (peer == UUID_JUST_CREATED || peer == (u64)0))
2382 return 2;
2383
2384 if (self == peer) {
2385 int rct, dc; /* roles at crash time */
2386
2387 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2388
2389 if (mdev->agreed_pro_version < 91)
4a23f264 2390 return -1091;
b411b363
PR
2391
2392 if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2393 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2394 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2395 drbd_uuid_set_bm(mdev, 0UL);
2396
2397 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2398 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2399 *rule_nr = 34;
2400 } else {
2401 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2402 *rule_nr = 36;
2403 }
2404
2405 return 1;
2406 }
2407
2408 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2409
2410 if (mdev->agreed_pro_version < 91)
4a23f264 2411 return -1091;
b411b363
PR
2412
2413 if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2414 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2415 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2416
2417 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2418 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2419 mdev->p_uuid[UI_BITMAP] = 0UL;
2420
2421 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2422 *rule_nr = 35;
2423 } else {
2424 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2425 *rule_nr = 37;
2426 }
2427
2428 return -1;
2429 }
2430
2431 /* Common power [off|failure] */
2432 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2433 (mdev->p_uuid[UI_FLAGS] & 2);
2434 /* lowest bit is set when we were primary,
2435 * next bit (weight 2) is set when peer was primary */
2436 *rule_nr = 40;
2437
2438 switch (rct) {
2439 case 0: /* !self_pri && !peer_pri */ return 0;
2440 case 1: /* self_pri && !peer_pri */ return 1;
2441 case 2: /* !self_pri && peer_pri */ return -1;
2442 case 3: /* self_pri && peer_pri */
2443 dc = test_bit(DISCARD_CONCURRENT, &mdev->flags);
2444 return dc ? -1 : 1;
2445 }
2446 }
2447
2448 *rule_nr = 50;
2449 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2450 if (self == peer)
2451 return -1;
2452
2453 *rule_nr = 51;
2454 peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2455 if (self == peer) {
4a23f264
PR
2456 if (mdev->agreed_pro_version < 96 ?
2457 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2458 (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2459 peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
b411b363
PR
 2460 /* The last P_SYNC_UUID did not get through. Undo the modifications of
 2461 the peer's UUIDs that marked its last start of resync as sync source. */
2462
2463 if (mdev->agreed_pro_version < 91)
4a23f264 2464 return -1091;
b411b363
PR
2465
2466 mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2467 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
4a23f264 2468
92b4ca29 2469 dev_info(DEV, "Lost last syncUUID packet, corrected:\n");
4a23f264
PR
2470 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2471
b411b363
PR
2472 return -1;
2473 }
2474 }
2475
2476 *rule_nr = 60;
2477 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2478 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2479 peer = mdev->p_uuid[i] & ~((u64)1);
2480 if (self == peer)
2481 return -2;
2482 }
2483
2484 *rule_nr = 70;
2485 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2486 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2487 if (self == peer)
2488 return 1;
2489
2490 *rule_nr = 71;
2491 self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2492 if (self == peer) {
4a23f264
PR
2493 if (mdev->agreed_pro_version < 96 ?
2494 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2495 (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2496 self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
b411b363
PR
 2497 /* The last P_SYNC_UUID did not get through. Undo the modifications of
 2498 our own UUIDs that marked the last start of resync as sync source. */
2499
2500 if (mdev->agreed_pro_version < 91)
4a23f264 2501 return -1091;
b411b363
PR
2502
2503 _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2504 _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2505
4a23f264 2506 dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
b411b363
PR
2507 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2508 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2509
2510 return 1;
2511 }
2512 }
2513
2514
2515 *rule_nr = 80;
d8c2a36b 2516 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
b411b363
PR
2517 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2518 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2519 if (self == peer)
2520 return 2;
2521 }
2522
2523 *rule_nr = 90;
2524 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2525 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2526 if (self == peer && self != ((u64)0))
2527 return 100;
2528
2529 *rule_nr = 100;
2530 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2531 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2532 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2533 peer = mdev->p_uuid[j] & ~((u64)1);
2534 if (self == peer)
2535 return -100;
2536 }
2537 }
2538
2539 return -1000;
2540}
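/* Illustrative reading of the rules above, not part of the original source:
 * if our bitmap UUID matches the peer's current UUID, rule 70 returns 1,
 * which per the legend before drbd_uuid_compare() means "C_SYNC_SOURCE use
 * BitMap": we become sync source and reuse the existing bitmap.  If instead
 * our current UUID matches one of the peer's history UUIDs, rule 60 returns
 * -2, i.e. "C_SYNC_TARGET set BitMap": we become sync target after setting
 * the whole bitmap, which amounts to a full resync. */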
2541
2542/* drbd_sync_handshake() returns the new conn state on success, or
 2543 C_MASK on failure.
2544 */
2545static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2546 enum drbd_disk_state peer_disk) __must_hold(local)
2547{
2548 int hg, rule_nr;
2549 enum drbd_conns rv = C_MASK;
2550 enum drbd_disk_state mydisk;
2551
2552 mydisk = mdev->state.disk;
2553 if (mydisk == D_NEGOTIATING)
2554 mydisk = mdev->new_state_tmp.disk;
2555
2556 dev_info(DEV, "drbd_sync_handshake:\n");
2557 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2558 drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2559 mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2560
2561 hg = drbd_uuid_compare(mdev, &rule_nr);
2562
2563 dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2564
2565 if (hg == -1000) {
2566 dev_alert(DEV, "Unrelated data, aborting!\n");
2567 return C_MASK;
2568 }
4a23f264
PR
2569 if (hg < -1000) {
2570 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
b411b363
PR
2571 return C_MASK;
2572 }
2573
2574 if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2575 (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
2576 int f = (hg == -100) || abs(hg) == 2;
2577 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2578 if (f)
2579 hg = hg*2;
2580 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2581 hg > 0 ? "source" : "target");
2582 }
2583
3a11a487
AG
2584 if (abs(hg) == 100)
2585 drbd_khelper(mdev, "initial-split-brain");
2586
b411b363
PR
2587 if (hg == 100 || (hg == -100 && mdev->net_conf->always_asbp)) {
2588 int pcount = (mdev->state.role == R_PRIMARY)
2589 + (peer_role == R_PRIMARY);
2590 int forced = (hg == -100);
2591
2592 switch (pcount) {
2593 case 0:
2594 hg = drbd_asb_recover_0p(mdev);
2595 break;
2596 case 1:
2597 hg = drbd_asb_recover_1p(mdev);
2598 break;
2599 case 2:
2600 hg = drbd_asb_recover_2p(mdev);
2601 break;
2602 }
2603 if (abs(hg) < 100) {
2604 dev_warn(DEV, "Split-Brain detected, %d primaries, "
2605 "automatically solved. Sync from %s node\n",
2606 pcount, (hg < 0) ? "peer" : "this");
2607 if (forced) {
2608 dev_warn(DEV, "Doing a full sync, since"
2609 " UUIDs where ambiguous.\n");
2610 hg = hg*2;
2611 }
2612 }
2613 }
2614
2615 if (hg == -100) {
2616 if (mdev->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2617 hg = -1;
2618 if (!mdev->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2619 hg = 1;
2620
2621 if (abs(hg) < 100)
2622 dev_warn(DEV, "Split-Brain detected, manually solved. "
2623 "Sync from %s node\n",
2624 (hg < 0) ? "peer" : "this");
2625 }
2626
2627 if (hg == -100) {
580b9767
LE
2628 /* FIXME this log message is not correct if we end up here
2629 * after an attempted attach on a diskless node.
2630 * We just refuse to attach -- well, we drop the "connection"
2631 * to that disk, in a way... */
3a11a487 2632 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
b411b363
PR
2633 drbd_khelper(mdev, "split-brain");
2634 return C_MASK;
2635 }
2636
2637 if (hg > 0 && mydisk <= D_INCONSISTENT) {
2638 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2639 return C_MASK;
2640 }
2641
2642 if (hg < 0 && /* by intention we do not use mydisk here. */
2643 mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2644 switch (mdev->net_conf->rr_conflict) {
2645 case ASB_CALL_HELPER:
2646 drbd_khelper(mdev, "pri-lost");
2647 /* fall through */
2648 case ASB_DISCONNECT:
2649 dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2650 return C_MASK;
2651 case ASB_VIOLENTLY:
 2652 dev_warn(DEV, "Becoming SyncTarget, violating the stable-data "
2653 "assumption\n");
2654 }
2655 }
2656
cf14c2e9
PR
2657 if (mdev->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->flags)) {
2658 if (hg == 0)
2659 dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2660 else
2661 dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
2662 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2663 abs(hg) >= 2 ? "full" : "bit-map based");
2664 return C_MASK;
2665 }
2666
b411b363
PR
2667 if (abs(hg) >= 2) {
2668 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
20ceb2b2
LE
2669 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
2670 BM_LOCKED_SET_ALLOWED))
b411b363
PR
2671 return C_MASK;
2672 }
2673
2674 if (hg > 0) { /* become sync source. */
2675 rv = C_WF_BITMAP_S;
2676 } else if (hg < 0) { /* become sync target */
2677 rv = C_WF_BITMAP_T;
2678 } else {
2679 rv = C_CONNECTED;
2680 if (drbd_bm_total_weight(mdev)) {
2681 dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2682 drbd_bm_total_weight(mdev));
2683 }
2684 }
2685
2686 return rv;
2687}
2688
2689/* returns 1 if invalid */
2690static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2691{
2692 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2693 if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2694 (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2695 return 0;
2696
2697 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2698 if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2699 self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2700 return 1;
2701
2702 /* everything else is valid if they are equal on both sides. */
2703 if (peer == self)
2704 return 0;
2705
 2706 /* everything else is invalid. */
2707 return 1;
2708}
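/* Illustrative example, not part of the original source: peer set to
 * discard-remote while we use discard-local (or vice versa) is the one
 * accepted mixed pair and yields 0; discard-remote on both sides yields 1
 * (invalid), and any other policy, e.g. disconnect, is valid as long as
 * both sides configured the same value. */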
2709
02918be2 2710static int receive_protocol(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
b411b363 2711{
02918be2 2712 struct p_protocol *p = &mdev->data.rbuf.protocol;
b411b363 2713 int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
cf14c2e9 2714 int p_want_lose, p_two_primaries, cf;
b411b363
PR
2715 char p_integrity_alg[SHARED_SECRET_MAX] = "";
2716
b411b363
PR
2717 p_proto = be32_to_cpu(p->protocol);
2718 p_after_sb_0p = be32_to_cpu(p->after_sb_0p);
2719 p_after_sb_1p = be32_to_cpu(p->after_sb_1p);
2720 p_after_sb_2p = be32_to_cpu(p->after_sb_2p);
b411b363 2721 p_two_primaries = be32_to_cpu(p->two_primaries);
cf14c2e9
PR
2722 cf = be32_to_cpu(p->conn_flags);
2723 p_want_lose = cf & CF_WANT_LOSE;
2724
2725 clear_bit(CONN_DRY_RUN, &mdev->flags);
2726
2727 if (cf & CF_DRY_RUN)
2728 set_bit(CONN_DRY_RUN, &mdev->flags);
b411b363
PR
2729
2730 if (p_proto != mdev->net_conf->wire_protocol) {
2731 dev_err(DEV, "incompatible communication protocols\n");
2732 goto disconnect;
2733 }
2734
2735 if (cmp_after_sb(p_after_sb_0p, mdev->net_conf->after_sb_0p)) {
2736 dev_err(DEV, "incompatible after-sb-0pri settings\n");
2737 goto disconnect;
2738 }
2739
2740 if (cmp_after_sb(p_after_sb_1p, mdev->net_conf->after_sb_1p)) {
2741 dev_err(DEV, "incompatible after-sb-1pri settings\n");
2742 goto disconnect;
2743 }
2744
2745 if (cmp_after_sb(p_after_sb_2p, mdev->net_conf->after_sb_2p)) {
2746 dev_err(DEV, "incompatible after-sb-2pri settings\n");
2747 goto disconnect;
2748 }
2749
2750 if (p_want_lose && mdev->net_conf->want_lose) {
2751 dev_err(DEV, "both sides have the 'want_lose' flag set\n");
2752 goto disconnect;
2753 }
2754
2755 if (p_two_primaries != mdev->net_conf->two_primaries) {
2756 dev_err(DEV, "incompatible setting of the two-primaries options\n");
2757 goto disconnect;
2758 }
2759
2760 if (mdev->agreed_pro_version >= 87) {
2761 unsigned char *my_alg = mdev->net_conf->integrity_alg;
2762
2763 if (drbd_recv(mdev, p_integrity_alg, data_size) != data_size)
81e84650 2764 return false;
b411b363
PR
2765
2766 p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2767 if (strcmp(p_integrity_alg, my_alg)) {
2768 dev_err(DEV, "incompatible setting of the data-integrity-alg\n");
2769 goto disconnect;
2770 }
2771 dev_info(DEV, "data-integrity-alg: %s\n",
2772 my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2773 }
2774
81e84650 2775 return true;
b411b363
PR
2776
2777disconnect:
2778 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
81e84650 2779 return false;
b411b363
PR
2780}
2781
2782/* helper function
2783 * input: alg name, feature name
2784 * return: NULL (alg name was "")
2785 * ERR_PTR(error) if something goes wrong
2786 * or the crypto hash ptr, if it worked out ok. */
2787struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2788 const char *alg, const char *name)
2789{
2790 struct crypto_hash *tfm;
2791
2792 if (!alg[0])
2793 return NULL;
2794
2795 tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2796 if (IS_ERR(tfm)) {
2797 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2798 alg, name, PTR_ERR(tfm));
2799 return tfm;
2800 }
2801 if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2802 crypto_free_hash(tfm);
2803 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2804 return ERR_PTR(-EINVAL);
2805 }
2806 return tfm;
2807}
2808
02918be2 2809static int receive_SyncParam(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int packet_size)
b411b363 2810{
81e84650 2811 int ok = true;
02918be2 2812 struct p_rs_param_95 *p = &mdev->data.rbuf.rs_param_95;
b411b363
PR
2813 unsigned int header_size, data_size, exp_max_sz;
2814 struct crypto_hash *verify_tfm = NULL;
2815 struct crypto_hash *csums_tfm = NULL;
2816 const int apv = mdev->agreed_pro_version;
778f271d
PR
2817 int *rs_plan_s = NULL;
2818 int fifo_size = 0;
b411b363
PR
2819
2820 exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param)
2821 : apv == 88 ? sizeof(struct p_rs_param)
2822 + SHARED_SECRET_MAX
8e26f9cc
PR
2823 : apv <= 94 ? sizeof(struct p_rs_param_89)
2824 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
b411b363 2825
02918be2 2826 if (packet_size > exp_max_sz) {
b411b363 2827 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
02918be2 2828 packet_size, exp_max_sz);
81e84650 2829 return false;
b411b363
PR
2830 }
2831
2832 if (apv <= 88) {
02918be2
PR
2833 header_size = sizeof(struct p_rs_param) - sizeof(struct p_header80);
2834 data_size = packet_size - header_size;
8e26f9cc 2835 } else if (apv <= 94) {
02918be2
PR
2836 header_size = sizeof(struct p_rs_param_89) - sizeof(struct p_header80);
2837 data_size = packet_size - header_size;
b411b363 2838 D_ASSERT(data_size == 0);
8e26f9cc 2839 } else {
02918be2
PR
2840 header_size = sizeof(struct p_rs_param_95) - sizeof(struct p_header80);
2841 data_size = packet_size - header_size;
b411b363
PR
2842 D_ASSERT(data_size == 0);
2843 }
2844
2845 /* initialize verify_alg and csums_alg */
2846 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
2847
02918be2 2848 if (drbd_recv(mdev, &p->head.payload, header_size) != header_size)
81e84650 2849 return false;
b411b363
PR
2850
2851 mdev->sync_conf.rate = be32_to_cpu(p->rate);
2852
2853 if (apv >= 88) {
2854 if (apv == 88) {
5de73827
PR
2855 if (data_size > SHARED_SECRET_MAX || data_size == 0) {
2856 dev_err(DEV, "verify-alg of wrong size, "
2857 "peer wants %u, accepting only up to %u byte\n",
2858 data_size, SHARED_SECRET_MAX);
81e84650 2859 return false;
b411b363
PR
2860 }
2861
2862 if (drbd_recv(mdev, p->verify_alg, data_size) != data_size)
81e84650 2863 return false;
b411b363
PR
2864
2865 /* we expect NUL terminated string */
2866 /* but just in case someone tries to be evil */
2867 D_ASSERT(p->verify_alg[data_size-1] == 0);
2868 p->verify_alg[data_size-1] = 0;
2869
2870 } else /* apv >= 89 */ {
2871 /* we still expect NUL terminated strings */
2872 /* but just in case someone tries to be evil */
2873 D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
2874 D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
2875 p->verify_alg[SHARED_SECRET_MAX-1] = 0;
2876 p->csums_alg[SHARED_SECRET_MAX-1] = 0;
2877 }
2878
2879 if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) {
2880 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2881 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
2882 mdev->sync_conf.verify_alg, p->verify_alg);
2883 goto disconnect;
2884 }
2885 verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
2886 p->verify_alg, "verify-alg");
2887 if (IS_ERR(verify_tfm)) {
2888 verify_tfm = NULL;
2889 goto disconnect;
2890 }
2891 }
2892
2893 if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) {
2894 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2895 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
2896 mdev->sync_conf.csums_alg, p->csums_alg);
2897 goto disconnect;
2898 }
2899 csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
2900 p->csums_alg, "csums-alg");
2901 if (IS_ERR(csums_tfm)) {
2902 csums_tfm = NULL;
2903 goto disconnect;
2904 }
2905 }
2906
8e26f9cc
PR
2907 if (apv > 94) {
2908 mdev->sync_conf.rate = be32_to_cpu(p->rate);
2909 mdev->sync_conf.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
2910 mdev->sync_conf.c_delay_target = be32_to_cpu(p->c_delay_target);
2911 mdev->sync_conf.c_fill_target = be32_to_cpu(p->c_fill_target);
2912 mdev->sync_conf.c_max_rate = be32_to_cpu(p->c_max_rate);
778f271d
PR
2913
2914 fifo_size = (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
2915 if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
2916 rs_plan_s = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
2917 if (!rs_plan_s) {
 2918 dev_err(DEV, "kzalloc of fifo_buffer failed");
2919 goto disconnect;
2920 }
2921 }
8e26f9cc 2922 }
b411b363
PR
2923
2924 spin_lock(&mdev->peer_seq_lock);
2925 /* lock against drbd_nl_syncer_conf() */
2926 if (verify_tfm) {
2927 strcpy(mdev->sync_conf.verify_alg, p->verify_alg);
2928 mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1;
2929 crypto_free_hash(mdev->verify_tfm);
2930 mdev->verify_tfm = verify_tfm;
2931 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
2932 }
2933 if (csums_tfm) {
2934 strcpy(mdev->sync_conf.csums_alg, p->csums_alg);
2935 mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1;
2936 crypto_free_hash(mdev->csums_tfm);
2937 mdev->csums_tfm = csums_tfm;
2938 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
2939 }
778f271d
PR
2940 if (fifo_size != mdev->rs_plan_s.size) {
2941 kfree(mdev->rs_plan_s.values);
2942 mdev->rs_plan_s.values = rs_plan_s;
2943 mdev->rs_plan_s.size = fifo_size;
2944 mdev->rs_planed = 0;
2945 }
b411b363
PR
2946 spin_unlock(&mdev->peer_seq_lock);
2947 }
2948
2949 return ok;
2950disconnect:
2951 /* just for completeness: actually not needed,
2952 * as this is not reached if csums_tfm was ok. */
2953 crypto_free_hash(csums_tfm);
2954 /* but free the verify_tfm again, if csums_tfm did not work out */
2955 crypto_free_hash(verify_tfm);
2956 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
81e84650 2957 return false;
b411b363
PR
2958}
2959
b411b363
PR
2960/* warn if the arguments differ by more than 12.5% */
2961static void warn_if_differ_considerably(struct drbd_conf *mdev,
2962 const char *s, sector_t a, sector_t b)
2963{
2964 sector_t d;
2965 if (a == 0 || b == 0)
2966 return;
2967 d = (a > b) ? (a - b) : (b - a);
2968 if (d > (a>>3) || d > (b>>3))
2969 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
2970 (unsigned long long)a, (unsigned long long)b);
2971}
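/* Worked example, not part of the original source: a = 1000 and b = 1200
 * sectors gives d = 200, which exceeds a>>3 = 125, so the warning is
 * emitted; a = 1000 and b = 1100 gives d = 100, which is below both
 * a>>3 = 125 and b>>3 = 137, so the difference is considered harmless. */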
2972
02918be2 2973static int receive_sizes(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
b411b363 2974{
02918be2 2975 struct p_sizes *p = &mdev->data.rbuf.sizes;
b411b363 2976 enum determine_dev_size dd = unchanged;
b411b363
PR
2977 sector_t p_size, p_usize, my_usize;
2978 int ldsc = 0; /* local disk size changed */
e89b591c 2979 enum dds_flags ddsf;
b411b363 2980
b411b363
PR
2981 p_size = be64_to_cpu(p->d_size);
2982 p_usize = be64_to_cpu(p->u_size);
2983
2984 if (p_size == 0 && mdev->state.disk == D_DISKLESS) {
2985 dev_err(DEV, "some backing storage is needed\n");
2986 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
81e84650 2987 return false;
b411b363
PR
2988 }
2989
2990 /* just store the peer's disk size for now.
2991 * we still need to figure out whether we accept that. */
2992 mdev->p_size = p_size;
2993
b411b363
PR
2994 if (get_ldev(mdev)) {
2995 warn_if_differ_considerably(mdev, "lower level device sizes",
2996 p_size, drbd_get_max_capacity(mdev->ldev));
2997 warn_if_differ_considerably(mdev, "user requested size",
2998 p_usize, mdev->ldev->dc.disk_size);
2999
3000 /* if this is the first connect, or an otherwise expected
3001 * param exchange, choose the minimum */
3002 if (mdev->state.conn == C_WF_REPORT_PARAMS)
3003 p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
3004 p_usize);
3005
3006 my_usize = mdev->ldev->dc.disk_size;
3007
3008 if (mdev->ldev->dc.disk_size != p_usize) {
3009 mdev->ldev->dc.disk_size = p_usize;
3010 dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3011 (unsigned long)mdev->ldev->dc.disk_size);
3012 }
3013
3014 /* Never shrink a device with usable data during connect.
3015 But allow online shrinking if we are connected. */
a393db6f 3016 if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
b411b363
PR
3017 drbd_get_capacity(mdev->this_bdev) &&
3018 mdev->state.disk >= D_OUTDATED &&
3019 mdev->state.conn < C_CONNECTED) {
3020 dev_err(DEV, "The peer's disk size is too small!\n");
3021 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3022 mdev->ldev->dc.disk_size = my_usize;
3023 put_ldev(mdev);
81e84650 3024 return false;
b411b363
PR
3025 }
3026 put_ldev(mdev);
3027 }
b411b363 3028
e89b591c 3029 ddsf = be16_to_cpu(p->dds_flags);
b411b363 3030 if (get_ldev(mdev)) {
24c4830c 3031 dd = drbd_determine_dev_size(mdev, ddsf);
b411b363
PR
3032 put_ldev(mdev);
3033 if (dd == dev_size_error)
81e84650 3034 return false;
b411b363
PR
3035 drbd_md_sync(mdev);
3036 } else {
3037 /* I am diskless, need to accept the peer's size. */
3038 drbd_set_my_capacity(mdev, p_size);
3039 }
3040
99432fcc
PR
3041 mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3042 drbd_reconsider_max_bio_size(mdev);
3043
b411b363
PR
3044 if (get_ldev(mdev)) {
3045 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3046 mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3047 ldsc = 1;
3048 }
3049
b411b363
PR
3050 put_ldev(mdev);
3051 }
3052
3053 if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3054 if (be64_to_cpu(p->c_size) !=
3055 drbd_get_capacity(mdev->this_bdev) || ldsc) {
3056 /* we have different sizes, probably peer
3057 * needs to know my new size... */
e89b591c 3058 drbd_send_sizes(mdev, 0, ddsf);
b411b363
PR
3059 }
3060 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3061 (dd == grew && mdev->state.conn == C_CONNECTED)) {
3062 if (mdev->state.pdsk >= D_INCONSISTENT &&
e89b591c
PR
3063 mdev->state.disk >= D_INCONSISTENT) {
3064 if (ddsf & DDSF_NO_RESYNC)
3065 dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3066 else
3067 resync_after_online_grow(mdev);
3068 } else
b411b363
PR
3069 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3070 }
3071 }
3072
81e84650 3073 return true;
b411b363
PR
3074}
3075
02918be2 3076static int receive_uuids(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
b411b363 3077{
02918be2 3078 struct p_uuids *p = &mdev->data.rbuf.uuids;
b411b363 3079 u64 *p_uuid;
62b0da3a 3080 int i, updated_uuids = 0;
b411b363 3081
b411b363
PR
3082 p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3083
3084 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3085 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3086
3087 kfree(mdev->p_uuid);
3088 mdev->p_uuid = p_uuid;
3089
3090 if (mdev->state.conn < C_CONNECTED &&
3091 mdev->state.disk < D_INCONSISTENT &&
3092 mdev->state.role == R_PRIMARY &&
3093 (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3094 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3095 (unsigned long long)mdev->ed_uuid);
3096 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
81e84650 3097 return false;
b411b363
PR
3098 }
3099
3100 if (get_ldev(mdev)) {
3101 int skip_initial_sync =
3102 mdev->state.conn == C_CONNECTED &&
3103 mdev->agreed_pro_version >= 90 &&
3104 mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3105 (p_uuid[UI_FLAGS] & 8);
3106 if (skip_initial_sync) {
3107 dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3108 drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
20ceb2b2
LE
3109 "clear_n_write from receive_uuids",
3110 BM_LOCKED_TEST_ALLOWED);
b411b363
PR
3111 _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3112 _drbd_uuid_set(mdev, UI_BITMAP, 0);
3113 _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3114 CS_VERBOSE, NULL);
3115 drbd_md_sync(mdev);
62b0da3a 3116 updated_uuids = 1;
b411b363
PR
3117 }
3118 put_ldev(mdev);
18a50fa2
PR
3119 } else if (mdev->state.disk < D_INCONSISTENT &&
3120 mdev->state.role == R_PRIMARY) {
3121 /* I am a diskless primary, the peer just created a new current UUID
3122 for me. */
62b0da3a 3123 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
b411b363
PR
3124 }
3125
 3126 /* Before we test for the disk state, we should wait until any possibly
 3127 ongoing cluster-wide state change has finished. That is important if
3128 we are primary and are detaching from our disk. We need to see the
3129 new disk state... */
3130 wait_event(mdev->misc_wait, !test_bit(CLUSTER_ST_CHANGE, &mdev->flags));
3131 if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
62b0da3a
LE
3132 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3133
3134 if (updated_uuids)
3135 drbd_print_uuids(mdev, "receiver updated UUIDs to");
b411b363 3136
81e84650 3137 return true;
b411b363
PR
3138}
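/* Illustrative example, not part of the original source: if our current UUID
 * is still UUID_JUST_CREATED, the connection is already C_CONNECTED with
 * agreed_pro_version >= 90, and the peer has bit 8 set in its UUID flags,
 * receive_uuids() takes the skip_initial_sync path above: the bitmap is
 * cleared, the peer's current UUID is adopted, and both disk and pdsk are
 * forced to D_UP_TO_DATE without any resync traffic. */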
3139
3140/**
3141 * convert_state() - Converts the peer's view of the cluster state to our point of view
3142 * @ps: The state as seen by the peer.
3143 */
3144static union drbd_state convert_state(union drbd_state ps)
3145{
3146 union drbd_state ms;
3147
3148 static enum drbd_conns c_tab[] = {
3149 [C_CONNECTED] = C_CONNECTED,
3150
3151 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3152 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3153 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3154 [C_VERIFY_S] = C_VERIFY_T,
3155 [C_MASK] = C_MASK,
3156 };
3157
3158 ms.i = ps.i;
3159
3160 ms.conn = c_tab[ps.conn];
3161 ms.peer = ps.role;
3162 ms.role = ps.peer;
3163 ms.pdsk = ps.disk;
3164 ms.disk = ps.pdsk;
3165 ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3166
3167 return ms;
3168}
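/* Illustrative example, not part of the original source: if the peer reports
 * ps = { role: Primary, peer: Secondary, disk: UpToDate, pdsk: Inconsistent,
 * conn: C_STARTING_SYNC_S }, convert_state() mirrors that into our view as
 * ms = { role: Secondary, peer: Primary, disk: Inconsistent, pdsk: UpToDate,
 * conn: C_STARTING_SYNC_T }; the peer's aftr_isp/user_isp bits are folded
 * into our peer_isp bit. */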
3169
02918be2 3170static int receive_req_state(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
b411b363 3171{
02918be2 3172 struct p_req_state *p = &mdev->data.rbuf.req_state;
b411b363 3173 union drbd_state mask, val;
bf885f8a 3174 enum drbd_state_rv rv;
b411b363 3175
b411b363
PR
3176 mask.i = be32_to_cpu(p->mask);
3177 val.i = be32_to_cpu(p->val);
3178
3179 if (test_bit(DISCARD_CONCURRENT, &mdev->flags) &&
3180 test_bit(CLUSTER_ST_CHANGE, &mdev->flags)) {
3181 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
81e84650 3182 return true;
b411b363
PR
3183 }
3184
3185 mask = convert_state(mask);
3186 val = convert_state(val);
3187
3188 rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3189
3190 drbd_send_sr_reply(mdev, rv);
3191 drbd_md_sync(mdev);
3192
81e84650 3193 return true;
b411b363
PR
3194}
3195
02918be2 3196static int receive_state(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
b411b363 3197{
02918be2 3198 struct p_state *p = &mdev->data.rbuf.state;
4ac4aada 3199 union drbd_state os, ns, peer_state;
b411b363 3200 enum drbd_disk_state real_peer_disk;
65d922c3 3201 enum chg_state_flags cs_flags;
b411b363
PR
3202 int rv;
3203
b411b363
PR
3204 peer_state.i = be32_to_cpu(p->state);
3205
3206 real_peer_disk = peer_state.disk;
3207 if (peer_state.disk == D_NEGOTIATING) {
3208 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3209 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3210 }
3211
3212 spin_lock_irq(&mdev->req_lock);
3213 retry:
4ac4aada 3214 os = ns = mdev->state;
b411b363
PR
3215 spin_unlock_irq(&mdev->req_lock);
3216
545752d5
LE
3217 /* If some other part of the code (asender thread, timeout)
3218 * already decided to close the connection again,
3219 * we must not "re-establish" it here. */
3220 if (os.conn <= C_TEAR_DOWN)
3221 return false;
3222
40424e4a
LE
3223 /* If this is the "end of sync" confirmation, usually the peer disk
3224 * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
3225 * set) resync started in PausedSyncT, or if the timing of pause-/
3226 * unpause-sync events has been "just right", the peer disk may
3227 * transition from D_CONSISTENT to D_UP_TO_DATE as well.
3228 */
3229 if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
3230 real_peer_disk == D_UP_TO_DATE &&
e9ef7bb6
LE
3231 os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3232 /* If we are (becoming) SyncSource, but peer is still in sync
3233 * preparation, ignore its uptodate-ness to avoid flapping, it
3234 * will change to inconsistent once the peer reaches active
3235 * syncing states.
3236 * It may have changed syncer-paused flags, however, so we
3237 * cannot ignore this completely. */
3238 if (peer_state.conn > C_CONNECTED &&
3239 peer_state.conn < C_SYNC_SOURCE)
3240 real_peer_disk = D_INCONSISTENT;
3241
3242 /* if peer_state changes to connected at the same time,
3243 * it explicitly notifies us that it finished resync.
3244 * Maybe we should finish it up, too? */
3245 else if (os.conn >= C_SYNC_SOURCE &&
3246 peer_state.conn == C_CONNECTED) {
3247 if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3248 drbd_resync_finished(mdev);
81e84650 3249 return true;
e9ef7bb6
LE
3250 }
3251 }
3252
3253 /* peer says his disk is inconsistent, while we think it is uptodate,
3254 * and this happens while the peer still thinks we have a sync going on,
3255 * but we think we are already done with the sync.
3256 * We ignore this to avoid flapping pdsk.
3257 * This should not happen, if the peer is a recent version of drbd. */
3258 if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3259 os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3260 real_peer_disk = D_UP_TO_DATE;
3261
4ac4aada
LE
3262 if (ns.conn == C_WF_REPORT_PARAMS)
3263 ns.conn = C_CONNECTED;
b411b363 3264
67531718
PR
3265 if (peer_state.conn == C_AHEAD)
3266 ns.conn = C_BEHIND;
3267
b411b363
PR
3268 if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3269 get_ldev_if_state(mdev, D_NEGOTIATING)) {
3270 int cr; /* consider resync */
3271
3272 /* if we established a new connection */
4ac4aada 3273 cr = (os.conn < C_CONNECTED);
b411b363
PR
3274 /* if we had an established connection
3275 * and one of the nodes newly attaches a disk */
4ac4aada 3276 cr |= (os.conn == C_CONNECTED &&
b411b363 3277 (peer_state.disk == D_NEGOTIATING ||
4ac4aada 3278 os.disk == D_NEGOTIATING));
b411b363
PR
3279 /* if we have both been inconsistent, and the peer has been
3280 * forced to be UpToDate with --overwrite-data */
3281 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3282 /* if we had been plain connected, and the admin requested to
3283 * start a sync by "invalidate" or "invalidate-remote" */
4ac4aada 3284 cr |= (os.conn == C_CONNECTED &&
b411b363
PR
3285 (peer_state.conn >= C_STARTING_SYNC_S &&
3286 peer_state.conn <= C_WF_BITMAP_T));
3287
3288 if (cr)
4ac4aada 3289 ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
b411b363
PR
3290
3291 put_ldev(mdev);
4ac4aada
LE
3292 if (ns.conn == C_MASK) {
3293 ns.conn = C_CONNECTED;
b411b363 3294 if (mdev->state.disk == D_NEGOTIATING) {
82f59cc6 3295 drbd_force_state(mdev, NS(disk, D_FAILED));
b411b363
PR
3296 } else if (peer_state.disk == D_NEGOTIATING) {
3297 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3298 peer_state.disk = D_DISKLESS;
580b9767 3299 real_peer_disk = D_DISKLESS;
b411b363 3300 } else {
cf14c2e9 3301 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->flags))
81e84650 3302 return false;
4ac4aada 3303 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
b411b363 3304 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
81e84650 3305 return false;
b411b363
PR
3306 }
3307 }
3308 }
3309
3310 spin_lock_irq(&mdev->req_lock);
4ac4aada 3311 if (mdev->state.i != os.i)
b411b363
PR
3312 goto retry;
3313 clear_bit(CONSIDER_RESYNC, &mdev->flags);
b411b363
PR
3314 ns.peer = peer_state.role;
3315 ns.pdsk = real_peer_disk;
3316 ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4ac4aada 3317 if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
b411b363 3318 ns.disk = mdev->new_state_tmp.disk;
4ac4aada
LE
3319 cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3320 if (ns.pdsk == D_CONSISTENT && is_susp(ns) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
481c6f50
PR
3321 test_bit(NEW_CUR_UUID, &mdev->flags)) {
3322 /* Do not allow tl_restart(resend) for a rebooted peer. We can only allow this
 3323 for temporary network outages! */
3324 spin_unlock_irq(&mdev->req_lock);
3325 dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
3326 tl_clear(mdev);
3327 drbd_uuid_new_current(mdev);
3328 clear_bit(NEW_CUR_UUID, &mdev->flags);
3329 drbd_force_state(mdev, NS2(conn, C_PROTOCOL_ERROR, susp, 0));
81e84650 3330 return false;
481c6f50 3331 }
65d922c3 3332 rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
b411b363
PR
3333 ns = mdev->state;
3334 spin_unlock_irq(&mdev->req_lock);
3335
3336 if (rv < SS_SUCCESS) {
3337 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
81e84650 3338 return false;
b411b363
PR
3339 }
3340
4ac4aada
LE
3341 if (os.conn > C_WF_REPORT_PARAMS) {
3342 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
b411b363
PR
3343 peer_state.disk != D_NEGOTIATING ) {
3344 /* we want resync, peer has not yet decided to sync... */
3345 /* Nowadays only used when forcing a node into primary role and
3346 setting its disk to UpToDate with that */
3347 drbd_send_uuids(mdev);
f479ea06 3348 drbd_send_current_state(mdev);
b411b363
PR
3349 }
3350 }
3351
3352 mdev->net_conf->want_lose = 0;
3353
3354 drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3355
81e84650 3356 return true;
b411b363
PR
3357}
3358
02918be2 3359static int receive_sync_uuid(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
b411b363 3360{
02918be2 3361 struct p_rs_uuid *p = &mdev->data.rbuf.rs_uuid;
b411b363
PR
3362
3363 wait_event(mdev->misc_wait,
3364 mdev->state.conn == C_WF_SYNC_UUID ||
c4752ef1 3365 mdev->state.conn == C_BEHIND ||
b411b363
PR
3366 mdev->state.conn < C_CONNECTED ||
3367 mdev->state.disk < D_NEGOTIATING);
3368
3369 /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3370
b411b363
PR
3371 /* Here the _drbd_uuid_ functions are right, current should
3372 _not_ be rotated into the history */
3373 if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3374 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3375 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3376
62b0da3a 3377 drbd_print_uuids(mdev, "updated sync uuid");
b411b363
PR
3378 drbd_start_resync(mdev, C_SYNC_TARGET);
3379
3380 put_ldev(mdev);
3381 } else
3382 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3383
81e84650 3384 return true;
b411b363
PR
3385}
3386
2c46407d
AG
3387/**
3388 * receive_bitmap_plain
3389 *
3390 * Return 0 when done, 1 when another iteration is needed, and a negative error
3391 * code upon failure.
3392 */
3393static int
02918be2
PR
3394receive_bitmap_plain(struct drbd_conf *mdev, unsigned int data_size,
3395 unsigned long *buffer, struct bm_xfer_ctx *c)
b411b363
PR
3396{
3397 unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3398 unsigned want = num_words * sizeof(long);
2c46407d 3399 int err;
b411b363 3400
02918be2
PR
3401 if (want != data_size) {
3402 dev_err(DEV, "%s:want (%u) != data_size (%u)\n", __func__, want, data_size);
2c46407d 3403 return -EIO;
b411b363
PR
3404 }
3405 if (want == 0)
2c46407d
AG
3406 return 0;
3407 err = drbd_recv(mdev, buffer, want);
3408 if (err != want) {
3409 if (err >= 0)
3410 err = -EIO;
3411 return err;
3412 }
b411b363
PR
3413
3414 drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3415
3416 c->word_offset += num_words;
3417 c->bit_offset = c->word_offset * BITS_PER_LONG;
3418 if (c->bit_offset > c->bm_bits)
3419 c->bit_offset = c->bm_bits;
3420
2c46407d 3421 return 1;
b411b363
PR
3422}
3423
2c46407d
AG
3424/**
3425 * recv_bm_rle_bits
3426 *
3427 * Return 0 when done, 1 when another iteration is needed, and a negative error
3428 * code upon failure.
3429 */
3430static int
b411b363
PR
3431recv_bm_rle_bits(struct drbd_conf *mdev,
3432 struct p_compressed_bm *p,
3433 struct bm_xfer_ctx *c)
3434{
3435 struct bitstream bs;
3436 u64 look_ahead;
3437 u64 rl;
3438 u64 tmp;
3439 unsigned long s = c->bit_offset;
3440 unsigned long e;
004352fa 3441 int len = be16_to_cpu(p->head.length) - (sizeof(*p) - sizeof(p->head));
b411b363
PR
3442 int toggle = DCBP_get_start(p);
3443 int have;
3444 int bits;
3445
3446 bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));
3447
3448 bits = bitstream_get_bits(&bs, &look_ahead, 64);
3449 if (bits < 0)
2c46407d 3450 return -EIO;
b411b363
PR
3451
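	/* The payload is a sequence of VLI encoded run lengths.  Runs alternate
	 * between clear and set bits; "toggle" tells us which kind the current
	 * run is, and only set runs are written into the bitmap.  "look_ahead"
	 * is a 64 bit window into the bit stream, refilled after each run. */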
3452 for (have = bits; have > 0; s += rl, toggle = !toggle) {
3453 bits = vli_decode_bits(&rl, look_ahead);
3454 if (bits <= 0)
2c46407d 3455 return -EIO;
b411b363
PR
3456
3457 if (toggle) {
3458 e = s + rl -1;
3459 if (e >= c->bm_bits) {
3460 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
2c46407d 3461 return -EIO;
b411b363
PR
3462 }
3463 _drbd_bm_set_bits(mdev, s, e);
3464 }
3465
3466 if (have < bits) {
3467 dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3468 have, bits, look_ahead,
3469 (unsigned int)(bs.cur.b - p->code),
3470 (unsigned int)bs.buf_len);
2c46407d 3471 return -EIO;
b411b363
PR
3472 }
3473 look_ahead >>= bits;
3474 have -= bits;
3475
3476 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3477 if (bits < 0)
2c46407d 3478 return -EIO;
b411b363
PR
3479 look_ahead |= tmp << have;
3480 have += bits;
3481 }
3482
3483 c->bit_offset = s;
3484 bm_xfer_ctx_bit_to_word_offset(c);
3485
2c46407d 3486 return (s != c->bm_bits);
b411b363
PR
3487}
3488
2c46407d
AG
3489/**
 3490 * decode_bitmap_c - dispatch a compressed bitmap packet to its decoder
3491 *
3492 * Return 0 when done, 1 when another iteration is needed, and a negative error
3493 * code upon failure.
3494 */
3495static int
b411b363
PR
3496decode_bitmap_c(struct drbd_conf *mdev,
3497 struct p_compressed_bm *p,
3498 struct bm_xfer_ctx *c)
3499{
3500 if (DCBP_get_code(p) == RLE_VLI_Bits)
3501 return recv_bm_rle_bits(mdev, p, c);
3502
3503 /* other variants had been implemented for evaluation,
3504 * but have been dropped as this one turned out to be "best"
3505 * during all our tests. */
3506
3507 dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3508 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
2c46407d 3509 return -EIO;
b411b363
PR
3510}
3511
3512void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3513 const char *direction, struct bm_xfer_ctx *c)
3514{
3515 /* what would it take to transfer it "plaintext" */
0b70a13d 3516 unsigned plain = sizeof(struct p_header80) *
b411b363
PR
3517 ((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3518 + c->bm_words * sizeof(long);
3519 unsigned total = c->bytes[0] + c->bytes[1];
3520 unsigned r;
3521
 3522	/* total cannot be zero, but just in case: */
3523 if (total == 0)
3524 return;
3525
3526 /* don't report if not compressed */
3527 if (total >= plain)
3528 return;
3529
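	/* r becomes the compression saving in per mille (0..1000); e.g. with
	 * plain = 100000 bytes and total = 10000 bytes on the wire, r = 900
	 * and we report "compression: 90.0%". */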
3530 /* total < plain. check for overflow, still */
3531 r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3532 : (1000 * total / plain);
3533
3534 if (r > 1000)
3535 r = 1000;
3536
3537 r = 1000 - r;
3538 dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3539 "total %u; compression: %u.%u%%\n",
3540 direction,
3541 c->bytes[1], c->packets[1],
3542 c->bytes[0], c->packets[0],
3543 total, r/10, r % 10);
3544}
3545
 3546/* Since we process the bitfield from lower addresses to higher,
 3547   it does not matter whether we process it in 32 bit or 64 bit
 3548   chunks, as long as it is little endian. (Think of it as a byte stream,
 3549   beginning with the lowest byte...) If we used big endian,
 3550   we would need to process it from the highest address to the lowest
 3551   in order to be agnostic to the 32 vs 64 bit issue.
 3552
 3553   Returns 0 on failure, 1 if we successfully received it. */
02918be2 3554static int receive_bitmap(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
b411b363
PR
3555{
3556 struct bm_xfer_ctx c;
3557 void *buffer;
2c46407d 3558 int err;
81e84650 3559 int ok = false;
02918be2 3560 struct p_header80 *h = &mdev->data.rbuf.header.h80;
b411b363 3561
20ceb2b2
LE
3562 drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
3563 /* you are supposed to send additional out-of-sync information
3564 * if you actually set bits during this phase */
b411b363
PR
3565
3566 /* maybe we should use some per thread scratch page,
3567 * and allocate that during initial device creation? */
3568 buffer = (unsigned long *) __get_free_page(GFP_NOIO);
3569 if (!buffer) {
3570 dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
3571 goto out;
3572 }
3573
3574 c = (struct bm_xfer_ctx) {
3575 .bm_bits = drbd_bm_bits(mdev),
3576 .bm_words = drbd_bm_words(mdev),
3577 };
3578
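	/* Receive bitmap packets until the whole bitmap has been transferred:
	 * P_BITMAP carries plain little endian words, P_COMPRESSED_BITMAP an
	 * RLE/VLI encoded payload.  A return value > 0 from the per-packet
	 * helpers means "more packets expected", so read the next header. */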
2c46407d 3579 for(;;) {
02918be2 3580 if (cmd == P_BITMAP) {
2c46407d 3581 err = receive_bitmap_plain(mdev, data_size, buffer, &c);
02918be2 3582 } else if (cmd == P_COMPRESSED_BITMAP) {
b411b363
PR
3583 /* MAYBE: sanity check that we speak proto >= 90,
3584 * and the feature is enabled! */
3585 struct p_compressed_bm *p;
3586
02918be2 3587 if (data_size > BM_PACKET_PAYLOAD_BYTES) {
b411b363
PR
3588 dev_err(DEV, "ReportCBitmap packet too large\n");
3589 goto out;
3590 }
 3591			/* use the page buffer */
3592 p = buffer;
3593 memcpy(p, h, sizeof(*h));
02918be2 3594 if (drbd_recv(mdev, p->head.payload, data_size) != data_size)
b411b363 3595 goto out;
004352fa
LE
3596 if (data_size <= (sizeof(*p) - sizeof(p->head))) {
3597 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", data_size);
78fcbdae 3598 goto out;
b411b363 3599 }
2c46407d 3600 err = decode_bitmap_c(mdev, p, &c);
b411b363 3601 } else {
02918be2 3602 dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", cmd);
b411b363
PR
3603 goto out;
3604 }
3605
02918be2
PR
3606 c.packets[cmd == P_BITMAP]++;
3607 c.bytes[cmd == P_BITMAP] += sizeof(struct p_header80) + data_size;
b411b363 3608
2c46407d
AG
3609 if (err <= 0) {
3610 if (err < 0)
3611 goto out;
b411b363 3612 break;
2c46407d 3613 }
02918be2 3614 if (!drbd_recv_header(mdev, &cmd, &data_size))
b411b363 3615 goto out;
2c46407d 3616 }
b411b363
PR
3617
3618 INFO_bm_xfer_stats(mdev, "receive", &c);
3619
3620 if (mdev->state.conn == C_WF_BITMAP_T) {
de1f8e4a
AG
3621 enum drbd_state_rv rv;
3622
b411b363
PR
3623 ok = !drbd_send_bitmap(mdev);
3624 if (!ok)
3625 goto out;
3626 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
de1f8e4a
AG
3627 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3628 D_ASSERT(rv == SS_SUCCESS);
b411b363
PR
3629 } else if (mdev->state.conn != C_WF_BITMAP_S) {
3630 /* admin may have requested C_DISCONNECTING,
3631 * other threads may have noticed network errors */
3632 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3633 drbd_conn_str(mdev->state.conn));
3634 }
3635
81e84650 3636 ok = true;
b411b363 3637 out:
20ceb2b2 3638 drbd_bm_unlock(mdev);
b411b363
PR
3639 if (ok && mdev->state.conn == C_WF_BITMAP_S)
3640 drbd_start_resync(mdev, C_SYNC_SOURCE);
3641 free_page((unsigned long) buffer);
3642 return ok;
3643}
3644
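/* Drain the payload of a packet we do not know how to handle, so that the
 * data stream stays in sync for the packets that follow. */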
02918be2 3645static int receive_skip(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
b411b363
PR
3646{
3647 /* TODO zero copy sink :) */
3648 static char sink[128];
3649 int size, want, r;
3650
02918be2
PR
3651 dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
3652 cmd, data_size);
b411b363 3653
02918be2 3654 size = data_size;
b411b363
PR
3655 while (size > 0) {
3656 want = min_t(int, size, sizeof(sink));
3657 r = drbd_recv(mdev, sink, want);
3658 ERR_IF(r <= 0) break;
3659 size -= r;
3660 }
3661 return size == 0;
3662}
3663
02918be2 3664static int receive_UnplugRemote(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
0ced55a3 3665{
e7f52dfb
LE
3666 /* Make sure we've acked all the TCP data associated
3667 * with the data requests being unplugged */
3668 drbd_tcp_quickack(mdev->data.socket);
0ced55a3 3669
81e84650 3670 return true;
0ced55a3
PR
3671}
3672
73a01a18
PR
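/* The peer tells us about a block range that is out of sync (typically while
 * we are Behind): record it in our local bitmap so a later resync covers it. */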
3673static int receive_out_of_sync(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3674{
3675 struct p_block_desc *p = &mdev->data.rbuf.block_desc;
3676
f735e363
LE
3677 switch (mdev->state.conn) {
3678 case C_WF_SYNC_UUID:
3679 case C_WF_BITMAP_T:
3680 case C_BEHIND:
3681 break;
3682 default:
3683 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
3684 drbd_conn_str(mdev->state.conn));
3685 }
3686
73a01a18
PR
3687 drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
3688
81e84650 3689 return true;
73a01a18
PR
3690}
3691
02918be2
PR
3692typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, enum drbd_packets cmd, unsigned int to_receive);
3693
3694struct data_cmd {
3695 int expect_payload;
3696 size_t pkt_size;
3697 drbd_cmd_handler_f function;
3698};
3699
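/* Dispatch table for the data socket: for each packet type, whether a
 * variable sized payload is allowed, the size of the fixed part of the
 * packet, and the handler drbdd() calls for it. */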
3700static struct data_cmd drbd_cmd_handler[] = {
3701 [P_DATA] = { 1, sizeof(struct p_data), receive_Data },
3702 [P_DATA_REPLY] = { 1, sizeof(struct p_data), receive_DataReply },
3703 [P_RS_DATA_REPLY] = { 1, sizeof(struct p_data), receive_RSDataReply } ,
3704 [P_BARRIER] = { 0, sizeof(struct p_barrier), receive_Barrier } ,
3705 [P_BITMAP] = { 1, sizeof(struct p_header80), receive_bitmap } ,
3706 [P_COMPRESSED_BITMAP] = { 1, sizeof(struct p_header80), receive_bitmap } ,
3707 [P_UNPLUG_REMOTE] = { 0, sizeof(struct p_header80), receive_UnplugRemote },
3708 [P_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
3709 [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
3710 [P_SYNC_PARAM] = { 1, sizeof(struct p_header80), receive_SyncParam },
3711 [P_SYNC_PARAM89] = { 1, sizeof(struct p_header80), receive_SyncParam },
3712 [P_PROTOCOL] = { 1, sizeof(struct p_protocol), receive_protocol },
3713 [P_UUIDS] = { 0, sizeof(struct p_uuids), receive_uuids },
3714 [P_SIZES] = { 0, sizeof(struct p_sizes), receive_sizes },
3715 [P_STATE] = { 0, sizeof(struct p_state), receive_state },
3716 [P_STATE_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_state },
3717 [P_SYNC_UUID] = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
3718 [P_OV_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
3719 [P_OV_REPLY] = { 1, sizeof(struct p_block_req), receive_DataRequest },
3720 [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
3721 [P_DELAY_PROBE] = { 0, sizeof(struct p_delay_probe93), receive_skip },
73a01a18 3722 [P_OUT_OF_SYNC] = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
b411b363
PR
3723 /* anything missing from this table is in
3724 * the asender_tbl, see get_asender_cmd */
02918be2 3725 [P_MAX_CMD] = { 0, 0, NULL },
b411b363
PR
3726};
3727
02918be2
PR
 3728/* All handler functions that expect a sub-header get that sub-header in
 3729   mdev->data.rbuf.header.head.payload.
 3730
 3731   Usually the callback can find the usual p_header in
 3732   mdev->data.rbuf.header.head, but it may not rely on that, since there
 3733   is also p_header95. */
b411b363
PR
3734
3735static void drbdd(struct drbd_conf *mdev)
3736{
02918be2
PR
3737 union p_header *header = &mdev->data.rbuf.header;
3738 unsigned int packet_size;
3739 enum drbd_packets cmd;
3740 size_t shs; /* sub header size */
3741 int rv;
b411b363
PR
3742
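	/* Main receive loop of the receiver thread: read a packet header,
	 * sanity check the command, read the fixed size sub-header, then hand
	 * the remaining payload bytes to the handler from drbd_cmd_handler[]. */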
3743 while (get_t_state(&mdev->receiver) == Running) {
3744 drbd_thread_current_set_cpu(mdev);
02918be2
PR
3745 if (!drbd_recv_header(mdev, &cmd, &packet_size))
3746 goto err_out;
b411b363 3747
02918be2
PR
3748 if (unlikely(cmd >= P_MAX_CMD || !drbd_cmd_handler[cmd].function)) {
3749 dev_err(DEV, "unknown packet type %d, l: %d!\n", cmd, packet_size);
3750 goto err_out;
0b33a916 3751 }
b411b363 3752
02918be2 3753 shs = drbd_cmd_handler[cmd].pkt_size - sizeof(union p_header);
02918be2
PR
3754 if (packet_size - shs > 0 && !drbd_cmd_handler[cmd].expect_payload) {
3755 dev_err(DEV, "No payload expected %s l:%d\n", cmdname(cmd), packet_size);
3756 goto err_out;
b411b363 3757 }
b411b363 3758
c13f7e1a
LE
3759 if (shs) {
3760 rv = drbd_recv(mdev, &header->h80.payload, shs);
3761 if (unlikely(rv != shs)) {
0ddc5549
LE
3762 if (!signal_pending(current))
3763 dev_warn(DEV, "short read while reading sub header: rv=%d\n", rv);
c13f7e1a
LE
3764 goto err_out;
3765 }
3766 }
3767
02918be2 3768 rv = drbd_cmd_handler[cmd].function(mdev, cmd, packet_size - shs);
b411b363 3769
02918be2 3770 if (unlikely(!rv)) {
b411b363 3771 dev_err(DEV, "error receiving %s, l: %d!\n",
02918be2
PR
3772 cmdname(cmd), packet_size);
3773 goto err_out;
b411b363
PR
3774 }
3775 }
b411b363 3776
02918be2
PR
3777 if (0) {
3778 err_out:
3779 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3780 }
856c50c7
LE
3781 /* If we leave here, we probably want to update at least the
3782 * "Connected" indicator on stable storage. Do so explicitly here. */
3783 drbd_md_sync(mdev);
b411b363
PR
3784}
3785
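/* Queue a barrier work item and wait for it: when it completes, everything
 * that was on the worker queue before it has been processed as well. */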
3786void drbd_flush_workqueue(struct drbd_conf *mdev)
3787{
3788 struct drbd_wq_barrier barr;
3789
3790 barr.w.cb = w_prev_work_done;
3791 init_completion(&barr.done);
3792 drbd_queue_work(&mdev->data.work, &barr.w);
3793 wait_for_completion(&barr.done);
3794}
3795
f70b3511
PR
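/* Free ee_hash and tl_hash once we are Standalone.  Requests still hashed at
 * this point are unhooked via hlist_add_fake() so that their later completion
 * does not touch the freed tables. */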
3796void drbd_free_tl_hash(struct drbd_conf *mdev)
3797{
3798 struct hlist_head *h;
3799
3800 spin_lock_irq(&mdev->req_lock);
3801
3802 if (!mdev->tl_hash || mdev->state.conn != C_STANDALONE) {
3803 spin_unlock_irq(&mdev->req_lock);
3804 return;
3805 }
3806 /* paranoia code */
3807 for (h = mdev->ee_hash; h < mdev->ee_hash + mdev->ee_hash_s; h++)
3808 if (h->first)
3809 dev_err(DEV, "ASSERT FAILED ee_hash[%u].first == %p, expected NULL\n",
3810 (int)(h - mdev->ee_hash), h->first);
3811 kfree(mdev->ee_hash);
3812 mdev->ee_hash = NULL;
3813 mdev->ee_hash_s = 0;
3814
c12e9c89
LE
3815 /* We may not have had the chance to wait for all locally pending
3816 * application requests. The hlist_add_fake() prevents access after
3817 * free on master bio completion. */
3818 for (h = mdev->tl_hash; h < mdev->tl_hash + mdev->tl_hash_s; h++) {
3819 struct drbd_request *req;
3820 struct hlist_node *pos, *n;
3821 hlist_for_each_entry_safe(req, pos, n, h, collision) {
3822 hlist_del_init(&req->collision);
3823 hlist_add_fake(&req->collision);
3824 }
3825 }
3826
f70b3511
PR
3827 kfree(mdev->tl_hash);
3828 mdev->tl_hash = NULL;
3829 mdev->tl_hash_s = 0;
3830 spin_unlock_irq(&mdev->req_lock);
3831}
3832
b411b363
PR
3833static void drbd_disconnect(struct drbd_conf *mdev)
3834{
3835 enum drbd_fencing_p fp;
3836 union drbd_state os, ns;
3837 int rv = SS_UNKNOWN_ERROR;
3838 unsigned int i;
3839
3840 if (mdev->state.conn == C_STANDALONE)
3841 return;
b411b363 3842
545752d5
LE
3843 /* We are about to start the cleanup after connection loss.
3844 * Make sure drbd_make_request knows about that.
3845 * Usually we should be in some network failure state already,
3846 * but just in case we are not, we fix it up here.
3847 */
3848 drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
3849
b411b363
PR
3850 /* asender does not clean up anything. it must not interfere, either */
3851 drbd_thread_stop(&mdev->asender);
b411b363 3852 drbd_free_sock(mdev);
b411b363 3853
85719573 3854 /* wait for current activity to cease. */
b411b363
PR
3855 spin_lock_irq(&mdev->req_lock);
3856 _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
3857 _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
3858 _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
3859 spin_unlock_irq(&mdev->req_lock);
3860
3861 /* We do not have data structures that would allow us to
3862 * get the rs_pending_cnt down to 0 again.
3863 * * On C_SYNC_TARGET we do not have any data structures describing
3864 * the pending RSDataRequest's we have sent.
3865 * * On C_SYNC_SOURCE there is no data structure that tracks
3866 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
3867 * And no, it is not the sum of the reference counts in the
3868 * resync_LRU. The resync_LRU tracks the whole operation including
3869 * the disk-IO, while the rs_pending_cnt only tracks the blocks
3870 * on the fly. */
3871 drbd_rs_cancel_all(mdev);
3872 mdev->rs_total = 0;
3873 mdev->rs_failed = 0;
3874 atomic_set(&mdev->rs_pending_cnt, 0);
3875 wake_up(&mdev->misc_wait);
3876
3877 /* make sure syncer is stopped and w_resume_next_sg queued */
3878 del_timer_sync(&mdev->resync_timer);
b411b363
PR
3879 resync_timer_fn((unsigned long)mdev);
3880
b411b363
PR
3881 /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
3882 * w_make_resync_request etc. which may still be on the worker queue
3883 * to be "canceled" */
3884 drbd_flush_workqueue(mdev);
3885
3886 /* This also does reclaim_net_ee(). If we do this too early, we might
3887 * miss some resync ee and pages.*/
3888 drbd_process_done_ee(mdev);
3889
3890 kfree(mdev->p_uuid);
3891 mdev->p_uuid = NULL;
3892
fb22c402 3893 if (!is_susp(mdev->state))
b411b363
PR
3894 tl_clear(mdev);
3895
b411b363
PR
3896 dev_info(DEV, "Connection closed\n");
3897
3898 drbd_md_sync(mdev);
3899
3900 fp = FP_DONT_CARE;
3901 if (get_ldev(mdev)) {
3902 fp = mdev->ldev->dc.fencing;
3903 put_ldev(mdev);
3904 }
3905
87f7be4c
PR
3906 if (mdev->state.role == R_PRIMARY && fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN)
3907 drbd_try_outdate_peer_async(mdev);
b411b363
PR
3908
3909 spin_lock_irq(&mdev->req_lock);
3910 os = mdev->state;
3911 if (os.conn >= C_UNCONNECTED) {
3912 /* Do not restart in case we are C_DISCONNECTING */
3913 ns = os;
3914 ns.conn = C_UNCONNECTED;
3915 rv = _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
3916 }
3917 spin_unlock_irq(&mdev->req_lock);
3918
3919 if (os.conn == C_DISCONNECTING) {
84dfb9f5 3920 wait_event(mdev->net_cnt_wait, atomic_read(&mdev->net_cnt) == 0);
b411b363 3921
b411b363
PR
3922 crypto_free_hash(mdev->cram_hmac_tfm);
3923 mdev->cram_hmac_tfm = NULL;
3924
3925 kfree(mdev->net_conf);
3926 mdev->net_conf = NULL;
3927 drbd_request_state(mdev, NS(conn, C_STANDALONE));
3928 }
3929
20ceb2b2
LE
3930 /* serialize with bitmap writeout triggered by the state change,
3931 * if any. */
3932 wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
3933
b411b363
PR
3934 /* tcp_close and release of sendpage pages can be deferred. I don't
3935 * want to use SO_LINGER, because apparently it can be deferred for
3936 * more than 20 seconds (longest time I checked).
3937 *
3938 * Actually we don't care for exactly when the network stack does its
3939 * put_page(), but release our reference on these pages right here.
3940 */
3941 i = drbd_release_ee(mdev, &mdev->net_ee);
3942 if (i)
3943 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
435f0740
LE
3944 i = atomic_read(&mdev->pp_in_use_by_net);
3945 if (i)
3946 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
b411b363
PR
3947 i = atomic_read(&mdev->pp_in_use);
3948 if (i)
45bb912b 3949 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
b411b363
PR
3950
3951 D_ASSERT(list_empty(&mdev->read_ee));
3952 D_ASSERT(list_empty(&mdev->active_ee));
3953 D_ASSERT(list_empty(&mdev->sync_ee));
3954 D_ASSERT(list_empty(&mdev->done_ee));
3955
3956 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
3957 atomic_set(&mdev->current_epoch->epoch_size, 0);
3958 D_ASSERT(list_empty(&mdev->current_epoch->list));
3959}
3960
3961/*
3962 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
3963 * we can agree on is stored in agreed_pro_version.
3964 *
3965 * feature flags and the reserved array should be enough room for future
3966 * enhancements of the handshake protocol, and possible plugins...
3967 *
3968 * for now, they are expected to be zero, but ignored.
3969 */
3970static int drbd_send_handshake(struct drbd_conf *mdev)
3971{
3972 /* ASSERT current == mdev->receiver ... */
3973 struct p_handshake *p = &mdev->data.sbuf.handshake;
3974 int ok;
3975
3976 if (mutex_lock_interruptible(&mdev->data.mutex)) {
3977 dev_err(DEV, "interrupted during initial handshake\n");
3978 return 0; /* interrupted. not ok. */
3979 }
3980
3981 if (mdev->data.socket == NULL) {
3982 mutex_unlock(&mdev->data.mutex);
3983 return 0;
3984 }
3985
3986 memset(p, 0, sizeof(*p));
3987 p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
3988 p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
3989 ok = _drbd_send_cmd( mdev, mdev->data.socket, P_HAND_SHAKE,
0b70a13d 3990 (struct p_header80 *)p, sizeof(*p), 0 );
b411b363
PR
3991 mutex_unlock(&mdev->data.mutex);
3992 return ok;
3993}
3994
3995/*
3996 * return values:
3997 * 1 yes, we have a valid connection
3998 * 0 oops, did not work out, please try again
3999 * -1 peer talks different language,
4000 * no point in trying again, please go standalone.
4001 */
4002static int drbd_do_handshake(struct drbd_conf *mdev)
4003{
4004 /* ASSERT current == mdev->receiver ... */
4005 struct p_handshake *p = &mdev->data.rbuf.handshake;
02918be2
PR
4006 const int expect = sizeof(struct p_handshake) - sizeof(struct p_header80);
4007 unsigned int length;
4008 enum drbd_packets cmd;
b411b363
PR
4009 int rv;
4010
4011 rv = drbd_send_handshake(mdev);
4012 if (!rv)
4013 return 0;
4014
02918be2 4015 rv = drbd_recv_header(mdev, &cmd, &length);
b411b363
PR
4016 if (!rv)
4017 return 0;
4018
02918be2 4019 if (cmd != P_HAND_SHAKE) {
b411b363 4020 dev_err(DEV, "expected HandShake packet, received: %s (0x%04x)\n",
02918be2 4021 cmdname(cmd), cmd);
b411b363
PR
4022 return -1;
4023 }
4024
02918be2 4025 if (length != expect) {
b411b363 4026 dev_err(DEV, "expected HandShake length: %u, received: %u\n",
02918be2 4027 expect, length);
b411b363
PR
4028 return -1;
4029 }
4030
4031 rv = drbd_recv(mdev, &p->head.payload, expect);
4032
4033 if (rv != expect) {
0ddc5549
LE
4034 if (!signal_pending(current))
4035 dev_warn(DEV, "short read receiving handshake packet: l=%u\n", rv);
b411b363
PR
4036 return 0;
4037 }
4038
b411b363
PR
4039 p->protocol_min = be32_to_cpu(p->protocol_min);
4040 p->protocol_max = be32_to_cpu(p->protocol_max);
4041 if (p->protocol_max == 0)
4042 p->protocol_max = p->protocol_min;
4043
4044 if (PRO_VERSION_MAX < p->protocol_min ||
4045 PRO_VERSION_MIN > p->protocol_max)
4046 goto incompat;
4047
4048 mdev->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4049
4050 dev_info(DEV, "Handshake successful: "
4051 "Agreed network protocol version %d\n", mdev->agreed_pro_version);
4052
4053 return 1;
4054
4055 incompat:
4056 dev_err(DEV, "incompatible DRBD dialects: "
4057 "I support %d-%d, peer supports %d-%d\n",
4058 PRO_VERSION_MIN, PRO_VERSION_MAX,
4059 p->protocol_min, p->protocol_max);
4060 return -1;
4061}
4062
4063#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4064static int drbd_do_auth(struct drbd_conf *mdev)
4065{
 4066	dev_err(DEV, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4067 dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
b10d96cb 4068 return -1;
b411b363
PR
4069}
4070#else
4071#define CHALLENGE_LEN 64
b10d96cb
JT
4072
4073/* Return value:
4074 1 - auth succeeded,
4075 0 - failed, try again (network error),
4076 -1 - auth failed, don't try again.
4077*/
4078
b411b363
PR
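/* Challenge-response authentication using the cram-hmac-alg shared secret:
 * send our random challenge, receive the peer's challenge, answer it with
 * HMAC(secret, peers_challenge), then receive the peer's response to our
 * challenge and compare it against the locally computed expected value. */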
4079static int drbd_do_auth(struct drbd_conf *mdev)
4080{
4081 char my_challenge[CHALLENGE_LEN]; /* 64 Bytes... */
4082 struct scatterlist sg;
4083 char *response = NULL;
4084 char *right_response = NULL;
4085 char *peers_ch = NULL;
b411b363
PR
4086 unsigned int key_len = strlen(mdev->net_conf->shared_secret);
4087 unsigned int resp_size;
4088 struct hash_desc desc;
02918be2
PR
4089 enum drbd_packets cmd;
4090 unsigned int length;
b411b363
PR
4091 int rv;
4092
4093 desc.tfm = mdev->cram_hmac_tfm;
4094 desc.flags = 0;
4095
4096 rv = crypto_hash_setkey(mdev->cram_hmac_tfm,
4097 (u8 *)mdev->net_conf->shared_secret, key_len);
4098 if (rv) {
4099 dev_err(DEV, "crypto_hash_setkey() failed with %d\n", rv);
b10d96cb 4100 rv = -1;
b411b363
PR
4101 goto fail;
4102 }
4103
4104 get_random_bytes(my_challenge, CHALLENGE_LEN);
4105
4106 rv = drbd_send_cmd2(mdev, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
4107 if (!rv)
4108 goto fail;
4109
02918be2 4110 rv = drbd_recv_header(mdev, &cmd, &length);
b411b363
PR
4111 if (!rv)
4112 goto fail;
4113
02918be2 4114 if (cmd != P_AUTH_CHALLENGE) {
b411b363 4115 dev_err(DEV, "expected AuthChallenge packet, received: %s (0x%04x)\n",
02918be2 4116 cmdname(cmd), cmd);
b411b363
PR
4117 rv = 0;
4118 goto fail;
4119 }
4120
02918be2 4121 if (length > CHALLENGE_LEN * 2) {
b411b363 4122		dev_err(DEV, "AuthChallenge payload too big.\n");
b10d96cb 4123 rv = -1;
b411b363
PR
4124 goto fail;
4125 }
4126
02918be2 4127 peers_ch = kmalloc(length, GFP_NOIO);
b411b363
PR
4128 if (peers_ch == NULL) {
4129 dev_err(DEV, "kmalloc of peers_ch failed\n");
b10d96cb 4130 rv = -1;
b411b363
PR
4131 goto fail;
4132 }
4133
02918be2 4134 rv = drbd_recv(mdev, peers_ch, length);
b411b363 4135
02918be2 4136 if (rv != length) {
0ddc5549
LE
4137 if (!signal_pending(current))
4138 dev_warn(DEV, "short read AuthChallenge: l=%u\n", rv);
b411b363
PR
4139 rv = 0;
4140 goto fail;
4141 }
4142
4143 resp_size = crypto_hash_digestsize(mdev->cram_hmac_tfm);
4144 response = kmalloc(resp_size, GFP_NOIO);
4145 if (response == NULL) {
4146 dev_err(DEV, "kmalloc of response failed\n");
b10d96cb 4147 rv = -1;
b411b363
PR
4148 goto fail;
4149 }
4150
4151 sg_init_table(&sg, 1);
02918be2 4152 sg_set_buf(&sg, peers_ch, length);
b411b363
PR
4153
4154 rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4155 if (rv) {
4156 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
b10d96cb 4157 rv = -1;
b411b363
PR
4158 goto fail;
4159 }
4160
4161 rv = drbd_send_cmd2(mdev, P_AUTH_RESPONSE, response, resp_size);
4162 if (!rv)
4163 goto fail;
4164
02918be2 4165 rv = drbd_recv_header(mdev, &cmd, &length);
b411b363
PR
4166 if (!rv)
4167 goto fail;
4168
02918be2 4169 if (cmd != P_AUTH_RESPONSE) {
b411b363 4170 dev_err(DEV, "expected AuthResponse packet, received: %s (0x%04x)\n",
02918be2 4171 cmdname(cmd), cmd);
b411b363
PR
4172 rv = 0;
4173 goto fail;
4174 }
4175
02918be2 4176 if (length != resp_size) {
b411b363
PR
 4177		dev_err(DEV, "AuthResponse payload of unexpected size\n");
4178 rv = 0;
4179 goto fail;
4180 }
4181
4182 rv = drbd_recv(mdev, response , resp_size);
4183
4184 if (rv != resp_size) {
0ddc5549
LE
4185 if (!signal_pending(current))
4186 dev_warn(DEV, "short read receiving AuthResponse: l=%u\n", rv);
b411b363
PR
4187 rv = 0;
4188 goto fail;
4189 }
4190
4191 right_response = kmalloc(resp_size, GFP_NOIO);
2d1ee87d 4192 if (right_response == NULL) {
b411b363 4193 dev_err(DEV, "kmalloc of right_response failed\n");
b10d96cb 4194 rv = -1;
b411b363
PR
4195 goto fail;
4196 }
4197
4198 sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4199
4200 rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4201 if (rv) {
4202 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
b10d96cb 4203 rv = -1;
b411b363
PR
4204 goto fail;
4205 }
4206
4207 rv = !memcmp(response, right_response, resp_size);
4208
4209 if (rv)
4210 dev_info(DEV, "Peer authenticated using %d bytes of '%s' HMAC\n",
4211 resp_size, mdev->net_conf->cram_hmac_alg);
b10d96cb
JT
4212 else
4213 rv = -1;
b411b363
PR
4214
4215 fail:
4216 kfree(peers_ch);
4217 kfree(response);
4218 kfree(right_response);
4219
4220 return rv;
4221}
4222#endif
4223
4224int drbdd_init(struct drbd_thread *thi)
4225{
4226 struct drbd_conf *mdev = thi->mdev;
4227 unsigned int minor = mdev_to_minor(mdev);
4228 int h;
4229
4230 sprintf(current->comm, "drbd%d_receiver", minor);
4231
4232 dev_info(DEV, "receiver (re)started\n");
4233
4234 do {
4235 h = drbd_connect(mdev);
4236 if (h == 0) {
4237 drbd_disconnect(mdev);
20ee6390 4238 schedule_timeout_interruptible(HZ);
b411b363
PR
4239 }
4240 if (h == -1) {
4241 dev_warn(DEV, "Discarding network configuration.\n");
4242 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4243 }
4244 } while (h == 0);
4245
4246 if (h > 0) {
4247 if (get_net_conf(mdev)) {
4248 drbdd(mdev);
4249 put_net_conf(mdev);
4250 }
4251 }
4252
4253 drbd_disconnect(mdev);
4254
4255 dev_info(DEV, "receiver terminated\n");
4256 return 0;
4257}
4258
4259/* ********* acknowledge sender ******** */
4260
0b70a13d 4261static int got_RqSReply(struct drbd_conf *mdev, struct p_header80 *h)
b411b363
PR
4262{
4263 struct p_req_state_reply *p = (struct p_req_state_reply *)h;
4264
4265 int retcode = be32_to_cpu(p->retcode);
4266
4267 if (retcode >= SS_SUCCESS) {
4268 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4269 } else {
4270 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4271 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4272 drbd_set_st_err_str(retcode), retcode);
4273 }
4274 wake_up(&mdev->state_wait);
4275
81e84650 4276 return true;
b411b363
PR
4277}
4278
0b70a13d 4279static int got_Ping(struct drbd_conf *mdev, struct p_header80 *h)
b411b363
PR
4280{
4281 return drbd_send_ping_ack(mdev);
4282
4283}
4284
0b70a13d 4285static int got_PingAck(struct drbd_conf *mdev, struct p_header80 *h)
b411b363
PR
4286{
4287 /* restore idle timeout */
4288 mdev->meta.socket->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
309d1608
PR
4289 if (!test_and_set_bit(GOT_PING_ACK, &mdev->flags))
4290 wake_up(&mdev->misc_wait);
b411b363 4291
81e84650 4292 return true;
b411b363
PR
4293}
4294
0b70a13d 4295static int got_IsInSync(struct drbd_conf *mdev, struct p_header80 *h)
b411b363
PR
4296{
4297 struct p_block_ack *p = (struct p_block_ack *)h;
4298 sector_t sector = be64_to_cpu(p->sector);
4299 int blksize = be32_to_cpu(p->blksize);
4300
4301 D_ASSERT(mdev->agreed_pro_version >= 89);
4302
4303 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4304
1d53f09e
LE
4305 if (get_ldev(mdev)) {
4306 drbd_rs_complete_io(mdev, sector);
4307 drbd_set_in_sync(mdev, sector, blksize);
4308 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4309 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4310 put_ldev(mdev);
4311 }
b411b363 4312 dec_rs_pending(mdev);
778f271d 4313 atomic_add(blksize >> 9, &mdev->rs_sect_in);
b411b363 4314
81e84650 4315 return true;
b411b363
PR
4316}
4317
4318/* when we receive the ACK for a write request,
4319 * verify that we actually know about it */
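/* The block_id of an application write is the address of its drbd_request,
 * so look up the tl_hash slot for the sector and compare pointers. */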
4320static struct drbd_request *_ack_id_to_req(struct drbd_conf *mdev,
4321 u64 id, sector_t sector)
4322{
4323 struct hlist_head *slot = tl_hash_slot(mdev, sector);
4324 struct hlist_node *n;
4325 struct drbd_request *req;
4326
24c4830c 4327 hlist_for_each_entry(req, n, slot, collision) {
b411b363
PR
4328 if ((unsigned long)req == (unsigned long)id) {
4329 if (req->sector != sector) {
4330 dev_err(DEV, "_ack_id_to_req: found req %p but it has "
4331 "wrong sector (%llus versus %llus)\n", req,
4332 (unsigned long long)req->sector,
4333 (unsigned long long)sector);
4334 break;
4335 }
4336 return req;
4337 }
4338 }
b411b363
PR
4339 return NULL;
4340}
4341
4342typedef struct drbd_request *(req_validator_fn)
4343 (struct drbd_conf *mdev, u64 id, sector_t sector);
4344
4345static int validate_req_change_req_state(struct drbd_conf *mdev,
4346 u64 id, sector_t sector, req_validator_fn validator,
4347 const char *func, enum drbd_req_event what)
4348{
4349 struct drbd_request *req;
4350 struct bio_and_error m;
4351
4352 spin_lock_irq(&mdev->req_lock);
4353 req = validator(mdev, id, sector);
4354 if (unlikely(!req)) {
4355 spin_unlock_irq(&mdev->req_lock);
2deb8336
PR
4356
4357 dev_err(DEV, "%s: failed to find req %p, sector %llus\n", func,
4358 (void *)(unsigned long)id, (unsigned long long)sector);
81e84650 4359 return false;
b411b363
PR
4360 }
4361 __req_mod(req, what, &m);
4362 spin_unlock_irq(&mdev->req_lock);
4363
4364 if (m.bio)
4365 complete_master_bio(mdev, &m);
81e84650 4366 return true;
b411b363
PR
4367}
4368
0b70a13d 4369static int got_BlockAck(struct drbd_conf *mdev, struct p_header80 *h)
b411b363
PR
4370{
4371 struct p_block_ack *p = (struct p_block_ack *)h;
4372 sector_t sector = be64_to_cpu(p->sector);
4373 int blksize = be32_to_cpu(p->blksize);
4374 enum drbd_req_event what;
4375
4376 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4377
4378 if (is_syncer_block_id(p->block_id)) {
4379 drbd_set_in_sync(mdev, sector, blksize);
4380 dec_rs_pending(mdev);
81e84650 4381 return true;
b411b363
PR
4382 }
4383 switch (be16_to_cpu(h->command)) {
4384 case P_RS_WRITE_ACK:
4385 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4386 what = write_acked_by_peer_and_sis;
4387 break;
4388 case P_WRITE_ACK:
4389 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4390 what = write_acked_by_peer;
4391 break;
4392 case P_RECV_ACK:
4393 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_B);
4394 what = recv_acked_by_peer;
4395 break;
4396 case P_DISCARD_ACK:
4397 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4398 what = conflict_discarded_by_peer;
4399 break;
4400 default:
4401 D_ASSERT(0);
81e84650 4402 return false;
b411b363
PR
4403 }
4404
4405 return validate_req_change_req_state(mdev, p->block_id, sector,
4406 _ack_id_to_req, __func__ , what);
4407}
4408
0b70a13d 4409static int got_NegAck(struct drbd_conf *mdev, struct p_header80 *h)
b411b363
PR
4410{
4411 struct p_block_ack *p = (struct p_block_ack *)h;
4412 sector_t sector = be64_to_cpu(p->sector);
2deb8336
PR
4413 int size = be32_to_cpu(p->blksize);
4414 struct drbd_request *req;
4415 struct bio_and_error m;
b411b363
PR
4416
4417 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4418
4419 if (is_syncer_block_id(p->block_id)) {
b411b363
PR
4420 dec_rs_pending(mdev);
4421 drbd_rs_failed_io(mdev, sector, size);
81e84650 4422 return true;
b411b363 4423 }
2deb8336
PR
4424
4425 spin_lock_irq(&mdev->req_lock);
4426 req = _ack_id_to_req(mdev, p->block_id, sector);
4427 if (!req) {
4428 spin_unlock_irq(&mdev->req_lock);
4429 if (mdev->net_conf->wire_protocol == DRBD_PROT_A ||
4430 mdev->net_conf->wire_protocol == DRBD_PROT_B) {
4431 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4432 The master bio might already be completed, therefore the
4433 request is no longer in the collision hash.
4434 => Do not try to validate block_id as request. */
4435 /* In Protocol B we might already have got a P_RECV_ACK
 4436		   but then get a P_NEG_ACK afterwards. */
4437 drbd_set_out_of_sync(mdev, sector, size);
4438 return true;
4439 } else {
4440 dev_err(DEV, "%s: failed to find req %p, sector %llus\n", __func__,
4441 (void *)(unsigned long)p->block_id, (unsigned long long)sector);
4442 return false;
4443 }
4444 }
4445 __req_mod(req, neg_acked, &m);
4446 spin_unlock_irq(&mdev->req_lock);
4447
4448 if (m.bio)
4449 complete_master_bio(mdev, &m);
4450 return true;
b411b363
PR
4451}
4452
0b70a13d 4453static int got_NegDReply(struct drbd_conf *mdev, struct p_header80 *h)
b411b363
PR
4454{
4455 struct p_block_ack *p = (struct p_block_ack *)h;
4456 sector_t sector = be64_to_cpu(p->sector);
4457
4458 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4459 dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4460 (unsigned long long)sector, be32_to_cpu(p->blksize));
4461
4462 return validate_req_change_req_state(mdev, p->block_id, sector,
4463 _ar_id_to_req, __func__ , neg_acked);
4464}
4465
0b70a13d 4466static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header80 *h)
b411b363
PR
4467{
4468 sector_t sector;
4469 int size;
4470 struct p_block_ack *p = (struct p_block_ack *)h;
4471
4472 sector = be64_to_cpu(p->sector);
4473 size = be32_to_cpu(p->blksize);
b411b363
PR
4474
4475 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4476
4477 dec_rs_pending(mdev);
4478
4479 if (get_ldev_if_state(mdev, D_FAILED)) {
4480 drbd_rs_complete_io(mdev, sector);
d612d309
PR
4481 switch (be16_to_cpu(h->command)) {
4482 case P_NEG_RS_DREPLY:
4483 drbd_rs_failed_io(mdev, sector, size);
4484 case P_RS_CANCEL:
4485 break;
4486 default:
4487 D_ASSERT(0);
4488 put_ldev(mdev);
4489 return false;
4490 }
b411b363
PR
4491 put_ldev(mdev);
4492 }
4493
81e84650 4494 return true;
b411b363
PR
4495}
4496
0b70a13d 4497static int got_BarrierAck(struct drbd_conf *mdev, struct p_header80 *h)
b411b363
PR
4498{
4499 struct p_barrier_ack *p = (struct p_barrier_ack *)h;
4500
4501 tl_release(mdev, p->barrier, be32_to_cpu(p->set_size));
4502
c4752ef1
PR
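	/* If we are Ahead and all application writes have been acked, arm the
	 * timer that will switch us back to a real resync (SyncSource). */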
4503 if (mdev->state.conn == C_AHEAD &&
4504 atomic_read(&mdev->ap_in_flight) == 0 &&
e89868a0 4505 !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->flags)) {
370a43e7
PR
4506 mdev->start_resync_timer.expires = jiffies + HZ;
4507 add_timer(&mdev->start_resync_timer);
c4752ef1
PR
4508 }
4509
81e84650 4510 return true;
b411b363
PR
4511}
4512
0b70a13d 4513static int got_OVResult(struct drbd_conf *mdev, struct p_header80 *h)
b411b363
PR
4514{
4515 struct p_block_ack *p = (struct p_block_ack *)h;
4516 struct drbd_work *w;
4517 sector_t sector;
4518 int size;
4519
4520 sector = be64_to_cpu(p->sector);
4521 size = be32_to_cpu(p->blksize);
4522
4523 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4524
4525 if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4526 drbd_ov_oos_found(mdev, sector, size);
4527 else
4528 ov_oos_print(mdev);
4529
1d53f09e 4530 if (!get_ldev(mdev))
81e84650 4531 return true;
1d53f09e 4532
b411b363
PR
4533 drbd_rs_complete_io(mdev, sector);
4534 dec_rs_pending(mdev);
4535
ea5442af
LE
4536 --mdev->ov_left;
4537
4538 /* let's advance progress step marks only for every other megabyte */
4539 if ((mdev->ov_left & 0x200) == 0x200)
4540 drbd_advance_rs_marks(mdev, mdev->ov_left);
4541
4542 if (mdev->ov_left == 0) {
b411b363
PR
4543 w = kmalloc(sizeof(*w), GFP_NOIO);
4544 if (w) {
4545 w->cb = w_ov_finished;
4546 drbd_queue_work_front(&mdev->data.work, w);
4547 } else {
 4548			dev_err(DEV, "kmalloc(w) failed.\n");
4549 ov_oos_print(mdev);
4550 drbd_resync_finished(mdev);
4551 }
4552 }
1d53f09e 4553 put_ldev(mdev);
81e84650 4554 return true;
b411b363
PR
4555}
4556
02918be2 4557static int got_skip(struct drbd_conf *mdev, struct p_header80 *h)
0ced55a3 4558{
81e84650 4559 return true;
0ced55a3
PR
4560}
4561
b411b363
PR
4562struct asender_cmd {
4563 size_t pkt_size;
0b70a13d 4564 int (*process)(struct drbd_conf *mdev, struct p_header80 *h);
b411b363
PR
4565};
4566
4567static struct asender_cmd *get_asender_cmd(int cmd)
4568{
4569 static struct asender_cmd asender_tbl[] = {
4570 /* anything missing from this table is in
4571 * the drbd_cmd_handler (drbd_default_handler) table,
4572 * see the beginning of drbdd() */
0b70a13d
PR
4573 [P_PING] = { sizeof(struct p_header80), got_Ping },
4574 [P_PING_ACK] = { sizeof(struct p_header80), got_PingAck },
b411b363
PR
4575 [P_RECV_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4576 [P_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4577 [P_RS_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4578 [P_DISCARD_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4579 [P_NEG_ACK] = { sizeof(struct p_block_ack), got_NegAck },
4580 [P_NEG_DREPLY] = { sizeof(struct p_block_ack), got_NegDReply },
4581 [P_NEG_RS_DREPLY] = { sizeof(struct p_block_ack), got_NegRSDReply},
4582 [P_OV_RESULT] = { sizeof(struct p_block_ack), got_OVResult },
4583 [P_BARRIER_ACK] = { sizeof(struct p_barrier_ack), got_BarrierAck },
4584 [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4585 [P_RS_IS_IN_SYNC] = { sizeof(struct p_block_ack), got_IsInSync },
02918be2 4586 [P_DELAY_PROBE] = { sizeof(struct p_delay_probe93), got_skip },
d612d309 4587 [P_RS_CANCEL] = { sizeof(struct p_block_ack), got_NegRSDReply},
b411b363
PR
4588 [P_MAX_CMD] = { 0, NULL },
4589 };
4590 if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)
4591 return NULL;
4592 return &asender_tbl[cmd];
4593}
4594
4595int drbd_asender(struct drbd_thread *thi)
4596{
4597 struct drbd_conf *mdev = thi->mdev;
02918be2 4598 struct p_header80 *h = &mdev->meta.rbuf.header.h80;
b411b363
PR
4599 struct asender_cmd *cmd = NULL;
4600
4601 int rv, len;
4602 void *buf = h;
4603 int received = 0;
0b70a13d 4604 int expect = sizeof(struct p_header80);
b411b363 4605 int empty;
f36af18c 4606 int ping_timeout_active = 0;
b411b363
PR
4607
4608 sprintf(current->comm, "drbd%d_asender", mdev_to_minor(mdev));
4609
4610 current->policy = SCHED_RR; /* Make this a realtime task! */
4611 current->rt_priority = 2; /* more important than all other tasks */
4612
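	/* Asender loop: send a ping when asked to, flush ACKs for completed
	 * epoch entries, then collect bytes from the meta socket into h until
	 * a full header is there, look up the asender command and its expected
	 * packet size, and once the whole packet has arrived call ->process(). */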
4613 while (get_t_state(thi) == Running) {
4614 drbd_thread_current_set_cpu(mdev);
4615 if (test_and_clear_bit(SEND_PING, &mdev->flags)) {
4616 ERR_IF(!drbd_send_ping(mdev)) goto reconnect;
4617 mdev->meta.socket->sk->sk_rcvtimeo =
4618 mdev->net_conf->ping_timeo*HZ/10;
f36af18c 4619 ping_timeout_active = 1;
b411b363
PR
4620 }
4621
4622 /* conditionally cork;
4623 * it may hurt latency if we cork without much to send */
4624 if (!mdev->net_conf->no_cork &&
4625 3 < atomic_read(&mdev->unacked_cnt))
4626 drbd_tcp_cork(mdev->meta.socket);
4627 while (1) {
4628 clear_bit(SIGNAL_ASENDER, &mdev->flags);
4629 flush_signals(current);
0f8488e1 4630 if (!drbd_process_done_ee(mdev))
b411b363 4631 goto reconnect;
b411b363
PR
4632 /* to avoid race with newly queued ACKs */
4633 set_bit(SIGNAL_ASENDER, &mdev->flags);
4634 spin_lock_irq(&mdev->req_lock);
4635 empty = list_empty(&mdev->done_ee);
4636 spin_unlock_irq(&mdev->req_lock);
4637 /* new ack may have been queued right here,
4638 * but then there is also a signal pending,
4639 * and we start over... */
4640 if (empty)
4641 break;
4642 }
4643 /* but unconditionally uncork unless disabled */
4644 if (!mdev->net_conf->no_cork)
4645 drbd_tcp_uncork(mdev->meta.socket);
4646
4647 /* short circuit, recv_msg would return EINTR anyways. */
4648 if (signal_pending(current))
4649 continue;
4650
4651 rv = drbd_recv_short(mdev, mdev->meta.socket,
4652 buf, expect-received, 0);
4653 clear_bit(SIGNAL_ASENDER, &mdev->flags);
4654
4655 flush_signals(current);
4656
4657 /* Note:
4658 * -EINTR (on meta) we got a signal
4659 * -EAGAIN (on meta) rcvtimeo expired
4660 * -ECONNRESET other side closed the connection
4661 * -ERESTARTSYS (on data) we got a signal
4662 * rv < 0 other than above: unexpected error!
4663 * rv == expected: full header or command
4664 * rv < expected: "woken" by signal during receive
4665 * rv == 0 : "connection shut down by peer"
4666 */
4667 if (likely(rv > 0)) {
4668 received += rv;
4669 buf += rv;
4670 } else if (rv == 0) {
4671 dev_err(DEV, "meta connection shut down by peer.\n");
4672 goto reconnect;
4673 } else if (rv == -EAGAIN) {
cb6518cb
LE
4674 /* If the data socket received something meanwhile,
4675 * that is good enough: peer is still alive. */
4676 if (time_after(mdev->last_received,
4677 jiffies - mdev->meta.socket->sk->sk_rcvtimeo))
4678 continue;
f36af18c 4679 if (ping_timeout_active) {
b411b363
PR
4680 dev_err(DEV, "PingAck did not arrive in time.\n");
4681 goto reconnect;
4682 }
4683 set_bit(SEND_PING, &mdev->flags);
4684 continue;
4685 } else if (rv == -EINTR) {
4686 continue;
4687 } else {
4688 dev_err(DEV, "sock_recvmsg returned %d\n", rv);
4689 goto reconnect;
4690 }
4691
4692 if (received == expect && cmd == NULL) {
4693 if (unlikely(h->magic != BE_DRBD_MAGIC)) {
004352fa
LE
4694 dev_err(DEV, "magic?? on meta m: 0x%08x c: %d l: %d\n",
4695 be32_to_cpu(h->magic),
4696 be16_to_cpu(h->command),
4697 be16_to_cpu(h->length));
b411b363
PR
4698 goto reconnect;
4699 }
4700 cmd = get_asender_cmd(be16_to_cpu(h->command));
4701 len = be16_to_cpu(h->length);
4702 if (unlikely(cmd == NULL)) {
004352fa
LE
4703 dev_err(DEV, "unknown command?? on meta m: 0x%08x c: %d l: %d\n",
4704 be32_to_cpu(h->magic),
4705 be16_to_cpu(h->command),
4706 be16_to_cpu(h->length));
b411b363
PR
4707 goto disconnect;
4708 }
4709 expect = cmd->pkt_size;
0b70a13d 4710 ERR_IF(len != expect-sizeof(struct p_header80))
b411b363 4711 goto reconnect;
b411b363
PR
4712 }
4713 if (received == expect) {
cb6518cb 4714 mdev->last_received = jiffies;
b411b363 4715 D_ASSERT(cmd != NULL);
b411b363
PR
4716 if (!cmd->process(mdev, h))
4717 goto reconnect;
4718
f36af18c
LE
4719 /* the idle_timeout (ping-int)
4720 * has been restored in got_PingAck() */
4721 if (cmd == get_asender_cmd(P_PING_ACK))
4722 ping_timeout_active = 0;
4723
b411b363
PR
4724 buf = h;
4725 received = 0;
0b70a13d 4726 expect = sizeof(struct p_header80);
b411b363
PR
4727 cmd = NULL;
4728 }
4729 }
4730
4731 if (0) {
4732reconnect:
4733 drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
856c50c7 4734 drbd_md_sync(mdev);
b411b363
PR
4735 }
4736 if (0) {
4737disconnect:
4738 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
856c50c7 4739 drbd_md_sync(mdev);
b411b363
PR
4740 }
4741 clear_bit(SIGNAL_ASENDER, &mdev->flags);
4742
4743 D_ASSERT(mdev->state.conn < C_CONNECTED);
4744 dev_info(DEV, "asender terminated\n");
4745
4746 return 0;
4747}