drbd: Rename "mdev" to "device"
drivers/block/drbd/drbd_receiver.c
/*
   drbd_receiver.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
 */


#include <linux/module.h>

#include <asm/uaccess.h>
#include <net/sock.h>

#include <linux/drbd.h>
#include <linux/fs.h>
#include <linux/file.h>
#include <linux/in.h>
#include <linux/mm.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/pkt_sched.h>
#define __KERNEL_SYSCALLS__
#include <linux/unistd.h>
#include <linux/vmalloc.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>
#include "drbd_int.h"
#include "drbd_protocol.h"
#include "drbd_req.h"

#include "drbd_vli.h"

struct packet_info {
	enum drbd_packet cmd;
	unsigned int size;
	unsigned int vnr;
	void *data;
};

enum finish_epoch {
	FE_STILL_LIVE,
	FE_DESTROYED,
	FE_RECYCLED,
};

static int drbd_do_features(struct drbd_tconn *tconn);
static int drbd_do_auth(struct drbd_tconn *tconn);
static int drbd_disconnected(struct drbd_device *device);

static enum finish_epoch drbd_may_finish_epoch(struct drbd_tconn *, struct drbd_epoch *, enum epoch_event);
static int e_end_block(struct drbd_work *, int);


#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)

/*
 * some helper functions to deal with single linked page lists,
 * page->private being our "next" pointer.
 */

/* If at least n pages are linked at head, get n pages off.
 * Otherwise, don't modify head, and return NULL.
 * Locking is the responsibility of the caller.
 */
static struct page *page_chain_del(struct page **head, int n)
{
	struct page *page;
	struct page *tmp;

	BUG_ON(!n);
	BUG_ON(!head);

	page = *head;

	if (!page)
		return NULL;

	while (page) {
		tmp = page_chain_next(page);
		if (--n == 0)
			break; /* found sufficient pages */
		if (tmp == NULL)
			/* insufficient pages, don't use any of them. */
			return NULL;
		page = tmp;
	}

	/* add end of list marker for the returned list */
	set_page_private(page, 0);
	/* actual return value, and adjustment of head */
	page = *head;
	*head = tmp;
	return page;
}

/* may be used outside of locks to find the tail of a (usually short)
 * "private" page chain, before adding it back to a global chain head
 * with page_chain_add() under a spinlock. */
static struct page *page_chain_tail(struct page *page, int *len)
{
	struct page *tmp;
	int i = 1;
	while ((tmp = page_chain_next(page)))
		++i, page = tmp;
	if (len)
		*len = i;
	return page;
}

static int page_chain_free(struct page *page)
{
	struct page *tmp;
	int i = 0;
	page_chain_for_each_safe(page, tmp) {
		put_page(page);
		++i;
	}
	return i;
}

static void page_chain_add(struct page **head,
		struct page *chain_first, struct page *chain_last)
{
#if 1
	struct page *tmp;
	tmp = page_chain_tail(chain_first, NULL);
	BUG_ON(tmp != chain_last);
#endif

	/* add chain to head */
	set_page_private(chain_last, (unsigned long)*head);
	*head = chain_first;
}
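
/*
 * Usage sketch (illustrative only, not called anywhere in this file):
 * the helpers above treat page->private as a "next" pointer, so a chain
 * can be moved between a local list and the global drbd_pp_pool like this:
 *
 *	struct page *chain, *tail;
 *	int n;
 *
 *	spin_lock(&drbd_pp_lock);
 *	chain = page_chain_del(&drbd_pp_pool, 4);	// take 4 pages, or NULL
 *	spin_unlock(&drbd_pp_lock);
 *	if (chain) {
 *		tail = page_chain_tail(chain, &n);	// n == 4
 *		spin_lock(&drbd_pp_lock);
 *		page_chain_add(&drbd_pp_pool, chain, tail);	// give them back
 *		spin_unlock(&drbd_pp_lock);
 *	}
 */
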
static struct page *__drbd_alloc_pages(struct drbd_device *device,
				       unsigned int number)
{
	struct page *page = NULL;
	struct page *tmp = NULL;
	unsigned int i = 0;

	/* Yes, testing drbd_pp_vacant outside the lock is racy.
	 * So what. It saves a spin_lock. */
	if (drbd_pp_vacant >= number) {
		spin_lock(&drbd_pp_lock);
		page = page_chain_del(&drbd_pp_pool, number);
		if (page)
			drbd_pp_vacant -= number;
		spin_unlock(&drbd_pp_lock);
		if (page)
			return page;
	}

	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place. */
	for (i = 0; i < number; i++) {
		tmp = alloc_page(GFP_TRY);
		if (!tmp)
			break;
		set_page_private(tmp, (unsigned long)page);
		page = tmp;
	}

	if (i == number)
		return page;

	/* Not enough pages immediately available this time.
	 * No need to jump around here, drbd_alloc_pages will retry this
	 * function "soon". */
	if (page) {
		tmp = page_chain_tail(page, NULL);
		spin_lock(&drbd_pp_lock);
		page_chain_add(&drbd_pp_pool, page, tmp);
		drbd_pp_vacant += i;
		spin_unlock(&drbd_pp_lock);
	}
	return NULL;
}

static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
					   struct list_head *to_be_freed)
{
	struct drbd_peer_request *peer_req;
	struct list_head *le, *tle;

	/* The EEs are always appended to the end of the list. Since
	   they are sent in order over the wire, they have to finish
	   in order. As soon as we see the first not finished we can
	   stop to examine the list... */

	list_for_each_safe(le, tle, &device->net_ee) {
		peer_req = list_entry(le, struct drbd_peer_request, w.list);
		if (drbd_peer_req_has_active_page(peer_req))
			break;
		list_move(le, to_be_freed);
	}
}

static void drbd_kick_lo_and_reclaim_net(struct drbd_device *device)
{
	LIST_HEAD(reclaimed);
	struct drbd_peer_request *peer_req, *t;

	spin_lock_irq(&device->tconn->req_lock);
	reclaim_finished_net_peer_reqs(device, &reclaimed);
	spin_unlock_irq(&device->tconn->req_lock);

	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
		drbd_free_net_peer_req(device, peer_req);
}

/**
 * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
 * @device:	DRBD device.
 * @number:	number of pages requested
 * @retry:	whether to retry, if not enough pages are available right now
 *
 * Tries to allocate number pages, first from our own page pool, then from
 * the kernel, unless this allocation would exceed the max_buffers setting.
 * Possibly retry until DRBD frees sufficient pages somewhere else.
 *
 * Returns a page chain linked via page->private.
 */
struct page *drbd_alloc_pages(struct drbd_device *device, unsigned int number,
			      bool retry)
{
	struct page *page = NULL;
	struct net_conf *nc;
	DEFINE_WAIT(wait);
	int mxb;

	/* Yes, we may run up to @number over max_buffers. If we
	 * follow it strictly, the admin will get it wrong anyways. */
	rcu_read_lock();
	nc = rcu_dereference(device->tconn->net_conf);
	mxb = nc ? nc->max_buffers : 1000000;
	rcu_read_unlock();

	if (atomic_read(&device->pp_in_use) < mxb)
		page = __drbd_alloc_pages(device, number);

	while (page == NULL) {
		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);

		drbd_kick_lo_and_reclaim_net(device);

		if (atomic_read(&device->pp_in_use) < mxb) {
			page = __drbd_alloc_pages(device, number);
			if (page)
				break;
		}

		if (!retry)
			break;

		if (signal_pending(current)) {
			dev_warn(DEV, "drbd_alloc_pages interrupted!\n");
			break;
		}

		schedule();
	}
	finish_wait(&drbd_pp_wait, &wait);

	if (page)
		atomic_add(number, &device->pp_in_use);
	return page;
}

/* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
 * Is also used from inside another spin_lock_irq(&device->tconn->req_lock);
 * Either links the page chain back to the global pool,
 * or returns all pages to the system. */
static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
{
	atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
	int i;

	if (page == NULL)
		return;

	if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
		i = page_chain_free(page);
	else {
		struct page *tmp;
		tmp = page_chain_tail(page, &i);
		spin_lock(&drbd_pp_lock);
		page_chain_add(&drbd_pp_pool, page, tmp);
		drbd_pp_vacant += i;
		spin_unlock(&drbd_pp_lock);
	}
	i = atomic_sub_return(i, a);
	if (i < 0)
		dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
			is_net ? "pp_in_use_by_net" : "pp_in_use", i);
	wake_up(&drbd_pp_wait);
}
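
/*
 * Pairing sketch (illustrative only): every chain handed out by
 * drbd_alloc_pages() is accounted in device->pp_in_use and must go back
 * through drbd_free_pages(), e.g.:
 *
 *	struct page *chain = drbd_alloc_pages(device, 8, true);
 *	if (chain) {
 *		...				// fill / transmit the pages
 *		drbd_free_pages(device, chain, 0);	// 0: regular, not net_ee accounting
 *	}
 */
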
/*
You need to hold the req_lock:
 _drbd_wait_ee_list_empty()

You must not have the req_lock:
 drbd_free_peer_req()
 drbd_alloc_peer_req()
 drbd_free_peer_reqs()
 drbd_ee_fix_bhs()
 drbd_finish_peer_reqs()
 drbd_clear_done_ee()
 drbd_wait_ee_list_empty()
*/

struct drbd_peer_request *
drbd_alloc_peer_req(struct drbd_device *device, u64 id, sector_t sector,
		    unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
{
	struct drbd_peer_request *peer_req;
	struct page *page = NULL;
	unsigned nr_pages = (data_size + PAGE_SIZE - 1) >> PAGE_SHIFT;

	if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
		return NULL;

	peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
	if (!peer_req) {
		if (!(gfp_mask & __GFP_NOWARN))
			dev_err(DEV, "%s: allocation failed\n", __func__);
		return NULL;
	}

	if (data_size) {
		page = drbd_alloc_pages(device, nr_pages, (gfp_mask & __GFP_WAIT));
		if (!page)
			goto fail;
	}

	drbd_clear_interval(&peer_req->i);
	peer_req->i.size = data_size;
	peer_req->i.sector = sector;
	peer_req->i.local = false;
	peer_req->i.waiting = false;

	peer_req->epoch = NULL;
	peer_req->w.device = device;
	peer_req->pages = page;
	atomic_set(&peer_req->pending_bios, 0);
	peer_req->flags = 0;
	/*
	 * The block_id is opaque to the receiver.  It is not endianness
	 * converted, and sent back to the sender unchanged.
	 */
	peer_req->block_id = id;

	return peer_req;

 fail:
	mempool_free(peer_req, drbd_ee_mempool);
	return NULL;
}
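
/*
 * Lifecycle sketch (illustrative only): a peer request allocated above is
 * normally queued on one of the ee lists and released again through
 * drbd_free_peer_req() once its completion callback has run, roughly:
 *
 *	peer_req = drbd_alloc_peer_req(device, id, sector, size, GFP_NOIO);
 *	if (!peer_req)
 *		return -ENOMEM;
 *	peer_req->w.cb = e_end_block;		// completion callback
 *	...					// submit, complete via done_ee
 *	drbd_free_peer_req(device, peer_req);
 */
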
void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
			  int is_net)
{
	if (peer_req->flags & EE_HAS_DIGEST)
		kfree(peer_req->digest);
	drbd_free_pages(device, peer_req->pages, is_net);
	D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
	D_ASSERT(drbd_interval_empty(&peer_req->i));
	mempool_free(peer_req, drbd_ee_mempool);
}

int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
{
	LIST_HEAD(work_list);
	struct drbd_peer_request *peer_req, *t;
	int count = 0;
	int is_net = list == &device->net_ee;

	spin_lock_irq(&device->tconn->req_lock);
	list_splice_init(list, &work_list);
	spin_unlock_irq(&device->tconn->req_lock);

	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
		__drbd_free_peer_req(device, peer_req, is_net);
		count++;
	}
	return count;
}

/*
 * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
 */
static int drbd_finish_peer_reqs(struct drbd_device *device)
{
	LIST_HEAD(work_list);
	LIST_HEAD(reclaimed);
	struct drbd_peer_request *peer_req, *t;
	int err = 0;

	spin_lock_irq(&device->tconn->req_lock);
	reclaim_finished_net_peer_reqs(device, &reclaimed);
	list_splice_init(&device->done_ee, &work_list);
	spin_unlock_irq(&device->tconn->req_lock);

	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
		drbd_free_net_peer_req(device, peer_req);

	/* possible callbacks here:
	 * e_end_block, and e_end_resync_block, e_send_superseded.
	 * all ignore the last argument.
	 */
	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
		int err2;

		/* list_del not necessary, next/prev members not touched */
		err2 = peer_req->w.cb(&peer_req->w, !!err);
		if (!err)
			err = err2;
		drbd_free_peer_req(device, peer_req);
	}
	wake_up(&device->ee_wait);

	return err;
}

static void _drbd_wait_ee_list_empty(struct drbd_device *device,
				     struct list_head *head)
{
	DEFINE_WAIT(wait);

	/* avoids spin_lock/unlock
	 * and calling prepare_to_wait in the fast path */
	while (!list_empty(head)) {
		prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
		spin_unlock_irq(&device->tconn->req_lock);
		io_schedule();
		finish_wait(&device->ee_wait, &wait);
		spin_lock_irq(&device->tconn->req_lock);
	}
}

static void drbd_wait_ee_list_empty(struct drbd_device *device,
				    struct list_head *head)
{
	spin_lock_irq(&device->tconn->req_lock);
	_drbd_wait_ee_list_empty(device, head);
	spin_unlock_irq(&device->tconn->req_lock);
}

static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
{
	mm_segment_t oldfs;
	struct kvec iov = {
		.iov_base = buf,
		.iov_len = size,
	};
	struct msghdr msg = {
		.msg_iovlen = 1,
		.msg_iov = (struct iovec *)&iov,
		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
	};
	int rv;

	oldfs = get_fs();
	set_fs(KERNEL_DS);
	rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
	set_fs(oldfs);

	return rv;
}

static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
{
	int rv;

	rv = drbd_recv_short(tconn->data.socket, buf, size, 0);

	if (rv < 0) {
		if (rv == -ECONNRESET)
			conn_info(tconn, "sock was reset by peer\n");
		else if (rv != -ERESTARTSYS)
			conn_err(tconn, "sock_recvmsg returned %d\n", rv);
	} else if (rv == 0) {
		if (test_bit(DISCONNECT_SENT, &tconn->flags)) {
			long t;
			rcu_read_lock();
			t = rcu_dereference(tconn->net_conf)->ping_timeo * HZ/10;
			rcu_read_unlock();

			t = wait_event_timeout(tconn->ping_wait, tconn->cstate < C_WF_REPORT_PARAMS, t);

			if (t)
				goto out;
		}
		conn_info(tconn, "sock was shut down by peer\n");
	}

	if (rv != size)
		conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);

out:
	return rv;
}

static int drbd_recv_all(struct drbd_tconn *tconn, void *buf, size_t size)
{
	int err;

	err = drbd_recv(tconn, buf, size);
	if (err != size) {
		if (err >= 0)
			err = -EIO;
	} else
		err = 0;
	return err;
}

static int drbd_recv_all_warn(struct drbd_tconn *tconn, void *buf, size_t size)
{
	int err;

	err = drbd_recv_all(tconn, buf, size);
	if (err && !signal_pending(current))
		conn_warn(tconn, "short read (expected size %d)\n", (int)size);
	return err;
}
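
/*
 * Layering sketch (illustrative only): the receive helpers above build on
 * each other; the higher layers normalize the return value.
 *
 *	drbd_recv_short(sock, buf, size, flags)	// raw kvec recv, returns bytes
 *	drbd_recv(tconn, buf, size)		// + logging and C_BROKEN_PIPE handling
 *	drbd_recv_all(tconn, buf, size)		// 0 on full read, else negative error
 *	drbd_recv_all_warn(tconn, buf, size)	// + "short read" warning
 */
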
/* quoting tcp(7):
 *   On individual connections, the socket buffer size must be set prior to the
 *   listen(2) or connect(2) calls in order to have it take effect.
 * This is our wrapper to do so.
 */
static void drbd_setbufsize(struct socket *sock, unsigned int snd,
			    unsigned int rcv)
{
	/* open coded SO_SNDBUF, SO_RCVBUF */
	if (snd) {
		sock->sk->sk_sndbuf = snd;
		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
	}
	if (rcv) {
		sock->sk->sk_rcvbuf = rcv;
		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
	}
}

static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
{
	const char *what;
	struct socket *sock;
	struct sockaddr_in6 src_in6;
	struct sockaddr_in6 peer_in6;
	struct net_conf *nc;
	int err, peer_addr_len, my_addr_len;
	int sndbuf_size, rcvbuf_size, connect_int;
	int disconnect_on_error = 1;

	rcu_read_lock();
	nc = rcu_dereference(tconn->net_conf);
	if (!nc) {
		rcu_read_unlock();
		return NULL;
	}
	sndbuf_size = nc->sndbuf_size;
	rcvbuf_size = nc->rcvbuf_size;
	connect_int = nc->connect_int;
	rcu_read_unlock();

	my_addr_len = min_t(int, tconn->my_addr_len, sizeof(src_in6));
	memcpy(&src_in6, &tconn->my_addr, my_addr_len);

	if (((struct sockaddr *)&tconn->my_addr)->sa_family == AF_INET6)
		src_in6.sin6_port = 0;
	else
		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */

	peer_addr_len = min_t(int, tconn->peer_addr_len, sizeof(src_in6));
	memcpy(&peer_in6, &tconn->peer_addr, peer_addr_len);

	what = "sock_create_kern";
	err = sock_create_kern(((struct sockaddr *)&src_in6)->sa_family,
			       SOCK_STREAM, IPPROTO_TCP, &sock);
	if (err < 0) {
		sock = NULL;
		goto out;
	}

	sock->sk->sk_rcvtimeo =
	sock->sk->sk_sndtimeo = connect_int * HZ;
	drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);

	/* explicitly bind to the configured IP as source IP
	 * for the outgoing connections.
	 * This is needed for multihomed hosts and to be
	 * able to use lo: interfaces for drbd.
	 * Make sure to use 0 as port number, so linux selects
	 * a free one dynamically.
	 */
	what = "bind before connect";
	err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
	if (err < 0)
		goto out;

	/* connect may fail, peer not yet available.
	 * stay C_WF_CONNECTION, don't go Disconnecting! */
	disconnect_on_error = 0;
	what = "connect";
	err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);

out:
	if (err < 0) {
		if (sock) {
			sock_release(sock);
			sock = NULL;
		}
		switch (-err) {
			/* timeout, busy, signal pending */
		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
		case EINTR: case ERESTARTSYS:
			/* peer not (yet) available, network problem */
		case ECONNREFUSED: case ENETUNREACH:
		case EHOSTDOWN:    case EHOSTUNREACH:
			disconnect_on_error = 0;
			break;
		default:
			conn_err(tconn, "%s failed, err = %d\n", what, err);
		}
		if (disconnect_on_error)
			conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
	}

	return sock;
}

struct accept_wait_data {
	struct drbd_tconn *tconn;
	struct socket *s_listen;
	struct completion door_bell;
	void (*original_sk_state_change)(struct sock *sk);

};

static void drbd_incoming_connection(struct sock *sk)
{
	struct accept_wait_data *ad = sk->sk_user_data;
	void (*state_change)(struct sock *sk);

	state_change = ad->original_sk_state_change;
	if (sk->sk_state == TCP_ESTABLISHED)
		complete(&ad->door_bell);
	state_change(sk);
}

static int prepare_listen_socket(struct drbd_tconn *tconn, struct accept_wait_data *ad)
{
	int err, sndbuf_size, rcvbuf_size, my_addr_len;
	struct sockaddr_in6 my_addr;
	struct socket *s_listen;
	struct net_conf *nc;
	const char *what;

	rcu_read_lock();
	nc = rcu_dereference(tconn->net_conf);
	if (!nc) {
		rcu_read_unlock();
		return -EIO;
	}
	sndbuf_size = nc->sndbuf_size;
	rcvbuf_size = nc->rcvbuf_size;
	rcu_read_unlock();

	my_addr_len = min_t(int, tconn->my_addr_len, sizeof(struct sockaddr_in6));
	memcpy(&my_addr, &tconn->my_addr, my_addr_len);

	what = "sock_create_kern";
	err = sock_create_kern(((struct sockaddr *)&my_addr)->sa_family,
			       SOCK_STREAM, IPPROTO_TCP, &s_listen);
	if (err) {
		s_listen = NULL;
		goto out;
	}

	s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
	drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);

	what = "bind before listen";
	err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
	if (err < 0)
		goto out;

	ad->s_listen = s_listen;
	write_lock_bh(&s_listen->sk->sk_callback_lock);
	ad->original_sk_state_change = s_listen->sk->sk_state_change;
	s_listen->sk->sk_state_change = drbd_incoming_connection;
	s_listen->sk->sk_user_data = ad;
	write_unlock_bh(&s_listen->sk->sk_callback_lock);

	what = "listen";
	err = s_listen->ops->listen(s_listen, 5);
	if (err < 0)
		goto out;

	return 0;
out:
	if (s_listen)
		sock_release(s_listen);
	if (err < 0) {
		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
			conn_err(tconn, "%s failed, err = %d\n", what, err);
			conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
		}
	}

	return -EIO;
}

static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
{
	write_lock_bh(&sk->sk_callback_lock);
	sk->sk_state_change = ad->original_sk_state_change;
	sk->sk_user_data = NULL;
	write_unlock_bh(&sk->sk_callback_lock);
}

static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn, struct accept_wait_data *ad)
{
	int timeo, connect_int, err = 0;
	struct socket *s_estab = NULL;
	struct net_conf *nc;

	rcu_read_lock();
	nc = rcu_dereference(tconn->net_conf);
	if (!nc) {
		rcu_read_unlock();
		return NULL;
	}
	connect_int = nc->connect_int;
	rcu_read_unlock();

	timeo = connect_int * HZ;
	/* 28.5% random jitter */
	timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;

	err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
	if (err <= 0)
		return NULL;

	err = kernel_accept(ad->s_listen, &s_estab, 0);
	if (err < 0) {
		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
			conn_err(tconn, "accept failed, err = %d\n", err);
			conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
		}
	}

	if (s_estab)
		unregister_state_change(s_estab->sk, ad);

	return s_estab;
}

static int decode_header(struct drbd_tconn *, void *, struct packet_info *);

static int send_first_packet(struct drbd_tconn *tconn, struct drbd_socket *sock,
			     enum drbd_packet cmd)
{
	if (!conn_prepare_command(tconn, sock))
		return -EIO;
	return conn_send_command(tconn, sock, cmd, 0, NULL, 0);
}

static int receive_first_packet(struct drbd_tconn *tconn, struct socket *sock)
{
	unsigned int header_size = drbd_header_size(tconn);
	struct packet_info pi;
	int err;

	err = drbd_recv_short(sock, tconn->data.rbuf, header_size, 0);
	if (err != header_size) {
		if (err >= 0)
			err = -EIO;
		return err;
	}
	err = decode_header(tconn, tconn->data.rbuf, &pi);
	if (err)
		return err;
	return pi.cmd;
}

/**
 * drbd_socket_okay() - Free the socket if its connection is not okay
 * @sock:	pointer to the pointer to the socket.
 */
static int drbd_socket_okay(struct socket **sock)
{
	int rr;
	char tb[4];

	if (!*sock)
		return false;

	rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);

	if (rr > 0 || rr == -EAGAIN) {
		return true;
	} else {
		sock_release(*sock);
		*sock = NULL;
		return false;
	}
}
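
/*
 * Probe sketch (illustrative only): during connection setup both candidate
 * sockets are re-checked with a non-blocking MSG_PEEK read, e.g.:
 *
 *	ok = drbd_socket_okay(&sock.socket);
 *	ok = drbd_socket_okay(&msock.socket) && ok;
 *	// a dead socket has been released and its pointer set to NULL here
 */
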
/* Gets called if a connection is established, or if a new minor gets created
   in a connection */
int drbd_connected(struct drbd_device *device)
{
	int err;

	atomic_set(&device->packet_seq, 0);
	device->peer_seq = 0;

	device->state_mutex = device->tconn->agreed_pro_version < 100 ?
		&device->tconn->cstate_mutex :
		&device->own_state_mutex;

	err = drbd_send_sync_param(device);
	if (!err)
		err = drbd_send_sizes(device, 0, 0);
	if (!err)
		err = drbd_send_uuids(device);
	if (!err)
		err = drbd_send_current_state(device);
	clear_bit(USE_DEGR_WFC_T, &device->flags);
	clear_bit(RESIZE_PENDING, &device->flags);
	atomic_set(&device->ap_in_flight, 0);
	mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
	return err;
}

/*
 * return values:
 *   1 yes, we have a valid connection
 *   0 oops, did not work out, please try again
 *  -1 peer talks different language,
 *     no point in trying again, please go standalone.
 *  -2 We do not have a network config...
 */
static int conn_connect(struct drbd_tconn *tconn)
{
	struct drbd_socket sock, msock;
	struct drbd_device *device;
	struct net_conf *nc;
	int vnr, timeout, h, ok;
	bool discard_my_data;
	enum drbd_state_rv rv;
	struct accept_wait_data ad = {
		.tconn = tconn,
		.door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
	};

	clear_bit(DISCONNECT_SENT, &tconn->flags);
	if (conn_request_state(tconn, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
		return -2;

	mutex_init(&sock.mutex);
	sock.sbuf = tconn->data.sbuf;
	sock.rbuf = tconn->data.rbuf;
	sock.socket = NULL;
	mutex_init(&msock.mutex);
	msock.sbuf = tconn->meta.sbuf;
	msock.rbuf = tconn->meta.rbuf;
	msock.socket = NULL;

	/* Assume that the peer only understands protocol 80 until we know better.  */
	tconn->agreed_pro_version = 80;

	if (prepare_listen_socket(tconn, &ad))
		return 0;

	do {
		struct socket *s;

		s = drbd_try_connect(tconn);
		if (s) {
			if (!sock.socket) {
				sock.socket = s;
				send_first_packet(tconn, &sock, P_INITIAL_DATA);
			} else if (!msock.socket) {
				clear_bit(RESOLVE_CONFLICTS, &tconn->flags);
				msock.socket = s;
				send_first_packet(tconn, &msock, P_INITIAL_META);
			} else {
				conn_err(tconn, "Logic error in conn_connect()\n");
				goto out_release_sockets;
			}
		}

		if (sock.socket && msock.socket) {
			rcu_read_lock();
			nc = rcu_dereference(tconn->net_conf);
			timeout = nc->ping_timeo * HZ / 10;
			rcu_read_unlock();
			schedule_timeout_interruptible(timeout);
			ok = drbd_socket_okay(&sock.socket);
			ok = drbd_socket_okay(&msock.socket) && ok;
			if (ok)
				break;
		}

retry:
		s = drbd_wait_for_connect(tconn, &ad);
		if (s) {
			int fp = receive_first_packet(tconn, s);
			drbd_socket_okay(&sock.socket);
			drbd_socket_okay(&msock.socket);
			switch (fp) {
			case P_INITIAL_DATA:
				if (sock.socket) {
					conn_warn(tconn, "initial packet S crossed\n");
					sock_release(sock.socket);
					sock.socket = s;
					goto randomize;
				}
				sock.socket = s;
				break;
			case P_INITIAL_META:
				set_bit(RESOLVE_CONFLICTS, &tconn->flags);
				if (msock.socket) {
					conn_warn(tconn, "initial packet M crossed\n");
					sock_release(msock.socket);
					msock.socket = s;
					goto randomize;
				}
				msock.socket = s;
				break;
			default:
				conn_warn(tconn, "Error receiving initial packet\n");
				sock_release(s);
randomize:
				if (prandom_u32() & 1)
					goto retry;
			}
		}

		if (tconn->cstate <= C_DISCONNECTING)
			goto out_release_sockets;
		if (signal_pending(current)) {
			flush_signals(current);
			smp_rmb();
			if (get_t_state(&tconn->receiver) == EXITING)
				goto out_release_sockets;
		}

		ok = drbd_socket_okay(&sock.socket);
		ok = drbd_socket_okay(&msock.socket) && ok;
	} while (!ok);

	if (ad.s_listen)
		sock_release(ad.s_listen);

	sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
	msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */

	sock.socket->sk->sk_allocation = GFP_NOIO;
	msock.socket->sk->sk_allocation = GFP_NOIO;

	sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
	msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;

	/* NOT YET ...
	 * sock.socket->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
	 * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
	 * first set it to the P_CONNECTION_FEATURES timeout,
	 * which we set to 4x the configured ping_timeout. */
	rcu_read_lock();
	nc = rcu_dereference(tconn->net_conf);

	sock.socket->sk->sk_sndtimeo =
	sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;

	msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
	timeout = nc->timeout * HZ / 10;
	discard_my_data = nc->discard_my_data;
	rcu_read_unlock();

	msock.socket->sk->sk_sndtimeo = timeout;

	/* we don't want delays.
	 * we use TCP_CORK where appropriate, though */
	drbd_tcp_nodelay(sock.socket);
	drbd_tcp_nodelay(msock.socket);

	tconn->data.socket = sock.socket;
	tconn->meta.socket = msock.socket;
	tconn->last_received = jiffies;

	h = drbd_do_features(tconn);
	if (h <= 0)
		return h;

	if (tconn->cram_hmac_tfm) {
		/* drbd_request_state(device, NS(conn, WFAuth)); */
		switch (drbd_do_auth(tconn)) {
		case -1:
			conn_err(tconn, "Authentication of peer failed\n");
			return -1;
		case 0:
			conn_err(tconn, "Authentication of peer failed, trying again.\n");
			return 0;
		}
	}

	tconn->data.socket->sk->sk_sndtimeo = timeout;
	tconn->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;

	if (drbd_send_protocol(tconn) == -EOPNOTSUPP)
		return -1;

	set_bit(STATE_SENT, &tconn->flags);

	rcu_read_lock();
	idr_for_each_entry(&tconn->volumes, device, vnr) {
		kref_get(&device->kref);
		rcu_read_unlock();

		/* Prevent a race between resync-handshake and
		 * being promoted to Primary.
		 *
		 * Grab and release the state mutex, so we know that any current
		 * drbd_set_role() is finished, and any incoming drbd_set_role
		 * will see the STATE_SENT flag, and wait for it to be cleared.
		 */
		mutex_lock(device->state_mutex);
		mutex_unlock(device->state_mutex);

		if (discard_my_data)
			set_bit(DISCARD_MY_DATA, &device->flags);
		else
			clear_bit(DISCARD_MY_DATA, &device->flags);

		drbd_connected(device);
		kref_put(&device->kref, &drbd_minor_destroy);
		rcu_read_lock();
	}
	rcu_read_unlock();

	rv = conn_request_state(tconn, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
	if (rv < SS_SUCCESS || tconn->cstate != C_WF_REPORT_PARAMS) {
		clear_bit(STATE_SENT, &tconn->flags);
		return 0;
	}

	drbd_thread_start(&tconn->asender);

	mutex_lock(&tconn->conf_update);
	/* The discard_my_data flag is a single-shot modifier to the next
	 * connection attempt, the handshake of which is now well underway.
	 * No need for rcu style copying of the whole struct
	 * just to clear a single value. */
	tconn->net_conf->discard_my_data = 0;
	mutex_unlock(&tconn->conf_update);

	return h;

out_release_sockets:
	if (ad.s_listen)
		sock_release(ad.s_listen);
	if (sock.socket)
		sock_release(sock.socket);
	if (msock.socket)
		sock_release(msock.socket);
	return -1;
}
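
/*
 * Connection outline (illustrative summary of conn_connect() above):
 *
 *	prepare_listen_socket()				// passive side
 *	drbd_try_connect() / drbd_wait_for_connect()	// until both sockets exist
 *	send_first_packet(P_INITIAL_DATA / P_INITIAL_META)
 *	drbd_do_features()				// agree on protocol version
 *	drbd_do_auth()					// optional cram-hmac challenge
 *	drbd_send_protocol(), then per-volume drbd_connected()
 */
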
static int decode_header(struct drbd_tconn *tconn, void *header, struct packet_info *pi)
{
	unsigned int header_size = drbd_header_size(tconn);

	if (header_size == sizeof(struct p_header100) &&
	    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
		struct p_header100 *h = header;
		if (h->pad != 0) {
			conn_err(tconn, "Header padding is not zero\n");
			return -EINVAL;
		}
		pi->vnr = be16_to_cpu(h->volume);
		pi->cmd = be16_to_cpu(h->command);
		pi->size = be32_to_cpu(h->length);
	} else if (header_size == sizeof(struct p_header95) &&
		   *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
		struct p_header95 *h = header;
		pi->cmd = be16_to_cpu(h->command);
		pi->size = be32_to_cpu(h->length);
		pi->vnr = 0;
	} else if (header_size == sizeof(struct p_header80) &&
		   *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
		struct p_header80 *h = header;
		pi->cmd = be16_to_cpu(h->command);
		pi->size = be16_to_cpu(h->length);
		pi->vnr = 0;
	} else {
		conn_err(tconn, "Wrong magic value 0x%08x in protocol version %d\n",
			 be32_to_cpu(*(__be32 *)header),
			 tconn->agreed_pro_version);
		return -EINVAL;
	}
	pi->data = header + header_size;
	return 0;
}
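
/*
 * Header formats handled by decode_header() (illustrative summary; the
 * struct layouts themselves are declared in the protocol headers):
 *
 *	p_header80:  DRBD_MAGIC      magic, 16-bit command, 16-bit length
 *	p_header95:  DRBD_MAGIC_BIG  magic, 16-bit command, 32-bit length
 *	p_header100: DRBD_MAGIC_100  magic, 16-bit volume, 16-bit command, 32-bit length
 *
 * In all cases pi->data is left pointing just behind the header in the
 * receive buffer.
 */
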
static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
{
	void *buffer = tconn->data.rbuf;
	int err;

	err = drbd_recv_all_warn(tconn, buffer, drbd_header_size(tconn));
	if (err)
		return err;

	err = decode_header(tconn, buffer, pi);
	tconn->last_received = jiffies;

	return err;
}

static void drbd_flush(struct drbd_tconn *tconn)
{
	int rv;
	struct drbd_device *device;
	int vnr;

	if (tconn->write_ordering >= WO_bdev_flush) {
		rcu_read_lock();
		idr_for_each_entry(&tconn->volumes, device, vnr) {
			if (!get_ldev(device))
				continue;
			kref_get(&device->kref);
			rcu_read_unlock();

			rv = blkdev_issue_flush(device->ldev->backing_bdev,
					GFP_NOIO, NULL);
			if (rv) {
				dev_info(DEV, "local disk flush failed with status %d\n", rv);
				/* would rather check on EOPNOTSUPP, but that is not reliable.
				 * don't try again for ANY return value != 0
				 * if (rv == -EOPNOTSUPP) */
				drbd_bump_write_ordering(tconn, WO_drain_io);
			}
			put_ldev(device);
			kref_put(&device->kref, &drbd_minor_destroy);

			rcu_read_lock();
			if (rv)
				break;
		}
		rcu_read_unlock();
	}
}

/**
 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
 * @device:	DRBD device.
 * @epoch:	Epoch object.
 * @ev:		Epoch event.
 */
static enum finish_epoch drbd_may_finish_epoch(struct drbd_tconn *tconn,
					       struct drbd_epoch *epoch,
					       enum epoch_event ev)
{
	int epoch_size;
	struct drbd_epoch *next_epoch;
	enum finish_epoch rv = FE_STILL_LIVE;

	spin_lock(&tconn->epoch_lock);
	do {
		next_epoch = NULL;

		epoch_size = atomic_read(&epoch->epoch_size);

		switch (ev & ~EV_CLEANUP) {
		case EV_PUT:
			atomic_dec(&epoch->active);
			break;
		case EV_GOT_BARRIER_NR:
			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
			break;
		case EV_BECAME_LAST:
			/* nothing to do*/
			break;
		}

		if (epoch_size != 0 &&
		    atomic_read(&epoch->active) == 0 &&
		    (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
			if (!(ev & EV_CLEANUP)) {
				spin_unlock(&tconn->epoch_lock);
				drbd_send_b_ack(epoch->tconn, epoch->barrier_nr, epoch_size);
				spin_lock(&tconn->epoch_lock);
			}
#if 0
			/* FIXME: dec unacked on connection, once we have
			 * something to count pending connection packets in. */
			if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
				dec_unacked(epoch->tconn);
#endif

			if (tconn->current_epoch != epoch) {
				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
				list_del(&epoch->list);
				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
				tconn->epochs--;
				kfree(epoch);

				if (rv == FE_STILL_LIVE)
					rv = FE_DESTROYED;
			} else {
				epoch->flags = 0;
				atomic_set(&epoch->epoch_size, 0);
				/* atomic_set(&epoch->active, 0); is already zero */
				if (rv == FE_STILL_LIVE)
					rv = FE_RECYCLED;
			}
		}

		if (!next_epoch)
			break;

		epoch = next_epoch;
	} while (1);

	spin_unlock(&tconn->epoch_lock);

	return rv;
}

/**
 * drbd_bump_write_ordering() - Fall back to another write ordering method
 * @tconn:	DRBD connection.
 * @wo:		Write ordering method to try.
 */
void drbd_bump_write_ordering(struct drbd_tconn *tconn, enum write_ordering_e wo)
{
	struct disk_conf *dc;
	struct drbd_device *device;
	enum write_ordering_e pwo;
	int vnr;
	static char *write_ordering_str[] = {
		[WO_none] = "none",
		[WO_drain_io] = "drain",
		[WO_bdev_flush] = "flush",
	};

	pwo = tconn->write_ordering;
	wo = min(pwo, wo);
	rcu_read_lock();
	idr_for_each_entry(&tconn->volumes, device, vnr) {
		if (!get_ldev_if_state(device, D_ATTACHING))
			continue;
		dc = rcu_dereference(device->ldev->disk_conf);

		if (wo == WO_bdev_flush && !dc->disk_flushes)
			wo = WO_drain_io;
		if (wo == WO_drain_io && !dc->disk_drain)
			wo = WO_none;
		put_ldev(device);
	}
	rcu_read_unlock();
	tconn->write_ordering = wo;
	if (pwo != tconn->write_ordering || wo == WO_bdev_flush)
		conn_info(tconn, "Method to ensure write ordering: %s\n", write_ordering_str[tconn->write_ordering]);
}

/**
 * drbd_submit_peer_request()
 * @device:	DRBD device.
 * @peer_req:	peer request
 * @rw:		flag field, see bio->bi_rw
 *
 * May spread the pages to multiple bios,
 * depending on bio_add_page restrictions.
 *
 * Returns 0 if all bios have been submitted,
 * -ENOMEM if we could not allocate enough bios,
 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
 *  single page to an empty bio (which should never happen and likely indicates
 *  that the lower level IO stack is in some way broken). This has been observed
 *  on certain Xen deployments.
 */
/* TODO allocate from our own bio_set. */
int drbd_submit_peer_request(struct drbd_device *device,
			     struct drbd_peer_request *peer_req,
			     const unsigned rw, const int fault_type)
{
	struct bio *bios = NULL;
	struct bio *bio;
	struct page *page = peer_req->pages;
	sector_t sector = peer_req->i.sector;
	unsigned ds = peer_req->i.size;
	unsigned n_bios = 0;
	unsigned nr_pages = (ds + PAGE_SIZE - 1) >> PAGE_SHIFT;
	int err = -ENOMEM;

	/* In most cases, we will only need one bio.  But in case the lower
	 * level restrictions happen to be different at this offset on this
	 * side than those of the sending peer, we may need to submit the
	 * request in more than one bio.
	 *
	 * Plain bio_alloc is good enough here, this is no DRBD internally
	 * generated bio, but a bio allocated on behalf of the peer.
	 */
next_bio:
	bio = bio_alloc(GFP_NOIO, nr_pages);
	if (!bio) {
		dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
		goto fail;
	}
	/* > peer_req->i.sector, unless this is the first bio */
	bio->bi_iter.bi_sector = sector;
	bio->bi_bdev = device->ldev->backing_bdev;
	bio->bi_rw = rw;
	bio->bi_private = peer_req;
	bio->bi_end_io = drbd_peer_request_endio;

	bio->bi_next = bios;
	bios = bio;
	++n_bios;

	page_chain_for_each(page) {
		unsigned len = min_t(unsigned, ds, PAGE_SIZE);
		if (!bio_add_page(bio, page, len, 0)) {
			/* A single page must always be possible!
			 * But in case it fails anyways,
			 * we deal with it, and complain (below). */
			if (bio->bi_vcnt == 0) {
				dev_err(DEV,
					"bio_add_page failed for len=%u, "
					"bi_vcnt=0 (bi_sector=%llu)\n",
					len, (uint64_t)bio->bi_iter.bi_sector);
				err = -ENOSPC;
				goto fail;
			}
			goto next_bio;
		}
		ds -= len;
		sector += len >> 9;
		--nr_pages;
	}
	D_ASSERT(page == NULL);
	D_ASSERT(ds == 0);

	atomic_set(&peer_req->pending_bios, n_bios);
	do {
		bio = bios;
		bios = bios->bi_next;
		bio->bi_next = NULL;

		drbd_generic_make_request(device, fault_type, bio);
	} while (bios);
	return 0;

fail:
	while (bios) {
		bio = bios;
		bios = bios->bi_next;
		bio_put(bio);
	}
	return err;
}
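
/*
 * Submission sketch (illustrative only): a receiver-side write of a peer
 * request is normally kicked off like this; completion is signalled per bio
 * through drbd_peer_request_endio() and peer_req->pending_bios.
 *
 *	if (drbd_submit_peer_request(device, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
 *		return 0;	// all bios are in flight
 *	// on error: unhook the request and free it, see recv_resync_read()
 */
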
static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
					     struct drbd_peer_request *peer_req)
{
	struct drbd_interval *i = &peer_req->i;

	drbd_remove_interval(&device->write_requests, i);
	drbd_clear_interval(i);

	/* Wake up any processes waiting for this peer request to complete.  */
	if (i->waiting)
		wake_up(&device->misc_wait);
}

static void conn_wait_active_ee_empty(struct drbd_tconn *tconn)
{
	struct drbd_device *device;
	int vnr;

	rcu_read_lock();
	idr_for_each_entry(&tconn->volumes, device, vnr) {
		kref_get(&device->kref);
		rcu_read_unlock();
		drbd_wait_ee_list_empty(device, &device->active_ee);
		kref_put(&device->kref, &drbd_minor_destroy);
		rcu_read_lock();
	}
	rcu_read_unlock();
}

static int receive_Barrier(struct drbd_tconn *tconn, struct packet_info *pi)
{
	int rv;
	struct p_barrier *p = pi->data;
	struct drbd_epoch *epoch;

	/* FIXME these are unacked on connection,
	 * not a specific (peer)device.
	 */
	tconn->current_epoch->barrier_nr = p->barrier;
	tconn->current_epoch->tconn = tconn;
	rv = drbd_may_finish_epoch(tconn, tconn->current_epoch, EV_GOT_BARRIER_NR);

	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
	 * the activity log, which means it would not be resynced in case the
	 * R_PRIMARY crashes now.
	 * Therefore we must send the barrier_ack after the barrier request was
	 * completed. */
	switch (tconn->write_ordering) {
	case WO_none:
		if (rv == FE_RECYCLED)
			return 0;

		/* receiver context, in the writeout path of the other node.
		 * avoid potential distributed deadlock */
		epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
		if (epoch)
			break;
		else
			conn_warn(tconn, "Allocation of an epoch failed, slowing down\n");
			/* Fall through */

	case WO_bdev_flush:
	case WO_drain_io:
		conn_wait_active_ee_empty(tconn);
		drbd_flush(tconn);

		if (atomic_read(&tconn->current_epoch->epoch_size)) {
			epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
			if (epoch)
				break;
		}

		return 0;
	default:
		conn_err(tconn, "Strangeness in tconn->write_ordering %d\n", tconn->write_ordering);
		return -EIO;
	}

	epoch->flags = 0;
	atomic_set(&epoch->epoch_size, 0);
	atomic_set(&epoch->active, 0);

	spin_lock(&tconn->epoch_lock);
	if (atomic_read(&tconn->current_epoch->epoch_size)) {
		list_add(&epoch->list, &tconn->current_epoch->list);
		tconn->current_epoch = epoch;
		tconn->epochs++;
	} else {
		/* The current_epoch got recycled while we allocated this one... */
		kfree(epoch);
	}
	spin_unlock(&tconn->epoch_lock);

	return 0;
}
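
/*
 * Epoch flow note (illustrative summary): a P_BARRIER starts a new write
 * epoch.  Depending on tconn->write_ordering the old epoch is either
 * recycled (WO_none) or completed by draining active_ee and flushing the
 * backing devices (WO_drain_io / WO_bdev_flush) before the barrier ack is
 * eventually sent from drbd_may_finish_epoch().
 */
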
/* used from receive_RSDataReply (recv_resync_read)
 * and from receive_Data */
static struct drbd_peer_request *
read_in_block(struct drbd_device *device, u64 id, sector_t sector,
	      int data_size) __must_hold(local)
{
	const sector_t capacity = drbd_get_capacity(device->this_bdev);
	struct drbd_peer_request *peer_req;
	struct page *page;
	int dgs, ds, err;
	void *dig_in = device->tconn->int_dig_in;
	void *dig_vv = device->tconn->int_dig_vv;
	unsigned long *data;

	dgs = 0;
	if (device->tconn->peer_integrity_tfm) {
		dgs = crypto_hash_digestsize(device->tconn->peer_integrity_tfm);
		/*
		 * FIXME: Receive the incoming digest into the receive buffer
		 * here, together with its struct p_data?
		 */
		err = drbd_recv_all_warn(device->tconn, dig_in, dgs);
		if (err)
			return NULL;
		data_size -= dgs;
	}

	if (!expect(IS_ALIGNED(data_size, 512)))
		return NULL;
	if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
		return NULL;

	/* even though we trust our peer,
	 * we sometimes have to double check. */
	if (sector + (data_size>>9) > capacity) {
		dev_err(DEV, "request from peer beyond end of local disk: "
			"capacity: %llus < sector: %llus + size: %u\n",
			(unsigned long long)capacity,
			(unsigned long long)sector, data_size);
		return NULL;
	}

	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place. */
	peer_req = drbd_alloc_peer_req(device, id, sector, data_size, GFP_NOIO);
	if (!peer_req)
		return NULL;

	if (!data_size)
		return peer_req;

	ds = data_size;
	page = peer_req->pages;
	page_chain_for_each(page) {
		unsigned len = min_t(int, ds, PAGE_SIZE);
		data = kmap(page);
		err = drbd_recv_all_warn(device->tconn, data, len);
		if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
			dev_err(DEV, "Fault injection: Corrupting data on receive\n");
			data[0] = data[0] ^ (unsigned long)-1;
		}
		kunmap(page);
		if (err) {
			drbd_free_peer_req(device, peer_req);
			return NULL;
		}
		ds -= len;
	}

	if (dgs) {
		drbd_csum_ee(device, device->tconn->peer_integrity_tfm, peer_req, dig_vv);
		if (memcmp(dig_in, dig_vv, dgs)) {
			dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
				(unsigned long long)sector, data_size);
			drbd_free_peer_req(device, peer_req);
			return NULL;
		}
	}
	device->recv_cnt += data_size>>9;
	return peer_req;
}
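
/*
 * Wire layout sketch (illustrative only): with a configured peer integrity
 * transform, the data payload consumed by read_in_block() above is
 *
 *	[ digest (crypto_hash_digestsize bytes) ][ data_size - digest bytes ]
 *
 * and the received data is re-hashed with drbd_csum_ee() and compared
 * against the digest before the peer request is accepted.
 */
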
1568/* drbd_drain_block() just takes a data block
1569 * out of the socket input buffer, and discards it.
1570 */
b30ab791 1571static int drbd_drain_block(struct drbd_device *device, int data_size)
b411b363
PR
1572{
1573 struct page *page;
a5c31904 1574 int err = 0;
b411b363
PR
1575 void *data;
1576
c3470cde 1577 if (!data_size)
fc5be839 1578 return 0;
c3470cde 1579
b30ab791 1580 page = drbd_alloc_pages(device, 1, 1);
b411b363
PR
1581
1582 data = kmap(page);
1583 while (data_size) {
fc5be839
AG
1584 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1585
b30ab791 1586 err = drbd_recv_all_warn(device->tconn, data, len);
a5c31904 1587 if (err)
b411b363 1588 break;
a5c31904 1589 data_size -= len;
b411b363
PR
1590 }
1591 kunmap(page);
b30ab791 1592 drbd_free_pages(device, page, 0);
fc5be839 1593 return err;
b411b363
PR
1594}
1595
b30ab791 1596static int recv_dless_read(struct drbd_device *device, struct drbd_request *req,
b411b363
PR
1597 sector_t sector, int data_size)
1598{
7988613b
KO
1599 struct bio_vec bvec;
1600 struct bvec_iter iter;
b411b363 1601 struct bio *bio;
7988613b 1602 int dgs, err, expect;
b30ab791
AG
1603 void *dig_in = device->tconn->int_dig_in;
1604 void *dig_vv = device->tconn->int_dig_vv;
b411b363 1605
88104ca4 1606 dgs = 0;
b30ab791
AG
1607 if (device->tconn->peer_integrity_tfm) {
1608 dgs = crypto_hash_digestsize(device->tconn->peer_integrity_tfm);
1609 err = drbd_recv_all_warn(device->tconn, dig_in, dgs);
a5c31904
AG
1610 if (err)
1611 return err;
88104ca4 1612 data_size -= dgs;
b411b363
PR
1613 }
1614
b411b363
PR
1615 /* optimistically update recv_cnt. if receiving fails below,
1616 * we disconnect anyways, and counters will be reset. */
b30ab791 1617 device->recv_cnt += data_size>>9;
b411b363
PR
1618
1619 bio = req->master_bio;
4f024f37 1620 D_ASSERT(sector == bio->bi_iter.bi_sector);
b411b363 1621
7988613b
KO
1622 bio_for_each_segment(bvec, bio, iter) {
1623 void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
1624 expect = min_t(int, data_size, bvec.bv_len);
b30ab791 1625 err = drbd_recv_all_warn(device->tconn, mapped, expect);
7988613b 1626 kunmap(bvec.bv_page);
a5c31904
AG
1627 if (err)
1628 return err;
1629 data_size -= expect;
b411b363
PR
1630 }
1631
1632 if (dgs) {
b30ab791 1633 drbd_csum_bio(device, device->tconn->peer_integrity_tfm, bio, dig_vv);
b411b363
PR
1634 if (memcmp(dig_in, dig_vv, dgs)) {
1635 dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
28284cef 1636 return -EINVAL;
b411b363
PR
1637 }
1638 }
1639
1640 D_ASSERT(data_size == 0);
28284cef 1641 return 0;
b411b363
PR
1642}
1643
a990be46
AG
1644/*
1645 * e_end_resync_block() is called in asender context via
1646 * drbd_finish_peer_reqs().
1647 */
99920dc5 1648static int e_end_resync_block(struct drbd_work *w, int unused)
b411b363 1649{
8050e6d0
AG
1650 struct drbd_peer_request *peer_req =
1651 container_of(w, struct drbd_peer_request, w);
b30ab791 1652 struct drbd_device *device = w->device;
db830c46 1653 sector_t sector = peer_req->i.sector;
99920dc5 1654 int err;
b411b363 1655
db830c46 1656 D_ASSERT(drbd_interval_empty(&peer_req->i));
b411b363 1657
db830c46 1658 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
b30ab791
AG
1659 drbd_set_in_sync(device, sector, peer_req->i.size);
1660 err = drbd_send_ack(device, P_RS_WRITE_ACK, peer_req);
b411b363
PR
1661 } else {
1662 /* Record failure to sync */
b30ab791 1663 drbd_rs_failed_io(device, sector, peer_req->i.size);
b411b363 1664
b30ab791 1665 err = drbd_send_ack(device, P_NEG_ACK, peer_req);
b411b363 1666 }
b30ab791 1667 dec_unacked(device);
b411b363 1668
99920dc5 1669 return err;
b411b363
PR
1670}
1671
b30ab791 1672static int recv_resync_read(struct drbd_device *device, sector_t sector, int data_size) __releases(local)
b411b363 1673{
db830c46 1674 struct drbd_peer_request *peer_req;
b411b363 1675
b30ab791 1676 peer_req = read_in_block(device, ID_SYNCER, sector, data_size);
db830c46 1677 if (!peer_req)
45bb912b 1678 goto fail;
b411b363 1679
b30ab791 1680 dec_rs_pending(device);
b411b363 1681
b30ab791 1682 inc_unacked(device);
b411b363
PR
1683 /* corresponding dec_unacked() in e_end_resync_block()
1684 * or in _drbd_clear_done_ee, respectively */
1685
db830c46 1686 peer_req->w.cb = e_end_resync_block;
45bb912b 1687
b30ab791
AG
1688 spin_lock_irq(&device->tconn->req_lock);
1689 list_add(&peer_req->w.list, &device->sync_ee);
1690 spin_unlock_irq(&device->tconn->req_lock);
b411b363 1691
b30ab791
AG
1692 atomic_add(data_size >> 9, &device->rs_sect_ev);
1693 if (drbd_submit_peer_request(device, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
e1c1b0fc 1694 return 0;
b411b363 1695
10f6d992
LE
1696 /* don't care for the reason here */
1697 dev_err(DEV, "submit failed, triggering re-connect\n");
b30ab791 1698 spin_lock_irq(&device->tconn->req_lock);
db830c46 1699 list_del(&peer_req->w.list);
b30ab791 1700 spin_unlock_irq(&device->tconn->req_lock);
22cc37a9 1701
b30ab791 1702 drbd_free_peer_req(device, peer_req);
45bb912b 1703fail:
b30ab791 1704 put_ldev(device);
e1c1b0fc 1705 return -EIO;
b411b363
PR
1706}
1707
668eebc6 1708static struct drbd_request *
b30ab791 1709find_request(struct drbd_device *device, struct rb_root *root, u64 id,
bc9c5c41 1710 sector_t sector, bool missing_ok, const char *func)
51624585 1711{
51624585
AG
1712 struct drbd_request *req;
1713
bc9c5c41
AG
1714 /* Request object according to our peer */
1715 req = (struct drbd_request *)(unsigned long)id;
5e472264 1716 if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
668eebc6 1717 return req;
c3afd8f5 1718 if (!missing_ok) {
5af172ed 1719 dev_err(DEV, "%s: failed to find request 0x%lx, sector %llus\n", func,
c3afd8f5
AG
1720 (unsigned long)id, (unsigned long long)sector);
1721 }
51624585 1722 return NULL;
b411b363
PR
1723}
1724
4a76b161 1725static int receive_DataReply(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 1726{
b30ab791 1727 struct drbd_device *device;
b411b363
PR
1728 struct drbd_request *req;
1729 sector_t sector;
82bc0194 1730 int err;
e658983a 1731 struct p_data *p = pi->data;
4a76b161 1732
b30ab791
AG
1733 device = vnr_to_device(tconn, pi->vnr);
1734 if (!device)
4a76b161 1735 return -EIO;
b411b363
PR
1736
1737 sector = be64_to_cpu(p->sector);
1738
b30ab791
AG
1739 spin_lock_irq(&device->tconn->req_lock);
1740 req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
1741 spin_unlock_irq(&device->tconn->req_lock);
c3afd8f5 1742 if (unlikely(!req))
82bc0194 1743 return -EIO;
b411b363 1744
24c4830c 1745 /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
b411b363
PR
1746 * special casing it there for the various failure cases.
1747 * still no race with drbd_fail_pending_reads */
b30ab791 1748 err = recv_dless_read(device, req, sector, pi->size);
82bc0194 1749 if (!err)
8554df1c 1750 req_mod(req, DATA_RECEIVED);
b411b363
PR
1751 /* else: nothing. handled from drbd_disconnect...
1752 * I don't think we may complete this just yet
1753 * in case we are "on-disconnect: freeze" */
1754
82bc0194 1755 return err;
b411b363
PR
1756}
1757
4a76b161 1758static int receive_RSDataReply(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 1759{
b30ab791 1760 struct drbd_device *device;
b411b363 1761 sector_t sector;
82bc0194 1762 int err;
e658983a 1763 struct p_data *p = pi->data;
4a76b161 1764
b30ab791
AG
1765 device = vnr_to_device(tconn, pi->vnr);
1766 if (!device)
4a76b161 1767 return -EIO;
b411b363
PR
1768
1769 sector = be64_to_cpu(p->sector);
1770 D_ASSERT(p->block_id == ID_SYNCER);
1771
b30ab791 1772 if (get_ldev(device)) {
b411b363
PR
1773 /* data is submitted to disk within recv_resync_read.
1774 * corresponding put_ldev done below on error,
fcefa62e 1775 * or in drbd_peer_request_endio. */
b30ab791 1776 err = recv_resync_read(device, sector, pi->size);
b411b363
PR
1777 } else {
1778 if (__ratelimit(&drbd_ratelimit_state))
1779 dev_err(DEV, "Can not write resync data to local disk.\n");
1780
b30ab791 1781 err = drbd_drain_block(device, pi->size);
b411b363 1782
b30ab791 1783 drbd_send_ack_dp(device, P_NEG_ACK, p, pi->size);
b411b363
PR
1784 }
1785
b30ab791 1786 atomic_add(pi->size >> 9, &device->rs_sect_in);
778f271d 1787
82bc0194 1788 return err;
b411b363
PR
1789}
1790
b30ab791 1791static void restart_conflicting_writes(struct drbd_device *device,
7be8da07 1792 sector_t sector, int size)
b411b363 1793{
7be8da07
AG
1794 struct drbd_interval *i;
1795 struct drbd_request *req;
1796
b30ab791 1797 drbd_for_each_overlap(i, &device->write_requests, sector, size) {
7be8da07
AG
1798 if (!i->local)
1799 continue;
1800 req = container_of(i, struct drbd_request, i);
1801 if (req->rq_state & RQ_LOCAL_PENDING ||
1802 !(req->rq_state & RQ_POSTPONED))
1803 continue;
2312f0b3
LE
1804 /* as it is RQ_POSTPONED, this will cause it to
1805 * be queued on the retry workqueue. */
d4dabbe2 1806 __req_mod(req, CONFLICT_RESOLVED, NULL);
7be8da07
AG
1807 }
1808}
b411b363 1809
a990be46
AG
1810/*
1811 * e_end_block() is called in asender context via drbd_finish_peer_reqs().
b411b363 1812 */
99920dc5 1813static int e_end_block(struct drbd_work *w, int cancel)
b411b363 1814{
8050e6d0
AG
1815 struct drbd_peer_request *peer_req =
1816 container_of(w, struct drbd_peer_request, w);
b30ab791 1817 struct drbd_device *device = w->device;
db830c46 1818 sector_t sector = peer_req->i.sector;
99920dc5 1819 int err = 0, pcmd;
b411b363 1820
303d1448 1821 if (peer_req->flags & EE_SEND_WRITE_ACK) {
db830c46 1822 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
b30ab791
AG
1823 pcmd = (device->state.conn >= C_SYNC_SOURCE &&
1824 device->state.conn <= C_PAUSED_SYNC_T &&
db830c46 1825 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
b411b363 1826 P_RS_WRITE_ACK : P_WRITE_ACK;
b30ab791 1827 err = drbd_send_ack(device, pcmd, peer_req);
b411b363 1828 if (pcmd == P_RS_WRITE_ACK)
b30ab791 1829 drbd_set_in_sync(device, sector, peer_req->i.size);
b411b363 1830 } else {
b30ab791 1831 err = drbd_send_ack(device, P_NEG_ACK, peer_req);
b411b363
PR
1832 /* we expect it to be marked out of sync anyways...
1833 * maybe assert this? */
1834 }
b30ab791 1835 dec_unacked(device);
b411b363
PR
1836 }
1837 /* we delete from the conflict detection hash _after_ we sent out the
1838 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
302bdeae 1839 if (peer_req->flags & EE_IN_INTERVAL_TREE) {
b30ab791 1840 spin_lock_irq(&device->tconn->req_lock);
db830c46 1841 D_ASSERT(!drbd_interval_empty(&peer_req->i));
b30ab791 1842 drbd_remove_epoch_entry_interval(device, peer_req);
7be8da07 1843 if (peer_req->flags & EE_RESTART_REQUESTS)
b30ab791
AG
1844 restart_conflicting_writes(device, sector, peer_req->i.size);
1845 spin_unlock_irq(&device->tconn->req_lock);
bb3bfe96 1846 } else
db830c46 1847 D_ASSERT(drbd_interval_empty(&peer_req->i));
b411b363 1848
b30ab791 1849 drbd_may_finish_epoch(device->tconn, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
b411b363 1850
99920dc5 1851 return err;
b411b363
PR
1852}
1853
7be8da07 1854static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
b411b363 1855{
b30ab791 1856 struct drbd_device *device = w->device;
8050e6d0
AG
1857 struct drbd_peer_request *peer_req =
1858 container_of(w, struct drbd_peer_request, w);
99920dc5 1859 int err;
b411b363 1860
b30ab791
AG
1861 err = drbd_send_ack(device, ack, peer_req);
1862 dec_unacked(device);
b411b363 1863
99920dc5 1864 return err;
b411b363
PR
1865}
1866
d4dabbe2 1867static int e_send_superseded(struct drbd_work *w, int unused)
7be8da07 1868{
d4dabbe2 1869 return e_send_ack(w, P_SUPERSEDED);
7be8da07
AG
1870}
1871
99920dc5 1872static int e_send_retry_write(struct drbd_work *w, int unused)
7be8da07 1873{
b30ab791 1874 struct drbd_tconn *tconn = w->device->tconn;
7be8da07
AG
1875
1876 return e_send_ack(w, tconn->agreed_pro_version >= 100 ?
d4dabbe2 1877 P_RETRY_WRITE : P_SUPERSEDED);
7be8da07 1878}
b411b363 1879
3e394da1
AG
1880static bool seq_greater(u32 a, u32 b)
1881{
1882 /*
1883 * We assume 32-bit wrap-around here.
1884 * For 24-bit wrap-around, we would have to shift:
1885 * a <<= 8; b <<= 8;
1886 */
1887 return (s32)a - (s32)b > 0;
1888}
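/*
 * Worked example for the wrap-around comparison above (illustrative only):
 *   seq_greater(5, 0xFFFFFFFE): (s32)5 - (s32)0xFFFFFFFE = 5 - (-2) = 7 > 0,
 *   so sequence number 5 counts as newer than 4294967294 across the wrap,
 *   while seq_greater(0xFFFFFFFE, 5) evaluates to -7 > 0 == false.
 *   Sequence numbers more than 2^31 apart can no longer be ordered reliably.
 */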
b411b363 1889
3e394da1
AG
1890static u32 seq_max(u32 a, u32 b)
1891{
1892 return seq_greater(a, b) ? a : b;
b411b363
PR
1893}
1894
b30ab791 1895static void update_peer_seq(struct drbd_device *device, unsigned int peer_seq)
3e394da1 1896{
3c13b680 1897 unsigned int newest_peer_seq;
3e394da1 1898
b30ab791
AG
1899 if (test_bit(RESOLVE_CONFLICTS, &device->tconn->flags)) {
1900 spin_lock(&device->peer_seq_lock);
1901 newest_peer_seq = seq_max(device->peer_seq, peer_seq);
1902 device->peer_seq = newest_peer_seq;
1903 spin_unlock(&device->peer_seq_lock);
1904 /* wake up only if we actually changed device->peer_seq */
3c13b680 1905 if (peer_seq == newest_peer_seq)
b30ab791 1906 wake_up(&device->seq_wait);
7be8da07 1907 }
b411b363
PR
1908}
1909
d93f6302 1910static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
b6a370ba 1911{
d93f6302
LE
1912 return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
1913}
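/*
 * Example for overlaps() above (lengths l1/l2 are in bytes, sectors are
 * 512 bytes): s1 = 1000, l1 = 4096 covers sectors [1000, 1008);
 * s2 = 1007, l2 = 512 covers [1007, 1008), so overlaps() returns true.
 * With s2 = 1008 the intervals only touch and overlaps() returns false.
 */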
b6a370ba 1914
d93f6302 1915/* maybe change sync_ee into interval trees as well? */
b30ab791 1916static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
d93f6302
LE
1917{
1918 struct drbd_peer_request *rs_req;
b6a370ba
PR
1919 bool rv = 0;
1920
b30ab791
AG
1921 spin_lock_irq(&device->tconn->req_lock);
1922 list_for_each_entry(rs_req, &device->sync_ee, w.list) {
d93f6302
LE
1923 if (overlaps(peer_req->i.sector, peer_req->i.size,
1924 rs_req->i.sector, rs_req->i.size)) {
b6a370ba
PR
1925 rv = 1;
1926 break;
1927 }
1928 }
b30ab791 1929 spin_unlock_irq(&device->tconn->req_lock);
b6a370ba
PR
1930
1931 return rv;
1932}
1933
b411b363
PR
1934/* Called from receive_Data.
1935 * Synchronize packets on sock with packets on msock.
1936 *
1937 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1938 * packet traveling on msock, they are still processed in the order they have
1939 * been sent.
1940 *
1941 * Note: we don't care for Ack packets overtaking P_DATA packets.
1942 *
b30ab791 1943 * In case packet_seq is larger than device->peer_seq number, there are
b411b363 1944 * outstanding packets on the msock. We wait for them to arrive.
b30ab791 1945 * In case we are the logically next packet, we update device->peer_seq
b411b363
PR
1946 * ourselves. Correctly handles 32bit wrap around.
1947 *
1948 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1949 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1950 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1951 * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
1952 *
1953 * returns 0 if we may process the packet,
1954 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
b30ab791 1955static int wait_for_and_update_peer_seq(struct drbd_device *device, const u32 peer_seq)
b411b363
PR
1956{
1957 DEFINE_WAIT(wait);
b411b363 1958 long timeout;
b874d231 1959 int ret = 0, tp;
7be8da07 1960
b30ab791 1961 if (!test_bit(RESOLVE_CONFLICTS, &device->tconn->flags))
7be8da07
AG
1962 return 0;
1963
b30ab791 1964 spin_lock(&device->peer_seq_lock);
b411b363 1965 for (;;) {
b30ab791
AG
1966 if (!seq_greater(peer_seq - 1, device->peer_seq)) {
1967 device->peer_seq = seq_max(device->peer_seq, peer_seq);
b411b363 1968 break;
7be8da07 1969 }
b874d231 1970
b411b363
PR
1971 if (signal_pending(current)) {
1972 ret = -ERESTARTSYS;
1973 break;
1974 }
b874d231
PR
1975
1976 rcu_read_lock();
b30ab791 1977 tp = rcu_dereference(device->tconn->net_conf)->two_primaries;
b874d231
PR
1978 rcu_read_unlock();
1979
1980 if (!tp)
1981 break;
1982
1983 /* Only need to wait if two_primaries is enabled */
b30ab791
AG
1984 prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
1985 spin_unlock(&device->peer_seq_lock);
44ed167d 1986 rcu_read_lock();
b30ab791 1987 timeout = rcu_dereference(device->tconn->net_conf)->ping_timeo*HZ/10;
44ed167d 1988 rcu_read_unlock();
71b1c1eb 1989 timeout = schedule_timeout(timeout);
b30ab791 1990 spin_lock(&device->peer_seq_lock);
7be8da07 1991 if (!timeout) {
b411b363 1992 ret = -ETIMEDOUT;
71b1c1eb 1993 dev_err(DEV, "Timed out waiting for missing ack packets; disconnecting\n");
b411b363
PR
1994 break;
1995 }
1996 }
b30ab791
AG
1997 spin_unlock(&device->peer_seq_lock);
1998 finish_wait(&device->seq_wait, &wait);
b411b363
PR
1999 return ret;
2000}
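/*
 * The timing estimate in the comment above, spelled out (illustrative):
 * ~1<<30 bytes/s over 512-byte sectors is 1<<21 sequence numbers per second
 * in the worst case of one packet per sector, so the 24-bit space wraps
 * after 1<<24 / 1<<21 = 8 seconds and the 32-bit space after
 * 1<<32 / 1<<21 = 1<<11 = 2048 seconds (roughly 34 minutes).
 */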
2001
688593c5
LE
2002/* see also bio_flags_to_wire()
2003 * DRBD_REQ_*, because we need to semantically map the flags to data packet
2004 * flags and back. We may replicate to other kernel versions. */
b30ab791 2005static unsigned long wire_flags_to_bio(struct drbd_device *device, u32 dpf)
76d2e7ec 2006{
688593c5
LE
2007 return (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2008 (dpf & DP_FUA ? REQ_FUA : 0) |
2009 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
2010 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
76d2e7ec
PR
2011}
2012
b30ab791 2013static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
7be8da07
AG
2014 unsigned int size)
2015{
2016 struct drbd_interval *i;
2017
2018 repeat:
b30ab791 2019 drbd_for_each_overlap(i, &device->write_requests, sector, size) {
7be8da07
AG
2020 struct drbd_request *req;
2021 struct bio_and_error m;
2022
2023 if (!i->local)
2024 continue;
2025 req = container_of(i, struct drbd_request, i);
2026 if (!(req->rq_state & RQ_POSTPONED))
2027 continue;
2028 req->rq_state &= ~RQ_POSTPONED;
2029 __req_mod(req, NEG_ACKED, &m);
b30ab791 2030 spin_unlock_irq(&device->tconn->req_lock);
7be8da07 2031 if (m.bio)
b30ab791
AG
2032 complete_master_bio(device, &m);
2033 spin_lock_irq(&device->tconn->req_lock);
7be8da07
AG
2034 goto repeat;
2035 }
2036}
2037
b30ab791 2038static int handle_write_conflicts(struct drbd_device *device,
7be8da07
AG
2039 struct drbd_peer_request *peer_req)
2040{
b30ab791 2041 struct drbd_tconn *tconn = device->tconn;
427c0434 2042 bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &tconn->flags);
7be8da07
AG
2043 sector_t sector = peer_req->i.sector;
2044 const unsigned int size = peer_req->i.size;
2045 struct drbd_interval *i;
2046 bool equal;
2047 int err;
2048
2049 /*
2050 * Inserting the peer request into the write_requests tree will prevent
2051 * new conflicting local requests from being added.
2052 */
b30ab791 2053 drbd_insert_interval(&device->write_requests, &peer_req->i);
7be8da07
AG
2054
2055 repeat:
b30ab791 2056 drbd_for_each_overlap(i, &device->write_requests, sector, size) {
7be8da07
AG
2057 if (i == &peer_req->i)
2058 continue;
2059
2060 if (!i->local) {
2061 /*
2062 * Our peer has sent a conflicting remote request; this
2063 * should not happen in a two-node setup. Wait for the
2064 * earlier peer request to complete.
2065 */
b30ab791 2066 err = drbd_wait_misc(device, i);
7be8da07
AG
2067 if (err)
2068 goto out;
2069 goto repeat;
2070 }
2071
2072 equal = i->sector == sector && i->size == size;
2073 if (resolve_conflicts) {
2074 /*
2075 * If the peer request is fully contained within the
d4dabbe2
LE
2076 * overlapping request, it can be considered overwritten
2077 * and thus superseded; otherwise, it will be retried
2078 * once all overlapping requests have completed.
7be8da07 2079 */
d4dabbe2 2080 bool superseded = i->sector <= sector && i->sector +
7be8da07
AG
2081 (i->size >> 9) >= sector + (size >> 9);
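/* Containment example (illustrative): a local request i covering sectors
 * [100, 116) (i->size = 8192) fully contains a peer write covering
 * [104, 112) (size = 4096): 100 <= 104 and 116 >= 112, so the peer
 * write is superseded; any partial overlap would be retried instead. */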
2082
2083 if (!equal)
2084 dev_alert(DEV, "Concurrent writes detected: "
2085 "local=%llus +%u, remote=%llus +%u, "
2086 "assuming %s came first\n",
2087 (unsigned long long)i->sector, i->size,
2088 (unsigned long long)sector, size,
d4dabbe2 2089 superseded ? "local" : "remote");
7be8da07 2090
b30ab791 2091 inc_unacked(device);
d4dabbe2 2092 peer_req->w.cb = superseded ? e_send_superseded :
7be8da07 2093 e_send_retry_write;
b30ab791
AG
2094 list_add_tail(&peer_req->w.list, &device->done_ee);
2095 wake_asender(device->tconn);
7be8da07
AG
2096
2097 err = -ENOENT;
2098 goto out;
2099 } else {
2100 struct drbd_request *req =
2101 container_of(i, struct drbd_request, i);
2102
2103 if (!equal)
2104 dev_alert(DEV, "Concurrent writes detected: "
2105 "local=%llus +%u, remote=%llus +%u\n",
2106 (unsigned long long)i->sector, i->size,
2107 (unsigned long long)sector, size);
2108
2109 if (req->rq_state & RQ_LOCAL_PENDING ||
2110 !(req->rq_state & RQ_POSTPONED)) {
2111 /*
2112 * Wait for the node with the discard flag to
d4dabbe2
LE
2113 * decide if this request has been superseded
2114 * or needs to be retried.
2115 * Requests that have been superseded will
7be8da07
AG
2116 * disappear from the write_requests tree.
2117 *
2118 * In addition, wait for the conflicting
2119 * request to finish locally before submitting
2120 * the conflicting peer request.
2121 */
b30ab791 2122 err = drbd_wait_misc(device, &req->i);
7be8da07 2123 if (err) {
b30ab791 2124 _conn_request_state(device->tconn,
7be8da07
AG
2125 NS(conn, C_TIMEOUT),
2126 CS_HARD);
b30ab791 2127 fail_postponed_requests(device, sector, size);
7be8da07
AG
2128 goto out;
2129 }
2130 goto repeat;
2131 }
2132 /*
2133 * Remember to restart the conflicting requests after
2134 * the new peer request has completed.
2135 */
2136 peer_req->flags |= EE_RESTART_REQUESTS;
2137 }
2138 }
2139 err = 0;
2140
2141 out:
2142 if (err)
b30ab791 2143 drbd_remove_epoch_entry_interval(device, peer_req);
7be8da07
AG
2144 return err;
2145}
2146
b411b363 2147/* mirrored write */
4a76b161 2148static int receive_Data(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 2149{
b30ab791 2150 struct drbd_device *device;
b411b363 2151 sector_t sector;
db830c46 2152 struct drbd_peer_request *peer_req;
e658983a 2153 struct p_data *p = pi->data;
7be8da07 2154 u32 peer_seq = be32_to_cpu(p->seq_num);
b411b363
PR
2155 int rw = WRITE;
2156 u32 dp_flags;
302bdeae 2157 int err, tp;
b411b363 2158
b30ab791
AG
2159 device = vnr_to_device(tconn, pi->vnr);
2160 if (!device)
4a76b161 2161 return -EIO;
b411b363 2162
b30ab791 2163 if (!get_ldev(device)) {
82bc0194
AG
2164 int err2;
2165
b30ab791
AG
2166 err = wait_for_and_update_peer_seq(device, peer_seq);
2167 drbd_send_ack_dp(device, P_NEG_ACK, p, pi->size);
12038a3a 2168 atomic_inc(&tconn->current_epoch->epoch_size);
b30ab791 2169 err2 = drbd_drain_block(device, pi->size);
82bc0194
AG
2170 if (!err)
2171 err = err2;
2172 return err;
b411b363
PR
2173 }
2174
fcefa62e
AG
2175 /*
2176 * Corresponding put_ldev done either below (on various errors), or in
2177 * drbd_peer_request_endio, if we successfully submit the data at the
2178 * end of this function.
2179 */
b411b363
PR
2180
2181 sector = be64_to_cpu(p->sector);
b30ab791 2182 peer_req = read_in_block(device, p->block_id, sector, pi->size);
db830c46 2183 if (!peer_req) {
b30ab791 2184 put_ldev(device);
82bc0194 2185 return -EIO;
b411b363
PR
2186 }
2187
db830c46 2188 peer_req->w.cb = e_end_block;
b411b363 2189
688593c5 2190 dp_flags = be32_to_cpu(p->dp_flags);
b30ab791 2191 rw |= wire_flags_to_bio(device, dp_flags);
81a3537a
LE
2192 if (peer_req->pages == NULL) {
2193 D_ASSERT(peer_req->i.size == 0);
a73ff323
LE
2194 D_ASSERT(dp_flags & DP_FLUSH);
2195 }
688593c5
LE
2196
2197 if (dp_flags & DP_MAY_SET_IN_SYNC)
db830c46 2198 peer_req->flags |= EE_MAY_SET_IN_SYNC;
688593c5 2199
12038a3a
PR
2200 spin_lock(&tconn->epoch_lock);
2201 peer_req->epoch = tconn->current_epoch;
db830c46
AG
2202 atomic_inc(&peer_req->epoch->epoch_size);
2203 atomic_inc(&peer_req->epoch->active);
12038a3a 2204 spin_unlock(&tconn->epoch_lock);
b411b363 2205
302bdeae 2206 rcu_read_lock();
b30ab791 2207 tp = rcu_dereference(device->tconn->net_conf)->two_primaries;
302bdeae
PR
2208 rcu_read_unlock();
2209 if (tp) {
2210 peer_req->flags |= EE_IN_INTERVAL_TREE;
b30ab791 2211 err = wait_for_and_update_peer_seq(device, peer_seq);
7be8da07 2212 if (err)
b411b363 2213 goto out_interrupted;
b30ab791
AG
2214 spin_lock_irq(&device->tconn->req_lock);
2215 err = handle_write_conflicts(device, peer_req);
7be8da07 2216 if (err) {
b30ab791 2217 spin_unlock_irq(&device->tconn->req_lock);
7be8da07 2218 if (err == -ENOENT) {
b30ab791 2219 put_ldev(device);
82bc0194 2220 return 0;
b411b363 2221 }
7be8da07 2222 goto out_interrupted;
b411b363 2223 }
b874d231 2224 } else {
b30ab791
AG
2225 update_peer_seq(device, peer_seq);
2226 spin_lock_irq(&device->tconn->req_lock);
b874d231 2227 }
b30ab791
AG
2228 list_add(&peer_req->w.list, &device->active_ee);
2229 spin_unlock_irq(&device->tconn->req_lock);
b411b363 2230
b30ab791
AG
2231 if (device->state.conn == C_SYNC_TARGET)
2232 wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
b411b363 2233
b30ab791 2234 if (device->tconn->agreed_pro_version < 100) {
44ed167d 2235 rcu_read_lock();
b30ab791 2236 switch (rcu_dereference(device->tconn->net_conf)->wire_protocol) {
303d1448
PR
2237 case DRBD_PROT_C:
2238 dp_flags |= DP_SEND_WRITE_ACK;
2239 break;
2240 case DRBD_PROT_B:
2241 dp_flags |= DP_SEND_RECEIVE_ACK;
2242 break;
b411b363 2243 }
44ed167d 2244 rcu_read_unlock();
b411b363
PR
2245 }
2246
303d1448
PR
2247 if (dp_flags & DP_SEND_WRITE_ACK) {
2248 peer_req->flags |= EE_SEND_WRITE_ACK;
b30ab791 2249 inc_unacked(device);
b411b363
PR
2250 /* corresponding dec_unacked() in e_end_block()
2251 * or in _drbd_clear_done_ee, respectively */
303d1448
PR
2252 }
2253
2254 if (dp_flags & DP_SEND_RECEIVE_ACK) {
b411b363
PR
2255 /* I really don't like it that the receiver thread
2256 * sends on the msock, but anyways */
b30ab791 2257 drbd_send_ack(device, P_RECV_ACK, peer_req);
b411b363
PR
2258 }
2259
b30ab791 2260 if (device->state.pdsk < D_INCONSISTENT) {
b411b363 2261 /* In case we have the only disk of the cluster, */
b30ab791 2262 drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
db830c46
AG
2263 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2264 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
b30ab791 2265 drbd_al_begin_io(device, &peer_req->i, true);
b411b363
PR
2266 }
2267
b30ab791 2268 err = drbd_submit_peer_request(device, peer_req, rw, DRBD_FAULT_DT_WR);
82bc0194
AG
2269 if (!err)
2270 return 0;
b411b363 2271
10f6d992
LE
2272 /* don't care for the reason here */
2273 dev_err(DEV, "submit failed, triggering re-connect\n");
b30ab791 2274 spin_lock_irq(&device->tconn->req_lock);
db830c46 2275 list_del(&peer_req->w.list);
b30ab791
AG
2276 drbd_remove_epoch_entry_interval(device, peer_req);
2277 spin_unlock_irq(&device->tconn->req_lock);
db830c46 2278 if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
b30ab791 2279 drbd_al_complete_io(device, &peer_req->i);
22cc37a9 2280
b411b363 2281out_interrupted:
1e9dd291 2282 drbd_may_finish_epoch(tconn, peer_req->epoch, EV_PUT + EV_CLEANUP);
b30ab791
AG
2283 put_ldev(device);
2284 drbd_free_peer_req(device, peer_req);
82bc0194 2285 return err;
b411b363
PR
2286}
2287
0f0601f4
LE
2288/* We may throttle resync, if the lower device seems to be busy,
2289 * and current sync rate is above c_min_rate.
2290 *
2291 * To decide whether or not the lower device is busy, we use a scheme similar
2292 * to MD RAID is_mddev_idle(): if the partition stats reveal a "significant"
2293 * amount (more than 64 sectors) of activity we cannot account for with our own resync
2294 * activity, it obviously is "busy".
2295 *
2296 * The current sync rate used here uses only the most recent two step marks,
2297 * to have a short time average so we can react faster.
2298 */
b30ab791 2299int drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector)
0f0601f4 2300{
b30ab791 2301 struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
0f0601f4 2302 unsigned long db, dt, dbdt;
e3555d85 2303 struct lc_element *tmp;
0f0601f4
LE
2304 int curr_events;
2305 int throttle = 0;
daeda1cc
PR
2306 unsigned int c_min_rate;
2307
2308 rcu_read_lock();
b30ab791 2309 c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
daeda1cc 2310 rcu_read_unlock();
0f0601f4
LE
2311
2312 /* feature disabled? */
daeda1cc 2313 if (c_min_rate == 0)
0f0601f4
LE
2314 return 0;
2315
b30ab791
AG
2316 spin_lock_irq(&device->al_lock);
2317 tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
e3555d85
PR
2318 if (tmp) {
2319 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2320 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
b30ab791 2321 spin_unlock_irq(&device->al_lock);
e3555d85
PR
2322 return 0;
2323 }
2324 /* Do not slow down if app IO is already waiting for this extent */
2325 }
b30ab791 2326 spin_unlock_irq(&device->al_lock);
e3555d85 2327
0f0601f4
LE
2328 curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2329 (int)part_stat_read(&disk->part0, sectors[1]) -
b30ab791 2330 atomic_read(&device->rs_sect_ev);
e3555d85 2331
b30ab791 2332 if (!device->rs_last_events || curr_events - device->rs_last_events > 64) {
0f0601f4
LE
2333 unsigned long rs_left;
2334 int i;
2335
b30ab791 2336 device->rs_last_events = curr_events;
0f0601f4
LE
2337
2338 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2339 * approx. */
b30ab791 2340 i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2649f080 2341
b30ab791
AG
2342 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2343 rs_left = device->ov_left;
2649f080 2344 else
b30ab791 2345 rs_left = drbd_bm_total_weight(device) - device->rs_failed;
0f0601f4 2346
b30ab791 2347 dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
0f0601f4
LE
2348 if (!dt)
2349 dt++;
b30ab791 2350 db = device->rs_mark_left[i] - rs_left;
0f0601f4
LE
2351 dbdt = Bit2KB(db/dt);
2352
daeda1cc 2353 if (dbdt > c_min_rate)
0f0601f4
LE
2354 throttle = 1;
2355 }
2356 return throttle;
2357}
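/*
 * Illustrative numbers for the rate check above (example values, not
 * defaults): db = rs_mark_left[i] - rs_left is counted in bitmap bits,
 * with DRBD's 4 KiB-per-bit bitmap granularity, so db = 5120 bits cleared
 * over dt = 2 seconds gives dbdt = Bit2KB(5120 / 2) = 10240 KiB/s.
 * Resync is then throttled only if that rate exceeds the configured
 * c_min_rate and the backing device shows more than 64 sectors of I/O
 * that our own resync does not account for.
 */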
2358
2359
4a76b161 2360static int receive_DataRequest(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 2361{
b30ab791 2362 struct drbd_device *device;
b411b363 2363 sector_t sector;
4a76b161 2364 sector_t capacity;
db830c46 2365 struct drbd_peer_request *peer_req;
b411b363 2366 struct digest_info *di = NULL;
b18b37be 2367 int size, verb;
b411b363 2368 unsigned int fault_type;
e658983a 2369 struct p_block_req *p = pi->data;
4a76b161 2370
b30ab791
AG
2371 device = vnr_to_device(tconn, pi->vnr);
2372 if (!device)
4a76b161 2373 return -EIO;
b30ab791 2374 capacity = drbd_get_capacity(device->this_bdev);
b411b363
PR
2375
2376 sector = be64_to_cpu(p->sector);
2377 size = be32_to_cpu(p->blksize);
2378
c670a398 2379 if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
b411b363
PR
2380 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2381 (unsigned long long)sector, size);
82bc0194 2382 return -EINVAL;
b411b363
PR
2383 }
2384 if (sector + (size>>9) > capacity) {
2385 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2386 (unsigned long long)sector, size);
82bc0194 2387 return -EINVAL;
b411b363
PR
2388 }
2389
b30ab791 2390 if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
b18b37be 2391 verb = 1;
e2857216 2392 switch (pi->cmd) {
b18b37be 2393 case P_DATA_REQUEST:
b30ab791 2394 drbd_send_ack_rp(device, P_NEG_DREPLY, p);
b18b37be
PR
2395 break;
2396 case P_RS_DATA_REQUEST:
2397 case P_CSUM_RS_REQUEST:
2398 case P_OV_REQUEST:
b30ab791 2399 drbd_send_ack_rp(device, P_NEG_RS_DREPLY , p);
b18b37be
PR
2400 break;
2401 case P_OV_REPLY:
2402 verb = 0;
b30ab791
AG
2403 dec_rs_pending(device);
2404 drbd_send_ack_ex(device, P_OV_RESULT, sector, size, ID_IN_SYNC);
b18b37be
PR
2405 break;
2406 default:
49ba9b1b 2407 BUG();
b18b37be
PR
2408 }
2409 if (verb && __ratelimit(&drbd_ratelimit_state))
b411b363
PR
2410 dev_err(DEV, "Can not satisfy peer's read request, "
2411 "no local data.\n");
b18b37be 2412
a821cc4a 2413 /* drain a possibly present payload */
b30ab791 2414 return drbd_drain_block(device, pi->size);
b411b363
PR
2415 }
2416
2417 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2418 * "criss-cross" setup, that might cause write-out on some other DRBD,
2419 * which in turn might block on the other node at this very place. */
b30ab791 2420 peer_req = drbd_alloc_peer_req(device, p->block_id, sector, size, GFP_NOIO);
db830c46 2421 if (!peer_req) {
b30ab791 2422 put_ldev(device);
82bc0194 2423 return -ENOMEM;
b411b363
PR
2424 }
2425
e2857216 2426 switch (pi->cmd) {
b411b363 2427 case P_DATA_REQUEST:
db830c46 2428 peer_req->w.cb = w_e_end_data_req;
b411b363 2429 fault_type = DRBD_FAULT_DT_RD;
80a40e43
LE
2430 /* application IO, don't drbd_rs_begin_io */
2431 goto submit;
2432
b411b363 2433 case P_RS_DATA_REQUEST:
db830c46 2434 peer_req->w.cb = w_e_end_rsdata_req;
b411b363 2435 fault_type = DRBD_FAULT_RS_RD;
5f9915bb 2436 /* used in the sector offset progress display */
b30ab791 2437 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
b411b363
PR
2438 break;
2439
2440 case P_OV_REPLY:
2441 case P_CSUM_RS_REQUEST:
2442 fault_type = DRBD_FAULT_RS_RD;
e2857216 2443 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
b411b363
PR
2444 if (!di)
2445 goto out_free_e;
2446
e2857216 2447 di->digest_size = pi->size;
b411b363
PR
2448 di->digest = (((char *)di)+sizeof(struct digest_info));
2449
db830c46
AG
2450 peer_req->digest = di;
2451 peer_req->flags |= EE_HAS_DIGEST;
c36c3ced 2452
b30ab791 2453 if (drbd_recv_all(device->tconn, di->digest, pi->size))
b411b363
PR
2454 goto out_free_e;
2455
e2857216 2456 if (pi->cmd == P_CSUM_RS_REQUEST) {
b30ab791 2457 D_ASSERT(device->tconn->agreed_pro_version >= 89);
db830c46 2458 peer_req->w.cb = w_e_end_csum_rs_req;
5f9915bb 2459 /* used in the sector offset progress display */
b30ab791 2460 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
e2857216 2461 } else if (pi->cmd == P_OV_REPLY) {
2649f080 2462 /* track progress, we may need to throttle */
b30ab791 2463 atomic_add(size >> 9, &device->rs_sect_in);
db830c46 2464 peer_req->w.cb = w_e_end_ov_reply;
b30ab791 2465 dec_rs_pending(device);
0f0601f4
LE
2466 /* drbd_rs_begin_io done when we sent this request,
2467 * but accounting still needs to be done. */
2468 goto submit_for_resync;
b411b363
PR
2469 }
2470 break;
2471
2472 case P_OV_REQUEST:
b30ab791
AG
2473 if (device->ov_start_sector == ~(sector_t)0 &&
2474 device->tconn->agreed_pro_version >= 90) {
de228bba
LE
2475 unsigned long now = jiffies;
2476 int i;
b30ab791
AG
2477 device->ov_start_sector = sector;
2478 device->ov_position = sector;
2479 device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
2480 device->rs_total = device->ov_left;
de228bba 2481 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
b30ab791
AG
2482 device->rs_mark_left[i] = device->ov_left;
2483 device->rs_mark_time[i] = now;
de228bba 2484 }
b411b363
PR
2485 dev_info(DEV, "Online Verify start sector: %llu\n",
2486 (unsigned long long)sector);
2487 }
db830c46 2488 peer_req->w.cb = w_e_end_ov_req;
b411b363 2489 fault_type = DRBD_FAULT_RS_RD;
b411b363
PR
2490 break;
2491
b411b363 2492 default:
49ba9b1b 2493 BUG();
b411b363
PR
2494 }
2495
0f0601f4
LE
2496 /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2497 * wrt the receiver, but it is not as straightforward as it may seem.
2498 * Various places in the resync start and stop logic assume resync
2499 * requests are processed in order, requeuing this on the worker thread
2500 * introduces a bunch of new code for synchronization between threads.
2501 *
2502 * Unlimited throttling before drbd_rs_begin_io may stall the resync
2503 * "forever", throttling after drbd_rs_begin_io will lock that extent
2504 * for application writes for the same time. For now, just throttle
2505 * here, where the rest of the code expects the receiver to sleep for
2506 * a while, anyways.
2507 */
2508
2509 /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2510 * this defers syncer requests for some time, before letting at least
2511 * one request through. The resync controller on the receiving side
2512 * will adapt to the incoming rate accordingly.
2513 *
2514 * We cannot throttle here if remote is Primary/SyncTarget:
2515 * we would also throttle its application reads.
2516 * In that case, throttling is done on the SyncTarget only.
2517 */
b30ab791 2518 if (device->state.peer != R_PRIMARY && drbd_rs_should_slow_down(device, sector))
e3555d85 2519 schedule_timeout_uninterruptible(HZ/10);
b30ab791 2520 if (drbd_rs_begin_io(device, sector))
80a40e43 2521 goto out_free_e;
b411b363 2522
0f0601f4 2523submit_for_resync:
b30ab791 2524 atomic_add(size >> 9, &device->rs_sect_ev);
0f0601f4 2525
80a40e43 2526submit:
b30ab791
AG
2527 inc_unacked(device);
2528 spin_lock_irq(&device->tconn->req_lock);
2529 list_add_tail(&peer_req->w.list, &device->read_ee);
2530 spin_unlock_irq(&device->tconn->req_lock);
b411b363 2531
b30ab791 2532 if (drbd_submit_peer_request(device, peer_req, READ, fault_type) == 0)
82bc0194 2533 return 0;
b411b363 2534
10f6d992
LE
2535 /* don't care for the reason here */
2536 dev_err(DEV, "submit failed, triggering re-connect\n");
b30ab791 2537 spin_lock_irq(&device->tconn->req_lock);
db830c46 2538 list_del(&peer_req->w.list);
b30ab791 2539 spin_unlock_irq(&device->tconn->req_lock);
22cc37a9
LE
2540 /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2541
b411b363 2542out_free_e:
b30ab791
AG
2543 put_ldev(device);
2544 drbd_free_peer_req(device, peer_req);
82bc0194 2545 return -EIO;
b411b363
PR
2546}
2547
b30ab791 2548static int drbd_asb_recover_0p(struct drbd_device *device) __must_hold(local)
b411b363
PR
2549{
2550 int self, peer, rv = -100;
2551 unsigned long ch_self, ch_peer;
44ed167d 2552 enum drbd_after_sb_p after_sb_0p;
b411b363 2553
b30ab791
AG
2554 self = device->ldev->md.uuid[UI_BITMAP] & 1;
2555 peer = device->p_uuid[UI_BITMAP] & 1;
b411b363 2556
b30ab791
AG
2557 ch_peer = device->p_uuid[UI_SIZE];
2558 ch_self = device->comm_bm_set;
b411b363 2559
44ed167d 2560 rcu_read_lock();
b30ab791 2561 after_sb_0p = rcu_dereference(device->tconn->net_conf)->after_sb_0p;
44ed167d
PR
2562 rcu_read_unlock();
2563 switch (after_sb_0p) {
b411b363
PR
2564 case ASB_CONSENSUS:
2565 case ASB_DISCARD_SECONDARY:
2566 case ASB_CALL_HELPER:
44ed167d 2567 case ASB_VIOLENTLY:
b411b363
PR
2568 dev_err(DEV, "Configuration error.\n");
2569 break;
2570 case ASB_DISCONNECT:
2571 break;
2572 case ASB_DISCARD_YOUNGER_PRI:
2573 if (self == 0 && peer == 1) {
2574 rv = -1;
2575 break;
2576 }
2577 if (self == 1 && peer == 0) {
2578 rv = 1;
2579 break;
2580 }
2581 /* Else fall through to one of the other strategies... */
2582 case ASB_DISCARD_OLDER_PRI:
2583 if (self == 0 && peer == 1) {
2584 rv = 1;
2585 break;
2586 }
2587 if (self == 1 && peer == 0) {
2588 rv = -1;
2589 break;
2590 }
2591 /* Else fall through to one of the other strategies... */
ad19bf6e 2592 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
b411b363
PR
2593 "Using discard-least-changes instead\n");
2594 case ASB_DISCARD_ZERO_CHG:
2595 if (ch_peer == 0 && ch_self == 0) {
b30ab791 2596 rv = test_bit(RESOLVE_CONFLICTS, &device->tconn->flags)
b411b363
PR
2597 ? -1 : 1;
2598 break;
2599 } else {
2600 if (ch_peer == 0) { rv = 1; break; }
2601 if (ch_self == 0) { rv = -1; break; }
2602 }
44ed167d 2603 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
b411b363
PR
2604 break;
2605 case ASB_DISCARD_LEAST_CHG:
2606 if (ch_self < ch_peer)
2607 rv = -1;
2608 else if (ch_self > ch_peer)
2609 rv = 1;
2610 else /* ( ch_self == ch_peer ) */
2611 /* Well, then use something else. */
b30ab791 2612 rv = test_bit(RESOLVE_CONFLICTS, &device->tconn->flags)
b411b363
PR
2613 ? -1 : 1;
2614 break;
2615 case ASB_DISCARD_LOCAL:
2616 rv = -1;
2617 break;
2618 case ASB_DISCARD_REMOTE:
2619 rv = 1;
2620 }
2621
2622 return rv;
2623}
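/*
 * Example of the 0-primaries policy above (illustrative): with
 * after-sb-0pri set to discard-least-changes, ch_self = 12 and
 * ch_peer = 300 means the local node touched fewer blocks, so rv = -1
 * and it becomes SyncTarget, i.e. its changes are overwritten by the peer.
 */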
2624
b30ab791 2625static int drbd_asb_recover_1p(struct drbd_device *device) __must_hold(local)
b411b363 2626{
6184ea21 2627 int hg, rv = -100;
44ed167d 2628 enum drbd_after_sb_p after_sb_1p;
b411b363 2629
44ed167d 2630 rcu_read_lock();
b30ab791 2631 after_sb_1p = rcu_dereference(device->tconn->net_conf)->after_sb_1p;
44ed167d
PR
2632 rcu_read_unlock();
2633 switch (after_sb_1p) {
b411b363
PR
2634 case ASB_DISCARD_YOUNGER_PRI:
2635 case ASB_DISCARD_OLDER_PRI:
2636 case ASB_DISCARD_LEAST_CHG:
2637 case ASB_DISCARD_LOCAL:
2638 case ASB_DISCARD_REMOTE:
44ed167d 2639 case ASB_DISCARD_ZERO_CHG:
b411b363
PR
2640 dev_err(DEV, "Configuration error.\n");
2641 break;
2642 case ASB_DISCONNECT:
2643 break;
2644 case ASB_CONSENSUS:
b30ab791
AG
2645 hg = drbd_asb_recover_0p(device);
2646 if (hg == -1 && device->state.role == R_SECONDARY)
b411b363 2647 rv = hg;
b30ab791 2648 if (hg == 1 && device->state.role == R_PRIMARY)
b411b363
PR
2649 rv = hg;
2650 break;
2651 case ASB_VIOLENTLY:
b30ab791 2652 rv = drbd_asb_recover_0p(device);
b411b363
PR
2653 break;
2654 case ASB_DISCARD_SECONDARY:
b30ab791 2655 return device->state.role == R_PRIMARY ? 1 : -1;
b411b363 2656 case ASB_CALL_HELPER:
b30ab791
AG
2657 hg = drbd_asb_recover_0p(device);
2658 if (hg == -1 && device->state.role == R_PRIMARY) {
bb437946
AG
2659 enum drbd_state_rv rv2;
2660
b411b363
PR
2661 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2662 * we might be here in C_WF_REPORT_PARAMS which is transient.
2663 * we do not need to wait for the after state change work either. */
b30ab791 2664 rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
bb437946 2665 if (rv2 != SS_SUCCESS) {
b30ab791 2666 drbd_khelper(device, "pri-lost-after-sb");
b411b363
PR
2667 } else {
2668 dev_warn(DEV, "Successfully gave up primary role.\n");
2669 rv = hg;
2670 }
2671 } else
2672 rv = hg;
2673 }
2674
2675 return rv;
2676}
2677
b30ab791 2678static int drbd_asb_recover_2p(struct drbd_device *device) __must_hold(local)
b411b363 2679{
6184ea21 2680 int hg, rv = -100;
44ed167d 2681 enum drbd_after_sb_p after_sb_2p;
b411b363 2682
44ed167d 2683 rcu_read_lock();
b30ab791 2684 after_sb_2p = rcu_dereference(device->tconn->net_conf)->after_sb_2p;
44ed167d
PR
2685 rcu_read_unlock();
2686 switch (after_sb_2p) {
b411b363
PR
2687 case ASB_DISCARD_YOUNGER_PRI:
2688 case ASB_DISCARD_OLDER_PRI:
2689 case ASB_DISCARD_LEAST_CHG:
2690 case ASB_DISCARD_LOCAL:
2691 case ASB_DISCARD_REMOTE:
2692 case ASB_CONSENSUS:
2693 case ASB_DISCARD_SECONDARY:
44ed167d 2694 case ASB_DISCARD_ZERO_CHG:
b411b363
PR
2695 dev_err(DEV, "Configuration error.\n");
2696 break;
2697 case ASB_VIOLENTLY:
b30ab791 2698 rv = drbd_asb_recover_0p(device);
b411b363
PR
2699 break;
2700 case ASB_DISCONNECT:
2701 break;
2702 case ASB_CALL_HELPER:
b30ab791 2703 hg = drbd_asb_recover_0p(device);
b411b363 2704 if (hg == -1) {
bb437946
AG
2705 enum drbd_state_rv rv2;
2706
b411b363
PR
2707 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2708 * we might be here in C_WF_REPORT_PARAMS which is transient.
2709 * we do not need to wait for the after state change work either. */
b30ab791 2710 rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
bb437946 2711 if (rv2 != SS_SUCCESS) {
b30ab791 2712 drbd_khelper(device, "pri-lost-after-sb");
b411b363
PR
2713 } else {
2714 dev_warn(DEV, "Successfully gave up primary role.\n");
2715 rv = hg;
2716 }
2717 } else
2718 rv = hg;
2719 }
2720
2721 return rv;
2722}
2723
b30ab791 2724static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
b411b363
PR
2725 u64 bits, u64 flags)
2726{
2727 if (!uuid) {
2728 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2729 return;
2730 }
2731 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2732 text,
2733 (unsigned long long)uuid[UI_CURRENT],
2734 (unsigned long long)uuid[UI_BITMAP],
2735 (unsigned long long)uuid[UI_HISTORY_START],
2736 (unsigned long long)uuid[UI_HISTORY_END],
2737 (unsigned long long)bits,
2738 (unsigned long long)flags);
2739}
2740
2741/*
2742 100 after split brain try auto recover
2743 2 C_SYNC_SOURCE set BitMap
2744 1 C_SYNC_SOURCE use BitMap
2745 0 no Sync
2746 -1 C_SYNC_TARGET use BitMap
2747 -2 C_SYNC_TARGET set BitMap
2748 -100 after split brain, disconnect
2749-1000 unrelated data
4a23f264
PR
2750-1091 requires proto 91
2751-1096 requires proto 96
b411b363 2752 */
b30ab791 2753static int drbd_uuid_compare(struct drbd_device *device, int *rule_nr) __must_hold(local)
b411b363
PR
2754{
2755 u64 self, peer;
2756 int i, j;
2757
b30ab791
AG
2758 self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2759 peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
b411b363
PR
2760
2761 *rule_nr = 10;
2762 if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2763 return 0;
2764
2765 *rule_nr = 20;
2766 if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2767 peer != UUID_JUST_CREATED)
2768 return -2;
2769
2770 *rule_nr = 30;
2771 if (self != UUID_JUST_CREATED &&
2772 (peer == UUID_JUST_CREATED || peer == (u64)0))
2773 return 2;
2774
2775 if (self == peer) {
2776 int rct, dc; /* roles at crash time */
2777
b30ab791 2778 if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {
b411b363 2779
b30ab791 2780 if (device->tconn->agreed_pro_version < 91)
4a23f264 2781 return -1091;
b411b363 2782
b30ab791
AG
2783 if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2784 (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
b411b363 2785 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
b30ab791
AG
2786 drbd_uuid_move_history(device);
2787 device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
2788 device->ldev->md.uuid[UI_BITMAP] = 0;
b411b363 2789
b30ab791
AG
2790 drbd_uuid_dump(device, "self", device->ldev->md.uuid,
2791 device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
b411b363
PR
2792 *rule_nr = 34;
2793 } else {
2794 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2795 *rule_nr = 36;
2796 }
2797
2798 return 1;
2799 }
2800
b30ab791 2801 if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {
b411b363 2802
b30ab791 2803 if (device->tconn->agreed_pro_version < 91)
4a23f264 2804 return -1091;
b411b363 2805
b30ab791
AG
2806 if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2807 (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
b411b363
PR
2808 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2809
b30ab791
AG
2810 device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
2811 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
2812 device->p_uuid[UI_BITMAP] = 0UL;
b411b363 2813
b30ab791 2814 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
b411b363
PR
2815 *rule_nr = 35;
2816 } else {
2817 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2818 *rule_nr = 37;
2819 }
2820
2821 return -1;
2822 }
2823
2824 /* Common power [off|failure] */
b30ab791
AG
2825 rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
2826 (device->p_uuid[UI_FLAGS] & 2);
b411b363
PR
2827 /* lowest bit is set when we were primary,
2828 * next bit (weight 2) is set when peer was primary */
2829 *rule_nr = 40;
2830
2831 switch (rct) {
2832 case 0: /* !self_pri && !peer_pri */ return 0;
2833 case 1: /* self_pri && !peer_pri */ return 1;
2834 case 2: /* !self_pri && peer_pri */ return -1;
2835 case 3: /* self_pri && peer_pri */
b30ab791 2836 dc = test_bit(RESOLVE_CONFLICTS, &device->tconn->flags);
b411b363
PR
2837 return dc ? -1 : 1;
2838 }
2839 }
2840
2841 *rule_nr = 50;
b30ab791 2842 peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
b411b363
PR
2843 if (self == peer)
2844 return -1;
2845
2846 *rule_nr = 51;
b30ab791 2847 peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
b411b363 2848 if (self == peer) {
b30ab791
AG
2849 if (device->tconn->agreed_pro_version < 96 ?
2850 (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2851 (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2852 peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
b411b363
PR
2853 /* The last P_SYNC_UUID did not get through. Undo the modifications
2854 the last start of a resync as sync source made to the peer's UUIDs. */
2855
b30ab791 2856 if (device->tconn->agreed_pro_version < 91)
4a23f264 2857 return -1091;
b411b363 2858
b30ab791
AG
2859 device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
2860 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];
4a23f264 2861
92b4ca29 2862 dev_info(DEV, "Lost last syncUUID packet, corrected:\n");
b30ab791 2863 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
4a23f264 2864
b411b363
PR
2865 return -1;
2866 }
2867 }
2868
2869 *rule_nr = 60;
b30ab791 2870 self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
b411b363 2871 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
b30ab791 2872 peer = device->p_uuid[i] & ~((u64)1);
b411b363
PR
2873 if (self == peer)
2874 return -2;
2875 }
2876
2877 *rule_nr = 70;
b30ab791
AG
2878 self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2879 peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
b411b363
PR
2880 if (self == peer)
2881 return 1;
2882
2883 *rule_nr = 71;
b30ab791 2884 self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
b411b363 2885 if (self == peer) {
b30ab791
AG
2886 if (device->tconn->agreed_pro_version < 96 ?
2887 (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2888 (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2889 self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
b411b363
PR
2890 /* The last P_SYNC_UUID did not get through. Undo the modifications
2891 the last start of a resync as sync source made to our UUIDs. */
2892
b30ab791 2893 if (device->tconn->agreed_pro_version < 91)
4a23f264 2894 return -1091;
b411b363 2895
b30ab791
AG
2896 __drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
2897 __drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);
b411b363 2898
4a23f264 2899 dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
b30ab791
AG
2900 drbd_uuid_dump(device, "self", device->ldev->md.uuid,
2901 device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
b411b363
PR
2902
2903 return 1;
2904 }
2905 }
2906
2907
2908 *rule_nr = 80;
b30ab791 2909 peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
b411b363 2910 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
b30ab791 2911 self = device->ldev->md.uuid[i] & ~((u64)1);
b411b363
PR
2912 if (self == peer)
2913 return 2;
2914 }
2915
2916 *rule_nr = 90;
b30ab791
AG
2917 self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2918 peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
b411b363
PR
2919 if (self == peer && self != ((u64)0))
2920 return 100;
2921
2922 *rule_nr = 100;
2923 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
b30ab791 2924 self = device->ldev->md.uuid[i] & ~((u64)1);
b411b363 2925 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
b30ab791 2926 peer = device->p_uuid[j] & ~((u64)1);
b411b363
PR
2927 if (self == peer)
2928 return -100;
2929 }
2930 }
2931
2932 return -1000;
2933}
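/*
 * Example of the rule table above (illustrative): a node whose metadata was
 * just created (UI_CURRENT == UUID_JUST_CREATED) connecting to a peer with
 * real data matches rule 20 and returns -2, i.e. it becomes C_SYNC_TARGET
 * with the whole bitmap set (a full sync from the peer).
 */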
2934
2935/* drbd_sync_handshake() returns the new conn state on success, or
2936 CONN_MASK (-1) on failure.
2937 */
b30ab791 2938static enum drbd_conns drbd_sync_handshake(struct drbd_device *device, enum drbd_role peer_role,
b411b363
PR
2939 enum drbd_disk_state peer_disk) __must_hold(local)
2940{
b411b363
PR
2941 enum drbd_conns rv = C_MASK;
2942 enum drbd_disk_state mydisk;
44ed167d 2943 struct net_conf *nc;
6dff2902 2944 int hg, rule_nr, rr_conflict, tentative;
b411b363 2945
b30ab791 2946 mydisk = device->state.disk;
b411b363 2947 if (mydisk == D_NEGOTIATING)
b30ab791 2948 mydisk = device->new_state_tmp.disk;
b411b363
PR
2949
2950 dev_info(DEV, "drbd_sync_handshake:\n");
9f2247bb 2951
b30ab791
AG
2952 spin_lock_irq(&device->ldev->md.uuid_lock);
2953 drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
2954 drbd_uuid_dump(device, "peer", device->p_uuid,
2955 device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
b411b363 2956
b30ab791
AG
2957 hg = drbd_uuid_compare(device, &rule_nr);
2958 spin_unlock_irq(&device->ldev->md.uuid_lock);
b411b363
PR
2959
2960 dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2961
2962 if (hg == -1000) {
2963 dev_alert(DEV, "Unrelated data, aborting!\n");
2964 return C_MASK;
2965 }
4a23f264
PR
2966 if (hg < -1000) {
2967 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
b411b363
PR
2968 return C_MASK;
2969 }
2970
2971 if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2972 (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
2973 int f = (hg == -100) || abs(hg) == 2;
2974 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2975 if (f)
2976 hg = hg*2;
2977 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2978 hg > 0 ? "source" : "target");
2979 }
2980
3a11a487 2981 if (abs(hg) == 100)
b30ab791 2982 drbd_khelper(device, "initial-split-brain");
3a11a487 2983
44ed167d 2984 rcu_read_lock();
b30ab791 2985 nc = rcu_dereference(device->tconn->net_conf);
44ed167d
PR
2986
2987 if (hg == 100 || (hg == -100 && nc->always_asbp)) {
b30ab791 2988 int pcount = (device->state.role == R_PRIMARY)
b411b363
PR
2989 + (peer_role == R_PRIMARY);
2990 int forced = (hg == -100);
2991
2992 switch (pcount) {
2993 case 0:
b30ab791 2994 hg = drbd_asb_recover_0p(device);
b411b363
PR
2995 break;
2996 case 1:
b30ab791 2997 hg = drbd_asb_recover_1p(device);
b411b363
PR
2998 break;
2999 case 2:
b30ab791 3000 hg = drbd_asb_recover_2p(device);
b411b363
PR
3001 break;
3002 }
3003 if (abs(hg) < 100) {
3004 dev_warn(DEV, "Split-Brain detected, %d primaries, "
3005 "automatically solved. Sync from %s node\n",
3006 pcount, (hg < 0) ? "peer" : "this");
3007 if (forced) {
3008 dev_warn(DEV, "Doing a full sync, since"
3009 " UUIDs were ambiguous.\n");
3010 hg = hg*2;
3011 }
3012 }
3013 }
3014
3015 if (hg == -100) {
b30ab791 3016 if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
b411b363 3017 hg = -1;
b30ab791 3018 if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
b411b363
PR
3019 hg = 1;
3020
3021 if (abs(hg) < 100)
3022 dev_warn(DEV, "Split-Brain detected, manually solved. "
3023 "Sync from %s node\n",
3024 (hg < 0) ? "peer" : "this");
3025 }
44ed167d 3026 rr_conflict = nc->rr_conflict;
6dff2902 3027 tentative = nc->tentative;
44ed167d 3028 rcu_read_unlock();
b411b363
PR
3029
3030 if (hg == -100) {
580b9767
LE
3031 /* FIXME this log message is not correct if we end up here
3032 * after an attempted attach on a diskless node.
3033 * We just refuse to attach -- well, we drop the "connection"
3034 * to that disk, in a way... */
3a11a487 3035 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
b30ab791 3036 drbd_khelper(device, "split-brain");
b411b363
PR
3037 return C_MASK;
3038 }
3039
3040 if (hg > 0 && mydisk <= D_INCONSISTENT) {
3041 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
3042 return C_MASK;
3043 }
3044
3045 if (hg < 0 && /* by intention we do not use mydisk here. */
b30ab791 3046 device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
44ed167d 3047 switch (rr_conflict) {
b411b363 3048 case ASB_CALL_HELPER:
b30ab791 3049 drbd_khelper(device, "pri-lost");
b411b363
PR
3050 /* fall through */
3051 case ASB_DISCONNECT:
3052 dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
3053 return C_MASK;
3054 case ASB_VIOLENTLY:
3055 dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
3056 " assumption\n");
3057 }
3058 }
3059
b30ab791 3060 if (tentative || test_bit(CONN_DRY_RUN, &device->tconn->flags)) {
cf14c2e9
PR
3061 if (hg == 0)
3062 dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
3063 else
3064 dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
3065 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3066 abs(hg) >= 2 ? "full" : "bit-map based");
3067 return C_MASK;
3068 }
3069
b411b363
PR
3070 if (abs(hg) >= 2) {
3071 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
b30ab791 3072 if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
20ceb2b2 3073 BM_LOCKED_SET_ALLOWED))
b411b363
PR
3074 return C_MASK;
3075 }
3076
3077 if (hg > 0) { /* become sync source. */
3078 rv = C_WF_BITMAP_S;
3079 } else if (hg < 0) { /* become sync target */
3080 rv = C_WF_BITMAP_T;
3081 } else {
3082 rv = C_CONNECTED;
b30ab791 3083 if (drbd_bm_total_weight(device)) {
b411b363 3084 dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
b30ab791 3085 drbd_bm_total_weight(device));
b411b363
PR
3086 }
3087 }
3088
3089 return rv;
3090}
3091
f179d76d 3092static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
b411b363
PR
3093{
3094 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
f179d76d
PR
3095 if (peer == ASB_DISCARD_REMOTE)
3096 return ASB_DISCARD_LOCAL;
b411b363
PR
3097
3098 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
f179d76d
PR
3099 if (peer == ASB_DISCARD_LOCAL)
3100 return ASB_DISCARD_REMOTE;
b411b363
PR
3101
3102 /* everything else is valid if they are equal on both sides. */
f179d76d 3103 return peer;
b411b363
PR
3104}
3105
e2857216 3106static int receive_protocol(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 3107{
e658983a 3108 struct p_protocol *p = pi->data;
036b17ea
PR
3109 enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3110 int p_proto, p_discard_my_data, p_two_primaries, cf;
3111 struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3112 char integrity_alg[SHARED_SECRET_MAX] = "";
accdbcc5 3113 struct crypto_hash *peer_integrity_tfm = NULL;
7aca6c75 3114 void *int_dig_in = NULL, *int_dig_vv = NULL;
b411b363 3115
b411b363
PR
3116 p_proto = be32_to_cpu(p->protocol);
3117 p_after_sb_0p = be32_to_cpu(p->after_sb_0p);
3118 p_after_sb_1p = be32_to_cpu(p->after_sb_1p);
3119 p_after_sb_2p = be32_to_cpu(p->after_sb_2p);
b411b363 3120 p_two_primaries = be32_to_cpu(p->two_primaries);
cf14c2e9 3121 cf = be32_to_cpu(p->conn_flags);
6139f60d 3122 p_discard_my_data = cf & CF_DISCARD_MY_DATA;
cf14c2e9 3123
86db0618
AG
3124 if (tconn->agreed_pro_version >= 87) {
3125 int err;
cf14c2e9 3126
88104ca4 3127 if (pi->size > sizeof(integrity_alg))
86db0618 3128 return -EIO;
88104ca4 3129 err = drbd_recv_all(tconn, integrity_alg, pi->size);
86db0618
AG
3130 if (err)
3131 return err;
036b17ea 3132 integrity_alg[SHARED_SECRET_MAX - 1] = 0;
b411b363
PR
3133 }
3134
7d4c782c 3135 if (pi->cmd != P_PROTOCOL_UPDATE) {
fbc12f45 3136 clear_bit(CONN_DRY_RUN, &tconn->flags);
b411b363 3137
fbc12f45
AG
3138 if (cf & CF_DRY_RUN)
3139 set_bit(CONN_DRY_RUN, &tconn->flags);
b411b363 3140
fbc12f45
AG
3141 rcu_read_lock();
3142 nc = rcu_dereference(tconn->net_conf);
b411b363 3143
fbc12f45 3144 if (p_proto != nc->wire_protocol) {
d505d9be 3145 conn_err(tconn, "incompatible %s settings\n", "protocol");
fbc12f45
AG
3146 goto disconnect_rcu_unlock;
3147 }
b411b363 3148
fbc12f45 3149 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
d505d9be 3150 conn_err(tconn, "incompatible %s settings\n", "after-sb-0pri");
fbc12f45
AG
3151 goto disconnect_rcu_unlock;
3152 }
b411b363 3153
fbc12f45 3154 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
d505d9be 3155 conn_err(tconn, "incompatible %s settings\n", "after-sb-1pri");
fbc12f45
AG
3156 goto disconnect_rcu_unlock;
3157 }
b411b363 3158
fbc12f45 3159 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
d505d9be 3160 conn_err(tconn, "incompatible %s settings\n", "after-sb-2pri");
fbc12f45
AG
3161 goto disconnect_rcu_unlock;
3162 }
b411b363 3163
fbc12f45 3164 if (p_discard_my_data && nc->discard_my_data) {
d505d9be 3165 conn_err(tconn, "incompatible %s settings\n", "discard-my-data");
fbc12f45
AG
3166 goto disconnect_rcu_unlock;
3167 }
b411b363 3168
fbc12f45 3169 if (p_two_primaries != nc->two_primaries) {
d505d9be 3170 conn_err(tconn, "incompatible %s settings\n", "allow-two-primaries");
fbc12f45
AG
3171 goto disconnect_rcu_unlock;
3172 }
b411b363 3173
fbc12f45 3174 if (strcmp(integrity_alg, nc->integrity_alg)) {
d505d9be 3175 conn_err(tconn, "incompatible %s settings\n", "data-integrity-alg");
fbc12f45
AG
3176 goto disconnect_rcu_unlock;
3177 }
b411b363 3178
fbc12f45 3179 rcu_read_unlock();
b411b363
PR
3180 }
3181
7d4c782c
AG
3182 if (integrity_alg[0]) {
3183 int hash_size;
3184
3185 /*
3186 * We can only change the peer data integrity algorithm
3187 * here. Changing our own data integrity algorithm
3188 * requires that we send a P_PROTOCOL_UPDATE packet at
3189 * the same time; otherwise, the peer has no way to
3190 * tell between which packets the algorithm should
3191 * change.
3192 */
b411b363 3193
7d4c782c
AG
3194 peer_integrity_tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3195 if (!peer_integrity_tfm) {
3196 conn_err(tconn, "peer data-integrity-alg %s not supported\n",
3197 integrity_alg);
3198 goto disconnect;
3199 }
b411b363 3200
7d4c782c
AG
3201 hash_size = crypto_hash_digestsize(peer_integrity_tfm);
3202 int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3203 int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3204 if (!(int_dig_in && int_dig_vv)) {
3205 conn_err(tconn, "Allocation of buffers for data integrity checking failed\n");
b411b363
PR
3206 goto disconnect;
3207 }
b411b363
PR
3208 }
3209
7d4c782c
AG
3210 new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3211 if (!new_net_conf) {
3212 conn_err(tconn, "Allocation of new net_conf failed\n");
3213 goto disconnect;
3214 }
3215
3216 mutex_lock(&tconn->data.mutex);
3217 mutex_lock(&tconn->conf_update);
3218 old_net_conf = tconn->net_conf;
3219 *new_net_conf = *old_net_conf;
3220
3221 new_net_conf->wire_protocol = p_proto;
3222 new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3223 new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3224 new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3225 new_net_conf->two_primaries = p_two_primaries;
3226
3227 rcu_assign_pointer(tconn->net_conf, new_net_conf);
3228 mutex_unlock(&tconn->conf_update);
3229 mutex_unlock(&tconn->data.mutex);
3230
3231 crypto_free_hash(tconn->peer_integrity_tfm);
3232 kfree(tconn->int_dig_in);
3233 kfree(tconn->int_dig_vv);
3234 tconn->peer_integrity_tfm = peer_integrity_tfm;
3235 tconn->int_dig_in = int_dig_in;
3236 tconn->int_dig_vv = int_dig_vv;
3237
3238 if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3239 conn_info(tconn, "peer data-integrity-alg: %s\n",
3240 integrity_alg[0] ? integrity_alg : "(none)");
3241
3242 synchronize_rcu();
3243 kfree(old_net_conf);
82bc0194 3244 return 0;
b411b363 3245
44ed167d
PR
3246disconnect_rcu_unlock:
3247 rcu_read_unlock();
b411b363 3248disconnect:
b792c35c 3249 crypto_free_hash(peer_integrity_tfm);
036b17ea
PR
3250 kfree(int_dig_in);
3251 kfree(int_dig_vv);
7204624c 3252 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
82bc0194 3253 return -EIO;
b411b363
PR
3254}
3255
3256/* helper function
3257 * input: alg name, feature name
3258 * return: NULL (alg name was "")
3259 * ERR_PTR(error) if something goes wrong
3260 * or the crypto hash ptr, if it worked out ok. */
f63e631a 3261static
b30ab791 3262struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device,
b411b363
PR
3263 const char *alg, const char *name)
3264{
3265 struct crypto_hash *tfm;
3266
3267 if (!alg[0])
3268 return NULL;
3269
3270 tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
3271 if (IS_ERR(tfm)) {
3272 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3273 alg, name, PTR_ERR(tfm));
3274 return tfm;
3275 }
b411b363
PR
3276 return tfm;
3277}
3278
4a76b161
AG
3279static int ignore_remaining_packet(struct drbd_tconn *tconn, struct packet_info *pi)
3280{
3281 void *buffer = tconn->data.rbuf;
3282 int size = pi->size;
3283
3284 while (size) {
3285 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3286 s = drbd_recv(tconn, buffer, s);
3287 if (s <= 0) {
3288 if (s < 0)
3289 return s;
3290 break;
3291 }
3292 size -= s;
3293 }
3294 if (size)
3295 return -EIO;
3296 return 0;
3297}
3298
3299/*
3300 * config_unknown_volume - device configuration command for unknown volume
3301 *
3302 * When a device is added to an existing connection, the node on which the
3303 * device is added first will send configuration commands to its peer but the
3304 * peer will not know about the device yet. It will warn and ignore these
3305 * commands. Once the device is added on the second node, the second node will
3306 * send the same device configuration commands, but in the other direction.
3307 *
3308 * (We can also end up here if drbd is misconfigured.)
3309 */
3310static int config_unknown_volume(struct drbd_tconn *tconn, struct packet_info *pi)
3311{
2fcb8f30
AG
3312 conn_warn(tconn, "%s packet received for volume %u, which is not configured locally\n",
3313 cmdname(pi->cmd), pi->vnr);
4a76b161
AG
3314 return ignore_remaining_packet(tconn, pi);
3315}
3316
3317static int receive_SyncParam(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 3318{
b30ab791 3319 struct drbd_device *device;
e658983a 3320 struct p_rs_param_95 *p;
b411b363
PR
3321 unsigned int header_size, data_size, exp_max_sz;
3322 struct crypto_hash *verify_tfm = NULL;
3323 struct crypto_hash *csums_tfm = NULL;
2ec91e0e 3324 struct net_conf *old_net_conf, *new_net_conf = NULL;
813472ce 3325 struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
4a76b161 3326 const int apv = tconn->agreed_pro_version;
813472ce 3327 struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
778f271d 3328 int fifo_size = 0;
82bc0194 3329 int err;
b411b363 3330
b30ab791
AG
3331 device = vnr_to_device(tconn, pi->vnr);
3332 if (!device)
4a76b161 3333 return config_unknown_volume(tconn, pi);
b411b363
PR
3334
3335 exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param)
3336 : apv == 88 ? sizeof(struct p_rs_param)
3337 + SHARED_SECRET_MAX
8e26f9cc
PR
3338 : apv <= 94 ? sizeof(struct p_rs_param_89)
3339 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
b411b363 3340
e2857216 3341 if (pi->size > exp_max_sz) {
b411b363 3342 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
e2857216 3343 pi->size, exp_max_sz);
82bc0194 3344 return -EIO;
b411b363
PR
3345 }
3346
3347 if (apv <= 88) {
e658983a 3348 header_size = sizeof(struct p_rs_param);
e2857216 3349 data_size = pi->size - header_size;
8e26f9cc 3350 } else if (apv <= 94) {
e658983a 3351 header_size = sizeof(struct p_rs_param_89);
e2857216 3352 data_size = pi->size - header_size;
b411b363 3353 D_ASSERT(data_size == 0);
8e26f9cc 3354 } else {
e658983a 3355 header_size = sizeof(struct p_rs_param_95);
e2857216 3356 data_size = pi->size - header_size;
b411b363
PR
3357 D_ASSERT(data_size == 0);
3358 }
3359
3360 /* initialize verify_alg and csums_alg */
e658983a 3361 p = pi->data;
b411b363
PR
3362 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3363
b30ab791 3364 err = drbd_recv_all(device->tconn, p, header_size);
82bc0194
AG
3365 if (err)
3366 return err;
b411b363 3367
b30ab791
AG
3368 mutex_lock(&device->tconn->conf_update);
3369 old_net_conf = device->tconn->net_conf;
3370 if (get_ldev(device)) {
813472ce
PR
3371 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3372 if (!new_disk_conf) {
b30ab791
AG
3373 put_ldev(device);
3374 mutex_unlock(&device->tconn->conf_update);
813472ce
PR
3375 dev_err(DEV, "Allocation of new disk_conf failed\n");
3376 return -ENOMEM;
3377 }
daeda1cc 3378
b30ab791 3379 old_disk_conf = device->ldev->disk_conf;
813472ce 3380 *new_disk_conf = *old_disk_conf;
b411b363 3381
6394b935 3382 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
813472ce 3383 }
b411b363
PR
3384
3385 if (apv >= 88) {
3386 if (apv == 88) {
5de73827
PR
3387 if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3388 dev_err(DEV, "verify-alg of wrong size, "
3389 "peer wants %u, accepting only up to %u byte\n",
3390 data_size, SHARED_SECRET_MAX);
813472ce
PR
3391 err = -EIO;
3392 goto reconnect;
b411b363
PR
3393 }
3394
b30ab791 3395 err = drbd_recv_all(device->tconn, p->verify_alg, data_size);
813472ce
PR
3396 if (err)
3397 goto reconnect;
b411b363
PR
3398 /* we expect NUL terminated string */
3399 /* but just in case someone tries to be evil */
3400 D_ASSERT(p->verify_alg[data_size-1] == 0);
3401 p->verify_alg[data_size-1] = 0;
3402
3403 } else /* apv >= 89 */ {
3404 /* we still expect NUL terminated strings */
3405 /* but just in case someone tries to be evil */
3406 D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3407 D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3408 p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3409 p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3410 }
3411
2ec91e0e 3412 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
b30ab791 3413 if (device->state.conn == C_WF_REPORT_PARAMS) {
b411b363 3414 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
2ec91e0e 3415 old_net_conf->verify_alg, p->verify_alg);
b411b363
PR
3416 goto disconnect;
3417 }
b30ab791 3418 verify_tfm = drbd_crypto_alloc_digest_safe(device,
b411b363
PR
3419 p->verify_alg, "verify-alg");
3420 if (IS_ERR(verify_tfm)) {
3421 verify_tfm = NULL;
3422 goto disconnect;
3423 }
3424 }
3425
2ec91e0e 3426 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
b30ab791 3427 if (device->state.conn == C_WF_REPORT_PARAMS) {
b411b363 3428 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
2ec91e0e 3429 old_net_conf->csums_alg, p->csums_alg);
b411b363
PR
3430 goto disconnect;
3431 }
b30ab791 3432 csums_tfm = drbd_crypto_alloc_digest_safe(device,
b411b363
PR
3433 p->csums_alg, "csums-alg");
3434 if (IS_ERR(csums_tfm)) {
3435 csums_tfm = NULL;
3436 goto disconnect;
3437 }
3438 }
3439
813472ce 3440 if (apv > 94 && new_disk_conf) {
daeda1cc
PR
3441 new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3442 new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3443 new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3444 new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
778f271d 3445
daeda1cc 3446 fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
b30ab791 3447 if (fifo_size != device->rs_plan_s->size) {
813472ce
PR
3448 new_plan = fifo_alloc(fifo_size);
3449 if (!new_plan) {
778f271d 3450 dev_err(DEV, "kmalloc of fifo_buffer failed");
b30ab791 3451 put_ldev(device);
778f271d
PR
3452 goto disconnect;
3453 }
3454 }
8e26f9cc 3455 }
b411b363 3456
91fd4dad 3457 if (verify_tfm || csums_tfm) {
2ec91e0e
PR
3458 new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3459 if (!new_net_conf) {
91fd4dad
PR
3460 dev_err(DEV, "Allocation of new net_conf failed\n");
3461 goto disconnect;
3462 }
3463
2ec91e0e 3464 *new_net_conf = *old_net_conf;
91fd4dad
PR
3465
3466 if (verify_tfm) {
2ec91e0e
PR
3467 strcpy(new_net_conf->verify_alg, p->verify_alg);
3468 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
b30ab791
AG
3469 crypto_free_hash(device->tconn->verify_tfm);
3470 device->tconn->verify_tfm = verify_tfm;
91fd4dad
PR
3471 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
3472 }
3473 if (csums_tfm) {
2ec91e0e
PR
3474 strcpy(new_net_conf->csums_alg, p->csums_alg);
3475 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
b30ab791
AG
3476 crypto_free_hash(device->tconn->csums_tfm);
3477 device->tconn->csums_tfm = csums_tfm;
91fd4dad
PR
3478 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
3479 }
2ec91e0e 3480 rcu_assign_pointer(tconn->net_conf, new_net_conf);
778f271d 3481 }
b411b363
PR
3482 }
3483
813472ce 3484 if (new_disk_conf) {
b30ab791
AG
3485 rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3486 put_ldev(device);
813472ce
PR
3487 }
3488
3489 if (new_plan) {
b30ab791
AG
3490 old_plan = device->rs_plan_s;
3491 rcu_assign_pointer(device->rs_plan_s, new_plan);
b411b363 3492 }
daeda1cc 3493
b30ab791 3494 mutex_unlock(&device->tconn->conf_update);
daeda1cc
PR
3495 synchronize_rcu();
3496 if (new_net_conf)
3497 kfree(old_net_conf);
3498 kfree(old_disk_conf);
813472ce 3499 kfree(old_plan);
daeda1cc 3500
82bc0194 3501 return 0;
b411b363 3502
813472ce
PR
3503reconnect:
3504 if (new_disk_conf) {
b30ab791 3505 put_ldev(device);
813472ce
PR
3506 kfree(new_disk_conf);
3507 }
b30ab791 3508 mutex_unlock(&device->tconn->conf_update);
813472ce
PR
3509 return -EIO;
3510
b411b363 3511disconnect:
813472ce
PR
3512 kfree(new_plan);
3513 if (new_disk_conf) {
b30ab791 3514 put_ldev(device);
813472ce
PR
3515 kfree(new_disk_conf);
3516 }
b30ab791 3517 mutex_unlock(&device->tconn->conf_update);
b411b363
PR
3518 /* just for completeness: actually not needed,
3519 * as this is not reached if csums_tfm was ok. */
3520 crypto_free_hash(csums_tfm);
3521 /* but free the verify_tfm again, if csums_tfm did not work out */
3522 crypto_free_hash(verify_tfm);
b30ab791 3523 conn_request_state(device->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
82bc0194 3524 return -EIO;
b411b363
PR
3525}
3526
b411b363 3527/* warn if the arguments differ by more than 12.5% */
b30ab791 3528static void warn_if_differ_considerably(struct drbd_device *device,
b411b363
PR
3529 const char *s, sector_t a, sector_t b)
3530{
3531 sector_t d;
3532 if (a == 0 || b == 0)
3533 return;
3534 d = (a > b) ? (a - b) : (b - a);
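	/* a>>3 and b>>3 are one eighth (12.5%) of a and b, respectively */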
3535 if (d > (a>>3) || d > (b>>3))
3536 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3537 (unsigned long long)a, (unsigned long long)b);
3538}
3539
4a76b161 3540static int receive_sizes(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 3541{
b30ab791 3542 struct drbd_device *device;
e658983a 3543 struct p_sizes *p = pi->data;
e96c9633 3544 enum determine_dev_size dd = DS_UNCHANGED;
b411b363
PR
3545 sector_t p_size, p_usize, my_usize;
3546 int ldsc = 0; /* local disk size changed */
e89b591c 3547 enum dds_flags ddsf;
b411b363 3548
b30ab791
AG
3549 device = vnr_to_device(tconn, pi->vnr);
3550 if (!device)
4a76b161
AG
3551 return config_unknown_volume(tconn, pi);
3552
b411b363
PR
3553 p_size = be64_to_cpu(p->d_size);
3554 p_usize = be64_to_cpu(p->u_size);
3555
b411b363
PR
3556 /* just store the peer's disk size for now.
3557 * we still need to figure out whether we accept that. */
b30ab791 3558 device->p_size = p_size;
b411b363 3559
b30ab791 3560 if (get_ldev(device)) {
daeda1cc 3561 rcu_read_lock();
b30ab791 3562 my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
daeda1cc
PR
3563 rcu_read_unlock();
3564
b30ab791
AG
3565 warn_if_differ_considerably(device, "lower level device sizes",
3566 p_size, drbd_get_max_capacity(device->ldev));
3567 warn_if_differ_considerably(device, "user requested size",
daeda1cc 3568 p_usize, my_usize);
b411b363
PR
3569
3570 /* if this is the first connect, or an otherwise expected
3571 * param exchange, choose the minimum */
b30ab791 3572 if (device->state.conn == C_WF_REPORT_PARAMS)
daeda1cc 3573 p_usize = min_not_zero(my_usize, p_usize);
b411b363
PR
3574
3575 /* Never shrink a device with usable data during connect.
3576 But allow online shrinking if we are connected. */
b30ab791
AG
3577 if (drbd_new_dev_size(device, device->ldev, p_usize, 0) <
3578 drbd_get_capacity(device->this_bdev) &&
3579 device->state.disk >= D_OUTDATED &&
3580 device->state.conn < C_CONNECTED) {
b411b363 3581 dev_err(DEV, "The peer's disk size is too small!\n");
b30ab791
AG
3582 conn_request_state(device->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3583 put_ldev(device);
82bc0194 3584 return -EIO;
b411b363 3585 }
daeda1cc
PR
3586
3587 if (my_usize != p_usize) {
3588 struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3589
3590 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3591 if (!new_disk_conf) {
3592 dev_err(DEV, "Allocation of new disk_conf failed\n");
b30ab791 3593 put_ldev(device);
daeda1cc
PR
3594 return -ENOMEM;
3595 }
3596
b30ab791
AG
3597 mutex_lock(&device->tconn->conf_update);
3598 old_disk_conf = device->ldev->disk_conf;
daeda1cc
PR
3599 *new_disk_conf = *old_disk_conf;
3600 new_disk_conf->disk_size = p_usize;
3601
b30ab791
AG
3602 rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3603 mutex_unlock(&device->tconn->conf_update);
daeda1cc
PR
3604 synchronize_rcu();
3605 kfree(old_disk_conf);
3606
3607 dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3608 (unsigned long)my_usize);
b411b363 3609 }
daeda1cc 3610
b30ab791 3611 put_ldev(device);
b411b363 3612 }
b411b363 3613
e89b591c 3614 ddsf = be16_to_cpu(p->dds_flags);
b30ab791
AG
3615 if (get_ldev(device)) {
3616 dd = drbd_determine_dev_size(device, ddsf, NULL);
3617 put_ldev(device);
e96c9633 3618 if (dd == DS_ERROR)
82bc0194 3619 return -EIO;
b30ab791 3620 drbd_md_sync(device);
b411b363
PR
3621 } else {
3622 /* I am diskless, need to accept the peer's size. */
b30ab791 3623 drbd_set_my_capacity(device, p_size);
b411b363
PR
3624 }
3625
b30ab791
AG
3626 device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3627 drbd_reconsider_max_bio_size(device);
99432fcc 3628
b30ab791
AG
3629 if (get_ldev(device)) {
3630 if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
3631 device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
b411b363
PR
3632 ldsc = 1;
3633 }
3634
b30ab791 3635 put_ldev(device);
b411b363
PR
3636 }
3637
b30ab791 3638 if (device->state.conn > C_WF_REPORT_PARAMS) {
b411b363 3639 if (be64_to_cpu(p->c_size) !=
b30ab791 3640 drbd_get_capacity(device->this_bdev) || ldsc) {
b411b363
PR
3641 /* we have different sizes, probably peer
3642 * needs to know my new size... */
b30ab791 3643 drbd_send_sizes(device, 0, ddsf);
b411b363 3644 }
b30ab791
AG
3645 if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
3646 (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
3647 if (device->state.pdsk >= D_INCONSISTENT &&
3648 device->state.disk >= D_INCONSISTENT) {
e89b591c
PR
3649 if (ddsf & DDSF_NO_RESYNC)
3650 dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3651 else
b30ab791 3652 resync_after_online_grow(device);
e89b591c 3653 } else
b30ab791 3654 set_bit(RESYNC_AFTER_NEG, &device->flags);
b411b363
PR
3655 }
3656 }
3657
82bc0194 3658 return 0;
b411b363
PR
3659}
3660
4a76b161 3661static int receive_uuids(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 3662{
b30ab791 3663 struct drbd_device *device;
e658983a 3664 struct p_uuids *p = pi->data;
b411b363 3665 u64 *p_uuid;
62b0da3a 3666 int i, updated_uuids = 0;
b411b363 3667
b30ab791
AG
3668 device = vnr_to_device(tconn, pi->vnr);
3669 if (!device)
4a76b161
AG
3670 return config_unknown_volume(tconn, pi);
3671
b411b363 3672 p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
063eacf8
JW
3673 if (!p_uuid) {
3674 dev_err(DEV, "kmalloc of p_uuid failed\n");
3675 return false;
3676 }
b411b363
PR
3677
3678 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3679 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3680
b30ab791
AG
3681 kfree(device->p_uuid);
3682 device->p_uuid = p_uuid;
b411b363 3683
b30ab791
AG
3684 if (device->state.conn < C_CONNECTED &&
3685 device->state.disk < D_INCONSISTENT &&
3686 device->state.role == R_PRIMARY &&
3687 (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
b411b363 3688 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
b30ab791
AG
3689 (unsigned long long)device->ed_uuid);
3690 conn_request_state(device->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
82bc0194 3691 return -EIO;
b411b363
PR
3692 }
3693
b30ab791 3694 if (get_ldev(device)) {
b411b363 3695 int skip_initial_sync =
b30ab791
AG
3696 device->state.conn == C_CONNECTED &&
3697 device->tconn->agreed_pro_version >= 90 &&
3698 device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
b411b363
PR
3699 (p_uuid[UI_FLAGS] & 8);
3700 if (skip_initial_sync) {
3701 dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
b30ab791 3702 drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
20ceb2b2
LE
3703 "clear_n_write from receive_uuids",
3704 BM_LOCKED_TEST_ALLOWED);
b30ab791
AG
3705 _drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
3706 _drbd_uuid_set(device, UI_BITMAP, 0);
3707 _drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
b411b363 3708 CS_VERBOSE, NULL);
b30ab791 3709 drbd_md_sync(device);
62b0da3a 3710 updated_uuids = 1;
b411b363 3711 }
b30ab791
AG
3712 put_ldev(device);
3713 } else if (device->state.disk < D_INCONSISTENT &&
3714 device->state.role == R_PRIMARY) {
18a50fa2
PR
3715 /* I am a diskless primary, the peer just created a new current UUID
3716 for me. */
b30ab791 3717 updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
b411b363
PR
3718 }
3719
3720 /* Before we test for the disk state, we should wait until a possibly
3721 ongoing cluster wide state change is finished. That is important if
3722 we are primary and are detaching from our disk. We need to see the
3723 new disk state... */
b30ab791
AG
3724 mutex_lock(device->state_mutex);
3725 mutex_unlock(device->state_mutex);
3726 if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
3727 updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
62b0da3a
LE
3728
3729 if (updated_uuids)
b30ab791 3730 drbd_print_uuids(device, "receiver updated UUIDs to");
b411b363 3731
82bc0194 3732 return 0;
b411b363
PR
3733}
3734
3735/**
3736 * convert_state() - Converts the peer's view of the cluster state to our point of view
3737 * @ps: The state as seen by the peer.
3738 */
3739static union drbd_state convert_state(union drbd_state ps)
3740{
3741 union drbd_state ms;
3742
3743 static enum drbd_conns c_tab[] = {
369bea63 3744 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
b411b363
PR
3745 [C_CONNECTED] = C_CONNECTED,
3746
3747 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3748 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3749 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3750 [C_VERIFY_S] = C_VERIFY_T,
3751 [C_MASK] = C_MASK,
3752 };
3753
3754 ms.i = ps.i;
3755
3756 ms.conn = c_tab[ps.conn];
3757 ms.peer = ps.role;
3758 ms.role = ps.peer;
3759 ms.pdsk = ps.disk;
3760 ms.disk = ps.pdsk;
3761 ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3762
3763 return ms;
3764}
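/*
 * Example of the mapping above: if the peer reports
 * { role = Primary, peer = Secondary, disk = UpToDate, pdsk = Inconsistent },
 * we store { peer = Primary, role = Secondary, pdsk = UpToDate,
 * disk = Inconsistent }; role/peer and disk/pdsk simply change sides,
 * and the connection state is translated through c_tab (e.g. what the
 * peer sees as StartingSyncS is StartingSyncT from our point of view).
 */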
3765
4a76b161 3766static int receive_req_state(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 3767{
b30ab791 3768 struct drbd_device *device;
e658983a 3769 struct p_req_state *p = pi->data;
b411b363 3770 union drbd_state mask, val;
bf885f8a 3771 enum drbd_state_rv rv;
b411b363 3772
b30ab791
AG
3773 device = vnr_to_device(tconn, pi->vnr);
3774 if (!device)
4a76b161
AG
3775 return -EIO;
3776
b411b363
PR
3777 mask.i = be32_to_cpu(p->mask);
3778 val.i = be32_to_cpu(p->val);
3779
b30ab791
AG
3780 if (test_bit(RESOLVE_CONFLICTS, &device->tconn->flags) &&
3781 mutex_is_locked(device->state_mutex)) {
3782 drbd_send_sr_reply(device, SS_CONCURRENT_ST_CHG);
82bc0194 3783 return 0;
b411b363
PR
3784 }
3785
3786 mask = convert_state(mask);
3787 val = convert_state(val);
3788
b30ab791
AG
3789 rv = drbd_change_state(device, CS_VERBOSE, mask, val);
3790 drbd_send_sr_reply(device, rv);
b411b363 3791
b30ab791 3792 drbd_md_sync(device);
b411b363 3793
82bc0194 3794 return 0;
b411b363
PR
3795}
3796
e2857216 3797static int receive_req_conn_state(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 3798{
e658983a 3799 struct p_req_state *p = pi->data;
b411b363 3800 union drbd_state mask, val;
bf885f8a 3801 enum drbd_state_rv rv;
b411b363 3802
b411b363
PR
3803 mask.i = be32_to_cpu(p->mask);
3804 val.i = be32_to_cpu(p->val);
3805
427c0434 3806 if (test_bit(RESOLVE_CONFLICTS, &tconn->flags) &&
dfafcc8a
PR
3807 mutex_is_locked(&tconn->cstate_mutex)) {
3808 conn_send_sr_reply(tconn, SS_CONCURRENT_ST_CHG);
82bc0194 3809 return 0;
b411b363
PR
3810 }
3811
3812 mask = convert_state(mask);
3813 val = convert_state(val);
3814
778bcf2e 3815 rv = conn_request_state(tconn, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
dfafcc8a 3816 conn_send_sr_reply(tconn, rv);
b411b363 3817
82bc0194 3818 return 0;
b411b363
PR
3819}
3820
4a76b161 3821static int receive_state(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 3822{
b30ab791 3823 struct drbd_device *device;
e658983a 3824 struct p_state *p = pi->data;
4ac4aada 3825 union drbd_state os, ns, peer_state;
b411b363 3826 enum drbd_disk_state real_peer_disk;
65d922c3 3827 enum chg_state_flags cs_flags;
b411b363
PR
3828 int rv;
3829
b30ab791
AG
3830 device = vnr_to_device(tconn, pi->vnr);
3831 if (!device)
4a76b161
AG
3832 return config_unknown_volume(tconn, pi);
3833
b411b363
PR
3834 peer_state.i = be32_to_cpu(p->state);
3835
3836 real_peer_disk = peer_state.disk;
3837 if (peer_state.disk == D_NEGOTIATING) {
b30ab791 3838 real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
b411b363
PR
3839 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3840 }
3841
b30ab791 3842 spin_lock_irq(&device->tconn->req_lock);
b411b363 3843 retry:
b30ab791
AG
3844 os = ns = drbd_read_state(device);
3845 spin_unlock_irq(&device->tconn->req_lock);
b411b363 3846
545752d5
LE
3847 /* If some other part of the code (asender thread, timeout)
3848 * already decided to close the connection again,
3849 * we must not "re-establish" it here. */
3850 if (os.conn <= C_TEAR_DOWN)
58ffa580 3851 return -ECONNRESET;
545752d5 3852
40424e4a
LE
3853 /* If this is the "end of sync" confirmation, usually the peer disk
3854 * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
3855 * set) resync started in PausedSyncT, or if the timing of pause-/
3856 * unpause-sync events has been "just right", the peer disk may
3857 * transition from D_CONSISTENT to D_UP_TO_DATE as well.
3858 */
3859 if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
3860 real_peer_disk == D_UP_TO_DATE &&
e9ef7bb6
LE
3861 os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3862 /* If we are (becoming) SyncSource, but peer is still in sync
3863 * preparation, ignore its uptodate-ness to avoid flapping, it
3864 * will change to inconsistent once the peer reaches active
3865 * syncing states.
3866 * It may have changed syncer-paused flags, however, so we
3867 * cannot ignore this completely. */
3868 if (peer_state.conn > C_CONNECTED &&
3869 peer_state.conn < C_SYNC_SOURCE)
3870 real_peer_disk = D_INCONSISTENT;
3871
3872 /* if peer_state changes to connected at the same time,
3873 * it explicitly notifies us that it finished resync.
3874 * Maybe we should finish it up, too? */
3875 else if (os.conn >= C_SYNC_SOURCE &&
3876 peer_state.conn == C_CONNECTED) {
b30ab791
AG
3877 if (drbd_bm_total_weight(device) <= device->rs_failed)
3878 drbd_resync_finished(device);
82bc0194 3879 return 0;
e9ef7bb6
LE
3880 }
3881 }
3882
02b91b55
LE
3883 /* explicit verify finished notification, stop sector reached. */
3884 if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
3885 peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
b30ab791
AG
3886 ov_out_of_sync_print(device);
3887 drbd_resync_finished(device);
58ffa580 3888 return 0;
02b91b55
LE
3889 }
3890
e9ef7bb6
LE
3891 /* peer says his disk is inconsistent, while we think it is uptodate,
3892 * and this happens while the peer still thinks we have a sync going on,
3893 * but we think we are already done with the sync.
3894 * We ignore this to avoid flapping pdsk.
3895 * This should not happen, if the peer is a recent version of drbd. */
3896 if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3897 os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3898 real_peer_disk = D_UP_TO_DATE;
3899
4ac4aada
LE
3900 if (ns.conn == C_WF_REPORT_PARAMS)
3901 ns.conn = C_CONNECTED;
b411b363 3902
67531718
PR
3903 if (peer_state.conn == C_AHEAD)
3904 ns.conn = C_BEHIND;
3905
b30ab791
AG
3906 if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3907 get_ldev_if_state(device, D_NEGOTIATING)) {
b411b363
PR
3908 int cr; /* consider resync */
3909
3910 /* if we established a new connection */
4ac4aada 3911 cr = (os.conn < C_CONNECTED);
b411b363
PR
3912 /* if we had an established connection
3913 * and one of the nodes newly attaches a disk */
4ac4aada 3914 cr |= (os.conn == C_CONNECTED &&
b411b363 3915 (peer_state.disk == D_NEGOTIATING ||
4ac4aada 3916 os.disk == D_NEGOTIATING));
b411b363
PR
3917 /* if we have both been inconsistent, and the peer has been
3918 * forced to be UpToDate with --overwrite-data */
b30ab791 3919 cr |= test_bit(CONSIDER_RESYNC, &device->flags);
b411b363
PR
3920 /* if we had been plain connected, and the admin requested to
3921 * start a sync by "invalidate" or "invalidate-remote" */
4ac4aada 3922 cr |= (os.conn == C_CONNECTED &&
b411b363
PR
3923 (peer_state.conn >= C_STARTING_SYNC_S &&
3924 peer_state.conn <= C_WF_BITMAP_T));
3925
3926 if (cr)
b30ab791 3927 ns.conn = drbd_sync_handshake(device, peer_state.role, real_peer_disk);
b411b363 3928
b30ab791 3929 put_ldev(device);
4ac4aada
LE
3930 if (ns.conn == C_MASK) {
3931 ns.conn = C_CONNECTED;
b30ab791
AG
3932 if (device->state.disk == D_NEGOTIATING) {
3933 drbd_force_state(device, NS(disk, D_FAILED));
b411b363
PR
3934 } else if (peer_state.disk == D_NEGOTIATING) {
3935 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3936 peer_state.disk = D_DISKLESS;
580b9767 3937 real_peer_disk = D_DISKLESS;
b411b363 3938 } else {
b30ab791 3939 if (test_and_clear_bit(CONN_DRY_RUN, &device->tconn->flags))
82bc0194 3940 return -EIO;
4ac4aada 3941 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
b30ab791 3942 conn_request_state(device->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
82bc0194 3943 return -EIO;
b411b363
PR
3944 }
3945 }
3946 }
3947
b30ab791
AG
3948 spin_lock_irq(&device->tconn->req_lock);
3949 if (os.i != drbd_read_state(device).i)
b411b363 3950 goto retry;
b30ab791 3951 clear_bit(CONSIDER_RESYNC, &device->flags);
b411b363
PR
3952 ns.peer = peer_state.role;
3953 ns.pdsk = real_peer_disk;
3954 ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4ac4aada 3955 if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
b30ab791 3956 ns.disk = device->new_state_tmp.disk;
4ac4aada 3957 cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
b30ab791
AG
3958 if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3959 test_bit(NEW_CUR_UUID, &device->flags)) {
8554df1c 3960 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
481c6f50 3961 for temporary network outages! */
b30ab791 3962 spin_unlock_irq(&device->tconn->req_lock);
481c6f50 3963 dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
b30ab791
AG
3964 tl_clear(device->tconn);
3965 drbd_uuid_new_current(device);
3966 clear_bit(NEW_CUR_UUID, &device->flags);
3967 conn_request_state(device->tconn, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
82bc0194 3968 return -EIO;
481c6f50 3969 }
b30ab791
AG
3970 rv = _drbd_set_state(device, ns, cs_flags, NULL);
3971 ns = drbd_read_state(device);
3972 spin_unlock_irq(&device->tconn->req_lock);
b411b363
PR
3973
3974 if (rv < SS_SUCCESS) {
b30ab791 3975 conn_request_state(device->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
82bc0194 3976 return -EIO;
b411b363
PR
3977 }
3978
4ac4aada
LE
3979 if (os.conn > C_WF_REPORT_PARAMS) {
3980 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
b411b363
PR
3981 peer_state.disk != D_NEGOTIATING ) {
3982 /* we want resync, peer has not yet decided to sync... */
3983 /* Nowadays only used when forcing a node into primary role and
3984 setting its disk to UpToDate with that */
b30ab791
AG
3985 drbd_send_uuids(device);
3986 drbd_send_current_state(device);
b411b363
PR
3987 }
3988 }
3989
b30ab791 3990 clear_bit(DISCARD_MY_DATA, &device->flags);
b411b363 3991
b30ab791 3992 drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */
b411b363 3993
82bc0194 3994 return 0;
b411b363
PR
3995}
3996
4a76b161 3997static int receive_sync_uuid(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 3998{
b30ab791 3999 struct drbd_device *device;
e658983a 4000 struct p_rs_uuid *p = pi->data;
4a76b161 4001
b30ab791
AG
4002 device = vnr_to_device(tconn, pi->vnr);
4003 if (!device)
4a76b161 4004 return -EIO;
b411b363 4005
b30ab791
AG
4006 wait_event(device->misc_wait,
4007 device->state.conn == C_WF_SYNC_UUID ||
4008 device->state.conn == C_BEHIND ||
4009 device->state.conn < C_CONNECTED ||
4010 device->state.disk < D_NEGOTIATING);
b411b363 4011
b30ab791 4012 /* D_ASSERT( device->state.conn == C_WF_SYNC_UUID ); */
b411b363 4013
b411b363
PR
4014 /* Here the _drbd_uuid_ functions are right, current should
4015 _not_ be rotated into the history */
b30ab791
AG
4016 if (get_ldev_if_state(device, D_NEGOTIATING)) {
4017 _drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
4018 _drbd_uuid_set(device, UI_BITMAP, 0UL);
b411b363 4019
b30ab791
AG
4020 drbd_print_uuids(device, "updated sync uuid");
4021 drbd_start_resync(device, C_SYNC_TARGET);
b411b363 4022
b30ab791 4023 put_ldev(device);
b411b363
PR
4024 } else
4025 dev_err(DEV, "Ignoring SyncUUID packet!\n");
4026
82bc0194 4027 return 0;
b411b363
PR
4028}
4029
2c46407d
AG
4030/**
4031 * receive_bitmap_plain
4032 *
4033 * Return 0 when done, 1 when another iteration is needed, and a negative error
4034 * code upon failure.
4035 */
4036static int
b30ab791 4037receive_bitmap_plain(struct drbd_device *device, unsigned int size,
e658983a 4038 unsigned long *p, struct bm_xfer_ctx *c)
b411b363 4039{
50d0b1ad 4040 unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
b30ab791 4041 drbd_header_size(device->tconn);
e658983a 4042 unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
50d0b1ad 4043 c->bm_words - c->word_offset);
e658983a 4044 unsigned int want = num_words * sizeof(*p);
2c46407d 4045 int err;
b411b363 4046
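	/*
	 * A plain bitmap packet carries at most one socket buffer (minus
	 * header) worth of little-endian longs.  "want" is how many bytes
	 * of that we still expect for the bitmap words not yet transferred;
	 * the received words are merged into the bitmap at word_offset.
	 */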
50d0b1ad
AG
4047 if (want != size) {
4048 dev_err(DEV, "%s:want (%u) != size (%u)\n", __func__, want, size);
2c46407d 4049 return -EIO;
b411b363
PR
4050 }
4051 if (want == 0)
2c46407d 4052 return 0;
b30ab791 4053 err = drbd_recv_all(device->tconn, p, want);
82bc0194 4054 if (err)
2c46407d 4055 return err;
b411b363 4056
b30ab791 4057 drbd_bm_merge_lel(device, c->word_offset, num_words, p);
b411b363
PR
4058
4059 c->word_offset += num_words;
4060 c->bit_offset = c->word_offset * BITS_PER_LONG;
4061 if (c->bit_offset > c->bm_bits)
4062 c->bit_offset = c->bm_bits;
4063
2c46407d 4064 return 1;
b411b363
PR
4065}
4066
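/*
 * Layout of the compressed-bitmap "encoding" byte, as decoded by the
 * helpers below: bits 0-3 hold the drbd_bitmap_code, bits 4-6 the
 * number of pad bits at the end of the bit stream, and bit 7 tells
 * whether the first run-length run describes set or cleared bits.
 */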
a02d1240
AG
4067static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4068{
4069 return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4070}
4071
4072static int dcbp_get_start(struct p_compressed_bm *p)
4073{
4074 return (p->encoding & 0x80) != 0;
4075}
4076
4077static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4078{
4079 return (p->encoding >> 4) & 0x7;
4080}
4081
2c46407d
AG
4082/**
4083 * recv_bm_rle_bits
4084 *
4085 * Return 0 when done, 1 when another iteration is needed, and a negative error
4086 * code upon failure.
4087 */
4088static int
b30ab791 4089recv_bm_rle_bits(struct drbd_device *device,
b411b363 4090 struct p_compressed_bm *p,
c6d25cfe
PR
4091 struct bm_xfer_ctx *c,
4092 unsigned int len)
b411b363
PR
4093{
4094 struct bitstream bs;
4095 u64 look_ahead;
4096 u64 rl;
4097 u64 tmp;
4098 unsigned long s = c->bit_offset;
4099 unsigned long e;
a02d1240 4100 int toggle = dcbp_get_start(p);
b411b363
PR
4101 int have;
4102 int bits;
4103
a02d1240 4104 bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
b411b363
PR
4105
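	/*
	 * The payload is a stream of VLI-encoded run lengths.  Runs
	 * alternate between cleared and set bits, starting with "toggle"
	 * taken from the encoding byte; for a "set" run the bits
	 * [s, s + rl - 1] are marked in the local bitmap.
	 */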
4106 bits = bitstream_get_bits(&bs, &look_ahead, 64);
4107 if (bits < 0)
2c46407d 4108 return -EIO;
b411b363
PR
4109
4110 for (have = bits; have > 0; s += rl, toggle = !toggle) {
4111 bits = vli_decode_bits(&rl, look_ahead);
4112 if (bits <= 0)
2c46407d 4113 return -EIO;
b411b363
PR
4114
4115 if (toggle) {
4116 e = s + rl -1;
4117 if (e >= c->bm_bits) {
4118 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
2c46407d 4119 return -EIO;
b411b363 4120 }
b30ab791 4121 _drbd_bm_set_bits(device, s, e);
b411b363
PR
4122 }
4123
4124 if (have < bits) {
4125 dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4126 have, bits, look_ahead,
4127 (unsigned int)(bs.cur.b - p->code),
4128 (unsigned int)bs.buf_len);
2c46407d 4129 return -EIO;
b411b363 4130 }
d2da5b0c
LE
4131 /* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
4132 if (likely(bits < 64))
4133 look_ahead >>= bits;
4134 else
4135 look_ahead = 0;
b411b363
PR
4136 have -= bits;
4137
4138 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4139 if (bits < 0)
2c46407d 4140 return -EIO;
b411b363
PR
4141 look_ahead |= tmp << have;
4142 have += bits;
4143 }
4144
4145 c->bit_offset = s;
4146 bm_xfer_ctx_bit_to_word_offset(c);
4147
2c46407d 4148 return (s != c->bm_bits);
b411b363
PR
4149}
4150
2c46407d
AG
4151/**
4152 * decode_bitmap_c
4153 *
4154 * Return 0 when done, 1 when another iteration is needed, and a negative error
4155 * code upon failure.
4156 */
4157static int
b30ab791 4158decode_bitmap_c(struct drbd_device *device,
b411b363 4159 struct p_compressed_bm *p,
c6d25cfe
PR
4160 struct bm_xfer_ctx *c,
4161 unsigned int len)
b411b363 4162{
a02d1240 4163 if (dcbp_get_code(p) == RLE_VLI_Bits)
b30ab791 4164 return recv_bm_rle_bits(device, p, c, len - sizeof(*p));
b411b363
PR
4165
4166 /* other variants had been implemented for evaluation,
4167 * but have been dropped as this one turned out to be "best"
4168 * during all our tests. */
4169
4170 dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
b30ab791 4171 conn_request_state(device->tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
2c46407d 4172 return -EIO;
b411b363
PR
4173}
4174
b30ab791 4175void INFO_bm_xfer_stats(struct drbd_device *device,
b411b363
PR
4176 const char *direction, struct bm_xfer_ctx *c)
4177{
4178 /* what would it take to transfer it "plaintext" */
b30ab791 4179 unsigned int header_size = drbd_header_size(device->tconn);
50d0b1ad
AG
4180 unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4181 unsigned int plain =
4182 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4183 c->bm_words * sizeof(unsigned long);
4184 unsigned int total = c->bytes[0] + c->bytes[1];
4185 unsigned int r;
b411b363
PR
4186
4187 /* total can not be zero. but just in case: */
4188 if (total == 0)
4189 return;
4190
4191 /* don't report if not compressed */
4192 if (total >= plain)
4193 return;
4194
4195 /* total < plain. check for overflow, still */
4196 r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4197 : (1000 * total / plain);
4198
4199 if (r > 1000)
4200 r = 1000;
4201
4202 r = 1000 - r;
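	/* r is now the saving in per mille; printed below as a percentage
	 * with one decimal place (r/10 and r%10). */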
4203 dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4204 "total %u; compression: %u.%u%%\n",
4205 direction,
4206 c->bytes[1], c->packets[1],
4207 c->bytes[0], c->packets[0],
4208 total, r/10, r % 10);
4209}
4210
4211/* Since we are processing the bitfield from lower addresses to higher,
4212 it does not matter if we process it in 32 bit chunks or 64 bit
4213 chunks as long as it is little endian. (Understand it as byte stream,
4214 beginning with the lowest byte...) If we used big endian
4215 we would need to process it from the highest address to the lowest,
4216 in order to be agnostic to the 32 vs 64 bits issue.
4217
4218 Returns 0 on success, and a negative error code otherwise. */
4a76b161 4219static int receive_bitmap(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 4220{
b30ab791 4221 struct drbd_device *device;
b411b363 4222 struct bm_xfer_ctx c;
2c46407d 4223 int err;
4a76b161 4224
b30ab791
AG
4225 device = vnr_to_device(tconn, pi->vnr);
4226 if (!device)
4a76b161 4227 return -EIO;
b411b363 4228
b30ab791 4229 drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
20ceb2b2
LE
4230 /* you are supposed to send additional out-of-sync information
4231 * if you actually set bits during this phase */
b411b363 4232
b411b363 4233 c = (struct bm_xfer_ctx) {
b30ab791
AG
4234 .bm_bits = drbd_bm_bits(device),
4235 .bm_words = drbd_bm_words(device),
b411b363
PR
4236 };
4237
2c46407d 4238 for(;;) {
e658983a 4239 if (pi->cmd == P_BITMAP)
b30ab791 4240 err = receive_bitmap_plain(device, pi->size, pi->data, &c);
e658983a 4241 else if (pi->cmd == P_COMPRESSED_BITMAP) {
b411b363
PR
4242 /* MAYBE: sanity check that we speak proto >= 90,
4243 * and the feature is enabled! */
e658983a 4244 struct p_compressed_bm *p = pi->data;
b411b363 4245
50d0b1ad 4246 if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(tconn)) {
b411b363 4247 dev_err(DEV, "ReportCBitmap packet too large\n");
82bc0194 4248 err = -EIO;
b411b363
PR
4249 goto out;
4250 }
e658983a 4251 if (pi->size <= sizeof(*p)) {
e2857216 4252 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", pi->size);
82bc0194 4253 err = -EIO;
78fcbdae 4254 goto out;
b411b363 4255 }
b30ab791 4256 err = drbd_recv_all(device->tconn, p, pi->size);
e658983a
AG
4257 if (err)
4258 goto out;
b30ab791 4259 err = decode_bitmap_c(device, p, &c, pi->size);
b411b363 4260 } else {
e2857216 4261 dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
82bc0194 4262 err = -EIO;
b411b363
PR
4263 goto out;
4264 }
4265
e2857216 4266 c.packets[pi->cmd == P_BITMAP]++;
50d0b1ad 4267 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(tconn) + pi->size;
b411b363 4268
2c46407d
AG
4269 if (err <= 0) {
4270 if (err < 0)
4271 goto out;
b411b363 4272 break;
2c46407d 4273 }
b30ab791 4274 err = drbd_recv_header(device->tconn, pi);
82bc0194 4275 if (err)
b411b363 4276 goto out;
2c46407d 4277 }
b411b363 4278
b30ab791 4279 INFO_bm_xfer_stats(device, "receive", &c);
b411b363 4280
b30ab791 4281 if (device->state.conn == C_WF_BITMAP_T) {
de1f8e4a
AG
4282 enum drbd_state_rv rv;
4283
b30ab791 4284 err = drbd_send_bitmap(device);
82bc0194 4285 if (err)
b411b363
PR
4286 goto out;
4287 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
b30ab791 4288 rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
de1f8e4a 4289 D_ASSERT(rv == SS_SUCCESS);
b30ab791 4290 } else if (device->state.conn != C_WF_BITMAP_S) {
b411b363
PR
4291 /* admin may have requested C_DISCONNECTING,
4292 * other threads may have noticed network errors */
4293 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
b30ab791 4294 drbd_conn_str(device->state.conn));
b411b363 4295 }
82bc0194 4296 err = 0;
b411b363 4297
b411b363 4298 out:
b30ab791
AG
4299 drbd_bm_unlock(device);
4300 if (!err && device->state.conn == C_WF_BITMAP_S)
4301 drbd_start_resync(device, C_SYNC_SOURCE);
82bc0194 4302 return err;
b411b363
PR
4303}
4304
4a76b161 4305static int receive_skip(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 4306{
4a76b161 4307 conn_warn(tconn, "skipping unknown optional packet type %d, l: %d!\n",
e2857216 4308 pi->cmd, pi->size);
b411b363 4309
4a76b161 4310 return ignore_remaining_packet(tconn, pi);
b411b363
PR
4311}
4312
4a76b161 4313static int receive_UnplugRemote(struct drbd_tconn *tconn, struct packet_info *pi)
0ced55a3 4314{
e7f52dfb
LE
4315 /* Make sure we've acked all the TCP data associated
4316 * with the data requests being unplugged */
4a76b161 4317 drbd_tcp_quickack(tconn->data.socket);
0ced55a3 4318
82bc0194 4319 return 0;
0ced55a3
PR
4320}
4321
4a76b161 4322static int receive_out_of_sync(struct drbd_tconn *tconn, struct packet_info *pi)
73a01a18 4323{
b30ab791 4324 struct drbd_device *device;
e658983a 4325 struct p_block_desc *p = pi->data;
4a76b161 4326
b30ab791
AG
4327 device = vnr_to_device(tconn, pi->vnr);
4328 if (!device)
4a76b161 4329 return -EIO;
73a01a18 4330
b30ab791 4331 switch (device->state.conn) {
f735e363
LE
4332 case C_WF_SYNC_UUID:
4333 case C_WF_BITMAP_T:
4334 case C_BEHIND:
4335 break;
4336 default:
4337 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
b30ab791 4338 drbd_conn_str(device->state.conn));
f735e363
LE
4339 }
4340
b30ab791 4341 drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
73a01a18 4342
82bc0194 4343 return 0;
73a01a18
PR
4344}
4345
02918be2
PR
4346struct data_cmd {
4347 int expect_payload;
4348 size_t pkt_size;
4a76b161 4349 int (*fn)(struct drbd_tconn *, struct packet_info *);
02918be2
PR
4350};
4351
4352static struct data_cmd drbd_cmd_handler[] = {
4353 [P_DATA] = { 1, sizeof(struct p_data), receive_Data },
4354 [P_DATA_REPLY] = { 1, sizeof(struct p_data), receive_DataReply },
4355 [P_RS_DATA_REPLY] = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4356 [P_BARRIER] = { 0, sizeof(struct p_barrier), receive_Barrier } ,
e658983a
AG
4357 [P_BITMAP] = { 1, 0, receive_bitmap } ,
4358 [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4359 [P_UNPLUG_REMOTE] = { 0, 0, receive_UnplugRemote },
02918be2
PR
4360 [P_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4361 [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
e658983a
AG
4362 [P_SYNC_PARAM] = { 1, 0, receive_SyncParam },
4363 [P_SYNC_PARAM89] = { 1, 0, receive_SyncParam },
02918be2
PR
4364 [P_PROTOCOL] = { 1, sizeof(struct p_protocol), receive_protocol },
4365 [P_UUIDS] = { 0, sizeof(struct p_uuids), receive_uuids },
4366 [P_SIZES] = { 0, sizeof(struct p_sizes), receive_sizes },
4367 [P_STATE] = { 0, sizeof(struct p_state), receive_state },
4368 [P_STATE_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_state },
4369 [P_SYNC_UUID] = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4370 [P_OV_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4371 [P_OV_REPLY] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4372 [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4373 [P_DELAY_PROBE] = { 0, sizeof(struct p_delay_probe93), receive_skip },
73a01a18 4374 [P_OUT_OF_SYNC] = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4a76b161 4375 [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
036b17ea 4376 [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
b411b363
PR
4377};
4378
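/*
 * Receive loop of the main receiver thread: read a packet header,
 * look up the handler in drbd_cmd_handler[] by command number, reject
 * unexpected payloads, receive the fixed-size part of the packet
 * (cmd->pkt_size bytes) and hand the rest to the per-command handler.
 * Any error drops the connection into C_PROTOCOL_ERROR.
 */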
eefc2f7d 4379static void drbdd(struct drbd_tconn *tconn)
b411b363 4380{
77351055 4381 struct packet_info pi;
02918be2 4382 size_t shs; /* sub header size */
82bc0194 4383 int err;
b411b363 4384
eefc2f7d 4385 while (get_t_state(&tconn->receiver) == RUNNING) {
deebe195 4386 struct data_cmd *cmd;
b411b363 4387
eefc2f7d 4388 drbd_thread_current_set_cpu(&tconn->receiver);
69bc7bc3 4389 if (drbd_recv_header(tconn, &pi))
02918be2 4390 goto err_out;
b411b363 4391
deebe195 4392 cmd = &drbd_cmd_handler[pi.cmd];
4a76b161 4393 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
2fcb8f30
AG
4394 conn_err(tconn, "Unexpected data packet %s (0x%04x)",
4395 cmdname(pi.cmd), pi.cmd);
02918be2 4396 goto err_out;
0b33a916 4397 }
b411b363 4398
e658983a
AG
4399 shs = cmd->pkt_size;
4400 if (pi.size > shs && !cmd->expect_payload) {
2fcb8f30
AG
4401 conn_err(tconn, "No payload expected %s l:%d\n",
4402 cmdname(pi.cmd), pi.size);
02918be2 4403 goto err_out;
b411b363 4404 }
b411b363 4405
c13f7e1a 4406 if (shs) {
e658983a 4407 err = drbd_recv_all_warn(tconn, pi.data, shs);
a5c31904 4408 if (err)
c13f7e1a 4409 goto err_out;
e2857216 4410 pi.size -= shs;
c13f7e1a
LE
4411 }
4412
4a76b161
AG
4413 err = cmd->fn(tconn, &pi);
4414 if (err) {
9f5bdc33
AG
4415 conn_err(tconn, "error receiving %s, e: %d l: %d!\n",
4416 cmdname(pi.cmd), err, pi.size);
02918be2 4417 goto err_out;
b411b363
PR
4418 }
4419 }
82bc0194 4420 return;
b411b363 4421
82bc0194
AG
4422 err_out:
4423 conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
b411b363
PR
4424}
4425
0e29d163 4426void conn_flush_workqueue(struct drbd_tconn *tconn)
b411b363
PR
4427{
4428 struct drbd_wq_barrier barr;
4429
4430 barr.w.cb = w_prev_work_done;
0e29d163 4431 barr.w.tconn = tconn;
b411b363 4432 init_completion(&barr.done);
d5b27b01 4433 drbd_queue_work(&tconn->sender_work, &barr.w);
b411b363
PR
4434 wait_for_completion(&barr.done);
4435}
4436
81fa2e67 4437static void conn_disconnect(struct drbd_tconn *tconn)
b411b363 4438{
b30ab791 4439 struct drbd_device *device;
bbeb641c 4440 enum drbd_conns oc;
376694a0 4441 int vnr;
b411b363 4442
bbeb641c 4443 if (tconn->cstate == C_STANDALONE)
b411b363 4444 return;
b411b363 4445
545752d5
LE
4446 /* We are about to start the cleanup after connection loss.
4447 * Make sure drbd_make_request knows about that.
4448 * Usually we should be in some network failure state already,
4449 * but just in case we are not, we fix it up here.
4450 */
b8853dbd 4451 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
545752d5 4452
b411b363 4453 /* asender does not clean up anything. it must not interfere, either */
360cc740
PR
4454 drbd_thread_stop(&tconn->asender);
4455 drbd_free_sock(tconn);
4456
c141ebda 4457 rcu_read_lock();
b30ab791
AG
4458 idr_for_each_entry(&tconn->volumes, device, vnr) {
4459 kref_get(&device->kref);
c141ebda 4460 rcu_read_unlock();
b30ab791
AG
4461 drbd_disconnected(device);
4462 kref_put(&device->kref, &drbd_minor_destroy);
c141ebda
PR
4463 rcu_read_lock();
4464 }
4465 rcu_read_unlock();
4466
12038a3a
PR
4467 if (!list_empty(&tconn->current_epoch->list))
4468 conn_err(tconn, "ASSERTION FAILED: tconn->current_epoch->list not empty\n");
4469 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4470 atomic_set(&tconn->current_epoch->epoch_size, 0);
b6dd1a89 4471 tconn->send.seen_any_write_yet = false;
12038a3a 4472
360cc740
PR
4473 conn_info(tconn, "Connection closed\n");
4474
cb703454
PR
4475 if (conn_highest_role(tconn) == R_PRIMARY && conn_highest_pdsk(tconn) >= D_UNKNOWN)
4476 conn_try_outdate_peer_async(tconn);
4477
360cc740 4478 spin_lock_irq(&tconn->req_lock);
bbeb641c
PR
4479 oc = tconn->cstate;
4480 if (oc >= C_UNCONNECTED)
376694a0 4481 _conn_request_state(tconn, NS(conn, C_UNCONNECTED), CS_VERBOSE);
bbeb641c 4482
360cc740
PR
4483 spin_unlock_irq(&tconn->req_lock);
4484
f3dfa40a 4485 if (oc == C_DISCONNECTING)
d9cc6e23 4486 conn_request_state(tconn, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
360cc740
PR
4487}
4488
b30ab791 4489static int drbd_disconnected(struct drbd_device *device)
360cc740 4490{
360cc740 4491 unsigned int i;
b411b363 4492
85719573 4493 /* wait for current activity to cease. */
b30ab791
AG
4494 spin_lock_irq(&device->tconn->req_lock);
4495 _drbd_wait_ee_list_empty(device, &device->active_ee);
4496 _drbd_wait_ee_list_empty(device, &device->sync_ee);
4497 _drbd_wait_ee_list_empty(device, &device->read_ee);
4498 spin_unlock_irq(&device->tconn->req_lock);
b411b363
PR
4499
4500 /* We do not have data structures that would allow us to
4501 * get the rs_pending_cnt down to 0 again.
4502 * * On C_SYNC_TARGET we do not have any data structures describing
4503 * the pending RSDataRequest's we have sent.
4504 * * On C_SYNC_SOURCE there is no data structure that tracks
4505 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4506 * And no, it is not the sum of the reference counts in the
4507 * resync_LRU. The resync_LRU tracks the whole operation including
4508 * the disk-IO, while the rs_pending_cnt only tracks the blocks
4509 * on the fly. */
b30ab791
AG
4510 drbd_rs_cancel_all(device);
4511 device->rs_total = 0;
4512 device->rs_failed = 0;
4513 atomic_set(&device->rs_pending_cnt, 0);
4514 wake_up(&device->misc_wait);
b411b363 4515
b30ab791
AG
4516 del_timer_sync(&device->resync_timer);
4517 resync_timer_fn((unsigned long)device);
b411b363 4518
b411b363
PR
4519 /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4520 * w_make_resync_request etc. which may still be on the worker queue
4521 * to be "canceled" */
b30ab791 4522 drbd_flush_workqueue(device);
b411b363 4523
b30ab791 4524 drbd_finish_peer_reqs(device);
b411b363 4525
d10b4ea3
PR
4526 /* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
4527 might have queued work again. The one before drbd_finish_peer_reqs() is
4528 necessary to reclaim net_ee in drbd_finish_peer_reqs(). */
b30ab791 4529 drbd_flush_workqueue(device);
d10b4ea3 4530
08332d73
LE
4531 /* need to do it again, drbd_finish_peer_reqs() may have populated it
4532 * again via drbd_try_clear_on_disk_bm(). */
b30ab791 4533 drbd_rs_cancel_all(device);
b411b363 4534
b30ab791
AG
4535 kfree(device->p_uuid);
4536 device->p_uuid = NULL;
b411b363 4537
b30ab791
AG
4538 if (!drbd_suspended(device))
4539 tl_clear(device->tconn);
b411b363 4540
b30ab791 4541 drbd_md_sync(device);
b411b363 4542
20ceb2b2
LE
4543 /* serialize with bitmap writeout triggered by the state change,
4544 * if any. */
b30ab791 4545 wait_event(device->misc_wait, !test_bit(BITMAP_IO, &device->flags));
20ceb2b2 4546
b411b363
PR
4547 /* tcp_close and release of sendpage pages can be deferred. I don't
4548 * want to use SO_LINGER, because apparently it can be deferred for
4549 * more than 20 seconds (longest time I checked).
4550 *
4551 * Actually we don't care exactly when the network stack does its
4552 * put_page(), but release our reference on these pages right here.
4553 */
b30ab791 4554 i = drbd_free_peer_reqs(device, &device->net_ee);
b411b363
PR
4555 if (i)
4556 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
b30ab791 4557 i = atomic_read(&device->pp_in_use_by_net);
435f0740
LE
4558 if (i)
4559 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
b30ab791 4560 i = atomic_read(&device->pp_in_use);
b411b363 4561 if (i)
45bb912b 4562 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
b411b363 4563
b30ab791
AG
4564 D_ASSERT(list_empty(&device->read_ee));
4565 D_ASSERT(list_empty(&device->active_ee));
4566 D_ASSERT(list_empty(&device->sync_ee));
4567 D_ASSERT(list_empty(&device->done_ee));
b411b363 4568
360cc740 4569 return 0;
b411b363
PR
4570}
4571
4572/*
4573 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4574 * we can agree on is stored in agreed_pro_version.
4575 *
4576 * feature flags and the reserved array should provide enough room for future
4577 * enhancements of the handshake protocol, and possible plugins...
4578 *
4579 * for now, they are expected to be zero, but ignored.
4580 */
6038178e 4581static int drbd_send_features(struct drbd_tconn *tconn)
b411b363 4582{
9f5bdc33
AG
4583 struct drbd_socket *sock;
4584 struct p_connection_features *p;
b411b363 4585
9f5bdc33
AG
4586 sock = &tconn->data;
4587 p = conn_prepare_command(tconn, sock);
4588 if (!p)
e8d17b01 4589 return -EIO;
b411b363
PR
4590 memset(p, 0, sizeof(*p));
4591 p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4592 p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
9f5bdc33 4593 return conn_send_command(tconn, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
b411b363
PR
4594}
4595
4596/*
4597 * return values:
4598 * 1 yes, we have a valid connection
4599 * 0 oops, did not work out, please try again
4600 * -1 peer talks different language,
4601 * no point in trying again, please go standalone.
4602 */
6038178e 4603static int drbd_do_features(struct drbd_tconn *tconn)
b411b363 4604{
65d11ed6 4605 /* ASSERT current == tconn->receiver ... */
e658983a
AG
4606 struct p_connection_features *p;
4607 const int expect = sizeof(struct p_connection_features);
77351055 4608 struct packet_info pi;
a5c31904 4609 int err;
b411b363 4610
6038178e 4611 err = drbd_send_features(tconn);
e8d17b01 4612 if (err)
b411b363
PR
4613 return 0;
4614
69bc7bc3
AG
4615 err = drbd_recv_header(tconn, &pi);
4616 if (err)
b411b363
PR
4617 return 0;
4618
6038178e
AG
4619 if (pi.cmd != P_CONNECTION_FEATURES) {
4620 conn_err(tconn, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
2fcb8f30 4621 cmdname(pi.cmd), pi.cmd);
b411b363
PR
4622 return -1;
4623 }
4624
77351055 4625 if (pi.size != expect) {
6038178e 4626 conn_err(tconn, "expected ConnectionFeatures length: %u, received: %u\n",
77351055 4627 expect, pi.size);
b411b363
PR
4628 return -1;
4629 }
4630
e658983a
AG
4631 p = pi.data;
4632 err = drbd_recv_all_warn(tconn, p, expect);
a5c31904 4633 if (err)
b411b363 4634 return 0;
b411b363 4635
b411b363
PR
4636 p->protocol_min = be32_to_cpu(p->protocol_min);
4637 p->protocol_max = be32_to_cpu(p->protocol_max);
4638 if (p->protocol_max == 0)
4639 p->protocol_max = p->protocol_min;
4640
4641 if (PRO_VERSION_MAX < p->protocol_min ||
4642 PRO_VERSION_MIN > p->protocol_max)
4643 goto incompat;
4644
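	/* The version ranges overlap; agree on the highest protocol version
	 * both sides support. */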
65d11ed6 4645 tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
b411b363 4646
65d11ed6
PR
4647 conn_info(tconn, "Handshake successful: "
4648 "Agreed network protocol version %d\n", tconn->agreed_pro_version);
b411b363
PR
4649
4650 return 1;
4651
4652 incompat:
65d11ed6 4653 conn_err(tconn, "incompatible DRBD dialects: "
b411b363
PR
4654 "I support %d-%d, peer supports %d-%d\n",
4655 PRO_VERSION_MIN, PRO_VERSION_MAX,
4656 p->protocol_min, p->protocol_max);
4657 return -1;
4658}
4659
4660#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
13e6037d 4661static int drbd_do_auth(struct drbd_tconn *tconn)
b411b363 4662{
ef57f9e6
PR
4663 conn_err(tconn, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
4664 conn_err(tconn, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
b10d96cb 4665 return -1;
b411b363
PR
4666}
4667#else
4668#define CHALLENGE_LEN 64
b10d96cb
JT
4669
4670/* Return value:
4671 1 - auth succeeded,
4672 0 - failed, try again (network error),
4673 -1 - auth failed, don't try again.
4674*/
4675
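/*
 * Challenge/response handshake, roughly: send our own random challenge
 * (P_AUTH_CHALLENGE), receive the peer's challenge, HMAC it with the
 * shared secret and send the digest back (P_AUTH_RESPONSE), then
 * receive the peer's digest over our challenge and compare it against
 * the locally computed right_response.
 */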
13e6037d 4676static int drbd_do_auth(struct drbd_tconn *tconn)
b411b363 4677{
9f5bdc33 4678 struct drbd_socket *sock;
b411b363
PR
4679 char my_challenge[CHALLENGE_LEN]; /* 64 Bytes... */
4680 struct scatterlist sg;
4681 char *response = NULL;
4682 char *right_response = NULL;
4683 char *peers_ch = NULL;
44ed167d
PR
4684 unsigned int key_len;
4685 char secret[SHARED_SECRET_MAX]; /* 64 byte */
b411b363
PR
4686 unsigned int resp_size;
4687 struct hash_desc desc;
77351055 4688 struct packet_info pi;
44ed167d 4689 struct net_conf *nc;
69bc7bc3 4690 int err, rv;
b411b363 4691
9f5bdc33 4692 /* FIXME: Put the challenge/response into the preallocated socket buffer. */
b411b363 4693
44ed167d
PR
4694 rcu_read_lock();
4695 nc = rcu_dereference(tconn->net_conf);
4696 key_len = strlen(nc->shared_secret);
4697 memcpy(secret, nc->shared_secret, key_len);
4698 rcu_read_unlock();
4699
13e6037d 4700 desc.tfm = tconn->cram_hmac_tfm;
b411b363
PR
4701 desc.flags = 0;
4702
44ed167d 4703 rv = crypto_hash_setkey(tconn->cram_hmac_tfm, (u8 *)secret, key_len);
b411b363 4704 if (rv) {
13e6037d 4705 conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
b10d96cb 4706 rv = -1;
b411b363
PR
4707 goto fail;
4708 }
4709
4710 get_random_bytes(my_challenge, CHALLENGE_LEN);
4711
9f5bdc33
AG
4712 sock = &tconn->data;
4713 if (!conn_prepare_command(tconn, sock)) {
4714 rv = 0;
4715 goto fail;
4716 }
e658983a 4717 rv = !conn_send_command(tconn, sock, P_AUTH_CHALLENGE, 0,
9f5bdc33 4718 my_challenge, CHALLENGE_LEN);
b411b363
PR
4719 if (!rv)
4720 goto fail;
4721
69bc7bc3
AG
4722 err = drbd_recv_header(tconn, &pi);
4723 if (err) {
4724 rv = 0;
b411b363 4725 goto fail;
69bc7bc3 4726 }
b411b363 4727
77351055 4728 if (pi.cmd != P_AUTH_CHALLENGE) {
13e6037d 4729 conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
2fcb8f30 4730 cmdname(pi.cmd), pi.cmd);
b411b363
PR
4731 rv = 0;
4732 goto fail;
4733 }
4734
77351055 4735 if (pi.size > CHALLENGE_LEN * 2) {
13e6037d 4736 		conn_err(tconn, "AuthChallenge payload too big.\n");
b10d96cb 4737 rv = -1;
b411b363
PR
4738 goto fail;
4739 }
4740
77351055 4741 peers_ch = kmalloc(pi.size, GFP_NOIO);
b411b363 4742 if (peers_ch == NULL) {
13e6037d 4743 conn_err(tconn, "kmalloc of peers_ch failed\n");
b10d96cb 4744 rv = -1;
b411b363
PR
4745 goto fail;
4746 }
4747
a5c31904
AG
4748 err = drbd_recv_all_warn(tconn, peers_ch, pi.size);
4749 if (err) {
b411b363
PR
4750 rv = 0;
4751 goto fail;
4752 }
4753
13e6037d 4754 resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
b411b363
PR
4755 response = kmalloc(resp_size, GFP_NOIO);
4756 if (response == NULL) {
13e6037d 4757 conn_err(tconn, "kmalloc of response failed\n");
b10d96cb 4758 rv = -1;
b411b363
PR
4759 goto fail;
4760 }
4761
4762 sg_init_table(&sg, 1);
77351055 4763 sg_set_buf(&sg, peers_ch, pi.size);
b411b363
PR
4764
4765 rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4766 if (rv) {
13e6037d 4767 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
b10d96cb 4768 rv = -1;
b411b363
PR
4769 goto fail;
4770 }
4771
9f5bdc33
AG
4772 if (!conn_prepare_command(tconn, sock)) {
4773 rv = 0;
b411b363 4774 goto fail;
9f5bdc33 4775 }
e658983a 4776 rv = !conn_send_command(tconn, sock, P_AUTH_RESPONSE, 0,
9f5bdc33 4777 response, resp_size);
b411b363
PR
4778 if (!rv)
4779 goto fail;
4780
69bc7bc3
AG
4781 err = drbd_recv_header(tconn, &pi);
4782 if (err) {
b411b363
PR
4783 rv = 0;
4784 goto fail;
4785 }
4786
77351055 4787 if (pi.cmd != P_AUTH_RESPONSE) {
13e6037d 4788 conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
2fcb8f30 4789 cmdname(pi.cmd), pi.cmd);
b411b363
PR
4790 rv = 0;
4791 goto fail;
4792 }
4793
77351055 4794 if (pi.size != resp_size) {
13e6037d 4795 		conn_err(tconn, "AuthResponse payload of wrong size\n");
b411b363
PR
4796 rv = 0;
4797 goto fail;
4798 }
b411b363 4799
a5c31904
AG
4800 	err = drbd_recv_all_warn(tconn, response, resp_size);
4801 if (err) {
b411b363
PR
4802 rv = 0;
4803 goto fail;
4804 }
4805
4806 right_response = kmalloc(resp_size, GFP_NOIO);
2d1ee87d 4807 if (right_response == NULL) {
13e6037d 4808 conn_err(tconn, "kmalloc of right_response failed\n");
b10d96cb 4809 rv = -1;
b411b363
PR
4810 goto fail;
4811 }
4812
4813 sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4814
4815 rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4816 if (rv) {
13e6037d 4817 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
b10d96cb 4818 rv = -1;
b411b363
PR
4819 goto fail;
4820 }
4821
4822 rv = !memcmp(response, right_response, resp_size);
4823
4824 if (rv)
44ed167d
PR
4825 		conn_info(tconn, "Peer authenticated using %d bytes of HMAC\n",
4826 resp_size);
b10d96cb
JT
4827 else
4828 rv = -1;
b411b363
PR
4829
4830 fail:
4831 kfree(peers_ch);
4832 kfree(response);
4833 kfree(right_response);
4834
4835 return rv;
4836}
4837#endif
4838
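/*
 * Message order of the CRAM-HMAC exchange implemented in drbd_do_auth()
 * above, condensed as seen from one side (both peers run it symmetrically;
 * this trace is only an illustration, assuming a shared secret is configured
 * on both ends):
 *
 *	send P_AUTH_CHALLENGE  (my_challenge, 64 random bytes)
 *	recv P_AUTH_CHALLENGE  (peers_ch, at most 2 * CHALLENGE_LEN bytes)
 *	send P_AUTH_RESPONSE   (HMAC(secret, peers_ch))
 *	recv P_AUTH_RESPONSE   (peer's HMAC over my_challenge)
 *	compare it against right_response = HMAC(secret, my_challenge)
 */
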
4839int drbdd_init(struct drbd_thread *thi)
4840{
392c8801 4841 struct drbd_tconn *tconn = thi->tconn;
b411b363
PR
4842 int h;
4843
4d641dd7 4844 conn_info(tconn, "receiver (re)started\n");
b411b363
PR
4845
4846 do {
81fa2e67 4847 h = conn_connect(tconn);
b411b363 4848 if (h == 0) {
81fa2e67 4849 conn_disconnect(tconn);
20ee6390 4850 schedule_timeout_interruptible(HZ);
b411b363
PR
4851 }
4852 if (h == -1) {
4d641dd7 4853 conn_warn(tconn, "Discarding network configuration.\n");
bbeb641c 4854 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
b411b363
PR
4855 }
4856 } while (h == 0);
4857
91fd4dad
PR
4858 if (h > 0)
4859 drbdd(tconn);
b411b363 4860
81fa2e67 4861 conn_disconnect(tconn);
b411b363 4862
4d641dd7 4863 conn_info(tconn, "receiver terminated\n");
b411b363
PR
4864 return 0;
4865}
4866
4867/* ********* acknowledge sender ******** */
4868
e05e1e59 4869static int got_conn_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 4870{
e658983a 4871 struct p_req_state_reply *p = pi->data;
e4f78ede
PR
4872 int retcode = be32_to_cpu(p->retcode);
4873
4874 if (retcode >= SS_SUCCESS) {
4875 set_bit(CONN_WD_ST_CHG_OKAY, &tconn->flags);
4876 } else {
4877 set_bit(CONN_WD_ST_CHG_FAIL, &tconn->flags);
4878 conn_err(tconn, "Requested state change failed by peer: %s (%d)\n",
4879 drbd_set_st_err_str(retcode), retcode);
4880 }
4881 wake_up(&tconn->ping_wait);
4882
2735a594 4883 return 0;
e4f78ede 4884}
b411b363 4885
1952e916 4886static int got_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 4887{
b30ab791 4888 struct drbd_device *device;
e658983a 4889 struct p_req_state_reply *p = pi->data;
b411b363
PR
4890 int retcode = be32_to_cpu(p->retcode);
4891
b30ab791
AG
4892 device = vnr_to_device(tconn, pi->vnr);
4893 if (!device)
2735a594 4894 return -EIO;
1952e916 4895
4d0fc3fd
PR
4896 if (test_bit(CONN_WD_ST_CHG_REQ, &tconn->flags)) {
4897 D_ASSERT(tconn->agreed_pro_version < 100);
4898 return got_conn_RqSReply(tconn, pi);
4899 }
4900
b411b363 4901 if (retcode >= SS_SUCCESS) {
b30ab791 4902 set_bit(CL_ST_CHG_SUCCESS, &device->flags);
b411b363 4903 } else {
b30ab791 4904 set_bit(CL_ST_CHG_FAIL, &device->flags);
b411b363 4905 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
e4f78ede 4906 drbd_set_st_err_str(retcode), retcode);
b411b363 4907 }
b30ab791 4908 wake_up(&device->state_wait);
b411b363 4909
2735a594 4910 return 0;
b411b363
PR
4911}
4912
e05e1e59 4913static int got_Ping(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 4914{
2735a594 4915 return drbd_send_ping_ack(tconn);
b411b363
PR
4916
4917}
4918
e05e1e59 4919static int got_PingAck(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363
PR
4920{
4921 /* restore idle timeout */
2a67d8b9
PR
4922 tconn->meta.socket->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
4923 if (!test_and_set_bit(GOT_PING_ACK, &tconn->flags))
4924 wake_up(&tconn->ping_wait);
b411b363 4925
2735a594 4926 return 0;
b411b363
PR
4927}
4928
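/*
 * Keepalive sequence, condensed (illustration only; the real statements live
 * in drbd_asender() further down):
 *
 *	set_bit(SEND_PING, &tconn->flags);
 *	drbd_send_ping(tconn);
 *	sk->sk_rcvtimeo = ping_timeo * HZ / 10;	 // short timeout while waiting
 *	... P_PING_ACK arrives, got_PingAck() runs ...
 *	sk->sk_rcvtimeo = ping_int * HZ;	 // back to the idle interval
 *
 * If the ack does not arrive before the shortened timeout expires, the
 * asender reconnects.
 */
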
1952e916 4929static int got_IsInSync(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 4930{
b30ab791 4931 struct drbd_device *device;
e658983a 4932 struct p_block_ack *p = pi->data;
b411b363
PR
4933 sector_t sector = be64_to_cpu(p->sector);
4934 int blksize = be32_to_cpu(p->blksize);
4935
b30ab791
AG
4936 device = vnr_to_device(tconn, pi->vnr);
4937 if (!device)
2735a594 4938 return -EIO;
1952e916 4939
b30ab791 4940 D_ASSERT(device->tconn->agreed_pro_version >= 89);
b411b363 4941
b30ab791 4942 update_peer_seq(device, be32_to_cpu(p->seq_num));
b411b363 4943
b30ab791
AG
4944 if (get_ldev(device)) {
4945 drbd_rs_complete_io(device, sector);
4946 drbd_set_in_sync(device, sector, blksize);
1d53f09e 4947 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
b30ab791
AG
4948 device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4949 put_ldev(device);
1d53f09e 4950 }
b30ab791
AG
4951 dec_rs_pending(device);
4952 atomic_add(blksize >> 9, &device->rs_sect_in);
b411b363 4953
2735a594 4954 return 0;
b411b363
PR
4955}
4956
bc9c5c41 4957static int
b30ab791 4958validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
bc9c5c41
AG
4959 struct rb_root *root, const char *func,
4960 enum drbd_req_event what, bool missing_ok)
b411b363
PR
4961{
4962 struct drbd_request *req;
4963 struct bio_and_error m;
4964
b30ab791
AG
4965 spin_lock_irq(&device->tconn->req_lock);
4966 req = find_request(device, root, id, sector, missing_ok, func);
b411b363 4967 if (unlikely(!req)) {
b30ab791 4968 spin_unlock_irq(&device->tconn->req_lock);
85997675 4969 return -EIO;
b411b363
PR
4970 }
4971 __req_mod(req, what, &m);
b30ab791 4972 spin_unlock_irq(&device->tconn->req_lock);
b411b363
PR
4973
4974 if (m.bio)
b30ab791 4975 complete_master_bio(device, &m);
85997675 4976 return 0;
b411b363
PR
4977}
4978
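/*
 * Typical use (see got_BlockAck() below): an ACK carries the originating
 * request's block_id and sector, which are enough to look the request up in
 * the per-device tree and feed the matching event into the request state
 * machine, e.g.
 *
 *	validate_req_change_req_state(device, p->block_id, sector,
 *				      &device->write_requests, __func__,
 *				      WRITE_ACKED_BY_PEER, false);
 *
 * missing_ok is only passed as true for P_NEG_ACK (see got_NegAck()), where
 * the request may already have been completed towards the upper layers.
 */
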
1952e916 4979static int got_BlockAck(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 4980{
b30ab791 4981 struct drbd_device *device;
e658983a 4982 struct p_block_ack *p = pi->data;
b411b363
PR
4983 sector_t sector = be64_to_cpu(p->sector);
4984 int blksize = be32_to_cpu(p->blksize);
4985 enum drbd_req_event what;
4986
b30ab791
AG
4987 device = vnr_to_device(tconn, pi->vnr);
4988 if (!device)
2735a594 4989 return -EIO;
1952e916 4990
b30ab791 4991 update_peer_seq(device, be32_to_cpu(p->seq_num));
b411b363 4992
579b57ed 4993 if (p->block_id == ID_SYNCER) {
b30ab791
AG
4994 drbd_set_in_sync(device, sector, blksize);
4995 dec_rs_pending(device);
2735a594 4996 return 0;
b411b363 4997 }
e05e1e59 4998 switch (pi->cmd) {
b411b363 4999 case P_RS_WRITE_ACK:
8554df1c 5000 what = WRITE_ACKED_BY_PEER_AND_SIS;
b411b363
PR
5001 break;
5002 case P_WRITE_ACK:
8554df1c 5003 what = WRITE_ACKED_BY_PEER;
b411b363
PR
5004 break;
5005 case P_RECV_ACK:
8554df1c 5006 what = RECV_ACKED_BY_PEER;
b411b363 5007 break;
d4dabbe2
LE
5008 case P_SUPERSEDED:
5009 what = CONFLICT_RESOLVED;
b411b363 5010 break;
7be8da07 5011 case P_RETRY_WRITE:
7be8da07 5012 what = POSTPONE_WRITE;
b411b363
PR
5013 break;
5014 default:
2735a594 5015 BUG();
b411b363
PR
5016 }
5017
b30ab791
AG
5018 return validate_req_change_req_state(device, p->block_id, sector,
5019 &device->write_requests, __func__,
2735a594 5020 what, false);
b411b363
PR
5021}
5022
1952e916 5023static int got_NegAck(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 5024{
b30ab791 5025 struct drbd_device *device;
e658983a 5026 struct p_block_ack *p = pi->data;
b411b363 5027 sector_t sector = be64_to_cpu(p->sector);
2deb8336 5028 int size = be32_to_cpu(p->blksize);
85997675 5029 int err;
b411b363 5030
b30ab791
AG
5031 device = vnr_to_device(tconn, pi->vnr);
5032 if (!device)
2735a594 5033 return -EIO;
b411b363 5034
b30ab791 5035 update_peer_seq(device, be32_to_cpu(p->seq_num));
b411b363 5036
579b57ed 5037 if (p->block_id == ID_SYNCER) {
b30ab791
AG
5038 dec_rs_pending(device);
5039 drbd_rs_failed_io(device, sector, size);
2735a594 5040 return 0;
b411b363 5041 }
2deb8336 5042
b30ab791
AG
5043 err = validate_req_change_req_state(device, p->block_id, sector,
5044 &device->write_requests, __func__,
303d1448 5045 NEG_ACKED, true);
85997675 5046 if (err) {
c3afd8f5
AG
5047 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5048 The master bio might already be completed, therefore the
5049 request is no longer in the collision hash. */
5050 /* In Protocol B we might already have got a P_RECV_ACK
5051 but then get a P_NEG_ACK afterwards. */
b30ab791 5052 drbd_set_out_of_sync(device, sector, size);
2deb8336 5053 }
2735a594 5054 return 0;
b411b363
PR
5055}
5056
1952e916 5057static int got_NegDReply(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 5058{
b30ab791 5059 struct drbd_device *device;
e658983a 5060 struct p_block_ack *p = pi->data;
b411b363
PR
5061 sector_t sector = be64_to_cpu(p->sector);
5062
b30ab791
AG
5063 device = vnr_to_device(tconn, pi->vnr);
5064 if (!device)
2735a594 5065 return -EIO;
1952e916 5066
b30ab791 5067 update_peer_seq(device, be32_to_cpu(p->seq_num));
7be8da07 5068
380207d0 5069 dev_err(DEV, "Got NegDReply; Sector %llus, len %u.\n",
b411b363
PR
5070 (unsigned long long)sector, be32_to_cpu(p->blksize));
5071
b30ab791
AG
5072 return validate_req_change_req_state(device, p->block_id, sector,
5073 &device->read_requests, __func__,
2735a594 5074 NEG_ACKED, false);
b411b363
PR
5075}
5076
1952e916 5077static int got_NegRSDReply(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 5078{
b30ab791 5079 struct drbd_device *device;
b411b363
PR
5080 sector_t sector;
5081 int size;
e658983a 5082 struct p_block_ack *p = pi->data;
1952e916 5083
b30ab791
AG
5084 device = vnr_to_device(tconn, pi->vnr);
5085 if (!device)
2735a594 5086 return -EIO;
b411b363
PR
5087
5088 sector = be64_to_cpu(p->sector);
5089 size = be32_to_cpu(p->blksize);
b411b363 5090
b30ab791 5091 update_peer_seq(device, be32_to_cpu(p->seq_num));
b411b363 5092
b30ab791 5093 dec_rs_pending(device);
b411b363 5094
b30ab791
AG
5095 if (get_ldev_if_state(device, D_FAILED)) {
5096 drbd_rs_complete_io(device, sector);
e05e1e59 5097 switch (pi->cmd) {
d612d309 5098 case P_NEG_RS_DREPLY:
b30ab791 5099 			drbd_rs_failed_io(device, sector, size); /* fall through */
d612d309
PR
5100 case P_RS_CANCEL:
5101 break;
5102 default:
2735a594 5103 BUG();
d612d309 5104 }
b30ab791 5105 put_ldev(device);
b411b363
PR
5106 }
5107
2735a594 5108 return 0;
b411b363
PR
5109}
5110
1952e916 5111static int got_BarrierAck(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 5112{
e658983a 5113 struct p_barrier_ack *p = pi->data;
b30ab791 5114 struct drbd_device *device;
9ed57dcb 5115 int vnr;
1952e916 5116
9ed57dcb 5117 tl_release(tconn, p->barrier, be32_to_cpu(p->set_size));
b411b363 5118
9ed57dcb 5119 rcu_read_lock();
b30ab791
AG
5120 idr_for_each_entry(&tconn->volumes, device, vnr) {
5121 if (device->state.conn == C_AHEAD &&
5122 atomic_read(&device->ap_in_flight) == 0 &&
5123 !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
5124 device->start_resync_timer.expires = jiffies + HZ;
5125 add_timer(&device->start_resync_timer);
9ed57dcb 5126 }
c4752ef1 5127 }
9ed57dcb 5128 rcu_read_unlock();
c4752ef1 5129
2735a594 5130 return 0;
b411b363
PR
5131}
5132
1952e916 5133static int got_OVResult(struct drbd_tconn *tconn, struct packet_info *pi)
b411b363 5134{
b30ab791 5135 struct drbd_device *device;
e658983a 5136 struct p_block_ack *p = pi->data;
b411b363
PR
5137 struct drbd_work *w;
5138 sector_t sector;
5139 int size;
5140
b30ab791
AG
5141 device = vnr_to_device(tconn, pi->vnr);
5142 if (!device)
2735a594 5143 return -EIO;
1952e916 5144
b411b363
PR
5145 sector = be64_to_cpu(p->sector);
5146 size = be32_to_cpu(p->blksize);
5147
b30ab791 5148 update_peer_seq(device, be32_to_cpu(p->seq_num));
b411b363
PR
5149
5150 if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
b30ab791 5151 drbd_ov_out_of_sync_found(device, sector, size);
b411b363 5152 else
b30ab791 5153 ov_out_of_sync_print(device);
b411b363 5154
b30ab791 5155 if (!get_ldev(device))
2735a594 5156 return 0;
1d53f09e 5157
b30ab791
AG
5158 drbd_rs_complete_io(device, sector);
5159 dec_rs_pending(device);
b411b363 5160
b30ab791 5161 --device->ov_left;
ea5442af
LE
5162
5163 /* let's advance progress step marks only for every other megabyte */
b30ab791
AG
5164 if ((device->ov_left & 0x200) == 0x200)
5165 drbd_advance_rs_marks(device, device->ov_left);
ea5442af 5166
b30ab791 5167 if (device->ov_left == 0) {
b411b363
PR
5168 w = kmalloc(sizeof(*w), GFP_NOIO);
5169 if (w) {
5170 w->cb = w_ov_finished;
b30ab791
AG
5171 w->device = device;
5172 drbd_queue_work(&device->tconn->sender_work, w);
b411b363
PR
5173 } else {
5174 		dev_err(DEV, "kmalloc(w) failed.\n");
b30ab791
AG
5175 ov_out_of_sync_print(device);
5176 drbd_resync_finished(device);
b411b363
PR
5177 }
5178 }
b30ab791 5179 put_ldev(device);
2735a594 5180 return 0;
b411b363
PR
5181}
5182
1952e916 5183static int got_skip(struct drbd_tconn *tconn, struct packet_info *pi)
0ced55a3 5184{
2735a594 5185 return 0;
b411b363
PR
5186}
5187
a990be46 5188static int tconn_finish_peer_reqs(struct drbd_tconn *tconn)
0ced55a3 5189{
b30ab791 5190 struct drbd_device *device;
c141ebda 5191 int vnr, not_empty = 0;
32862ec7
PR
5192
5193 do {
5194 clear_bit(SIGNAL_ASENDER, &tconn->flags);
5195 flush_signals(current);
c141ebda
PR
5196
5197 rcu_read_lock();
b30ab791
AG
5198 idr_for_each_entry(&tconn->volumes, device, vnr) {
5199 kref_get(&device->kref);
c141ebda 5200 rcu_read_unlock();
b30ab791
AG
5201 if (drbd_finish_peer_reqs(device)) {
5202 kref_put(&device->kref, &drbd_minor_destroy);
c141ebda 5203 return 1;
d3fcb490 5204 }
b30ab791 5205 kref_put(&device->kref, &drbd_minor_destroy);
c141ebda 5206 rcu_read_lock();
082a3439 5207 }
32862ec7 5208 set_bit(SIGNAL_ASENDER, &tconn->flags);
082a3439
PR
5209
5210 spin_lock_irq(&tconn->req_lock);
b30ab791
AG
5211 idr_for_each_entry(&tconn->volumes, device, vnr) {
5212 not_empty = !list_empty(&device->done_ee);
082a3439
PR
5213 if (not_empty)
5214 break;
5215 }
5216 spin_unlock_irq(&tconn->req_lock);
c141ebda 5217 rcu_read_unlock();
32862ec7
PR
5218 } while (not_empty);
5219
5220 return 0;
0ced55a3
PR
5221}
5222
b411b363
PR
5223struct asender_cmd {
5224 size_t pkt_size;
1952e916 5225 int (*fn)(struct drbd_tconn *tconn, struct packet_info *);
b411b363
PR
5226};
5227
7201b972 5228static struct asender_cmd asender_tbl[] = {
e658983a
AG
5229 [P_PING] = { 0, got_Ping },
5230 [P_PING_ACK] = { 0, got_PingAck },
b411b363
PR
5231 [P_RECV_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
5232 [P_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
5233 [P_RS_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
d4dabbe2 5234 [P_SUPERSEDED] = { sizeof(struct p_block_ack), got_BlockAck },
b411b363
PR
5235 [P_NEG_ACK] = { sizeof(struct p_block_ack), got_NegAck },
5236 [P_NEG_DREPLY] = { sizeof(struct p_block_ack), got_NegDReply },
1952e916 5237 [P_NEG_RS_DREPLY] = { sizeof(struct p_block_ack), got_NegRSDReply },
b411b363
PR
5238 [P_OV_RESULT] = { sizeof(struct p_block_ack), got_OVResult },
5239 [P_BARRIER_ACK] = { sizeof(struct p_barrier_ack), got_BarrierAck },
5240 [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5241 [P_RS_IS_IN_SYNC] = { sizeof(struct p_block_ack), got_IsInSync },
02918be2 5242 [P_DELAY_PROBE] = { sizeof(struct p_delay_probe93), got_skip },
1952e916
AG
5243 [P_RS_CANCEL] = { sizeof(struct p_block_ack), got_NegRSDReply },
5244 [P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply },
5245 [P_RETRY_WRITE] = { sizeof(struct p_block_ack), got_BlockAck },
7201b972 5246};
b411b363
PR
5247
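/*
 * Dispatch sketch: for each packet on the meta socket, drbd_asender() below
 * computes the expected length from this table and calls the handler,
 * roughly
 *
 *	cmd = &asender_tbl[pi.cmd];
 *	expect = header_size + cmd->pkt_size;	// fixed-size payloads only
 *	... receive until 'expect' bytes have arrived ...
 *	err = cmd->fn(tconn, &pi);
 *
 * so every entry must name the payload struct the peer actually sends for
 * that command.
 */
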
5248int drbd_asender(struct drbd_thread *thi)
5249{
392c8801 5250 struct drbd_tconn *tconn = thi->tconn;
b411b363 5251 struct asender_cmd *cmd = NULL;
77351055 5252 struct packet_info pi;
257d0af6 5253 int rv;
e658983a 5254 void *buf = tconn->meta.rbuf;
b411b363 5255 int received = 0;
52b061a4
AG
5256 unsigned int header_size = drbd_header_size(tconn);
5257 int expect = header_size;
44ed167d
PR
5258 bool ping_timeout_active = false;
5259 struct net_conf *nc;
bb77d34e 5260 int ping_timeo, tcp_cork, ping_int;
3990e04d 5261 struct sched_param param = { .sched_priority = 2 };
b411b363 5262
3990e04d
PR
5263 rv = sched_setscheduler(current, SCHED_RR, &param);
5264 if (rv < 0)
5265 conn_err(tconn, "drbd_asender: ERROR set priority, ret=%d\n", rv);
b411b363 5266
e77a0a5c 5267 while (get_t_state(thi) == RUNNING) {
80822284 5268 drbd_thread_current_set_cpu(thi);
b411b363 5269
44ed167d
PR
5270 rcu_read_lock();
5271 nc = rcu_dereference(tconn->net_conf);
5272 ping_timeo = nc->ping_timeo;
bb77d34e 5273 tcp_cork = nc->tcp_cork;
44ed167d
PR
5274 ping_int = nc->ping_int;
5275 rcu_read_unlock();
5276
32862ec7 5277 if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
a17647aa 5278 if (drbd_send_ping(tconn)) {
32862ec7 5279 conn_err(tconn, "drbd_send_ping has failed\n");
b411b363 5280 goto reconnect;
841ce241 5281 }
44ed167d
PR
5282 tconn->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10;
5283 ping_timeout_active = true;
b411b363
PR
5284 }
5285
32862ec7
PR
5286 /* TODO: conditionally cork; it may hurt latency if we cork without
5287 much to send */
bb77d34e 5288 if (tcp_cork)
32862ec7 5289 drbd_tcp_cork(tconn->meta.socket);
a990be46
AG
5290 if (tconn_finish_peer_reqs(tconn)) {
5291 conn_err(tconn, "tconn_finish_peer_reqs() failed\n");
32862ec7 5292 goto reconnect;
b411b363
PR
5293 }
5294 /* but unconditionally uncork unless disabled */
bb77d34e 5295 if (tcp_cork)
32862ec7 5296 drbd_tcp_uncork(tconn->meta.socket);
b411b363
PR
5297
5298 /* short circuit, recv_msg would return EINTR anyways. */
5299 if (signal_pending(current))
5300 continue;
5301
32862ec7
PR
5302 rv = drbd_recv_short(tconn->meta.socket, buf, expect-received, 0);
5303 clear_bit(SIGNAL_ASENDER, &tconn->flags);
b411b363
PR
5304
5305 flush_signals(current);
5306
5307 /* Note:
5308 * -EINTR (on meta) we got a signal
5309 * -EAGAIN (on meta) rcvtimeo expired
5310 * -ECONNRESET other side closed the connection
5311 * -ERESTARTSYS (on data) we got a signal
5312 * rv < 0 other than above: unexpected error!
5313 * rv == expected: full header or command
5314 * rv < expected: "woken" by signal during receive
5315 * rv == 0 : "connection shut down by peer"
5316 */
5317 if (likely(rv > 0)) {
5318 received += rv;
5319 buf += rv;
5320 } else if (rv == 0) {
b66623e3
PR
5321 if (test_bit(DISCONNECT_SENT, &tconn->flags)) {
5322 long t;
5323 rcu_read_lock();
5324 t = rcu_dereference(tconn->net_conf)->ping_timeo * HZ/10;
5325 rcu_read_unlock();
5326
5327 t = wait_event_timeout(tconn->ping_wait,
5328 tconn->cstate < C_WF_REPORT_PARAMS,
5329 t);
599377ac
PR
5330 if (t)
5331 break;
5332 }
32862ec7 5333 conn_err(tconn, "meta connection shut down by peer.\n");
b411b363
PR
5334 goto reconnect;
5335 } else if (rv == -EAGAIN) {
cb6518cb
LE
5336 /* If the data socket received something meanwhile,
5337 * that is good enough: peer is still alive. */
32862ec7
PR
5338 if (time_after(tconn->last_received,
5339 jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
cb6518cb 5340 continue;
f36af18c 5341 if (ping_timeout_active) {
32862ec7 5342 conn_err(tconn, "PingAck did not arrive in time.\n");
b411b363
PR
5343 goto reconnect;
5344 }
32862ec7 5345 set_bit(SEND_PING, &tconn->flags);
b411b363
PR
5346 continue;
5347 } else if (rv == -EINTR) {
5348 continue;
5349 } else {
32862ec7 5350 conn_err(tconn, "sock_recvmsg returned %d\n", rv);
b411b363
PR
5351 goto reconnect;
5352 }
5353
5354 if (received == expect && cmd == NULL) {
e658983a 5355 if (decode_header(tconn, tconn->meta.rbuf, &pi))
b411b363 5356 goto reconnect;
7201b972 5357 cmd = &asender_tbl[pi.cmd];
1952e916 5358 if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd->fn) {
2fcb8f30
AG
5359 conn_err(tconn, "Unexpected meta packet %s (0x%04x)\n",
5360 cmdname(pi.cmd), pi.cmd);
b411b363
PR
5361 goto disconnect;
5362 }
e658983a 5363 expect = header_size + cmd->pkt_size;
52b061a4 5364 if (pi.size != expect - header_size) {
32862ec7 5365 conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
77351055 5366 pi.cmd, pi.size);
b411b363 5367 goto reconnect;
257d0af6 5368 }
b411b363
PR
5369 }
5370 if (received == expect) {
2735a594 5371 bool err;
a4fbda8e 5372
2735a594
AG
5373 err = cmd->fn(tconn, &pi);
5374 if (err) {
1952e916 5375 conn_err(tconn, "%pf failed\n", cmd->fn);
b411b363 5376 goto reconnect;
1952e916 5377 }
b411b363 5378
a4fbda8e 5379 tconn->last_received = jiffies;
f36af18c 5380
44ed167d
PR
5381 if (cmd == &asender_tbl[P_PING_ACK]) {
5382 /* restore idle timeout */
5383 tconn->meta.socket->sk->sk_rcvtimeo = ping_int * HZ;
5384 ping_timeout_active = false;
5385 }
f36af18c 5386
e658983a 5387 buf = tconn->meta.rbuf;
b411b363 5388 received = 0;
52b061a4 5389 expect = header_size;
b411b363
PR
5390 cmd = NULL;
5391 }
5392 }
5393
5394 if (0) {
5395reconnect:
bbeb641c 5396 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
19fffd7b 5397 conn_md_sync(tconn);
b411b363
PR
5398 }
5399 if (0) {
5400disconnect:
bbeb641c 5401 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
b411b363 5402 }
32862ec7 5403 clear_bit(SIGNAL_ASENDER, &tconn->flags);
b411b363 5404
32862ec7 5405 conn_info(tconn, "asender terminated\n");
b411b363
PR
5406
5407 return 0;
5408}