1 /*
2 drbd_receiver.c
3
4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
13 any later version.
14
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 */
24
25
26 #include <linux/module.h>
27
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_protocol.h"
48 #include "drbd_req.h"
49 #include "drbd_vli.h"
50
51 #define PRO_FEATURES (FF_TRIM)
52
53 struct packet_info {
54 enum drbd_packet cmd;
55 unsigned int size;
56 unsigned int vnr;
57 void *data;
58 };
59
60 enum finish_epoch {
61 FE_STILL_LIVE,
62 FE_DESTROYED,
63 FE_RECYCLED,
64 };
65
66 static int drbd_do_features(struct drbd_connection *connection);
67 static int drbd_do_auth(struct drbd_connection *connection);
68 static int drbd_disconnected(struct drbd_peer_device *);
69 static void conn_wait_active_ee_empty(struct drbd_connection *connection);
70 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
71 static int e_end_block(struct drbd_work *, int);
72
73
74 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
75
76 /*
77 * some helper functions to deal with single linked page lists,
78 * page->private being our "next" pointer.
79 */
80
81 /* If at least n pages are linked at head, get n pages off.
82 * Otherwise, don't modify head, and return NULL.
83 * Locking is the responsibility of the caller.
84 */
85 static struct page *page_chain_del(struct page **head, int n)
86 {
87 struct page *page;
88 struct page *tmp;
89
90 BUG_ON(!n);
91 BUG_ON(!head);
92
93 page = *head;
94
95 if (!page)
96 return NULL;
97
98 while (page) {
99 tmp = page_chain_next(page);
100 if (--n == 0)
101 break; /* found sufficient pages */
102 if (tmp == NULL)
103 /* insufficient pages, don't use any of them. */
104 return NULL;
105 page = tmp;
106 }
107
108 /* add end of list marker for the returned list */
109 set_page_private(page, 0);
110 /* actual return value, and adjustment of head */
111 page = *head;
112 *head = tmp;
113 return page;
114 }
115
116 /* may be used outside of locks to find the tail of a (usually short)
117 * "private" page chain, before adding it back to a global chain head
118 * with page_chain_add() under a spinlock. */
119 static struct page *page_chain_tail(struct page *page, int *len)
120 {
121 struct page *tmp;
122 int i = 1;
123 while ((tmp = page_chain_next(page)))
124 ++i, page = tmp;
125 if (len)
126 *len = i;
127 return page;
128 }
129
130 static int page_chain_free(struct page *page)
131 {
132 struct page *tmp;
133 int i = 0;
134 page_chain_for_each_safe(page, tmp) {
135 put_page(page);
136 ++i;
137 }
138 return i;
139 }
140
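/* Prepend the already-linked chain [chain_first .. chain_last] to *head.
 * Locking is the responsibility of the caller (drbd_pp_lock for the
 * global pool). */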
141 static void page_chain_add(struct page **head,
142 struct page *chain_first, struct page *chain_last)
143 {
144 #if 1
145 struct page *tmp;
146 tmp = page_chain_tail(chain_first, NULL);
147 BUG_ON(tmp != chain_last);
148 #endif
149
150 /* add chain to head */
151 set_page_private(chain_last, (unsigned long)*head);
152 *head = chain_first;
153 }
154
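/* Grab @number pages for @device: first try the preallocated drbd_pp_pool,
 * then fall back to alloc_page(GFP_TRY). If fewer than @number pages can be
 * gathered, any partial chain is given back to the pool and NULL is returned. */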
155 static struct page *__drbd_alloc_pages(struct drbd_device *device,
156 unsigned int number)
157 {
158 struct page *page = NULL;
159 struct page *tmp = NULL;
160 unsigned int i = 0;
161
162 /* Yes, testing drbd_pp_vacant outside the lock is racy.
163 * So what. It saves a spin_lock. */
164 if (drbd_pp_vacant >= number) {
165 spin_lock(&drbd_pp_lock);
166 page = page_chain_del(&drbd_pp_pool, number);
167 if (page)
168 drbd_pp_vacant -= number;
169 spin_unlock(&drbd_pp_lock);
170 if (page)
171 return page;
172 }
173
174 /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
175 * "criss-cross" setup, that might cause write-out on some other DRBD,
176 * which in turn might block on the other node at this very place. */
177 for (i = 0; i < number; i++) {
178 tmp = alloc_page(GFP_TRY);
179 if (!tmp)
180 break;
181 set_page_private(tmp, (unsigned long)page);
182 page = tmp;
183 }
184
185 if (i == number)
186 return page;
187
188 /* Not enough pages immediately available this time.
189 * No need to jump around here, drbd_alloc_pages will retry this
190 * function "soon". */
191 if (page) {
192 tmp = page_chain_tail(page, NULL);
193 spin_lock(&drbd_pp_lock);
194 page_chain_add(&drbd_pp_pool, page, tmp);
195 drbd_pp_vacant += i;
196 spin_unlock(&drbd_pp_lock);
197 }
198 return NULL;
199 }
200
201 static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
202 struct list_head *to_be_freed)
203 {
204 struct drbd_peer_request *peer_req, *tmp;
205
206 /* The EEs are always appended to the end of the list. Since
207 they are sent in order over the wire, they have to finish
208 in order. As soon as we see the first one that has not finished,
209 we can stop examining the list... */
210
211 list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
212 if (drbd_peer_req_has_active_page(peer_req))
213 break;
214 list_move(&peer_req->w.list, to_be_freed);
215 }
216 }
217
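/* Move finished peer requests off device->net_ee while holding the req_lock,
 * then free them outside of it. */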
218 static void drbd_kick_lo_and_reclaim_net(struct drbd_device *device)
219 {
220 LIST_HEAD(reclaimed);
221 struct drbd_peer_request *peer_req, *t;
222
223 spin_lock_irq(&device->resource->req_lock);
224 reclaim_finished_net_peer_reqs(device, &reclaimed);
225 spin_unlock_irq(&device->resource->req_lock);
226
227 list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
228 drbd_free_net_peer_req(device, peer_req);
229 }
230
231 /**
232 * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
233 * @peer_device: DRBD peer device.
234 * @number: number of pages requested
235 * @retry: whether to retry, if not enough pages are available right now
236 *
237 * Tries to allocate number pages, first from our own page pool, then from
238 * the kernel.
239 * Possibly retry until DRBD frees sufficient pages somewhere else.
240 *
241 * If this allocation would exceed the max_buffers setting, we throttle
242 * allocation (schedule_timeout) to give the system some room to breathe.
243 *
244 * We do not use max-buffers as a hard limit, because it could lead to
245 * congestion and further to a distributed deadlock during online-verify or
246 * (checksum based) resync, if the max-buffers, socket buffer sizes and
247 * resync-rate settings are mis-configured.
248 *
249 * Returns a page chain linked via page->private.
250 */
251 struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
252 bool retry)
253 {
254 struct drbd_device *device = peer_device->device;
255 struct page *page = NULL;
256 struct net_conf *nc;
257 DEFINE_WAIT(wait);
258 unsigned int mxb;
259
260 rcu_read_lock();
261 nc = rcu_dereference(peer_device->connection->net_conf);
262 mxb = nc ? nc->max_buffers : 1000000;
263 rcu_read_unlock();
264
265 if (atomic_read(&device->pp_in_use) < mxb)
266 page = __drbd_alloc_pages(device, number);
267
268 while (page == NULL) {
269 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
270
271 drbd_kick_lo_and_reclaim_net(device);
272
273 if (atomic_read(&device->pp_in_use) < mxb) {
274 page = __drbd_alloc_pages(device, number);
275 if (page)
276 break;
277 }
278
279 if (!retry)
280 break;
281
282 if (signal_pending(current)) {
283 drbd_warn(device, "drbd_alloc_pages interrupted!\n");
284 break;
285 }
286
287 if (schedule_timeout(HZ/10) == 0)
288 mxb = UINT_MAX;
289 }
290 finish_wait(&drbd_pp_wait, &wait);
291
292 if (page)
293 atomic_add(number, &device->pp_in_use);
294 return page;
295 }
296
297 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
298 * Is also used from inside another spin_lock_irq(&resource->req_lock);
299 * Either links the page chain back to the global pool,
300 * or returns all pages to the system. */
301 static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
302 {
303 atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
304 int i;
305
306 if (page == NULL)
307 return;
308
309 if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
310 i = page_chain_free(page);
311 else {
312 struct page *tmp;
313 tmp = page_chain_tail(page, &i);
314 spin_lock(&drbd_pp_lock);
315 page_chain_add(&drbd_pp_pool, page, tmp);
316 drbd_pp_vacant += i;
317 spin_unlock(&drbd_pp_lock);
318 }
319 i = atomic_sub_return(i, a);
320 if (i < 0)
321 drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
322 is_net ? "pp_in_use_by_net" : "pp_in_use", i);
323 wake_up(&drbd_pp_wait);
324 }
325
326 /*
327 You need to hold the req_lock:
328 _drbd_wait_ee_list_empty()
329
330 You must not have the req_lock:
331 drbd_free_peer_req()
332 drbd_alloc_peer_req()
333 drbd_free_peer_reqs()
334 drbd_ee_fix_bhs()
335 drbd_finish_peer_reqs()
336 drbd_clear_done_ee()
337 drbd_wait_ee_list_empty()
338 */
339
340 struct drbd_peer_request *
341 drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
342 unsigned int data_size, bool has_payload, gfp_t gfp_mask) __must_hold(local)
343 {
344 struct drbd_device *device = peer_device->device;
345 struct drbd_peer_request *peer_req;
346 struct page *page = NULL;
347 unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
348
349 if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
350 return NULL;
351
352 peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
353 if (!peer_req) {
354 if (!(gfp_mask & __GFP_NOWARN))
355 drbd_err(device, "%s: allocation failed\n", __func__);
356 return NULL;
357 }
358
359 if (has_payload && data_size) {
360 page = drbd_alloc_pages(peer_device, nr_pages, (gfp_mask & __GFP_WAIT));
361 if (!page)
362 goto fail;
363 }
364
365 memset(peer_req, 0, sizeof(*peer_req));
366 INIT_LIST_HEAD(&peer_req->w.list);
367 drbd_clear_interval(&peer_req->i);
368 peer_req->i.size = data_size;
369 peer_req->i.sector = sector;
370 peer_req->submit_jif = jiffies;
371 peer_req->peer_device = peer_device;
372 peer_req->pages = page;
373 /*
374 * The block_id is opaque to the receiver. It is not endianness
375 * converted, and sent back to the sender unchanged.
376 */
377 peer_req->block_id = id;
378
379 return peer_req;
380
381 fail:
382 mempool_free(peer_req, drbd_ee_mempool);
383 return NULL;
384 }
385
386 void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
387 int is_net)
388 {
389 might_sleep();
390 if (peer_req->flags & EE_HAS_DIGEST)
391 kfree(peer_req->digest);
392 drbd_free_pages(device, peer_req->pages, is_net);
393 D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
394 D_ASSERT(device, drbd_interval_empty(&peer_req->i));
395 if (!expect(!(peer_req->flags & EE_CALL_AL_COMPLETE_IO))) {
396 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
397 drbd_al_complete_io(device, &peer_req->i);
398 }
399 mempool_free(peer_req, drbd_ee_mempool);
400 }
401
402 int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
403 {
404 LIST_HEAD(work_list);
405 struct drbd_peer_request *peer_req, *t;
406 int count = 0;
407 int is_net = list == &device->net_ee;
408
409 spin_lock_irq(&device->resource->req_lock);
410 list_splice_init(list, &work_list);
411 spin_unlock_irq(&device->resource->req_lock);
412
413 list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
414 __drbd_free_peer_req(device, peer_req, is_net);
415 count++;
416 }
417 return count;
418 }
419
420 /*
421 * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
422 */
423 static int drbd_finish_peer_reqs(struct drbd_device *device)
424 {
425 LIST_HEAD(work_list);
426 LIST_HEAD(reclaimed);
427 struct drbd_peer_request *peer_req, *t;
428 int err = 0;
429
430 spin_lock_irq(&device->resource->req_lock);
431 reclaim_finished_net_peer_reqs(device, &reclaimed);
432 list_splice_init(&device->done_ee, &work_list);
433 spin_unlock_irq(&device->resource->req_lock);
434
435 list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
436 drbd_free_net_peer_req(device, peer_req);
437
438 /* possible callbacks here:
439 * e_end_block, and e_end_resync_block, e_send_superseded.
440 * all ignore the last argument.
441 */
442 list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
443 int err2;
444
445 /* list_del not necessary, next/prev members not touched */
446 err2 = peer_req->w.cb(&peer_req->w, !!err);
447 if (!err)
448 err = err2;
449 drbd_free_peer_req(device, peer_req);
450 }
451 wake_up(&device->ee_wait);
452
453 return err;
454 }
455
456 static void _drbd_wait_ee_list_empty(struct drbd_device *device,
457 struct list_head *head)
458 {
459 DEFINE_WAIT(wait);
460
461 /* avoids spin_lock/unlock
462 * and calling prepare_to_wait in the fast path */
463 while (!list_empty(head)) {
464 prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
465 spin_unlock_irq(&device->resource->req_lock);
466 io_schedule();
467 finish_wait(&device->ee_wait, &wait);
468 spin_lock_irq(&device->resource->req_lock);
469 }
470 }
471
472 static void drbd_wait_ee_list_empty(struct drbd_device *device,
473 struct list_head *head)
474 {
475 spin_lock_irq(&device->resource->req_lock);
476 _drbd_wait_ee_list_empty(device, head);
477 spin_unlock_irq(&device->resource->req_lock);
478 }
479
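/* Thin wrapper around kernel_recvmsg(). With flags == 0 it waits for the
 * full @size bytes (MSG_WAITALL) and suppresses SIGPIPE (MSG_NOSIGNAL). */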
480 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
481 {
482 struct kvec iov = {
483 .iov_base = buf,
484 .iov_len = size,
485 };
486 struct msghdr msg = {
487 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
488 };
489 return kernel_recvmsg(sock, &msg, &iov, 1, size, msg.msg_flags);
490 }
491
492 static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
493 {
494 int rv;
495
496 rv = drbd_recv_short(connection->data.socket, buf, size, 0);
497
498 if (rv < 0) {
499 if (rv == -ECONNRESET)
500 drbd_info(connection, "sock was reset by peer\n");
501 else if (rv != -ERESTARTSYS)
502 drbd_err(connection, "sock_recvmsg returned %d\n", rv);
503 } else if (rv == 0) {
504 if (test_bit(DISCONNECT_SENT, &connection->flags)) {
505 long t;
506 rcu_read_lock();
507 t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
508 rcu_read_unlock();
509
510 t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);
511
512 if (t)
513 goto out;
514 }
515 drbd_info(connection, "sock was shut down by peer\n");
516 }
517
518 if (rv != size)
519 conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);
520
521 out:
522 return rv;
523 }
524
525 static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
526 {
527 int err;
528
529 err = drbd_recv(connection, buf, size);
530 if (err != size) {
531 if (err >= 0)
532 err = -EIO;
533 } else
534 err = 0;
535 return err;
536 }
537
538 static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
539 {
540 int err;
541
542 err = drbd_recv_all(connection, buf, size);
543 if (err && !signal_pending(current))
544 drbd_warn(connection, "short read (expected size %d)\n", (int)size);
545 return err;
546 }
547
548 /* quoting tcp(7):
549 * On individual connections, the socket buffer size must be set prior to the
550 * listen(2) or connect(2) calls in order to have it take effect.
551 * This is our wrapper to do so.
552 */
553 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
554 unsigned int rcv)
555 {
556 /* open coded SO_SNDBUF, SO_RCVBUF */
557 if (snd) {
558 sock->sk->sk_sndbuf = snd;
559 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
560 }
561 if (rcv) {
562 sock->sk->sk_rcvbuf = rcv;
563 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
564 }
565 }
566
567 static struct socket *drbd_try_connect(struct drbd_connection *connection)
568 {
569 const char *what;
570 struct socket *sock;
571 struct sockaddr_in6 src_in6;
572 struct sockaddr_in6 peer_in6;
573 struct net_conf *nc;
574 int err, peer_addr_len, my_addr_len;
575 int sndbuf_size, rcvbuf_size, connect_int;
576 int disconnect_on_error = 1;
577
578 rcu_read_lock();
579 nc = rcu_dereference(connection->net_conf);
580 if (!nc) {
581 rcu_read_unlock();
582 return NULL;
583 }
584 sndbuf_size = nc->sndbuf_size;
585 rcvbuf_size = nc->rcvbuf_size;
586 connect_int = nc->connect_int;
587 rcu_read_unlock();
588
589 my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
590 memcpy(&src_in6, &connection->my_addr, my_addr_len);
591
592 if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
593 src_in6.sin6_port = 0;
594 else
595 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
596
597 peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(src_in6));
598 memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);
599
600 what = "sock_create_kern";
601 err = sock_create_kern(&init_net, ((struct sockaddr *)&src_in6)->sa_family,
602 SOCK_STREAM, IPPROTO_TCP, &sock);
603 if (err < 0) {
604 sock = NULL;
605 goto out;
606 }
607
608 sock->sk->sk_rcvtimeo =
609 sock->sk->sk_sndtimeo = connect_int * HZ;
610 drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
611
612 /* explicitly bind to the configured IP as source IP
613 * for the outgoing connections.
614 * This is needed for multihomed hosts and to be
615 * able to use lo: interfaces for drbd.
616 * Make sure to use 0 as port number, so linux selects
617 * a free one dynamically.
618 */
619 what = "bind before connect";
620 err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
621 if (err < 0)
622 goto out;
623
624 /* connect may fail, peer not yet available.
625 * stay C_WF_CONNECTION, don't go Disconnecting! */
626 disconnect_on_error = 0;
627 what = "connect";
628 err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
629
630 out:
631 if (err < 0) {
632 if (sock) {
633 sock_release(sock);
634 sock = NULL;
635 }
636 switch (-err) {
637 /* timeout, busy, signal pending */
638 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
639 case EINTR: case ERESTARTSYS:
640 /* peer not (yet) available, network problem */
641 case ECONNREFUSED: case ENETUNREACH:
642 case EHOSTDOWN: case EHOSTUNREACH:
643 disconnect_on_error = 0;
644 break;
645 default:
646 drbd_err(connection, "%s failed, err = %d\n", what, err);
647 }
648 if (disconnect_on_error)
649 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
650 }
651
652 return sock;
653 }
654
655 struct accept_wait_data {
656 struct drbd_connection *connection;
657 struct socket *s_listen;
658 struct completion door_bell;
659 void (*original_sk_state_change)(struct sock *sk);
660
661 };
662
663 static void drbd_incoming_connection(struct sock *sk)
664 {
665 struct accept_wait_data *ad = sk->sk_user_data;
666 void (*state_change)(struct sock *sk);
667
668 state_change = ad->original_sk_state_change;
669 if (sk->sk_state == TCP_ESTABLISHED)
670 complete(&ad->door_bell);
671 state_change(sk);
672 }
673
674 static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
675 {
676 int err, sndbuf_size, rcvbuf_size, my_addr_len;
677 struct sockaddr_in6 my_addr;
678 struct socket *s_listen;
679 struct net_conf *nc;
680 const char *what;
681
682 rcu_read_lock();
683 nc = rcu_dereference(connection->net_conf);
684 if (!nc) {
685 rcu_read_unlock();
686 return -EIO;
687 }
688 sndbuf_size = nc->sndbuf_size;
689 rcvbuf_size = nc->rcvbuf_size;
690 rcu_read_unlock();
691
692 my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
693 memcpy(&my_addr, &connection->my_addr, my_addr_len);
694
695 what = "sock_create_kern";
696 err = sock_create_kern(&init_net, ((struct sockaddr *)&my_addr)->sa_family,
697 SOCK_STREAM, IPPROTO_TCP, &s_listen);
698 if (err) {
699 s_listen = NULL;
700 goto out;
701 }
702
703 s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
704 drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
705
706 what = "bind before listen";
707 err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
708 if (err < 0)
709 goto out;
710
711 ad->s_listen = s_listen;
712 write_lock_bh(&s_listen->sk->sk_callback_lock);
713 ad->original_sk_state_change = s_listen->sk->sk_state_change;
714 s_listen->sk->sk_state_change = drbd_incoming_connection;
715 s_listen->sk->sk_user_data = ad;
716 write_unlock_bh(&s_listen->sk->sk_callback_lock);
717
718 what = "listen";
719 err = s_listen->ops->listen(s_listen, 5);
720 if (err < 0)
721 goto out;
722
723 return 0;
724 out:
725 if (s_listen)
726 sock_release(s_listen);
727 if (err < 0) {
728 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
729 drbd_err(connection, "%s failed, err = %d\n", what, err);
730 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
731 }
732 }
733
734 return -EIO;
735 }
736
737 static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
738 {
739 write_lock_bh(&sk->sk_callback_lock);
740 sk->sk_state_change = ad->original_sk_state_change;
741 sk->sk_user_data = NULL;
742 write_unlock_bh(&sk->sk_callback_lock);
743 }
744
745 static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
746 {
747 int timeo, connect_int, err = 0;
748 struct socket *s_estab = NULL;
749 struct net_conf *nc;
750
751 rcu_read_lock();
752 nc = rcu_dereference(connection->net_conf);
753 if (!nc) {
754 rcu_read_unlock();
755 return NULL;
756 }
757 connect_int = nc->connect_int;
758 rcu_read_unlock();
759
760 timeo = connect_int * HZ;
761 /* 28.5% random jitter */
762 timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;
763
764 err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
765 if (err <= 0)
766 return NULL;
767
768 err = kernel_accept(ad->s_listen, &s_estab, 0);
769 if (err < 0) {
770 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
771 drbd_err(connection, "accept failed, err = %d\n", err);
772 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
773 }
774 }
775
776 if (s_estab)
777 unregister_state_change(s_estab->sk, ad);
778
779 return s_estab;
780 }
781
782 static int decode_header(struct drbd_connection *, void *, struct packet_info *);
783
784 static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
785 enum drbd_packet cmd)
786 {
787 if (!conn_prepare_command(connection, sock))
788 return -EIO;
789 return conn_send_command(connection, sock, cmd, 0, NULL, 0);
790 }
791
792 static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
793 {
794 unsigned int header_size = drbd_header_size(connection);
795 struct packet_info pi;
796 struct net_conf *nc;
797 int err;
798
799 rcu_read_lock();
800 nc = rcu_dereference(connection->net_conf);
801 if (!nc) {
802 rcu_read_unlock();
803 return -EIO;
804 }
805 sock->sk->sk_rcvtimeo = nc->ping_timeo * 4 * HZ / 10;
806 rcu_read_unlock();
807
808 err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
809 if (err != header_size) {
810 if (err >= 0)
811 err = -EIO;
812 return err;
813 }
814 err = decode_header(connection, connection->data.rbuf, &pi);
815 if (err)
816 return err;
817 return pi.cmd;
818 }
819
820 /**
821 * drbd_socket_okay() - Free the socket if its connection is not okay
822 * @sock: pointer to the pointer to the socket.
823 */
824 static bool drbd_socket_okay(struct socket **sock)
825 {
826 int rr;
827 char tb[4];
828
829 if (!*sock)
830 return false;
831
832 rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
833
834 if (rr > 0 || rr == -EAGAIN) {
835 return true;
836 } else {
837 sock_release(*sock);
838 *sock = NULL;
839 return false;
840 }
841 }
842
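/* Both sockets must exist; give the peer sock_check_timeo (or, if unset,
 * ping_timeo) tenths of a second to bring up its side, then verify that
 * both the data and the meta socket are still usable. */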
843 static bool connection_established(struct drbd_connection *connection,
844 struct socket **sock1,
845 struct socket **sock2)
846 {
847 struct net_conf *nc;
848 int timeout;
849 bool ok;
850
851 if (!*sock1 || !*sock2)
852 return false;
853
854 rcu_read_lock();
855 nc = rcu_dereference(connection->net_conf);
856 timeout = (nc->sock_check_timeo ?: nc->ping_timeo) * HZ / 10;
857 rcu_read_unlock();
858 schedule_timeout_interruptible(timeout);
859
860 ok = drbd_socket_okay(sock1);
861 ok = drbd_socket_okay(sock2) && ok;
862
863 return ok;
864 }
865
866 /* Gets called if a connection is established, or if a new minor gets created
867 in a connection */
868 int drbd_connected(struct drbd_peer_device *peer_device)
869 {
870 struct drbd_device *device = peer_device->device;
871 int err;
872
873 atomic_set(&device->packet_seq, 0);
874 device->peer_seq = 0;
875
876 device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
877 &peer_device->connection->cstate_mutex :
878 &device->own_state_mutex;
879
880 err = drbd_send_sync_param(peer_device);
881 if (!err)
882 err = drbd_send_sizes(peer_device, 0, 0);
883 if (!err)
884 err = drbd_send_uuids(peer_device);
885 if (!err)
886 err = drbd_send_current_state(peer_device);
887 clear_bit(USE_DEGR_WFC_T, &device->flags);
888 clear_bit(RESIZE_PENDING, &device->flags);
889 atomic_set(&device->ap_in_flight, 0);
890 mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
891 return err;
892 }
893
894 /*
895 * return values:
896 * 1 yes, we have a valid connection
897 * 0 oops, did not work out, please try again
898 * -1 peer talks different language,
899 * no point in trying again, please go standalone.
900 * -2 We do not have a network config...
901 */
902 static int conn_connect(struct drbd_connection *connection)
903 {
904 struct drbd_socket sock, msock;
905 struct drbd_peer_device *peer_device;
906 struct net_conf *nc;
907 int vnr, timeout, h;
908 bool discard_my_data, ok;
909 enum drbd_state_rv rv;
910 struct accept_wait_data ad = {
911 .connection = connection,
912 .door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
913 };
914
915 clear_bit(DISCONNECT_SENT, &connection->flags);
916 if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
917 return -2;
918
919 mutex_init(&sock.mutex);
920 sock.sbuf = connection->data.sbuf;
921 sock.rbuf = connection->data.rbuf;
922 sock.socket = NULL;
923 mutex_init(&msock.mutex);
924 msock.sbuf = connection->meta.sbuf;
925 msock.rbuf = connection->meta.rbuf;
926 msock.socket = NULL;
927
928 /* Assume that the peer only understands protocol 80 until we know better. */
929 connection->agreed_pro_version = 80;
930
931 if (prepare_listen_socket(connection, &ad))
932 return 0;
933
934 do {
935 struct socket *s;
936
937 s = drbd_try_connect(connection);
938 if (s) {
939 if (!sock.socket) {
940 sock.socket = s;
941 send_first_packet(connection, &sock, P_INITIAL_DATA);
942 } else if (!msock.socket) {
943 clear_bit(RESOLVE_CONFLICTS, &connection->flags);
944 msock.socket = s;
945 send_first_packet(connection, &msock, P_INITIAL_META);
946 } else {
947 drbd_err(connection, "Logic error in conn_connect()\n");
948 goto out_release_sockets;
949 }
950 }
951
952 if (connection_established(connection, &sock.socket, &msock.socket))
953 break;
954
955 retry:
956 s = drbd_wait_for_connect(connection, &ad);
957 if (s) {
958 int fp = receive_first_packet(connection, s);
959 drbd_socket_okay(&sock.socket);
960 drbd_socket_okay(&msock.socket);
961 switch (fp) {
962 case P_INITIAL_DATA:
963 if (sock.socket) {
964 drbd_warn(connection, "initial packet S crossed\n");
965 sock_release(sock.socket);
966 sock.socket = s;
967 goto randomize;
968 }
969 sock.socket = s;
970 break;
971 case P_INITIAL_META:
972 set_bit(RESOLVE_CONFLICTS, &connection->flags);
973 if (msock.socket) {
974 drbd_warn(connection, "initial packet M crossed\n");
975 sock_release(msock.socket);
976 msock.socket = s;
977 goto randomize;
978 }
979 msock.socket = s;
980 break;
981 default:
982 drbd_warn(connection, "Error receiving initial packet\n");
983 sock_release(s);
984 randomize:
985 if (prandom_u32() & 1)
986 goto retry;
987 }
988 }
989
990 if (connection->cstate <= C_DISCONNECTING)
991 goto out_release_sockets;
992 if (signal_pending(current)) {
993 flush_signals(current);
994 smp_rmb();
995 if (get_t_state(&connection->receiver) == EXITING)
996 goto out_release_sockets;
997 }
998
999 ok = connection_established(connection, &sock.socket, &msock.socket);
1000 } while (!ok);
1001
1002 if (ad.s_listen)
1003 sock_release(ad.s_listen);
1004
1005 sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1006 msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1007
1008 sock.socket->sk->sk_allocation = GFP_NOIO;
1009 msock.socket->sk->sk_allocation = GFP_NOIO;
1010
1011 sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
1012 msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
1013
1014 /* NOT YET ...
1015 * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
1016 * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1017 * first set it to the P_CONNECTION_FEATURES timeout,
1018 * which we set to 4x the configured ping_timeout. */
1019 rcu_read_lock();
1020 nc = rcu_dereference(connection->net_conf);
1021
1022 sock.socket->sk->sk_sndtimeo =
1023 sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
1024
1025 msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
1026 timeout = nc->timeout * HZ / 10;
1027 discard_my_data = nc->discard_my_data;
1028 rcu_read_unlock();
1029
1030 msock.socket->sk->sk_sndtimeo = timeout;
1031
1032 /* we don't want delays.
1033 * we use TCP_CORK where appropriate, though */
1034 drbd_tcp_nodelay(sock.socket);
1035 drbd_tcp_nodelay(msock.socket);
1036
1037 connection->data.socket = sock.socket;
1038 connection->meta.socket = msock.socket;
1039 connection->last_received = jiffies;
1040
1041 h = drbd_do_features(connection);
1042 if (h <= 0)
1043 return h;
1044
1045 if (connection->cram_hmac_tfm) {
1046 /* drbd_request_state(device, NS(conn, WFAuth)); */
1047 switch (drbd_do_auth(connection)) {
1048 case -1:
1049 drbd_err(connection, "Authentication of peer failed\n");
1050 return -1;
1051 case 0:
1052 drbd_err(connection, "Authentication of peer failed, trying again.\n");
1053 return 0;
1054 }
1055 }
1056
1057 connection->data.socket->sk->sk_sndtimeo = timeout;
1058 connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1059
1060 if (drbd_send_protocol(connection) == -EOPNOTSUPP)
1061 return -1;
1062
1063 /* Prevent a race between resync-handshake and
1064 * being promoted to Primary.
1065 *
1066 * Grab and release the state mutex, so we know that any current
1067 * drbd_set_role() is finished, and any incoming drbd_set_role
1068 * will see the STATE_SENT flag, and wait for it to be cleared.
1069 */
1070 idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1071 mutex_lock(peer_device->device->state_mutex);
1072
1073 set_bit(STATE_SENT, &connection->flags);
1074
1075 idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1076 mutex_unlock(peer_device->device->state_mutex);
1077
1078 rcu_read_lock();
1079 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1080 struct drbd_device *device = peer_device->device;
1081 kref_get(&device->kref);
1082 rcu_read_unlock();
1083
1084 if (discard_my_data)
1085 set_bit(DISCARD_MY_DATA, &device->flags);
1086 else
1087 clear_bit(DISCARD_MY_DATA, &device->flags);
1088
1089 drbd_connected(peer_device);
1090 kref_put(&device->kref, drbd_destroy_device);
1091 rcu_read_lock();
1092 }
1093 rcu_read_unlock();
1094
1095 rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
1096 if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
1097 clear_bit(STATE_SENT, &connection->flags);
1098 return 0;
1099 }
1100
1101 drbd_thread_start(&connection->asender);
1102
1103 mutex_lock(&connection->resource->conf_update);
1104 /* The discard_my_data flag is a single-shot modifier to the next
1105 * connection attempt, the handshake of which is now well underway.
1106 * No need for rcu style copying of the whole struct
1107 * just to clear a single value. */
1108 connection->net_conf->discard_my_data = 0;
1109 mutex_unlock(&connection->resource->conf_update);
1110
1111 return h;
1112
1113 out_release_sockets:
1114 if (ad.s_listen)
1115 sock_release(ad.s_listen);
1116 if (sock.socket)
1117 sock_release(sock.socket);
1118 if (msock.socket)
1119 sock_release(msock.socket);
1120 return -1;
1121 }
1122
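/* Decode a packet header into @pi. The expected layout (p_header100,
 * p_header95 or p_header80) follows from the agreed protocol version via
 * drbd_header_size(), and is cross-checked against the magic value. */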
1123 static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
1124 {
1125 unsigned int header_size = drbd_header_size(connection);
1126
1127 if (header_size == sizeof(struct p_header100) &&
1128 *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1129 struct p_header100 *h = header;
1130 if (h->pad != 0) {
1131 drbd_err(connection, "Header padding is not zero\n");
1132 return -EINVAL;
1133 }
1134 pi->vnr = be16_to_cpu(h->volume);
1135 pi->cmd = be16_to_cpu(h->command);
1136 pi->size = be32_to_cpu(h->length);
1137 } else if (header_size == sizeof(struct p_header95) &&
1138 *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1139 struct p_header95 *h = header;
1140 pi->cmd = be16_to_cpu(h->command);
1141 pi->size = be32_to_cpu(h->length);
1142 pi->vnr = 0;
1143 } else if (header_size == sizeof(struct p_header80) &&
1144 *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1145 struct p_header80 *h = header;
1146 pi->cmd = be16_to_cpu(h->command);
1147 pi->size = be16_to_cpu(h->length);
1148 pi->vnr = 0;
1149 } else {
1150 drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
1151 be32_to_cpu(*(__be32 *)header),
1152 connection->agreed_pro_version);
1153 return -EINVAL;
1154 }
1155 pi->data = header + header_size;
1156 return 0;
1157 }
1158
1159 static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
1160 {
1161 void *buffer = connection->data.rbuf;
1162 int err;
1163
1164 err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
1165 if (err)
1166 return err;
1167
1168 err = decode_header(connection, buffer, pi);
1169 connection->last_received = jiffies;
1170
1171 return err;
1172 }
1173
1174 static void drbd_flush(struct drbd_connection *connection)
1175 {
1176 int rv;
1177 struct drbd_peer_device *peer_device;
1178 int vnr;
1179
1180 if (connection->resource->write_ordering >= WO_bdev_flush) {
1181 rcu_read_lock();
1182 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1183 struct drbd_device *device = peer_device->device;
1184
1185 if (!get_ldev(device))
1186 continue;
1187 kref_get(&device->kref);
1188 rcu_read_unlock();
1189
1190 /* Right now, we have only this one synchronous code path
1191 * for flushes between request epochs.
1192 * We may want to make those asynchronous,
1193 * or at least parallelize the flushes to the volume devices.
1194 */
1195 device->flush_jif = jiffies;
1196 set_bit(FLUSH_PENDING, &device->flags);
1197 rv = blkdev_issue_flush(device->ldev->backing_bdev,
1198 GFP_NOIO, NULL);
1199 clear_bit(FLUSH_PENDING, &device->flags);
1200 if (rv) {
1201 drbd_info(device, "local disk flush failed with status %d\n", rv);
1202 /* would rather check on EOPNOTSUPP, but that is not reliable.
1203 * don't try again for ANY return value != 0
1204 * if (rv == -EOPNOTSUPP) */
1205 drbd_bump_write_ordering(connection->resource, NULL, WO_drain_io);
1206 }
1207 put_ldev(device);
1208 kref_put(&device->kref, drbd_destroy_device);
1209
1210 rcu_read_lock();
1211 if (rv)
1212 break;
1213 }
1214 rcu_read_unlock();
1215 }
1216 }
1217
1218 /**
1219 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
1220 * @device: DRBD device.
1221 * @epoch: Epoch object.
1222 * @ev: Epoch event.
1223 */
1224 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
1225 struct drbd_epoch *epoch,
1226 enum epoch_event ev)
1227 {
1228 int epoch_size;
1229 struct drbd_epoch *next_epoch;
1230 enum finish_epoch rv = FE_STILL_LIVE;
1231
1232 spin_lock(&connection->epoch_lock);
1233 do {
1234 next_epoch = NULL;
1235
1236 epoch_size = atomic_read(&epoch->epoch_size);
1237
1238 switch (ev & ~EV_CLEANUP) {
1239 case EV_PUT:
1240 atomic_dec(&epoch->active);
1241 break;
1242 case EV_GOT_BARRIER_NR:
1243 set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1244 break;
1245 case EV_BECAME_LAST:
1246 /* nothing to do*/
1247 break;
1248 }
1249
1250 if (epoch_size != 0 &&
1251 atomic_read(&epoch->active) == 0 &&
1252 (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1253 if (!(ev & EV_CLEANUP)) {
1254 spin_unlock(&connection->epoch_lock);
1255 drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
1256 spin_lock(&connection->epoch_lock);
1257 }
1258 #if 0
1259 /* FIXME: dec unacked on connection, once we have
1260 * something to count pending connection packets in. */
1261 if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1262 dec_unacked(epoch->connection);
1263 #endif
1264
1265 if (connection->current_epoch != epoch) {
1266 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1267 list_del(&epoch->list);
1268 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1269 connection->epochs--;
1270 kfree(epoch);
1271
1272 if (rv == FE_STILL_LIVE)
1273 rv = FE_DESTROYED;
1274 } else {
1275 epoch->flags = 0;
1276 atomic_set(&epoch->epoch_size, 0);
1277 /* atomic_set(&epoch->active, 0); is already zero */
1278 if (rv == FE_STILL_LIVE)
1279 rv = FE_RECYCLED;
1280 }
1281 }
1282
1283 if (!next_epoch)
1284 break;
1285
1286 epoch = next_epoch;
1287 } while (1);
1288
1289 spin_unlock(&connection->epoch_lock);
1290
1291 return rv;
1292 }
1293
1294 static enum write_ordering_e
1295 max_allowed_wo(struct drbd_backing_dev *bdev, enum write_ordering_e wo)
1296 {
1297 struct disk_conf *dc;
1298
1299 dc = rcu_dereference(bdev->disk_conf);
1300
1301 if (wo == WO_bdev_flush && !dc->disk_flushes)
1302 wo = WO_drain_io;
1303 if (wo == WO_drain_io && !dc->disk_drain)
1304 wo = WO_none;
1305
1306 return wo;
1307 }
1308
1309 /**
1310 * drbd_bump_write_ordering() - Fall back to another write ordering method
1311 * @resource: DRBD resource.
1312 * @wo: Write ordering method to try.
1313 */
1314 void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev,
1315 enum write_ordering_e wo)
1316 {
1317 struct drbd_device *device;
1318 enum write_ordering_e pwo;
1319 int vnr;
1320 static char *write_ordering_str[] = {
1321 [WO_none] = "none",
1322 [WO_drain_io] = "drain",
1323 [WO_bdev_flush] = "flush",
1324 };
1325
1326 pwo = resource->write_ordering;
1327 if (wo != WO_bdev_flush)
1328 wo = min(pwo, wo);
1329 rcu_read_lock();
1330 idr_for_each_entry(&resource->devices, device, vnr) {
1331 if (get_ldev(device)) {
1332 wo = max_allowed_wo(device->ldev, wo);
1333 if (device->ldev == bdev)
1334 bdev = NULL;
1335 put_ldev(device);
1336 }
1337 }
1338
1339 if (bdev)
1340 wo = max_allowed_wo(bdev, wo);
1341
1342 rcu_read_unlock();
1343
1344 resource->write_ordering = wo;
1345 if (pwo != resource->write_ordering || wo == WO_bdev_flush)
1346 drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]);
1347 }
1348
1349 /**
1350 * drbd_submit_peer_request()
1351 * @device: DRBD device.
1352 * @peer_req: peer request
1353 * @rw: flag field, see bio->bi_rw
1354 *
1355 * May spread the pages to multiple bios,
1356 * depending on bio_add_page restrictions.
1357 *
1358 * Returns 0 if all bios have been submitted,
1359 * -ENOMEM if we could not allocate enough bios,
1360 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1361 * single page to an empty bio (which should never happen and likely indicates
1362 * that the lower level IO stack is in some way broken). This has been observed
1363 * on certain Xen deployments.
1364 */
1365 /* TODO allocate from our own bio_set. */
1366 int drbd_submit_peer_request(struct drbd_device *device,
1367 struct drbd_peer_request *peer_req,
1368 const unsigned rw, const int fault_type)
1369 {
1370 struct bio *bios = NULL;
1371 struct bio *bio;
1372 struct page *page = peer_req->pages;
1373 sector_t sector = peer_req->i.sector;
1374 unsigned data_size = peer_req->i.size;
1375 unsigned n_bios = 0;
1376 unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
1377 int err = -ENOMEM;
1378
1379 if (peer_req->flags & EE_IS_TRIM_USE_ZEROOUT) {
1380 /* wait for all pending IO completions, before we start
1381 * zeroing things out. */
1382 conn_wait_active_ee_empty(first_peer_device(device)->connection);
1383 /* add it to the active list now,
1384 * so we can find it to present it in debugfs */
1385 peer_req->submit_jif = jiffies;
1386 peer_req->flags |= EE_SUBMITTED;
1387 spin_lock_irq(&device->resource->req_lock);
1388 list_add_tail(&peer_req->w.list, &device->active_ee);
1389 spin_unlock_irq(&device->resource->req_lock);
1390 if (blkdev_issue_zeroout(device->ldev->backing_bdev,
1391 sector, data_size >> 9, GFP_NOIO, false))
1392 peer_req->flags |= EE_WAS_ERROR;
1393 drbd_endio_write_sec_final(peer_req);
1394 return 0;
1395 }
1396
1397 /* Discards don't have any payload.
1398 * But the scsi layer still expects a bio_vec it can use internally,
1399 * see sd_setup_discard_cmnd() and blk_add_request_payload(). */
1400 if (peer_req->flags & EE_IS_TRIM)
1401 nr_pages = 1;
1402
1403 /* In most cases, we will only need one bio. But in case the lower
1404 * level restrictions happen to be different at this offset on this
1405 * side than those of the sending peer, we may need to submit the
1406 * request in more than one bio.
1407 *
1408 * Plain bio_alloc is good enough here, this is no DRBD internally
1409 * generated bio, but a bio allocated on behalf of the peer.
1410 */
1411 next_bio:
1412 bio = bio_alloc(GFP_NOIO, nr_pages);
1413 if (!bio) {
1414 drbd_err(device, "submit_ee: Allocation of a bio failed (nr_pages=%u)\n", nr_pages);
1415 goto fail;
1416 }
1417 /* > peer_req->i.sector, unless this is the first bio */
1418 bio->bi_iter.bi_sector = sector;
1419 bio->bi_bdev = device->ldev->backing_bdev;
1420 bio->bi_rw = rw;
1421 bio->bi_private = peer_req;
1422 bio->bi_end_io = drbd_peer_request_endio;
1423
1424 bio->bi_next = bios;
1425 bios = bio;
1426 ++n_bios;
1427
1428 if (rw & REQ_DISCARD) {
1429 bio->bi_iter.bi_size = data_size;
1430 goto submit;
1431 }
1432
1433 page_chain_for_each(page) {
1434 unsigned len = min_t(unsigned, data_size, PAGE_SIZE);
1435 if (!bio_add_page(bio, page, len, 0)) {
1436 /* A single page must always be possible!
1437 * But in case it fails anyway,
1438 * we deal with it, and complain (below). */
1439 if (bio->bi_vcnt == 0) {
1440 drbd_err(device,
1441 "bio_add_page failed for len=%u, "
1442 "bi_vcnt=0 (bi_sector=%llu)\n",
1443 len, (uint64_t)bio->bi_iter.bi_sector);
1444 err = -ENOSPC;
1445 goto fail;
1446 }
1447 goto next_bio;
1448 }
1449 data_size -= len;
1450 sector += len >> 9;
1451 --nr_pages;
1452 }
1453 D_ASSERT(device, data_size == 0);
1454 submit:
1455 D_ASSERT(device, page == NULL);
1456
1457 atomic_set(&peer_req->pending_bios, n_bios);
1458 /* for debugfs: update timestamp, mark as submitted */
1459 peer_req->submit_jif = jiffies;
1460 peer_req->flags |= EE_SUBMITTED;
1461 do {
1462 bio = bios;
1463 bios = bios->bi_next;
1464 bio->bi_next = NULL;
1465
1466 drbd_generic_make_request(device, fault_type, bio);
1467 } while (bios);
1468 return 0;
1469
1470 fail:
1471 while (bios) {
1472 bio = bios;
1473 bios = bios->bi_next;
1474 bio_put(bio);
1475 }
1476 return err;
1477 }
1478
1479 static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
1480 struct drbd_peer_request *peer_req)
1481 {
1482 struct drbd_interval *i = &peer_req->i;
1483
1484 drbd_remove_interval(&device->write_requests, i);
1485 drbd_clear_interval(i);
1486
1487 /* Wake up any processes waiting for this peer request to complete. */
1488 if (i->waiting)
1489 wake_up(&device->misc_wait);
1490 }
1491
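/* Wait until the active_ee list of every volume of this connection has drained. */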
1492 static void conn_wait_active_ee_empty(struct drbd_connection *connection)
1493 {
1494 struct drbd_peer_device *peer_device;
1495 int vnr;
1496
1497 rcu_read_lock();
1498 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1499 struct drbd_device *device = peer_device->device;
1500
1501 kref_get(&device->kref);
1502 rcu_read_unlock();
1503 drbd_wait_ee_list_empty(device, &device->active_ee);
1504 kref_put(&device->kref, drbd_destroy_device);
1505 rcu_read_lock();
1506 }
1507 rcu_read_unlock();
1508 }
1509
1510 static struct drbd_peer_device *
1511 conn_peer_device(struct drbd_connection *connection, int volume_number)
1512 {
1513 return idr_find(&connection->peer_devices, volume_number);
1514 }
1515
1516 static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
1517 {
1518 int rv;
1519 struct p_barrier *p = pi->data;
1520 struct drbd_epoch *epoch;
1521
1522 /* FIXME these are unacked on connection,
1523 * not a specific (peer)device.
1524 */
1525 connection->current_epoch->barrier_nr = p->barrier;
1526 connection->current_epoch->connection = connection;
1527 rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);
1528
1529 /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1530 * the activity log, which means it would not be resynced in case the
1531 * R_PRIMARY crashes now.
1532 * Therefore we must send the barrier_ack after the barrier request was
1533 * completed. */
1534 switch (connection->resource->write_ordering) {
1535 case WO_none:
1536 if (rv == FE_RECYCLED)
1537 return 0;
1538
1539 /* receiver context, in the writeout path of the other node.
1540 * avoid potential distributed deadlock */
1541 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1542 if (epoch)
1543 break;
1544 else
1545 drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
1546 /* Fall through */
1547
1548 case WO_bdev_flush:
1549 case WO_drain_io:
1550 conn_wait_active_ee_empty(connection);
1551 drbd_flush(connection);
1552
1553 if (atomic_read(&connection->current_epoch->epoch_size)) {
1554 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1555 if (epoch)
1556 break;
1557 }
1558
1559 return 0;
1560 default:
1561 drbd_err(connection, "Strangeness in connection->write_ordering %d\n",
1562 connection->resource->write_ordering);
1563 return -EIO;
1564 }
1565
1566 epoch->flags = 0;
1567 atomic_set(&epoch->epoch_size, 0);
1568 atomic_set(&epoch->active, 0);
1569
1570 spin_lock(&connection->epoch_lock);
1571 if (atomic_read(&connection->current_epoch->epoch_size)) {
1572 list_add(&epoch->list, &connection->current_epoch->list);
1573 connection->current_epoch = epoch;
1574 connection->epochs++;
1575 } else {
1576 /* The current_epoch got recycled while we allocated this one... */
1577 kfree(epoch);
1578 }
1579 spin_unlock(&connection->epoch_lock);
1580
1581 return 0;
1582 }
1583
1584 /* used from receive_RSDataReply (recv_resync_read)
1585 * and from receive_Data */
1586 static struct drbd_peer_request *
1587 read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
1588 struct packet_info *pi) __must_hold(local)
1589 {
1590 struct drbd_device *device = peer_device->device;
1591 const sector_t capacity = drbd_get_capacity(device->this_bdev);
1592 struct drbd_peer_request *peer_req;
1593 struct page *page;
1594 int digest_size, err;
1595 unsigned int data_size = pi->size, ds;
1596 void *dig_in = peer_device->connection->int_dig_in;
1597 void *dig_vv = peer_device->connection->int_dig_vv;
1598 unsigned long *data;
1599 struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;
1600
1601 digest_size = 0;
1602 if (!trim && peer_device->connection->peer_integrity_tfm) {
1603 digest_size = crypto_hash_digestsize(peer_device->connection->peer_integrity_tfm);
1604 /*
1605 * FIXME: Receive the incoming digest into the receive buffer
1606 * here, together with its struct p_data?
1607 */
1608 err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1609 if (err)
1610 return NULL;
1611 data_size -= digest_size;
1612 }
1613
1614 if (trim) {
1615 D_ASSERT(peer_device, data_size == 0);
1616 data_size = be32_to_cpu(trim->size);
1617 }
1618
1619 if (!expect(IS_ALIGNED(data_size, 512)))
1620 return NULL;
1621 /* prepare for larger trim requests. */
1622 if (!trim && !expect(data_size <= DRBD_MAX_BIO_SIZE))
1623 return NULL;
1624
1625 /* even though we trust our peer,
1626 * we sometimes have to double check. */
1627 if (sector + (data_size>>9) > capacity) {
1628 drbd_err(device, "request from peer beyond end of local disk: "
1629 "capacity: %llus < sector: %llus + size: %u\n",
1630 (unsigned long long)capacity,
1631 (unsigned long long)sector, data_size);
1632 return NULL;
1633 }
1634
1635 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1636 * "criss-cross" setup, that might cause write-out on some other DRBD,
1637 * which in turn might block on the other node at this very place. */
1638 peer_req = drbd_alloc_peer_req(peer_device, id, sector, data_size, trim == NULL, GFP_NOIO);
1639 if (!peer_req)
1640 return NULL;
1641
1642 peer_req->flags |= EE_WRITE;
1643 if (trim)
1644 return peer_req;
1645
1646 ds = data_size;
1647 page = peer_req->pages;
1648 page_chain_for_each(page) {
1649 unsigned len = min_t(int, ds, PAGE_SIZE);
1650 data = kmap(page);
1651 err = drbd_recv_all_warn(peer_device->connection, data, len);
1652 if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
1653 drbd_err(device, "Fault injection: Corrupting data on receive\n");
1654 data[0] = data[0] ^ (unsigned long)-1;
1655 }
1656 kunmap(page);
1657 if (err) {
1658 drbd_free_peer_req(device, peer_req);
1659 return NULL;
1660 }
1661 ds -= len;
1662 }
1663
1664 if (digest_size) {
1665 drbd_csum_ee(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv);
1666 if (memcmp(dig_in, dig_vv, digest_size)) {
1667 drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
1668 (unsigned long long)sector, data_size);
1669 drbd_free_peer_req(device, peer_req);
1670 return NULL;
1671 }
1672 }
1673 device->recv_cnt += data_size >> 9;
1674 return peer_req;
1675 }
1676
1677 /* drbd_drain_block() just takes a data block
1678 * out of the socket input buffer, and discards it.
1679 */
1680 static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
1681 {
1682 struct page *page;
1683 int err = 0;
1684 void *data;
1685
1686 if (!data_size)
1687 return 0;
1688
1689 page = drbd_alloc_pages(peer_device, 1, 1);
1690
1691 data = kmap(page);
1692 while (data_size) {
1693 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1694
1695 err = drbd_recv_all_warn(peer_device->connection, data, len);
1696 if (err)
1697 break;
1698 data_size -= len;
1699 }
1700 kunmap(page);
1701 drbd_free_pages(peer_device->device, page, 0);
1702 return err;
1703 }
1704
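/* Receive the payload of a "diskless" read reply directly into the pages of
 * the original request's master bio, verifying the optional integrity digest. */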
1705 static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
1706 sector_t sector, int data_size)
1707 {
1708 struct bio_vec bvec;
1709 struct bvec_iter iter;
1710 struct bio *bio;
1711 int digest_size, err, expect;
1712 void *dig_in = peer_device->connection->int_dig_in;
1713 void *dig_vv = peer_device->connection->int_dig_vv;
1714
1715 digest_size = 0;
1716 if (peer_device->connection->peer_integrity_tfm) {
1717 digest_size = crypto_hash_digestsize(peer_device->connection->peer_integrity_tfm);
1718 err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1719 if (err)
1720 return err;
1721 data_size -= digest_size;
1722 }
1723
1724 /* optimistically update recv_cnt. if receiving fails below,
1725 * we disconnect anyway, and counters will be reset. */
1726 peer_device->device->recv_cnt += data_size>>9;
1727
1728 bio = req->master_bio;
1729 D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
1730
1731 bio_for_each_segment(bvec, bio, iter) {
1732 void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
1733 expect = min_t(int, data_size, bvec.bv_len);
1734 err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
1735 kunmap(bvec.bv_page);
1736 if (err)
1737 return err;
1738 data_size -= expect;
1739 }
1740
1741 if (digest_size) {
1742 drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
1743 if (memcmp(dig_in, dig_vv, digest_size)) {
1744 drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
1745 return -EINVAL;
1746 }
1747 }
1748
1749 D_ASSERT(peer_device->device, data_size == 0);
1750 return 0;
1751 }
1752
1753 /*
1754 * e_end_resync_block() is called in asender context via
1755 * drbd_finish_peer_reqs().
1756 */
1757 static int e_end_resync_block(struct drbd_work *w, int unused)
1758 {
1759 struct drbd_peer_request *peer_req =
1760 container_of(w, struct drbd_peer_request, w);
1761 struct drbd_peer_device *peer_device = peer_req->peer_device;
1762 struct drbd_device *device = peer_device->device;
1763 sector_t sector = peer_req->i.sector;
1764 int err;
1765
1766 D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1767
1768 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1769 drbd_set_in_sync(device, sector, peer_req->i.size);
1770 err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
1771 } else {
1772 /* Record failure to sync */
1773 drbd_rs_failed_io(device, sector, peer_req->i.size);
1774
1775 err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1776 }
1777 dec_unacked(device);
1778
1779 return err;
1780 }
1781
1782 static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
1783 struct packet_info *pi) __releases(local)
1784 {
1785 struct drbd_device *device = peer_device->device;
1786 struct drbd_peer_request *peer_req;
1787
1788 peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
1789 if (!peer_req)
1790 goto fail;
1791
1792 dec_rs_pending(device);
1793
1794 inc_unacked(device);
1795 /* corresponding dec_unacked() in e_end_resync_block()
1796 * respective _drbd_clear_done_ee */
1797
1798 peer_req->w.cb = e_end_resync_block;
1799 peer_req->submit_jif = jiffies;
1800
1801 spin_lock_irq(&device->resource->req_lock);
1802 list_add_tail(&peer_req->w.list, &device->sync_ee);
1803 spin_unlock_irq(&device->resource->req_lock);
1804
1805 atomic_add(pi->size >> 9, &device->rs_sect_ev);
1806 if (drbd_submit_peer_request(device, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1807 return 0;
1808
1809 /* don't care for the reason here */
1810 drbd_err(device, "submit failed, triggering re-connect\n");
1811 spin_lock_irq(&device->resource->req_lock);
1812 list_del(&peer_req->w.list);
1813 spin_unlock_irq(&device->resource->req_lock);
1814
1815 drbd_free_peer_req(device, peer_req);
1816 fail:
1817 put_ldev(device);
1818 return -EIO;
1819 }
1820
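/* The peer echoes our request pointer back as the opaque block_id; validate it
 * against the interval tree before trusting it. */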
1821 static struct drbd_request *
1822 find_request(struct drbd_device *device, struct rb_root *root, u64 id,
1823 sector_t sector, bool missing_ok, const char *func)
1824 {
1825 struct drbd_request *req;
1826
1827 /* Request object according to our peer */
1828 req = (struct drbd_request *)(unsigned long)id;
1829 if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1830 return req;
1831 if (!missing_ok) {
1832 drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
1833 (unsigned long)id, (unsigned long long)sector);
1834 }
1835 return NULL;
1836 }
1837
1838 static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
1839 {
1840 struct drbd_peer_device *peer_device;
1841 struct drbd_device *device;
1842 struct drbd_request *req;
1843 sector_t sector;
1844 int err;
1845 struct p_data *p = pi->data;
1846
1847 peer_device = conn_peer_device(connection, pi->vnr);
1848 if (!peer_device)
1849 return -EIO;
1850 device = peer_device->device;
1851
1852 sector = be64_to_cpu(p->sector);
1853
1854 spin_lock_irq(&device->resource->req_lock);
1855 req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
1856 spin_unlock_irq(&device->resource->req_lock);
1857 if (unlikely(!req))
1858 return -EIO;
1859
1860 /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1861 * special casing it there for the various failure cases.
1862 * still no race with drbd_fail_pending_reads */
1863 err = recv_dless_read(peer_device, req, sector, pi->size);
1864 if (!err)
1865 req_mod(req, DATA_RECEIVED);
1866 /* else: nothing. handled from drbd_disconnect...
1867 * I don't think we may complete this just yet
1868 * in case we are "on-disconnect: freeze" */
1869
1870 return err;
1871 }
1872
1873 static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
1874 {
1875 struct drbd_peer_device *peer_device;
1876 struct drbd_device *device;
1877 sector_t sector;
1878 int err;
1879 struct p_data *p = pi->data;
1880
1881 peer_device = conn_peer_device(connection, pi->vnr);
1882 if (!peer_device)
1883 return -EIO;
1884 device = peer_device->device;
1885
1886 sector = be64_to_cpu(p->sector);
1887 D_ASSERT(device, p->block_id == ID_SYNCER);
1888
1889 if (get_ldev(device)) {
1890 /* data is submitted to disk within recv_resync_read.
1891 * corresponding put_ldev done below on error,
1892 * or in drbd_peer_request_endio. */
1893 err = recv_resync_read(peer_device, sector, pi);
1894 } else {
1895 if (__ratelimit(&drbd_ratelimit_state))
1896 drbd_err(device, "Can not write resync data to local disk.\n");
1897
1898 err = drbd_drain_block(peer_device, pi->size);
1899
1900 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
1901 }
1902
1903 atomic_add(pi->size >> 9, &device->rs_sect_in);
1904
1905 return err;
1906 }
1907
1908 static void restart_conflicting_writes(struct drbd_device *device,
1909 sector_t sector, int size)
1910 {
1911 struct drbd_interval *i;
1912 struct drbd_request *req;
1913
1914 drbd_for_each_overlap(i, &device->write_requests, sector, size) {
1915 if (!i->local)
1916 continue;
1917 req = container_of(i, struct drbd_request, i);
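/* Only restart requests that were postponed and are no longer
 * pending on the local disk. */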
1918 if (req->rq_state & RQ_LOCAL_PENDING ||
1919 !(req->rq_state & RQ_POSTPONED))
1920 continue;
1921 /* as it is RQ_POSTPONED, this will cause it to
1922 * be queued on the retry workqueue. */
1923 __req_mod(req, CONFLICT_RESOLVED, NULL);
1924 }
1925 }
1926
1927 /*
1928 * e_end_block() is called in asender context via drbd_finish_peer_reqs().
1929 */
1930 static int e_end_block(struct drbd_work *w, int cancel)
1931 {
1932 struct drbd_peer_request *peer_req =
1933 container_of(w, struct drbd_peer_request, w);
1934 struct drbd_peer_device *peer_device = peer_req->peer_device;
1935 struct drbd_device *device = peer_device->device;
1936 sector_t sector = peer_req->i.sector;
1937 int err = 0, pcmd;
1938
1939 if (peer_req->flags & EE_SEND_WRITE_ACK) {
1940 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1941 pcmd = (device->state.conn >= C_SYNC_SOURCE &&
1942 device->state.conn <= C_PAUSED_SYNC_T &&
1943 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1944 P_RS_WRITE_ACK : P_WRITE_ACK;
1945 err = drbd_send_ack(peer_device, pcmd, peer_req);
1946 if (pcmd == P_RS_WRITE_ACK)
1947 drbd_set_in_sync(device, sector, peer_req->i.size);
1948 } else {
1949 err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1950 /* we expect it to be marked out of sync anyways...
1951 * maybe assert this? */
1952 }
1953 dec_unacked(device);
1954 }
1955
1956 /* we delete from the conflict detection hash _after_ we sent out the
1957 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
1958 if (peer_req->flags & EE_IN_INTERVAL_TREE) {
1959 spin_lock_irq(&device->resource->req_lock);
1960 D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
1961 drbd_remove_epoch_entry_interval(device, peer_req);
1962 if (peer_req->flags & EE_RESTART_REQUESTS)
1963 restart_conflicting_writes(device, sector, peer_req->i.size);
1964 spin_unlock_irq(&device->resource->req_lock);
1965 } else
1966 D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1967
1968 drbd_may_finish_epoch(first_peer_device(device)->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1969
1970 return err;
1971 }
1972
1973 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
1974 {
1975 struct drbd_peer_request *peer_req =
1976 container_of(w, struct drbd_peer_request, w);
1977 struct drbd_peer_device *peer_device = peer_req->peer_device;
1978 int err;
1979
1980 err = drbd_send_ack(peer_device, ack, peer_req);
1981 dec_unacked(peer_device->device);
1982
1983 return err;
1984 }
1985
1986 static int e_send_superseded(struct drbd_work *w, int unused)
1987 {
1988 return e_send_ack(w, P_SUPERSEDED);
1989 }
1990
1991 static int e_send_retry_write(struct drbd_work *w, int unused)
1992 {
1993 struct drbd_peer_request *peer_req =
1994 container_of(w, struct drbd_peer_request, w);
1995 struct drbd_connection *connection = peer_req->peer_device->connection;
1996
1997 return e_send_ack(w, connection->agreed_pro_version >= 100 ?
1998 P_RETRY_WRITE : P_SUPERSEDED);
1999 }
2000
2001 static bool seq_greater(u32 a, u32 b)
2002 {
2003 /*
2004 * We assume 32-bit wrap-around here.
2005 * For 24-bit wrap-around, we would have to shift:
2006 * a <<= 8; b <<= 8;
2007 */
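/* Example: a == 2, b == 0xfffffffe: (s32)a - (s32)b == 2 - (-2) == 4 > 0,
 * so a is considered newer even though it wrapped around past b. */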
2008 return (s32)a - (s32)b > 0;
2009 }
2010
2011 static u32 seq_max(u32 a, u32 b)
2012 {
2013 return seq_greater(a, b) ? a : b;
2014 }
2015
2016 static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
2017 {
2018 struct drbd_device *device = peer_device->device;
2019 unsigned int newest_peer_seq;
2020
2021 if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
2022 spin_lock(&device->peer_seq_lock);
2023 newest_peer_seq = seq_max(device->peer_seq, peer_seq);
2024 device->peer_seq = newest_peer_seq;
2025 spin_unlock(&device->peer_seq_lock);
2026 /* wake up only if we actually changed device->peer_seq */
2027 if (peer_seq == newest_peer_seq)
2028 wake_up(&device->seq_wait);
2029 }
2030 }
2031
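/* s1 and s2 are start sectors, l1 and l2 are lengths in bytes; >>9 converts
 * bytes to 512-byte sectors. Two ranges overlap unless one ends at or before
 * the start of the other. Example: s1=0, l1=4096 covers sectors [0,8), so it
 * touches but does not overlap s2=8, l2=512. */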
2032 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
2033 {
2034 return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
2035 }
2036
2037 /* maybe change sync_ee into interval trees as well? */
2038 static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
2039 {
2040 struct drbd_peer_request *rs_req;
2041 bool rv = false;
2042
2043 spin_lock_irq(&device->resource->req_lock);
2044 list_for_each_entry(rs_req, &device->sync_ee, w.list) {
2045 if (overlaps(peer_req->i.sector, peer_req->i.size,
2046 rs_req->i.sector, rs_req->i.size)) {
2047 rv = true;
2048 break;
2049 }
2050 }
2051 spin_unlock_irq(&device->resource->req_lock);
2052
2053 return rv;
2054 }
2055
2056 /* Called from receive_Data.
2057 * Synchronize packets on sock with packets on msock.
2058 *
2059 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
2060 * packet traveling on msock, they are still processed in the order they have
2061 * been sent.
2062 *
2063 * Note: we don't care for Ack packets overtaking P_DATA packets.
2064 *
2065 * In case packet_seq is larger than device->peer_seq number, there are
2066 * outstanding packets on the msock. We wait for them to arrive.
2067 * In case we are the logically next packet, we update device->peer_seq
2068 * ourselves. Correctly handles 32bit wrap around.
2069 *
2070 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
2071 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
2072 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
2073 * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
2074 *
2075 * returns 0 if we may process the packet,
2076 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
2077 static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
2078 {
2079 struct drbd_device *device = peer_device->device;
2080 DEFINE_WAIT(wait);
2081 long timeout;
2082 int ret = 0, tp;
2083
2084 if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
2085 return 0;
2086
2087 spin_lock(&device->peer_seq_lock);
2088 for (;;) {
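/* Not seq_greater(peer_seq - 1, device->peer_seq) means
 * peer_seq - 1 <= device->peer_seq: every earlier packet has already been
 * processed (or this is an old duplicate), so it is safe to go ahead. */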
2089 if (!seq_greater(peer_seq - 1, device->peer_seq)) {
2090 device->peer_seq = seq_max(device->peer_seq, peer_seq);
2091 break;
2092 }
2093
2094 if (signal_pending(current)) {
2095 ret = -ERESTARTSYS;
2096 break;
2097 }
2098
2099 rcu_read_lock();
2100 tp = rcu_dereference(first_peer_device(device)->connection->net_conf)->two_primaries;
2101 rcu_read_unlock();
2102
2103 if (!tp)
2104 break;
2105
2106 /* Only need to wait if two_primaries is enabled */
2107 prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
2108 spin_unlock(&device->peer_seq_lock);
2109 rcu_read_lock();
2110 timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
2111 rcu_read_unlock();
2112 timeout = schedule_timeout(timeout);
2113 spin_lock(&device->peer_seq_lock);
2114 if (!timeout) {
2115 ret = -ETIMEDOUT;
2116 drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
2117 break;
2118 }
2119 }
2120 spin_unlock(&device->peer_seq_lock);
2121 finish_wait(&device->seq_wait, &wait);
2122 return ret;
2123 }
2124
2125 /* see also bio_flags_to_wire()
2126 * DRBD_REQ_*, because we need to semantically map the flags to data packet
2127 * flags and back. We may replicate to other kernel versions. */
2128 static unsigned long wire_flags_to_bio(u32 dpf)
2129 {
2130 return (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2131 (dpf & DP_FUA ? REQ_FUA : 0) |
2132 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
2133 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
2134 }
2135
2136 static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
2137 unsigned int size)
2138 {
2139 struct drbd_interval *i;
2140
2141 repeat:
2142 drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2143 struct drbd_request *req;
2144 struct bio_and_error m;
2145
2146 if (!i->local)
2147 continue;
2148 req = container_of(i, struct drbd_request, i);
2149 if (!(req->rq_state & RQ_POSTPONED))
2150 continue;
2151 req->rq_state &= ~RQ_POSTPONED;
2152 __req_mod(req, NEG_ACKED, &m);
2153 spin_unlock_irq(&device->resource->req_lock);
2154 if (m.bio)
2155 complete_master_bio(device, &m);
2156 spin_lock_irq(&device->resource->req_lock);
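/* The lock was dropped to complete the master bio; the interval tree may
 * have changed meanwhile, so restart the overlap scan from the beginning. */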
2157 goto repeat;
2158 }
2159 }
2160
2161 static int handle_write_conflicts(struct drbd_device *device,
2162 struct drbd_peer_request *peer_req)
2163 {
2164 struct drbd_connection *connection = peer_req->peer_device->connection;
2165 bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
2166 sector_t sector = peer_req->i.sector;
2167 const unsigned int size = peer_req->i.size;
2168 struct drbd_interval *i;
2169 bool equal;
2170 int err;
2171
2172 /*
2173 * Inserting the peer request into the write_requests tree will prevent
2174 * new conflicting local requests from being added.
2175 */
2176 drbd_insert_interval(&device->write_requests, &peer_req->i);
2177
2178 repeat:
2179 drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2180 if (i == &peer_req->i)
2181 continue;
2182 if (i->completed)
2183 continue;
2184
2185 if (!i->local) {
2186 /*
2187 * Our peer has sent a conflicting remote request; this
2188 * should not happen in a two-node setup. Wait for the
2189 * earlier peer request to complete.
2190 */
2191 err = drbd_wait_misc(device, i);
2192 if (err)
2193 goto out;
2194 goto repeat;
2195 }
2196
2197 equal = i->sector == sector && i->size == size;
2198 if (resolve_conflicts) {
2199 /*
2200 * If the peer request is fully contained within the
2201 * overlapping request, it can be considered overwritten
2202 * and thus superseded; otherwise, it will be retried
2203 * once all overlapping requests have completed.
2204 */
2205 bool superseded = i->sector <= sector && i->sector +
2206 (i->size >> 9) >= sector + (size >> 9);
2207
2208 if (!equal)
2209 drbd_alert(device, "Concurrent writes detected: "
2210 "local=%llus +%u, remote=%llus +%u, "
2211 "assuming %s came first\n",
2212 (unsigned long long)i->sector, i->size,
2213 (unsigned long long)sector, size,
2214 superseded ? "local" : "remote");
2215
2216 peer_req->w.cb = superseded ? e_send_superseded :
2217 e_send_retry_write;
2218 list_add_tail(&peer_req->w.list, &device->done_ee);
2219 wake_asender(connection);
2220
2221 err = -ENOENT;
2222 goto out;
2223 } else {
2224 struct drbd_request *req =
2225 container_of(i, struct drbd_request, i);
2226
2227 if (!equal)
2228 drbd_alert(device, "Concurrent writes detected: "
2229 "local=%llus +%u, remote=%llus +%u\n",
2230 (unsigned long long)i->sector, i->size,
2231 (unsigned long long)sector, size);
2232
2233 if (req->rq_state & RQ_LOCAL_PENDING ||
2234 !(req->rq_state & RQ_POSTPONED)) {
2235 /*
2236 * Wait for the node with the discard flag to
2237 * decide if this request has been superseded
2238 * or needs to be retried.
2239 * Requests that have been superseded will
2240 * disappear from the write_requests tree.
2241 *
2242 * In addition, wait for the conflicting
2243 * request to finish locally before submitting
2244 * the conflicting peer request.
2245 */
2246 err = drbd_wait_misc(device, &req->i);
2247 if (err) {
2248 _conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
2249 fail_postponed_requests(device, sector, size);
2250 goto out;
2251 }
2252 goto repeat;
2253 }
2254 /*
2255 * Remember to restart the conflicting requests after
2256 * the new peer request has completed.
2257 */
2258 peer_req->flags |= EE_RESTART_REQUESTS;
2259 }
2260 }
2261 err = 0;
2262
2263 out:
2264 if (err)
2265 drbd_remove_epoch_entry_interval(device, peer_req);
2266 return err;
2267 }
2268
2269 /* mirrored write */
2270 static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
2271 {
2272 struct drbd_peer_device *peer_device;
2273 struct drbd_device *device;
2274 struct net_conf *nc;
2275 sector_t sector;
2276 struct drbd_peer_request *peer_req;
2277 struct p_data *p = pi->data;
2278 u32 peer_seq = be32_to_cpu(p->seq_num);
2279 int rw = WRITE;
2280 u32 dp_flags;
2281 int err, tp;
2282
2283 peer_device = conn_peer_device(connection, pi->vnr);
2284 if (!peer_device)
2285 return -EIO;
2286 device = peer_device->device;
2287
2288 if (!get_ldev(device)) {
2289 int err2;
2290
2291 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2292 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2293 atomic_inc(&connection->current_epoch->epoch_size);
2294 err2 = drbd_drain_block(peer_device, pi->size);
2295 if (!err)
2296 err = err2;
2297 return err;
2298 }
2299
2300 /*
2301 * Corresponding put_ldev done either below (on various errors), or in
2302 * drbd_peer_request_endio, if we successfully submit the data at the
2303 * end of this function.
2304 */
2305
2306 sector = be64_to_cpu(p->sector);
2307 peer_req = read_in_block(peer_device, p->block_id, sector, pi);
2308 if (!peer_req) {
2309 put_ldev(device);
2310 return -EIO;
2311 }
2312
2313 peer_req->w.cb = e_end_block;
2314 peer_req->submit_jif = jiffies;
2315 peer_req->flags |= EE_APPLICATION;
2316
2317 dp_flags = be32_to_cpu(p->dp_flags);
2318 rw |= wire_flags_to_bio(dp_flags);
2319 if (pi->cmd == P_TRIM) {
2320 struct request_queue *q = bdev_get_queue(device->ldev->backing_bdev);
2321 peer_req->flags |= EE_IS_TRIM;
2322 if (!blk_queue_discard(q))
2323 peer_req->flags |= EE_IS_TRIM_USE_ZEROOUT;
2324 D_ASSERT(peer_device, peer_req->i.size > 0);
2325 D_ASSERT(peer_device, rw & REQ_DISCARD);
2326 D_ASSERT(peer_device, peer_req->pages == NULL);
2327 } else if (peer_req->pages == NULL) {
2328 D_ASSERT(device, peer_req->i.size == 0);
2329 D_ASSERT(device, dp_flags & DP_FLUSH);
2330 }
2331
2332 if (dp_flags & DP_MAY_SET_IN_SYNC)
2333 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2334
2335 spin_lock(&connection->epoch_lock);
2336 peer_req->epoch = connection->current_epoch;
2337 atomic_inc(&peer_req->epoch->epoch_size);
2338 atomic_inc(&peer_req->epoch->active);
2339 spin_unlock(&connection->epoch_lock);
2340
2341 rcu_read_lock();
2342 nc = rcu_dereference(peer_device->connection->net_conf);
2343 tp = nc->two_primaries;
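/* Before protocol 100 the ack mode is implied by the configured wire
 * protocol (C: write ack, B: receive ack). Peers speaking protocol 100 or
 * newer set DP_SEND_WRITE_ACK / DP_SEND_RECEIVE_ACK in dp_flags themselves. */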
2344 if (peer_device->connection->agreed_pro_version < 100) {
2345 switch (nc->wire_protocol) {
2346 case DRBD_PROT_C:
2347 dp_flags |= DP_SEND_WRITE_ACK;
2348 break;
2349 case DRBD_PROT_B:
2350 dp_flags |= DP_SEND_RECEIVE_ACK;
2351 break;
2352 }
2353 }
2354 rcu_read_unlock();
2355
2356 if (dp_flags & DP_SEND_WRITE_ACK) {
2357 peer_req->flags |= EE_SEND_WRITE_ACK;
2358 inc_unacked(device);
2359 /* corresponding dec_unacked() in e_end_block(),
2360 * or in _drbd_clear_done_ee */
2361 }
2362
2363 if (dp_flags & DP_SEND_RECEIVE_ACK) {
2364 /* I really don't like it that the receiver thread
2365 * sends on the msock, but anyways */
2366 drbd_send_ack(first_peer_device(device), P_RECV_ACK, peer_req);
2367 }
2368
2369 if (tp) {
2370 /* two primaries implies protocol C */
2371 D_ASSERT(device, dp_flags & DP_SEND_WRITE_ACK);
2372 peer_req->flags |= EE_IN_INTERVAL_TREE;
2373 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2374 if (err)
2375 goto out_interrupted;
2376 spin_lock_irq(&device->resource->req_lock);
2377 err = handle_write_conflicts(device, peer_req);
2378 if (err) {
2379 spin_unlock_irq(&device->resource->req_lock);
2380 if (err == -ENOENT) {
2381 put_ldev(device);
2382 return 0;
2383 }
2384 goto out_interrupted;
2385 }
2386 } else {
2387 update_peer_seq(peer_device, peer_seq);
2388 spin_lock_irq(&device->resource->req_lock);
2389 }
2390 /* if we use the zeroout fallback code, we process synchronously
2391 * and we wait for all pending requests, respectively wait for
2392 * active_ee to become empty in drbd_submit_peer_request();
2393 * better not add ourselves here. */
2394 if ((peer_req->flags & EE_IS_TRIM_USE_ZEROOUT) == 0)
2395 list_add_tail(&peer_req->w.list, &device->active_ee);
2396 spin_unlock_irq(&device->resource->req_lock);
2397
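/* As sync target, do not let this application write race with a resync
 * write to the same area that is still in flight. */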
2398 if (device->state.conn == C_SYNC_TARGET)
2399 wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
2400
2401 if (device->state.pdsk < D_INCONSISTENT) {
2402 /* The peer has no usable disk, so we hold the only up-to-date copy in the cluster: mark this write out of sync and do not let it set blocks in sync. */
2403 drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
2404 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2405 drbd_al_begin_io(device, &peer_req->i);
2406 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2407 }
2408
2409 err = drbd_submit_peer_request(device, peer_req, rw, DRBD_FAULT_DT_WR);
2410 if (!err)
2411 return 0;
2412
2413 /* don't care for the reason here */
2414 drbd_err(device, "submit failed, triggering re-connect\n");
2415 spin_lock_irq(&device->resource->req_lock);
2416 list_del(&peer_req->w.list);
2417 drbd_remove_epoch_entry_interval(device, peer_req);
2418 spin_unlock_irq(&device->resource->req_lock);
2419 if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) {
2420 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
2421 drbd_al_complete_io(device, &peer_req->i);
2422 }
2423
2424 out_interrupted:
2425 drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT + EV_CLEANUP);
2426 put_ldev(device);
2427 drbd_free_peer_req(device, peer_req);
2428 return err;
2429 }
2430
2431 /* We may throttle resync, if the lower device seems to be busy,
2432 * and current sync rate is above c_min_rate.
2433 *
2434 * To decide whether or not the lower device is busy, we use a scheme similar
2435 * to MD RAID's is_mddev_idle(): if the partition stats reveal a "significant"
2436 * amount (more than 64 sectors) of activity we cannot account for with our own resync
2437 * activity, it obviously is "busy".
2438 *
2439 * The current sync rate used here uses only the most recent two step marks,
2440 * to have a short time average so we can react faster.
2441 */
2442 bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector,
2443 bool throttle_if_app_is_waiting)
2444 {
2445 struct lc_element *tmp;
2446 bool throttle = drbd_rs_c_min_rate_throttle(device);
2447
2448 if (!throttle || throttle_if_app_is_waiting)
2449 return throttle;
2450
2451 spin_lock_irq(&device->al_lock);
2452 tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2453 if (tmp) {
2454 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2455 if (test_bit(BME_PRIORITY, &bm_ext->flags))
2456 throttle = false;
2457 /* Do not slow down if app IO is already waiting for this extent,
2458 * and our progress is necessary for application IO to complete. */
2459 }
2460 spin_unlock_irq(&device->al_lock);
2461
2462 return throttle;
2463 }
2464
2465 bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
2466 {
2467 struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
2468 unsigned long db, dt, dbdt;
2469 unsigned int c_min_rate;
2470 int curr_events;
2471
2472 rcu_read_lock();
2473 c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
2474 rcu_read_unlock();
2475
2476 /* feature disabled? */
2477 if (c_min_rate == 0)
2478 return false;
2479
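/* Sectors read plus written on the whole backing disk, minus the sectors we
 * submitted for resync ourselves; what remains is application or other I/O. */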
2480 curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2481 (int)part_stat_read(&disk->part0, sectors[1]) -
2482 atomic_read(&device->rs_sect_ev);
2483
2484 if (atomic_read(&device->ap_actlog_cnt)
2485 || curr_events - device->rs_last_events > 64) {
2486 unsigned long rs_left;
2487 int i;
2488
2489 device->rs_last_events = curr_events;
2490
2491 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2492 * approx. */
2493 i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2494
2495 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2496 rs_left = device->ov_left;
2497 else
2498 rs_left = drbd_bm_total_weight(device) - device->rs_failed;
2499
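/* dt: seconds since that sync mark; db: bitmap bits resynced since then;
 * dbdt: the resulting resync rate in KiB/s, compared against c_min_rate. */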
2500 dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
2501 if (!dt)
2502 dt++;
2503 db = device->rs_mark_left[i] - rs_left;
2504 dbdt = Bit2KB(db/dt);
2505
2506 if (dbdt > c_min_rate)
2507 return true;
2508 }
2509 return false;
2510 }
2511
2512 static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
2513 {
2514 struct drbd_peer_device *peer_device;
2515 struct drbd_device *device;
2516 sector_t sector;
2517 sector_t capacity;
2518 struct drbd_peer_request *peer_req;
2519 struct digest_info *di = NULL;
2520 int size, verb;
2521 unsigned int fault_type;
2522 struct p_block_req *p = pi->data;
2523
2524 peer_device = conn_peer_device(connection, pi->vnr);
2525 if (!peer_device)
2526 return -EIO;
2527 device = peer_device->device;
2528 capacity = drbd_get_capacity(device->this_bdev);
2529
2530 sector = be64_to_cpu(p->sector);
2531 size = be32_to_cpu(p->blksize);
2532
2533 if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2534 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2535 (unsigned long long)sector, size);
2536 return -EINVAL;
2537 }
2538 if (sector + (size>>9) > capacity) {
2539 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2540 (unsigned long long)sector, size);
2541 return -EINVAL;
2542 }
2543
2544 if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
2545 verb = 1;
2546 switch (pi->cmd) {
2547 case P_DATA_REQUEST:
2548 drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
2549 break;
2550 case P_RS_DATA_REQUEST:
2551 case P_CSUM_RS_REQUEST:
2552 case P_OV_REQUEST:
2553 drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY, p);
2554 break;
2555 case P_OV_REPLY:
2556 verb = 0;
2557 dec_rs_pending(device);
2558 drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
2559 break;
2560 default:
2561 BUG();
2562 }
2563 if (verb && __ratelimit(&drbd_ratelimit_state))
2564 drbd_err(device, "Can not satisfy peer's read request, "
2565 "no local data.\n");
2566
2567 /* drain the payload, if any */
2568 return drbd_drain_block(peer_device, pi->size);
2569 }
2570
2571 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2572 * "criss-cross" setup, that might cause write-out on some other DRBD,
2573 * which in turn might block on the other node at this very place. */
2574 peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
2575 true /* has real payload */, GFP_NOIO);
2576 if (!peer_req) {
2577 put_ldev(device);
2578 return -ENOMEM;
2579 }
2580
2581 switch (pi->cmd) {
2582 case P_DATA_REQUEST:
2583 peer_req->w.cb = w_e_end_data_req;
2584 fault_type = DRBD_FAULT_DT_RD;
2585 /* application IO, don't drbd_rs_begin_io */
2586 peer_req->flags |= EE_APPLICATION;
2587 goto submit;
2588
2589 case P_RS_DATA_REQUEST:
2590 peer_req->w.cb = w_e_end_rsdata_req;
2591 fault_type = DRBD_FAULT_RS_RD;
2592 /* used in the sector offset progress display */
2593 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2594 break;
2595
2596 case P_OV_REPLY:
2597 case P_CSUM_RS_REQUEST:
2598 fault_type = DRBD_FAULT_RS_RD;
2599 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2600 if (!di)
2601 goto out_free_e;
2602
2603 di->digest_size = pi->size;
2604 di->digest = (((char *)di)+sizeof(struct digest_info));
2605
2606 peer_req->digest = di;
2607 peer_req->flags |= EE_HAS_DIGEST;
2608
2609 if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
2610 goto out_free_e;
2611
2612 if (pi->cmd == P_CSUM_RS_REQUEST) {
2613 D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
2614 peer_req->w.cb = w_e_end_csum_rs_req;
2615 /* used in the sector offset progress display */
2616 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2617 /* remember to report stats in drbd_resync_finished */
2618 device->use_csums = true;
2619 } else if (pi->cmd == P_OV_REPLY) {
2620 /* track progress, we may need to throttle */
2621 atomic_add(size >> 9, &device->rs_sect_in);
2622 peer_req->w.cb = w_e_end_ov_reply;
2623 dec_rs_pending(device);
2624 /* drbd_rs_begin_io done when we sent this request,
2625 * but accounting still needs to be done. */
2626 goto submit_for_resync;
2627 }
2628 break;
2629
2630 case P_OV_REQUEST:
2631 if (device->ov_start_sector == ~(sector_t)0 &&
2632 peer_device->connection->agreed_pro_version >= 90) {
2633 unsigned long now = jiffies;
2634 int i;
2635 device->ov_start_sector = sector;
2636 device->ov_position = sector;
2637 device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
2638 device->rs_total = device->ov_left;
2639 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2640 device->rs_mark_left[i] = device->ov_left;
2641 device->rs_mark_time[i] = now;
2642 }
2643 drbd_info(device, "Online Verify start sector: %llu\n",
2644 (unsigned long long)sector);
2645 }
2646 peer_req->w.cb = w_e_end_ov_req;
2647 fault_type = DRBD_FAULT_RS_RD;
2648 break;
2649
2650 default:
2651 BUG();
2652 }
2653
2654 /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2655 * wrt the receiver, but it is not as straightforward as it may seem.
2656 * Various places in the resync start and stop logic assume resync
2657 * requests are processed in order, requeuing this on the worker thread
2658 * introduces a bunch of new code for synchronization between threads.
2659 *
2660 * Unlimited throttling before drbd_rs_begin_io may stall the resync
2661 * "forever", throttling after drbd_rs_begin_io will lock that extent
2662 * for application writes for the same time. For now, just throttle
2663 * here, where the rest of the code expects the receiver to sleep for
2664 * a while, anyways.
2665 */
2666
2667 /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2668 * this defers syncer requests for some time, before letting at least
2669 * one request through. The resync controller on the receiving side
2670 * will adapt to the incoming rate accordingly.
2671 *
2672 * We cannot throttle here if remote is Primary/SyncTarget:
2673 * we would also throttle its application reads.
2674 * In that case, throttling is done on the SyncTarget only.
2675 */
2676
2677 /* Even though this may be a resync request, we do add to "read_ee";
2678 * "sync_ee" is only used for resync WRITEs.
2679 * Add to list early, so debugfs can find this request
2680 * even if we have to sleep below. */
2681 spin_lock_irq(&device->resource->req_lock);
2682 list_add_tail(&peer_req->w.list, &device->read_ee);
2683 spin_unlock_irq(&device->resource->req_lock);
2684
2685 update_receiver_timing_details(connection, drbd_rs_should_slow_down);
2686 if (device->state.peer != R_PRIMARY
2687 && drbd_rs_should_slow_down(device, sector, false))
2688 schedule_timeout_uninterruptible(HZ/10);
2689 update_receiver_timing_details(connection, drbd_rs_begin_io);
2690 if (drbd_rs_begin_io(device, sector))
2691 goto out_free_e;
2692
2693 submit_for_resync:
2694 atomic_add(size >> 9, &device->rs_sect_ev);
2695
2696 submit:
2697 update_receiver_timing_details(connection, drbd_submit_peer_request);
2698 inc_unacked(device);
2699 if (drbd_submit_peer_request(device, peer_req, READ, fault_type) == 0)
2700 return 0;
2701
2702 /* don't care for the reason here */
2703 drbd_err(device, "submit failed, triggering re-connect\n");
2704
2705 out_free_e:
2706 spin_lock_irq(&device->resource->req_lock);
2707 list_del(&peer_req->w.list);
2708 spin_unlock_irq(&device->resource->req_lock);
2709 /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2710
2711 put_ldev(device);
2712 drbd_free_peer_req(device, peer_req);
2713 return -EIO;
2714 }
2715
2716 /**
2717 * drbd_asb_recover_0p - Recover after split-brain with no remaining primaries
2718 */
2719 static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
2720 {
2721 struct drbd_device *device = peer_device->device;
2722 int self, peer, rv = -100;
2723 unsigned long ch_self, ch_peer;
2724 enum drbd_after_sb_p after_sb_0p;
2725
2726 self = device->ldev->md.uuid[UI_BITMAP] & 1;
2727 peer = device->p_uuid[UI_BITMAP] & 1;
2728
2729 ch_peer = device->p_uuid[UI_SIZE];
2730 ch_self = device->comm_bm_set;
2731
2732 rcu_read_lock();
2733 after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
2734 rcu_read_unlock();
2735 switch (after_sb_0p) {
2736 case ASB_CONSENSUS:
2737 case ASB_DISCARD_SECONDARY:
2738 case ASB_CALL_HELPER:
2739 case ASB_VIOLENTLY:
2740 drbd_err(device, "Configuration error.\n");
2741 break;
2742 case ASB_DISCONNECT:
2743 break;
2744 case ASB_DISCARD_YOUNGER_PRI:
2745 if (self == 0 && peer == 1) {
2746 rv = -1;
2747 break;
2748 }
2749 if (self == 1 && peer == 0) {
2750 rv = 1;
2751 break;
2752 }
2753 /* Else fall through to one of the other strategies... */
2754 case ASB_DISCARD_OLDER_PRI:
2755 if (self == 0 && peer == 1) {
2756 rv = 1;
2757 break;
2758 }
2759 if (self == 1 && peer == 0) {
2760 rv = -1;
2761 break;
2762 }
2763 /* Else fall through to one of the other strategies... */
2764 drbd_warn(device, "Discard younger/older primary did not find a decision\n"
2765 "Using discard-least-changes instead\n");
2766 case ASB_DISCARD_ZERO_CHG:
2767 if (ch_peer == 0 && ch_self == 0) {
2768 rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2769 ? -1 : 1;
2770 break;
2771 } else {
2772 if (ch_peer == 0) { rv = 1; break; }
2773 if (ch_self == 0) { rv = -1; break; }
2774 }
2775 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2776 break;
2777 case ASB_DISCARD_LEAST_CHG:
2778 if (ch_self < ch_peer)
2779 rv = -1;
2780 else if (ch_self > ch_peer)
2781 rv = 1;
2782 else /* ( ch_self == ch_peer ) */
2783 /* Well, then use something else. */
2784 rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2785 ? -1 : 1;
2786 break;
2787 case ASB_DISCARD_LOCAL:
2788 rv = -1;
2789 break;
2790 case ASB_DISCARD_REMOTE:
2791 rv = 1;
2792 }
2793
2794 return rv;
2795 }
2796
2797 /**
2798 * drbd_asb_recover_1p - Recover after split-brain with one remaining primary
2799 */
2800 static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
2801 {
2802 struct drbd_device *device = peer_device->device;
2803 int hg, rv = -100;
2804 enum drbd_after_sb_p after_sb_1p;
2805
2806 rcu_read_lock();
2807 after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
2808 rcu_read_unlock();
2809 switch (after_sb_1p) {
2810 case ASB_DISCARD_YOUNGER_PRI:
2811 case ASB_DISCARD_OLDER_PRI:
2812 case ASB_DISCARD_LEAST_CHG:
2813 case ASB_DISCARD_LOCAL:
2814 case ASB_DISCARD_REMOTE:
2815 case ASB_DISCARD_ZERO_CHG:
2816 drbd_err(device, "Configuration error.\n");
2817 break;
2818 case ASB_DISCONNECT:
2819 break;
2820 case ASB_CONSENSUS:
2821 hg = drbd_asb_recover_0p(peer_device);
2822 if (hg == -1 && device->state.role == R_SECONDARY)
2823 rv = hg;
2824 if (hg == 1 && device->state.role == R_PRIMARY)
2825 rv = hg;
2826 break;
2827 case ASB_VIOLENTLY:
2828 rv = drbd_asb_recover_0p(peer_device);
2829 break;
2830 case ASB_DISCARD_SECONDARY:
2831 return device->state.role == R_PRIMARY ? 1 : -1;
2832 case ASB_CALL_HELPER:
2833 hg = drbd_asb_recover_0p(peer_device);
2834 if (hg == -1 && device->state.role == R_PRIMARY) {
2835 enum drbd_state_rv rv2;
2836
2837 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2838 * we might be here in C_WF_REPORT_PARAMS which is transient.
2839 * we do not need to wait for the after state change work either. */
2840 rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
2841 if (rv2 != SS_SUCCESS) {
2842 drbd_khelper(device, "pri-lost-after-sb");
2843 } else {
2844 drbd_warn(device, "Successfully gave up primary role.\n");
2845 rv = hg;
2846 }
2847 } else
2848 rv = hg;
2849 }
2850
2851 return rv;
2852 }
2853
2854 /**
2855 * drbd_asb_recover_2p - Recover after split-brain with two remaining primaries
2856 */
2857 static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
2858 {
2859 struct drbd_device *device = peer_device->device;
2860 int hg, rv = -100;
2861 enum drbd_after_sb_p after_sb_2p;
2862
2863 rcu_read_lock();
2864 after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
2865 rcu_read_unlock();
2866 switch (after_sb_2p) {
2867 case ASB_DISCARD_YOUNGER_PRI:
2868 case ASB_DISCARD_OLDER_PRI:
2869 case ASB_DISCARD_LEAST_CHG:
2870 case ASB_DISCARD_LOCAL:
2871 case ASB_DISCARD_REMOTE:
2872 case ASB_CONSENSUS:
2873 case ASB_DISCARD_SECONDARY:
2874 case ASB_DISCARD_ZERO_CHG:
2875 drbd_err(device, "Configuration error.\n");
2876 break;
2877 case ASB_VIOLENTLY:
2878 rv = drbd_asb_recover_0p(peer_device);
2879 break;
2880 case ASB_DISCONNECT:
2881 break;
2882 case ASB_CALL_HELPER:
2883 hg = drbd_asb_recover_0p(peer_device);
2884 if (hg == -1) {
2885 enum drbd_state_rv rv2;
2886
2887 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2888 * we might be here in C_WF_REPORT_PARAMS which is transient.
2889 * we do not need to wait for the after state change work either. */
2890 rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
2891 if (rv2 != SS_SUCCESS) {
2892 drbd_khelper(device, "pri-lost-after-sb");
2893 } else {
2894 drbd_warn(device, "Successfully gave up primary role.\n");
2895 rv = hg;
2896 }
2897 } else
2898 rv = hg;
2899 }
2900
2901 return rv;
2902 }
2903
2904 static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
2905 u64 bits, u64 flags)
2906 {
2907 if (!uuid) {
2908 drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
2909 return;
2910 }
2911 drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2912 text,
2913 (unsigned long long)uuid[UI_CURRENT],
2914 (unsigned long long)uuid[UI_BITMAP],
2915 (unsigned long long)uuid[UI_HISTORY_START],
2916 (unsigned long long)uuid[UI_HISTORY_END],
2917 (unsigned long long)bits,
2918 (unsigned long long)flags);
2919 }
2920
2921 /*
2922 100 after split brain try auto recover
2923 2 C_SYNC_SOURCE set BitMap
2924 1 C_SYNC_SOURCE use BitMap
2925 0 no Sync
2926 -1 C_SYNC_TARGET use BitMap
2927 -2 C_SYNC_TARGET set BitMap
2928 -100 after split brain, disconnect
2929 -1000 unrelated data
2930 -1091 requires proto 91
2931 -1096 requires proto 96
2932 */
2933 static int drbd_uuid_compare(struct drbd_device *const device, int *rule_nr) __must_hold(local)
2934 {
2935 struct drbd_peer_device *const peer_device = first_peer_device(device);
2936 struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
2937 u64 self, peer;
2938 int i, j;
2939
2940 self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2941 peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
2942
2943 *rule_nr = 10;
2944 if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2945 return 0;
2946
2947 *rule_nr = 20;
2948 if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2949 peer != UUID_JUST_CREATED)
2950 return -2;
2951
2952 *rule_nr = 30;
2953 if (self != UUID_JUST_CREATED &&
2954 (peer == UUID_JUST_CREATED || peer == (u64)0))
2955 return 2;
2956
2957 if (self == peer) {
2958 int rct, dc; /* roles at crash time */
2959
2960 if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2961
2962 if (connection->agreed_pro_version < 91)
2963 return -1091;
2964
2965 if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2966 (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2967 drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
2968 drbd_uuid_move_history(device);
2969 device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
2970 device->ldev->md.uuid[UI_BITMAP] = 0;
2971
2972 drbd_uuid_dump(device, "self", device->ldev->md.uuid,
2973 device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
2974 *rule_nr = 34;
2975 } else {
2976 drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
2977 *rule_nr = 36;
2978 }
2979
2980 return 1;
2981 }
2982
2983 if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {
2984
2985 if (connection->agreed_pro_version < 91)
2986 return -1091;
2987
2988 if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2989 (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2990 drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2991
2992 device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
2993 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
2994 device->p_uuid[UI_BITMAP] = 0UL;
2995
2996 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
2997 *rule_nr = 35;
2998 } else {
2999 drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
3000 *rule_nr = 37;
3001 }
3002
3003 return -1;
3004 }
3005
3006 /* Common power [off|failure] */
3007 rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
3008 (device->p_uuid[UI_FLAGS] & 2);
3009 /* lowest bit is set when we were primary,
3010 * next bit (weight 2) is set when peer was primary */
3011 *rule_nr = 40;
3012
3013 switch (rct) {
3014 case 0: /* !self_pri && !peer_pri */ return 0;
3015 case 1: /* self_pri && !peer_pri */ return 1;
3016 case 2: /* !self_pri && peer_pri */ return -1;
3017 case 3: /* self_pri && peer_pri */
3018 dc = test_bit(RESOLVE_CONFLICTS, &connection->flags);
3019 return dc ? -1 : 1;
3020 }
3021 }
3022
3023 *rule_nr = 50;
3024 peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3025 if (self == peer)
3026 return -1;
3027
3028 *rule_nr = 51;
3029 peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
3030 if (self == peer) {
3031 if (connection->agreed_pro_version < 96 ?
3032 (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
3033 (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
3034 peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
3035 /* The last P_SYNC_UUID did not get through. Undo the modifications of
3036 the peer's UUIDs that were made at the last start of a resync as sync source. */
3037
3038 if (connection->agreed_pro_version < 91)
3039 return -1091;
3040
3041 device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
3042 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];
3043
3044 drbd_info(device, "Lost last syncUUID packet, corrected:\n");
3045 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3046
3047 return -1;
3048 }
3049 }
3050
3051 *rule_nr = 60;
3052 self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3053 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3054 peer = device->p_uuid[i] & ~((u64)1);
3055 if (self == peer)
3056 return -2;
3057 }
3058
3059 *rule_nr = 70;
3060 self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3061 peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3062 if (self == peer)
3063 return 1;
3064
3065 *rule_nr = 71;
3066 self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
3067 if (self == peer) {
3068 if (connection->agreed_pro_version < 96 ?
3069 (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
3070 (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
3071 self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
3072 /* The last P_SYNC_UUID did not get through. Undo the modifications of
3073 our own UUIDs that were made at the last start of a resync as sync source. */
3074
3075 if (connection->agreed_pro_version < 91)
3076 return -1091;
3077
3078 __drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
3079 __drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);
3080
3081 drbd_info(device, "Last syncUUID did not get through, corrected:\n");
3082 drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3083 device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3084
3085 return 1;
3086 }
3087 }
3088
3089
3090 *rule_nr = 80;
3091 peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3092 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3093 self = device->ldev->md.uuid[i] & ~((u64)1);
3094 if (self == peer)
3095 return 2;
3096 }
3097
3098 *rule_nr = 90;
3099 self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3100 peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3101 if (self == peer && self != ((u64)0))
3102 return 100;
3103
3104 *rule_nr = 100;
3105 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3106 self = device->ldev->md.uuid[i] & ~((u64)1);
3107 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
3108 peer = device->p_uuid[j] & ~((u64)1);
3109 if (self == peer)
3110 return -100;
3111 }
3112 }
3113
3114 return -1000;
3115 }
3116
3117 /* drbd_sync_handshake() returns the new conn state on success, or
3118 C_MASK (-1) on failure.
3119 */
3120 static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
3121 enum drbd_role peer_role,
3122 enum drbd_disk_state peer_disk) __must_hold(local)
3123 {
3124 struct drbd_device *device = peer_device->device;
3125 enum drbd_conns rv = C_MASK;
3126 enum drbd_disk_state mydisk;
3127 struct net_conf *nc;
3128 int hg, rule_nr, rr_conflict, tentative;
3129
3130 mydisk = device->state.disk;
3131 if (mydisk == D_NEGOTIATING)
3132 mydisk = device->new_state_tmp.disk;
3133
3134 drbd_info(device, "drbd_sync_handshake:\n");
3135
3136 spin_lock_irq(&device->ldev->md.uuid_lock);
3137 drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
3138 drbd_uuid_dump(device, "peer", device->p_uuid,
3139 device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3140
3141 hg = drbd_uuid_compare(device, &rule_nr);
3142 spin_unlock_irq(&device->ldev->md.uuid_lock);
3143
3144 drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
3145
3146 if (hg == -1000) {
3147 drbd_alert(device, "Unrelated data, aborting!\n");
3148 return C_MASK;
3149 }
3150 if (hg < -1000) {
3151 drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
3152 return C_MASK;
3153 }
3154
3155 if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
3156 (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
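/* One side is Inconsistent, the other is not: the disk states alone decide
 * the resync direction. Remember whether a full sync was already implied
 * (|hg| == 2, or hg == -100 after an unresolved split brain) and keep it. */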
3157 int f = (hg == -100) || abs(hg) == 2;
3158 hg = mydisk > D_INCONSISTENT ? 1 : -1;
3159 if (f)
3160 hg = hg*2;
3161 drbd_info(device, "Becoming sync %s due to disk states.\n",
3162 hg > 0 ? "source" : "target");
3163 }
3164
3165 if (abs(hg) == 100)
3166 drbd_khelper(device, "initial-split-brain");
3167
3168 rcu_read_lock();
3169 nc = rcu_dereference(peer_device->connection->net_conf);
3170
3171 if (hg == 100 || (hg == -100 && nc->always_asbp)) {
3172 int pcount = (device->state.role == R_PRIMARY)
3173 + (peer_role == R_PRIMARY);
3174 int forced = (hg == -100);
3175
3176 switch (pcount) {
3177 case 0:
3178 hg = drbd_asb_recover_0p(peer_device);
3179 break;
3180 case 1:
3181 hg = drbd_asb_recover_1p(peer_device);
3182 break;
3183 case 2:
3184 hg = drbd_asb_recover_2p(peer_device);
3185 break;
3186 }
3187 if (abs(hg) < 100) {
3188 drbd_warn(device, "Split-Brain detected, %d primaries, "
3189 "automatically solved. Sync from %s node\n",
3190 pcount, (hg < 0) ? "peer" : "this");
3191 if (forced) {
3192 drbd_warn(device, "Doing a full sync, since"
3193 " UUIDs where ambiguous.\n");
3194 hg = hg*2;
3195 }
3196 }
3197 }
3198
3199 if (hg == -100) {
3200 if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
3201 hg = -1;
3202 if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
3203 hg = 1;
3204
3205 if (abs(hg) < 100)
3206 drbd_warn(device, "Split-Brain detected, manually solved. "
3207 "Sync from %s node\n",
3208 (hg < 0) ? "peer" : "this");
3209 }
3210 rr_conflict = nc->rr_conflict;
3211 tentative = nc->tentative;
3212 rcu_read_unlock();
3213
3214 if (hg == -100) {
3215 /* FIXME this log message is not correct if we end up here
3216 * after an attempted attach on a diskless node.
3217 * We just refuse to attach -- well, we drop the "connection"
3218 * to that disk, in a way... */
3219 drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
3220 drbd_khelper(device, "split-brain");
3221 return C_MASK;
3222 }
3223
3224 if (hg > 0 && mydisk <= D_INCONSISTENT) {
3225 drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
3226 return C_MASK;
3227 }
3228
3229 if (hg < 0 && /* by intention we do not use mydisk here. */
3230 device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
3231 switch (rr_conflict) {
3232 case ASB_CALL_HELPER:
3233 drbd_khelper(device, "pri-lost");
3234 /* fall through */
3235 case ASB_DISCONNECT:
3236 drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
3237 return C_MASK;
3238 case ASB_VIOLENTLY:
3239 drbd_warn(device, "Becoming SyncTarget, violating the stable-data"
3240 "assumption\n");
3241 }
3242 }
3243
3244 if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
3245 if (hg == 0)
3246 drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
3247 else
3248 drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.",
3249 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3250 abs(hg) >= 2 ? "full" : "bit-map based");
3251 return C_MASK;
3252 }
3253
3254 if (abs(hg) >= 2) {
3255 drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3256 if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3257 BM_LOCKED_SET_ALLOWED))
3258 return C_MASK;
3259 }
3260
3261 if (hg > 0) { /* become sync source. */
3262 rv = C_WF_BITMAP_S;
3263 } else if (hg < 0) { /* become sync target */
3264 rv = C_WF_BITMAP_T;
3265 } else {
3266 rv = C_CONNECTED;
3267 if (drbd_bm_total_weight(device)) {
3268 drbd_info(device, "No resync, but %lu bits in bitmap!\n",
3269 drbd_bm_total_weight(device));
3270 }
3271 }
3272
3273 return rv;
3274 }
3275
3276 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3277 {
3278 /* ASB_DISCARD_REMOTE on one side paired with ASB_DISCARD_LOCAL on the other is valid */
3279 if (peer == ASB_DISCARD_REMOTE)
3280 return ASB_DISCARD_LOCAL;
3281
3282 /* any other combination involving ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL is invalid */
3283 if (peer == ASB_DISCARD_LOCAL)
3284 return ASB_DISCARD_REMOTE;
3285
3286 /* everything else is valid if they are equal on both sides. */
3287 return peer;
3288 }
3289
3290 static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
3291 {
3292 struct p_protocol *p = pi->data;
3293 enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3294 int p_proto, p_discard_my_data, p_two_primaries, cf;
3295 struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3296 char integrity_alg[SHARED_SECRET_MAX] = "";
3297 struct crypto_hash *peer_integrity_tfm = NULL;
3298 void *int_dig_in = NULL, *int_dig_vv = NULL;
3299
3300 p_proto = be32_to_cpu(p->protocol);
3301 p_after_sb_0p = be32_to_cpu(p->after_sb_0p);
3302 p_after_sb_1p = be32_to_cpu(p->after_sb_1p);
3303 p_after_sb_2p = be32_to_cpu(p->after_sb_2p);
3304 p_two_primaries = be32_to_cpu(p->two_primaries);
3305 cf = be32_to_cpu(p->conn_flags);
3306 p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3307
3308 if (connection->agreed_pro_version >= 87) {
3309 int err;
3310
3311 if (pi->size > sizeof(integrity_alg))
3312 return -EIO;
3313 err = drbd_recv_all(connection, integrity_alg, pi->size);
3314 if (err)
3315 return err;
3316 integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3317 }
3318
3319 if (pi->cmd != P_PROTOCOL_UPDATE) {
3320 clear_bit(CONN_DRY_RUN, &connection->flags);
3321
3322 if (cf & CF_DRY_RUN)
3323 set_bit(CONN_DRY_RUN, &connection->flags);
3324
3325 rcu_read_lock();
3326 nc = rcu_dereference(connection->net_conf);
3327
3328 if (p_proto != nc->wire_protocol) {
3329 drbd_err(connection, "incompatible %s settings\n", "protocol");
3330 goto disconnect_rcu_unlock;
3331 }
3332
3333 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3334 drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
3335 goto disconnect_rcu_unlock;
3336 }
3337
3338 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3339 drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
3340 goto disconnect_rcu_unlock;
3341 }
3342
3343 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3344 drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
3345 goto disconnect_rcu_unlock;
3346 }
3347
3348 if (p_discard_my_data && nc->discard_my_data) {
3349 drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
3350 goto disconnect_rcu_unlock;
3351 }
3352
3353 if (p_two_primaries != nc->two_primaries) {
3354 drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
3355 goto disconnect_rcu_unlock;
3356 }
3357
3358 if (strcmp(integrity_alg, nc->integrity_alg)) {
3359 drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
3360 goto disconnect_rcu_unlock;
3361 }
3362
3363 rcu_read_unlock();
3364 }
3365
3366 if (integrity_alg[0]) {
3367 int hash_size;
3368
3369 /*
3370 * We can only change the peer data integrity algorithm
3371 * here. Changing our own data integrity algorithm
3372 * requires that we send a P_PROTOCOL_UPDATE packet at
3373 * the same time; otherwise, the peer has no way to
3374 * tell between which packets the algorithm should
3375 * change.
3376 */
3377
3378 peer_integrity_tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3379 if (!peer_integrity_tfm) {
3380 drbd_err(connection, "peer data-integrity-alg %s not supported\n",
3381 integrity_alg);
3382 goto disconnect;
3383 }
3384
3385 hash_size = crypto_hash_digestsize(peer_integrity_tfm);
3386 int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3387 int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3388 if (!(int_dig_in && int_dig_vv)) {
3389 drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
3390 goto disconnect;
3391 }
3392 }
3393
3394 new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3395 if (!new_net_conf) {
3396 drbd_err(connection, "Allocation of new net_conf failed\n");
3397 goto disconnect;
3398 }
3399
3400 mutex_lock(&connection->data.mutex);
3401 mutex_lock(&connection->resource->conf_update);
3402 old_net_conf = connection->net_conf;
3403 *new_net_conf = *old_net_conf;
3404
3405 new_net_conf->wire_protocol = p_proto;
3406 new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3407 new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3408 new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3409 new_net_conf->two_primaries = p_two_primaries;
3410
3411 rcu_assign_pointer(connection->net_conf, new_net_conf);
3412 mutex_unlock(&connection->resource->conf_update);
3413 mutex_unlock(&connection->data.mutex);
3414
3415 crypto_free_hash(connection->peer_integrity_tfm);
3416 kfree(connection->int_dig_in);
3417 kfree(connection->int_dig_vv);
3418 connection->peer_integrity_tfm = peer_integrity_tfm;
3419 connection->int_dig_in = int_dig_in;
3420 connection->int_dig_vv = int_dig_vv;
3421
3422 if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3423 drbd_info(connection, "peer data-integrity-alg: %s\n",
3424 integrity_alg[0] ? integrity_alg : "(none)");
3425
3426 synchronize_rcu();
3427 kfree(old_net_conf);
3428 return 0;
3429
3430 disconnect_rcu_unlock:
3431 rcu_read_unlock();
3432 disconnect:
3433 crypto_free_hash(peer_integrity_tfm);
3434 kfree(int_dig_in);
3435 kfree(int_dig_vv);
3436 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
3437 return -EIO;
3438 }
3439
3440 /* helper function
3441 * input: alg name, feature name
3442 * return: NULL (alg name was "")
3443 * ERR_PTR(error) if something goes wrong
3444 * or the crypto hash ptr, if it worked out ok. */
3445 static struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device,
3446 const char *alg, const char *name)
3447 {
3448 struct crypto_hash *tfm;
3449
3450 if (!alg[0])
3451 return NULL;
3452
3453 tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
3454 if (IS_ERR(tfm)) {
3455 drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3456 alg, name, PTR_ERR(tfm));
3457 return tfm;
3458 }
3459 return tfm;
3460 }
3461
3462 static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
3463 {
3464 void *buffer = connection->data.rbuf;
3465 int size = pi->size;
3466
3467 while (size) {
3468 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3469 s = drbd_recv(connection, buffer, s);
3470 if (s <= 0) {
3471 if (s < 0)
3472 return s;
3473 break;
3474 }
3475 size -= s;
3476 }
3477 if (size)
3478 return -EIO;
3479 return 0;
3480 }
3481
3482 /*
3483 * config_unknown_volume - device configuration command for unknown volume
3484 *
3485 * When a device is added to an existing connection, the node on which the
3486 * device is added first will send configuration commands to its peer but the
3487 * peer will not know about the device yet. It will warn and ignore these
3488 * commands. Once the device is added on the second node, the second node will
3489 * send the same device configuration commands, but in the other direction.
3490 *
3491 * (We can also end up here if drbd is misconfigured.)
3492 */
3493 static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
3494 {
3495 drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
3496 cmdname(pi->cmd), pi->vnr);
3497 return ignore_remaining_packet(connection, pi);
3498 }
3499
3500 static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
3501 {
3502 struct drbd_peer_device *peer_device;
3503 struct drbd_device *device;
3504 struct p_rs_param_95 *p;
3505 unsigned int header_size, data_size, exp_max_sz;
3506 struct crypto_hash *verify_tfm = NULL;
3507 struct crypto_hash *csums_tfm = NULL;
3508 struct net_conf *old_net_conf, *new_net_conf = NULL;
3509 struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3510 const int apv = connection->agreed_pro_version;
3511 struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3512 int fifo_size = 0;
3513 int err;
3514
3515 peer_device = conn_peer_device(connection, pi->vnr);
3516 if (!peer_device)
3517 return config_unknown_volume(connection, pi);
3518 device = peer_device->device;
3519
3520 exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param)
3521 : apv == 88 ? sizeof(struct p_rs_param)
3522 + SHARED_SECRET_MAX
3523 : apv <= 94 ? sizeof(struct p_rs_param_89)
3524 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3525
3526 if (pi->size > exp_max_sz) {
3527 drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3528 pi->size, exp_max_sz);
3529 return -EIO;
3530 }
3531
3532 if (apv <= 88) {
3533 header_size = sizeof(struct p_rs_param);
3534 data_size = pi->size - header_size;
3535 } else if (apv <= 94) {
3536 header_size = sizeof(struct p_rs_param_89);
3537 data_size = pi->size - header_size;
3538 D_ASSERT(device, data_size == 0);
3539 } else {
3540 header_size = sizeof(struct p_rs_param_95);
3541 data_size = pi->size - header_size;
3542 D_ASSERT(device, data_size == 0);
3543 }
3544
3545 /* initialize verify_alg and csums_alg */
3546 p = pi->data;
3547 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3548
3549 err = drbd_recv_all(peer_device->connection, p, header_size);
3550 if (err)
3551 return err;
3552
3553 mutex_lock(&connection->resource->conf_update);
3554 old_net_conf = peer_device->connection->net_conf;
3555 if (get_ldev(device)) {
3556 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3557 if (!new_disk_conf) {
3558 put_ldev(device);
3559 mutex_unlock(&connection->resource->conf_update);
3560 drbd_err(device, "Allocation of new disk_conf failed\n");
3561 return -ENOMEM;
3562 }
3563
3564 old_disk_conf = device->ldev->disk_conf;
3565 *new_disk_conf = *old_disk_conf;
3566
3567 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3568 }
3569
3570 if (apv >= 88) {
3571 if (apv == 88) {
3572 if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3573 drbd_err(device, "verify-alg of wrong size, "
3574 "peer wants %u, accepting only up to %u byte\n",
3575 data_size, SHARED_SECRET_MAX);
3576 err = -EIO;
3577 goto reconnect;
3578 }
3579
3580 err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
3581 if (err)
3582 goto reconnect;
3583 /* we expect NUL terminated string */
3584 /* but just in case someone tries to be evil */
3585 D_ASSERT(device, p->verify_alg[data_size-1] == 0);
3586 p->verify_alg[data_size-1] = 0;
3587
3588 } else /* apv >= 89 */ {
3589 /* we still expect NUL terminated strings */
3590 /* but just in case someone tries to be evil */
3591 D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3592 D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3593 p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3594 p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3595 }
3596
3597 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3598 if (device->state.conn == C_WF_REPORT_PARAMS) {
3599 drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3600 old_net_conf->verify_alg, p->verify_alg);
3601 goto disconnect;
3602 }
3603 verify_tfm = drbd_crypto_alloc_digest_safe(device,
3604 p->verify_alg, "verify-alg");
3605 if (IS_ERR(verify_tfm)) {
3606 verify_tfm = NULL;
3607 goto disconnect;
3608 }
3609 }
3610
3611 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3612 if (device->state.conn == C_WF_REPORT_PARAMS) {
3613 drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3614 old_net_conf->csums_alg, p->csums_alg);
3615 goto disconnect;
3616 }
3617 csums_tfm = drbd_crypto_alloc_digest_safe(device,
3618 p->csums_alg, "csums-alg");
3619 if (IS_ERR(csums_tfm)) {
3620 csums_tfm = NULL;
3621 goto disconnect;
3622 }
3623 }
3624
3625 if (apv > 94 && new_disk_conf) {
3626 new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3627 new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3628 new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3629 new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3630
3631 fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3632 if (fifo_size != device->rs_plan_s->size) {
3633 new_plan = fifo_alloc(fifo_size);
3634 if (!new_plan) {
3635 drbd_err(device, "kmalloc of fifo_buffer failed");
3636 put_ldev(device);
3637 goto disconnect;
3638 }
3639 }
3640 }
3641
3642 if (verify_tfm || csums_tfm) {
3643 new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3644 if (!new_net_conf) {
3645 drbd_err(device, "Allocation of new net_conf failed\n");
3646 goto disconnect;
3647 }
3648
3649 *new_net_conf = *old_net_conf;
3650
3651 if (verify_tfm) {
3652 strcpy(new_net_conf->verify_alg, p->verify_alg);
3653 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3654 crypto_free_hash(peer_device->connection->verify_tfm);
3655 peer_device->connection->verify_tfm = verify_tfm;
3656 drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
3657 }
3658 if (csums_tfm) {
3659 strcpy(new_net_conf->csums_alg, p->csums_alg);
3660 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3661 crypto_free_hash(peer_device->connection->csums_tfm);
3662 peer_device->connection->csums_tfm = csums_tfm;
3663 drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
3664 }
3665 rcu_assign_pointer(connection->net_conf, new_net_conf);
3666 }
3667 }
3668
3669 if (new_disk_conf) {
3670 rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3671 put_ldev(device);
3672 }
3673
3674 if (new_plan) {
3675 old_plan = device->rs_plan_s;
3676 rcu_assign_pointer(device->rs_plan_s, new_plan);
3677 }
3678
3679 mutex_unlock(&connection->resource->conf_update);
3680 synchronize_rcu();
3681 if (new_net_conf)
3682 kfree(old_net_conf);
3683 kfree(old_disk_conf);
3684 kfree(old_plan);
3685
3686 return 0;
3687
3688 reconnect:
3689 if (new_disk_conf) {
3690 put_ldev(device);
3691 kfree(new_disk_conf);
3692 }
3693 mutex_unlock(&connection->resource->conf_update);
3694 return -EIO;
3695
3696 disconnect:
3697 kfree(new_plan);
3698 if (new_disk_conf) {
3699 put_ldev(device);
3700 kfree(new_disk_conf);
3701 }
3702 mutex_unlock(&connection->resource->conf_update);
3703 /* just for completeness: actually not needed,
3704 * as this is not reached if csums_tfm was ok. */
3705 crypto_free_hash(csums_tfm);
3706 /* but free the verify_tfm again, if csums_tfm did not work out */
3707 crypto_free_hash(verify_tfm);
3708 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3709 return -EIO;
3710 }
3711
3712 /* warn if the arguments differ by more than 12.5% */
3713 static void warn_if_differ_considerably(struct drbd_device *device,
3714 const char *s, sector_t a, sector_t b)
3715 {
3716 sector_t d;
3717 if (a == 0 || b == 0)
3718 return;
3719 d = (a > b) ? (a - b) : (b - a);
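/* a>>3 is a/8, so this triggers when the difference exceeds 12.5% of either value */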
3720 if (d > (a>>3) || d > (b>>3))
3721 drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
3722 (unsigned long long)a, (unsigned long long)b);
3723 }
3724
3725 static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
3726 {
3727 struct drbd_peer_device *peer_device;
3728 struct drbd_device *device;
3729 struct p_sizes *p = pi->data;
3730 enum determine_dev_size dd = DS_UNCHANGED;
3731 sector_t p_size, p_usize, p_csize, my_usize;
3732 int ldsc = 0; /* local disk size changed */
3733 enum dds_flags ddsf;
3734
3735 peer_device = conn_peer_device(connection, pi->vnr);
3736 if (!peer_device)
3737 return config_unknown_volume(connection, pi);
3738 device = peer_device->device;
3739
3740 p_size = be64_to_cpu(p->d_size);
3741 p_usize = be64_to_cpu(p->u_size);
3742 p_csize = be64_to_cpu(p->c_size);
3743
3744 /* just store the peer's disk size for now.
3745 * we still need to figure out whether we accept that. */
3746 device->p_size = p_size;
3747
3748 if (get_ldev(device)) {
3749 rcu_read_lock();
3750 my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
3751 rcu_read_unlock();
3752
3753 warn_if_differ_considerably(device, "lower level device sizes",
3754 p_size, drbd_get_max_capacity(device->ldev));
3755 warn_if_differ_considerably(device, "user requested size",
3756 p_usize, my_usize);
3757
3758 /* if this is the first connect, or an otherwise expected
3759 * param exchange, choose the minimum */
3760 if (device->state.conn == C_WF_REPORT_PARAMS)
3761 p_usize = min_not_zero(my_usize, p_usize);
3762
3763 /* Never shrink a device with usable data during connect.
3764 But allow online shrinking if we are connected. */
3765 if (drbd_new_dev_size(device, device->ldev, p_usize, 0) <
3766 drbd_get_capacity(device->this_bdev) &&
3767 device->state.disk >= D_OUTDATED &&
3768 device->state.conn < C_CONNECTED) {
3769 drbd_err(device, "The peer's disk size is too small!\n");
3770 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3771 put_ldev(device);
3772 return -EIO;
3773 }
3774
3775 if (my_usize != p_usize) {
3776 struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3777
3778 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3779 if (!new_disk_conf) {
3780 drbd_err(device, "Allocation of new disk_conf failed\n");
3781 put_ldev(device);
3782 return -ENOMEM;
3783 }
3784
3785 mutex_lock(&connection->resource->conf_update);
3786 old_disk_conf = device->ldev->disk_conf;
3787 *new_disk_conf = *old_disk_conf;
3788 new_disk_conf->disk_size = p_usize;
3789
3790 rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3791 mutex_unlock(&connection->resource->conf_update);
3792 synchronize_rcu();
3793 kfree(old_disk_conf);
3794
3795 drbd_info(device, "Peer sets u_size to %lu sectors\n",
3796 (unsigned long)my_usize);
3797 }
3798
3799 put_ldev(device);
3800 }
3801
3802 device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3803 /* Leave drbd_reconsider_max_bio_size() before drbd_determine_dev_size().
3804 In case we cleared the QUEUE_FLAG_DISCARD from our queue in
3805 drbd_reconsider_max_bio_size(), we can be sure that after
3806 drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */
3807
3808 ddsf = be16_to_cpu(p->dds_flags);
3809 if (get_ldev(device)) {
3810 drbd_reconsider_max_bio_size(device, device->ldev);
3811 dd = drbd_determine_dev_size(device, ddsf, NULL);
3812 put_ldev(device);
3813 if (dd == DS_ERROR)
3814 return -EIO;
3815 drbd_md_sync(device);
3816 } else {
3817 /*
3818 * I am diskless, need to accept the peer's *current* size.
3819 * I must NOT accept the peer's backing disk size,
3820 * it may have been larger than mine all along...
3821 *
3822 * At this point, the peer knows more about my disk, or at
3823 * least about what we last agreed upon, than myself.
3824 * So if his c_size is less than his d_size, the most likely
3825 * reason is that *my* d_size was smaller last time we checked.
3826 *
3827 * However, if he sends a zero current size,
3828 * take his (user-capped or) backing disk size anyways.
3829 */
3830 drbd_reconsider_max_bio_size(device, NULL);
3831 drbd_set_my_capacity(device, p_csize ?: p_usize ?: p_size);
3832 }
3833
3834 if (get_ldev(device)) {
3835 if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
3836 device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
3837 ldsc = 1;
3838 }
3839
3840 put_ldev(device);
3841 }
3842
3843 if (device->state.conn > C_WF_REPORT_PARAMS) {
3844 if (be64_to_cpu(p->c_size) !=
3845 drbd_get_capacity(device->this_bdev) || ldsc) {
3846 /* we have different sizes, probably peer
3847 * needs to know my new size... */
3848 drbd_send_sizes(peer_device, 0, ddsf);
3849 }
3850 if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
3851 (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
3852 if (device->state.pdsk >= D_INCONSISTENT &&
3853 device->state.disk >= D_INCONSISTENT) {
3854 if (ddsf & DDSF_NO_RESYNC)
3855 drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
3856 else
3857 resync_after_online_grow(device);
3858 } else
3859 set_bit(RESYNC_AFTER_NEG, &device->flags);
3860 }
3861 }
3862
3863 return 0;
3864 }
3865
3866 static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
3867 {
3868 struct drbd_peer_device *peer_device;
3869 struct drbd_device *device;
3870 struct p_uuids *p = pi->data;
3871 u64 *p_uuid;
3872 int i, updated_uuids = 0;
3873
3874 peer_device = conn_peer_device(connection, pi->vnr);
3875 if (!peer_device)
3876 return config_unknown_volume(connection, pi);
3877 device = peer_device->device;
3878
3879 p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3880 if (!p_uuid) {
3881 drbd_err(device, "kmalloc of p_uuid failed\n");
3882 return false;
3883 }
3884
3885 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3886 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3887
3888 kfree(device->p_uuid);
3889 device->p_uuid = p_uuid;
3890
3891 if (device->state.conn < C_CONNECTED &&
3892 device->state.disk < D_INCONSISTENT &&
3893 device->state.role == R_PRIMARY &&
3894 (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3895 drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
3896 (unsigned long long)device->ed_uuid);
3897 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3898 return -EIO;
3899 }
3900
3901 if (get_ldev(device)) {
3902 int skip_initial_sync =
3903 device->state.conn == C_CONNECTED &&
3904 peer_device->connection->agreed_pro_version >= 90 &&
3905 device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3906 (p_uuid[UI_FLAGS] & 8);
3907 if (skip_initial_sync) {
3908 drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
3909 drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
3910 "clear_n_write from receive_uuids",
3911 BM_LOCKED_TEST_ALLOWED);
3912 _drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
3913 _drbd_uuid_set(device, UI_BITMAP, 0);
3914 _drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3915 CS_VERBOSE, NULL);
3916 drbd_md_sync(device);
3917 updated_uuids = 1;
3918 }
3919 put_ldev(device);
3920 } else if (device->state.disk < D_INCONSISTENT &&
3921 device->state.role == R_PRIMARY) {
3922 /* I am a diskless primary, the peer just created a new current UUID
3923 for me. */
3924 updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
3925 }
3926
3927 /* Before we test the disk state, we should wait until a possibly
3928 ongoing cluster-wide state change has finished. That is important if
3929 we are primary and are detaching from our disk. We need to see the
3930 new disk state... */
3931 mutex_lock(device->state_mutex);
3932 mutex_unlock(device->state_mutex);
3933 if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
3934 updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
3935
3936 if (updated_uuids)
3937 drbd_print_uuids(device, "receiver updated UUIDs to");
3938
3939 return 0;
3940 }
3941
3942 /**
3943 * convert_state() - Converts the peer's view of the cluster state to our point of view
3944 * @ps: The state as seen by the peer.
3945 */
3946 static union drbd_state convert_state(union drbd_state ps)
3947 {
3948 union drbd_state ms;
3949
3950 static enum drbd_conns c_tab[] = {
3951 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
3952 [C_CONNECTED] = C_CONNECTED,
3953
3954 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3955 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3956 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3957 [C_VERIFY_S] = C_VERIFY_T,
3958 [C_MASK] = C_MASK,
3959 };
3960
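/* Mirror the peer's view: e.g. if the peer reports itself Primary with an
 * UpToDate disk and sees us as Secondary with an Inconsistent disk, then from
 * our point of view that is role Secondary, disk Inconsistent, peer Primary,
 * pdsk UpToDate. */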
3961 ms.i = ps.i;
3962
3963 ms.conn = c_tab[ps.conn];
3964 ms.peer = ps.role;
3965 ms.role = ps.peer;
3966 ms.pdsk = ps.disk;
3967 ms.disk = ps.pdsk;
3968 ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3969
3970 return ms;
3971 }
3972
3973 static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
3974 {
3975 struct drbd_peer_device *peer_device;
3976 struct drbd_device *device;
3977 struct p_req_state *p = pi->data;
3978 union drbd_state mask, val;
3979 enum drbd_state_rv rv;
3980
3981 peer_device = conn_peer_device(connection, pi->vnr);
3982 if (!peer_device)
3983 return -EIO;
3984 device = peer_device->device;
3985
3986 mask.i = be32_to_cpu(p->mask);
3987 val.i = be32_to_cpu(p->val);
3988
3989 if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
3990 mutex_is_locked(device->state_mutex)) {
3991 drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
3992 return 0;
3993 }
3994
3995 mask = convert_state(mask);
3996 val = convert_state(val);
3997
3998 rv = drbd_change_state(device, CS_VERBOSE, mask, val);
3999 drbd_send_sr_reply(peer_device, rv);
4000
4001 drbd_md_sync(device);
4002
4003 return 0;
4004 }
4005
4006 static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
4007 {
4008 struct p_req_state *p = pi->data;
4009 union drbd_state mask, val;
4010 enum drbd_state_rv rv;
4011
4012 mask.i = be32_to_cpu(p->mask);
4013 val.i = be32_to_cpu(p->val);
4014
4015 if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
4016 mutex_is_locked(&connection->cstate_mutex)) {
4017 conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
4018 return 0;
4019 }
4020
4021 mask = convert_state(mask);
4022 val = convert_state(val);
4023
4024 rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
4025 conn_send_sr_reply(connection, rv);
4026
4027 return 0;
4028 }
4029
4030 static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
4031 {
4032 struct drbd_peer_device *peer_device;
4033 struct drbd_device *device;
4034 struct p_state *p = pi->data;
4035 union drbd_state os, ns, peer_state;
4036 enum drbd_disk_state real_peer_disk;
4037 enum chg_state_flags cs_flags;
4038 int rv;
4039
4040 peer_device = conn_peer_device(connection, pi->vnr);
4041 if (!peer_device)
4042 return config_unknown_volume(connection, pi);
4043 device = peer_device->device;
4044
4045 peer_state.i = be32_to_cpu(p->state);
4046
4047 real_peer_disk = peer_state.disk;
4048 if (peer_state.disk == D_NEGOTIATING) {
4049 real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
4050 drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
4051 }
4052
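/* Sample our current state; the lock is dropped again below, so before
 * committing the new state we re-read it under the lock and retry if it
 * changed in the meantime (see the "goto retry" further down). */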
4053 spin_lock_irq(&device->resource->req_lock);
4054 retry:
4055 os = ns = drbd_read_state(device);
4056 spin_unlock_irq(&device->resource->req_lock);
4057
4058 /* If some other part of the code (asender thread, timeout)
4059 * already decided to close the connection again,
4060 * we must not "re-establish" it here. */
4061 if (os.conn <= C_TEAR_DOWN)
4062 return -ECONNRESET;
4063
4064 /* If this is the "end of sync" confirmation, usually the peer disk
4065 * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
4066 * set) resync started in PausedSyncT, or if the timing of pause-/
4067 * unpause-sync events has been "just right", the peer disk may
4068 * transition from D_CONSISTENT to D_UP_TO_DATE as well.
4069 */
4070 if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
4071 real_peer_disk == D_UP_TO_DATE &&
4072 os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
4073 /* If we are (becoming) SyncSource, but peer is still in sync
4074 * preparation, ignore its uptodate-ness to avoid flapping, it
4075 * will change to inconsistent once the peer reaches active
4076 * syncing states.
4077 * It may have changed syncer-paused flags, however, so we
4078 * cannot ignore this completely. */
4079 if (peer_state.conn > C_CONNECTED &&
4080 peer_state.conn < C_SYNC_SOURCE)
4081 real_peer_disk = D_INCONSISTENT;
4082
4083 /* if peer_state changes to connected at the same time,
4084 * it explicitly notifies us that it finished resync.
4085 * Maybe we should finish it up, too? */
4086 else if (os.conn >= C_SYNC_SOURCE &&
4087 peer_state.conn == C_CONNECTED) {
4088 if (drbd_bm_total_weight(device) <= device->rs_failed)
4089 drbd_resync_finished(device);
4090 return 0;
4091 }
4092 }
4093
4094 /* explicit verify finished notification, stop sector reached. */
4095 if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
4096 peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
4097 ov_out_of_sync_print(device);
4098 drbd_resync_finished(device);
4099 return 0;
4100 }
4101
4102 /* peer says his disk is inconsistent, while we think it is uptodate,
4103 * and this happens while the peer still thinks we have a sync going on,
4104 * but we think we are already done with the sync.
4105 * We ignore this to avoid flapping pdsk.
4106 * This should not happen, if the peer is a recent version of drbd. */
4107 if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
4108 os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
4109 real_peer_disk = D_UP_TO_DATE;
4110
4111 if (ns.conn == C_WF_REPORT_PARAMS)
4112 ns.conn = C_CONNECTED;
4113
4114 if (peer_state.conn == C_AHEAD)
4115 ns.conn = C_BEHIND;
4116
4117 if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
4118 get_ldev_if_state(device, D_NEGOTIATING)) {
4119 int cr; /* consider resync */
4120
4121 /* if we established a new connection */
4122 cr = (os.conn < C_CONNECTED);
4123 /* if we had an established connection
4124 * and one of the nodes newly attaches a disk */
4125 cr |= (os.conn == C_CONNECTED &&
4126 (peer_state.disk == D_NEGOTIATING ||
4127 os.disk == D_NEGOTIATING));
4128 /* if we have both been inconsistent, and the peer has been
4129 * forced to be UpToDate with --overwrite-data */
4130 cr |= test_bit(CONSIDER_RESYNC, &device->flags);
4131 /* if we had been plain connected, and the admin requested to
4132 * start a sync by "invalidate" or "invalidate-remote" */
4133 cr |= (os.conn == C_CONNECTED &&
4134 (peer_state.conn >= C_STARTING_SYNC_S &&
4135 peer_state.conn <= C_WF_BITMAP_T));
4136
4137 if (cr)
4138 ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);
4139
4140 put_ldev(device);
4141 if (ns.conn == C_MASK) {
4142 ns.conn = C_CONNECTED;
4143 if (device->state.disk == D_NEGOTIATING) {
4144 drbd_force_state(device, NS(disk, D_FAILED));
4145 } else if (peer_state.disk == D_NEGOTIATING) {
4146 drbd_err(device, "Disk attach process on the peer node was aborted.\n");
4147 peer_state.disk = D_DISKLESS;
4148 real_peer_disk = D_DISKLESS;
4149 } else {
4150 if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
4151 return -EIO;
4152 D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
4153 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4154 return -EIO;
4155 }
4156 }
4157 }
4158
4159 spin_lock_irq(&device->resource->req_lock);
4160 if (os.i != drbd_read_state(device).i)
4161 goto retry;
4162 clear_bit(CONSIDER_RESYNC, &device->flags);
4163 ns.peer = peer_state.role;
4164 ns.pdsk = real_peer_disk;
4165 ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4166 if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
4167 ns.disk = device->new_state_tmp.disk;
4168 cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
4169 if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
4170 test_bit(NEW_CUR_UUID, &device->flags)) {
4171 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
4172 for temporary network outages! */
4173 spin_unlock_irq(&device->resource->req_lock);
4174 drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
4175 tl_clear(peer_device->connection);
4176 drbd_uuid_new_current(device);
4177 clear_bit(NEW_CUR_UUID, &device->flags);
4178 conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
4179 return -EIO;
4180 }
4181 rv = _drbd_set_state(device, ns, cs_flags, NULL);
4182 ns = drbd_read_state(device);
4183 spin_unlock_irq(&device->resource->req_lock);
4184
4185 if (rv < SS_SUCCESS) {
4186 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4187 return -EIO;
4188 }
4189
4190 if (os.conn > C_WF_REPORT_PARAMS) {
4191 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
4192 peer_state.disk != D_NEGOTIATING ) {
4193 /* we want resync, peer has not yet decided to sync... */
4194 /* Nowadays only used when forcing a node into primary role and
4195 setting its disk to UpToDate with that */
4196 drbd_send_uuids(peer_device);
4197 drbd_send_current_state(peer_device);
4198 }
4199 }
4200
4201 clear_bit(DISCARD_MY_DATA, &device->flags);
4202
4203 drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */
4204
4205 return 0;
4206 }
4207
4208 static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
4209 {
4210 struct drbd_peer_device *peer_device;
4211 struct drbd_device *device;
4212 struct p_rs_uuid *p = pi->data;
4213
4214 peer_device = conn_peer_device(connection, pi->vnr);
4215 if (!peer_device)
4216 return -EIO;
4217 device = peer_device->device;
4218
4219 wait_event(device->misc_wait,
4220 device->state.conn == C_WF_SYNC_UUID ||
4221 device->state.conn == C_BEHIND ||
4222 device->state.conn < C_CONNECTED ||
4223 device->state.disk < D_NEGOTIATING);
4224
4225 /* D_ASSERT(device, device->state.conn == C_WF_SYNC_UUID ); */
4226
4227 /* Here the _drbd_uuid_ functions are right, current should
4228 _not_ be rotated into the history */
4229 if (get_ldev_if_state(device, D_NEGOTIATING)) {
4230 _drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
4231 _drbd_uuid_set(device, UI_BITMAP, 0UL);
4232
4233 drbd_print_uuids(device, "updated sync uuid");
4234 drbd_start_resync(device, C_SYNC_TARGET);
4235
4236 put_ldev(device);
4237 } else
4238 drbd_err(device, "Ignoring SyncUUID packet!\n");
4239
4240 return 0;
4241 }
4242
4243 /**
4244 * receive_bitmap_plain
4245 *
4246 * Return 0 when done, 1 when another iteration is needed, and a negative error
4247 * code upon failure.
4248 */
4249 static int
4250 receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
4251 unsigned long *p, struct bm_xfer_ctx *c)
4252 {
4253 unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4254 drbd_header_size(peer_device->connection);
4255 unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4256 c->bm_words - c->word_offset);
4257 unsigned int want = num_words * sizeof(*p);
4258 int err;
4259
4260 if (want != size) {
4261 drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size);
4262 return -EIO;
4263 }
4264 if (want == 0)
4265 return 0;
4266 err = drbd_recv_all(peer_device->connection, p, want);
4267 if (err)
4268 return err;
4269
4270 drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);
4271
4272 c->word_offset += num_words;
4273 c->bit_offset = c->word_offset * BITS_PER_LONG;
4274 if (c->bit_offset > c->bm_bits)
4275 c->bit_offset = c->bm_bits;
4276
4277 return 1;
4278 }
4279
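/* Layout of p->encoding (one byte): bit 7 = initial toggle ("start"),
 * bits 4-6 = number of padding bits at the end of the bitstream,
 * bits 0-3 = the bitmap encoding variant (enum drbd_bitmap_code). */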
4280 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4281 {
4282 return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4283 }
4284
4285 static int dcbp_get_start(struct p_compressed_bm *p)
4286 {
4287 return (p->encoding & 0x80) != 0;
4288 }
4289
4290 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4291 {
4292 return (p->encoding >> 4) & 0x7;
4293 }
4294
4295 /**
4296 * recv_bm_rle_bits
4297 *
4298 * Return 0 when done, 1 when another iteration is needed, and a negative error
4299 * code upon failure.
4300 */
4301 static int
4302 recv_bm_rle_bits(struct drbd_peer_device *peer_device,
4303 struct p_compressed_bm *p,
4304 struct bm_xfer_ctx *c,
4305 unsigned int len)
4306 {
4307 struct bitstream bs;
4308 u64 look_ahead;
4309 u64 rl;
4310 u64 tmp;
4311 unsigned long s = c->bit_offset;
4312 unsigned long e;
4313 int toggle = dcbp_get_start(p);
4314 int have;
4315 int bits;
4316
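/* The payload is a sequence of VLI-encoded run lengths.  Runs alternately
 * describe ranges of clear and set bits; dcbp_get_start() tells us whether
 * the very first run describes set bits. */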
4317 bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4318
4319 bits = bitstream_get_bits(&bs, &look_ahead, 64);
4320 if (bits < 0)
4321 return -EIO;
4322
4323 for (have = bits; have > 0; s += rl, toggle = !toggle) {
4324 bits = vli_decode_bits(&rl, look_ahead);
4325 if (bits <= 0)
4326 return -EIO;
4327
4328 if (toggle) {
4329 e = s + rl -1;
4330 if (e >= c->bm_bits) {
4331 drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4332 return -EIO;
4333 }
4334 _drbd_bm_set_bits(peer_device->device, s, e);
4335 }
4336
4337 if (have < bits) {
4338 drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4339 have, bits, look_ahead,
4340 (unsigned int)(bs.cur.b - p->code),
4341 (unsigned int)bs.buf_len);
4342 return -EIO;
4343 }
4344 /* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
4345 if (likely(bits < 64))
4346 look_ahead >>= bits;
4347 else
4348 look_ahead = 0;
4349 have -= bits;
4350
4351 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4352 if (bits < 0)
4353 return -EIO;
4354 look_ahead |= tmp << have;
4355 have += bits;
4356 }
4357
4358 c->bit_offset = s;
4359 bm_xfer_ctx_bit_to_word_offset(c);
4360
4361 return (s != c->bm_bits);
4362 }
4363
4364 /**
4365 * decode_bitmap_c
4366 *
4367 * Return 0 when done, 1 when another iteration is needed, and a negative error
4368 * code upon failure.
4369 */
4370 static int
4371 decode_bitmap_c(struct drbd_peer_device *peer_device,
4372 struct p_compressed_bm *p,
4373 struct bm_xfer_ctx *c,
4374 unsigned int len)
4375 {
4376 if (dcbp_get_code(p) == RLE_VLI_Bits)
4377 return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));
4378
4379 /* other variants had been implemented for evaluation,
4380 * but have been dropped as this one turned out to be "best"
4381 * during all our tests. */
4382
4383 drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4384 conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4385 return -EIO;
4386 }
4387
4388 void INFO_bm_xfer_stats(struct drbd_device *device,
4389 const char *direction, struct bm_xfer_ctx *c)
4390 {
4391 /* what would it take to transfer it "plaintext" */
4392 unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
4393 unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4394 unsigned int plain =
4395 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4396 c->bm_words * sizeof(unsigned long);
4397 unsigned int total = c->bytes[0] + c->bytes[1];
4398 unsigned int r;
4399
4400 /* total cannot be zero, but just in case: */
4401 if (total == 0)
4402 return;
4403
4404 /* don't report if not compressed */
4405 if (total >= plain)
4406 return;
4407
4408 /* total < plain. check for overflow, still */
4409 r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4410 : (1000 * total / plain);
4411
4412 if (r > 1000)
4413 r = 1000;
4414
4415 r = 1000 - r;
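/* e.g. total = 1000, plain = 4000: r = 250, then 1000 - 250 = 750,
 * reported as "compression: 75.0%" below. */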
4416 drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4417 "total %u; compression: %u.%u%%\n",
4418 direction,
4419 c->bytes[1], c->packets[1],
4420 c->bytes[0], c->packets[0],
4421 total, r/10, r % 10);
4422 }
4423
4424 /* Since we are processing the bitfield from lower addresses to higher,
4425 it does not matter whether we process it in 32 bit chunks or 64 bit
4426 chunks, as long as it is little endian. (Understand it as a byte stream,
4427 beginning with the lowest byte...) If we used big endian,
4428 we would need to process it from the highest address to the lowest,
4429 in order to be agnostic to the 32 vs 64 bit issue.
4430
4431 returns 0 on success, a negative error code otherwise. */
4432 static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
4433 {
4434 struct drbd_peer_device *peer_device;
4435 struct drbd_device *device;
4436 struct bm_xfer_ctx c;
4437 int err;
4438
4439 peer_device = conn_peer_device(connection, pi->vnr);
4440 if (!peer_device)
4441 return -EIO;
4442 device = peer_device->device;
4443
4444 drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4445 /* you are supposed to send additional out-of-sync information
4446 * if you actually set bits during this phase */
4447
4448 c = (struct bm_xfer_ctx) {
4449 .bm_bits = drbd_bm_bits(device),
4450 .bm_words = drbd_bm_words(device),
4451 };
4452
4453 for(;;) {
4454 if (pi->cmd == P_BITMAP)
4455 err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
4456 else if (pi->cmd == P_COMPRESSED_BITMAP) {
4457 /* MAYBE: sanity check that we speak proto >= 90,
4458 * and the feature is enabled! */
4459 struct p_compressed_bm *p = pi->data;
4460
4461 if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
4462 drbd_err(device, "ReportCBitmap packet too large\n");
4463 err = -EIO;
4464 goto out;
4465 }
4466 if (pi->size <= sizeof(*p)) {
4467 drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4468 err = -EIO;
4469 goto out;
4470 }
4471 err = drbd_recv_all(peer_device->connection, p, pi->size);
4472 if (err)
4473 goto out;
4474 err = decode_bitmap_c(peer_device, p, &c, pi->size);
4475 } else {
4476 drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
4477 err = -EIO;
4478 goto out;
4479 }
4480
4481 c.packets[pi->cmd == P_BITMAP]++;
4482 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;
4483
4484 if (err <= 0) {
4485 if (err < 0)
4486 goto out;
4487 break;
4488 }
4489 err = drbd_recv_header(peer_device->connection, pi);
4490 if (err)
4491 goto out;
4492 }
4493
4494 INFO_bm_xfer_stats(device, "receive", &c);
4495
4496 if (device->state.conn == C_WF_BITMAP_T) {
4497 enum drbd_state_rv rv;
4498
4499 err = drbd_send_bitmap(device);
4500 if (err)
4501 goto out;
4502 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4503 rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4504 D_ASSERT(device, rv == SS_SUCCESS);
4505 } else if (device->state.conn != C_WF_BITMAP_S) {
4506 /* admin may have requested C_DISCONNECTING,
4507 * other threads may have noticed network errors */
4508 drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
4509 drbd_conn_str(device->state.conn));
4510 }
4511 err = 0;
4512
4513 out:
4514 drbd_bm_unlock(device);
4515 if (!err && device->state.conn == C_WF_BITMAP_S)
4516 drbd_start_resync(device, C_SYNC_SOURCE);
4517 return err;
4518 }
4519
4520 static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
4521 {
4522 drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
4523 pi->cmd, pi->size);
4524
4525 return ignore_remaining_packet(connection, pi);
4526 }
4527
4528 static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
4529 {
4530 /* Make sure we've acked all the TCP data associated
4531 * with the data requests being unplugged */
4532 drbd_tcp_quickack(connection->data.socket);
4533
4534 return 0;
4535 }
4536
4537 static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
4538 {
4539 struct drbd_peer_device *peer_device;
4540 struct drbd_device *device;
4541 struct p_block_desc *p = pi->data;
4542
4543 peer_device = conn_peer_device(connection, pi->vnr);
4544 if (!peer_device)
4545 return -EIO;
4546 device = peer_device->device;
4547
4548 switch (device->state.conn) {
4549 case C_WF_SYNC_UUID:
4550 case C_WF_BITMAP_T:
4551 case C_BEHIND:
4552 break;
4553 default:
4554 drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4555 drbd_conn_str(device->state.conn));
4556 }
4557
4558 drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4559
4560 return 0;
4561 }
4562
4563 struct data_cmd {
4564 int expect_payload;
4565 size_t pkt_size;
4566 int (*fn)(struct drbd_connection *, struct packet_info *);
4567 };
4568
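/* Dispatch table for the receiver (drbdd below), indexed by packet command:
 * pkt_size is the fixed sub-header read before calling fn, and expect_payload
 * says whether additional payload beyond that sub-header is acceptable. */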
4569 static struct data_cmd drbd_cmd_handler[] = {
4570 [P_DATA] = { 1, sizeof(struct p_data), receive_Data },
4571 [P_DATA_REPLY] = { 1, sizeof(struct p_data), receive_DataReply },
4572 [P_RS_DATA_REPLY] = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4573 [P_BARRIER] = { 0, sizeof(struct p_barrier), receive_Barrier } ,
4574 [P_BITMAP] = { 1, 0, receive_bitmap } ,
4575 [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4576 [P_UNPLUG_REMOTE] = { 0, 0, receive_UnplugRemote },
4577 [P_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4578 [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4579 [P_SYNC_PARAM] = { 1, 0, receive_SyncParam },
4580 [P_SYNC_PARAM89] = { 1, 0, receive_SyncParam },
4581 [P_PROTOCOL] = { 1, sizeof(struct p_protocol), receive_protocol },
4582 [P_UUIDS] = { 0, sizeof(struct p_uuids), receive_uuids },
4583 [P_SIZES] = { 0, sizeof(struct p_sizes), receive_sizes },
4584 [P_STATE] = { 0, sizeof(struct p_state), receive_state },
4585 [P_STATE_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_state },
4586 [P_SYNC_UUID] = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4587 [P_OV_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4588 [P_OV_REPLY] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4589 [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4590 [P_DELAY_PROBE] = { 0, sizeof(struct p_delay_probe93), receive_skip },
4591 [P_OUT_OF_SYNC] = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4592 [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4593 [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4594 [P_TRIM] = { 0, sizeof(struct p_trim), receive_Data },
4595 };
4596
4597 static void drbdd(struct drbd_connection *connection)
4598 {
4599 struct packet_info pi;
4600 size_t shs; /* sub header size */
4601 int err;
4602
4603 while (get_t_state(&connection->receiver) == RUNNING) {
4604 struct data_cmd *cmd;
4605
4606 drbd_thread_current_set_cpu(&connection->receiver);
4607 update_receiver_timing_details(connection, drbd_recv_header);
4608 if (drbd_recv_header(connection, &pi))
4609 goto err_out;
4610
4611 cmd = &drbd_cmd_handler[pi.cmd];
4612 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4613 drbd_err(connection, "Unexpected data packet %s (0x%04x)",
4614 cmdname(pi.cmd), pi.cmd);
4615 goto err_out;
4616 }
4617
4618 shs = cmd->pkt_size;
4619 if (pi.size > shs && !cmd->expect_payload) {
4620 drbd_err(connection, "No payload expected %s l:%d\n",
4621 cmdname(pi.cmd), pi.size);
4622 goto err_out;
4623 }
4624
4625 if (shs) {
4626 update_receiver_timing_details(connection, drbd_recv_all_warn);
4627 err = drbd_recv_all_warn(connection, pi.data, shs);
4628 if (err)
4629 goto err_out;
4630 pi.size -= shs;
4631 }
4632
4633 update_receiver_timing_details(connection, cmd->fn);
4634 err = cmd->fn(connection, &pi);
4635 if (err) {
4636 drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
4637 cmdname(pi.cmd), err, pi.size);
4638 goto err_out;
4639 }
4640 }
4641 return;
4642
4643 err_out:
4644 conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4645 }
4646
4647 static void conn_disconnect(struct drbd_connection *connection)
4648 {
4649 struct drbd_peer_device *peer_device;
4650 enum drbd_conns oc;
4651 int vnr;
4652
4653 if (connection->cstate == C_STANDALONE)
4654 return;
4655
4656 /* We are about to start the cleanup after connection loss.
4657 * Make sure drbd_make_request knows about that.
4658 * Usually we should be in some network failure state already,
4659 * but just in case we are not, we fix it up here.
4660 */
4661 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
4662
4663 /* asender does not clean up anything. it must not interfere, either */
4664 drbd_thread_stop(&connection->asender);
4665 drbd_free_sock(connection);
4666
4667 rcu_read_lock();
4668 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
4669 struct drbd_device *device = peer_device->device;
4670 kref_get(&device->kref);
4671 rcu_read_unlock();
4672 drbd_disconnected(peer_device);
4673 kref_put(&device->kref, drbd_destroy_device);
4674 rcu_read_lock();
4675 }
4676 rcu_read_unlock();
4677
4678 if (!list_empty(&connection->current_epoch->list))
4679 drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
4680 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4681 atomic_set(&connection->current_epoch->epoch_size, 0);
4682 connection->send.seen_any_write_yet = false;
4683
4684 drbd_info(connection, "Connection closed\n");
4685
4686 if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
4687 conn_try_outdate_peer_async(connection);
4688
4689 spin_lock_irq(&connection->resource->req_lock);
4690 oc = connection->cstate;
4691 if (oc >= C_UNCONNECTED)
4692 _conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4693
4694 spin_unlock_irq(&connection->resource->req_lock);
4695
4696 if (oc == C_DISCONNECTING)
4697 conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4698 }
4699
4700 static int drbd_disconnected(struct drbd_peer_device *peer_device)
4701 {
4702 struct drbd_device *device = peer_device->device;
4703 unsigned int i;
4704
4705 /* wait for current activity to cease. */
4706 spin_lock_irq(&device->resource->req_lock);
4707 _drbd_wait_ee_list_empty(device, &device->active_ee);
4708 _drbd_wait_ee_list_empty(device, &device->sync_ee);
4709 _drbd_wait_ee_list_empty(device, &device->read_ee);
4710 spin_unlock_irq(&device->resource->req_lock);
4711
4712 /* We do not have data structures that would allow us to
4713 * get the rs_pending_cnt down to 0 again.
4714 * * On C_SYNC_TARGET we do not have any data structures describing
4715 * the pending RSDataRequest's we have sent.
4716 * * On C_SYNC_SOURCE there is no data structure that tracks
4717 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4718 * And no, it is not the sum of the reference counts in the
4719 * resync_LRU. The resync_LRU tracks the whole operation including
4720 * the disk-IO, while the rs_pending_cnt only tracks the blocks
4721 * on the fly. */
4722 drbd_rs_cancel_all(device);
4723 device->rs_total = 0;
4724 device->rs_failed = 0;
4725 atomic_set(&device->rs_pending_cnt, 0);
4726 wake_up(&device->misc_wait);
4727
4728 del_timer_sync(&device->resync_timer);
4729 resync_timer_fn((unsigned long)device);
4730
4731 /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4732 * w_make_resync_request etc. which may still be on the worker queue
4733 * to be "canceled" */
4734 drbd_flush_workqueue(&peer_device->connection->sender_work);
4735
4736 drbd_finish_peer_reqs(device);
4737
4738 /* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
4739 might have queued work again. The one before drbd_finish_peer_reqs() is
4740 necessary to reclaim net_ee in drbd_finish_peer_reqs(). */
4741 drbd_flush_workqueue(&peer_device->connection->sender_work);
4742
4743 /* need to do it again, drbd_finish_peer_reqs() may have populated it
4744 * again via drbd_try_clear_on_disk_bm(). */
4745 drbd_rs_cancel_all(device);
4746
4747 kfree(device->p_uuid);
4748 device->p_uuid = NULL;
4749
4750 if (!drbd_suspended(device))
4751 tl_clear(peer_device->connection);
4752
4753 drbd_md_sync(device);
4754
4755 /* serialize with bitmap writeout triggered by the state change,
4756 * if any. */
4757 wait_event(device->misc_wait, !test_bit(BITMAP_IO, &device->flags));
4758
4759 /* tcp_close and release of sendpage pages can be deferred. I don't
4760 * want to use SO_LINGER, because apparently it can be deferred for
4761 * more than 20 seconds (longest time I checked).
4762 *
4763 * Actually we don't care for exactly when the network stack does its
4764 * put_page(), but release our reference on these pages right here.
4765 */
4766 i = drbd_free_peer_reqs(device, &device->net_ee);
4767 if (i)
4768 drbd_info(device, "net_ee not empty, killed %u entries\n", i);
4769 i = atomic_read(&device->pp_in_use_by_net);
4770 if (i)
4771 drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
4772 i = atomic_read(&device->pp_in_use);
4773 if (i)
4774 drbd_info(device, "pp_in_use = %d, expected 0\n", i);
4775
4776 D_ASSERT(device, list_empty(&device->read_ee));
4777 D_ASSERT(device, list_empty(&device->active_ee));
4778 D_ASSERT(device, list_empty(&device->sync_ee));
4779 D_ASSERT(device, list_empty(&device->done_ee));
4780
4781 return 0;
4782 }
4783
4784 /*
4785 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4786 * we can agree on is stored in agreed_pro_version.
4787 *
4788 * feature flags and the reserved array should be enough room for future
4789 * enhancements of the handshake protocol, and possible plugins...
4790 *
4791 * for now, they are expected to be zero, but ignored.
4792 */
4793 static int drbd_send_features(struct drbd_connection *connection)
4794 {
4795 struct drbd_socket *sock;
4796 struct p_connection_features *p;
4797
4798 sock = &connection->data;
4799 p = conn_prepare_command(connection, sock);
4800 if (!p)
4801 return -EIO;
4802 memset(p, 0, sizeof(*p));
4803 p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4804 p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4805 p->feature_flags = cpu_to_be32(PRO_FEATURES);
4806 return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4807 }
4808
4809 /*
4810 * return values:
4811 * 1 yes, we have a valid connection
4812 * 0 oops, did not work out, please try again
4813 * -1 peer talks different language,
4814 * no point in trying again, please go standalone.
4815 */
4816 static int drbd_do_features(struct drbd_connection *connection)
4817 {
4818 /* ASSERT current == connection->receiver ... */
4819 struct p_connection_features *p;
4820 const int expect = sizeof(struct p_connection_features);
4821 struct packet_info pi;
4822 int err;
4823
4824 err = drbd_send_features(connection);
4825 if (err)
4826 return 0;
4827
4828 err = drbd_recv_header(connection, &pi);
4829 if (err)
4830 return 0;
4831
4832 if (pi.cmd != P_CONNECTION_FEATURES) {
4833 drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4834 cmdname(pi.cmd), pi.cmd);
4835 return -1;
4836 }
4837
4838 if (pi.size != expect) {
4839 drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
4840 expect, pi.size);
4841 return -1;
4842 }
4843
4844 p = pi.data;
4845 err = drbd_recv_all_warn(connection, p, expect);
4846 if (err)
4847 return 0;
4848
4849 p->protocol_min = be32_to_cpu(p->protocol_min);
4850 p->protocol_max = be32_to_cpu(p->protocol_max);
4851 if (p->protocol_max == 0)
4852 p->protocol_max = p->protocol_min;
4853
4854 if (PRO_VERSION_MAX < p->protocol_min ||
4855 PRO_VERSION_MIN > p->protocol_max)
4856 goto incompat;
4857
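/* The version ranges overlap; agree on the highest protocol version
 * supported by both sides, and on the feature flags both advertise. */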
4858 connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4859 connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags);
4860
4861 drbd_info(connection, "Handshake successful: "
4862 "Agreed network protocol version %d\n", connection->agreed_pro_version);
4863
4864 drbd_info(connection, "Agreed to%ssupport TRIM on protocol level\n",
4865 connection->agreed_features & FF_TRIM ? " " : " not ");
4866
4867 return 1;
4868
4869 incompat:
4870 drbd_err(connection, "incompatible DRBD dialects: "
4871 "I support %d-%d, peer supports %d-%d\n",
4872 PRO_VERSION_MIN, PRO_VERSION_MAX,
4873 p->protocol_min, p->protocol_max);
4874 return -1;
4875 }
4876
4877 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4878 static int drbd_do_auth(struct drbd_connection *connection)
4879 {
4880 drbd_err(connection, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
4881 drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4882 return -1;
4883 }
4884 #else
4885 #define CHALLENGE_LEN 64
4886
4887 /* Return value:
4888 1 - auth succeeded,
4889 0 - failed, try again (network error),
4890 -1 - auth failed, don't try again.
4891 */
4892
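/* Challenge/response outline, as implemented below: we send a random
 * challenge, receive the peer's challenge, answer with
 * HMAC(shared_secret, peer's challenge), and expect the peer's answer to
 * match HMAC(shared_secret, our challenge). */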
4893 static int drbd_do_auth(struct drbd_connection *connection)
4894 {
4895 struct drbd_socket *sock;
4896 char my_challenge[CHALLENGE_LEN]; /* 64 Bytes... */
4897 struct scatterlist sg;
4898 char *response = NULL;
4899 char *right_response = NULL;
4900 char *peers_ch = NULL;
4901 unsigned int key_len;
4902 char secret[SHARED_SECRET_MAX]; /* 64 byte */
4903 unsigned int resp_size;
4904 struct hash_desc desc;
4905 struct packet_info pi;
4906 struct net_conf *nc;
4907 int err, rv;
4908
4909 /* FIXME: Put the challenge/response into the preallocated socket buffer. */
4910
4911 rcu_read_lock();
4912 nc = rcu_dereference(connection->net_conf);
4913 key_len = strlen(nc->shared_secret);
4914 memcpy(secret, nc->shared_secret, key_len);
4915 rcu_read_unlock();
4916
4917 desc.tfm = connection->cram_hmac_tfm;
4918 desc.flags = 0;
4919
4920 rv = crypto_hash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
4921 if (rv) {
4922 drbd_err(connection, "crypto_hash_setkey() failed with %d\n", rv);
4923 rv = -1;
4924 goto fail;
4925 }
4926
4927 get_random_bytes(my_challenge, CHALLENGE_LEN);
4928
4929 sock = &connection->data;
4930 if (!conn_prepare_command(connection, sock)) {
4931 rv = 0;
4932 goto fail;
4933 }
4934 rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
4935 my_challenge, CHALLENGE_LEN);
4936 if (!rv)
4937 goto fail;
4938
4939 err = drbd_recv_header(connection, &pi);
4940 if (err) {
4941 rv = 0;
4942 goto fail;
4943 }
4944
4945 if (pi.cmd != P_AUTH_CHALLENGE) {
4946 drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4947 cmdname(pi.cmd), pi.cmd);
4948 rv = 0;
4949 goto fail;
4950 }
4951
4952 if (pi.size > CHALLENGE_LEN * 2) {
4953 drbd_err(connection, "expected AuthChallenge payload too big.\n");
4954 rv = -1;
4955 goto fail;
4956 }
4957
4958 if (pi.size < CHALLENGE_LEN) {
4959 drbd_err(connection, "AuthChallenge payload too small.\n");
4960 rv = -1;
4961 goto fail;
4962 }
4963
4964 peers_ch = kmalloc(pi.size, GFP_NOIO);
4965 if (peers_ch == NULL) {
4966 drbd_err(connection, "kmalloc of peers_ch failed\n");
4967 rv = -1;
4968 goto fail;
4969 }
4970
4971 err = drbd_recv_all_warn(connection, peers_ch, pi.size);
4972 if (err) {
4973 rv = 0;
4974 goto fail;
4975 }
4976
4977 if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) {
4978 drbd_err(connection, "Peer presented the same challenge!\n");
4979 rv = -1;
4980 goto fail;
4981 }
4982
4983 resp_size = crypto_hash_digestsize(connection->cram_hmac_tfm);
4984 response = kmalloc(resp_size, GFP_NOIO);
4985 if (response == NULL) {
4986 drbd_err(connection, "kmalloc of response failed\n");
4987 rv = -1;
4988 goto fail;
4989 }
4990
4991 sg_init_table(&sg, 1);
4992 sg_set_buf(&sg, peers_ch, pi.size);
4993
4994 rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4995 if (rv) {
4996 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
4997 rv = -1;
4998 goto fail;
4999 }
5000
5001 if (!conn_prepare_command(connection, sock)) {
5002 rv = 0;
5003 goto fail;
5004 }
5005 rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
5006 response, resp_size);
5007 if (!rv)
5008 goto fail;
5009
5010 err = drbd_recv_header(connection, &pi);
5011 if (err) {
5012 rv = 0;
5013 goto fail;
5014 }
5015
5016 if (pi.cmd != P_AUTH_RESPONSE) {
5017 drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
5018 cmdname(pi.cmd), pi.cmd);
5019 rv = 0;
5020 goto fail;
5021 }
5022
5023 if (pi.size != resp_size) {
5024 drbd_err(connection, "expected AuthResponse payload of wrong size\n");
5025 rv = 0;
5026 goto fail;
5027 }
5028
5029 err = drbd_recv_all_warn(connection, response, resp_size);
5030 if (err) {
5031 rv = 0;
5032 goto fail;
5033 }
5034
5035 right_response = kmalloc(resp_size, GFP_NOIO);
5036 if (right_response == NULL) {
5037 drbd_err(connection, "kmalloc of right_response failed\n");
5038 rv = -1;
5039 goto fail;
5040 }
5041
5042 sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
5043
5044 rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
5045 if (rv) {
5046 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5047 rv = -1;
5048 goto fail;
5049 }
5050
5051 rv = !memcmp(response, right_response, resp_size);
5052
5053 if (rv)
5054 drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
5055 resp_size);
5056 else
5057 rv = -1;
5058
5059 fail:
5060 kfree(peers_ch);
5061 kfree(response);
5062 kfree(right_response);
5063
5064 return rv;
5065 }
5066 #endif
5067
5068 int drbd_receiver(struct drbd_thread *thi)
5069 {
5070 struct drbd_connection *connection = thi->connection;
5071 int h;
5072
5073 drbd_info(connection, "receiver (re)started\n");
5074
5075 do {
5076 h = conn_connect(connection);
5077 if (h == 0) {
5078 conn_disconnect(connection);
5079 schedule_timeout_interruptible(HZ);
5080 }
5081 if (h == -1) {
5082 drbd_warn(connection, "Discarding network configuration.\n");
5083 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5084 }
5085 } while (h == 0);
5086
5087 if (h > 0)
5088 drbdd(connection);
5089
5090 conn_disconnect(connection);
5091
5092 drbd_info(connection, "receiver terminated\n");
5093 return 0;
5094 }
5095
5096 /* ********* acknowledge sender ******** */
5097
5098 static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5099 {
5100 struct p_req_state_reply *p = pi->data;
5101 int retcode = be32_to_cpu(p->retcode);
5102
5103 if (retcode >= SS_SUCCESS) {
5104 set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
5105 } else {
5106 set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
5107 drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
5108 drbd_set_st_err_str(retcode), retcode);
5109 }
5110 wake_up(&connection->ping_wait);
5111
5112 return 0;
5113 }
5114
5115 static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5116 {
5117 struct drbd_peer_device *peer_device;
5118 struct drbd_device *device;
5119 struct p_req_state_reply *p = pi->data;
5120 int retcode = be32_to_cpu(p->retcode);
5121
5122 peer_device = conn_peer_device(connection, pi->vnr);
5123 if (!peer_device)
5124 return -EIO;
5125 device = peer_device->device;
5126
5127 if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
5128 D_ASSERT(device, connection->agreed_pro_version < 100);
5129 return got_conn_RqSReply(connection, pi);
5130 }
5131
5132 if (retcode >= SS_SUCCESS) {
5133 set_bit(CL_ST_CHG_SUCCESS, &device->flags);
5134 } else {
5135 set_bit(CL_ST_CHG_FAIL, &device->flags);
5136 drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
5137 drbd_set_st_err_str(retcode), retcode);
5138 }
5139 wake_up(&device->state_wait);
5140
5141 return 0;
5142 }
5143
5144 static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
5145 {
5146 return drbd_send_ping_ack(connection);
5147
5148 }
5149
5150 static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
5151 {
5152 /* restore idle timeout */
5153 connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ;
5154 if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
5155 wake_up(&connection->ping_wait);
5156
5157 return 0;
5158 }
5159
5160 static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
5161 {
5162 struct drbd_peer_device *peer_device;
5163 struct drbd_device *device;
5164 struct p_block_ack *p = pi->data;
5165 sector_t sector = be64_to_cpu(p->sector);
5166 int blksize = be32_to_cpu(p->blksize);
5167
5168 peer_device = conn_peer_device(connection, pi->vnr);
5169 if (!peer_device)
5170 return -EIO;
5171 device = peer_device->device;
5172
5173 D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
5174
5175 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5176
5177 if (get_ldev(device)) {
5178 drbd_rs_complete_io(device, sector);
5179 drbd_set_in_sync(device, sector, blksize);
5180 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
5181 device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
5182 put_ldev(device);
5183 }
5184 dec_rs_pending(device);
5185 atomic_add(blksize >> 9, &device->rs_sect_in);
5186
5187 return 0;
5188 }
5189
5190 static int
5191 validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
5192 struct rb_root *root, const char *func,
5193 enum drbd_req_event what, bool missing_ok)
5194 {
5195 struct drbd_request *req;
5196 struct bio_and_error m;
5197
5198 spin_lock_irq(&device->resource->req_lock);
5199 req = find_request(device, root, id, sector, missing_ok, func);
5200 if (unlikely(!req)) {
5201 spin_unlock_irq(&device->resource->req_lock);
5202 return -EIO;
5203 }
5204 __req_mod(req, what, &m);
5205 spin_unlock_irq(&device->resource->req_lock);
5206
5207 if (m.bio)
5208 complete_master_bio(device, &m);
5209 return 0;
5210 }
5211
5212 static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
5213 {
5214 struct drbd_peer_device *peer_device;
5215 struct drbd_device *device;
5216 struct p_block_ack *p = pi->data;
5217 sector_t sector = be64_to_cpu(p->sector);
5218 int blksize = be32_to_cpu(p->blksize);
5219 enum drbd_req_event what;
5220
5221 peer_device = conn_peer_device(connection, pi->vnr);
5222 if (!peer_device)
5223 return -EIO;
5224 device = peer_device->device;
5225
5226 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5227
5228 if (p->block_id == ID_SYNCER) {
5229 drbd_set_in_sync(device, sector, blksize);
5230 dec_rs_pending(device);
5231 return 0;
5232 }
5233 switch (pi->cmd) {
5234 case P_RS_WRITE_ACK:
5235 what = WRITE_ACKED_BY_PEER_AND_SIS;
5236 break;
5237 case P_WRITE_ACK:
5238 what = WRITE_ACKED_BY_PEER;
5239 break;
5240 case P_RECV_ACK:
5241 what = RECV_ACKED_BY_PEER;
5242 break;
5243 case P_SUPERSEDED:
5244 what = CONFLICT_RESOLVED;
5245 break;
5246 case P_RETRY_WRITE:
5247 what = POSTPONE_WRITE;
5248 break;
5249 default:
5250 BUG();
5251 }
5252
5253 return validate_req_change_req_state(device, p->block_id, sector,
5254 &device->write_requests, __func__,
5255 what, false);
5256 }
5257
5258 static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
5259 {
5260 struct drbd_peer_device *peer_device;
5261 struct drbd_device *device;
5262 struct p_block_ack *p = pi->data;
5263 sector_t sector = be64_to_cpu(p->sector);
5264 int size = be32_to_cpu(p->blksize);
5265 int err;
5266
5267 peer_device = conn_peer_device(connection, pi->vnr);
5268 if (!peer_device)
5269 return -EIO;
5270 device = peer_device->device;
5271
5272 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5273
5274 if (p->block_id == ID_SYNCER) {
5275 dec_rs_pending(device);
5276 drbd_rs_failed_io(device, sector, size);
5277 return 0;
5278 }
5279
5280 err = validate_req_change_req_state(device, p->block_id, sector,
5281 &device->write_requests, __func__,
5282 NEG_ACKED, true);
5283 if (err) {
5284 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5285 The master bio might already be completed, therefore the
5286 request is no longer in the collision hash. */
5287 /* In Protocol B we might already have got a P_RECV_ACK
5288 but then get a P_NEG_ACK afterwards. */
5289 drbd_set_out_of_sync(device, sector, size);
5290 }
5291 return 0;
5292 }
5293
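/* P_NEG_DREPLY: the peer could not satisfy one of our read requests;
 * fail the corresponding request from the read_requests tree. */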
5294 static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
5295 {
5296 struct drbd_peer_device *peer_device;
5297 struct drbd_device *device;
5298 struct p_block_ack *p = pi->data;
5299 sector_t sector = be64_to_cpu(p->sector);
5300
5301 peer_device = conn_peer_device(connection, pi->vnr);
5302 if (!peer_device)
5303 return -EIO;
5304 device = peer_device->device;
5305
5306 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5307
5308 drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
5309 (unsigned long long)sector, be32_to_cpu(p->blksize));
5310
5311 return validate_req_change_req_state(device, p->block_id, sector,
5312 &device->read_requests, __func__,
5313 NEG_ACKED, false);
5314 }
5315
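/* P_NEG_RS_DREPLY / P_RS_CANCEL: a resync read request was refused or
 * cancelled by the peer.  Only P_NEG_RS_DREPLY is accounted as failed
 * resync I/O; both variants complete the pending resync request. */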
5316 static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
5317 {
5318 struct drbd_peer_device *peer_device;
5319 struct drbd_device *device;
5320 sector_t sector;
5321 int size;
5322 struct p_block_ack *p = pi->data;
5323
5324 peer_device = conn_peer_device(connection, pi->vnr);
5325 if (!peer_device)
5326 return -EIO;
5327 device = peer_device->device;
5328
5329 sector = be64_to_cpu(p->sector);
5330 size = be32_to_cpu(p->blksize);
5331
5332 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5333
5334 dec_rs_pending(device);
5335
5336 if (get_ldev_if_state(device, D_FAILED)) {
5337 drbd_rs_complete_io(device, sector);
5338 switch (pi->cmd) {
5339 case P_NEG_RS_DREPLY:
5340 drbd_rs_failed_io(device, sector, size);
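			/* fall through */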
5341 case P_RS_CANCEL:
5342 break;
5343 default:
5344 BUG();
5345 }
5346 put_ldev(device);
5347 }
5348
5349 return 0;
5350 }
5351
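/* P_BARRIER_ACK: the peer has processed all writes of the given epoch.
 * Release the matching transfer log section and, for devices that are
 * Ahead with no application I/O in flight, schedule the switch back to
 * SyncSource via the start_resync_timer. */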
5352 static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
5353 {
5354 struct p_barrier_ack *p = pi->data;
5355 struct drbd_peer_device *peer_device;
5356 int vnr;
5357
5358 tl_release(connection, p->barrier, be32_to_cpu(p->set_size));
5359
5360 rcu_read_lock();
5361 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5362 struct drbd_device *device = peer_device->device;
5363
5364 if (device->state.conn == C_AHEAD &&
5365 atomic_read(&device->ap_in_flight) == 0 &&
5366 !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
5367 device->start_resync_timer.expires = jiffies + HZ;
5368 add_timer(&device->start_resync_timer);
5369 }
5370 }
5371 rcu_read_unlock();
5372
5373 return 0;
5374 }
5375
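/* P_OV_RESULT: outcome of a single online verify request.  Record any
 * mismatch, update the progress marks, and queue w_ov_finished once the
 * last reply has arrived. */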
5376 static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
5377 {
5378 struct drbd_peer_device *peer_device;
5379 struct drbd_device *device;
5380 struct p_block_ack *p = pi->data;
5381 struct drbd_device_work *dw;
5382 sector_t sector;
5383 int size;
5384
5385 peer_device = conn_peer_device(connection, pi->vnr);
5386 if (!peer_device)
5387 return -EIO;
5388 device = peer_device->device;
5389
5390 sector = be64_to_cpu(p->sector);
5391 size = be32_to_cpu(p->blksize);
5392
5393 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5394
5395 if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5396 drbd_ov_out_of_sync_found(device, sector, size);
5397 else
5398 ov_out_of_sync_print(device);
5399
5400 if (!get_ldev(device))
5401 return 0;
5402
5403 drbd_rs_complete_io(device, sector);
5404 dec_rs_pending(device);
5405
5406 --device->ov_left;
5407
5408 /* let's advance progress step marks only for every other megabyte */
5409 if ((device->ov_left & 0x200) == 0x200)
5410 drbd_advance_rs_marks(device, device->ov_left);
5411
5412 if (device->ov_left == 0) {
5413 dw = kmalloc(sizeof(*dw), GFP_NOIO);
5414 if (dw) {
5415 dw->w.cb = w_ov_finished;
5416 dw->device = device;
5417 drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
5418 } else {
5419 drbd_err(device, "kmalloc(dw) failed.");
5420 ov_out_of_sync_print(device);
5421 drbd_resync_finished(device);
5422 }
5423 }
5424 put_ldev(device);
5425 return 0;
5426 }
5427
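/* The payload has already been received by the asender loop; there is
 * nothing to do for these packets. */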
5428 static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
5429 {
5430 return 0;
5431 }
5432
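/* Drain the done_ee lists of all volumes on this connection, sending the
 * corresponding acks.  Loops until no device has completed peer requests
 * left; returns 1 if drbd_finish_peer_reqs() failed for a device. */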
5433 static int connection_finish_peer_reqs(struct drbd_connection *connection)
5434 {
5435 struct drbd_peer_device *peer_device;
5436 int vnr, not_empty = 0;
5437
5438 do {
5439 clear_bit(SIGNAL_ASENDER, &connection->flags);
5440 flush_signals(current);
5441
5442 rcu_read_lock();
5443 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5444 struct drbd_device *device = peer_device->device;
5445 kref_get(&device->kref);
5446 rcu_read_unlock();
5447 if (drbd_finish_peer_reqs(device)) {
5448 kref_put(&device->kref, drbd_destroy_device);
5449 return 1;
5450 }
5451 kref_put(&device->kref, drbd_destroy_device);
5452 rcu_read_lock();
5453 }
5454 set_bit(SIGNAL_ASENDER, &connection->flags);
5455
5456 spin_lock_irq(&connection->resource->req_lock);
5457 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5458 struct drbd_device *device = peer_device->device;
5459 not_empty = !list_empty(&device->done_ee);
5460 if (not_empty)
5461 break;
5462 }
5463 spin_unlock_irq(&connection->resource->req_lock);
5464 rcu_read_unlock();
5465 } while (not_empty);
5466
5467 return 0;
5468 }
5469
5470 struct asender_cmd {
5471 size_t pkt_size;
5472 int (*fn)(struct drbd_connection *connection, struct packet_info *);
5473 };
5474
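/* Dispatch table for packets arriving on the meta-data socket, indexed by
 * packet type.  pkt_size is the payload size expected after the header. */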
5475 static struct asender_cmd asender_tbl[] = {
5476 [P_PING] = { 0, got_Ping },
5477 [P_PING_ACK] = { 0, got_PingAck },
5478 [P_RECV_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
5479 [P_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
5480 [P_RS_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
5481 [P_SUPERSEDED] = { sizeof(struct p_block_ack), got_BlockAck },
5482 [P_NEG_ACK] = { sizeof(struct p_block_ack), got_NegAck },
5483 [P_NEG_DREPLY] = { sizeof(struct p_block_ack), got_NegDReply },
5484 [P_NEG_RS_DREPLY] = { sizeof(struct p_block_ack), got_NegRSDReply },
5485 [P_OV_RESULT] = { sizeof(struct p_block_ack), got_OVResult },
5486 [P_BARRIER_ACK] = { sizeof(struct p_barrier_ack), got_BarrierAck },
5487 [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5488 [P_RS_IS_IN_SYNC] = { sizeof(struct p_block_ack), got_IsInSync },
5489 [P_DELAY_PROBE] = { sizeof(struct p_delay_probe93), got_skip },
5490 [P_RS_CANCEL] = { sizeof(struct p_block_ack), got_NegRSDReply },
5491 [P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply },
5492 [P_RETRY_WRITE] = { sizeof(struct p_block_ack), got_BlockAck },
5493 };
5494
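/* Main loop of the asender thread: sends pings on request, acks completed
 * peer requests, and receives and dispatches meta-data socket packets
 * according to asender_tbl, reconnecting on errors. */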
5495 int drbd_asender(struct drbd_thread *thi)
5496 {
5497 struct drbd_connection *connection = thi->connection;
5498 struct asender_cmd *cmd = NULL;
5499 struct packet_info pi;
5500 int rv;
5501 void *buf = connection->meta.rbuf;
5502 int received = 0;
5503 unsigned int header_size = drbd_header_size(connection);
5504 int expect = header_size;
5505 bool ping_timeout_active = false;
5506 struct net_conf *nc;
5507 int ping_timeo, tcp_cork, ping_int;
5508 struct sched_param param = { .sched_priority = 2 };
5509
5510 rv = sched_setscheduler(current, SCHED_RR, &param);
5511 if (rv < 0)
5512 drbd_err(connection, "drbd_asender: ERROR set priority, ret=%d\n", rv);
5513
5514 while (get_t_state(thi) == RUNNING) {
5515 drbd_thread_current_set_cpu(thi);
5516
5517 rcu_read_lock();
5518 nc = rcu_dereference(connection->net_conf);
5519 ping_timeo = nc->ping_timeo;
5520 tcp_cork = nc->tcp_cork;
5521 ping_int = nc->ping_int;
5522 rcu_read_unlock();
5523
5524 if (test_and_clear_bit(SEND_PING, &connection->flags)) {
5525 if (drbd_send_ping(connection)) {
5526 drbd_err(connection, "drbd_send_ping has failed\n");
5527 goto reconnect;
5528 }
5529 connection->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10;
5530 ping_timeout_active = true;
5531 }
5532
5533 /* TODO: conditionally cork; it may hurt latency if we cork without
5534 much to send */
5535 if (tcp_cork)
5536 drbd_tcp_cork(connection->meta.socket);
5537 if (connection_finish_peer_reqs(connection)) {
5538 drbd_err(connection, "connection_finish_peer_reqs() failed\n");
5539 goto reconnect;
5540 }
5541 /* but unconditionally uncork unless disabled */
5542 if (tcp_cork)
5543 drbd_tcp_uncork(connection->meta.socket);
5544
5545 /* short circuit, recv_msg would return EINTR anyways. */
5546 if (signal_pending(current))
5547 continue;
5548
5549 rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);
5550 clear_bit(SIGNAL_ASENDER, &connection->flags);
5551
5552 flush_signals(current);
5553
5554 /* Note:
5555 * -EINTR (on meta) we got a signal
5556 * -EAGAIN (on meta) rcvtimeo expired
5557 * -ECONNRESET other side closed the connection
5558 * -ERESTARTSYS (on data) we got a signal
5559 * rv < 0 other than above: unexpected error!
5560 * rv == expected: full header or command
5561 * rv < expected: "woken" by signal during receive
5562 * rv == 0 : "connection shut down by peer"
5563 */
5564 received_more:
5565 if (likely(rv > 0)) {
5566 received += rv;
5567 buf += rv;
5568 } else if (rv == 0) {
5569 if (test_bit(DISCONNECT_SENT, &connection->flags)) {
5570 long t;
5571 rcu_read_lock();
5572 t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
5573 rcu_read_unlock();
5574
5575 t = wait_event_timeout(connection->ping_wait,
5576 connection->cstate < C_WF_REPORT_PARAMS,
5577 t);
5578 if (t)
5579 break;
5580 }
5581 drbd_err(connection, "meta connection shut down by peer.\n");
5582 goto reconnect;
5583 } else if (rv == -EAGAIN) {
5584 /* If the data socket received something meanwhile,
5585 * that is good enough: peer is still alive. */
5586 if (time_after(connection->last_received,
5587 jiffies - connection->meta.socket->sk->sk_rcvtimeo))
5588 continue;
5589 if (ping_timeout_active) {
5590 drbd_err(connection, "PingAck did not arrive in time.\n");
5591 goto reconnect;
5592 }
5593 set_bit(SEND_PING, &connection->flags);
5594 continue;
5595 } else if (rv == -EINTR) {
5596 continue;
5597 } else {
5598 drbd_err(connection, "sock_recvmsg returned %d\n", rv);
5599 goto reconnect;
5600 }
5601
5602 if (received == expect && cmd == NULL) {
5603 if (decode_header(connection, connection->meta.rbuf, &pi))
5604 goto reconnect;
5605 cmd = &asender_tbl[pi.cmd];
5606 if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd->fn) {
5607 drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
5608 cmdname(pi.cmd), pi.cmd);
5609 goto disconnect;
5610 }
5611 expect = header_size + cmd->pkt_size;
5612 if (pi.size != expect - header_size) {
5613 drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
5614 pi.cmd, pi.size);
5615 goto reconnect;
5616 }
5617 }
5618 if (received == expect) {
5619 bool err;
5620
5621 err = cmd->fn(connection, &pi);
5622 if (err) {
5623 drbd_err(connection, "%pf failed\n", cmd->fn);
5624 goto reconnect;
5625 }
5626
5627 connection->last_received = jiffies;
5628
5629 if (cmd == &asender_tbl[P_PING_ACK]) {
5630 /* restore idle timeout */
5631 connection->meta.socket->sk->sk_rcvtimeo = ping_int * HZ;
5632 ping_timeout_active = false;
5633 }
5634
5635 buf = connection->meta.rbuf;
5636 received = 0;
5637 expect = header_size;
5638 cmd = NULL;
5639 }
5640 if (test_bit(SEND_PING, &connection->flags))
5641 continue;
5642 rv = drbd_recv_short(connection->meta.socket, buf, expect-received, MSG_DONTWAIT);
5643 if (rv > 0)
5644 goto received_more;
5645 }
5646
5647 if (0) {
5648 reconnect:
5649 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5650 conn_md_sync(connection);
5651 }
5652 if (0) {
5653 disconnect:
5654 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5655 }
5656 clear_bit(SIGNAL_ASENDER, &connection->flags);
5657
5658 drbd_info(connection, "asender terminated\n");
5659
5660 return 0;
5661 }