rbd: preserve snapc->seq in rbd_header_set_snap()
[deliverable/linux.git] / drivers / block / rbd.c
CommitLineData
602adf40
YS
1/*
2 rbd.c -- Export ceph rados objects as a Linux block device
3
4
5 based on drivers/block/osdblk.c:
6
7 Copyright 2009 Red Hat, Inc.
8
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
12
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
17
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
dfc5606d 24 For usage instructions, please refer to:
602adf40 25
dfc5606d 26 Documentation/ABI/testing/sysfs-bus-rbd
602adf40
YS
27
28 */
29
30#include <linux/ceph/libceph.h>
31#include <linux/ceph/osd_client.h>
32#include <linux/ceph/mon_client.h>
33#include <linux/ceph/decode.h>
59c2be1e 34#include <linux/parser.h>
602adf40
YS
35
36#include <linux/kernel.h>
37#include <linux/device.h>
38#include <linux/module.h>
39#include <linux/fs.h>
40#include <linux/blkdev.h>
41
42#include "rbd_types.h"
43
593a9e7b
AE
/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT	9
#define SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

/* Driver names used for the block device and in log/sysfs output */
#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

#define RBD_MAX_SNAP_NAME_LEN	32	/* longest snapshot name accepted */
#define RBD_MAX_OPT_LEN		1024	/* longest mount-option string */

/* Pseudo snapshot name meaning "the writable head, not a snapshot" */
#define RBD_SNAP_HEAD_NAME	"-"

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)

/* Default timeout (seconds) used for watch/notify, overridable via options */
#define RBD_NOTIFY_TIMEOUT_DEFAULT 10
73
602adf40
YS
/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	u64 image_size;			/* image size in bytes */
	char *object_prefix;		/* prefix for data object names */
	__u8 obj_order;			/* log2 of object size */
	__u8 crypt_type;		/* on-disk encryption type */
	__u8 comp_type;			/* on-disk compression type */
	struct ceph_snap_context *snapc;	/* snapshot context (ids) */
	size_t snap_names_len;		/* total bytes in snap_names */
	u64 snap_seq;			/* header's snapshot sequence */
	u32 total_snaps;		/* number of snapshots */

	char *snap_names;		/* NUL-separated snapshot names */
	u64 *snap_sizes;		/* per-snapshot image sizes */

	u64 obj_version;		/* header object version (for watch) */
};

/* rbd-specific (non-ceph) mount options */
struct rbd_options {
	int	notify_timeout;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct rbd_options	*rbd_opts;
	struct kref		kref;	/* shared-client refcount */
	struct list_head	node;	/* entry on rbd_client_list */
};

/*
 * a request completion status
 */
struct rbd_req_status {
	int done;	/* nonzero once this sub-request completed */
	int rc;		/* completion status */
	u64 bytes;	/* bytes transferred */
};

/*
 * a collection of requests
 */
struct rbd_req_coll {
	int			total;		/* sub-requests expected */
	int			num_done;	/* sub-requests completed */
	struct kref		kref;
	struct rbd_req_status	status[0];	/* one slot per sub-request */
};

/*
 * a single io request
 */
struct rbd_request {
	struct request		*rq;		/* blk layer request */
	struct bio		*bio;		/* cloned bio */
	struct page		**pages;	/* list of used pages */
	u64			len;
	int			coll_index;	/* slot in coll->status[] */
	struct rbd_req_coll	*coll;
};

/* sysfs representation of a single snapshot of an image */
struct rbd_snap {
	struct	device		dev;
	const char		*name;
	u64			size;
	struct list_head	node;	/* entry on rbd_dev->snaps */
	u64			id;
};
146
602adf40
YS
/*
 * a single device
 */
struct rbd_device {
	int			id;	/* blkdev unique id */

	int			major;	/* blkdev assigned major */
	struct gendisk		*disk;	/* blkdev's gendisk and rq */
	struct request_queue	*q;

	struct rbd_client	*rbd_client;	/* shared ceph client */

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;	/* queue lock */

	struct rbd_image_header	header;
	char			*image_name;
	size_t			image_name_len;
	char			*header_name;	/* name of the header object */
	char			*pool_name;
	int			pool_id;

	/* watch on the header object, for update notifications */
	struct ceph_osd_event	*watch_event;
	struct ceph_osd_request	*watch_request;

	/* protects updating the header */
	struct rw_semaphore	header_rwsem;
	/* name of the snapshot this device reads from */
	char			*snap_name;
	/* id of the snapshot this device reads from */
	u64			snap_id;	/* current snapshot id */
	/* whether the snap_id this device reads from still exists */
	bool			snap_exists;
	int			read_only;

	struct list_head	node;	/* entry on rbd_dev_list */

	/* list of snapshots */
	struct list_head	snaps;

	/* sysfs related */
	struct device		dev;
};
191
602adf40 192static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
e124a82f 193
602adf40 194static LIST_HEAD(rbd_dev_list); /* devices */
e124a82f
AE
195static DEFINE_SPINLOCK(rbd_dev_list_lock);
196
432b8587
AE
197static LIST_HEAD(rbd_client_list); /* clients */
198static DEFINE_SPINLOCK(rbd_client_list_lock);
602adf40 199
dfc5606d
YS
200static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
201static void rbd_dev_release(struct device *dev);
dfc5606d
YS
202static ssize_t rbd_snap_add(struct device *dev,
203 struct device_attribute *attr,
204 const char *buf,
205 size_t count);
206static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
69932487 207 struct rbd_snap *snap);
dfc5606d 208
f0f8cef5
AE
/* sysfs bus attributes: writing to 'add'/'remove' maps/unmaps an image */
static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);

static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_attrs	= rbd_bus_attrs,
};

/* empty release: rbd_root_dev is static, there is nothing to free */
static void rbd_root_dev_release(struct device *dev)
{
}

/* parent device for all rbd devices in sysfs */
static struct device rbd_root_dev = {
	.init_name	= "rbd",
	.release	= rbd_root_dev_release,
};
233
dfc5606d 234
dfc5606d
YS
235static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
236{
237 return get_device(&rbd_dev->dev);
238}
239
240static void rbd_put_dev(struct rbd_device *rbd_dev)
241{
242 put_device(&rbd_dev->dev);
243}
602adf40 244
263c6ca0 245static int __rbd_refresh_header(struct rbd_device *rbd_dev);
59c2be1e 246
602adf40
YS
247static int rbd_open(struct block_device *bdev, fmode_t mode)
248{
f0f8cef5 249 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
602adf40 250
dfc5606d
YS
251 rbd_get_dev(rbd_dev);
252
602adf40
YS
253 set_device_ro(bdev, rbd_dev->read_only);
254
255 if ((mode & FMODE_WRITE) && rbd_dev->read_only)
256 return -EROFS;
257
258 return 0;
259}
260
dfc5606d
YS
261static int rbd_release(struct gendisk *disk, fmode_t mode)
262{
263 struct rbd_device *rbd_dev = disk->private_data;
264
265 rbd_put_dev(rbd_dev);
266
267 return 0;
268}
269
602adf40
YS
/* Block device operations: only open/release; I/O goes through the queue. */
static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};
275
276/*
277 * Initialize an rbd client instance.
43ae4701 278 * We own *ceph_opts.
602adf40 279 */
43ae4701 280static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts,
59c2be1e 281 struct rbd_options *rbd_opts)
602adf40
YS
282{
283 struct rbd_client *rbdc;
284 int ret = -ENOMEM;
285
286 dout("rbd_client_create\n");
287 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
288 if (!rbdc)
289 goto out_opt;
290
291 kref_init(&rbdc->kref);
292 INIT_LIST_HEAD(&rbdc->node);
293
bc534d86
AE
294 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
295
43ae4701 296 rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
602adf40 297 if (IS_ERR(rbdc->client))
bc534d86 298 goto out_mutex;
43ae4701 299 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
602adf40
YS
300
301 ret = ceph_open_session(rbdc->client);
302 if (ret < 0)
303 goto out_err;
304
59c2be1e
YS
305 rbdc->rbd_opts = rbd_opts;
306
432b8587 307 spin_lock(&rbd_client_list_lock);
602adf40 308 list_add_tail(&rbdc->node, &rbd_client_list);
432b8587 309 spin_unlock(&rbd_client_list_lock);
602adf40 310
bc534d86
AE
311 mutex_unlock(&ctl_mutex);
312
602adf40
YS
313 dout("rbd_client_create created %p\n", rbdc);
314 return rbdc;
315
316out_err:
317 ceph_destroy_client(rbdc->client);
bc534d86
AE
318out_mutex:
319 mutex_unlock(&ctl_mutex);
602adf40
YS
320 kfree(rbdc);
321out_opt:
43ae4701
AE
322 if (ceph_opts)
323 ceph_destroy_options(ceph_opts);
28f259b7 324 return ERR_PTR(ret);
602adf40
YS
325}
326
327/*
328 * Find a ceph client with specific addr and configuration.
329 */
43ae4701 330static struct rbd_client *__rbd_client_find(struct ceph_options *ceph_opts)
602adf40
YS
331{
332 struct rbd_client *client_node;
333
43ae4701 334 if (ceph_opts->flags & CEPH_OPT_NOSHARE)
602adf40
YS
335 return NULL;
336
337 list_for_each_entry(client_node, &rbd_client_list, node)
43ae4701 338 if (!ceph_compare_options(ceph_opts, client_node->client))
602adf40
YS
339 return client_node;
340 return NULL;
341}
342
59c2be1e
YS
/*
 * mount options
 *
 * Integer-valued tokens must be listed before Opt_last_int and
 * string-valued ones between Opt_last_int and Opt_last_string;
 * parse_rbd_opts_token() relies on this ordering to decide how to
 * parse each argument.
 */
enum {
	Opt_notify_timeout,
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
};

static match_table_t rbd_opts_tokens = {
	{Opt_notify_timeout, "notify_timeout=%d"},
	/* int args above */
	/* string args above */
	{-1, NULL}
};
360
361static int parse_rbd_opts_token(char *c, void *private)
362{
43ae4701 363 struct rbd_options *rbd_opts = private;
59c2be1e
YS
364 substring_t argstr[MAX_OPT_ARGS];
365 int token, intval, ret;
366
43ae4701 367 token = match_token(c, rbd_opts_tokens, argstr);
59c2be1e
YS
368 if (token < 0)
369 return -EINVAL;
370
371 if (token < Opt_last_int) {
372 ret = match_int(&argstr[0], &intval);
373 if (ret < 0) {
374 pr_err("bad mount option arg (not int) "
375 "at '%s'\n", c);
376 return ret;
377 }
378 dout("got int token %d val %d\n", token, intval);
379 } else if (token > Opt_last_int && token < Opt_last_string) {
380 dout("got string token %d val %s\n", token,
381 argstr[0].from);
382 } else {
383 dout("got token %d\n", token);
384 }
385
386 switch (token) {
387 case Opt_notify_timeout:
43ae4701 388 rbd_opts->notify_timeout = intval;
59c2be1e
YS
389 break;
390 default:
391 BUG_ON(token);
392 }
393 return 0;
394}
395
602adf40
YS
/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 *
 * On success the returned client holds a reference the caller must
 * drop with rbd_put_client().  Ownership notes: ceph_opts is consumed
 * either by the shared-client path (destroyed here) or handed to
 * rbd_client_create(); rbd_opts is freed here unless a new client
 * takes it over.
 */
static struct rbd_client *rbd_get_client(const char *mon_addr,
					 size_t mon_addr_len,
					 char *options)
{
	struct rbd_client *rbdc;
	struct ceph_options *ceph_opts;
	struct rbd_options *rbd_opts;

	rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
	if (!rbd_opts)
		return ERR_PTR(-ENOMEM);

	rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;

	ceph_opts = ceph_parse_options(options, mon_addr,
				       mon_addr + mon_addr_len,
				       parse_rbd_opts_token, rbd_opts);
	if (IS_ERR(ceph_opts)) {
		kfree(rbd_opts);
		return ERR_CAST(ceph_opts);
	}

	spin_lock(&rbd_client_list_lock);
	rbdc = __rbd_client_find(ceph_opts);
	if (rbdc) {
		/* using an existing client */
		kref_get(&rbdc->kref);
		spin_unlock(&rbd_client_list_lock);

		/* the shared client already owns equivalent options */
		ceph_destroy_options(ceph_opts);
		kfree(rbd_opts);

		return rbdc;
	}
	spin_unlock(&rbd_client_list_lock);

	/*
	 * NOTE(review): another task could create a matching client
	 * between the unlock above and this call, giving two clients
	 * with the same options — presumably harmless, just unshared.
	 */
	rbdc = rbd_client_create(ceph_opts, rbd_opts);

	if (IS_ERR(rbdc))
		kfree(rbd_opts);

	return rbdc;
}
443
444/*
445 * Destroy ceph client
d23a4b3f 446 *
432b8587 447 * Caller must hold rbd_client_list_lock.
602adf40
YS
448 */
449static void rbd_client_release(struct kref *kref)
450{
451 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
452
453 dout("rbd_release_client %p\n", rbdc);
cd9d9f5d 454 spin_lock(&rbd_client_list_lock);
602adf40 455 list_del(&rbdc->node);
cd9d9f5d 456 spin_unlock(&rbd_client_list_lock);
602adf40
YS
457
458 ceph_destroy_client(rbdc->client);
59c2be1e 459 kfree(rbdc->rbd_opts);
602adf40
YS
460 kfree(rbdc);
461}
462
463/*
464 * Drop reference to ceph client node. If it's not referenced anymore, release
465 * it.
466 */
467static void rbd_put_client(struct rbd_device *rbd_dev)
468{
469 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
470 rbd_dev->rbd_client = NULL;
602adf40
YS
471}
472
1fec7093
YS
473/*
474 * Destroy requests collection
475 */
476static void rbd_coll_release(struct kref *kref)
477{
478 struct rbd_req_coll *coll =
479 container_of(kref, struct rbd_req_coll, kref);
480
481 dout("rbd_coll_release %p\n", coll);
482 kfree(coll);
483}
602adf40
YS
484
/*
 * Create a new header structure, translate header format from the on-disk
 * header.
 *
 * @header:		in-memory header to fill (fields overwritten)
 * @ondisk:		raw header read from the header object
 * @allocated_snaps:	number of snapshot slots the caller's ondisk
 *			buffer has room for; snapshot data is copied
 *			only when it matches the on-disk snap count
 * @gfp_flags:		allocation flags
 *
 * Returns 0, -ENXIO for a bad magic, -EINVAL for an insane snapshot
 * count, or -ENOMEM.  On failure nothing is left allocated.
 *
 * NOTE(review): snapc->seq is deliberately NOT set here — the caller
 * (header refresh) preserves/derives it.  snap_names_len comes from
 * disk unvalidated — presumably trusted; confirm against the osd.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
				 struct rbd_image_header_ondisk *ondisk,
				 u32 allocated_snaps,
				 gfp_t gfp_flags)
{
	u32 i, snap_count;

	/* reject anything that does not carry the rbd header magic */
	if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT)))
		return -ENXIO;

	snap_count = le32_to_cpu(ondisk->snap_count);
	/* guard the size computation below against u32 overflow */
	if (snap_count > (UINT_MAX - sizeof(struct ceph_snap_context))
			 / sizeof (*ondisk))
		return -EINVAL;
	/* snap context is a header plus one u64 snapshot id per snap */
	header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
				snap_count * sizeof(u64),
				gfp_flags);
	if (!header->snapc)
		return -ENOMEM;

	header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
	if (snap_count) {
		header->snap_names = kmalloc(header->snap_names_len,
					     gfp_flags);
		if (!header->snap_names)
			goto err_snapc;
		header->snap_sizes = kmalloc(snap_count * sizeof(u64),
					     gfp_flags);
		if (!header->snap_sizes)
			goto err_names;
	} else {
		header->snap_names = NULL;
		header->snap_sizes = NULL;
	}

	/* object prefix is copied out and NUL-terminated */
	header->object_prefix = kmalloc(sizeof (ondisk->block_name) + 1,
					gfp_flags);
	if (!header->object_prefix)
		goto err_sizes;

	memcpy(header->object_prefix, ondisk->block_name,
	       sizeof(ondisk->block_name));
	header->object_prefix[sizeof (ondisk->block_name)] = '\0';

	header->image_size = le64_to_cpu(ondisk->image_size);
	header->obj_order = ondisk->options.order;
	header->crypt_type = ondisk->options.crypt_type;
	header->comp_type = ondisk->options.comp_type;

	atomic_set(&header->snapc->nref, 1);
	header->snap_seq = le64_to_cpu(ondisk->snap_seq);
	header->snapc->num_snaps = snap_count;
	header->total_snaps = snap_count;

	/*
	 * Only copy snapshot ids/sizes/names when the caller's buffer
	 * actually held all of them; otherwise the caller re-reads the
	 * header with a large enough buffer.
	 */
	if (snap_count && allocated_snaps == snap_count) {
		for (i = 0; i < snap_count; i++) {
			header->snapc->snaps[i] =
				le64_to_cpu(ondisk->snaps[i].id);
			header->snap_sizes[i] =
				le64_to_cpu(ondisk->snaps[i].image_size);
		}

		/* copy snapshot names (packed right after the snaps) */
		memcpy(header->snap_names, &ondisk->snaps[i],
		       header->snap_names_len);
	}

	return 0;

err_sizes:
	kfree(header->snap_sizes);
err_names:
	kfree(header->snap_names);
err_snapc:
	kfree(header->snapc);
	return -ENOMEM;
}
566
602adf40
YS
567static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
568 u64 *seq, u64 *size)
569{
570 int i;
571 char *p = header->snap_names;
572
00f1f36f
AE
573 for (i = 0; i < header->total_snaps; i++) {
574 if (!strcmp(snap_name, p)) {
602adf40 575
00f1f36f 576 /* Found it. Pass back its id and/or size */
602adf40 577
00f1f36f
AE
578 if (seq)
579 *seq = header->snapc->snaps[i];
580 if (size)
581 *size = header->snap_sizes[i];
582 return i;
583 }
584 p += strlen(p) + 1; /* Skip ahead to the next name */
585 }
586 return -ENOENT;
602adf40
YS
587}
588
/*
 * Set the device's current snapshot from rbd_dev->snap_name, and
 * optionally report the mapped size via *size.
 *
 * Records the snapshot id in rbd_dev->snap_id rather than writing it
 * into header.snapc->seq — the snap context's seq must be preserved
 * (it is the client's snapshot sequence, not the mapped snapshot id).
 *
 * Returns 0, or a negative errno if the named snapshot is not found.
 */
static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size)
{
	int ret;

	down_write(&rbd_dev->header_rwsem);

	if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
		    sizeof (RBD_SNAP_HEAD_NAME))) {
		/* mapping the writable head, not a snapshot */
		rbd_dev->snap_id = CEPH_NOSNAP;
		rbd_dev->snap_exists = false;
		rbd_dev->read_only = 0;
		if (size)
			*size = rbd_dev->header.image_size;
	} else {
		u64 snap_id = 0;

		ret = snap_by_name(&rbd_dev->header, rbd_dev->snap_name,
				   &snap_id, size);
		if (ret < 0)
			goto done;
		rbd_dev->snap_id = snap_id;
		rbd_dev->snap_exists = true;
		rbd_dev->read_only = 1;	/* snapshots are immutable */
	}

	ret = 0;
done:
	up_write(&rbd_dev->header_rwsem);
	return ret;
}
619
620static void rbd_header_free(struct rbd_image_header *header)
621{
849b4260 622 kfree(header->object_prefix);
602adf40 623 kfree(header->snap_sizes);
849b4260 624 kfree(header->snap_names);
d1d25646 625 ceph_put_snap_context(header->snapc);
602adf40
YS
626}
627
628/*
629 * get the actual striped segment name, offset and length
630 */
631static u64 rbd_get_segment(struct rbd_image_header *header,
ca1e49a6 632 const char *object_prefix,
602adf40
YS
633 u64 ofs, u64 len,
634 char *seg_name, u64 *segofs)
635{
636 u64 seg = ofs >> header->obj_order;
637
638 if (seg_name)
639 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
ca1e49a6 640 "%s.%012llx", object_prefix, seg);
602adf40
YS
641
642 ofs = ofs & ((1 << header->obj_order) - 1);
643 len = min_t(u64, len, (1 << header->obj_order) - ofs);
644
645 if (segofs)
646 *segofs = ofs;
647
648 return len;
649}
650
1fec7093
YS
651static int rbd_get_num_segments(struct rbd_image_header *header,
652 u64 ofs, u64 len)
653{
654 u64 start_seg = ofs >> header->obj_order;
655 u64 end_seg = (ofs + len - 1) >> header->obj_order;
656 return end_seg - start_seg + 1;
657}
658
029bcbd8
JD
659/*
660 * returns the size of an object in the image
661 */
662static u64 rbd_obj_bytes(struct rbd_image_header *header)
663{
664 return 1 << header->obj_order;
665}
666
602adf40
YS
667/*
668 * bio helpers
669 */
670
671static void bio_chain_put(struct bio *chain)
672{
673 struct bio *tmp;
674
675 while (chain) {
676 tmp = chain;
677 chain = chain->bi_next;
678 bio_put(tmp);
679 }
680}
681
682/*
683 * zeros a bio chain, starting at specific offset
684 */
685static void zero_bio_chain(struct bio *chain, int start_ofs)
686{
687 struct bio_vec *bv;
688 unsigned long flags;
689 void *buf;
690 int i;
691 int pos = 0;
692
693 while (chain) {
694 bio_for_each_segment(bv, chain, i) {
695 if (pos + bv->bv_len > start_ofs) {
696 int remainder = max(start_ofs - pos, 0);
697 buf = bvec_kmap_irq(bv, &flags);
698 memset(buf + remainder, 0,
699 bv->bv_len - remainder);
85b5aaa6 700 bvec_kunmap_irq(buf, &flags);
602adf40
YS
701 }
702 pos += bv->bv_len;
703 }
704
705 chain = chain->bi_next;
706 }
707}
708
/*
 * bio_chain_clone - clone a chain of bios up to a certain length.
 * might return a bio_pair that will need to be released.
 *
 * @old:     in/out — head of the source chain; advanced past the
 *           cloned portion on return
 * @next:    out — where the caller should continue (either the rest
 *           of the old chain or the second half of a split bio)
 * @bp:      in/out — a previously returned bio_pair to release; set
 *           to NULL here (a new pair, if any, is returned via *next)
 * @len:     number of bytes to clone
 * @gfpmask: allocation flags (downgraded to non-waiting after the
 *           first allocation)
 *
 * Returns the new chain, or NULL on allocation/split failure.
 */
static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
				   struct bio_pair **bp,
				   int len, gfp_t gfpmask)
{
	struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
	int total = 0;

	if (*bp) {
		bio_pair_release(*bp);
		*bp = NULL;
	}

	while (old_chain && (total < len)) {
		tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
		if (!tmp)
			goto err_out;

		if (total + old_chain->bi_size > len) {
			/* NOTE(review): this local deliberately shadows the
			 * bp parameter; the new pair escapes via *next. */
			struct bio_pair *bp;

			/*
			 * this split can only happen with a single paged bio,
			 * split_bio will BUG_ON if this is not the case
			 */
			dout("bio_chain_clone split! total=%d remaining=%d"
			     "bi_size=%d\n",
			     (int)total, (int)len-total,
			     (int)old_chain->bi_size);

			/* split the bio. We'll release it either in the next
			   call, or it will have to be released outside */
			bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
			if (!bp)
				goto err_out;

			__bio_clone(tmp, &bp->bio1);

			*next = &bp->bio2;
		} else {
			__bio_clone(tmp, old_chain);
			*next = old_chain->bi_next;
		}

		tmp->bi_bdev = NULL;
		/* only the first allocation may sleep */
		gfpmask &= ~__GFP_WAIT;
		tmp->bi_next = NULL;

		if (!new_chain) {
			new_chain = tail = tmp;
		} else {
			tail->bi_next = tmp;
			tail = tmp;
		}
		old_chain = old_chain->bi_next;

		total += tmp->bi_size;
	}

	/* caller guarantees the chain covers at least len bytes */
	BUG_ON(total < len);

	if (tail)
		tail->bi_next = NULL;

	*old = old_chain;

	return new_chain;

err_out:
	dout("bio_chain_clone with err\n");
	bio_chain_put(new_chain);
	return NULL;
}
785
786/*
787 * helpers for osd request op vectors.
788 */
789static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
790 int num_ops,
791 int opcode,
792 u32 payload_len)
793{
794 *ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
795 GFP_NOIO);
796 if (!*ops)
797 return -ENOMEM;
798 (*ops)[0].op = opcode;
799 /*
800 * op extent offset and length will be set later on
801 * in calc_raw_layout()
802 */
803 (*ops)[0].payload_len = payload_len;
804 return 0;
805}
806
807static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
808{
809 kfree(ops);
810}
811
1fec7093
YS
/*
 * Record completion of sub-request 'index' of a collection and finish
 * as much of the block-layer request as is now contiguously complete.
 *
 * Sub-requests may complete out of order; blk_end_request must be
 * called in order, so completions are only propagated up to the first
 * still-pending slot.  Drops one collection reference per slot
 * actually finished.
 */
static void rbd_coll_end_req_index(struct request *rq,
				   struct rbd_req_coll *coll,
				   int index,
				   int ret, u64 len)
{
	struct request_queue *q;
	int min, max, i;

	dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
	     coll, index, ret, len);

	if (!rq)
		return;

	if (!coll) {
		/* not part of a collection: finish the request directly */
		blk_end_request(rq, ret, len);
		return;
	}

	q = rq->q;

	/* queue lock serializes status[] updates and __blk_end_request */
	spin_lock_irq(q->queue_lock);
	coll->status[index].done = 1;
	coll->status[index].rc = ret;
	coll->status[index].bytes = len;
	max = min = coll->num_done;
	while (max < coll->total && coll->status[max].done)
		max++;

	for (i = min; i<max; i++) {
		__blk_end_request(rq, coll->status[i].rc,
				  coll->status[i].bytes);
		coll->num_done++;
		kref_put(&coll->kref, rbd_coll_release);
	}
	spin_unlock_irq(q->queue_lock);
}
849
850static void rbd_coll_end_req(struct rbd_request *req,
851 int ret, u64 len)
852{
853 rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
854}
855
602adf40
YS
856/*
857 * Send ceph osd request
858 */
859static int rbd_do_request(struct request *rq,
0ce1a794 860 struct rbd_device *rbd_dev,
602adf40
YS
861 struct ceph_snap_context *snapc,
862 u64 snapid,
aded07ea 863 const char *object_name, u64 ofs, u64 len,
602adf40
YS
864 struct bio *bio,
865 struct page **pages,
866 int num_pages,
867 int flags,
868 struct ceph_osd_req_op *ops,
1fec7093
YS
869 struct rbd_req_coll *coll,
870 int coll_index,
602adf40 871 void (*rbd_cb)(struct ceph_osd_request *req,
59c2be1e
YS
872 struct ceph_msg *msg),
873 struct ceph_osd_request **linger_req,
874 u64 *ver)
602adf40
YS
875{
876 struct ceph_osd_request *req;
877 struct ceph_file_layout *layout;
878 int ret;
879 u64 bno;
880 struct timespec mtime = CURRENT_TIME;
881 struct rbd_request *req_data;
882 struct ceph_osd_request_head *reqhead;
1dbb4399 883 struct ceph_osd_client *osdc;
602adf40 884
602adf40 885 req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
1fec7093
YS
886 if (!req_data) {
887 if (coll)
888 rbd_coll_end_req_index(rq, coll, coll_index,
889 -ENOMEM, len);
890 return -ENOMEM;
891 }
892
893 if (coll) {
894 req_data->coll = coll;
895 req_data->coll_index = coll_index;
896 }
602adf40 897
aded07ea
AE
898 dout("rbd_do_request object_name=%s ofs=%lld len=%lld\n",
899 object_name, len, ofs);
602adf40 900
0ce1a794 901 osdc = &rbd_dev->rbd_client->client->osdc;
1dbb4399
AE
902 req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
903 false, GFP_NOIO, pages, bio);
4ad12621 904 if (!req) {
4ad12621 905 ret = -ENOMEM;
602adf40
YS
906 goto done_pages;
907 }
908
909 req->r_callback = rbd_cb;
910
911 req_data->rq = rq;
912 req_data->bio = bio;
913 req_data->pages = pages;
914 req_data->len = len;
915
916 req->r_priv = req_data;
917
918 reqhead = req->r_request->front.iov_base;
919 reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
920
aded07ea 921 strncpy(req->r_oid, object_name, sizeof(req->r_oid));
602adf40
YS
922 req->r_oid_len = strlen(req->r_oid);
923
924 layout = &req->r_file_layout;
925 memset(layout, 0, sizeof(*layout));
926 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
927 layout->fl_stripe_count = cpu_to_le32(1);
928 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
0ce1a794 929 layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
1dbb4399
AE
930 ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
931 req, ops);
602adf40
YS
932
933 ceph_osdc_build_request(req, ofs, &len,
934 ops,
935 snapc,
936 &mtime,
937 req->r_oid, req->r_oid_len);
602adf40 938
59c2be1e 939 if (linger_req) {
1dbb4399 940 ceph_osdc_set_request_linger(osdc, req);
59c2be1e
YS
941 *linger_req = req;
942 }
943
1dbb4399 944 ret = ceph_osdc_start_request(osdc, req, false);
602adf40
YS
945 if (ret < 0)
946 goto done_err;
947
948 if (!rbd_cb) {
1dbb4399 949 ret = ceph_osdc_wait_request(osdc, req);
59c2be1e
YS
950 if (ver)
951 *ver = le64_to_cpu(req->r_reassert_version.version);
1fec7093
YS
952 dout("reassert_ver=%lld\n",
953 le64_to_cpu(req->r_reassert_version.version));
602adf40
YS
954 ceph_osdc_put_request(req);
955 }
956 return ret;
957
958done_err:
959 bio_chain_put(req_data->bio);
960 ceph_osdc_put_request(req);
961done_pages:
1fec7093 962 rbd_coll_end_req(req_data, ret, len);
602adf40 963 kfree(req_data);
602adf40
YS
964 return ret;
965}
966
/*
 * Ceph osd op callback
 *
 * Completion handler for asynchronous requests issued by rbd_do_op().
 * Translates short/missing reads into zero-filled data, completes the
 * request's collection slot, then releases the bio chain, the osd
 * request, and the per-request bookkeeping.
 */
static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	struct rbd_request *req_data = req->r_priv;
	struct ceph_osd_reply_head *replyhead;
	struct ceph_osd_op *op;
	__s32 rc;
	u64 bytes;
	int read_op;

	/* parse reply */
	replyhead = msg->front.iov_base;
	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
	op = (void *)(replyhead + 1);
	rc = le32_to_cpu(replyhead->result);
	bytes = le64_to_cpu(op->extent.length);
	read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);

	dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);

	if (rc == -ENOENT && read_op) {
		/* reading a hole (object doesn't exist): all zeroes */
		zero_bio_chain(req_data->bio, 0);
		rc = 0;
	} else if (rc == 0 && read_op && bytes < req_data->len) {
		/* short read: zero the tail and report the full length */
		zero_bio_chain(req_data->bio, bytes);
		bytes = req_data->len;
	}

	rbd_coll_end_req(req_data, rc, bytes);

	if (req_data->bio)
		bio_chain_put(req_data->bio);

	ceph_osdc_put_request(req);
	kfree(req_data);
}
1005
59c2be1e
YS
/* Minimal completion callback: just drop the osd request reference. */
static void rbd_simple_req_cb(struct ceph_osd_request *req,
			      struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
1010
602adf40
YS
/*
 * Do a synchronous ceph osd operation
 *
 * Performs a single synchronous op against object_name.  If orig_ops
 * is NULL a one-op vector is built from opcode/payload_len (and for
 * writes, buf is copied into the page vector first); otherwise the
 * caller-supplied ops are used as-is and buf/opcode are ignored for
 * payload purposes.  For reads the data is copied back into buf.
 *
 * Returns bytes copied (reads), the rbd_do_request() result, or a
 * negative errno.
 */
static int rbd_req_sync_op(struct rbd_device *rbd_dev,
			   struct ceph_snap_context *snapc,
			   u64 snapid,
			   int opcode,
			   int flags,
			   struct ceph_osd_req_op *orig_ops,
			   const char *object_name,
			   u64 ofs, u64 len,
			   char *buf,
			   struct ceph_osd_request **linger_req,
			   u64 *ver)
{
	int ret;
	struct page **pages;
	int num_pages;
	struct ceph_osd_req_op *ops = orig_ops;
	u32 payload_len;

	num_pages = calc_pages_for(ofs, len);
	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	if (!orig_ops) {
		payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
		ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
		if (ret < 0)
			goto done;

		if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
			/* stage the outgoing data before submitting */
			ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
			if (ret < 0)
				goto done_ops;
		}
	}

	ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
			     object_name, ofs, len, NULL,
			     pages, num_pages,
			     flags,
			     ops,
			     NULL, 0,
			     NULL,	/* no callback: synchronous */
			     linger_req, ver);
	if (ret < 0)
		goto done_ops;

	/* ret is the number of bytes the osd actually returned */
	if ((flags & CEPH_OSD_FLAG_READ) && buf)
		ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);

done_ops:
	if (!orig_ops)
		rbd_destroy_ops(ops);
done:
	ceph_release_page_vector(pages, num_pages);
	return ret;
}
1071
/*
 * Do an asynchronous ceph osd operation
 *
 * Maps the image extent (ofs, len) to its backing object and submits
 * one async osd op for it, completing via rbd_req_cb.  The caller has
 * already split bios on segment boundaries, so the extent must fit
 * entirely in one object (enforced by the BUG_ON below).
 */
static int rbd_do_op(struct request *rq,
		     struct rbd_device *rbd_dev,
		     struct ceph_snap_context *snapc,
		     u64 snapid,
		     int opcode, int flags,
		     u64 ofs, u64 len,
		     struct bio *bio,
		     struct rbd_req_coll *coll,
		     int coll_index)
{
	char *seg_name;
	u64 seg_ofs;
	u64 seg_len;
	int ret;
	struct ceph_osd_req_op *ops;
	u32 payload_len;

	seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
	if (!seg_name)
		return -ENOMEM;

	seg_len = rbd_get_segment(&rbd_dev->header,
				  rbd_dev->header.object_prefix,
				  ofs, len,
				  seg_name, &seg_ofs);

	payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);

	ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
	if (ret < 0)
		goto done;

	/* we've taken care of segment sizes earlier when we
	   cloned the bios. We should never have a segment
	   truncated at this point */
	BUG_ON(seg_len < len);

	ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
			     seg_name, seg_ofs, seg_len,
			     bio,
			     NULL, 0,
			     flags,
			     ops,
			     coll, coll_index,
			     rbd_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
done:
	kfree(seg_name);
	return ret;
}
1126
1127/*
1128 * Request async osd write
1129 */
1130static int rbd_req_write(struct request *rq,
1131 struct rbd_device *rbd_dev,
1132 struct ceph_snap_context *snapc,
1133 u64 ofs, u64 len,
1fec7093
YS
1134 struct bio *bio,
1135 struct rbd_req_coll *coll,
1136 int coll_index)
602adf40
YS
1137{
1138 return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1139 CEPH_OSD_OP_WRITE,
1140 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1fec7093 1141 ofs, len, bio, coll, coll_index);
602adf40
YS
1142}
1143
1144/*
1145 * Request async osd read
1146 */
1147static int rbd_req_read(struct request *rq,
1148 struct rbd_device *rbd_dev,
1149 u64 snapid,
1150 u64 ofs, u64 len,
1fec7093
YS
1151 struct bio *bio,
1152 struct rbd_req_coll *coll,
1153 int coll_index)
602adf40
YS
1154{
1155 return rbd_do_op(rq, rbd_dev, NULL,
b06e6a6b 1156 snapid,
602adf40
YS
1157 CEPH_OSD_OP_READ,
1158 CEPH_OSD_FLAG_READ,
1fec7093 1159 ofs, len, bio, coll, coll_index);
602adf40
YS
1160}
1161
1162/*
1163 * Request sync osd read
1164 */
0ce1a794 1165static int rbd_req_sync_read(struct rbd_device *rbd_dev,
602adf40
YS
1166 struct ceph_snap_context *snapc,
1167 u64 snapid,
aded07ea 1168 const char *object_name,
602adf40 1169 u64 ofs, u64 len,
59c2be1e
YS
1170 char *buf,
1171 u64 *ver)
602adf40 1172{
0ce1a794 1173 return rbd_req_sync_op(rbd_dev, NULL,
b06e6a6b 1174 snapid,
602adf40
YS
1175 CEPH_OSD_OP_READ,
1176 CEPH_OSD_FLAG_READ,
1177 NULL,
d1f57ea6 1178 object_name, ofs, len, buf, NULL, ver);
602adf40
YS
1179}
1180
1181/*
59c2be1e
YS
1182 * Request sync osd watch
1183 */
/*
 * Acknowledge a watch notification on @object_name.
 *
 * @ver:       header object version to report back
 * @notify_id: identifier of the notification being acked
 *
 * Builds a single NOTIFY_ACK op and fires it through rbd_do_request()
 * with the lightweight rbd_simple_req_cb completion.
 */
static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
				   u64 ver,
				   u64 notify_id,
				   const char *object_name)
{
	struct ceph_osd_req_op *ops;
	int ret;

	ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
	if (ret < 0)
		return ret;

	ops[0].watch.ver = cpu_to_le64(ver);
	/* NOTE(review): cookie is not byte-swapped here, unlike ver --
	 * presumably notify_id is already wire order; confirm. */
	ops[0].watch.cookie = notify_id;
	ops[0].watch.flag = 0;

	ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
			  object_name, 0, 0, NULL,
			  NULL, 0,
			  CEPH_OSD_FLAG_READ,
			  ops,
			  NULL, 0,
			  rbd_simple_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
	return ret;
}
1211
/*
 * Watch event callback: the header object changed on the OSDs, so
 * re-read the header (under ctl_mutex) and acknowledge the
 * notification with the resulting header version.
 */
static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
	struct rbd_device *rbd_dev = (struct rbd_device *)data;
	u64 hver;
	int rc;

	if (!rbd_dev)
		return;

	dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n",
		rbd_dev->header_name, notify_id, (int) opcode);
	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	rc = __rbd_refresh_header(rbd_dev);
	/* Grab the version while still holding ctl_mutex. */
	hver = rbd_dev->header.obj_version;
	mutex_unlock(&ctl_mutex);
	if (rc)
		pr_warning(RBD_DRV_NAME "%d got notification but failed to "
			   " update snaps: %d\n", rbd_dev->major, rc);

	/* Ack even on refresh failure so the OSD doesn't keep retrying. */
	rbd_req_sync_notify_ack(rbd_dev, hver, notify_id, rbd_dev->header_name);
}
1233
1234/*
1235 * Request sync osd watch
1236 */
/*
 * Register a watch on @object_name so rbd_watch_cb() is invoked when
 * the object changes.  On success the created event and the osd
 * request are stored in rbd_dev->watch_event / watch_request.
 */
static int rbd_req_sync_watch(struct rbd_device *rbd_dev,
			      const char *object_name,
			      u64 ver)
{
	struct ceph_osd_req_op *ops;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;

	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
	if (ret < 0)
		return ret;

	/* Event must exist before the op, since its cookie goes on the wire. */
	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
				     (void *)rbd_dev, &rbd_dev->watch_event);
	if (ret < 0)
		goto fail;

	ops[0].watch.ver = cpu_to_le64(ver);
	ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
	ops[0].watch.flag = 1;	/* 1 == register the watch */

	ret = rbd_req_sync_op(rbd_dev, NULL,
			      CEPH_NOSNAP,
			      0,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      object_name, 0, 0, NULL,
			      &rbd_dev->watch_request, NULL);

	if (ret < 0)
		goto fail_event;

	rbd_destroy_ops(ops);
	return 0;

fail_event:
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
fail:
	rbd_destroy_ops(ops);
	return ret;
}
1278
79e3057c
YS
1279/*
1280 * Request sync osd unwatch
1281 */
/*
 * Tear down the watch on @object_name registered by
 * rbd_req_sync_watch(), then cancel and clear the associated event.
 */
static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev,
				const char *object_name)
{
	struct ceph_osd_req_op *ops;

	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
	if (ret < 0)
		return ret;

	ops[0].watch.ver = 0;
	ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
	ops[0].watch.flag = 0;	/* 0 == unregister the watch */

	ret = rbd_req_sync_op(rbd_dev, NULL,
			      CEPH_NOSNAP,
			      0,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      object_name, 0, 0, NULL, NULL, NULL);

	/* Event is cancelled even if the unwatch op failed. */
	rbd_destroy_ops(ops);
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
	return ret;
}
1307
/* Context handed to rbd_notify_cb() via ceph_osdc_create_event(). */
struct rbd_notify_info {
	struct rbd_device *rbd_dev;
};
1311
1312static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1313{
0ce1a794
AE
1314 struct rbd_device *rbd_dev = (struct rbd_device *)data;
1315 if (!rbd_dev)
59c2be1e
YS
1316 return;
1317
0ce1a794 1318 dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n",
0bed54dc 1319 rbd_dev->header_name,
59c2be1e
YS
1320 notify_id, (int)opcode);
1321}
1322
1323/*
1324 * Request sync osd notify
1325 */
/*
 * Send a synchronous notify on @object_name and wait (bounded by
 * CEPH_OSD_TIMEOUT_DEFAULT) for the watchers to acknowledge it.
 */
static int rbd_req_sync_notify(struct rbd_device *rbd_dev,
			       const char *object_name)
{
	struct ceph_osd_req_op *ops;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct ceph_osd_event *event;
	struct rbd_notify_info info;
	/* payload: u32 version + u32 timeout */
	int payload_len = sizeof(u32) + sizeof(u32);
	int ret;

	ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
	if (ret < 0)
		return ret;

	info.rbd_dev = rbd_dev;

	/* one_shot event: fires once when the notify completes */
	ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
				     (void *)&info, &event);
	if (ret < 0)
		goto fail;

	ops[0].watch.ver = 1;
	ops[0].watch.flag = 1;
	ops[0].watch.cookie = event->cookie;
	ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
	ops[0].watch.timeout = 12;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			       CEPH_NOSNAP,
			       0,
			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			       ops,
			       object_name, 0, 0, NULL, NULL, NULL);
	if (ret < 0)
		goto fail_event;

	/* NOTE(review): a wait failure here is logged but not propagated
	 * -- the function still returns 0; confirm this is intended. */
	ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
	dout("ceph_osdc_wait_event returned %d\n", ret);
	rbd_destroy_ops(ops);
	return 0;

fail_event:
	ceph_osdc_cancel_event(event);
fail:
	rbd_destroy_ops(ops);
	return ret;
}
1373
602adf40
YS
1374/*
1375 * Request sync osd read
1376 */
/*
 * Synchronously execute an OSD class method (the file-level comment
 * above calls this a "sync osd read", but it issues a CEPH_OSD_OP_CALL
 * write: @class_name.@method_name on @object_name with @data as input).
 * The resulting object version is returned through @ver.
 */
static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
			     const char *object_name,
			     const char *class_name,
			     const char *method_name,
			     const char *data,
			     int len,
			     u64 *ver)
{
	struct ceph_osd_req_op *ops;
	int class_name_len = strlen(class_name);
	int method_name_len = strlen(method_name);
	/* payload carries both names plus the method input data */
	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
				    class_name_len + method_name_len + len);
	if (ret < 0)
		return ret;

	ops[0].cls.class_name = class_name;
	ops[0].cls.class_len = (__u8) class_name_len;
	ops[0].cls.method_name = method_name;
	ops[0].cls.method_len = (__u8) method_name_len;
	ops[0].cls.argc = 0;
	ops[0].cls.indata = data;
	ops[0].cls.indata_len = len;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			       CEPH_NOSNAP,
			       0,
			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			       ops,
			       object_name, 0, 0, NULL, NULL, ver);

	rbd_destroy_ops(ops);

	dout("cls_exec returned %d\n", ret);
	return ret;
}
1413
1fec7093
YS
1414static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1415{
1416 struct rbd_req_coll *coll =
1417 kzalloc(sizeof(struct rbd_req_coll) +
1418 sizeof(struct rbd_req_status) * num_reqs,
1419 GFP_ATOMIC);
1420
1421 if (!coll)
1422 return NULL;
1423 coll->total = num_reqs;
1424 kref_init(&coll->kref);
1425 return coll;
1426}
1427
602adf40
YS
1428/*
1429 * block device queue callback
1430 */
/*
 * Block device request-queue callback.
 *
 * Pulls requests off the queue, splits each into per-object segments,
 * clones the bio chain accordingly, and fires one async OSD read or
 * write per segment.  Per-request completion is tracked by an
 * rbd_req_coll with one slot per segment.
 *
 * Called with q->queue_lock held; the lock is dropped around the
 * (sleeping/allocating) per-request work and re-taken before the next
 * blk_fetch_request().
 */
static void rbd_rq_fn(struct request_queue *q)
{
	struct rbd_device *rbd_dev = q->queuedata;
	struct request *rq;
	struct bio_pair *bp = NULL;

	while ((rq = blk_fetch_request(q))) {
		struct bio *bio;
		struct bio *rq_bio, *next_bio = NULL;
		bool do_write;
		int size, op_size = 0;
		u64 ofs;
		int num_segs, cur_seg = 0;
		struct rbd_req_coll *coll;
		struct ceph_snap_context *snapc;

		/* peek at request from block layer */
		if (!rq)
			break;

		dout("fetched request\n");

		/* filter out block requests we don't understand */
		if ((rq->cmd_type != REQ_TYPE_FS)) {
			__blk_end_request_all(rq, 0);
			continue;
		}

		/* deduce our operation (read, write) */
		do_write = (rq_data_dir(rq) == WRITE);

		size = blk_rq_bytes(rq);
		ofs = blk_rq_pos(rq) * SECTOR_SIZE;
		rq_bio = rq->bio;
		if (do_write && rbd_dev->read_only) {
			__blk_end_request_all(rq, -EROFS);
			continue;
		}

		spin_unlock_irq(q->queue_lock);

		down_read(&rbd_dev->header_rwsem);

		/* Mapped snapshot may have been deleted under us. */
		if (rbd_dev->snap_id != CEPH_NOSNAP && !rbd_dev->snap_exists) {
			up_read(&rbd_dev->header_rwsem);
			dout("request for non-existent snapshot");
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, -ENXIO);
			continue;
		}

		/* Pin the snap context; the header may be refreshed while
		 * this request is in flight. */
		snapc = ceph_get_snap_context(rbd_dev->header.snapc);

		up_read(&rbd_dev->header_rwsem);

		dout("%s 0x%x bytes at 0x%llx\n",
		     do_write ? "write" : "read",
		     size, blk_rq_pos(rq) * SECTOR_SIZE);

		num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
		coll = rbd_alloc_coll(num_segs);
		if (!coll) {
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, -ENOMEM);
			ceph_put_snap_context(snapc);
			continue;
		}

		do {
			/* a bio clone to be passed down to OSD req */
			dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
			op_size = rbd_get_segment(&rbd_dev->header,
						  rbd_dev->header.object_prefix,
						  ofs, size,
						  NULL, NULL);
			/* One collection ref per in-flight segment. */
			kref_get(&coll->kref);
			bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
					      op_size, GFP_ATOMIC);
			if (!bio) {
				rbd_coll_end_req_index(rq, coll, cur_seg,
						       -ENOMEM, op_size);
				goto next_seg;
			}


			/* init OSD command: write or read */
			if (do_write)
				rbd_req_write(rq, rbd_dev,
					      snapc,
					      ofs,
					      op_size, bio,
					      coll, cur_seg);
			else
				rbd_req_read(rq, rbd_dev,
					     rbd_dev->snap_id,
					     ofs,
					     op_size, bio,
					     coll, cur_seg);

next_seg:
			size -= op_size;
			ofs += op_size;

			cur_seg++;
			rq_bio = next_bio;
		} while (size > 0);
		/* Drop the allocation ref; segments hold their own. */
		kref_put(&coll->kref, rbd_coll_release);

		if (bp)
			bio_pair_release(bp);
		spin_lock_irq(q->queue_lock);

		ceph_put_snap_context(snapc);
	}
}
1546
1547/*
1548 * a queue callback. Makes sure that we don't create a bio that spans across
1549 * multiple osd objects. One exception would be with a single page bios,
1550 * which we handle later at bio_chain_clone
1551 */
/*
 * Queue merge_bvec callback.  Makes sure we don't create a bio that
 * spans multiple OSD objects.  One exception would be single-page
 * bios, which we handle later at bio_chain_clone.
 */
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
			  struct bio_vec *bvec)
{
	struct rbd_device *rbd_dev = q->queuedata;
	unsigned int chunk_sectors;
	sector_t sector;
	unsigned int bio_sectors;
	int max;

	/* Object size in sectors (obj_order is log2 of the object size). */
	chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
	sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
	bio_sectors = bmd->bi_size >> SECTOR_SHIFT;

	/* Bytes remaining in the current object after the existing bio. */
	max = (chunk_sectors - ((sector & (chunk_sectors - 1))
				 + bio_sectors)) << SECTOR_SHIFT;
	if (max < 0)
		max = 0; /* bio_add cannot handle a negative return */
	/* An empty bio may still take one vec even across a boundary. */
	if (max <= bvec->bv_len && bio_sectors == 0)
		return bvec->bv_len;
	return max;
}
1573
1574static void rbd_free_disk(struct rbd_device *rbd_dev)
1575{
1576 struct gendisk *disk = rbd_dev->disk;
1577
1578 if (!disk)
1579 return;
1580
1581 rbd_header_free(&rbd_dev->header);
1582
1583 if (disk->flags & GENHD_FL_UP)
1584 del_gendisk(disk);
1585 if (disk->queue)
1586 blk_cleanup_queue(disk->queue);
1587 put_disk(disk);
1588}
1589
1590/*
1591 * reload the ondisk the header
1592 */
/*
 * Read the on-disk image header into *header.
 *
 * First reads the fixed-size header to determine the number of
 * snapshots, then re-reads it along with all snapshot records and
 * their stored names.  Loops until two successive reads agree on the
 * snapshot count, since it can change between reads.
 *
 * NOTE(review): on the final successful iteration rc holds whatever
 * rbd_req_sync_read() returned; if that can be a positive byte count
 * it is propagated to callers that test "if (rc)" -- confirm the
 * intended success value is 0.
 */
static int rbd_read_header(struct rbd_device *rbd_dev,
			   struct rbd_image_header *header)
{
	ssize_t rc;
	struct rbd_image_header_ondisk *dh;
	u32 snap_count = 0;
	u64 ver;
	size_t len;

	/*
	 * First reads the fixed-size header to determine the number
	 * of snapshots, then re-reads it, along with all snapshot
	 * records as well as their stored names.
	 */
	len = sizeof (*dh);
	while (1) {
		dh = kmalloc(len, GFP_KERNEL);
		if (!dh)
			return -ENOMEM;

		rc = rbd_req_sync_read(rbd_dev,
				       NULL, CEPH_NOSNAP,
				       rbd_dev->header_name,
				       0, len,
				       (char *)dh, &ver);
		if (rc < 0)
			goto out_dh;

		rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
		if (rc < 0) {
			if (rc == -ENXIO)
				pr_warning("unrecognized header format"
					   " for image %s\n",
					   rbd_dev->image_name);
			goto out_dh;
		}

		if (snap_count == header->total_snaps)
			break;

		/* Snapshot count changed; retry with a bigger buffer. */
		snap_count = header->total_snaps;
		len = sizeof (*dh) +
			snap_count * sizeof(struct rbd_image_snap_ondisk) +
			header->snap_names_len;

		rbd_header_free(header);
		kfree(dh);
	}
	header->obj_version = ver;

out_dh:
	kfree(dh);
	return rc;
}
1647
1648/*
1649 * create a snapshot
1650 */
0ce1a794 1651static int rbd_header_add_snap(struct rbd_device *rbd_dev,
602adf40
YS
1652 const char *snap_name,
1653 gfp_t gfp_flags)
1654{
1655 int name_len = strlen(snap_name);
1656 u64 new_snapid;
1657 int ret;
916d4d67 1658 void *data, *p, *e;
59c2be1e 1659 u64 ver;
1dbb4399 1660 struct ceph_mon_client *monc;
602adf40
YS
1661
1662 /* we should create a snapshot only if we're pointing at the head */
0ce1a794 1663 if (rbd_dev->snap_id != CEPH_NOSNAP)
602adf40
YS
1664 return -EINVAL;
1665
0ce1a794
AE
1666 monc = &rbd_dev->rbd_client->client->monc;
1667 ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
602adf40
YS
1668 dout("created snapid=%lld\n", new_snapid);
1669 if (ret < 0)
1670 return ret;
1671
1672 data = kmalloc(name_len + 16, gfp_flags);
1673 if (!data)
1674 return -ENOMEM;
1675
916d4d67
SW
1676 p = data;
1677 e = data + name_len + 16;
602adf40 1678
916d4d67
SW
1679 ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1680 ceph_encode_64_safe(&p, e, new_snapid, bad);
602adf40 1681
0bed54dc 1682 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
0ce1a794 1683 "rbd", "snap_add",
916d4d67 1684 data, p - data, &ver);
602adf40 1685
916d4d67 1686 kfree(data);
602adf40
YS
1687
1688 if (ret < 0)
1689 return ret;
1690
0ce1a794
AE
1691 down_write(&rbd_dev->header_rwsem);
1692 rbd_dev->header.snapc->seq = new_snapid;
1693 up_write(&rbd_dev->header_rwsem);
602adf40
YS
1694
1695 return 0;
1696bad:
1697 return -ERANGE;
1698}
1699
dfc5606d
YS
1700static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1701{
1702 struct rbd_snap *snap;
1703
1704 while (!list_empty(&rbd_dev->snaps)) {
1705 snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
1706 __rbd_remove_snap_dev(rbd_dev, snap);
1707 }
1708}
1709
602adf40
YS
1710/*
1711 * only read the first part of the ondisk header, without the snaps info
1712 */
/*
 * Re-read the on-disk header and swap the new contents into
 * rbd_dev->header under header_rwsem, then rebuild the snapshot
 * device list.  Caller holds ctl_mutex.
 */
static int __rbd_refresh_header(struct rbd_device *rbd_dev)
{
	int ret;
	struct rbd_image_header h;

	ret = rbd_read_header(rbd_dev, &h);
	if (ret < 0)
		return ret;

	down_write(&rbd_dev->header_rwsem);

	/* resized? (only the head can change size under us) */
	if (rbd_dev->snap_id == CEPH_NOSNAP) {
		sector_t size = (sector_t) h.image_size / SECTOR_SIZE;

		dout("setting size to %llu sectors", (unsigned long long) size);
		set_capacity(rbd_dev->disk, size);
	}

	/* rbd_dev->header.object_prefix shouldn't change */
	kfree(rbd_dev->header.snap_sizes);
	kfree(rbd_dev->header.snap_names);
	/* osd requests may still refer to snapc */
	ceph_put_snap_context(rbd_dev->header.snapc);

	rbd_dev->header.obj_version = h.obj_version;
	rbd_dev->header.image_size = h.image_size;
	rbd_dev->header.total_snaps = h.total_snaps;
	rbd_dev->header.snapc = h.snapc;
	rbd_dev->header.snap_names = h.snap_names;
	rbd_dev->header.snap_names_len = h.snap_names_len;
	rbd_dev->header.snap_sizes = h.snap_sizes;
	/* Free the extra copy of the object prefix */
	WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
	kfree(h.object_prefix);

	ret = __rbd_init_snaps_header(rbd_dev);

	up_write(&rbd_dev->header_rwsem);

	return ret;
}
1755
/*
 * Read the image header, set up the mapped snapshot, and create and
 * announce the gendisk plus its request queue for @rbd_dev.
 *
 * Returns 0 on success or a negative errno; on failure the partially
 * created gendisk is released.
 */
static int rbd_init_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk;
	struct request_queue *q;
	int rc;
	u64 segment_size;
	u64 total_size = 0;

	/* contact OSD, request size info about the object being mapped */
	rc = rbd_read_header(rbd_dev, &rbd_dev->header);
	if (rc)
		return rc;

	/* no need to lock here, as rbd_dev is not registered yet */
	rc = __rbd_init_snaps_header(rbd_dev);
	if (rc)
		return rc;

	/* Resolve the mapped snapshot; sets total_size accordingly. */
	rc = rbd_header_set_snap(rbd_dev, &total_size);
	if (rc)
		return rc;

	/* create gendisk info */
	rc = -ENOMEM;
	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
	if (!disk)
		goto out;

	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
		 rbd_dev->id);
	disk->major = rbd_dev->major;
	disk->first_minor = 0;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;

	/* init rq */
	rc = -ENOMEM;
	q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
	if (!q)
		goto out_disk;

	/* We use the default size, but let's be explicit about it. */
	blk_queue_physical_block_size(q, SECTOR_SIZE);

	/* set io sizes to object size */
	segment_size = rbd_obj_bytes(&rbd_dev->header);
	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
	blk_queue_max_segment_size(q, segment_size);
	blk_queue_io_min(q, segment_size);
	blk_queue_io_opt(q, segment_size);

	blk_queue_merge_bvec(q, rbd_merge_bvec);
	disk->queue = q;

	q->queuedata = rbd_dev;

	rbd_dev->disk = disk;
	rbd_dev->q = q;

	/* finally, announce the disk to the world */
	set_capacity(disk, total_size / SECTOR_SIZE);
	add_disk(disk);

	pr_info("%s: added with size 0x%llx\n",
		disk->disk_name, (unsigned long long)total_size);
	return 0;

out_disk:
	put_disk(disk);
out:
	return rc;
}
1828
dfc5606d
YS
1829/*
1830 sysfs
1831*/
1832
593a9e7b
AE
1833static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1834{
1835 return container_of(dev, struct rbd_device, dev);
1836}
1837
dfc5606d
YS
1838static ssize_t rbd_size_show(struct device *dev,
1839 struct device_attribute *attr, char *buf)
1840{
593a9e7b 1841 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
a51aa0c0
JD
1842 sector_t size;
1843
1844 down_read(&rbd_dev->header_rwsem);
1845 size = get_capacity(rbd_dev->disk);
1846 up_read(&rbd_dev->header_rwsem);
dfc5606d 1847
a51aa0c0 1848 return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
dfc5606d
YS
1849}
1850
1851static ssize_t rbd_major_show(struct device *dev,
1852 struct device_attribute *attr, char *buf)
1853{
593a9e7b 1854 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 1855
dfc5606d
YS
1856 return sprintf(buf, "%d\n", rbd_dev->major);
1857}
1858
1859static ssize_t rbd_client_id_show(struct device *dev,
1860 struct device_attribute *attr, char *buf)
602adf40 1861{
593a9e7b 1862 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 1863
1dbb4399
AE
1864 return sprintf(buf, "client%lld\n",
1865 ceph_client_id(rbd_dev->rbd_client->client));
602adf40
YS
1866}
1867
dfc5606d
YS
1868static ssize_t rbd_pool_show(struct device *dev,
1869 struct device_attribute *attr, char *buf)
602adf40 1870{
593a9e7b 1871 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d
YS
1872
1873 return sprintf(buf, "%s\n", rbd_dev->pool_name);
1874}
1875
9bb2f334
AE
1876static ssize_t rbd_pool_id_show(struct device *dev,
1877 struct device_attribute *attr, char *buf)
1878{
1879 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1880
1881 return sprintf(buf, "%d\n", rbd_dev->pool_id);
1882}
1883
dfc5606d
YS
1884static ssize_t rbd_name_show(struct device *dev,
1885 struct device_attribute *attr, char *buf)
1886{
593a9e7b 1887 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 1888
0bed54dc 1889 return sprintf(buf, "%s\n", rbd_dev->image_name);
dfc5606d
YS
1890}
1891
1892static ssize_t rbd_snap_show(struct device *dev,
1893 struct device_attribute *attr,
1894 char *buf)
1895{
593a9e7b 1896 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d
YS
1897
1898 return sprintf(buf, "%s\n", rbd_dev->snap_name);
1899}
1900
1901static ssize_t rbd_image_refresh(struct device *dev,
1902 struct device_attribute *attr,
1903 const char *buf,
1904 size_t size)
1905{
593a9e7b 1906 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d
YS
1907 int rc;
1908 int ret = size;
602adf40
YS
1909
1910 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1911
263c6ca0 1912 rc = __rbd_refresh_header(rbd_dev);
dfc5606d
YS
1913 if (rc < 0)
1914 ret = rc;
602adf40 1915
dfc5606d
YS
1916 mutex_unlock(&ctl_mutex);
1917 return ret;
1918}
602adf40 1919
dfc5606d
YS
/* Per-device sysfs attributes; refresh and create_snap are write-only. */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);

static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_pool_id.attr,
	&dev_attr_name.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_refresh.attr,
	&dev_attr_create_snap.attr,
	NULL
};

static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};

/* rbd_device is embedded in rbd_dev and freed elsewhere; nothing to do. */
static void rbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type rbd_device_type = {
	.name = "rbd",
	.groups = rbd_attr_groups,
	.release = rbd_sysfs_dev_release,
};
1961
1962
1963/*
1964 sysfs - snapshots
1965*/
1966
1967static ssize_t rbd_snap_size_show(struct device *dev,
1968 struct device_attribute *attr,
1969 char *buf)
1970{
1971 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1972
3591538f 1973 return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
dfc5606d
YS
1974}
1975
1976static ssize_t rbd_snap_id_show(struct device *dev,
1977 struct device_attribute *attr,
1978 char *buf)
1979{
1980 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1981
3591538f 1982 return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
dfc5606d
YS
1983}
1984
/* Per-snapshot sysfs attributes. */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);

static struct attribute *rbd_snap_attrs[] = {
	&dev_attr_snap_size.attr,
	&dev_attr_snap_id.attr,
	NULL,
};

static struct attribute_group rbd_snap_attr_group = {
	.attrs = rbd_snap_attrs,
};

/* Frees the rbd_snap once the embedded device's last ref is dropped. */
static void rbd_snap_dev_release(struct device *dev)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
	kfree(snap->name);
	kfree(snap);
}

static const struct attribute_group *rbd_snap_attr_groups[] = {
	&rbd_snap_attr_group,
	NULL
};

static struct device_type rbd_snap_device_type = {
	.groups = rbd_snap_attr_groups,
	.release = rbd_snap_dev_release,
};
2014
/*
 * Unlink a snapshot from the device list and drop its sysfs entry.
 * The rbd_snap itself is freed by rbd_snap_dev_release() when the
 * device's last reference goes away.
 */
static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
				  struct rbd_snap *snap)
{
	list_del(&snap->node);
	device_unregister(&snap->dev);
}
2021
2022static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
2023 struct rbd_snap *snap,
2024 struct device *parent)
2025{
2026 struct device *dev = &snap->dev;
2027 int ret;
2028
2029 dev->type = &rbd_snap_device_type;
2030 dev->parent = parent;
2031 dev->release = rbd_snap_dev_release;
2032 dev_set_name(dev, "snap_%s", snap->name);
2033 ret = device_register(dev);
2034
2035 return ret;
2036}
2037
2038static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
2039 int i, const char *name,
2040 struct rbd_snap **snapp)
2041{
2042 int ret;
2043 struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
2044 if (!snap)
2045 return -ENOMEM;
2046 snap->name = kstrdup(name, GFP_KERNEL);
2047 snap->size = rbd_dev->header.snap_sizes[i];
2048 snap->id = rbd_dev->header.snapc->snaps[i];
2049 if (device_is_registered(&rbd_dev->dev)) {
2050 ret = rbd_register_snap_dev(rbd_dev, snap,
2051 &rbd_dev->dev);
2052 if (ret < 0)
2053 goto err;
2054 }
2055 *snapp = snap;
2056 return 0;
2057err:
2058 kfree(snap->name);
2059 kfree(snap);
2060 return ret;
2061}
2062
2063/*
2064 * search for the previous snap in a null delimited string list
2065 */
/*
 * Given @name pointing at an entry within a NUL-delimited string list
 * that begins at @start, return a pointer to the previous entry, or
 * NULL if @name is (at or before) the first entry.
 */
const char *rbd_prev_snap_name(const char *name, const char *start)
{
	const char *p;

	/* Need at least one character plus its terminator before us. */
	if (name < start + 2)
		return NULL;

	/* Step back over the previous entry's terminating NUL, then
	 * scan backward to the NUL (or list start) before it. */
	for (p = name - 2; *p; p--)
		if (p == start)
			return start;

	return p + 1;
}
2079
2080/*
2081 * compare the old list of snapshots that we have to what's in the header
2082 * and update it accordingly. Note that the header holds the snapshots
2083 * in a reverse order (from newest to oldest) and we need to go from
2084 * older to new so that we don't get a duplicate snap name when
2085 * doing the process (e.g., removed snapshot and recreated a new
2086 * one with the same name.
2087 */
/*
 * Compare the old list of snapshots that we have to what's in the
 * header and update it accordingly.  Note that the header holds the
 * snapshots in reverse order (newest to oldest) while the names are
 * walked backward from the end of the name blob, so we effectively go
 * from older to newer and avoid duplicate-name confusion when a
 * snapshot was removed and recreated with the same name.
 *
 * NOTE(review): the inner loop reads snapc->snaps[i] while the check
 * at the top of the outer loop uses snaps[i - 1] for the same logical
 * position -- verify the intended index; this looks off by one.
 */
static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
{
	const char *name, *first_name;
	int i = rbd_dev->header.total_snaps;
	struct rbd_snap *snap, *old_snap = NULL;
	int ret;
	struct list_head *p, *n;

	first_name = rbd_dev->header.snap_names;
	name = first_name + rbd_dev->header.snap_names_len;

	list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
		u64 cur_id;

		old_snap = list_entry(p, struct rbd_snap, node);

		/* cur_id is only read below when i != 0 (short-circuit). */
		if (i)
			cur_id = rbd_dev->header.snapc->snaps[i - 1];

		if (!i || old_snap->id < cur_id) {
			/*
			 * old_snap->id was skipped, thus was
			 * removed.  If this rbd_dev is mapped to
			 * the removed snapshot, record that it no
			 * longer exists, to prevent further I/O.
			 */
			if (rbd_dev->snap_id == old_snap->id)
				rbd_dev->snap_exists = false;
			__rbd_remove_snap_dev(rbd_dev, old_snap);
			continue;
		}
		if (old_snap->id == cur_id) {
			/* we have this snapshot already */
			i--;
			name = rbd_prev_snap_name(name, first_name);
			continue;
		}
		for (; i > 0;
		     i--, name = rbd_prev_snap_name(name, first_name)) {
			if (!name) {
				WARN_ON(1);
				return -EINVAL;
			}
			cur_id = rbd_dev->header.snapc->snaps[i];
			/* snapshot removal? handle it above */
			if (cur_id >= old_snap->id)
				break;
			/* a new snapshot */
			ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
			if (ret < 0)
				return ret;

			/* note that we add it backward so using n and not p */
			list_add(&snap->node, n);
			p = &snap->node;
		}
	}
	/* we're done going over the old snap list, just add what's left */
	for (; i > 0; i--) {
		name = rbd_prev_snap_name(name, first_name);
		if (!name) {
			WARN_ON(1);
			return -EINVAL;
		}
		ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
		if (ret < 0)
			return ret;
		list_add(&snap->node, &rbd_dev->snaps);
	}

	return 0;
}
2160
dfc5606d
YS
/*
 * Register the rbd device (and all of its current snapshots) on the
 * rbd bus in sysfs.  Takes ctl_mutex for the duration.
 */
static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
{
	int ret;
	struct device *dev;
	struct rbd_snap *snap;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	dev = &rbd_dev->dev;

	dev->bus = &rbd_bus_type;
	dev->type = &rbd_device_type;
	dev->parent = &rbd_root_dev;
	dev->release = rbd_dev_release;
	dev_set_name(dev, "%d", rbd_dev->id);
	ret = device_register(dev);
	if (ret < 0)
		goto out;

	/* Stops at the first snapshot that fails to register. */
	list_for_each_entry(snap, &rbd_dev->snaps, node) {
		ret = rbd_register_snap_dev(rbd_dev, snap,
					     &rbd_dev->dev);
		if (ret < 0)
			break;
	}
out:
	mutex_unlock(&ctl_mutex);
	return ret;
}
2189
dfc5606d
YS
/* Remove the rbd device's sysfs entry (release callback set in
 * rbd_bus_add_dev() runs when the last reference is dropped). */
static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
	device_unregister(&rbd_dev->dev);
}
2194
59c2be1e
YS
/*
 * Establish the header-object watch for @rbd_dev.  An -ERANGE reply
 * means our cached header version is stale, so refresh the header and
 * retry until the watch is accepted or a different error occurs.
 */
static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
{
	int ret, rc;

	do {
		ret = rbd_req_sync_watch(rbd_dev, rbd_dev->header_name,
					 rbd_dev->header.obj_version);
		if (ret == -ERANGE) {
			mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
			rc = __rbd_refresh_header(rbd_dev);
			mutex_unlock(&ctl_mutex);
			if (rc < 0)
				return rc;
		}
	} while (ret == -ERANGE);

	return ret;
}
2213
1ddbe94e
AE
2214static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
2215
2216/*
499afd5b
AE
2217 * Get a unique rbd identifier for the given new rbd_dev, and add
2218 * the rbd_dev to the global list. The minimum rbd id is 1.
1ddbe94e 2219 */
499afd5b 2220static void rbd_id_get(struct rbd_device *rbd_dev)
b7f23c36 2221{
499afd5b
AE
2222 rbd_dev->id = atomic64_inc_return(&rbd_id_max);
2223
2224 spin_lock(&rbd_dev_list_lock);
2225 list_add_tail(&rbd_dev->node, &rbd_dev_list);
2226 spin_unlock(&rbd_dev_list_lock);
1ddbe94e 2227}
b7f23c36 2228
1ddbe94e 2229/*
499afd5b
AE
2230 * Remove an rbd_dev from the global list, and record that its
2231 * identifier is no longer in use.
1ddbe94e 2232 */
499afd5b 2233static void rbd_id_put(struct rbd_device *rbd_dev)
1ddbe94e 2234{
d184f6bf
AE
2235 struct list_head *tmp;
2236 int rbd_id = rbd_dev->id;
2237 int max_id;
2238
2239 BUG_ON(rbd_id < 1);
499afd5b
AE
2240
2241 spin_lock(&rbd_dev_list_lock);
2242 list_del_init(&rbd_dev->node);
d184f6bf
AE
2243
2244 /*
2245 * If the id being "put" is not the current maximum, there
2246 * is nothing special we need to do.
2247 */
2248 if (rbd_id != atomic64_read(&rbd_id_max)) {
2249 spin_unlock(&rbd_dev_list_lock);
2250 return;
2251 }
2252
2253 /*
2254 * We need to update the current maximum id. Search the
2255 * list to find out what it is. We're more likely to find
2256 * the maximum at the end, so search the list backward.
2257 */
2258 max_id = 0;
2259 list_for_each_prev(tmp, &rbd_dev_list) {
2260 struct rbd_device *rbd_dev;
2261
2262 rbd_dev = list_entry(tmp, struct rbd_device, node);
2263 if (rbd_id > max_id)
2264 max_id = rbd_id;
2265 }
499afd5b 2266 spin_unlock(&rbd_dev_list_lock);
b7f23c36 2267
1ddbe94e 2268 /*
d184f6bf
AE
2269 * The max id could have been updated by rbd_id_get(), in
2270 * which case it now accurately reflects the new maximum.
2271 * Be careful not to overwrite the maximum value in that
2272 * case.
1ddbe94e 2273 */
d184f6bf 2274 atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
b7f23c36
AE
2275}
2276
e28fff26
AE
/*
 * Advance *buf past any leading white space, leaving it pointing at
 * the first non-space character (if any), and report the length of
 * the token (run of non-space characters) that starts there.  The
 * string at *buf must be NUL-terminated.
 */
static inline size_t next_token(const char **buf)
{
	/* Exactly the characters isspace() matches in "C"/"POSIX" */
	static const char spaces[] = " \f\n\r\t\v";
	const char *start;

	start = *buf + strspn(*buf, spaces);	/* skip to token start */
	*buf = start;

	return strcspn(start, spaces);		/* token length */
}
2295
/*
 * Locate the next token in *buf and, if the supplied buffer can hold
 * it plus a terminating '\0', copy it there.  *buf is advanced past
 * the token even when the token is too large to copy.  *buf must be
 * NUL-terminated on entry.
 *
 * Returns the token length (not counting the '\0'): 0 means no token
 * was found, and a value >= token_size means nothing was copied.
 */
static inline size_t copy_token(const char **buf,
		       char *token,
		       size_t token_size)
{
	size_t len = next_token(buf);

	if (len < token_size) {
		memcpy(token, *buf, len);
		token[len] = '\0';	/* always terminate what we copy */
	}
	*buf += len;

	return len;
}
2325
ea3352f4
AE
2326/*
2327 * Finds the next token in *buf, dynamically allocates a buffer big
2328 * enough to hold a copy of it, and copies the token into the new
2329 * buffer. The copy is guaranteed to be terminated with '\0'. Note
2330 * that a duplicate buffer is created even for a zero-length token.
2331 *
2332 * Returns a pointer to the newly-allocated duplicate, or a null
2333 * pointer if memory for the duplicate was not available. If
2334 * the lenp argument is a non-null pointer, the length of the token
2335 * (not including the '\0') is returned in *lenp.
2336 *
2337 * If successful, the *buf pointer will be updated to point beyond
2338 * the end of the found token.
2339 *
2340 * Note: uses GFP_KERNEL for allocation.
2341 */
2342static inline char *dup_token(const char **buf, size_t *lenp)
2343{
2344 char *dup;
2345 size_t len;
2346
2347 len = next_token(buf);
2348 dup = kmalloc(len + 1, GFP_KERNEL);
2349 if (!dup)
2350 return NULL;
2351
2352 memcpy(dup, *buf, len);
2353 *(dup + len) = '\0';
2354 *buf += len;
2355
2356 if (lenp)
2357 *lenp = len;
2358
2359 return dup;
2360}
2361
/*
 * This fills in the pool_name, image_name, image_name_len, snap_name,
 * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
 * on the list of monitor addresses and other options provided via
 * /sys/bus/rbd/add.
 *
 * On success, *mon_addrs points INTO the caller's buf (not a copy)
 * and *mon_addrs_size is the token length plus one; the dup_token()
 * fields are heap copies owned by rbd_dev.
 *
 * Note: rbd_dev is assumed to have been initially zero-filled.
 */
static int rbd_add_parse_args(struct rbd_device *rbd_dev,
			      const char *buf,
			      const char **mon_addrs,
			      size_t *mon_addrs_size,
			      char *options,
			      size_t options_size)
{
	size_t len;
	int ret;

	/* The first four tokens are required */

	len = next_token(&buf);
	if (!len)
		return -EINVAL;
	*mon_addrs_size = len + 1;
	*mon_addrs = buf;

	buf += len;

	len = copy_token(&buf, options, options_size);
	if (!len || len >= options_size)
		return -EINVAL;

	ret = -ENOMEM;
	rbd_dev->pool_name = dup_token(&buf, NULL);
	if (!rbd_dev->pool_name)
		goto out_err;

	rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
	if (!rbd_dev->image_name)
		goto out_err;

	/* Create the name of the header object */

	rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
						+ sizeof (RBD_SUFFIX),
					GFP_KERNEL);
	if (!rbd_dev->header_name)
		goto out_err;
	sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);

	/*
	 * The snapshot name is optional.  If none is supplied,
	 * we use the default value.
	 */
	rbd_dev->snap_name = dup_token(&buf, &len);
	if (!rbd_dev->snap_name)
		goto out_err;
	if (!len) {
		/* Replace the empty name with the default */
		kfree(rbd_dev->snap_name);
		rbd_dev->snap_name
			= kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
		if (!rbd_dev->snap_name)
			goto out_err;

		memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
			sizeof (RBD_SNAP_HEAD_NAME));
	}

	return 0;

out_err:
	kfree(rbd_dev->header_name);
	kfree(rbd_dev->image_name);
	/* pool_name doubles as rbd_add()'s "parsing succeeded" flag,
	 * so it must be reset to NULL here */
	kfree(rbd_dev->pool_name);
	rbd_dev->pool_name = NULL;

	return ret;
}
2441
59c2be1e
YS
2442static ssize_t rbd_add(struct bus_type *bus,
2443 const char *buf,
2444 size_t count)
602adf40 2445{
cb8627c7
AE
2446 char *options;
2447 struct rbd_device *rbd_dev = NULL;
7ef3214a
AE
2448 const char *mon_addrs = NULL;
2449 size_t mon_addrs_size = 0;
27cc2594
AE
2450 struct ceph_osd_client *osdc;
2451 int rc = -ENOMEM;
602adf40
YS
2452
2453 if (!try_module_get(THIS_MODULE))
2454 return -ENODEV;
2455
60571c7d 2456 options = kmalloc(count, GFP_KERNEL);
602adf40 2457 if (!options)
27cc2594 2458 goto err_nomem;
cb8627c7
AE
2459 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2460 if (!rbd_dev)
2461 goto err_nomem;
602adf40
YS
2462
2463 /* static rbd_device initialization */
2464 spin_lock_init(&rbd_dev->lock);
2465 INIT_LIST_HEAD(&rbd_dev->node);
dfc5606d 2466 INIT_LIST_HEAD(&rbd_dev->snaps);
c666601a 2467 init_rwsem(&rbd_dev->header_rwsem);
602adf40 2468
c666601a 2469 init_rwsem(&rbd_dev->header_rwsem);
0e805a1d 2470
d184f6bf 2471 /* generate unique id: find highest unique id, add one */
499afd5b 2472 rbd_id_get(rbd_dev);
602adf40 2473
a725f65e 2474 /* Fill in the device name, now that we have its id. */
81a89793
AE
2475 BUILD_BUG_ON(DEV_NAME_LEN
2476 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2477 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->id);
a725f65e 2478
602adf40 2479 /* parse add command */
7ef3214a 2480 rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
e28fff26 2481 options, count);
a725f65e 2482 if (rc)
f0f8cef5 2483 goto err_put_id;
e124a82f 2484
5214ecc4
AE
2485 rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
2486 options);
d720bcb0
AE
2487 if (IS_ERR(rbd_dev->rbd_client)) {
2488 rc = PTR_ERR(rbd_dev->rbd_client);
f0f8cef5 2489 goto err_put_id;
d720bcb0 2490 }
602adf40 2491
602adf40 2492 /* pick the pool */
1dbb4399 2493 osdc = &rbd_dev->rbd_client->client->osdc;
602adf40
YS
2494 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2495 if (rc < 0)
2496 goto err_out_client;
9bb2f334 2497 rbd_dev->pool_id = rc;
602adf40
YS
2498
2499 /* register our block device */
27cc2594
AE
2500 rc = register_blkdev(0, rbd_dev->name);
2501 if (rc < 0)
602adf40 2502 goto err_out_client;
27cc2594 2503 rbd_dev->major = rc;
602adf40 2504
dfc5606d
YS
2505 rc = rbd_bus_add_dev(rbd_dev);
2506 if (rc)
766fc439
YS
2507 goto err_out_blkdev;
2508
32eec68d
AE
2509 /*
2510 * At this point cleanup in the event of an error is the job
2511 * of the sysfs code (initiated by rbd_bus_del_dev()).
2512 *
2513 * Set up and announce blkdev mapping.
2514 */
602adf40
YS
2515 rc = rbd_init_disk(rbd_dev);
2516 if (rc)
766fc439 2517 goto err_out_bus;
602adf40 2518
59c2be1e
YS
2519 rc = rbd_init_watch_dev(rbd_dev);
2520 if (rc)
2521 goto err_out_bus;
2522
602adf40
YS
2523 return count;
2524
766fc439 2525err_out_bus:
766fc439
YS
2526 /* this will also clean up rest of rbd_dev stuff */
2527
2528 rbd_bus_del_dev(rbd_dev);
2529 kfree(options);
766fc439
YS
2530 return rc;
2531
602adf40
YS
2532err_out_blkdev:
2533 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2534err_out_client:
2535 rbd_put_client(rbd_dev);
f0f8cef5 2536err_put_id:
cb8627c7 2537 if (rbd_dev->pool_name) {
820a5f3e 2538 kfree(rbd_dev->snap_name);
0bed54dc
AE
2539 kfree(rbd_dev->header_name);
2540 kfree(rbd_dev->image_name);
cb8627c7
AE
2541 kfree(rbd_dev->pool_name);
2542 }
499afd5b 2543 rbd_id_put(rbd_dev);
27cc2594 2544err_nomem:
27cc2594 2545 kfree(rbd_dev);
cb8627c7 2546 kfree(options);
27cc2594 2547
602adf40
YS
2548 dout("Error adding device %s\n", buf);
2549 module_put(THIS_MODULE);
27cc2594
AE
2550
2551 return (ssize_t) rc;
602adf40
YS
2552}
2553
2554static struct rbd_device *__rbd_get_dev(unsigned long id)
2555{
2556 struct list_head *tmp;
2557 struct rbd_device *rbd_dev;
2558
e124a82f 2559 spin_lock(&rbd_dev_list_lock);
602adf40
YS
2560 list_for_each(tmp, &rbd_dev_list) {
2561 rbd_dev = list_entry(tmp, struct rbd_device, node);
e124a82f
AE
2562 if (rbd_dev->id == id) {
2563 spin_unlock(&rbd_dev_list_lock);
602adf40 2564 return rbd_dev;
e124a82f 2565 }
602adf40 2566 }
e124a82f 2567 spin_unlock(&rbd_dev_list_lock);
602adf40
YS
2568 return NULL;
2569}
2570
/*
 * Release callback for the rbd device's embedded struct device,
 * invoked by the driver core after the last reference is dropped
 * (following rbd_bus_del_dev()).  Undoes everything rbd_add() set
 * up: watch, client, disk, blkdev major, name strings, and id.
 */
static void rbd_dev_release(struct device *dev)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	/* stop watching the header object before dropping the client */
	if (rbd_dev->watch_request) {
		struct ceph_client *client = rbd_dev->rbd_client->client;

		ceph_osdc_unregister_linger_request(&client->osdc,
						    rbd_dev->watch_request);
	}
	if (rbd_dev->watch_event)
		rbd_req_sync_unwatch(rbd_dev, rbd_dev->header_name);

	rbd_put_client(rbd_dev);

	/* clean up and free blkdev */
	rbd_free_disk(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);

	/* done with the id, and with the rbd_dev */
	kfree(rbd_dev->snap_name);
	kfree(rbd_dev->header_name);
	kfree(rbd_dev->pool_name);
	kfree(rbd_dev->image_name);
	rbd_id_put(rbd_dev);
	kfree(rbd_dev);

	/* release module ref */
	module_put(THIS_MODULE);
}
2601
dfc5606d
YS
/*
 * Handle a write to /sys/bus/rbd/remove: parse the target device id
 * and unregister the corresponding device.  The heavy teardown runs
 * in rbd_dev_release() once the sysfs reference is gone.
 */
static ssize_t rbd_remove(struct bus_type *bus,
			  const char *buf,
			  size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	int target_id, rc;
	unsigned long ul;
	int ret = count;	/* success reports the full write size */

	rc = strict_strtoul(buf, 10, &ul);
	if (rc)
		return rc;

	/* convert to int; abort if we lost anything in the conversion */
	target_id = (int) ul;
	if (target_id != ul)
		return -EINVAL;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbd_dev = __rbd_get_dev(target_id);
	if (!rbd_dev) {
		ret = -ENOENT;
		goto done;
	}

	__rbd_remove_all_snaps(rbd_dev);
	rbd_bus_del_dev(rbd_dev);

done:
	mutex_unlock(&ctl_mutex);
	return ret;
}
2635
dfc5606d
YS
/*
 * Handle a write to the device's snapshot-creation attribute: create
 * a snapshot with the given name, refresh our copy of the header to
 * pick it up, then notify other watchers of the header object.
 */
static ssize_t rbd_snap_add(struct device *dev,
			    struct device_attribute *attr,
			    const char *buf,
			    size_t count)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	int ret;
	char *name = kmalloc(count + 1, GFP_KERNEL);
	if (!name)
		return -ENOMEM;

	/*
	 * NOTE(review): snprintf() with size "count" copies at most
	 * count - 1 characters, so the final character of buf is
	 * dropped -- presumably the trailing newline sysfs appends;
	 * confirm a name written without a newline is not truncated.
	 */
	snprintf(name, count, "%s", buf);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	ret = rbd_header_add_snap(rbd_dev,
				  name, GFP_KERNEL);
	if (ret < 0)
		goto err_unlock;

	ret = __rbd_refresh_header(rbd_dev);
	if (ret < 0)
		goto err_unlock;

	/* shouldn't hold ctl_mutex when notifying.. notify might
	   trigger a watch callback that would need to get that mutex */
	mutex_unlock(&ctl_mutex);

	/* make a best effort, don't error if failed */
	rbd_req_sync_notify(rbd_dev, rbd_dev->header_name);

	ret = count;
	kfree(name);
	return ret;

err_unlock:
	mutex_unlock(&ctl_mutex);
	kfree(name);
	return ret;
}
2676
602adf40
YS
/*
 * create control files in sysfs
 * /sys/bus/rbd/...
 *
 * Registers the root device first, then the bus; the device
 * registration is undone if bus registration fails.
 */
static int rbd_sysfs_init(void)
{
	int ret;

	ret = device_register(&rbd_root_dev);
	if (ret < 0)
		return ret;

	ret = bus_register(&rbd_bus_type);
	if (ret < 0)
		device_unregister(&rbd_root_dev);

	return ret;
}
2695
/* Remove the sysfs control files, in reverse order of creation. */
static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}
2701
/*
 * Module init: create the sysfs entry points.  Devices are created
 * later, in response to writes to /sys/bus/rbd/add.
 */
int __init rbd_init(void)
{
	int rc;

	rc = rbd_sysfs_init();
	if (rc)
		return rc;
	pr_info("loaded " RBD_DRV_NAME_LONG "\n");
	return 0;
}
2712
/* Module exit: remove the sysfs entry points. */
void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
}
2717
2718module_init(rbd_init);
2719module_exit(rbd_exit);
2720
2721MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2722MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2723MODULE_DESCRIPTION("rados block device");
2724
2725/* following authorship retained from original osdblk.c */
2726MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2727
2728MODULE_LICENSE("GPL");
This page took 0.267932 seconds and 5 git commands to generate.