Merge branch 'wimax-2.6.35.y' of git://git.kernel.org/pub/scm/linux/kernel/git/inaky...
[deliverable/linux.git] / drivers / block / drbd / drbd_worker.c
1 /*
2 drbd_worker.c
3
4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
13 any later version.
14
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24 */
25
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/smp_lock.h>
30 #include <linux/wait.h>
31 #include <linux/mm.h>
32 #include <linux/memcontrol.h>
33 #include <linux/mm_inline.h>
34 #include <linux/slab.h>
35 #include <linux/random.h>
36 #include <linux/string.h>
37 #include <linux/scatterlist.h>
38
39 #include "drbd_int.h"
40 #include "drbd_req.h"
41
42 #define SLEEP_TIME (HZ/10)
43
44 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
45
46
47
48 /* defined here:
49 drbd_md_io_complete
50 drbd_endio_sec
51 drbd_endio_pri
52
53 * more endio handlers:
54 atodb_endio in drbd_actlog.c
55 drbd_bm_async_io_complete in drbd_bitmap.c
56
57 * For all these callbacks, note the following:
58 * The callbacks will be called in irq context by the IDE drivers,
59 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
60 * Try to get the locking right :)
61 *
62 */
63
64
65 /* About the global_state_lock
66 Each state transition on an device holds a read lock. In case we have
67 to evaluate the sync after dependencies, we grab a write lock, because
68 we need stable states on all devices for that. */
69 rwlock_t global_state_lock;
70
71 /* used for synchronous meta data and bitmap IO
72 * submitted by drbd_md_sync_page_io()
73 */
74 void drbd_md_io_complete(struct bio *bio, int error)
75 {
76 struct drbd_md_io *md_io;
77
78 md_io = (struct drbd_md_io *)bio->bi_private;
79 md_io->error = error;
80
81 complete(&md_io->event);
82 }
83
84 /* reads on behalf of the partner,
85 * "submitted" by the receiver
86 */
87 void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
88 {
89 unsigned long flags = 0;
90 struct drbd_conf *mdev = e->mdev;
91
92 D_ASSERT(e->block_id != ID_VACANT);
93
94 spin_lock_irqsave(&mdev->req_lock, flags);
95 mdev->read_cnt += e->size >> 9;
96 list_del(&e->w.list);
97 if (list_empty(&mdev->read_ee))
98 wake_up(&mdev->ee_wait);
99 if (test_bit(__EE_WAS_ERROR, &e->flags))
100 __drbd_chk_io_error(mdev, FALSE);
101 spin_unlock_irqrestore(&mdev->req_lock, flags);
102
103 drbd_queue_work(&mdev->data.work, &e->w);
104 put_ldev(mdev);
105 }
106
107 static int is_failed_barrier(int ee_flags)
108 {
109 return (ee_flags & (EE_IS_BARRIER|EE_WAS_ERROR|EE_RESUBMITTED))
110 == (EE_IS_BARRIER|EE_WAS_ERROR);
111 }
112
113 /* writes on behalf of the partner, or resync writes,
114 * "submitted" by the receiver, final stage. */
115 static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
116 {
117 unsigned long flags = 0;
118 struct drbd_conf *mdev = e->mdev;
119 sector_t e_sector;
120 int do_wake;
121 int is_syncer_req;
122 int do_al_complete_io;
123
124 /* if this is a failed barrier request, disable use of barriers,
125 * and schedule for resubmission */
126 if (is_failed_barrier(e->flags)) {
127 drbd_bump_write_ordering(mdev, WO_bdev_flush);
128 spin_lock_irqsave(&mdev->req_lock, flags);
129 list_del(&e->w.list);
130 e->flags = (e->flags & ~EE_WAS_ERROR) | EE_RESUBMITTED;
131 e->w.cb = w_e_reissue;
132 /* put_ldev actually happens below, once we come here again. */
133 __release(local);
134 spin_unlock_irqrestore(&mdev->req_lock, flags);
135 drbd_queue_work(&mdev->data.work, &e->w);
136 return;
137 }
138
139 D_ASSERT(e->block_id != ID_VACANT);
140
141 /* after we moved e to done_ee,
142 * we may no longer access it,
143 * it may be freed/reused already!
144 * (as soon as we release the req_lock) */
145 e_sector = e->sector;
146 do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
147 is_syncer_req = is_syncer_block_id(e->block_id);
148
149 spin_lock_irqsave(&mdev->req_lock, flags);
150 mdev->writ_cnt += e->size >> 9;
151 list_del(&e->w.list); /* has been on active_ee or sync_ee */
152 list_add_tail(&e->w.list, &mdev->done_ee);
153
154 /* No hlist_del_init(&e->colision) here, we did not send the Ack yet,
155 * neither did we wake possibly waiting conflicting requests.
156 * done from "drbd_process_done_ee" within the appropriate w.cb
157 * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */
158
159 do_wake = is_syncer_req
160 ? list_empty(&mdev->sync_ee)
161 : list_empty(&mdev->active_ee);
162
163 if (test_bit(__EE_WAS_ERROR, &e->flags))
164 __drbd_chk_io_error(mdev, FALSE);
165 spin_unlock_irqrestore(&mdev->req_lock, flags);
166
167 if (is_syncer_req)
168 drbd_rs_complete_io(mdev, e_sector);
169
170 if (do_wake)
171 wake_up(&mdev->ee_wait);
172
173 if (do_al_complete_io)
174 drbd_al_complete_io(mdev, e_sector);
175
176 wake_asender(mdev);
177 put_ldev(mdev);
178 }
179
180 /* writes on behalf of the partner, or resync writes,
181 * "submitted" by the receiver.
182 */
183 void drbd_endio_sec(struct bio *bio, int error)
184 {
185 struct drbd_epoch_entry *e = bio->bi_private;
186 struct drbd_conf *mdev = e->mdev;
187 int uptodate = bio_flagged(bio, BIO_UPTODATE);
188 int is_write = bio_data_dir(bio) == WRITE;
189
190 if (error)
191 dev_warn(DEV, "%s: error=%d s=%llus\n",
192 is_write ? "write" : "read", error,
193 (unsigned long long)e->sector);
194 if (!error && !uptodate) {
195 dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
196 is_write ? "write" : "read",
197 (unsigned long long)e->sector);
198 /* strange behavior of some lower level drivers...
199 * fail the request by clearing the uptodate flag,
200 * but do not return any error?! */
201 error = -EIO;
202 }
203
204 if (error)
205 set_bit(__EE_WAS_ERROR, &e->flags);
206
207 bio_put(bio); /* no need for the bio anymore */
208 if (atomic_dec_and_test(&e->pending_bios)) {
209 if (is_write)
210 drbd_endio_write_sec_final(e);
211 else
212 drbd_endio_read_sec_final(e);
213 }
214 }
215
216 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
217 */
218 void drbd_endio_pri(struct bio *bio, int error)
219 {
220 unsigned long flags;
221 struct drbd_request *req = bio->bi_private;
222 struct drbd_conf *mdev = req->mdev;
223 struct bio_and_error m;
224 enum drbd_req_event what;
225 int uptodate = bio_flagged(bio, BIO_UPTODATE);
226
227 if (error)
228 dev_warn(DEV, "p %s: error=%d\n",
229 bio_data_dir(bio) == WRITE ? "write" : "read", error);
230 if (!error && !uptodate) {
231 dev_warn(DEV, "p %s: setting error to -EIO\n",
232 bio_data_dir(bio) == WRITE ? "write" : "read");
233 /* strange behavior of some lower level drivers...
234 * fail the request by clearing the uptodate flag,
235 * but do not return any error?! */
236 error = -EIO;
237 }
238
239 /* to avoid recursion in __req_mod */
240 if (unlikely(error)) {
241 what = (bio_data_dir(bio) == WRITE)
242 ? write_completed_with_error
243 : (bio_rw(bio) == READ)
244 ? read_completed_with_error
245 : read_ahead_completed_with_error;
246 } else
247 what = completed_ok;
248
249 bio_put(req->private_bio);
250 req->private_bio = ERR_PTR(error);
251
252 spin_lock_irqsave(&mdev->req_lock, flags);
253 __req_mod(req, what, &m);
254 spin_unlock_irqrestore(&mdev->req_lock, flags);
255
256 if (m.bio)
257 complete_master_bio(mdev, &m);
258 }
259
260 int w_io_error(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
261 {
262 struct drbd_request *req = container_of(w, struct drbd_request, w);
263
264 /* NOTE: mdev->ldev can be NULL by the time we get here! */
265 /* D_ASSERT(mdev->ldev->dc.on_io_error != EP_PASS_ON); */
266
267 /* the only way this callback is scheduled is from _req_may_be_done,
268 * when it is done and had a local write error, see comments there */
269 drbd_req_free(req);
270
271 return TRUE;
272 }
273
274 int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
275 {
276 struct drbd_request *req = container_of(w, struct drbd_request, w);
277
278 /* We should not detach for read io-error,
279 * but try to WRITE the P_DATA_REPLY to the failed location,
280 * to give the disk the chance to relocate that block */
281
282 spin_lock_irq(&mdev->req_lock);
283 if (cancel ||
284 mdev->state.conn < C_CONNECTED ||
285 mdev->state.pdsk <= D_INCONSISTENT) {
286 _req_mod(req, send_canceled);
287 spin_unlock_irq(&mdev->req_lock);
288 dev_alert(DEV, "WE ARE LOST. Local IO failure, no peer.\n");
289 return 1;
290 }
291 spin_unlock_irq(&mdev->req_lock);
292
293 return w_send_read_req(mdev, w, 0);
294 }
295
296 int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
297 {
298 ERR_IF(cancel) return 1;
299 dev_err(DEV, "resync inactive, but callback triggered??\n");
300 return 1; /* Simply ignore this! */
301 }
302
303 void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
304 {
305 struct hash_desc desc;
306 struct scatterlist sg;
307 struct page *page = e->pages;
308 struct page *tmp;
309 unsigned len;
310
311 desc.tfm = tfm;
312 desc.flags = 0;
313
314 sg_init_table(&sg, 1);
315 crypto_hash_init(&desc);
316
317 while ((tmp = page_chain_next(page))) {
318 /* all but the last page will be fully used */
319 sg_set_page(&sg, page, PAGE_SIZE, 0);
320 crypto_hash_update(&desc, &sg, sg.length);
321 page = tmp;
322 }
323 /* and now the last, possibly only partially used page */
324 len = e->size & (PAGE_SIZE - 1);
325 sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
326 crypto_hash_update(&desc, &sg, sg.length);
327 crypto_hash_final(&desc, digest);
328 }
329
330 void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
331 {
332 struct hash_desc desc;
333 struct scatterlist sg;
334 struct bio_vec *bvec;
335 int i;
336
337 desc.tfm = tfm;
338 desc.flags = 0;
339
340 sg_init_table(&sg, 1);
341 crypto_hash_init(&desc);
342
343 __bio_for_each_segment(bvec, bio, i, 0) {
344 sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
345 crypto_hash_update(&desc, &sg, sg.length);
346 }
347 crypto_hash_final(&desc, digest);
348 }
349
350 static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
351 {
352 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
353 int digest_size;
354 void *digest;
355 int ok;
356
357 D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
358
359 if (unlikely(cancel)) {
360 drbd_free_ee(mdev, e);
361 return 1;
362 }
363
364 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
365 digest_size = crypto_hash_digestsize(mdev->csums_tfm);
366 digest = kmalloc(digest_size, GFP_NOIO);
367 if (digest) {
368 drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
369
370 inc_rs_pending(mdev);
371 ok = drbd_send_drequest_csum(mdev,
372 e->sector,
373 e->size,
374 digest,
375 digest_size,
376 P_CSUM_RS_REQUEST);
377 kfree(digest);
378 } else {
379 dev_err(DEV, "kmalloc() of digest failed.\n");
380 ok = 0;
381 }
382 } else
383 ok = 1;
384
385 drbd_free_ee(mdev, e);
386
387 if (unlikely(!ok))
388 dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
389 return ok;
390 }
391
392 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
393
394 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
395 {
396 struct drbd_epoch_entry *e;
397
398 if (!get_ldev(mdev))
399 return 0;
400
401 /* GFP_TRY, because if there is no memory available right now, this may
402 * be rescheduled for later. It is "only" background resync, after all. */
403 e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
404 if (!e)
405 goto fail;
406
407 spin_lock_irq(&mdev->req_lock);
408 list_add(&e->w.list, &mdev->read_ee);
409 spin_unlock_irq(&mdev->req_lock);
410
411 e->w.cb = w_e_send_csum;
412 if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
413 return 1;
414
415 drbd_free_ee(mdev, e);
416 fail:
417 put_ldev(mdev);
418 return 2;
419 }
420
421 void resync_timer_fn(unsigned long data)
422 {
423 unsigned long flags;
424 struct drbd_conf *mdev = (struct drbd_conf *) data;
425 int queue;
426
427 spin_lock_irqsave(&mdev->req_lock, flags);
428
429 if (likely(!test_and_clear_bit(STOP_SYNC_TIMER, &mdev->flags))) {
430 queue = 1;
431 if (mdev->state.conn == C_VERIFY_S)
432 mdev->resync_work.cb = w_make_ov_request;
433 else
434 mdev->resync_work.cb = w_make_resync_request;
435 } else {
436 queue = 0;
437 mdev->resync_work.cb = w_resync_inactive;
438 }
439
440 spin_unlock_irqrestore(&mdev->req_lock, flags);
441
442 /* harmless race: list_empty outside data.work.q_lock */
443 if (list_empty(&mdev->resync_work.list) && queue)
444 drbd_queue_work(&mdev->data.work, &mdev->resync_work);
445 }
446
447 static int calc_resync_rate(struct drbd_conf *mdev)
448 {
449 int d = mdev->data_delay / 1000; /* us -> ms */
450 int td = mdev->sync_conf.throttle_th * 100; /* 0.1s -> ms */
451 int hd = mdev->sync_conf.hold_off_th * 100; /* 0.1s -> ms */
452 int cr = mdev->sync_conf.rate;
453
454 return d <= td ? cr :
455 d >= hd ? 0 :
456 cr + (cr * (td - d) / (hd - td));
457 }
458
459 int w_make_resync_request(struct drbd_conf *mdev,
460 struct drbd_work *w, int cancel)
461 {
462 unsigned long bit;
463 sector_t sector;
464 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
465 int max_segment_size;
466 int number, i, size, pe, mx;
467 int align, queued, sndbuf;
468
469 if (unlikely(cancel))
470 return 1;
471
472 if (unlikely(mdev->state.conn < C_CONNECTED)) {
473 dev_err(DEV, "Confused in w_make_resync_request()! cstate < Connected");
474 return 0;
475 }
476
477 if (mdev->state.conn != C_SYNC_TARGET)
478 dev_err(DEV, "%s in w_make_resync_request\n",
479 drbd_conn_str(mdev->state.conn));
480
481 if (!get_ldev(mdev)) {
482 /* Since we only need to access mdev->rsync a
483 get_ldev_if_state(mdev,D_FAILED) would be sufficient, but
484 to continue resync with a broken disk makes no sense at
485 all */
486 dev_err(DEV, "Disk broke down during resync!\n");
487 mdev->resync_work.cb = w_resync_inactive;
488 return 1;
489 }
490
491 /* starting with drbd 8.3.8, we can handle multi-bio EEs,
492 * if it should be necessary */
493 max_segment_size = mdev->agreed_pro_version < 94 ?
494 queue_max_segment_size(mdev->rq_queue) : DRBD_MAX_SEGMENT_SIZE;
495
496 mdev->c_sync_rate = calc_resync_rate(mdev);
497 number = SLEEP_TIME * mdev->c_sync_rate / ((BM_BLOCK_SIZE / 1024) * HZ);
498 pe = atomic_read(&mdev->rs_pending_cnt);
499
500 mutex_lock(&mdev->data.mutex);
501 if (mdev->data.socket)
502 mx = mdev->data.socket->sk->sk_rcvbuf / sizeof(struct p_block_req);
503 else
504 mx = 1;
505 mutex_unlock(&mdev->data.mutex);
506
507 /* For resync rates >160MB/sec, allow more pending RS requests */
508 if (number > mx)
509 mx = number;
510
511 /* Limit the number of pending RS requests to no more than the peer's receive buffer */
512 if ((pe + number) > mx) {
513 number = mx - pe;
514 }
515
516 for (i = 0; i < number; i++) {
517 /* Stop generating RS requests, when half of the send buffer is filled */
518 mutex_lock(&mdev->data.mutex);
519 if (mdev->data.socket) {
520 queued = mdev->data.socket->sk->sk_wmem_queued;
521 sndbuf = mdev->data.socket->sk->sk_sndbuf;
522 } else {
523 queued = 1;
524 sndbuf = 0;
525 }
526 mutex_unlock(&mdev->data.mutex);
527 if (queued > sndbuf / 2)
528 goto requeue;
529
530 next_sector:
531 size = BM_BLOCK_SIZE;
532 bit = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
533
534 if (bit == -1UL) {
535 mdev->bm_resync_fo = drbd_bm_bits(mdev);
536 mdev->resync_work.cb = w_resync_inactive;
537 put_ldev(mdev);
538 return 1;
539 }
540
541 sector = BM_BIT_TO_SECT(bit);
542
543 if (drbd_try_rs_begin_io(mdev, sector)) {
544 mdev->bm_resync_fo = bit;
545 goto requeue;
546 }
547 mdev->bm_resync_fo = bit + 1;
548
549 if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
550 drbd_rs_complete_io(mdev, sector);
551 goto next_sector;
552 }
553
554 #if DRBD_MAX_SEGMENT_SIZE > BM_BLOCK_SIZE
555 /* try to find some adjacent bits.
556 * we stop if we have already the maximum req size.
557 *
558 * Additionally always align bigger requests, in order to
559 * be prepared for all stripe sizes of software RAIDs.
560 */
561 align = 1;
562 for (;;) {
563 if (size + BM_BLOCK_SIZE > max_segment_size)
564 break;
565
566 /* Be always aligned */
567 if (sector & ((1<<(align+3))-1))
568 break;
569
570 /* do not cross extent boundaries */
571 if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
572 break;
573 /* now, is it actually dirty, after all?
574 * caution, drbd_bm_test_bit is tri-state for some
575 * obscure reason; ( b == 0 ) would get the out-of-band
576 * only accidentally right because of the "oddly sized"
577 * adjustment below */
578 if (drbd_bm_test_bit(mdev, bit+1) != 1)
579 break;
580 bit++;
581 size += BM_BLOCK_SIZE;
582 if ((BM_BLOCK_SIZE << align) <= size)
583 align++;
584 i++;
585 }
586 /* if we merged some,
587 * reset the offset to start the next drbd_bm_find_next from */
588 if (size > BM_BLOCK_SIZE)
589 mdev->bm_resync_fo = bit + 1;
590 #endif
591
592 /* adjust very last sectors, in case we are oddly sized */
593 if (sector + (size>>9) > capacity)
594 size = (capacity-sector)<<9;
595 if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
596 switch (read_for_csum(mdev, sector, size)) {
597 case 0: /* Disk failure*/
598 put_ldev(mdev);
599 return 0;
600 case 2: /* Allocation failed */
601 drbd_rs_complete_io(mdev, sector);
602 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
603 goto requeue;
604 /* case 1: everything ok */
605 }
606 } else {
607 inc_rs_pending(mdev);
608 if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
609 sector, size, ID_SYNCER)) {
610 dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
611 dec_rs_pending(mdev);
612 put_ldev(mdev);
613 return 0;
614 }
615 }
616 }
617
618 if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
619 /* last syncer _request_ was sent,
620 * but the P_RS_DATA_REPLY not yet received. sync will end (and
621 * next sync group will resume), as soon as we receive the last
622 * resync data block, and the last bit is cleared.
623 * until then resync "work" is "inactive" ...
624 */
625 mdev->resync_work.cb = w_resync_inactive;
626 put_ldev(mdev);
627 return 1;
628 }
629
630 requeue:
631 mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
632 put_ldev(mdev);
633 return 1;
634 }
635
636 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
637 {
638 int number, i, size;
639 sector_t sector;
640 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
641
642 if (unlikely(cancel))
643 return 1;
644
645 if (unlikely(mdev->state.conn < C_CONNECTED)) {
646 dev_err(DEV, "Confused in w_make_ov_request()! cstate < Connected");
647 return 0;
648 }
649
650 number = SLEEP_TIME*mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
651 if (atomic_read(&mdev->rs_pending_cnt) > number)
652 goto requeue;
653
654 number -= atomic_read(&mdev->rs_pending_cnt);
655
656 sector = mdev->ov_position;
657 for (i = 0; i < number; i++) {
658 if (sector >= capacity) {
659 mdev->resync_work.cb = w_resync_inactive;
660 return 1;
661 }
662
663 size = BM_BLOCK_SIZE;
664
665 if (drbd_try_rs_begin_io(mdev, sector)) {
666 mdev->ov_position = sector;
667 goto requeue;
668 }
669
670 if (sector + (size>>9) > capacity)
671 size = (capacity-sector)<<9;
672
673 inc_rs_pending(mdev);
674 if (!drbd_send_ov_request(mdev, sector, size)) {
675 dec_rs_pending(mdev);
676 return 0;
677 }
678 sector += BM_SECT_PER_BIT;
679 }
680 mdev->ov_position = sector;
681
682 requeue:
683 mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
684 return 1;
685 }
686
687
688 int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
689 {
690 kfree(w);
691 ov_oos_print(mdev);
692 drbd_resync_finished(mdev);
693
694 return 1;
695 }
696
697 static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
698 {
699 kfree(w);
700
701 drbd_resync_finished(mdev);
702
703 return 1;
704 }
705
706 int drbd_resync_finished(struct drbd_conf *mdev)
707 {
708 unsigned long db, dt, dbdt;
709 unsigned long n_oos;
710 union drbd_state os, ns;
711 struct drbd_work *w;
712 char *khelper_cmd = NULL;
713
714 /* Remove all elements from the resync LRU. Since future actions
715 * might set bits in the (main) bitmap, then the entries in the
716 * resync LRU would be wrong. */
717 if (drbd_rs_del_all(mdev)) {
718 /* In case this is not possible now, most probably because
719 * there are P_RS_DATA_REPLY Packets lingering on the worker's
720 * queue (or even the read operations for those packets
721 * is not finished by now). Retry in 100ms. */
722
723 drbd_kick_lo(mdev);
724 __set_current_state(TASK_INTERRUPTIBLE);
725 schedule_timeout(HZ / 10);
726 w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
727 if (w) {
728 w->cb = w_resync_finished;
729 drbd_queue_work(&mdev->data.work, w);
730 return 1;
731 }
732 dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
733 }
734
735 dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
736 if (dt <= 0)
737 dt = 1;
738 db = mdev->rs_total;
739 dbdt = Bit2KB(db/dt);
740 mdev->rs_paused /= HZ;
741
742 if (!get_ldev(mdev))
743 goto out;
744
745 spin_lock_irq(&mdev->req_lock);
746 os = mdev->state;
747
748 /* This protects us against multiple calls (that can happen in the presence
749 of application IO), and against connectivity loss just before we arrive here. */
750 if (os.conn <= C_CONNECTED)
751 goto out_unlock;
752
753 ns = os;
754 ns.conn = C_CONNECTED;
755
756 dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
757 (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) ?
758 "Online verify " : "Resync",
759 dt + mdev->rs_paused, mdev->rs_paused, dbdt);
760
761 n_oos = drbd_bm_total_weight(mdev);
762
763 if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
764 if (n_oos) {
765 dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
766 n_oos, Bit2KB(1));
767 khelper_cmd = "out-of-sync";
768 }
769 } else {
770 D_ASSERT((n_oos - mdev->rs_failed) == 0);
771
772 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
773 khelper_cmd = "after-resync-target";
774
775 if (mdev->csums_tfm && mdev->rs_total) {
776 const unsigned long s = mdev->rs_same_csum;
777 const unsigned long t = mdev->rs_total;
778 const int ratio =
779 (t == 0) ? 0 :
780 (t < 100000) ? ((s*100)/t) : (s/(t/100));
781 dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; "
782 "transferred %luK total %luK\n",
783 ratio,
784 Bit2KB(mdev->rs_same_csum),
785 Bit2KB(mdev->rs_total - mdev->rs_same_csum),
786 Bit2KB(mdev->rs_total));
787 }
788 }
789
790 if (mdev->rs_failed) {
791 dev_info(DEV, " %lu failed blocks\n", mdev->rs_failed);
792
793 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
794 ns.disk = D_INCONSISTENT;
795 ns.pdsk = D_UP_TO_DATE;
796 } else {
797 ns.disk = D_UP_TO_DATE;
798 ns.pdsk = D_INCONSISTENT;
799 }
800 } else {
801 ns.disk = D_UP_TO_DATE;
802 ns.pdsk = D_UP_TO_DATE;
803
804 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
805 if (mdev->p_uuid) {
806 int i;
807 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
808 _drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
809 drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
810 _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
811 } else {
812 dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
813 }
814 }
815
816 drbd_uuid_set_bm(mdev, 0UL);
817
818 if (mdev->p_uuid) {
819 /* Now the two UUID sets are equal, update what we
820 * know of the peer. */
821 int i;
822 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
823 mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
824 }
825 }
826
827 _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
828 out_unlock:
829 spin_unlock_irq(&mdev->req_lock);
830 put_ldev(mdev);
831 out:
832 mdev->rs_total = 0;
833 mdev->rs_failed = 0;
834 mdev->rs_paused = 0;
835 mdev->ov_start_sector = 0;
836
837 if (test_and_clear_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags)) {
838 dev_warn(DEV, "Writing the whole bitmap, due to failed kmalloc\n");
839 drbd_queue_bitmap_io(mdev, &drbd_bm_write, NULL, "write from resync_finished");
840 }
841
842 if (khelper_cmd)
843 drbd_khelper(mdev, khelper_cmd);
844
845 return 1;
846 }
847
848 /* helper */
849 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
850 {
851 if (drbd_ee_has_active_page(e)) {
852 /* This might happen if sendpage() has not finished */
853 spin_lock_irq(&mdev->req_lock);
854 list_add_tail(&e->w.list, &mdev->net_ee);
855 spin_unlock_irq(&mdev->req_lock);
856 } else
857 drbd_free_ee(mdev, e);
858 }
859
860 /**
861 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
862 * @mdev: DRBD device.
863 * @w: work object.
864 * @cancel: The connection will be closed anyways
865 */
866 int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
867 {
868 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
869 int ok;
870
871 if (unlikely(cancel)) {
872 drbd_free_ee(mdev, e);
873 dec_unacked(mdev);
874 return 1;
875 }
876
877 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
878 ok = drbd_send_block(mdev, P_DATA_REPLY, e);
879 } else {
880 if (__ratelimit(&drbd_ratelimit_state))
881 dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
882 (unsigned long long)e->sector);
883
884 ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
885 }
886
887 dec_unacked(mdev);
888
889 move_to_net_ee_or_free(mdev, e);
890
891 if (unlikely(!ok))
892 dev_err(DEV, "drbd_send_block() failed\n");
893 return ok;
894 }
895
896 /**
897 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUESTRS
898 * @mdev: DRBD device.
899 * @w: work object.
900 * @cancel: The connection will be closed anyways
901 */
902 int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
903 {
904 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
905 int ok;
906
907 if (unlikely(cancel)) {
908 drbd_free_ee(mdev, e);
909 dec_unacked(mdev);
910 return 1;
911 }
912
913 if (get_ldev_if_state(mdev, D_FAILED)) {
914 drbd_rs_complete_io(mdev, e->sector);
915 put_ldev(mdev);
916 }
917
918 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
919 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
920 inc_rs_pending(mdev);
921 ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
922 } else {
923 if (__ratelimit(&drbd_ratelimit_state))
924 dev_err(DEV, "Not sending RSDataReply, "
925 "partner DISKLESS!\n");
926 ok = 1;
927 }
928 } else {
929 if (__ratelimit(&drbd_ratelimit_state))
930 dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
931 (unsigned long long)e->sector);
932
933 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
934
935 /* update resync data with failure */
936 drbd_rs_failed_io(mdev, e->sector, e->size);
937 }
938
939 dec_unacked(mdev);
940
941 move_to_net_ee_or_free(mdev, e);
942
943 if (unlikely(!ok))
944 dev_err(DEV, "drbd_send_block() failed\n");
945 return ok;
946 }
947
948 int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
949 {
950 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
951 struct digest_info *di;
952 int digest_size;
953 void *digest = NULL;
954 int ok, eq = 0;
955
956 if (unlikely(cancel)) {
957 drbd_free_ee(mdev, e);
958 dec_unacked(mdev);
959 return 1;
960 }
961
962 drbd_rs_complete_io(mdev, e->sector);
963
964 di = (struct digest_info *)(unsigned long)e->block_id;
965
966 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
967 /* quick hack to try to avoid a race against reconfiguration.
968 * a real fix would be much more involved,
969 * introducing more locking mechanisms */
970 if (mdev->csums_tfm) {
971 digest_size = crypto_hash_digestsize(mdev->csums_tfm);
972 D_ASSERT(digest_size == di->digest_size);
973 digest = kmalloc(digest_size, GFP_NOIO);
974 }
975 if (digest) {
976 drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
977 eq = !memcmp(digest, di->digest, digest_size);
978 kfree(digest);
979 }
980
981 if (eq) {
982 drbd_set_in_sync(mdev, e->sector, e->size);
983 /* rs_same_csums unit is BM_BLOCK_SIZE */
984 mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT;
985 ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
986 } else {
987 inc_rs_pending(mdev);
988 e->block_id = ID_SYNCER;
989 ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
990 }
991 } else {
992 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
993 if (__ratelimit(&drbd_ratelimit_state))
994 dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
995 }
996
997 dec_unacked(mdev);
998
999 kfree(di);
1000
1001 move_to_net_ee_or_free(mdev, e);
1002
1003 if (unlikely(!ok))
1004 dev_err(DEV, "drbd_send_block/ack() failed\n");
1005 return ok;
1006 }
1007
1008 int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1009 {
1010 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1011 int digest_size;
1012 void *digest;
1013 int ok = 1;
1014
1015 if (unlikely(cancel))
1016 goto out;
1017
1018 if (unlikely((e->flags & EE_WAS_ERROR) != 0))
1019 goto out;
1020
1021 digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1022 /* FIXME if this allocation fails, online verify will not terminate! */
1023 digest = kmalloc(digest_size, GFP_NOIO);
1024 if (digest) {
1025 drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1026 inc_rs_pending(mdev);
1027 ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
1028 digest, digest_size, P_OV_REPLY);
1029 if (!ok)
1030 dec_rs_pending(mdev);
1031 kfree(digest);
1032 }
1033
1034 out:
1035 drbd_free_ee(mdev, e);
1036
1037 dec_unacked(mdev);
1038
1039 return ok;
1040 }
1041
1042 void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1043 {
1044 if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1045 mdev->ov_last_oos_size += size>>9;
1046 } else {
1047 mdev->ov_last_oos_start = sector;
1048 mdev->ov_last_oos_size = size>>9;
1049 }
1050 drbd_set_out_of_sync(mdev, sector, size);
1051 set_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags);
1052 }
1053
1054 int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1055 {
1056 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1057 struct digest_info *di;
1058 int digest_size;
1059 void *digest;
1060 int ok, eq = 0;
1061
1062 if (unlikely(cancel)) {
1063 drbd_free_ee(mdev, e);
1064 dec_unacked(mdev);
1065 return 1;
1066 }
1067
1068 /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1069 * the resync lru has been cleaned up already */
1070 drbd_rs_complete_io(mdev, e->sector);
1071
1072 di = (struct digest_info *)(unsigned long)e->block_id;
1073
1074 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1075 digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1076 digest = kmalloc(digest_size, GFP_NOIO);
1077 if (digest) {
1078 drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1079
1080 D_ASSERT(digest_size == di->digest_size);
1081 eq = !memcmp(digest, di->digest, digest_size);
1082 kfree(digest);
1083 }
1084 } else {
1085 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1086 if (__ratelimit(&drbd_ratelimit_state))
1087 dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1088 }
1089
1090 dec_unacked(mdev);
1091
1092 kfree(di);
1093
1094 if (!eq)
1095 drbd_ov_oos_found(mdev, e->sector, e->size);
1096 else
1097 ov_oos_print(mdev);
1098
1099 ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
1100 eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1101
1102 drbd_free_ee(mdev, e);
1103
1104 if (--mdev->ov_left == 0) {
1105 ov_oos_print(mdev);
1106 drbd_resync_finished(mdev);
1107 }
1108
1109 return ok;
1110 }
1111
1112 int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1113 {
1114 struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1115 complete(&b->done);
1116 return 1;
1117 }
1118
1119 int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1120 {
1121 struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1122 struct p_barrier *p = &mdev->data.sbuf.barrier;
1123 int ok = 1;
1124
1125 /* really avoid racing with tl_clear. w.cb may have been referenced
1126 * just before it was reassigned and re-queued, so double check that.
1127 * actually, this race was harmless, since we only try to send the
1128 * barrier packet here, and otherwise do nothing with the object.
1129 * but compare with the head of w_clear_epoch */
1130 spin_lock_irq(&mdev->req_lock);
1131 if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1132 cancel = 1;
1133 spin_unlock_irq(&mdev->req_lock);
1134 if (cancel)
1135 return 1;
1136
1137 if (!drbd_get_data_sock(mdev))
1138 return 0;
1139 p->barrier = b->br_number;
1140 /* inc_ap_pending was done where this was queued.
1141 * dec_ap_pending will be done in got_BarrierAck
1142 * or (on connection loss) in w_clear_epoch. */
1143 ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
1144 (struct p_header *)p, sizeof(*p), 0);
1145 drbd_put_data_sock(mdev);
1146
1147 return ok;
1148 }
1149
1150 int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1151 {
1152 if (cancel)
1153 return 1;
1154 return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1155 }
1156
1157 /**
1158 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1159 * @mdev: DRBD device.
1160 * @w: work object.
1161 * @cancel: The connection will be closed anyways
1162 */
1163 int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1164 {
1165 struct drbd_request *req = container_of(w, struct drbd_request, w);
1166 int ok;
1167
1168 if (unlikely(cancel)) {
1169 req_mod(req, send_canceled);
1170 return 1;
1171 }
1172
1173 ok = drbd_send_dblock(mdev, req);
1174 req_mod(req, ok ? handed_over_to_network : send_failed);
1175
1176 return ok;
1177 }
1178
1179 /**
1180 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1181 * @mdev: DRBD device.
1182 * @w: work object.
1183 * @cancel: The connection will be closed anyways
1184 */
1185 int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1186 {
1187 struct drbd_request *req = container_of(w, struct drbd_request, w);
1188 int ok;
1189
1190 if (unlikely(cancel)) {
1191 req_mod(req, send_canceled);
1192 return 1;
1193 }
1194
1195 ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
1196 (unsigned long)req);
1197
1198 if (!ok) {
1199 /* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1200 * so this is probably redundant */
1201 if (mdev->state.conn >= C_CONNECTED)
1202 drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1203 }
1204 req_mod(req, ok ? handed_over_to_network : send_failed);
1205
1206 return ok;
1207 }
1208
1209 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1210 {
1211 struct drbd_conf *odev = mdev;
1212
1213 while (1) {
1214 if (odev->sync_conf.after == -1)
1215 return 1;
1216 odev = minor_to_mdev(odev->sync_conf.after);
1217 ERR_IF(!odev) return 1;
1218 if ((odev->state.conn >= C_SYNC_SOURCE &&
1219 odev->state.conn <= C_PAUSED_SYNC_T) ||
1220 odev->state.aftr_isp || odev->state.peer_isp ||
1221 odev->state.user_isp)
1222 return 0;
1223 }
1224 }
1225
1226 /**
1227 * _drbd_pause_after() - Pause resync on all devices that may not resync now
1228 * @mdev: DRBD device.
1229 *
1230 * Called from process context only (admin command and after_state_ch).
1231 */
1232 static int _drbd_pause_after(struct drbd_conf *mdev)
1233 {
1234 struct drbd_conf *odev;
1235 int i, rv = 0;
1236
1237 for (i = 0; i < minor_count; i++) {
1238 odev = minor_to_mdev(i);
1239 if (!odev)
1240 continue;
1241 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1242 continue;
1243 if (!_drbd_may_sync_now(odev))
1244 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1245 != SS_NOTHING_TO_DO);
1246 }
1247
1248 return rv;
1249 }
1250
1251 /**
1252 * _drbd_resume_next() - Resume resync on all devices that may resync now
1253 * @mdev: DRBD device.
1254 *
1255 * Called from process context only (admin command and worker).
1256 */
1257 static int _drbd_resume_next(struct drbd_conf *mdev)
1258 {
1259 struct drbd_conf *odev;
1260 int i, rv = 0;
1261
1262 for (i = 0; i < minor_count; i++) {
1263 odev = minor_to_mdev(i);
1264 if (!odev)
1265 continue;
1266 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1267 continue;
1268 if (odev->state.aftr_isp) {
1269 if (_drbd_may_sync_now(odev))
1270 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1271 CS_HARD, NULL)
1272 != SS_NOTHING_TO_DO) ;
1273 }
1274 }
1275 return rv;
1276 }
1277
1278 void resume_next_sg(struct drbd_conf *mdev)
1279 {
1280 write_lock_irq(&global_state_lock);
1281 _drbd_resume_next(mdev);
1282 write_unlock_irq(&global_state_lock);
1283 }
1284
1285 void suspend_other_sg(struct drbd_conf *mdev)
1286 {
1287 write_lock_irq(&global_state_lock);
1288 _drbd_pause_after(mdev);
1289 write_unlock_irq(&global_state_lock);
1290 }
1291
1292 static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1293 {
1294 struct drbd_conf *odev;
1295
1296 if (o_minor == -1)
1297 return NO_ERROR;
1298 if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1299 return ERR_SYNC_AFTER;
1300
1301 /* check for loops */
1302 odev = minor_to_mdev(o_minor);
1303 while (1) {
1304 if (odev == mdev)
1305 return ERR_SYNC_AFTER_CYCLE;
1306
1307 /* dependency chain ends here, no cycles. */
1308 if (odev->sync_conf.after == -1)
1309 return NO_ERROR;
1310
1311 /* follow the dependency chain */
1312 odev = minor_to_mdev(odev->sync_conf.after);
1313 }
1314 }
1315
1316 int drbd_alter_sa(struct drbd_conf *mdev, int na)
1317 {
1318 int changes;
1319 int retcode;
1320
1321 write_lock_irq(&global_state_lock);
1322 retcode = sync_after_error(mdev, na);
1323 if (retcode == NO_ERROR) {
1324 mdev->sync_conf.after = na;
1325 do {
1326 changes = _drbd_pause_after(mdev);
1327 changes |= _drbd_resume_next(mdev);
1328 } while (changes);
1329 }
1330 write_unlock_irq(&global_state_lock);
1331 return retcode;
1332 }
1333
1334 static void ping_peer(struct drbd_conf *mdev)
1335 {
1336 clear_bit(GOT_PING_ACK, &mdev->flags);
1337 request_ping(mdev);
1338 wait_event(mdev->misc_wait,
1339 test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
1340 }
1341
1342 /**
1343 * drbd_start_resync() - Start the resync process
1344 * @mdev: DRBD device.
1345 * @side: Either C_SYNC_SOURCE or C_SYNC_TARGET
1346 *
1347 * This function might bring you directly into one of the
1348 * C_PAUSED_SYNC_* states.
1349 */
1350 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1351 {
1352 union drbd_state ns;
1353 int r;
1354
1355 if (mdev->state.conn >= C_SYNC_SOURCE) {
1356 dev_err(DEV, "Resync already running!\n");
1357 return;
1358 }
1359
1360 /* In case a previous resync run was aborted by an IO error/detach on the peer. */
1361 drbd_rs_cancel_all(mdev);
1362
1363 if (side == C_SYNC_TARGET) {
1364 /* Since application IO was locked out during C_WF_BITMAP_T and
1365 C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1366 we check that we might make the data inconsistent. */
1367 r = drbd_khelper(mdev, "before-resync-target");
1368 r = (r >> 8) & 0xff;
1369 if (r > 0) {
1370 dev_info(DEV, "before-resync-target handler returned %d, "
1371 "dropping connection.\n", r);
1372 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1373 return;
1374 }
1375 }
1376
1377 drbd_state_lock(mdev);
1378
1379 if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1380 drbd_state_unlock(mdev);
1381 return;
1382 }
1383
1384 if (side == C_SYNC_TARGET) {
1385 mdev->bm_resync_fo = 0;
1386 } else /* side == C_SYNC_SOURCE */ {
1387 u64 uuid;
1388
1389 get_random_bytes(&uuid, sizeof(u64));
1390 drbd_uuid_set(mdev, UI_BITMAP, uuid);
1391 drbd_send_sync_uuid(mdev, uuid);
1392
1393 D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
1394 }
1395
1396 write_lock_irq(&global_state_lock);
1397 ns = mdev->state;
1398
1399 ns.aftr_isp = !_drbd_may_sync_now(mdev);
1400
1401 ns.conn = side;
1402
1403 if (side == C_SYNC_TARGET)
1404 ns.disk = D_INCONSISTENT;
1405 else /* side == C_SYNC_SOURCE */
1406 ns.pdsk = D_INCONSISTENT;
1407
1408 r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1409 ns = mdev->state;
1410
1411 if (ns.conn < C_CONNECTED)
1412 r = SS_UNKNOWN_ERROR;
1413
1414 if (r == SS_SUCCESS) {
1415 mdev->rs_total =
1416 mdev->rs_mark_left = drbd_bm_total_weight(mdev);
1417 mdev->rs_failed = 0;
1418 mdev->rs_paused = 0;
1419 mdev->rs_start =
1420 mdev->rs_mark_time = jiffies;
1421 mdev->rs_same_csum = 0;
1422 _drbd_pause_after(mdev);
1423 }
1424 write_unlock_irq(&global_state_lock);
1425 put_ldev(mdev);
1426
1427 if (r == SS_SUCCESS) {
1428 dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1429 drbd_conn_str(ns.conn),
1430 (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1431 (unsigned long) mdev->rs_total);
1432
1433 if (mdev->rs_total == 0) {
1434 /* Peer still reachable? Beware of failing before-resync-target handlers! */
1435 ping_peer(mdev);
1436 drbd_resync_finished(mdev);
1437 }
1438
1439 /* ns.conn may already be != mdev->state.conn,
1440 * we may have been paused in between, or become paused until
1441 * the timer triggers.
1442 * No matter, that is handled in resync_timer_fn() */
1443 if (ns.conn == C_SYNC_TARGET)
1444 mod_timer(&mdev->resync_timer, jiffies);
1445
1446 drbd_md_sync(mdev);
1447 }
1448 drbd_state_unlock(mdev);
1449 }
1450
1451 int drbd_worker(struct drbd_thread *thi)
1452 {
1453 struct drbd_conf *mdev = thi->mdev;
1454 struct drbd_work *w = NULL;
1455 LIST_HEAD(work_list);
1456 int intr = 0, i;
1457
1458 sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1459
1460 while (get_t_state(thi) == Running) {
1461 drbd_thread_current_set_cpu(mdev);
1462
1463 if (down_trylock(&mdev->data.work.s)) {
1464 mutex_lock(&mdev->data.mutex);
1465 if (mdev->data.socket && !mdev->net_conf->no_cork)
1466 drbd_tcp_uncork(mdev->data.socket);
1467 mutex_unlock(&mdev->data.mutex);
1468
1469 intr = down_interruptible(&mdev->data.work.s);
1470
1471 mutex_lock(&mdev->data.mutex);
1472 if (mdev->data.socket && !mdev->net_conf->no_cork)
1473 drbd_tcp_cork(mdev->data.socket);
1474 mutex_unlock(&mdev->data.mutex);
1475 }
1476
1477 if (intr) {
1478 D_ASSERT(intr == -EINTR);
1479 flush_signals(current);
1480 ERR_IF (get_t_state(thi) == Running)
1481 continue;
1482 break;
1483 }
1484
1485 if (get_t_state(thi) != Running)
1486 break;
1487 /* With this break, we have done a down() but not consumed
1488 the entry from the list. The cleanup code takes care of
1489 this... */
1490
1491 w = NULL;
1492 spin_lock_irq(&mdev->data.work.q_lock);
1493 ERR_IF(list_empty(&mdev->data.work.q)) {
1494 /* something terribly wrong in our logic.
1495 * we were able to down() the semaphore,
1496 * but the list is empty... doh.
1497 *
1498 * what is the best thing to do now?
1499 * try again from scratch, restarting the receiver,
1500 * asender, whatnot? could break even more ugly,
1501 * e.g. when we are primary, but no good local data.
1502 *
1503 * I'll try to get away just starting over this loop.
1504 */
1505 spin_unlock_irq(&mdev->data.work.q_lock);
1506 continue;
1507 }
1508 w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1509 list_del_init(&w->list);
1510 spin_unlock_irq(&mdev->data.work.q_lock);
1511
1512 if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1513 /* dev_warn(DEV, "worker: a callback failed! \n"); */
1514 if (mdev->state.conn >= C_CONNECTED)
1515 drbd_force_state(mdev,
1516 NS(conn, C_NETWORK_FAILURE));
1517 }
1518 }
1519 D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1520 D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1521
1522 spin_lock_irq(&mdev->data.work.q_lock);
1523 i = 0;
1524 while (!list_empty(&mdev->data.work.q)) {
1525 list_splice_init(&mdev->data.work.q, &work_list);
1526 spin_unlock_irq(&mdev->data.work.q_lock);
1527
1528 while (!list_empty(&work_list)) {
1529 w = list_entry(work_list.next, struct drbd_work, list);
1530 list_del_init(&w->list);
1531 w->cb(mdev, w, 1);
1532 i++; /* dead debugging code */
1533 }
1534
1535 spin_lock_irq(&mdev->data.work.q_lock);
1536 }
1537 sema_init(&mdev->data.work.s, 0);
1538 /* DANGEROUS race: if someone did queue his work within the spinlock,
1539 * but up() ed outside the spinlock, we could get an up() on the
1540 * semaphore without corresponding list entry.
1541 * So don't do that.
1542 */
1543 spin_unlock_irq(&mdev->data.work.q_lock);
1544
1545 D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1546 /* _drbd_set_state only uses stop_nowait.
1547 * wait here for the Exiting receiver. */
1548 drbd_thread_stop(&mdev->receiver);
1549 drbd_mdev_cleanup(mdev);
1550
1551 dev_info(DEV, "worker terminated\n");
1552
1553 clear_bit(DEVICE_DYING, &mdev->flags);
1554 clear_bit(CONFIG_PENDING, &mdev->flags);
1555 wake_up(&mdev->state_wait);
1556
1557 return 0;
1558 }
This page took 0.090486 seconds and 5 git commands to generate.