Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/mason/linux...
[deliverable/linux.git] / drivers / block / drbd / drbd_worker.c
1 /*
2 drbd_worker.c
3
4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
13 any later version.
14
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24 */
25
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
37
38 #include "drbd_int.h"
39 #include "drbd_req.h"
40
41 static int w_make_ov_request(struct drbd_work *w, int cancel);
42
43
44 /* endio handlers:
45 * drbd_md_io_complete (defined here)
46 * drbd_request_endio (defined here)
47 * drbd_peer_request_endio (defined here)
48 * bm_async_io_complete (defined in drbd_bitmap.c)
49 *
50 * For all these callbacks, note the following:
51 * The callbacks will be called in irq context by the IDE drivers,
52 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
53 * Try to get the locking right :)
54 *
55 */
56
57
58 /* About the global_state_lock
59 Each state transition on an device holds a read lock. In case we have
60 to evaluate the resync after dependencies, we grab a write lock, because
61 we need stable states on all devices for that. */
62 rwlock_t global_state_lock;
63
64 /* used for synchronous meta data and bitmap IO
65 * submitted by drbd_md_sync_page_io()
66 */
67 void drbd_md_io_complete(struct bio *bio, int error)
68 {
69 struct drbd_md_io *md_io;
70 struct drbd_conf *mdev;
71
72 md_io = (struct drbd_md_io *)bio->bi_private;
73 mdev = container_of(md_io, struct drbd_conf, md_io);
74
75 md_io->error = error;
76
77 /* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
78 * to timeout on the lower level device, and eventually detach from it.
79 * If this io completion runs after that timeout expired, this
80 * drbd_md_put_buffer() may allow us to finally try and re-attach.
81 * During normal operation, this only puts that extra reference
82 * down to 1 again.
83 * Make sure we first drop the reference, and only then signal
84 * completion, or we may (in drbd_al_read_log()) cycle so fast into the
85 * next drbd_md_sync_page_io(), that we trigger the
86 * ASSERT(atomic_read(&mdev->md_io_in_use) == 1) there.
87 */
88 drbd_md_put_buffer(mdev);
89 md_io->done = 1;
90 wake_up(&mdev->misc_wait);
91 bio_put(bio);
92 put_ldev(mdev);
93 }
94
95 /* reads on behalf of the partner,
96 * "submitted" by the receiver
97 */
98 void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
99 {
100 unsigned long flags = 0;
101 struct drbd_conf *mdev = peer_req->w.mdev;
102
103 spin_lock_irqsave(&mdev->tconn->req_lock, flags);
104 mdev->read_cnt += peer_req->i.size >> 9;
105 list_del(&peer_req->w.list);
106 if (list_empty(&mdev->read_ee))
107 wake_up(&mdev->ee_wait);
108 if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
109 __drbd_chk_io_error(mdev, DRBD_READ_ERROR);
110 spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
111
112 drbd_queue_work(&mdev->tconn->sender_work, &peer_req->w);
113 put_ldev(mdev);
114 }
115
116 /* writes on behalf of the partner, or resync writes,
117 * "submitted" by the receiver, final stage. */
118 static void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
119 {
120 unsigned long flags = 0;
121 struct drbd_conf *mdev = peer_req->w.mdev;
122 struct drbd_interval i;
123 int do_wake;
124 u64 block_id;
125 int do_al_complete_io;
126
127 /* after we moved peer_req to done_ee,
128 * we may no longer access it,
129 * it may be freed/reused already!
130 * (as soon as we release the req_lock) */
131 i = peer_req->i;
132 do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
133 block_id = peer_req->block_id;
134
135 spin_lock_irqsave(&mdev->tconn->req_lock, flags);
136 mdev->writ_cnt += peer_req->i.size >> 9;
137 list_move_tail(&peer_req->w.list, &mdev->done_ee);
138
139 /*
140 * Do not remove from the write_requests tree here: we did not send the
141 * Ack yet and did not wake possibly waiting conflicting requests.
142 * Removed from the tree from "drbd_process_done_ee" within the
143 * appropriate w.cb (e_end_block/e_end_resync_block) or from
144 * _drbd_clear_done_ee.
145 */
146
147 do_wake = list_empty(block_id == ID_SYNCER ? &mdev->sync_ee : &mdev->active_ee);
148
149 if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
150 __drbd_chk_io_error(mdev, DRBD_WRITE_ERROR);
151 spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
152
153 if (block_id == ID_SYNCER)
154 drbd_rs_complete_io(mdev, i.sector);
155
156 if (do_wake)
157 wake_up(&mdev->ee_wait);
158
159 if (do_al_complete_io)
160 drbd_al_complete_io(mdev, &i);
161
162 wake_asender(mdev->tconn);
163 put_ldev(mdev);
164 }
165
166 /* writes on behalf of the partner, or resync writes,
167 * "submitted" by the receiver.
168 */
169 void drbd_peer_request_endio(struct bio *bio, int error)
170 {
171 struct drbd_peer_request *peer_req = bio->bi_private;
172 struct drbd_conf *mdev = peer_req->w.mdev;
173 int uptodate = bio_flagged(bio, BIO_UPTODATE);
174 int is_write = bio_data_dir(bio) == WRITE;
175
176 if (error && __ratelimit(&drbd_ratelimit_state))
177 dev_warn(DEV, "%s: error=%d s=%llus\n",
178 is_write ? "write" : "read", error,
179 (unsigned long long)peer_req->i.sector);
180 if (!error && !uptodate) {
181 if (__ratelimit(&drbd_ratelimit_state))
182 dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
183 is_write ? "write" : "read",
184 (unsigned long long)peer_req->i.sector);
185 /* strange behavior of some lower level drivers...
186 * fail the request by clearing the uptodate flag,
187 * but do not return any error?! */
188 error = -EIO;
189 }
190
191 if (error)
192 set_bit(__EE_WAS_ERROR, &peer_req->flags);
193
194 bio_put(bio); /* no need for the bio anymore */
195 if (atomic_dec_and_test(&peer_req->pending_bios)) {
196 if (is_write)
197 drbd_endio_write_sec_final(peer_req);
198 else
199 drbd_endio_read_sec_final(peer_req);
200 }
201 }
202
203 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
204 */
205 void drbd_request_endio(struct bio *bio, int error)
206 {
207 unsigned long flags;
208 struct drbd_request *req = bio->bi_private;
209 struct drbd_conf *mdev = req->w.mdev;
210 struct bio_and_error m;
211 enum drbd_req_event what;
212 int uptodate = bio_flagged(bio, BIO_UPTODATE);
213
214 if (!error && !uptodate) {
215 dev_warn(DEV, "p %s: setting error to -EIO\n",
216 bio_data_dir(bio) == WRITE ? "write" : "read");
217 /* strange behavior of some lower level drivers...
218 * fail the request by clearing the uptodate flag,
219 * but do not return any error?! */
220 error = -EIO;
221 }
222
223
224 /* If this request was aborted locally before,
225 * but now was completed "successfully",
226 * chances are that this caused arbitrary data corruption.
227 *
228 * "aborting" requests, or force-detaching the disk, is intended for
229 * completely blocked/hung local backing devices which do no longer
230 * complete requests at all, not even do error completions. In this
231 * situation, usually a hard-reset and failover is the only way out.
232 *
233 * By "aborting", basically faking a local error-completion,
234 * we allow for a more graceful swichover by cleanly migrating services.
235 * Still the affected node has to be rebooted "soon".
236 *
237 * By completing these requests, we allow the upper layers to re-use
238 * the associated data pages.
239 *
240 * If later the local backing device "recovers", and now DMAs some data
241 * from disk into the original request pages, in the best case it will
242 * just put random data into unused pages; but typically it will corrupt
243 * meanwhile completely unrelated data, causing all sorts of damage.
244 *
245 * Which means delayed successful completion,
246 * especially for READ requests,
247 * is a reason to panic().
248 *
249 * We assume that a delayed *error* completion is OK,
250 * though we still will complain noisily about it.
251 */
252 if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
253 if (__ratelimit(&drbd_ratelimit_state))
254 dev_emerg(DEV, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
255
256 if (!error)
257 panic("possible random memory corruption caused by delayed completion of aborted local request\n");
258 }
259
260 /* to avoid recursion in __req_mod */
261 if (unlikely(error)) {
262 what = (bio_data_dir(bio) == WRITE)
263 ? WRITE_COMPLETED_WITH_ERROR
264 : (bio_rw(bio) == READ)
265 ? READ_COMPLETED_WITH_ERROR
266 : READ_AHEAD_COMPLETED_WITH_ERROR;
267 } else
268 what = COMPLETED_OK;
269
270 bio_put(req->private_bio);
271 req->private_bio = ERR_PTR(error);
272
273 /* not req_mod(), we need irqsave here! */
274 spin_lock_irqsave(&mdev->tconn->req_lock, flags);
275 __req_mod(req, what, &m);
276 spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
277 put_ldev(mdev);
278
279 if (m.bio)
280 complete_master_bio(mdev, &m);
281 }
282
283 void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm,
284 struct drbd_peer_request *peer_req, void *digest)
285 {
286 struct hash_desc desc;
287 struct scatterlist sg;
288 struct page *page = peer_req->pages;
289 struct page *tmp;
290 unsigned len;
291
292 desc.tfm = tfm;
293 desc.flags = 0;
294
295 sg_init_table(&sg, 1);
296 crypto_hash_init(&desc);
297
298 while ((tmp = page_chain_next(page))) {
299 /* all but the last page will be fully used */
300 sg_set_page(&sg, page, PAGE_SIZE, 0);
301 crypto_hash_update(&desc, &sg, sg.length);
302 page = tmp;
303 }
304 /* and now the last, possibly only partially used page */
305 len = peer_req->i.size & (PAGE_SIZE - 1);
306 sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
307 crypto_hash_update(&desc, &sg, sg.length);
308 crypto_hash_final(&desc, digest);
309 }
310
311 void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
312 {
313 struct hash_desc desc;
314 struct scatterlist sg;
315 struct bio_vec *bvec;
316 int i;
317
318 desc.tfm = tfm;
319 desc.flags = 0;
320
321 sg_init_table(&sg, 1);
322 crypto_hash_init(&desc);
323
324 bio_for_each_segment(bvec, bio, i) {
325 sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
326 crypto_hash_update(&desc, &sg, sg.length);
327 }
328 crypto_hash_final(&desc, digest);
329 }
330
331 /* MAYBE merge common code with w_e_end_ov_req */
332 static int w_e_send_csum(struct drbd_work *w, int cancel)
333 {
334 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
335 struct drbd_conf *mdev = w->mdev;
336 int digest_size;
337 void *digest;
338 int err = 0;
339
340 if (unlikely(cancel))
341 goto out;
342
343 if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
344 goto out;
345
346 digest_size = crypto_hash_digestsize(mdev->tconn->csums_tfm);
347 digest = kmalloc(digest_size, GFP_NOIO);
348 if (digest) {
349 sector_t sector = peer_req->i.sector;
350 unsigned int size = peer_req->i.size;
351 drbd_csum_ee(mdev, mdev->tconn->csums_tfm, peer_req, digest);
352 /* Free peer_req and pages before send.
353 * In case we block on congestion, we could otherwise run into
354 * some distributed deadlock, if the other side blocks on
355 * congestion as well, because our receiver blocks in
356 * drbd_alloc_pages due to pp_in_use > max_buffers. */
357 drbd_free_peer_req(mdev, peer_req);
358 peer_req = NULL;
359 inc_rs_pending(mdev);
360 err = drbd_send_drequest_csum(mdev, sector, size,
361 digest, digest_size,
362 P_CSUM_RS_REQUEST);
363 kfree(digest);
364 } else {
365 dev_err(DEV, "kmalloc() of digest failed.\n");
366 err = -ENOMEM;
367 }
368
369 out:
370 if (peer_req)
371 drbd_free_peer_req(mdev, peer_req);
372
373 if (unlikely(err))
374 dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
375 return err;
376 }
377
378 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
379
380 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
381 {
382 struct drbd_peer_request *peer_req;
383
384 if (!get_ldev(mdev))
385 return -EIO;
386
387 if (drbd_rs_should_slow_down(mdev, sector))
388 goto defer;
389
390 /* GFP_TRY, because if there is no memory available right now, this may
391 * be rescheduled for later. It is "only" background resync, after all. */
392 peer_req = drbd_alloc_peer_req(mdev, ID_SYNCER /* unused */, sector,
393 size, GFP_TRY);
394 if (!peer_req)
395 goto defer;
396
397 peer_req->w.cb = w_e_send_csum;
398 spin_lock_irq(&mdev->tconn->req_lock);
399 list_add(&peer_req->w.list, &mdev->read_ee);
400 spin_unlock_irq(&mdev->tconn->req_lock);
401
402 atomic_add(size >> 9, &mdev->rs_sect_ev);
403 if (drbd_submit_peer_request(mdev, peer_req, READ, DRBD_FAULT_RS_RD) == 0)
404 return 0;
405
406 /* If it failed because of ENOMEM, retry should help. If it failed
407 * because bio_add_page failed (probably broken lower level driver),
408 * retry may or may not help.
409 * If it does not, you may need to force disconnect. */
410 spin_lock_irq(&mdev->tconn->req_lock);
411 list_del(&peer_req->w.list);
412 spin_unlock_irq(&mdev->tconn->req_lock);
413
414 drbd_free_peer_req(mdev, peer_req);
415 defer:
416 put_ldev(mdev);
417 return -EAGAIN;
418 }
419
420 int w_resync_timer(struct drbd_work *w, int cancel)
421 {
422 struct drbd_conf *mdev = w->mdev;
423 switch (mdev->state.conn) {
424 case C_VERIFY_S:
425 w_make_ov_request(w, cancel);
426 break;
427 case C_SYNC_TARGET:
428 w_make_resync_request(w, cancel);
429 break;
430 }
431
432 return 0;
433 }
434
435 void resync_timer_fn(unsigned long data)
436 {
437 struct drbd_conf *mdev = (struct drbd_conf *) data;
438
439 if (list_empty(&mdev->resync_work.list))
440 drbd_queue_work(&mdev->tconn->sender_work, &mdev->resync_work);
441 }
442
443 static void fifo_set(struct fifo_buffer *fb, int value)
444 {
445 int i;
446
447 for (i = 0; i < fb->size; i++)
448 fb->values[i] = value;
449 }
450
451 static int fifo_push(struct fifo_buffer *fb, int value)
452 {
453 int ov;
454
455 ov = fb->values[fb->head_index];
456 fb->values[fb->head_index++] = value;
457
458 if (fb->head_index >= fb->size)
459 fb->head_index = 0;
460
461 return ov;
462 }
463
464 static void fifo_add_val(struct fifo_buffer *fb, int value)
465 {
466 int i;
467
468 for (i = 0; i < fb->size; i++)
469 fb->values[i] += value;
470 }
471
472 struct fifo_buffer *fifo_alloc(int fifo_size)
473 {
474 struct fifo_buffer *fb;
475
476 fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO);
477 if (!fb)
478 return NULL;
479
480 fb->head_index = 0;
481 fb->size = fifo_size;
482 fb->total = 0;
483
484 return fb;
485 }
486
487 static int drbd_rs_controller(struct drbd_conf *mdev)
488 {
489 struct disk_conf *dc;
490 unsigned int sect_in; /* Number of sectors that came in since the last turn */
491 unsigned int want; /* The number of sectors we want in the proxy */
492 int req_sect; /* Number of sectors to request in this turn */
493 int correction; /* Number of sectors more we need in the proxy*/
494 int cps; /* correction per invocation of drbd_rs_controller() */
495 int steps; /* Number of time steps to plan ahead */
496 int curr_corr;
497 int max_sect;
498 struct fifo_buffer *plan;
499
500 sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
501 mdev->rs_in_flight -= sect_in;
502
503 dc = rcu_dereference(mdev->ldev->disk_conf);
504 plan = rcu_dereference(mdev->rs_plan_s);
505
506 steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
507
508 if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
509 want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
510 } else { /* normal path */
511 want = dc->c_fill_target ? dc->c_fill_target :
512 sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
513 }
514
515 correction = want - mdev->rs_in_flight - plan->total;
516
517 /* Plan ahead */
518 cps = correction / steps;
519 fifo_add_val(plan, cps);
520 plan->total += cps * steps;
521
522 /* What we do in this step */
523 curr_corr = fifo_push(plan, 0);
524 plan->total -= curr_corr;
525
526 req_sect = sect_in + curr_corr;
527 if (req_sect < 0)
528 req_sect = 0;
529
530 max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
531 if (req_sect > max_sect)
532 req_sect = max_sect;
533
534 /*
535 dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
536 sect_in, mdev->rs_in_flight, want, correction,
537 steps, cps, mdev->rs_planed, curr_corr, req_sect);
538 */
539
540 return req_sect;
541 }
542
543 static int drbd_rs_number_requests(struct drbd_conf *mdev)
544 {
545 int number;
546
547 rcu_read_lock();
548 if (rcu_dereference(mdev->rs_plan_s)->size) {
549 number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
550 mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
551 } else {
552 mdev->c_sync_rate = rcu_dereference(mdev->ldev->disk_conf)->resync_rate;
553 number = SLEEP_TIME * mdev->c_sync_rate / ((BM_BLOCK_SIZE / 1024) * HZ);
554 }
555 rcu_read_unlock();
556
557 /* ignore the amount of pending requests, the resync controller should
558 * throttle down to incoming reply rate soon enough anyways. */
559 return number;
560 }
561
562 int w_make_resync_request(struct drbd_work *w, int cancel)
563 {
564 struct drbd_conf *mdev = w->mdev;
565 unsigned long bit;
566 sector_t sector;
567 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
568 int max_bio_size;
569 int number, rollback_i, size;
570 int align, queued, sndbuf;
571 int i = 0;
572
573 if (unlikely(cancel))
574 return 0;
575
576 if (mdev->rs_total == 0) {
577 /* empty resync? */
578 drbd_resync_finished(mdev);
579 return 0;
580 }
581
582 if (!get_ldev(mdev)) {
583 /* Since we only need to access mdev->rsync a
584 get_ldev_if_state(mdev,D_FAILED) would be sufficient, but
585 to continue resync with a broken disk makes no sense at
586 all */
587 dev_err(DEV, "Disk broke down during resync!\n");
588 return 0;
589 }
590
591 max_bio_size = queue_max_hw_sectors(mdev->rq_queue) << 9;
592 number = drbd_rs_number_requests(mdev);
593 if (number == 0)
594 goto requeue;
595
596 for (i = 0; i < number; i++) {
597 /* Stop generating RS requests, when half of the send buffer is filled */
598 mutex_lock(&mdev->tconn->data.mutex);
599 if (mdev->tconn->data.socket) {
600 queued = mdev->tconn->data.socket->sk->sk_wmem_queued;
601 sndbuf = mdev->tconn->data.socket->sk->sk_sndbuf;
602 } else {
603 queued = 1;
604 sndbuf = 0;
605 }
606 mutex_unlock(&mdev->tconn->data.mutex);
607 if (queued > sndbuf / 2)
608 goto requeue;
609
610 next_sector:
611 size = BM_BLOCK_SIZE;
612 bit = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
613
614 if (bit == DRBD_END_OF_BITMAP) {
615 mdev->bm_resync_fo = drbd_bm_bits(mdev);
616 put_ldev(mdev);
617 return 0;
618 }
619
620 sector = BM_BIT_TO_SECT(bit);
621
622 if (drbd_rs_should_slow_down(mdev, sector) ||
623 drbd_try_rs_begin_io(mdev, sector)) {
624 mdev->bm_resync_fo = bit;
625 goto requeue;
626 }
627 mdev->bm_resync_fo = bit + 1;
628
629 if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
630 drbd_rs_complete_io(mdev, sector);
631 goto next_sector;
632 }
633
634 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
635 /* try to find some adjacent bits.
636 * we stop if we have already the maximum req size.
637 *
638 * Additionally always align bigger requests, in order to
639 * be prepared for all stripe sizes of software RAIDs.
640 */
641 align = 1;
642 rollback_i = i;
643 for (;;) {
644 if (size + BM_BLOCK_SIZE > max_bio_size)
645 break;
646
647 /* Be always aligned */
648 if (sector & ((1<<(align+3))-1))
649 break;
650
651 /* do not cross extent boundaries */
652 if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
653 break;
654 /* now, is it actually dirty, after all?
655 * caution, drbd_bm_test_bit is tri-state for some
656 * obscure reason; ( b == 0 ) would get the out-of-band
657 * only accidentally right because of the "oddly sized"
658 * adjustment below */
659 if (drbd_bm_test_bit(mdev, bit+1) != 1)
660 break;
661 bit++;
662 size += BM_BLOCK_SIZE;
663 if ((BM_BLOCK_SIZE << align) <= size)
664 align++;
665 i++;
666 }
667 /* if we merged some,
668 * reset the offset to start the next drbd_bm_find_next from */
669 if (size > BM_BLOCK_SIZE)
670 mdev->bm_resync_fo = bit + 1;
671 #endif
672
673 /* adjust very last sectors, in case we are oddly sized */
674 if (sector + (size>>9) > capacity)
675 size = (capacity-sector)<<9;
676 if (mdev->tconn->agreed_pro_version >= 89 && mdev->tconn->csums_tfm) {
677 switch (read_for_csum(mdev, sector, size)) {
678 case -EIO: /* Disk failure */
679 put_ldev(mdev);
680 return -EIO;
681 case -EAGAIN: /* allocation failed, or ldev busy */
682 drbd_rs_complete_io(mdev, sector);
683 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
684 i = rollback_i;
685 goto requeue;
686 case 0:
687 /* everything ok */
688 break;
689 default:
690 BUG();
691 }
692 } else {
693 int err;
694
695 inc_rs_pending(mdev);
696 err = drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
697 sector, size, ID_SYNCER);
698 if (err) {
699 dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
700 dec_rs_pending(mdev);
701 put_ldev(mdev);
702 return err;
703 }
704 }
705 }
706
707 if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
708 /* last syncer _request_ was sent,
709 * but the P_RS_DATA_REPLY not yet received. sync will end (and
710 * next sync group will resume), as soon as we receive the last
711 * resync data block, and the last bit is cleared.
712 * until then resync "work" is "inactive" ...
713 */
714 put_ldev(mdev);
715 return 0;
716 }
717
718 requeue:
719 mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
720 mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
721 put_ldev(mdev);
722 return 0;
723 }
724
725 static int w_make_ov_request(struct drbd_work *w, int cancel)
726 {
727 struct drbd_conf *mdev = w->mdev;
728 int number, i, size;
729 sector_t sector;
730 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
731 bool stop_sector_reached = false;
732
733 if (unlikely(cancel))
734 return 1;
735
736 number = drbd_rs_number_requests(mdev);
737
738 sector = mdev->ov_position;
739 for (i = 0; i < number; i++) {
740 if (sector >= capacity)
741 return 1;
742
743 /* We check for "finished" only in the reply path:
744 * w_e_end_ov_reply().
745 * We need to send at least one request out. */
746 stop_sector_reached = i > 0
747 && verify_can_do_stop_sector(mdev)
748 && sector >= mdev->ov_stop_sector;
749 if (stop_sector_reached)
750 break;
751
752 size = BM_BLOCK_SIZE;
753
754 if (drbd_rs_should_slow_down(mdev, sector) ||
755 drbd_try_rs_begin_io(mdev, sector)) {
756 mdev->ov_position = sector;
757 goto requeue;
758 }
759
760 if (sector + (size>>9) > capacity)
761 size = (capacity-sector)<<9;
762
763 inc_rs_pending(mdev);
764 if (drbd_send_ov_request(mdev, sector, size)) {
765 dec_rs_pending(mdev);
766 return 0;
767 }
768 sector += BM_SECT_PER_BIT;
769 }
770 mdev->ov_position = sector;
771
772 requeue:
773 mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
774 if (i == 0 || !stop_sector_reached)
775 mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
776 return 1;
777 }
778
779 int w_ov_finished(struct drbd_work *w, int cancel)
780 {
781 struct drbd_conf *mdev = w->mdev;
782 kfree(w);
783 ov_out_of_sync_print(mdev);
784 drbd_resync_finished(mdev);
785
786 return 0;
787 }
788
789 static int w_resync_finished(struct drbd_work *w, int cancel)
790 {
791 struct drbd_conf *mdev = w->mdev;
792 kfree(w);
793
794 drbd_resync_finished(mdev);
795
796 return 0;
797 }
798
799 static void ping_peer(struct drbd_conf *mdev)
800 {
801 struct drbd_tconn *tconn = mdev->tconn;
802
803 clear_bit(GOT_PING_ACK, &tconn->flags);
804 request_ping(tconn);
805 wait_event(tconn->ping_wait,
806 test_bit(GOT_PING_ACK, &tconn->flags) || mdev->state.conn < C_CONNECTED);
807 }
808
809 int drbd_resync_finished(struct drbd_conf *mdev)
810 {
811 unsigned long db, dt, dbdt;
812 unsigned long n_oos;
813 union drbd_state os, ns;
814 struct drbd_work *w;
815 char *khelper_cmd = NULL;
816 int verify_done = 0;
817
818 /* Remove all elements from the resync LRU. Since future actions
819 * might set bits in the (main) bitmap, then the entries in the
820 * resync LRU would be wrong. */
821 if (drbd_rs_del_all(mdev)) {
822 /* In case this is not possible now, most probably because
823 * there are P_RS_DATA_REPLY Packets lingering on the worker's
824 * queue (or even the read operations for those packets
825 * is not finished by now). Retry in 100ms. */
826
827 schedule_timeout_interruptible(HZ / 10);
828 w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
829 if (w) {
830 w->cb = w_resync_finished;
831 w->mdev = mdev;
832 drbd_queue_work(&mdev->tconn->sender_work, w);
833 return 1;
834 }
835 dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
836 }
837
838 dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
839 if (dt <= 0)
840 dt = 1;
841
842 db = mdev->rs_total;
843 /* adjust for verify start and stop sectors, respective reached position */
844 if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
845 db -= mdev->ov_left;
846
847 dbdt = Bit2KB(db/dt);
848 mdev->rs_paused /= HZ;
849
850 if (!get_ldev(mdev))
851 goto out;
852
853 ping_peer(mdev);
854
855 spin_lock_irq(&mdev->tconn->req_lock);
856 os = drbd_read_state(mdev);
857
858 verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
859
860 /* This protects us against multiple calls (that can happen in the presence
861 of application IO), and against connectivity loss just before we arrive here. */
862 if (os.conn <= C_CONNECTED)
863 goto out_unlock;
864
865 ns = os;
866 ns.conn = C_CONNECTED;
867
868 dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
869 verify_done ? "Online verify" : "Resync",
870 dt + mdev->rs_paused, mdev->rs_paused, dbdt);
871
872 n_oos = drbd_bm_total_weight(mdev);
873
874 if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
875 if (n_oos) {
876 dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
877 n_oos, Bit2KB(1));
878 khelper_cmd = "out-of-sync";
879 }
880 } else {
881 D_ASSERT((n_oos - mdev->rs_failed) == 0);
882
883 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
884 khelper_cmd = "after-resync-target";
885
886 if (mdev->tconn->csums_tfm && mdev->rs_total) {
887 const unsigned long s = mdev->rs_same_csum;
888 const unsigned long t = mdev->rs_total;
889 const int ratio =
890 (t == 0) ? 0 :
891 (t < 100000) ? ((s*100)/t) : (s/(t/100));
892 dev_info(DEV, "%u %% had equal checksums, eliminated: %luK; "
893 "transferred %luK total %luK\n",
894 ratio,
895 Bit2KB(mdev->rs_same_csum),
896 Bit2KB(mdev->rs_total - mdev->rs_same_csum),
897 Bit2KB(mdev->rs_total));
898 }
899 }
900
901 if (mdev->rs_failed) {
902 dev_info(DEV, " %lu failed blocks\n", mdev->rs_failed);
903
904 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
905 ns.disk = D_INCONSISTENT;
906 ns.pdsk = D_UP_TO_DATE;
907 } else {
908 ns.disk = D_UP_TO_DATE;
909 ns.pdsk = D_INCONSISTENT;
910 }
911 } else {
912 ns.disk = D_UP_TO_DATE;
913 ns.pdsk = D_UP_TO_DATE;
914
915 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
916 if (mdev->p_uuid) {
917 int i;
918 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
919 _drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
920 drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
921 _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
922 } else {
923 dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
924 }
925 }
926
927 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
928 /* for verify runs, we don't update uuids here,
929 * so there would be nothing to report. */
930 drbd_uuid_set_bm(mdev, 0UL);
931 drbd_print_uuids(mdev, "updated UUIDs");
932 if (mdev->p_uuid) {
933 /* Now the two UUID sets are equal, update what we
934 * know of the peer. */
935 int i;
936 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
937 mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
938 }
939 }
940 }
941
942 _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
943 out_unlock:
944 spin_unlock_irq(&mdev->tconn->req_lock);
945 put_ldev(mdev);
946 out:
947 mdev->rs_total = 0;
948 mdev->rs_failed = 0;
949 mdev->rs_paused = 0;
950
951 /* reset start sector, if we reached end of device */
952 if (verify_done && mdev->ov_left == 0)
953 mdev->ov_start_sector = 0;
954
955 drbd_md_sync(mdev);
956
957 if (khelper_cmd)
958 drbd_khelper(mdev, khelper_cmd);
959
960 return 1;
961 }
962
963 /* helper */
964 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_peer_request *peer_req)
965 {
966 if (drbd_peer_req_has_active_page(peer_req)) {
967 /* This might happen if sendpage() has not finished */
968 int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
969 atomic_add(i, &mdev->pp_in_use_by_net);
970 atomic_sub(i, &mdev->pp_in_use);
971 spin_lock_irq(&mdev->tconn->req_lock);
972 list_add_tail(&peer_req->w.list, &mdev->net_ee);
973 spin_unlock_irq(&mdev->tconn->req_lock);
974 wake_up(&drbd_pp_wait);
975 } else
976 drbd_free_peer_req(mdev, peer_req);
977 }
978
979 /**
980 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
981 * @mdev: DRBD device.
982 * @w: work object.
983 * @cancel: The connection will be closed anyways
984 */
985 int w_e_end_data_req(struct drbd_work *w, int cancel)
986 {
987 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
988 struct drbd_conf *mdev = w->mdev;
989 int err;
990
991 if (unlikely(cancel)) {
992 drbd_free_peer_req(mdev, peer_req);
993 dec_unacked(mdev);
994 return 0;
995 }
996
997 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
998 err = drbd_send_block(mdev, P_DATA_REPLY, peer_req);
999 } else {
1000 if (__ratelimit(&drbd_ratelimit_state))
1001 dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
1002 (unsigned long long)peer_req->i.sector);
1003
1004 err = drbd_send_ack(mdev, P_NEG_DREPLY, peer_req);
1005 }
1006
1007 dec_unacked(mdev);
1008
1009 move_to_net_ee_or_free(mdev, peer_req);
1010
1011 if (unlikely(err))
1012 dev_err(DEV, "drbd_send_block() failed\n");
1013 return err;
1014 }
1015
1016 /**
1017 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
1018 * @mdev: DRBD device.
1019 * @w: work object.
1020 * @cancel: The connection will be closed anyways
1021 */
1022 int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
1023 {
1024 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1025 struct drbd_conf *mdev = w->mdev;
1026 int err;
1027
1028 if (unlikely(cancel)) {
1029 drbd_free_peer_req(mdev, peer_req);
1030 dec_unacked(mdev);
1031 return 0;
1032 }
1033
1034 if (get_ldev_if_state(mdev, D_FAILED)) {
1035 drbd_rs_complete_io(mdev, peer_req->i.sector);
1036 put_ldev(mdev);
1037 }
1038
1039 if (mdev->state.conn == C_AHEAD) {
1040 err = drbd_send_ack(mdev, P_RS_CANCEL, peer_req);
1041 } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1042 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
1043 inc_rs_pending(mdev);
1044 err = drbd_send_block(mdev, P_RS_DATA_REPLY, peer_req);
1045 } else {
1046 if (__ratelimit(&drbd_ratelimit_state))
1047 dev_err(DEV, "Not sending RSDataReply, "
1048 "partner DISKLESS!\n");
1049 err = 0;
1050 }
1051 } else {
1052 if (__ratelimit(&drbd_ratelimit_state))
1053 dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
1054 (unsigned long long)peer_req->i.sector);
1055
1056 err = drbd_send_ack(mdev, P_NEG_RS_DREPLY, peer_req);
1057
1058 /* update resync data with failure */
1059 drbd_rs_failed_io(mdev, peer_req->i.sector, peer_req->i.size);
1060 }
1061
1062 dec_unacked(mdev);
1063
1064 move_to_net_ee_or_free(mdev, peer_req);
1065
1066 if (unlikely(err))
1067 dev_err(DEV, "drbd_send_block() failed\n");
1068 return err;
1069 }
1070
1071 int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1072 {
1073 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1074 struct drbd_conf *mdev = w->mdev;
1075 struct digest_info *di;
1076 int digest_size;
1077 void *digest = NULL;
1078 int err, eq = 0;
1079
1080 if (unlikely(cancel)) {
1081 drbd_free_peer_req(mdev, peer_req);
1082 dec_unacked(mdev);
1083 return 0;
1084 }
1085
1086 if (get_ldev(mdev)) {
1087 drbd_rs_complete_io(mdev, peer_req->i.sector);
1088 put_ldev(mdev);
1089 }
1090
1091 di = peer_req->digest;
1092
1093 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1094 /* quick hack to try to avoid a race against reconfiguration.
1095 * a real fix would be much more involved,
1096 * introducing more locking mechanisms */
1097 if (mdev->tconn->csums_tfm) {
1098 digest_size = crypto_hash_digestsize(mdev->tconn->csums_tfm);
1099 D_ASSERT(digest_size == di->digest_size);
1100 digest = kmalloc(digest_size, GFP_NOIO);
1101 }
1102 if (digest) {
1103 drbd_csum_ee(mdev, mdev->tconn->csums_tfm, peer_req, digest);
1104 eq = !memcmp(digest, di->digest, digest_size);
1105 kfree(digest);
1106 }
1107
1108 if (eq) {
1109 drbd_set_in_sync(mdev, peer_req->i.sector, peer_req->i.size);
1110 /* rs_same_csums unit is BM_BLOCK_SIZE */
1111 mdev->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1112 err = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, peer_req);
1113 } else {
1114 inc_rs_pending(mdev);
1115 peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1116 peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1117 kfree(di);
1118 err = drbd_send_block(mdev, P_RS_DATA_REPLY, peer_req);
1119 }
1120 } else {
1121 err = drbd_send_ack(mdev, P_NEG_RS_DREPLY, peer_req);
1122 if (__ratelimit(&drbd_ratelimit_state))
1123 dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1124 }
1125
1126 dec_unacked(mdev);
1127 move_to_net_ee_or_free(mdev, peer_req);
1128
1129 if (unlikely(err))
1130 dev_err(DEV, "drbd_send_block/ack() failed\n");
1131 return err;
1132 }
1133
1134 int w_e_end_ov_req(struct drbd_work *w, int cancel)
1135 {
1136 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1137 struct drbd_conf *mdev = w->mdev;
1138 sector_t sector = peer_req->i.sector;
1139 unsigned int size = peer_req->i.size;
1140 int digest_size;
1141 void *digest;
1142 int err = 0;
1143
1144 if (unlikely(cancel))
1145 goto out;
1146
1147 digest_size = crypto_hash_digestsize(mdev->tconn->verify_tfm);
1148 digest = kmalloc(digest_size, GFP_NOIO);
1149 if (!digest) {
1150 err = 1; /* terminate the connection in case the allocation failed */
1151 goto out;
1152 }
1153
1154 if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1155 drbd_csum_ee(mdev, mdev->tconn->verify_tfm, peer_req, digest);
1156 else
1157 memset(digest, 0, digest_size);
1158
1159 /* Free e and pages before send.
1160 * In case we block on congestion, we could otherwise run into
1161 * some distributed deadlock, if the other side blocks on
1162 * congestion as well, because our receiver blocks in
1163 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1164 drbd_free_peer_req(mdev, peer_req);
1165 peer_req = NULL;
1166 inc_rs_pending(mdev);
1167 err = drbd_send_drequest_csum(mdev, sector, size, digest, digest_size, P_OV_REPLY);
1168 if (err)
1169 dec_rs_pending(mdev);
1170 kfree(digest);
1171
1172 out:
1173 if (peer_req)
1174 drbd_free_peer_req(mdev, peer_req);
1175 dec_unacked(mdev);
1176 return err;
1177 }
1178
1179 void drbd_ov_out_of_sync_found(struct drbd_conf *mdev, sector_t sector, int size)
1180 {
1181 if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1182 mdev->ov_last_oos_size += size>>9;
1183 } else {
1184 mdev->ov_last_oos_start = sector;
1185 mdev->ov_last_oos_size = size>>9;
1186 }
1187 drbd_set_out_of_sync(mdev, sector, size);
1188 }
1189
1190 int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1191 {
1192 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1193 struct drbd_conf *mdev = w->mdev;
1194 struct digest_info *di;
1195 void *digest;
1196 sector_t sector = peer_req->i.sector;
1197 unsigned int size = peer_req->i.size;
1198 int digest_size;
1199 int err, eq = 0;
1200 bool stop_sector_reached = false;
1201
1202 if (unlikely(cancel)) {
1203 drbd_free_peer_req(mdev, peer_req);
1204 dec_unacked(mdev);
1205 return 0;
1206 }
1207
1208 /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1209 * the resync lru has been cleaned up already */
1210 if (get_ldev(mdev)) {
1211 drbd_rs_complete_io(mdev, peer_req->i.sector);
1212 put_ldev(mdev);
1213 }
1214
1215 di = peer_req->digest;
1216
1217 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1218 digest_size = crypto_hash_digestsize(mdev->tconn->verify_tfm);
1219 digest = kmalloc(digest_size, GFP_NOIO);
1220 if (digest) {
1221 drbd_csum_ee(mdev, mdev->tconn->verify_tfm, peer_req, digest);
1222
1223 D_ASSERT(digest_size == di->digest_size);
1224 eq = !memcmp(digest, di->digest, digest_size);
1225 kfree(digest);
1226 }
1227 }
1228
1229 /* Free peer_req and pages before send.
1230 * In case we block on congestion, we could otherwise run into
1231 * some distributed deadlock, if the other side blocks on
1232 * congestion as well, because our receiver blocks in
1233 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1234 drbd_free_peer_req(mdev, peer_req);
1235 if (!eq)
1236 drbd_ov_out_of_sync_found(mdev, sector, size);
1237 else
1238 ov_out_of_sync_print(mdev);
1239
1240 err = drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size,
1241 eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1242
1243 dec_unacked(mdev);
1244
1245 --mdev->ov_left;
1246
1247 /* let's advance progress step marks only for every other megabyte */
1248 if ((mdev->ov_left & 0x200) == 0x200)
1249 drbd_advance_rs_marks(mdev, mdev->ov_left);
1250
1251 stop_sector_reached = verify_can_do_stop_sector(mdev) &&
1252 (sector + (size>>9)) >= mdev->ov_stop_sector;
1253
1254 if (mdev->ov_left == 0 || stop_sector_reached) {
1255 ov_out_of_sync_print(mdev);
1256 drbd_resync_finished(mdev);
1257 }
1258
1259 return err;
1260 }
1261
1262 int w_prev_work_done(struct drbd_work *w, int cancel)
1263 {
1264 struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1265
1266 complete(&b->done);
1267 return 0;
1268 }
1269
1270 /* FIXME
1271 * We need to track the number of pending barrier acks,
1272 * and to be able to wait for them.
1273 * See also comment in drbd_adm_attach before drbd_suspend_io.
1274 */
1275 int drbd_send_barrier(struct drbd_tconn *tconn)
1276 {
1277 struct p_barrier *p;
1278 struct drbd_socket *sock;
1279
1280 sock = &tconn->data;
1281 p = conn_prepare_command(tconn, sock);
1282 if (!p)
1283 return -EIO;
1284 p->barrier = tconn->send.current_epoch_nr;
1285 p->pad = 0;
1286 tconn->send.current_epoch_writes = 0;
1287
1288 return conn_send_command(tconn, sock, P_BARRIER, sizeof(*p), NULL, 0);
1289 }
1290
1291 int w_send_write_hint(struct drbd_work *w, int cancel)
1292 {
1293 struct drbd_conf *mdev = w->mdev;
1294 struct drbd_socket *sock;
1295
1296 if (cancel)
1297 return 0;
1298 sock = &mdev->tconn->data;
1299 if (!drbd_prepare_command(mdev, sock))
1300 return -EIO;
1301 return drbd_send_command(mdev, sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1302 }
1303
1304 static void re_init_if_first_write(struct drbd_tconn *tconn, unsigned int epoch)
1305 {
1306 if (!tconn->send.seen_any_write_yet) {
1307 tconn->send.seen_any_write_yet = true;
1308 tconn->send.current_epoch_nr = epoch;
1309 tconn->send.current_epoch_writes = 0;
1310 }
1311 }
1312
1313 static void maybe_send_barrier(struct drbd_tconn *tconn, unsigned int epoch)
1314 {
1315 /* re-init if first write on this connection */
1316 if (!tconn->send.seen_any_write_yet)
1317 return;
1318 if (tconn->send.current_epoch_nr != epoch) {
1319 if (tconn->send.current_epoch_writes)
1320 drbd_send_barrier(tconn);
1321 tconn->send.current_epoch_nr = epoch;
1322 }
1323 }
1324
1325 int w_send_out_of_sync(struct drbd_work *w, int cancel)
1326 {
1327 struct drbd_request *req = container_of(w, struct drbd_request, w);
1328 struct drbd_conf *mdev = w->mdev;
1329 struct drbd_tconn *tconn = mdev->tconn;
1330 int err;
1331
1332 if (unlikely(cancel)) {
1333 req_mod(req, SEND_CANCELED);
1334 return 0;
1335 }
1336
1337 /* this time, no tconn->send.current_epoch_writes++;
1338 * If it was sent, it was the closing barrier for the last
1339 * replicated epoch, before we went into AHEAD mode.
1340 * No more barriers will be sent, until we leave AHEAD mode again. */
1341 maybe_send_barrier(tconn, req->epoch);
1342
1343 err = drbd_send_out_of_sync(mdev, req);
1344 req_mod(req, OOS_HANDED_TO_NETWORK);
1345
1346 return err;
1347 }
1348
1349 /**
1350 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1351 * @mdev: DRBD device.
1352 * @w: work object.
1353 * @cancel: The connection will be closed anyways
1354 */
1355 int w_send_dblock(struct drbd_work *w, int cancel)
1356 {
1357 struct drbd_request *req = container_of(w, struct drbd_request, w);
1358 struct drbd_conf *mdev = w->mdev;
1359 struct drbd_tconn *tconn = mdev->tconn;
1360 int err;
1361
1362 if (unlikely(cancel)) {
1363 req_mod(req, SEND_CANCELED);
1364 return 0;
1365 }
1366
1367 re_init_if_first_write(tconn, req->epoch);
1368 maybe_send_barrier(tconn, req->epoch);
1369 tconn->send.current_epoch_writes++;
1370
1371 err = drbd_send_dblock(mdev, req);
1372 req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1373
1374 return err;
1375 }
1376
1377 /**
1378 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1379 * @mdev: DRBD device.
1380 * @w: work object.
1381 * @cancel: The connection will be closed anyways
1382 */
1383 int w_send_read_req(struct drbd_work *w, int cancel)
1384 {
1385 struct drbd_request *req = container_of(w, struct drbd_request, w);
1386 struct drbd_conf *mdev = w->mdev;
1387 struct drbd_tconn *tconn = mdev->tconn;
1388 int err;
1389
1390 if (unlikely(cancel)) {
1391 req_mod(req, SEND_CANCELED);
1392 return 0;
1393 }
1394
1395 /* Even read requests may close a write epoch,
1396 * if there was any yet. */
1397 maybe_send_barrier(tconn, req->epoch);
1398
1399 err = drbd_send_drequest(mdev, P_DATA_REQUEST, req->i.sector, req->i.size,
1400 (unsigned long)req);
1401
1402 req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1403
1404 return err;
1405 }
1406
1407 int w_restart_disk_io(struct drbd_work *w, int cancel)
1408 {
1409 struct drbd_request *req = container_of(w, struct drbd_request, w);
1410 struct drbd_conf *mdev = w->mdev;
1411
1412 if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1413 drbd_al_begin_io(mdev, &req->i);
1414
1415 drbd_req_make_private_bio(req, req->master_bio);
1416 req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1417 generic_make_request(req->private_bio);
1418
1419 return 0;
1420 }
1421
1422 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1423 {
1424 struct drbd_conf *odev = mdev;
1425 int resync_after;
1426
1427 while (1) {
1428 if (!odev->ldev)
1429 return 1;
1430 rcu_read_lock();
1431 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1432 rcu_read_unlock();
1433 if (resync_after == -1)
1434 return 1;
1435 odev = minor_to_mdev(resync_after);
1436 if (!expect(odev))
1437 return 1;
1438 if ((odev->state.conn >= C_SYNC_SOURCE &&
1439 odev->state.conn <= C_PAUSED_SYNC_T) ||
1440 odev->state.aftr_isp || odev->state.peer_isp ||
1441 odev->state.user_isp)
1442 return 0;
1443 }
1444 }
1445
1446 /**
1447 * _drbd_pause_after() - Pause resync on all devices that may not resync now
1448 * @mdev: DRBD device.
1449 *
1450 * Called from process context only (admin command and after_state_ch).
1451 */
1452 static int _drbd_pause_after(struct drbd_conf *mdev)
1453 {
1454 struct drbd_conf *odev;
1455 int i, rv = 0;
1456
1457 rcu_read_lock();
1458 idr_for_each_entry(&minors, odev, i) {
1459 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1460 continue;
1461 if (!_drbd_may_sync_now(odev))
1462 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1463 != SS_NOTHING_TO_DO);
1464 }
1465 rcu_read_unlock();
1466
1467 return rv;
1468 }
1469
1470 /**
1471 * _drbd_resume_next() - Resume resync on all devices that may resync now
1472 * @mdev: DRBD device.
1473 *
1474 * Called from process context only (admin command and worker).
1475 */
1476 static int _drbd_resume_next(struct drbd_conf *mdev)
1477 {
1478 struct drbd_conf *odev;
1479 int i, rv = 0;
1480
1481 rcu_read_lock();
1482 idr_for_each_entry(&minors, odev, i) {
1483 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1484 continue;
1485 if (odev->state.aftr_isp) {
1486 if (_drbd_may_sync_now(odev))
1487 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1488 CS_HARD, NULL)
1489 != SS_NOTHING_TO_DO) ;
1490 }
1491 }
1492 rcu_read_unlock();
1493 return rv;
1494 }
1495
1496 void resume_next_sg(struct drbd_conf *mdev)
1497 {
1498 write_lock_irq(&global_state_lock);
1499 _drbd_resume_next(mdev);
1500 write_unlock_irq(&global_state_lock);
1501 }
1502
1503 void suspend_other_sg(struct drbd_conf *mdev)
1504 {
1505 write_lock_irq(&global_state_lock);
1506 _drbd_pause_after(mdev);
1507 write_unlock_irq(&global_state_lock);
1508 }
1509
1510 /* caller must hold global_state_lock */
1511 enum drbd_ret_code drbd_resync_after_valid(struct drbd_conf *mdev, int o_minor)
1512 {
1513 struct drbd_conf *odev;
1514 int resync_after;
1515
1516 if (o_minor == -1)
1517 return NO_ERROR;
1518 if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1519 return ERR_RESYNC_AFTER;
1520
1521 /* check for loops */
1522 odev = minor_to_mdev(o_minor);
1523 while (1) {
1524 if (odev == mdev)
1525 return ERR_RESYNC_AFTER_CYCLE;
1526
1527 rcu_read_lock();
1528 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1529 rcu_read_unlock();
1530 /* dependency chain ends here, no cycles. */
1531 if (resync_after == -1)
1532 return NO_ERROR;
1533
1534 /* follow the dependency chain */
1535 odev = minor_to_mdev(resync_after);
1536 }
1537 }
1538
1539 /* caller must hold global_state_lock */
1540 void drbd_resync_after_changed(struct drbd_conf *mdev)
1541 {
1542 int changes;
1543
1544 do {
1545 changes = _drbd_pause_after(mdev);
1546 changes |= _drbd_resume_next(mdev);
1547 } while (changes);
1548 }
1549
1550 void drbd_rs_controller_reset(struct drbd_conf *mdev)
1551 {
1552 struct fifo_buffer *plan;
1553
1554 atomic_set(&mdev->rs_sect_in, 0);
1555 atomic_set(&mdev->rs_sect_ev, 0);
1556 mdev->rs_in_flight = 0;
1557
1558 /* Updating the RCU protected object in place is necessary since
1559 this function gets called from atomic context.
1560 It is valid since all other updates also lead to an completely
1561 empty fifo */
1562 rcu_read_lock();
1563 plan = rcu_dereference(mdev->rs_plan_s);
1564 plan->total = 0;
1565 fifo_set(plan, 0);
1566 rcu_read_unlock();
1567 }
1568
1569 void start_resync_timer_fn(unsigned long data)
1570 {
1571 struct drbd_conf *mdev = (struct drbd_conf *) data;
1572
1573 drbd_queue_work(&mdev->tconn->sender_work, &mdev->start_resync_work);
1574 }
1575
1576 int w_start_resync(struct drbd_work *w, int cancel)
1577 {
1578 struct drbd_conf *mdev = w->mdev;
1579
1580 if (atomic_read(&mdev->unacked_cnt) || atomic_read(&mdev->rs_pending_cnt)) {
1581 dev_warn(DEV, "w_start_resync later...\n");
1582 mdev->start_resync_timer.expires = jiffies + HZ/10;
1583 add_timer(&mdev->start_resync_timer);
1584 return 0;
1585 }
1586
1587 drbd_start_resync(mdev, C_SYNC_SOURCE);
1588 clear_bit(AHEAD_TO_SYNC_SOURCE, &mdev->flags);
1589 return 0;
1590 }
1591
1592 /**
1593 * drbd_start_resync() - Start the resync process
1594 * @mdev: DRBD device.
1595 * @side: Either C_SYNC_SOURCE or C_SYNC_TARGET
1596 *
1597 * This function might bring you directly into one of the
1598 * C_PAUSED_SYNC_* states.
1599 */
1600 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1601 {
1602 union drbd_state ns;
1603 int r;
1604
1605 if (mdev->state.conn >= C_SYNC_SOURCE && mdev->state.conn < C_AHEAD) {
1606 dev_err(DEV, "Resync already running!\n");
1607 return;
1608 }
1609
1610 if (!test_bit(B_RS_H_DONE, &mdev->flags)) {
1611 if (side == C_SYNC_TARGET) {
1612 /* Since application IO was locked out during C_WF_BITMAP_T and
1613 C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1614 we check that we might make the data inconsistent. */
1615 r = drbd_khelper(mdev, "before-resync-target");
1616 r = (r >> 8) & 0xff;
1617 if (r > 0) {
1618 dev_info(DEV, "before-resync-target handler returned %d, "
1619 "dropping connection.\n", r);
1620 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
1621 return;
1622 }
1623 } else /* C_SYNC_SOURCE */ {
1624 r = drbd_khelper(mdev, "before-resync-source");
1625 r = (r >> 8) & 0xff;
1626 if (r > 0) {
1627 if (r == 3) {
1628 dev_info(DEV, "before-resync-source handler returned %d, "
1629 "ignoring. Old userland tools?", r);
1630 } else {
1631 dev_info(DEV, "before-resync-source handler returned %d, "
1632 "dropping connection.\n", r);
1633 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
1634 return;
1635 }
1636 }
1637 }
1638 }
1639
1640 if (current == mdev->tconn->worker.task) {
1641 /* The worker should not sleep waiting for state_mutex,
1642 that can take long */
1643 if (!mutex_trylock(mdev->state_mutex)) {
1644 set_bit(B_RS_H_DONE, &mdev->flags);
1645 mdev->start_resync_timer.expires = jiffies + HZ/5;
1646 add_timer(&mdev->start_resync_timer);
1647 return;
1648 }
1649 } else {
1650 mutex_lock(mdev->state_mutex);
1651 }
1652 clear_bit(B_RS_H_DONE, &mdev->flags);
1653
1654 write_lock_irq(&global_state_lock);
1655 if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1656 write_unlock_irq(&global_state_lock);
1657 mutex_unlock(mdev->state_mutex);
1658 return;
1659 }
1660
1661 ns = drbd_read_state(mdev);
1662
1663 ns.aftr_isp = !_drbd_may_sync_now(mdev);
1664
1665 ns.conn = side;
1666
1667 if (side == C_SYNC_TARGET)
1668 ns.disk = D_INCONSISTENT;
1669 else /* side == C_SYNC_SOURCE */
1670 ns.pdsk = D_INCONSISTENT;
1671
1672 r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1673 ns = drbd_read_state(mdev);
1674
1675 if (ns.conn < C_CONNECTED)
1676 r = SS_UNKNOWN_ERROR;
1677
1678 if (r == SS_SUCCESS) {
1679 unsigned long tw = drbd_bm_total_weight(mdev);
1680 unsigned long now = jiffies;
1681 int i;
1682
1683 mdev->rs_failed = 0;
1684 mdev->rs_paused = 0;
1685 mdev->rs_same_csum = 0;
1686 mdev->rs_last_events = 0;
1687 mdev->rs_last_sect_ev = 0;
1688 mdev->rs_total = tw;
1689 mdev->rs_start = now;
1690 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1691 mdev->rs_mark_left[i] = tw;
1692 mdev->rs_mark_time[i] = now;
1693 }
1694 _drbd_pause_after(mdev);
1695 }
1696 write_unlock_irq(&global_state_lock);
1697
1698 if (r == SS_SUCCESS) {
1699 /* reset rs_last_bcast when a resync or verify is started,
1700 * to deal with potential jiffies wrap. */
1701 mdev->rs_last_bcast = jiffies - HZ;
1702
1703 dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1704 drbd_conn_str(ns.conn),
1705 (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1706 (unsigned long) mdev->rs_total);
1707 if (side == C_SYNC_TARGET)
1708 mdev->bm_resync_fo = 0;
1709
1710 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1711 * with w_send_oos, or the sync target will get confused as to
1712 * how much bits to resync. We cannot do that always, because for an
1713 * empty resync and protocol < 95, we need to do it here, as we call
1714 * drbd_resync_finished from here in that case.
1715 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1716 * and from after_state_ch otherwise. */
1717 if (side == C_SYNC_SOURCE && mdev->tconn->agreed_pro_version < 96)
1718 drbd_gen_and_send_sync_uuid(mdev);
1719
1720 if (mdev->tconn->agreed_pro_version < 95 && mdev->rs_total == 0) {
1721 /* This still has a race (about when exactly the peers
1722 * detect connection loss) that can lead to a full sync
1723 * on next handshake. In 8.3.9 we fixed this with explicit
1724 * resync-finished notifications, but the fix
1725 * introduces a protocol change. Sleeping for some
1726 * time longer than the ping interval + timeout on the
1727 * SyncSource, to give the SyncTarget the chance to
1728 * detect connection loss, then waiting for a ping
1729 * response (implicit in drbd_resync_finished) reduces
1730 * the race considerably, but does not solve it. */
1731 if (side == C_SYNC_SOURCE) {
1732 struct net_conf *nc;
1733 int timeo;
1734
1735 rcu_read_lock();
1736 nc = rcu_dereference(mdev->tconn->net_conf);
1737 timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1738 rcu_read_unlock();
1739 schedule_timeout_interruptible(timeo);
1740 }
1741 drbd_resync_finished(mdev);
1742 }
1743
1744 drbd_rs_controller_reset(mdev);
1745 /* ns.conn may already be != mdev->state.conn,
1746 * we may have been paused in between, or become paused until
1747 * the timer triggers.
1748 * No matter, that is handled in resync_timer_fn() */
1749 if (ns.conn == C_SYNC_TARGET)
1750 mod_timer(&mdev->resync_timer, jiffies);
1751
1752 drbd_md_sync(mdev);
1753 }
1754 put_ldev(mdev);
1755 mutex_unlock(mdev->state_mutex);
1756 }
1757
1758 /* If the resource already closed the current epoch, but we did not
1759 * (because we have not yet seen new requests), we should send the
1760 * corresponding barrier now. Must be checked within the same spinlock
1761 * that is used to check for new requests. */
1762 bool need_to_send_barrier(struct drbd_tconn *connection)
1763 {
1764 if (!connection->send.seen_any_write_yet)
1765 return false;
1766
1767 /* Skip barriers that do not contain any writes.
1768 * This may happen during AHEAD mode. */
1769 if (!connection->send.current_epoch_writes)
1770 return false;
1771
1772 /* ->req_lock is held when requests are queued on
1773 * connection->sender_work, and put into ->transfer_log.
1774 * It is also held when ->current_tle_nr is increased.
1775 * So either there are already new requests queued,
1776 * and corresponding barriers will be send there.
1777 * Or nothing new is queued yet, so the difference will be 1.
1778 */
1779 if (atomic_read(&connection->current_tle_nr) !=
1780 connection->send.current_epoch_nr + 1)
1781 return false;
1782
1783 return true;
1784 }
1785
1786 bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
1787 {
1788 spin_lock_irq(&queue->q_lock);
1789 list_splice_init(&queue->q, work_list);
1790 spin_unlock_irq(&queue->q_lock);
1791 return !list_empty(work_list);
1792 }
1793
1794 bool dequeue_work_item(struct drbd_work_queue *queue, struct list_head *work_list)
1795 {
1796 spin_lock_irq(&queue->q_lock);
1797 if (!list_empty(&queue->q))
1798 list_move(queue->q.next, work_list);
1799 spin_unlock_irq(&queue->q_lock);
1800 return !list_empty(work_list);
1801 }
1802
1803 void wait_for_work(struct drbd_tconn *connection, struct list_head *work_list)
1804 {
1805 DEFINE_WAIT(wait);
1806 struct net_conf *nc;
1807 int uncork, cork;
1808
1809 dequeue_work_item(&connection->sender_work, work_list);
1810 if (!list_empty(work_list))
1811 return;
1812
1813 /* Still nothing to do?
1814 * Maybe we still need to close the current epoch,
1815 * even if no new requests are queued yet.
1816 *
1817 * Also, poke TCP, just in case.
1818 * Then wait for new work (or signal). */
1819 rcu_read_lock();
1820 nc = rcu_dereference(connection->net_conf);
1821 uncork = nc ? nc->tcp_cork : 0;
1822 rcu_read_unlock();
1823 if (uncork) {
1824 mutex_lock(&connection->data.mutex);
1825 if (connection->data.socket)
1826 drbd_tcp_uncork(connection->data.socket);
1827 mutex_unlock(&connection->data.mutex);
1828 }
1829
1830 for (;;) {
1831 int send_barrier;
1832 prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
1833 spin_lock_irq(&connection->req_lock);
1834 spin_lock(&connection->sender_work.q_lock); /* FIXME get rid of this one? */
1835 /* dequeue single item only,
1836 * we still use drbd_queue_work_front() in some places */
1837 if (!list_empty(&connection->sender_work.q))
1838 list_move(connection->sender_work.q.next, work_list);
1839 spin_unlock(&connection->sender_work.q_lock); /* FIXME get rid of this one? */
1840 if (!list_empty(work_list) || signal_pending(current)) {
1841 spin_unlock_irq(&connection->req_lock);
1842 break;
1843 }
1844 send_barrier = need_to_send_barrier(connection);
1845 spin_unlock_irq(&connection->req_lock);
1846 if (send_barrier) {
1847 drbd_send_barrier(connection);
1848 connection->send.current_epoch_nr++;
1849 }
1850 schedule();
1851 /* may be woken up for other things but new work, too,
1852 * e.g. if the current epoch got closed.
1853 * In which case we send the barrier above. */
1854 }
1855 finish_wait(&connection->sender_work.q_wait, &wait);
1856
1857 /* someone may have changed the config while we have been waiting above. */
1858 rcu_read_lock();
1859 nc = rcu_dereference(connection->net_conf);
1860 cork = nc ? nc->tcp_cork : 0;
1861 rcu_read_unlock();
1862 mutex_lock(&connection->data.mutex);
1863 if (connection->data.socket) {
1864 if (cork)
1865 drbd_tcp_cork(connection->data.socket);
1866 else if (!uncork)
1867 drbd_tcp_uncork(connection->data.socket);
1868 }
1869 mutex_unlock(&connection->data.mutex);
1870 }
1871
1872 int drbd_worker(struct drbd_thread *thi)
1873 {
1874 struct drbd_tconn *tconn = thi->tconn;
1875 struct drbd_work *w = NULL;
1876 struct drbd_conf *mdev;
1877 LIST_HEAD(work_list);
1878 int vnr;
1879
1880 while (get_t_state(thi) == RUNNING) {
1881 drbd_thread_current_set_cpu(thi);
1882
1883 /* as long as we use drbd_queue_work_front(),
1884 * we may only dequeue single work items here, not batches. */
1885 if (list_empty(&work_list))
1886 wait_for_work(tconn, &work_list);
1887
1888 if (signal_pending(current)) {
1889 flush_signals(current);
1890 if (get_t_state(thi) == RUNNING) {
1891 conn_warn(tconn, "Worker got an unexpected signal\n");
1892 continue;
1893 }
1894 break;
1895 }
1896
1897 if (get_t_state(thi) != RUNNING)
1898 break;
1899
1900 while (!list_empty(&work_list)) {
1901 w = list_first_entry(&work_list, struct drbd_work, list);
1902 list_del_init(&w->list);
1903 if (w->cb(w, tconn->cstate < C_WF_REPORT_PARAMS) == 0)
1904 continue;
1905 if (tconn->cstate >= C_WF_REPORT_PARAMS)
1906 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
1907 }
1908 }
1909
1910 do {
1911 while (!list_empty(&work_list)) {
1912 w = list_first_entry(&work_list, struct drbd_work, list);
1913 list_del_init(&w->list);
1914 w->cb(w, 1);
1915 }
1916 dequeue_work_batch(&tconn->sender_work, &work_list);
1917 } while (!list_empty(&work_list));
1918
1919 rcu_read_lock();
1920 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1921 D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1922 kref_get(&mdev->kref);
1923 rcu_read_unlock();
1924 drbd_mdev_cleanup(mdev);
1925 kref_put(&mdev->kref, &drbd_minor_destroy);
1926 rcu_read_lock();
1927 }
1928 rcu_read_unlock();
1929
1930 return 0;
1931 }
This page took 0.074822 seconds and 6 git commands to generate.