rbd: have snap_by_name() return a snapshot

drivers/block/rbd.c
/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

		 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>

#include "rbd_types.h"

#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define	SECTOR_SHIFT	9
#define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256	/* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN \
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME	"-"

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

/* Feature bits */

#define RBD_FEATURE_LAYERING	(1<<0)
#define RBD_FEATURE_STRIPINGV2	(1<<1)
#define RBD_FEATURES_ALL \
	    (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)
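/* For a 4-byte int this works out to 11: ten digits plus a sign,
 * enough to format the widest value, "-2147483648". */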

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These fields never change for a given rbd image */
	char *object_prefix;
	u64 features;
	__u8 obj_order;
	__u8 crypt_type;
	__u8 comp_type;

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;
	u64 *snap_sizes;

	u64 obj_version;
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
	u64		pool_id;
	char		*pool_name;

	char		*image_id;
	char		*image_name;

	u64		snap_id;
	char		*snap_name;

	struct kref	kref;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define	BAD_WHICH	U32_MAX		/* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_req_flags {
	OBJ_REQ_DONE,		/* completion flag: not done = 0, done = 1 */
	OBJ_REQ_IMG_DATA,	/* object usage: standalone = 0, image = 1 */
	OBJ_REQ_KNOWN,		/* EXISTS flag valid: no = 0, yes = 1 */
	OBJ_REQ_EXISTS,		/* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
	const char		*object_name;
	u64			offset;		/* object start byte */
	u64			length;		/* bytes from offset */
	unsigned long		flags;

	/*
	 * An object request associated with an image will have its
	 * img_data flag set; a standalone object request will not.
	 *
	 * A standalone object request will have which == BAD_WHICH
	 * and a null obj_request pointer.
	 *
	 * An object request initiated in support of a layered image
	 * object (to check for its existence before a write) will
	 * have which == BAD_WHICH and a non-null obj_request pointer.
	 *
	 * Finally, an object request for rbd image data will have
	 * which != BAD_WHICH, and will have a non-null img_request
	 * pointer.  The value of which will be in the range
	 * 0..(img_request->obj_request_count-1).
	 */
	union {
		struct rbd_obj_request	*obj_request;	/* STAT op */
		struct {
			struct rbd_img_request	*img_request;
			u64			img_offset;
			/* links for img_request->obj_requests list */
			struct list_head	links;
		};
	};
	u32			which;	/* position in image request list */

	enum obj_request_type	type;
	union {
		struct bio	*bio_list;
		struct {
			struct page	**pages;
			u32		page_count;
		};
	};
	struct page		**copyup_pages;

	struct ceph_osd_request	*osd_req;

	u64			xferred;	/* bytes transferred */
	u64			version;
	int			result;

	rbd_obj_callback_t	callback;
	struct completion	completion;

	struct kref		kref;
};

enum img_req_flags {
	IMG_REQ_WRITE,		/* I/O direction: read = 0, write = 1 */
	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
	struct rbd_device	*rbd_dev;
	u64			offset;	/* starting image byte offset */
	u64			length;	/* byte count from offset */
	unsigned long		flags;
	union {
		u64			snap_id;	/* for reads */
		struct ceph_snap_context *snapc;	/* for writes */
	};
	union {
		struct request		*rq;		/* block request */
		struct rbd_obj_request	*obj_request;	/* obj req initiator */
	};
	struct page		**copyup_pages;
	spinlock_t		completion_lock; /* protects next_completion */
	u32			next_completion;
	rbd_img_callback_t	callback;
	u64			xferred; /* aggregate bytes transferred */
	int			result;	/* first nonzero obj_request result */

	u32			obj_request_count;
	struct list_head	obj_requests;	/* rbd_obj_request structs */

	struct kref		kref;
};

#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)

struct rbd_snap {
	const char		*name;
	u64			size;
	struct list_head	node;
	u64			id;
	u64			features;
};

struct rbd_mapping {
	u64			size;
	u64			features;
	bool			read_only;
};

/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue, flags, open_count */

	struct rbd_image_header	header;
	unsigned long		flags;		/* possibly lock protected */
	struct rbd_spec		*spec;

	char			*header_name;

	struct ceph_file_layout	layout;

	struct ceph_osd_event	*watch_event;
	struct rbd_obj_request	*watch_request;

	struct rbd_spec		*parent_spec;
	u64			parent_overlap;
	struct rbd_device	*parent;

	u64			stripe_unit;
	u64			stripe_count;

	/* protects updating the header */
	struct rw_semaphore	header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* list of snapshots */
	struct list_head	snaps;

	/* sysfs related */
	struct device		dev;
	unsigned long		open_count;	/* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
};

static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);

static void rbd_dev_release(struct device *dev);
static void rbd_snap_destroy(struct rbd_snap *snap);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);
static int rbd_dev_probe(struct rbd_device *rbd_dev);

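/*
 * Images are mapped and unmapped through the bus interface described
 * in Documentation/ABI/testing/sysfs-bus-rbd: writing to
 * /sys/bus/rbd/add maps an image, and writing to /sys/bus/rbd/remove
 * unmaps it.
 */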
static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_attrs	= rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =    "rbd",
	.release =      rbd_root_dev_release,
};

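/*
 * Emit a warning, identifying the device as precisely as possible:
 * by disk name when a disk exists, otherwise by image name or image
 * id, falling back to the rbd_dev pointer itself.
 */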
static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else	/* punt */
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */

static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);

static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);

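/*
 * Opening a mapped device takes a reference on it and bumps its open
 * count (under rbd_dev->lock) so the mapping can't be torn down while
 * in use.  Write opens of a read-only mapping fail with -EROFS, and
 * opens of a mapping being removed fail with -ENOENT.
 */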
static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	bool removing = false;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	spin_lock_irq(&rbd_dev->lock);
	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
		removing = true;
	else
		rbd_dev->open_count++;
	spin_unlock_irq(&rbd_dev->lock);
	if (removing)
		return -ENOENT;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	(void) get_device(&rbd_dev->dev);
	set_device_ro(bdev, rbd_dev->mapping.read_only);
	mutex_unlock(&ctl_mutex);

	return 0;
}

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;
	unsigned long open_count_before;

	spin_lock_irq(&rbd_dev->lock);
	open_count_before = rbd_dev->open_count--;
	spin_unlock_irq(&rbd_dev->lock);
	rbd_assert(open_count_before > 0);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	put_device(&rbd_dev->dev);
	mutex_unlock(&ctl_mutex);

	return 0;
}

static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("%s:\n", __func__);
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
	if (IS_ERR(rbdc->client))
		goto out_mutex;
	ceph_opts = NULL;	/* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_err;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	mutex_unlock(&ctl_mutex);
	dout("%s: rbdc %p\n", __func__, rbdc);

	return rbdc;

out_err:
	ceph_destroy_client(rbdc->client);
out_mutex:
	mutex_unlock(&ctl_mutex);
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	dout("%s: error %d\n", __func__, ret);

	return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
	kref_get(&rbdc->kref);

	return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			__rbd_get_client(client_node);

			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	/* Boolean args above */
	Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	/* Boolean args above */
	{-1, NULL}
};

struct rbd_options {
	bool	read_only;
};

#define RBD_READ_ONLY_DEFAULT	false

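/*
 * Parse a single mount-style option token.  Integer and string
 * arguments are recognized by their position in the enum above;
 * currently only the read_only/read_write Booleans are acted upon.
 */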
static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else if (token > Opt_last_string && token < Opt_last_bool) {
		dout("got Boolean token %d\n", token);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	default:
		rbd_assert(false);
		break;
	}
	return 0;
}

/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;

	rbdc = rbd_client_find(ceph_opts);
	if (rbdc)	/* using an existing client */
		ceph_destroy_options(ceph_opts);
	else
		rbdc = rbd_client_create(ceph_opts);

	return rbdc;
}

/*
 * Destroy ceph client.  Takes rbd_client_list_lock itself to unlink
 * the client from the list.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("%s: rbdc %p\n", __func__, rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
	return image_format == 1 || image_format == 2;
}

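/*
 * Sanity-check an on-disk (format 1) header: it must begin with the
 * rbd magic text, its object order must be within sane bounds, and
 * its snapshot count and snapshot name length must describe a
 * snapshot header that can be represented in a size_t.
 */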
static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}

/*
 * Create a new header structure, translate header format from the on-disk
 * header.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
				 struct rbd_image_header_ondisk *ondisk)
{
	u32 snap_count;
	size_t len;
	size_t size;
	u32 i;

	memset(header, 0, sizeof (*header));

	snap_count = le32_to_cpu(ondisk->snap_count);

	len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
	header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
	if (!header->object_prefix)
		return -ENOMEM;
	memcpy(header->object_prefix, ondisk->object_prefix, len);
	header->object_prefix[len] = '\0';

	if (snap_count) {
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* Save a copy of the snapshot names */

		if (snap_names_len > (u64) SIZE_MAX)
			return -EIO;
		header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!header->snap_names)
			goto out_err;
		/*
		 * Note that rbd_dev_v1_header_read() guarantees
		 * the ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, this memcpy() is safe.
		 */
		memcpy(header->snap_names, &ondisk->snaps[snap_count],
			snap_names_len);

		/* Record each snapshot's size */

		size = snap_count * sizeof (*header->snap_sizes);
		header->snap_sizes = kmalloc(size, GFP_KERNEL);
		if (!header->snap_sizes)
			goto out_err;
		for (i = 0; i < snap_count; i++)
			header->snap_sizes[i] =
				le64_to_cpu(ondisk->snaps[i].image_size);
	} else {
		WARN_ON(ondisk->snap_names_len);
		header->snap_names = NULL;
		header->snap_sizes = NULL;
	}

	header->features = 0;	/* No features support in v1 images */
	header->obj_order = ondisk->options.order;
	header->crypt_type = ondisk->options.crypt_type;
	header->comp_type = ondisk->options.comp_type;

	/* Allocate and fill in the snapshot context */

	header->image_size = le64_to_cpu(ondisk->image_size);
	size = sizeof (struct ceph_snap_context);
	size += snap_count * sizeof (header->snapc->snaps[0]);
	header->snapc = kzalloc(size, GFP_KERNEL);
	if (!header->snapc)
		goto out_err;

	atomic_set(&header->snapc->nref, 1);
	header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
	header->snapc->num_snaps = snap_count;
	for (i = 0; i < snap_count; i++)
		header->snapc->snaps[i] =
			le64_to_cpu(ondisk->snaps[i].id);

	return 0;

out_err:
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	kfree(header->object_prefix);
	header->object_prefix = NULL;

	return -ENOMEM;
}

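/*
 * Return the name of the snapshot with the given id, the special
 * RBD_SNAP_HEAD_NAME for CEPH_NOSNAP, or NULL if the id is not in
 * the device's snapshot list.
 */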
static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct rbd_snap *snap;

	if (snap_id == CEPH_NOSNAP)
		return RBD_SNAP_HEAD_NAME;

	list_for_each_entry(snap, &rbd_dev->snaps, node)
		if (snap_id == snap->id)
			return snap->name;

	return NULL;
}

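/*
 * Look up a snapshot by name, returning the matching rbd_snap
 * structure itself (or NULL), so callers can pick out whichever
 * fields they need.
 */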
static struct rbd_snap *snap_by_name(struct rbd_device *rbd_dev,
					const char *snap_name)
{
	struct rbd_snap *snap;

	list_for_each_entry(snap, &rbd_dev->snaps, node)
		if (!strcmp(snap_name, snap->name))
			return snap;

	return NULL;
}

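/*
 * Set up the device's mapping: the special head name maps the base
 * image using the current header's size and features; any other name
 * is looked up as a snapshot (-ENOENT if absent) and mapped
 * read-only.
 */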
static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
{
	if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
		    sizeof (RBD_SNAP_HEAD_NAME))) {
		rbd_dev->spec->snap_id = CEPH_NOSNAP;
		rbd_dev->mapping.size = rbd_dev->header.image_size;
		rbd_dev->mapping.features = rbd_dev->header.features;
	} else {
		struct rbd_snap *snap;

		snap = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
		if (!snap)
			return -ENOENT;
		rbd_dev->spec->snap_id = snap->id;
		rbd_dev->mapping.size = snap->size;
		rbd_dev->mapping.features = snap->features;
		rbd_dev->mapping.read_only = true;
	}
	set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);

	return 0;
}

static void rbd_header_free(struct rbd_image_header *header)
{
	kfree(header->object_prefix);
	header->object_prefix = NULL;
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	ceph_put_snap_context(header->snapc);
	header->snapc = NULL;
}

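/*
 * Format the name of the object backing the segment containing the
 * given image offset: "<object_prefix>.<segment>", with the segment
 * number printed as 12 hex digits.  The caller must kfree() the
 * result, which is NULL on failure.
 */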
static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
	char *name;
	u64 segment;
	int ret;

	name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
	if (!name)
		return NULL;
	segment = offset >> rbd_dev->header.obj_order;
	ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
			rbd_dev->header.object_prefix, segment);
	if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
		pr_err("error formatting segment name for #%llu (%d)\n",
			segment, ret);
		kfree(name);
		name = NULL;
	}

	return name;
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
				u64 offset, u64 length)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	offset &= segment_size - 1;

	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1 << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
	struct bio *tmp;

	while (chain) {
		tmp = chain;
		chain = chain->bi_next;
		bio_put(tmp);
	}
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}

/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
	struct page **page = &pages[offset >> PAGE_SHIFT];

	rbd_assert(end > offset);
	rbd_assert(end - offset <= (u64)SIZE_MAX);
	while (offset < end) {
		size_t page_offset;
		size_t length;
		unsigned long flags;
		void *kaddr;

		page_offset = (size_t)(offset & ~PAGE_MASK);
		length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
		local_irq_save(flags);
		kaddr = kmap_atomic(*page);
		memset(kaddr + page_offset, 0, length);
		kunmap_atomic(kaddr);
		local_irq_restore(flags);

		offset += length;
		page++;
	}
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
					unsigned int offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio_vec *bv;
	unsigned int resid;
	unsigned short idx;
	unsigned int voff;
	unsigned short end_idx;
	unsigned short vcnt;
	struct bio *bio;

	/* Handle the easy case for the caller */

	if (!offset && len == bio_src->bi_size)
		return bio_clone(bio_src, gfpmask);

	if (WARN_ON_ONCE(!len))
		return NULL;
	if (WARN_ON_ONCE(len > bio_src->bi_size))
		return NULL;
	if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
		return NULL;

	/* Find first affected segment... */

	resid = offset;
	__bio_for_each_segment(bv, bio_src, idx, 0) {
		if (resid < bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	voff = resid;

	/* ...and the last affected segment */

	resid += len;
	__bio_for_each_segment(bv, bio_src, end_idx, idx) {
		if (resid <= bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	vcnt = end_idx - idx + 1;

	/* Build the clone */

	bio = bio_alloc(gfpmask, (unsigned int) vcnt);
	if (!bio)
		return NULL;	/* ENOMEM */

	bio->bi_bdev = bio_src->bi_bdev;
	bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
	bio->bi_rw = bio_src->bi_rw;
	bio->bi_flags |= 1 << BIO_CLONED;

	/*
	 * Copy over our part of the bio_vec, then update the first
	 * and last (or only) entries.
	 */
	memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
			vcnt * sizeof (struct bio_vec));
	bio->bi_io_vec[0].bv_offset += voff;
	if (vcnt > 1) {
		bio->bi_io_vec[0].bv_len -= voff;
		bio->bi_io_vec[vcnt - 1].bv_len = resid;
	} else {
		bio->bi_io_vec[0].bv_len = len;
	}

	bio->bi_vcnt = vcnt;
	bio->bi_size = len;
	bio->bi_idx = 0;

	return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
					unsigned int *offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bi = *bio_src;
	unsigned int off = *offset;
	struct bio *chain = NULL;
	struct bio **end;

	/* Build up a chain of clone bios up to the limit */

	if (!bi || off >= bi->bi_size || !len)
		return NULL;		/* Nothing to clone */

	end = &chain;
	while (len) {
		unsigned int bi_size;
		struct bio *bio;

		if (!bi) {
			rbd_warn(NULL, "bio_chain exhausted with %u left", len);
			goto out_err;	/* EINVAL; ran out of bio's */
		}
		bi_size = min_t(unsigned int, bi->bi_size - off, len);
		bio = bio_clone_range(bi, off, bi_size, gfpmask);
		if (!bio)
			goto out_err;	/* ENOMEM */

		*end = bio;
		end = &bio->bi_next;

		off += bi_size;
		if (off == bi->bi_size) {
			bi = bi->bi_next;
			off = 0;
		}
		len -= bi_size;
	}
	*bio_src = bi;
	*offset = off;

	return chain;
out_err:
	bio_chain_put(chain);

	return NULL;
}

/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
		struct rbd_device *rbd_dev;

		rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
			obj_request);
	}
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
		struct rbd_device *rbd_dev = NULL;

		if (obj_request_img_data_test(obj_request))
			rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked done\n",
			obj_request);
	}
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the response from two existence
 * checks are separated by the creation of the target object, and
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the second one.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
				bool exists)
{
	if (exists)
		set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
	set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
	smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}

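/*
 * Object and image requests are reference counted; the *_get()
 * helpers take an extra reference and the *_put() helpers drop one,
 * destroying the request when its last reference is released.
 */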
static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request != NULL);
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	kref_get(&img_request->kref);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
	rbd_assert(img_request != NULL);
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	kref_put(&img_request->kref, rbd_img_request_destroy);
}

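/*
 * Add an object request to (or delete one from) an image request's
 * list.  The image request takes over the object's original
 * reference, and "which" records the object's position in the list.
 */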
static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->img_request == NULL);

	/* Image request now owns object's original reference */
	obj_request->img_request = img_request;
	obj_request->which = img_request->obj_request_count;
	rbd_assert(!obj_request_img_data_test(obj_request));
	obj_request_img_data_set(obj_request);
	rbd_assert(obj_request->which != BAD_WHICH);
	img_request->obj_request_count++;
	list_add_tail(&obj_request->links, &img_request->obj_requests);
	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->which != BAD_WHICH);

	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
	list_del(&obj_request->links);
	rbd_assert(img_request->obj_request_count > 0);
	img_request->obj_request_count--;
	rbd_assert(obj_request->which == img_request->obj_request_count);
	obj_request->which = BAD_WHICH;
	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request->img_request == img_request);
	obj_request->img_request = NULL;
	obj_request->callback = NULL;
	rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
	switch (type) {
	case OBJ_REQUEST_NODATA:
	case OBJ_REQUEST_BIO:
	case OBJ_REQUEST_PAGES:
		return true;
	default:
		return false;
	}
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
				struct rbd_obj_request *obj_request)
{
	dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

	return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{
	dout("%s: img %p\n", __func__, img_request);

	/*
	 * If no error occurred, compute the aggregate transfer
	 * count for the image request.  We could instead use
	 * atomic64_cmpxchg() to update it as each object request
	 * completes; not clear which way is better off hand.
	 */
	if (!img_request->result) {
		struct rbd_obj_request *obj_request;
		u64 xferred = 0;

		for_each_obj_request(img_request, obj_request)
			xferred += obj_request->xferred;
		img_request->xferred = xferred;
	}

	if (img_request->callback)
		img_request->callback(img_request);
	else
		rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);

	return wait_for_completion_interruptible(&obj_request->completion);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_write_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_WRITE, &img_request->flags);
	smp_mb();
}

static bool img_request_write_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
}

static void img_request_child_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_CHILD, &img_request->flags);
	smp_mb();
}

static bool img_request_child_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
}

static void img_request_layered_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}

static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
	u64 xferred = obj_request->xferred;
	u64 length = obj_request->length;

	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, obj_request->img_request, obj_request->result,
		xferred, length);
	/*
	 * ENOENT means a hole in the image.  We zero-fill the
	 * entire length of the request.  A short read also implies
	 * zero-fill to the end of the request.  Either way we
	 * update the xferred count to indicate the whole request
	 * was satisfied.
	 */
	rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
	if (obj_request->result == -ENOENT) {
		if (obj_request->type == OBJ_REQUEST_BIO)
			zero_bio_chain(obj_request->bio_list, 0);
		else
			zero_pages(obj_request->pages, 0, length);
		obj_request->result = 0;
		obj_request->xferred = length;
	} else if (xferred < length && !obj_request->result) {
		if (obj_request->type == OBJ_REQUEST_BIO)
			zero_bio_chain(obj_request->bio_list, xferred);
		else
			zero_pages(obj_request->pages, xferred, length);
		obj_request->xferred = length;
	}
	obj_request_done_set(obj_request);
}

static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p cb %p\n", __func__, obj_request,
		obj_request->callback);
	if (obj_request->callback)
		obj_request->callback(obj_request);
	else
		complete_all(&obj_request->completion);
}

static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}

static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = NULL;
	struct rbd_device *rbd_dev = NULL;
	bool layered = false;

	if (obj_request_img_data_test(obj_request)) {
		img_request = obj_request->img_request;
		layered = img_request && img_request_layered_test(img_request);
		rbd_dev = img_request->rbd_dev;
	}

	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, img_request, obj_request->result,
		obj_request->xferred, obj_request->length);
	if (layered && obj_request->result == -ENOENT &&
			obj_request->img_offset < rbd_dev->parent_overlap)
		rbd_img_parent_read(obj_request);
	else if (img_request)
		rbd_img_obj_request_read_callback(obj_request);
	else
		obj_request_done_set(obj_request);
}

static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p result %d %llu\n", __func__, obj_request,
		obj_request->result, obj_request->length);
	/*
	 * There is no such thing as a successful short write.  Set
	 * it to our originally-requested length.
	 */
	obj_request->xferred = obj_request->length;
	obj_request_done_set(obj_request);
}

/*
 * For a simple stat call there's nothing to do.  We'll do more if
 * this is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}

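/*
 * Completion callback for every rbd osd request: record the result
 * and transfer count, then dispatch on the opcode of the request's
 * first op to the per-operation callback.
 */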
static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
				struct ceph_msg *msg)
{
	struct rbd_obj_request *obj_request = osd_req->r_priv;
	u16 opcode;

	dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
	rbd_assert(osd_req == obj_request->osd_req);
	if (obj_request_img_data_test(obj_request)) {
		rbd_assert(obj_request->img_request);
		rbd_assert(obj_request->which != BAD_WHICH);
	} else {
		rbd_assert(obj_request->which == BAD_WHICH);
	}

	if (osd_req->r_result < 0)
		obj_request->result = osd_req->r_result;
	obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);

	BUG_ON(osd_req->r_num_ops > 2);

	/*
	 * We support a 64-bit length, but ultimately it has to be
	 * passed to blk_end_request(), which takes an unsigned int.
	 */
	obj_request->xferred = osd_req->r_reply_op_len[0];
	rbd_assert(obj_request->xferred < (u64)UINT_MAX);
	opcode = osd_req->r_ops[0].op;
	switch (opcode) {
	case CEPH_OSD_OP_READ:
		rbd_osd_read_callback(obj_request);
		break;
	case CEPH_OSD_OP_WRITE:
		rbd_osd_write_callback(obj_request);
		break;
	case CEPH_OSD_OP_STAT:
		rbd_osd_stat_callback(obj_request);
		break;
	case CEPH_OSD_OP_CALL:
	case CEPH_OSD_OP_NOTIFY_ACK:
	case CEPH_OSD_OP_WATCH:
		rbd_osd_trivial_callback(obj_request);
		break;
	default:
		rbd_warn(NULL, "%s: unsupported op %hu\n",
			obj_request->object_name, (unsigned short) opcode);
		break;
	}

	if (obj_request_done_test(obj_request))
		rbd_obj_request_complete(obj_request);
}

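/*
 * Finalize an osd request before it is submitted: reads are tagged
 * with the mapped snapshot id; writes carry the current snapshot
 * context and a modification time.
 */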
static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct ceph_osd_request *osd_req = obj_request->osd_req;
	u64 snap_id;

	rbd_assert(osd_req != NULL);

	snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
	ceph_osdc_build_request(osd_req, obj_request->offset,
			NULL, snap_id, NULL);
}

static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct ceph_osd_request *osd_req = obj_request->osd_req;
	struct ceph_snap_context *snapc;
	struct timespec mtime = CURRENT_TIME;

	rbd_assert(osd_req != NULL);

	snapc = img_request ? img_request->snapc : NULL;
	ceph_osdc_build_request(osd_req, obj_request->offset,
			snapc, CEPH_NOSNAP, &mtime);
}

static struct ceph_osd_request *rbd_osd_req_create(
					struct rbd_device *rbd_dev,
					bool write_request,
					struct rbd_obj_request *obj_request)
{
	struct ceph_snap_context *snapc = NULL;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;

	if (obj_request_img_data_test(obj_request)) {
		struct rbd_img_request *img_request = obj_request->img_request;

		rbd_assert(write_request ==
				img_request_write_test(img_request));
		if (write_request)
			snapc = img_request->snapc;
	}

	/* Allocate and initialize the request, for the single op */

	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
	if (!osd_req)
		return NULL;	/* ENOMEM */

	if (write_request)
		osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
	else
		osd_req->r_flags = CEPH_OSD_FLAG_READ;

	osd_req->r_callback = rbd_osd_req_callback;
	osd_req->r_priv = obj_request;

	osd_req->r_oid_len = strlen(obj_request->object_name);
	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

	osd_req->r_file_layout = rbd_dev->layout;	/* struct */

	return osd_req;
}

/*
 * Create a copyup osd request based on the information in the
 * object request supplied.  A copyup request has two osd ops,
 * a copyup method call, and a "normal" write request.
 */
static struct ceph_osd_request *
rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	struct ceph_snap_context *snapc;
	struct rbd_device *rbd_dev;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;
	rbd_assert(img_request);
	rbd_assert(img_request_write_test(img_request));

	/* Allocate and initialize the request, for the two ops */

	snapc = img_request->snapc;
	rbd_dev = img_request->rbd_dev;
	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
	if (!osd_req)
		return NULL;	/* ENOMEM */

	osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
	osd_req->r_callback = rbd_osd_req_callback;
	osd_req->r_priv = obj_request;

	osd_req->r_oid_len = strlen(obj_request->object_name);
	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

	osd_req->r_file_layout = rbd_dev->layout;	/* struct */

	return osd_req;
}


static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
}

/* object_name is assumed to be a non-null pointer and NUL-terminated */

static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
						u64 offset, u64 length,
						enum obj_request_type type)
{
	struct rbd_obj_request *obj_request;
	size_t size;
	char *name;

	rbd_assert(obj_request_type_valid(type));

	size = strlen(object_name) + 1;
	obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
	if (!obj_request)
		return NULL;

	name = (char *)(obj_request + 1);
	obj_request->object_name = memcpy(name, object_name, size);
	obj_request->offset = offset;
	obj_request->length = length;
	obj_request->flags = 0;
	obj_request->which = BAD_WHICH;
	obj_request->type = type;
	INIT_LIST_HEAD(&obj_request->links);
	init_completion(&obj_request->completion);
	kref_init(&obj_request->kref);

	dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
		offset, length, (int)type, obj_request);

	return obj_request;
}

static void rbd_obj_request_destroy(struct kref *kref)
{
	struct rbd_obj_request *obj_request;

	obj_request = container_of(kref, struct rbd_obj_request, kref);

	dout("%s: obj %p\n", __func__, obj_request);

	rbd_assert(obj_request->img_request == NULL);
	rbd_assert(obj_request->which == BAD_WHICH);

	if (obj_request->osd_req)
		rbd_osd_req_destroy(obj_request->osd_req);

	rbd_assert(obj_request_type_valid(obj_request->type));
	switch (obj_request->type) {
	case OBJ_REQUEST_NODATA:
		break;		/* Nothing to do */
	case OBJ_REQUEST_BIO:
		if (obj_request->bio_list)
			bio_chain_put(obj_request->bio_list);
		break;
	case OBJ_REQUEST_PAGES:
		if (obj_request->pages)
			ceph_release_page_vector(obj_request->pages,
						obj_request->page_count);
		break;
	}

	kfree(obj_request);
}

1714/*
1715 * Caller is responsible for filling in the list of object requests
1716 * that comprises the image request, and the Linux request pointer
1717 * (if there is one).
1718 */
cc344fa1
AE
1719static struct rbd_img_request *rbd_img_request_create(
1720 struct rbd_device *rbd_dev,
bf0d5f50 1721 u64 offset, u64 length,
9849e986
AE
1722 bool write_request,
1723 bool child_request)
bf0d5f50
AE
1724{
1725 struct rbd_img_request *img_request;
1726 struct ceph_snap_context *snapc = NULL;
1727
1728 img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
1729 if (!img_request)
1730 return NULL;
1731
1732 if (write_request) {
1733 down_read(&rbd_dev->header_rwsem);
1734 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1735 up_read(&rbd_dev->header_rwsem);
1736 if (WARN_ON(!snapc)) {
1737 kfree(img_request);
1738 return NULL; /* Shouldn't happen */
1739 }
1740
1741 }
1742
1743 img_request->rq = NULL;
1744 img_request->rbd_dev = rbd_dev;
1745 img_request->offset = offset;
1746 img_request->length = length;
1747 img_request->flags = 0;
1748 if (write_request) {
1749 img_request_write_set(img_request);
1750 img_request->snapc = snapc;
1751 } else {
1752 img_request->snap_id = rbd_dev->spec->snap_id;
1753 }
1754 if (child_request)
1755 img_request_child_set(img_request);
1756 if (rbd_dev->parent_spec)
1757 img_request_layered_set(img_request);
1758 spin_lock_init(&img_request->completion_lock);
1759 img_request->next_completion = 0;
1760 img_request->callback = NULL;
1761 img_request->result = 0;
1762 img_request->obj_request_count = 0;
1763 INIT_LIST_HEAD(&img_request->obj_requests);
1764 kref_init(&img_request->kref);
1765
1766 rbd_img_request_get(img_request); /* Avoid a warning */
1767 rbd_img_request_put(img_request); /* TEMPORARY */
1768
1769 dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1770 write_request ? "write" : "read", offset, length,
1771 img_request);
1772
1773 return img_request;
1774}
1775
1776static void rbd_img_request_destroy(struct kref *kref)
1777{
1778 struct rbd_img_request *img_request;
1779 struct rbd_obj_request *obj_request;
1780 struct rbd_obj_request *next_obj_request;
1781
1782 img_request = container_of(kref, struct rbd_img_request, kref);
1783
1784 dout("%s: img %p\n", __func__, img_request);
1785
1786 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1787 rbd_img_obj_request_del(img_request, obj_request);
1788 rbd_assert(img_request->obj_request_count == 0);
1789
1790 if (img_request_write_test(img_request))
1791 ceph_put_snap_context(img_request->snapc);
1792
1793 if (img_request_child_test(img_request))
1794 rbd_obj_request_put(img_request->obj_request);
1795
1796 kfree(img_request);
1797}
1798
1799static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1800{
1801 struct rbd_img_request *img_request;
1802 unsigned int xferred;
1803 int result;
1804 bool more;
1805
1806 rbd_assert(obj_request_img_data_test(obj_request));
1807 img_request = obj_request->img_request;
1808
1809 rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1810 xferred = (unsigned int)obj_request->xferred;
1811 result = obj_request->result;
1812 if (result) {
1813 struct rbd_device *rbd_dev = img_request->rbd_dev;
1814
1815 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1816 img_request_write_test(img_request) ? "write" : "read",
1817 obj_request->length, obj_request->img_offset,
1818 obj_request->offset);
1819 rbd_warn(rbd_dev, " result %d xferred %x\n",
1820 result, xferred);
1821 if (!img_request->result)
1822 img_request->result = result;
1823 }
1824
1825 /* Image object requests don't own their page array */
1826
1827 if (obj_request->type == OBJ_REQUEST_PAGES) {
1828 obj_request->pages = NULL;
1829 obj_request->page_count = 0;
1830 }
1831
1832 if (img_request_child_test(img_request)) {
1833 rbd_assert(img_request->obj_request != NULL);
1834 more = obj_request->which < img_request->obj_request_count - 1;
1835 } else {
1836 rbd_assert(img_request->rq != NULL);
1837 more = blk_end_request(img_request->rq, result, xferred);
1838 }
1839
1840 return more;
1841}
1842
1843static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1844{
1845 struct rbd_img_request *img_request;
1846 u32 which = obj_request->which;
1847 bool more = true;
1848
1849 rbd_assert(obj_request_img_data_test(obj_request));
1850 img_request = obj_request->img_request;
1851
1852 dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1853 rbd_assert(img_request != NULL);
1854 rbd_assert(img_request->obj_request_count > 0);
1855 rbd_assert(which != BAD_WHICH);
1856 rbd_assert(which < img_request->obj_request_count);
1857 rbd_assert(which >= img_request->next_completion);
1858
1859 spin_lock_irq(&img_request->completion_lock);
1860 if (which != img_request->next_completion)
1861 goto out;
1862
1863 for_each_obj_request_from(img_request, obj_request) {
1864 rbd_assert(more);
1865 rbd_assert(which < img_request->obj_request_count);
1866
1867 if (!obj_request_done_test(obj_request))
1868 break;
1869 more = rbd_img_obj_end_request(obj_request);
1870 which++;
1871 }
1872
1873 rbd_assert(more ^ (which == img_request->obj_request_count));
1874 img_request->next_completion = which;
1875out:
1876 spin_unlock_irq(&img_request->completion_lock);
1877
1878 if (!more)
1879 rbd_img_request_complete(img_request);
1880}
1881
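The loop above is what keeps completions ordered: an object request that finishes out of order only marks itself done, and next_completion advances across the contiguous run of finished requests. A toy model of that window (illustrative only, not driver code):

#include <stdbool.h>
#include <stdio.h>

/* Advance the completion window over contiguous finished requests. */
static unsigned int advance_window(const bool *done, unsigned int next,
				   unsigned int count)
{
	while (next < count && done[next])
		next++;

	return next;
}

int main(void)
{
	bool done[4] = { true, false, true, false };

	/* request 2 finished early, but the window stops at request 1 */
	printf("next_completion = %u\n", advance_window(done, 0, 4));

	return 0;
}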
1882/*
1883 * Split up an image request into one or more object requests, each
1884 * to a different object. The "type" parameter indicates whether
1885 * "data_desc" is the pointer to the head of a list of bio
1886 * structures, or the base of a page array. In either case this
1887 * function assumes data_desc describes memory sufficient to hold
1888 * all data described by the image request.
1889 */
1890static int rbd_img_request_fill(struct rbd_img_request *img_request,
1891 enum obj_request_type type,
1892 void *data_desc)
1893{
1894 struct rbd_device *rbd_dev = img_request->rbd_dev;
1895 struct rbd_obj_request *obj_request = NULL;
1896 struct rbd_obj_request *next_obj_request;
1897 bool write_request = img_request_write_test(img_request);
1898 struct bio *bio_list;
1899 unsigned int bio_offset = 0;
1900 struct page **pages;
1901 u64 img_offset;
1902 u64 resid;
1903 u16 opcode;
1904
1905 dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
1906 (int)type, data_desc);
1907
1908 opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
1909 img_offset = img_request->offset;
1910 resid = img_request->length;
1911 rbd_assert(resid > 0);
1912
1913 if (type == OBJ_REQUEST_BIO) {
1914 bio_list = data_desc;
1915 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
1916 } else {
1917 rbd_assert(type == OBJ_REQUEST_PAGES);
1918 pages = data_desc;
1919 }
1920
1921 while (resid) {
1922 struct ceph_osd_request *osd_req;
1923 const char *object_name;
1924 u64 offset;
1925 u64 length;
1926
1927 object_name = rbd_segment_name(rbd_dev, img_offset);
1928 if (!object_name)
1929 goto out_unwind;
1930 offset = rbd_segment_offset(rbd_dev, img_offset);
1931 length = rbd_segment_length(rbd_dev, img_offset, resid);
1932 obj_request = rbd_obj_request_create(object_name,
1933 offset, length, type);
1934 kfree(object_name); /* object request has its own copy */
1935 if (!obj_request)
1936 goto out_unwind;
1937
1938 if (type == OBJ_REQUEST_BIO) {
1939 unsigned int clone_size;
1940
1941 rbd_assert(length <= (u64)UINT_MAX);
1942 clone_size = (unsigned int)length;
1943 obj_request->bio_list =
1944 bio_chain_clone_range(&bio_list,
1945 &bio_offset,
1946 clone_size,
1947 GFP_ATOMIC);
1948 if (!obj_request->bio_list)
1949 goto out_partial;
1950 } else {
1951 unsigned int page_count;
1952
1953 obj_request->pages = pages;
1954 page_count = (u32)calc_pages_for(offset, length);
1955 obj_request->page_count = page_count;
1956 if ((offset + length) & ~PAGE_MASK)
1957 page_count--; /* more on last page */
1958 pages += page_count;
1959 }
1960
1961 osd_req = rbd_osd_req_create(rbd_dev, write_request,
1962 obj_request);
1963 if (!osd_req)
1964 goto out_partial;
1965 obj_request->osd_req = osd_req;
1966 obj_request->callback = rbd_img_obj_callback;
1967
1968 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
1969 0, 0);
1970 if (type == OBJ_REQUEST_BIO)
1971 osd_req_op_extent_osd_data_bio(osd_req, 0,
1972 obj_request->bio_list, length);
1973 else
1974 osd_req_op_extent_osd_data_pages(osd_req, 0,
1975 obj_request->pages, length,
1976 offset & ~PAGE_MASK, false, false);
1977
1978 if (write_request)
1979 rbd_osd_req_format_write(obj_request);
1980 else
1981 rbd_osd_req_format_read(obj_request);
1982
1983 obj_request->img_offset = img_offset;
1984 rbd_img_obj_request_add(img_request, obj_request);
1985
1986 img_offset += length;
1987 resid -= length;
1988 }
1989
1990 return 0;
1991
1992out_partial:
1993 rbd_obj_request_put(obj_request);
1994out_unwind:
1995 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1996 rbd_obj_request_put(obj_request);
1997
1998 return -ENOMEM;
1999}
2000
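The splitting loop above leans on power-of-two segment arithmetic equivalent to rbd_segment_offset() and rbd_segment_length(). A standalone sketch of that math, assuming objects of 1 << obj_order bytes (the helper names here are hypothetical):

#include <stdint.h>
#include <stdio.h>

static uint64_t seg_offset(uint64_t img_offset, unsigned int obj_order)
{
	/* byte offset within the object that holds img_offset */
	return img_offset & ((1ULL << obj_order) - 1);
}

static uint64_t seg_length(uint64_t img_offset, uint64_t resid,
			   unsigned int obj_order)
{
	/* bytes from img_offset to the end of its object, capped at resid */
	uint64_t room = (1ULL << obj_order) - seg_offset(img_offset, obj_order);

	return resid < room ? resid : room;
}

int main(void)
{
	/* a 10 MB request starting at 3 MB, with 4 MB (order 22) objects */
	uint64_t off = 3ULL << 20, resid = 10ULL << 20;

	while (resid) {
		uint64_t len = seg_length(off, resid, 22);

		printf("obj %llu: offset %llu length %llu\n",
		       (unsigned long long)(off >> 22),
		       (unsigned long long)seg_offset(off, 22),
		       (unsigned long long)len);
		off += len;
		resid -= len;
	}

	return 0;
}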
2001static void
2002rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2003{
2004 struct rbd_img_request *img_request;
2005 struct rbd_device *rbd_dev;
2006 u64 length;
2007 u32 page_count;
2008
2009 rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2010 rbd_assert(obj_request_img_data_test(obj_request));
2011 img_request = obj_request->img_request;
2012 rbd_assert(img_request);
2013
2014 rbd_dev = img_request->rbd_dev;
2015 rbd_assert(rbd_dev);
2016 length = (u64)1 << rbd_dev->header.obj_order;
2017 page_count = (u32)calc_pages_for(0, length);
2018
2019 rbd_assert(obj_request->copyup_pages);
2020 ceph_release_page_vector(obj_request->copyup_pages, page_count);
2021 obj_request->copyup_pages = NULL;
2022
2023 /*
2024 * We want the transfer count to reflect the size of the
2025 * original write request. There is no such thing as a
2026 * successful short write, so if the request was successful
2027 * we can just set it to the originally-requested length.
2028 */
2029 if (!obj_request->result)
2030 obj_request->xferred = obj_request->length;
2031
2032 /* Finish up with the normal image object callback */
2033
2034 rbd_img_obj_callback(obj_request);
2035}
2036
2037static void
2038rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2039{
2040 struct rbd_obj_request *orig_request;
2041 struct ceph_osd_request *osd_req;
2042 struct ceph_osd_client *osdc;
2043 struct rbd_device *rbd_dev;
2044 struct page **pages;
2045 int result;
2046 u64 obj_size;
2047 u64 xferred;
2048
2049 rbd_assert(img_request_child_test(img_request));
2050
2051 /* First get what we need from the image request */
2052
2053 pages = img_request->copyup_pages;
2054 rbd_assert(pages != NULL);
2055 img_request->copyup_pages = NULL;
2056
2057 orig_request = img_request->obj_request;
2058 rbd_assert(orig_request != NULL);
2059 rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
2060 result = img_request->result;
2061 obj_size = img_request->length;
2062 xferred = img_request->xferred;
2063
2064 rbd_dev = img_request->rbd_dev;
2065 rbd_assert(rbd_dev);
2066 rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
2067
2068 rbd_img_request_put(img_request);
2069
2070 if (result)
2071 goto out_err;
2072
2073 /* Allocate the new copyup osd request for the original request */
2074
2075 result = -ENOMEM;
2076 rbd_assert(!orig_request->osd_req);
2077 osd_req = rbd_osd_req_create_copyup(orig_request);
2078 if (!osd_req)
2079 goto out_err;
2080 orig_request->osd_req = osd_req;
2081 orig_request->copyup_pages = pages;
2082
2083 /* Initialize the copyup op */
2084
2085 osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2086 osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
2087 false, false);
2088
2089 /* Then the original write request op */
2090
2091 osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2092 orig_request->offset,
2093 orig_request->length, 0, 0);
2094 osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
2095 orig_request->length);
2096
2097 rbd_osd_req_format_write(orig_request);
2098
2099 /* All set, send it off. */
2100
2101 orig_request->callback = rbd_img_obj_copyup_callback;
2102 osdc = &rbd_dev->rbd_client->client->osdc;
2103 result = rbd_obj_request_submit(osdc, orig_request);
2104 if (!result)
2105 return;
2106out_err:
2107 /* Record the error code and complete the request */
2108
2109 orig_request->result = result;
2110 orig_request->xferred = 0;
2111 obj_request_done_set(orig_request);
2112 rbd_obj_request_complete(orig_request);
2113}
2114
2115/*
2116 * Read from the parent image the range of data that covers the
2117 * entire target of the given object request. This is used for
2118 * satisfying a layered image write request when the target of an
2119 * object request from the image request does not exist.
2120 *
2121 * A page array big enough to hold the returned data is allocated
2122 * and supplied to rbd_img_request_fill() as the "data descriptor."
2123 * When the read completes, this page array will be transferred to
2124 * the original object request for the copyup operation.
2125 *
2126 * If an error occurs, record it as the result of the original
2127 * object request and mark it done so it gets completed.
2128 */
2129static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2130{
2131 struct rbd_img_request *img_request = NULL;
2132 struct rbd_img_request *parent_request = NULL;
2133 struct rbd_device *rbd_dev;
2134 u64 img_offset;
2135 u64 length;
2136 struct page **pages = NULL;
2137 u32 page_count;
2138 int result;
2139
2140 rbd_assert(obj_request_img_data_test(obj_request));
2141 rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2142
2143 img_request = obj_request->img_request;
2144 rbd_assert(img_request != NULL);
2145 rbd_dev = img_request->rbd_dev;
2146 rbd_assert(rbd_dev->parent != NULL);
2147
2148 /*
2149 * First things first. The original osd request is of no
2150 * use to us any more; we'll need a new one that can hold
2151 * the two ops in a copyup request. We'll get that later,
2152 * but for now we can release the old one.
2153 */
2154 rbd_osd_req_destroy(obj_request->osd_req);
2155 obj_request->osd_req = NULL;
2156
2157 /*
2158 * Determine the byte range covered by the object in the
2159 * child image to which the original request was to be sent.
2160 */
2161 img_offset = obj_request->img_offset - obj_request->offset;
2162 length = (u64)1 << rbd_dev->header.obj_order;
2163
2164 /*
2165 * There is no defined parent data beyond the parent
2166 * overlap, so limit what we read at that boundary if
2167 * necessary.
2168 */
2169 if (img_offset + length > rbd_dev->parent_overlap) {
2170 rbd_assert(img_offset < rbd_dev->parent_overlap);
2171 length = rbd_dev->parent_overlap - img_offset;
2172 }
2173
2174 /*
2175 * Allocate a page array big enough to receive the data read
2176 * from the parent.
2177 */
2178 page_count = (u32)calc_pages_for(0, length);
2179 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2180 if (IS_ERR(pages)) {
2181 result = PTR_ERR(pages);
2182 pages = NULL;
2183 goto out_err;
2184 }
2185
2186 result = -ENOMEM;
2187 parent_request = rbd_img_request_create(rbd_dev->parent,
2188 img_offset, length,
2189 false, true);
2190 if (!parent_request)
2191 goto out_err;
2192 rbd_obj_request_get(obj_request);
2193 parent_request->obj_request = obj_request;
2194
2195 result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2196 if (result)
2197 goto out_err;
2198 parent_request->copyup_pages = pages;
2199
2200 parent_request->callback = rbd_img_obj_parent_read_full_callback;
2201 result = rbd_img_request_submit(parent_request);
2202 if (!result)
2203 return 0;
2204
2205 parent_request->copyup_pages = NULL;
2206 parent_request->obj_request = NULL;
2207 rbd_obj_request_put(obj_request);
2208out_err:
2209 if (pages)
2210 ceph_release_page_vector(pages, page_count);
2211 if (parent_request)
2212 rbd_img_request_put(parent_request);
2213 obj_request->result = result;
2214 obj_request->xferred = 0;
2215 obj_request_done_set(obj_request);
2216
2217 return result;
2218}
2219
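The overlap clamp in the function above is easiest to see with numbers: with 4 MB objects, a target object covering parent offsets [8 MB, 12 MB) and parent_overlap = 10 MB has defined parent data only for its first 2 MB, so the read is trimmed to that. A minimal sketch (hypothetical helper name):

#include <stdint.h>

/* Trim a parent read so it never extends past the parent overlap. */
static uint64_t clamp_to_overlap(uint64_t img_offset, uint64_t length,
				 uint64_t overlap)
{
	/* the caller guarantees img_offset < overlap, as asserted above */
	if (img_offset + length > overlap)
		length = overlap - img_offset;

	return length;
}
/* clamp_to_overlap(8ULL << 20, 4ULL << 20, 10ULL << 20) == 2ULL << 20 */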
2220static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2221{
2222 struct rbd_obj_request *orig_request;
2223 int result;
2224
2225 rbd_assert(!obj_request_img_data_test(obj_request));
2226
2227 /*
2228 * All we need from the object request is the original
2229 * request and the result of the STAT op. Grab those, then
2230 * we're done with the request.
2231 */
2232 orig_request = obj_request->obj_request;
2233 obj_request->obj_request = NULL;
2234 rbd_assert(orig_request);
2235 rbd_assert(orig_request->img_request);
2236
2237 result = obj_request->result;
2238 obj_request->result = 0;
2239
2240 dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2241 obj_request, orig_request, result,
2242 obj_request->xferred, obj_request->length);
2243 rbd_obj_request_put(obj_request);
2244
2245 rbd_assert(orig_request);
2246 rbd_assert(orig_request->img_request);
2247
2248 /*
2249 * Our only purpose here is to determine whether the object
2250 * exists, and we don't want to treat the non-existence as
2251 * an error. If something else comes back, transfer the
2252 * error to the original request and complete it now.
2253 */
2254 if (!result) {
2255 obj_request_existence_set(orig_request, true);
2256 } else if (result == -ENOENT) {
2257 obj_request_existence_set(orig_request, false);
2258 } else if (result) {
2259 orig_request->result = result;
2260 goto out;
2261 }
2262
2263 /*
2264 * Resubmit the original request now that we have recorded
2265 * whether the target object exists.
2266 */
2267 orig_request->result = rbd_img_obj_request_submit(orig_request);
2268out:
2269 if (orig_request->result)
2270 rbd_obj_request_complete(orig_request);
2271 rbd_obj_request_put(orig_request);
2272}
2273
2274static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2275{
2276 struct rbd_obj_request *stat_request;
2277 struct rbd_device *rbd_dev;
2278 struct ceph_osd_client *osdc;
2279 struct page **pages = NULL;
2280 u32 page_count;
2281 size_t size;
2282 int ret;
2283
2284 /*
2285 * The response data for a STAT call consists of:
2286 * le64 length;
2287 * struct {
2288 * le32 tv_sec;
2289 * le32 tv_nsec;
2290 * } mtime;
2291 */
2292 size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2293 page_count = (u32)calc_pages_for(0, size);
2294 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2295 if (IS_ERR(pages))
2296 return PTR_ERR(pages);
2297
2298 ret = -ENOMEM;
2299 stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2300 OBJ_REQUEST_PAGES);
2301 if (!stat_request)
2302 goto out;
2303
2304 rbd_obj_request_get(obj_request);
2305 stat_request->obj_request = obj_request;
2306 stat_request->pages = pages;
2307 stat_request->page_count = page_count;
2308
2309 rbd_assert(obj_request->img_request);
2310 rbd_dev = obj_request->img_request->rbd_dev;
2311 stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2312 stat_request);
2313 if (!stat_request->osd_req)
2314 goto out;
2315 stat_request->callback = rbd_img_obj_exists_callback;
2316
2317 osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2318 osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2319 false, false);
2320 rbd_osd_req_format_read(stat_request);
2321
2322 osdc = &rbd_dev->rbd_client->client->osdc;
2323 ret = rbd_obj_request_submit(osdc, stat_request);
2324out:
2325 if (ret)
2326 rbd_obj_request_put(obj_request);
2327
2328 return ret;
2329}
2330
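The STAT reply described above is 16 bytes (an le64 plus two le32s), so the page vector here is a single page. For reference, ceph's calc_pages_for() counts how many pages an offset/length pair touches; a userspace rendition of that formula, assuming 4 KB pages:

#include <stdint.h>

#define PG_SHIFT 12	/* assume 4 KB pages for the example */

static uint32_t pages_for(uint64_t off, uint64_t len)
{
	return (uint32_t)(((off + len + (1ULL << PG_SHIFT) - 1) >> PG_SHIFT) -
			  (off >> PG_SHIFT));
}
/* pages_for(0, 16) == 1; pages_for(4090, 16) == 2 (straddles a boundary) */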
2331static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2332{
2333 struct rbd_img_request *img_request;
2334 struct rbd_device *rbd_dev;
2335 bool known;
2336
2337 rbd_assert(obj_request_img_data_test(obj_request));
2338
2339 img_request = obj_request->img_request;
2340 rbd_assert(img_request);
2341 rbd_dev = img_request->rbd_dev;
2342
2343 /*
2344 * Only writes to layered images need special handling.
2345 * Reads and non-layered writes are simple object requests.
2346 * Layered writes that start beyond the end of the overlap
2347 * with the parent have no parent data, so they too are
2348 * simple object requests. Finally, if the target object is
2349 * known to already exist, its parent data has already been
2350 * copied, so a write to the object can also be handled as a
2351 * simple object request.
2352 */
2353 if (!img_request_write_test(img_request) ||
2354 !img_request_layered_test(img_request) ||
2355 rbd_dev->parent_overlap <= obj_request->img_offset ||
2356 ((known = obj_request_known_test(obj_request)) &&
2357 obj_request_exists_test(obj_request))) {
2358
2359 struct rbd_device *rbd_dev;
2360 struct ceph_osd_client *osdc;
2361
2362 rbd_dev = obj_request->img_request->rbd_dev;
2363 osdc = &rbd_dev->rbd_client->client->osdc;
2364
2365 return rbd_obj_request_submit(osdc, obj_request);
2366 }
2367
2368 /*
2369 * It's a layered write. The target object might exist but
2370 * we may not know that yet. If we know it doesn't exist,
2371 * start by reading the data for the full target object from
2372 * the parent so we can use it for a copyup to the target.
2373 */
2374 if (known)
2375 return rbd_img_obj_parent_read_full(obj_request);
2376
2377 /* We don't know whether the target exists. Go find out. */
2378
2379 return rbd_img_obj_exists_submit(obj_request);
2380}
2381
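The compound condition above may be easier to read inverted: the existence-check/copyup path is taken only for layered writes that land inside the parent overlap and whose target object is not already known to exist. A hedged restatement (illustrative only):

#include <stdbool.h>
#include <stdint.h>

/* Mirror of the submit-path decision above, for readability only. */
static bool needs_parent_handling(bool write, bool layered,
				  uint64_t img_offset, uint64_t overlap,
				  bool known, bool exists)
{
	return write && layered &&
	       img_offset < overlap &&
	       !(known && exists);
}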
2382static int rbd_img_request_submit(struct rbd_img_request *img_request)
2383{
2384 struct rbd_obj_request *obj_request;
2385 struct rbd_obj_request *next_obj_request;
2386
2387 dout("%s: img %p\n", __func__, img_request);
2388 for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2389 int ret;
2390
2391 ret = rbd_img_obj_request_submit(obj_request);
2392 if (ret)
2393 return ret;
2394 }
2395
2396 return 0;
2397}
2398
2399static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2400{
2401 struct rbd_obj_request *obj_request;
2402 struct rbd_device *rbd_dev;
2403 u64 obj_end;
2404
2405 rbd_assert(img_request_child_test(img_request));
2406
2407 obj_request = img_request->obj_request;
2408 rbd_assert(obj_request);
2409 rbd_assert(obj_request->img_request);
2410
2411 obj_request->result = img_request->result;
2412 if (obj_request->result)
2413 goto out;
2414
2415 /*
2416 * We need to zero anything beyond the parent overlap
2417 * boundary. Since rbd_img_obj_request_read_callback()
2418 * will zero anything beyond the end of a short read, an
2419 * easy way to do this is to pretend the data from the
2420 * parent came up short--ending at the overlap boundary.
2421 */
2422 rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2423 obj_end = obj_request->img_offset + obj_request->length;
2424 rbd_dev = obj_request->img_request->rbd_dev;
2425 if (obj_end > rbd_dev->parent_overlap) {
2426 u64 xferred = 0;
2427
2428 if (obj_request->img_offset < rbd_dev->parent_overlap)
2429 xferred = rbd_dev->parent_overlap -
2430 obj_request->img_offset;
2431
2432 obj_request->xferred = min(img_request->xferred, xferred);
2433 } else {
2434 obj_request->xferred = img_request->xferred;
2435 }
2436out:
2437 rbd_img_obj_request_read_callback(obj_request);
2438 rbd_obj_request_complete(obj_request);
2439}
2440
2441static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2442{
2443 struct rbd_device *rbd_dev;
2444 struct rbd_img_request *img_request;
2445 int result;
2446
2447 rbd_assert(obj_request_img_data_test(obj_request));
2448 rbd_assert(obj_request->img_request != NULL);
2449 rbd_assert(obj_request->result == (s32) -ENOENT);
2450 rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2451
2452 rbd_dev = obj_request->img_request->rbd_dev;
2453 rbd_assert(rbd_dev->parent != NULL);
2454 /* rbd_read_finish(obj_request, obj_request->length); */
2455 img_request = rbd_img_request_create(rbd_dev->parent,
2456 obj_request->img_offset,
2457 obj_request->length,
2458 false, true);
2459 result = -ENOMEM;
2460 if (!img_request)
2461 goto out_err;
2462
2463 rbd_obj_request_get(obj_request);
2464 img_request->obj_request = obj_request;
2465
2466 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2467 obj_request->bio_list);
2468 if (result)
2469 goto out_err;
2470
2471 img_request->callback = rbd_img_parent_read_callback;
2472 result = rbd_img_request_submit(img_request);
2473 if (result)
2474 goto out_err;
2475
2476 return;
2477out_err:
2478 if (img_request)
2479 rbd_img_request_put(img_request);
2480 obj_request->result = result;
2481 obj_request->xferred = 0;
2482 obj_request_done_set(obj_request);
2483}
2484
2485static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
2486 u64 ver, u64 notify_id)
2487{
2488 struct rbd_obj_request *obj_request;
2489 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2490 int ret;
2491
2492 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2493 OBJ_REQUEST_NODATA);
2494 if (!obj_request)
2495 return -ENOMEM;
2496
2497 ret = -ENOMEM;
430c28c3 2498 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
b8d70035
AE
2499 if (!obj_request->osd_req)
2500 goto out;
2501 obj_request->callback = rbd_obj_request_put;
2502
2503 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2504 notify_id, ver, 0);
2505 rbd_osd_req_format_read(obj_request);
2506
2507 ret = rbd_obj_request_submit(osdc, obj_request);
2508out:
2509 if (ret)
2510 rbd_obj_request_put(obj_request);
2511
2512 return ret;
2513}
2514
2515static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2516{
2517 struct rbd_device *rbd_dev = (struct rbd_device *)data;
2518 u64 hver;
b8d70035
AE
2519
2520 if (!rbd_dev)
2521 return;
2522
2523 dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2524 rbd_dev->header_name, (unsigned long long) notify_id,
2525 (unsigned int) opcode);
2526 (void)rbd_dev_refresh(rbd_dev, &hver);
2527
2528 rbd_obj_notify_ack(rbd_dev, hver, notify_id);
2529}
2530
2531/*
2532 * Request sync osd watch/unwatch. The value of "start" determines
2533 * whether a watch request is being initiated or torn down.
2534 */
2535static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
2536{
2537 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2538 struct rbd_obj_request *obj_request;
2539 int ret;
2540
2541 rbd_assert(start ^ !!rbd_dev->watch_event);
2542 rbd_assert(start ^ !!rbd_dev->watch_request);
2543
2544 if (start) {
2545 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2546 &rbd_dev->watch_event);
2547 if (ret < 0)
2548 return ret;
2549 rbd_assert(rbd_dev->watch_event != NULL);
2550 }
2551
2552 ret = -ENOMEM;
2553 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2554 OBJ_REQUEST_NODATA);
2555 if (!obj_request)
2556 goto out_cancel;
2557
2558 obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2559 if (!obj_request->osd_req)
2560 goto out_cancel;
2561
2562 if (start)
2563 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2564 else
2565 ceph_osdc_unregister_linger_request(osdc,
2566 rbd_dev->watch_request->osd_req);
2567
2568 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2569 rbd_dev->watch_event->cookie,
2570 rbd_dev->header.obj_version, start);
2571 rbd_osd_req_format_write(obj_request);
2572
2573 ret = rbd_obj_request_submit(osdc, obj_request);
2574 if (ret)
2575 goto out_cancel;
2576 ret = rbd_obj_request_wait(obj_request);
2577 if (ret)
2578 goto out_cancel;
2579 ret = obj_request->result;
2580 if (ret)
2581 goto out_cancel;
2582
2583 /*
2584 * A watch request is set to linger, so the underlying osd
2585 * request won't go away until we unregister it. We retain
2586 * a pointer to the object request during that time (in
2587 * rbd_dev->watch_request), so we'll keep a reference to
2588 * it. We'll drop that reference (below) after we've
2589 * unregistered it.
2590 */
2591 if (start) {
2592 rbd_dev->watch_request = obj_request;
2593
2594 return 0;
2595 }
2596
2597 /* We have successfully torn down the watch request */
2598
2599 rbd_obj_request_put(rbd_dev->watch_request);
2600 rbd_dev->watch_request = NULL;
2601out_cancel:
2602 /* Cancel the event if we're tearing down, or on error */
2603 ceph_osdc_cancel_event(rbd_dev->watch_event);
2604 rbd_dev->watch_event = NULL;
2605 if (obj_request)
2606 rbd_obj_request_put(obj_request);
2607
2608 return ret;
2609}
2610
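The asserts at the top of the watch function require callers to alternate the two modes; a hedged usage sketch (error handling elided):

	int ret = rbd_dev_header_watch_sync(rbd_dev, 1);	/* register watch */

	if (!ret)
		ret = rbd_dev_header_watch_sync(rbd_dev, 0);	/* tear it down */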
2611/*
2612 * Synchronous osd object method call. Returns the number of bytes
2613 * returned in the outbound buffer, or a negative error code.
2614 */
2615static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2616 const char *object_name,
2617 const char *class_name,
2618 const char *method_name,
2619 const void *outbound,
2620 size_t outbound_size,
2621 void *inbound,
2622 size_t inbound_size,
2623 u64 *version)
2624{
2625 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2626 struct rbd_obj_request *obj_request;
2627 struct page **pages;
2628 u32 page_count;
2629 int ret;
2630
2631 /*
2632 * Method calls are ultimately read operations. The result
2633 * should be placed into the inbound buffer provided. They
2634 * also supply outbound data--parameters for the object
2635 * method. Currently if this is present it will be a
2636 * snapshot id.
2637 */
2638 page_count = (u32)calc_pages_for(0, inbound_size);
2639 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2640 if (IS_ERR(pages))
2641 return PTR_ERR(pages);
2642
2643 ret = -ENOMEM;
6010a451 2644 obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
36be9a76
AE
2645 OBJ_REQUEST_PAGES);
2646 if (!obj_request)
2647 goto out;
2648
2649 obj_request->pages = pages;
2650 obj_request->page_count = page_count;
2651
2652 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2653 if (!obj_request->osd_req)
2654 goto out;
2655
2656 osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2657 class_name, method_name);
2658 if (outbound_size) {
2659 struct ceph_pagelist *pagelist;
2660
2661 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2662 if (!pagelist)
2663 goto out;
2664
2665 ceph_pagelist_init(pagelist);
2666 ceph_pagelist_append(pagelist, outbound, outbound_size);
2667 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2668 pagelist);
2669 }
2670 osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2671 obj_request->pages, inbound_size,
2672 0, false, false);
2673 rbd_osd_req_format_read(obj_request);
2674
2675 ret = rbd_obj_request_submit(osdc, obj_request);
2676 if (ret)
2677 goto out;
2678 ret = rbd_obj_request_wait(obj_request);
2679 if (ret)
2680 goto out;
2681
2682 ret = obj_request->result;
2683 if (ret < 0)
2684 goto out;
2685
2686 rbd_assert(obj_request->xferred < (u64)INT_MAX);
2687 ret = (int)obj_request->xferred;
2688 ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2689 if (version)
2690 *version = obj_request->version;
2691out:
2692 if (obj_request)
2693 rbd_obj_request_put(obj_request);
2694 else
2695 ceph_release_page_vector(pages, page_count);
2696
2697 return ret;
2698}
2699
2700static void rbd_request_fn(struct request_queue *q)
2701 __releases(q->queue_lock) __acquires(q->queue_lock)
2702{
2703 struct rbd_device *rbd_dev = q->queuedata;
2704 bool read_only = rbd_dev->mapping.read_only;
2705 struct request *rq;
2706 int result;
2707
2708 while ((rq = blk_fetch_request(q))) {
2709 bool write_request = rq_data_dir(rq) == WRITE;
2710 struct rbd_img_request *img_request;
2711 u64 offset;
2712 u64 length;
2713
2714 /* Ignore any non-FS requests that filter through. */
2715
2716 if (rq->cmd_type != REQ_TYPE_FS) {
2717 dout("%s: non-fs request type %d\n", __func__,
2718 (int) rq->cmd_type);
2719 __blk_end_request_all(rq, 0);
2720 continue;
2721 }
2722
2723 /* Ignore/skip any zero-length requests */
2724
2725 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2726 length = (u64) blk_rq_bytes(rq);
2727
2728 if (!length) {
2729 dout("%s: zero-length request\n", __func__);
2730 __blk_end_request_all(rq, 0);
2731 continue;
2732 }
2733
2734 spin_unlock_irq(q->queue_lock);
2735
2736 /* Disallow writes to a read-only device */
2737
2738 if (write_request) {
2739 result = -EROFS;
2740 if (read_only)
2741 goto end_request;
2742 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2743 }
2744
2745 /*
2746 * Quit early if the mapped snapshot no longer
2747 * exists. It's still possible the snapshot will
2748 * have disappeared by the time our request arrives
2749 * at the osd, but there's no sense in sending it if
2750 * we already know.
2751 */
2752 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2753 dout("request for non-existent snapshot");
2754 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2755 result = -ENXIO;
2756 goto end_request;
2757 }
2758
2759 result = -EINVAL;
2760 if (WARN_ON(offset && length > U64_MAX - offset + 1))
2761 goto end_request; /* Shouldn't happen */
2762
2763 result = -ENOMEM;
2764 img_request = rbd_img_request_create(rbd_dev, offset, length,
2765 write_request, false);
2766 if (!img_request)
2767 goto end_request;
2768
2769 img_request->rq = rq;
2770
2771 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2772 rq->bio);
2773 if (!result)
2774 result = rbd_img_request_submit(img_request);
2775 if (result)
2776 rbd_img_request_put(img_request);
2777end_request:
2778 spin_lock_irq(q->queue_lock);
2779 if (result < 0) {
2780 rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2781 write_request ? "write" : "read",
2782 length, offset, result);
2783
2784 __blk_end_request_all(rq, result);
2785 }
2786 }
2787}
2788
2789/*
2790 * a queue callback. Makes sure that we don't create a bio that spans across
2791 * multiple osd objects. One exception would be with single-page bios,
2792 * which we handle later at bio_chain_clone_range()
2793 */
2794static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2795 struct bio_vec *bvec)
2796{
2797 struct rbd_device *rbd_dev = q->queuedata;
2798 sector_t sector_offset;
2799 sector_t sectors_per_obj;
2800 sector_t obj_sector_offset;
2801 int ret;
2802
2803 /*
2804 * Find how far into its rbd object the partition-relative
2805 * bio start sector is to offset relative to the enclosing
2806 * device.
2807 */
2808 sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2809 sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2810 obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2811
2812 /*
2813 * Compute the number of bytes from that offset to the end
2814 * of the object. Account for what's already used by the bio.
2815 */
2816 ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2817 if (ret > bmd->bi_size)
2818 ret -= bmd->bi_size;
2819 else
2820 ret = 0;
2821
2822 /*
2823 * Don't send back more than was asked for. And if the bio
2824 * was empty, let the whole thing through because: "Note
2825 * that a block device *must* allow a single page to be
2826 * added to an empty bio."
2827 */
2828 rbd_assert(bvec->bv_len <= PAGE_SIZE);
2829 if (ret > (int) bvec->bv_len || !bmd->bi_size)
2830 ret = (int) bvec->bv_len;
2831
2832 return ret;
2833}
2834
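A worked instance of the arithmetic above (values are illustrative):

/*
 * With obj_order = 22 (4 MB objects) and SECTOR_SHIFT = 9,
 * sectors_per_obj = 1 << (22 - 9) = 8192. A bio starting at device
 * sector 12000 sits 12000 & 8191 = 3808 sectors into its object,
 * leaving (8192 - 3808) << 9 = 2244608 bytes before the boundary.
 */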
2835static void rbd_free_disk(struct rbd_device *rbd_dev)
2836{
2837 struct gendisk *disk = rbd_dev->disk;
2838
2839 if (!disk)
2840 return;
2841
2842 rbd_dev->disk = NULL;
2843 if (disk->flags & GENHD_FL_UP) {
2844 del_gendisk(disk);
2845 if (disk->queue)
2846 blk_cleanup_queue(disk->queue);
2847 }
2848 put_disk(disk);
2849}
2850
2851static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2852 const char *object_name,
2853 u64 offset, u64 length,
2854 void *buf, u64 *version)
2855
2856{
2857 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2858 struct rbd_obj_request *obj_request;
2859 struct page **pages = NULL;
2860 u32 page_count;
2861 size_t size;
2862 int ret;
2863
2864 page_count = (u32) calc_pages_for(offset, length);
2865 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2866 if (IS_ERR(pages))
2867 return PTR_ERR(pages);
2868
2869 ret = -ENOMEM;
2870 obj_request = rbd_obj_request_create(object_name, offset, length,
2871 OBJ_REQUEST_PAGES);
2872 if (!obj_request)
2873 goto out;
2874
2875 obj_request->pages = pages;
2876 obj_request->page_count = page_count;
2877
2878 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2879 if (!obj_request->osd_req)
2880 goto out;
2881
2882 osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2883 offset, length, 0, 0);
2884 osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
2885 obj_request->pages,
2886 obj_request->length,
2887 obj_request->offset & ~PAGE_MASK,
2888 false, false);
2889 rbd_osd_req_format_read(obj_request);
2890
2891 ret = rbd_obj_request_submit(osdc, obj_request);
2892 if (ret)
2893 goto out;
2894 ret = rbd_obj_request_wait(obj_request);
2895 if (ret)
2896 goto out;
2897
2898 ret = obj_request->result;
2899 if (ret < 0)
2900 goto out;
2901
2902 rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2903 size = (size_t) obj_request->xferred;
2904 ceph_copy_from_page_vector(pages, buf, 0, size);
2905 rbd_assert(size <= (size_t) INT_MAX);
2906 ret = (int) size;
2907 if (version)
2908 *version = obj_request->version;
2909out:
2910 if (obj_request)
2911 rbd_obj_request_put(obj_request);
2912 else
2913 ceph_release_page_vector(pages, page_count);
2914
2915 return ret;
2916}
2917
2918/*
2919 * Read the complete header for the given rbd device.
2920 *
2921 * Returns a pointer to a dynamically-allocated buffer containing
2922 * the complete and validated header. Caller can pass the address
2923 * of a variable that will be filled in with the version of the
2924 * header object at the time it was read.
2925 *
2926 * Returns a pointer-coded errno if a failure occurs.
2927 */
2928static struct rbd_image_header_ondisk *
2929rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
2930{
2931 struct rbd_image_header_ondisk *ondisk = NULL;
2932 u32 snap_count = 0;
2933 u64 names_size = 0;
2934 u32 want_count;
2935 int ret;
602adf40 2936
2937 /*
2938 * The complete header will include an array of its 64-bit
2939 * snapshot ids, followed by the names of those snapshots as
2940 * a contiguous block of NUL-terminated strings. Note that
2941 * the number of snapshots could change by the time we read
2942 * it in, in which case we re-read it.
2943 */
2944 do {
2945 size_t size;
2946
2947 kfree(ondisk);
2948
2949 size = sizeof (*ondisk);
2950 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
2951 size += names_size;
2952 ondisk = kmalloc(size, GFP_KERNEL);
2953 if (!ondisk)
2954 return ERR_PTR(-ENOMEM);
2955
2956 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
2957 0, size, ondisk, version);
2958 if (ret < 0)
2959 goto out_err;
2960 if (WARN_ON((size_t) ret < size)) {
2961 ret = -ENXIO;
2962 rbd_warn(rbd_dev, "short header read (want %zd got %d)",
2963 size, ret);
2964 goto out_err;
2965 }
2966 if (!rbd_dev_ondisk_valid(ondisk)) {
2967 ret = -ENXIO;
2968 rbd_warn(rbd_dev, "invalid header");
2969 goto out_err;
2970 }
2971
2972 names_size = le64_to_cpu(ondisk->snap_names_len);
2973 want_count = snap_count;
2974 snap_count = le32_to_cpu(ondisk->snap_count);
2975 } while (snap_count != want_count);
2976
2977 return ondisk;
2978
2979out_err:
2980 kfree(ondisk);
2981
2982 return ERR_PTR(ret);
2983}
2984
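The do/while above is a read-until-stable pattern: size the buffer for the snapshot count seen on the previous pass, re-read, and loop if the count moved. A toy model with a simulated count change:

#include <stdint.h>
#include <stdio.h>

static uint32_t read_snap_count(int pass)
{
	/* simulate a snapshot being created between reads */
	return pass == 0 ? 3 : 4;
}

int main(void)
{
	uint32_t snap_count = 0, want_count;
	int pass = 0;

	do {
		want_count = snap_count;	/* buffer sized for this many */
		snap_count = read_snap_count(pass++);
	} while (snap_count != want_count);

	printf("stable at %u snapshots after %d reads\n", snap_count, pass);

	return 0;
}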
2985/*
2986 * reload the on-disk header
2987 */
2988static int rbd_read_header(struct rbd_device *rbd_dev,
2989 struct rbd_image_header *header)
2990{
2991 struct rbd_image_header_ondisk *ondisk;
2992 u64 ver = 0;
2993 int ret;
2994
2995 ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
2996 if (IS_ERR(ondisk))
2997 return PTR_ERR(ondisk);
2998 ret = rbd_header_from_disk(header, ondisk);
2999 if (ret >= 0)
3000 header->obj_version = ver;
3001 kfree(ondisk);
3002
3003 return ret;
3004}
3005
3006static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
3007{
3008 struct rbd_snap *snap;
3009 struct rbd_snap *next;
3010
3011 list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node) {
3012 list_del(&snap->node);
3013 rbd_snap_destroy(snap);
3014 }
3015}
3016
3017static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
3018{
3019 sector_t size;
3020
3021 if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
3022 return;
3023
3024 size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
3025 dout("setting size to %llu sectors", (unsigned long long) size);
3026 rbd_dev->mapping.size = (u64) size;
3027 set_capacity(rbd_dev->disk, size);
3028}
3029
3030/*
3031 * only read the first part of the ondisk header, without the snaps info
3032 */
3033static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
3034{
3035 int ret;
3036 struct rbd_image_header h;
3037
3038 ret = rbd_read_header(rbd_dev, &h);
3039 if (ret < 0)
3040 return ret;
3041
3042 down_write(&rbd_dev->header_rwsem);
3043
3044 /* Update image size, and check for resize of mapped image */
3045 rbd_dev->header.image_size = h.image_size;
3046 rbd_update_mapping_size(rbd_dev);
3047
3048 /* rbd_dev->header.object_prefix shouldn't change */
3049 kfree(rbd_dev->header.snap_sizes);
3050 kfree(rbd_dev->header.snap_names);
3051 /* osd requests may still refer to snapc */
3052 ceph_put_snap_context(rbd_dev->header.snapc);
3053
3054 if (hver)
3055 *hver = h.obj_version;
3056 rbd_dev->header.obj_version = h.obj_version;
3057 rbd_dev->header.image_size = h.image_size;
3058 rbd_dev->header.snapc = h.snapc;
3059 rbd_dev->header.snap_names = h.snap_names;
3060 rbd_dev->header.snap_sizes = h.snap_sizes;
3061 /* Free the extra copy of the object prefix */
3062 WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
3063 kfree(h.object_prefix);
3064
3065 ret = rbd_dev_snaps_update(rbd_dev);
3066
3067 up_write(&rbd_dev->header_rwsem);
3068
3069 return ret;
3070}
3071
3072static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
3073{
3074 int ret;
3075
3076 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3077 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3078 if (rbd_dev->image_format == 1)
3079 ret = rbd_dev_v1_refresh(rbd_dev, hver);
3080 else
3081 ret = rbd_dev_v2_refresh(rbd_dev, hver);
3082 mutex_unlock(&ctl_mutex);
3083 revalidate_disk(rbd_dev->disk);
3084 if (ret)
3085 rbd_warn(rbd_dev, "got notification but failed to "
3086 "update snaps: %d\n", ret);
3087
3088 return ret;
3089}
3090
3091static int rbd_init_disk(struct rbd_device *rbd_dev)
3092{
3093 struct gendisk *disk;
3094 struct request_queue *q;
3095 u64 segment_size;
3096
3097 /* create gendisk info */
3098 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3099 if (!disk)
3100 return -ENOMEM;
3101
3102 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3103 rbd_dev->dev_id);
3104 disk->major = rbd_dev->major;
3105 disk->first_minor = 0;
3106 disk->fops = &rbd_bd_ops;
3107 disk->private_data = rbd_dev;
3108
3109 q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3110 if (!q)
3111 goto out_disk;
3112
3113 /* We use the default size, but let's be explicit about it. */
3114 blk_queue_physical_block_size(q, SECTOR_SIZE);
3115
3116 /* set io sizes to object size */
3117 segment_size = rbd_obj_bytes(&rbd_dev->header);
3118 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3119 blk_queue_max_segment_size(q, segment_size);
3120 blk_queue_io_min(q, segment_size);
3121 blk_queue_io_opt(q, segment_size);
3122
3123 blk_queue_merge_bvec(q, rbd_merge_bvec);
3124 disk->queue = q;
3125
3126 q->queuedata = rbd_dev;
3127
3128 rbd_dev->disk = disk;
3129
3130 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
3131
3132 return 0;
3133out_disk:
3134 put_disk(disk);
3135
3136 return -ENOMEM;
3137}
3138
3139/*
3140 sysfs
3141*/
3142
3143static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3144{
3145 return container_of(dev, struct rbd_device, dev);
3146}
3147
3148static ssize_t rbd_size_show(struct device *dev,
3149 struct device_attribute *attr, char *buf)
3150{
3151 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3152 sector_t size;
3153
3154 down_read(&rbd_dev->header_rwsem);
3155 size = get_capacity(rbd_dev->disk);
3156 up_read(&rbd_dev->header_rwsem);
3157
3158 return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
3159}
3160
3161/*
3162 * Note this shows the features for whatever's mapped, which is not
3163 * necessarily the base image.
3164 */
3165static ssize_t rbd_features_show(struct device *dev,
3166 struct device_attribute *attr, char *buf)
3167{
3168 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3169
3170 return sprintf(buf, "0x%016llx\n",
3171 (unsigned long long) rbd_dev->mapping.features);
3172}
3173
3174static ssize_t rbd_major_show(struct device *dev,
3175 struct device_attribute *attr, char *buf)
3176{
3177 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3178
3179 return sprintf(buf, "%d\n", rbd_dev->major);
3180}
3181
3182static ssize_t rbd_client_id_show(struct device *dev,
3183 struct device_attribute *attr, char *buf)
3184{
3185 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3186
3187 return sprintf(buf, "client%lld\n",
3188 ceph_client_id(rbd_dev->rbd_client->client));
3189}
3190
3191static ssize_t rbd_pool_show(struct device *dev,
3192 struct device_attribute *attr, char *buf)
3193{
3194 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3195
3196 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3197}
3198
3199static ssize_t rbd_pool_id_show(struct device *dev,
3200 struct device_attribute *attr, char *buf)
3201{
3202 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3203
3204 return sprintf(buf, "%llu\n",
3205 (unsigned long long) rbd_dev->spec->pool_id);
3206}
3207
3208static ssize_t rbd_name_show(struct device *dev,
3209 struct device_attribute *attr, char *buf)
3210{
3211 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3212
3213 if (rbd_dev->spec->image_name)
3214 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3215
3216 return sprintf(buf, "(unknown)\n");
3217}
3218
3219static ssize_t rbd_image_id_show(struct device *dev,
3220 struct device_attribute *attr, char *buf)
3221{
3222 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3223
3224 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3225}
3226
3227/*
3228 * Shows the name of the currently-mapped snapshot (or
3229 * RBD_SNAP_HEAD_NAME for the base image).
3230 */
3231static ssize_t rbd_snap_show(struct device *dev,
3232 struct device_attribute *attr,
3233 char *buf)
3234{
3235 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3236
3237 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3238}
3239
3240/*
3241 * For an rbd v2 image, shows the pool id, image id, and snapshot id
3242 * for the parent image. If there is no parent, simply shows
3243 * "(no parent image)".
3244 */
3245static ssize_t rbd_parent_show(struct device *dev,
3246 struct device_attribute *attr,
3247 char *buf)
3248{
3249 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3250 struct rbd_spec *spec = rbd_dev->parent_spec;
3251 int count;
3252 char *bufp = buf;
3253
3254 if (!spec)
3255 return sprintf(buf, "(no parent image)\n");
3256
3257 count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3258 (unsigned long long) spec->pool_id, spec->pool_name);
3259 if (count < 0)
3260 return count;
3261 bufp += count;
3262
3263 count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3264 spec->image_name ? spec->image_name : "(unknown)");
3265 if (count < 0)
3266 return count;
3267 bufp += count;
3268
3269 count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3270 (unsigned long long) spec->snap_id, spec->snap_name);
3271 if (count < 0)
3272 return count;
3273 bufp += count;
3274
3275 count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3276 if (count < 0)
3277 return count;
3278 bufp += count;
3279
3280 return (ssize_t) (bufp - buf);
3281}
3282
3283static ssize_t rbd_image_refresh(struct device *dev,
3284 struct device_attribute *attr,
3285 const char *buf,
3286 size_t size)
3287{
3288 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3289 int ret;
3290
3291 ret = rbd_dev_refresh(rbd_dev, NULL);
3292
3293 return ret < 0 ? ret : size;
3294}
3295
3296static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3297static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3298static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3299static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3300static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3301static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3302static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3303static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3304static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3305static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3306static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3307
3308static struct attribute *rbd_attrs[] = {
3309 &dev_attr_size.attr,
3310 &dev_attr_features.attr,
3311 &dev_attr_major.attr,
3312 &dev_attr_client_id.attr,
3313 &dev_attr_pool.attr,
3314 &dev_attr_pool_id.attr,
3315 &dev_attr_name.attr,
3316 &dev_attr_image_id.attr,
3317 &dev_attr_current_snap.attr,
3318 &dev_attr_parent.attr,
3319 &dev_attr_refresh.attr,
3320 NULL
3321};
3322
3323static struct attribute_group rbd_attr_group = {
3324 .attrs = rbd_attrs,
3325};
3326
3327static const struct attribute_group *rbd_attr_groups[] = {
3328 &rbd_attr_group,
3329 NULL
3330};
3331
3332static void rbd_sysfs_dev_release(struct device *dev)
3333{
3334}
3335
3336static struct device_type rbd_device_type = {
3337 .name = "rbd",
3338 .groups = rbd_attr_groups,
3339 .release = rbd_sysfs_dev_release,
3340};
3341
3342static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3343{
3344 kref_get(&spec->kref);
3345
3346 return spec;
3347}
3348
3349static void rbd_spec_free(struct kref *kref);
3350static void rbd_spec_put(struct rbd_spec *spec)
3351{
3352 if (spec)
3353 kref_put(&spec->kref, rbd_spec_free);
3354}
3355
3356static struct rbd_spec *rbd_spec_alloc(void)
3357{
3358 struct rbd_spec *spec;
3359
3360 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3361 if (!spec)
3362 return NULL;
3363 kref_init(&spec->kref);
3364
3365 return spec;
3366}
3367
3368static void rbd_spec_free(struct kref *kref)
3369{
3370 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3371
3372 kfree(spec->pool_name);
3373 kfree(spec->image_id);
3374 kfree(spec->image_name);
3375 kfree(spec->snap_name);
3376 kfree(spec);
3377}
3378
3379static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3380 struct rbd_spec *spec)
3381{
3382 struct rbd_device *rbd_dev;
3383
3384 rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3385 if (!rbd_dev)
3386 return NULL;
3387
3388 spin_lock_init(&rbd_dev->lock);
3389 rbd_dev->flags = 0;
3390 INIT_LIST_HEAD(&rbd_dev->node);
3391 INIT_LIST_HEAD(&rbd_dev->snaps);
3392 init_rwsem(&rbd_dev->header_rwsem);
3393
3394 rbd_dev->spec = spec;
3395 rbd_dev->rbd_client = rbdc;
3396
3397 /* Initialize the layout used for all rbd requests */
3398
3399 rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3400 rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3401 rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3402 rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3403
3404 return rbd_dev;
3405}
3406
3407static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3408{
3409 rbd_spec_put(rbd_dev->parent_spec);
3410 kfree(rbd_dev->header_name);
3411 rbd_put_client(rbd_dev->rbd_client);
3412 rbd_spec_put(rbd_dev->spec);
3413 kfree(rbd_dev);
3414}
3415
3416static void rbd_snap_destroy(struct rbd_snap *snap)
3417{
3418 kfree(snap->name);
3419 kfree(snap);
3420}
3421
3422static struct rbd_snap *rbd_snap_create(struct rbd_device *rbd_dev,
3423 const char *snap_name,
3424 u64 snap_id, u64 snap_size,
3425 u64 snap_features)
3426{
3427 struct rbd_snap *snap;
3428
3429 snap = kzalloc(sizeof (*snap), GFP_KERNEL);
3430 if (!snap)
3431 return ERR_PTR(-ENOMEM);
3432
3433 snap->name = snap_name;
3434 snap->id = snap_id;
3435 snap->size = snap_size;
3436 snap->features = snap_features;
3437
3438 return snap;
3439}
3440
3441/*
3442 * Returns a dynamically-allocated snapshot name if successful, or a
3443 * pointer-coded error otherwise.
3444 */
3445static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
3446 u64 *snap_size, u64 *snap_features)
3447{
3448 char *snap_name;
3449 int i;
3450
3451 rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3452
3453 /* Skip over names until we find the one we are looking for */
3454
3455 snap_name = rbd_dev->header.snap_names;
6e584f52 3456 for (i = 0; i < which; i++)
cd892126
AE
3457 snap_name += strlen(snap_name) + 1;
3458
6e584f52
AE
3459 snap_name = kstrdup(snap_name, GFP_KERNEL);
3460 if (!snap_name)
3461 return ERR_PTR(-ENOMEM);
3462
3463 *snap_size = rbd_dev->header.snap_sizes[which];
3464 *snap_features = 0; /* No features for v1 */
3465
cd892126
AE
3466 return snap_name;
3467}
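
/*
 * Layout sketch (illustrative): a format 1 header stores snapshot
 * names as consecutive NUL-terminated strings in a single buffer, in
 * the same order as the ids in the snapshot context.  With
 *
 *	header.snap_names = "monday\0tuesday\0wednesday\0"
 *
 * a call such as rbd_dev_v1_snap_info(rbd_dev, 1, &size, &features)
 * skips one name and returns a copy of "tuesday", with the size taken
 * from header.snap_sizes[1] and features always 0.
 */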

/*
 * Get the size and object order for an image snapshot, or if
 * snap_id is CEPH_NOSNAP, get this information for the base
 * image.
 */
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u8 *order, u64 *snap_size)
{
	__le64 snapid = cpu_to_le64(snap_id);
	int ret;
	struct {
		u8 order;
		__le64 size;
	} __attribute__ ((packed)) size_buf = { 0 };

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_size",
				&snapid, sizeof (snapid),
				&size_buf, sizeof (size_buf), NULL);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		return ret;
	if (ret < sizeof (size_buf))
		return -ERANGE;

	if (order)
		*order = size_buf.order;
	*snap_size = le64_to_cpu(size_buf.size);

	dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
		(unsigned long long)snap_id, (unsigned int)size_buf.order,
		(unsigned long long)*snap_size);

	return 0;
}

static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
{
	return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
					&rbd_dev->header.obj_order,
					&rbd_dev->header.image_size);
}

static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
{
	void *reply_buf;
	int ret;
	void *p;

	reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
	if (!reply_buf)
		return -ENOMEM;

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_object_prefix", NULL, 0,
				reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;

	p = reply_buf;
	rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
						p + ret, NULL, GFP_NOIO);
	ret = 0;

	if (IS_ERR(rbd_dev->header.object_prefix)) {
		ret = PTR_ERR(rbd_dev->header.object_prefix);
		rbd_dev->header.object_prefix = NULL;
	} else {
		dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
	}
out:
	kfree(reply_buf);

	return ret;
}

static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
		u64 *snap_features)
{
	__le64 snapid = cpu_to_le64(snap_id);
	struct {
		__le64 features;
		__le64 incompat;
	} __attribute__ ((packed)) features_buf = { 0 };
	u64 incompat;
	int ret;

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_features",
				&snapid, sizeof (snapid),
				&features_buf, sizeof (features_buf), NULL);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		return ret;
	if (ret < sizeof (features_buf))
		return -ERANGE;

	incompat = le64_to_cpu(features_buf.incompat);
	if (incompat & ~RBD_FEATURES_SUPPORTED)
		return -ENXIO;

	*snap_features = le64_to_cpu(features_buf.features);

	dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
		(unsigned long long)snap_id,
		(unsigned long long)*snap_features,
		(unsigned long long)incompat);

	return 0;
}

static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
{
	return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
						&rbd_dev->header.features);
}
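
/*
 * Sketch (illustrative): the "incompat" mask is what gates the map.
 * If an image was created with, say, layering plus some feature this
 * kernel does not know about:
 *
 *	features = RBD_FEATURE_LAYERING | (1ULL << 10)
 *	incompat = RBD_FEATURE_LAYERING | (1ULL << 10)
 *
 * then incompat & ~RBD_FEATURES_SUPPORTED is nonzero and the probe
 * fails with -ENXIO, rather than mapping an image the client cannot
 * interpret correctly.
 */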

static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
{
	struct rbd_spec *parent_spec;
	size_t size;
	void *reply_buf = NULL;
	__le64 snapid;
	void *p;
	void *end;
	char *image_id;
	u64 overlap;
	int ret;

	parent_spec = rbd_spec_alloc();
	if (!parent_spec)
		return -ENOMEM;

	size = sizeof (__le64) +				/* pool_id */
		sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +	/* image_id */
		sizeof (__le64) +				/* snap_id */
		sizeof (__le64);				/* overlap */
	reply_buf = kmalloc(size, GFP_KERNEL);
	if (!reply_buf) {
		ret = -ENOMEM;
		goto out_err;
	}

	snapid = cpu_to_le64(CEPH_NOSNAP);
	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_parent",
				&snapid, sizeof (snapid),
				reply_buf, size, NULL);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		goto out_err;

	p = reply_buf;
	end = reply_buf + ret;
	ret = -ERANGE;
	ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
	if (parent_spec->pool_id == CEPH_NOPOOL)
		goto out;	/* No parent?  No problem. */

	/* The ceph file layout needs to fit pool id in 32 bits */

	ret = -EIO;
	if (WARN_ON(parent_spec->pool_id > (u64)U32_MAX))
		goto out_err;

	image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
	if (IS_ERR(image_id)) {
		ret = PTR_ERR(image_id);
		goto out_err;
	}
	parent_spec->image_id = image_id;
	ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
	ceph_decode_64_safe(&p, end, overlap, out_err);

	rbd_dev->parent_overlap = overlap;
	rbd_dev->parent_spec = parent_spec;
	parent_spec = NULL;	/* rbd_dev now owns this */
out:
	ret = 0;
out_err:
	kfree(reply_buf);
	rbd_spec_put(parent_spec);

	return ret;
}
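
/*
 * Reply layout sketch (illustrative): a successful "get_parent" call
 * returns, in order,
 *
 *	__le64	pool_id		(CEPH_NOPOOL if the image has no parent)
 *	__le32	image_id_len, then image_id_len bytes of image id
 *	__le64	snap_id		(parent snapshot the clone was made from)
 *	__le64	overlap		(bytes of the clone backed by the parent)
 *
 * which is exactly the sequence of ceph_decode_*_safe() and
 * ceph_extract_encoded_string() calls above.
 */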

static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
{
	struct {
		__le64 stripe_unit;
		__le64 stripe_count;
	} __attribute__ ((packed)) striping_info_buf = { 0 };
	size_t size = sizeof (striping_info_buf);
	void *p;
	u64 obj_size;
	u64 stripe_unit;
	u64 stripe_count;
	int ret;

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_stripe_unit_count", NULL, 0,
				(char *)&striping_info_buf, size, NULL);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		return ret;
	if (ret < size)
		return -ERANGE;

	/*
	 * We don't actually support the "fancy striping" feature
	 * (STRIPINGV2) yet, but if the striping sizes are the
	 * defaults the behavior is the same as before.  So find
	 * out, and only fail if the image has non-default values.
	 */
	obj_size = (u64)1 << rbd_dev->header.obj_order;
	p = &striping_info_buf;
	stripe_unit = ceph_decode_64(&p);
	if (stripe_unit != obj_size) {
		rbd_warn(rbd_dev, "unsupported stripe unit "
				"(got %llu want %llu)",
				stripe_unit, obj_size);
		return -EINVAL;
	}
	stripe_count = ceph_decode_64(&p);
	if (stripe_count != 1) {
		rbd_warn(rbd_dev, "unsupported stripe count "
				"(got %llu want 1)", stripe_count);
		return -EINVAL;
	}
	rbd_dev->stripe_unit = stripe_unit;
	rbd_dev->stripe_count = stripe_count;

	return 0;
}
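
/*
 * Example (illustrative): with the default object order of 22, the
 * only striping parameters this code accepts are
 *
 *	stripe_unit  = 1 << 22	(one full 4 MiB object)
 *	stripe_count = 1
 *
 * i.e. the degenerate layout that is indistinguishable from no
 * striping at all; anything else is rejected with -EINVAL.
 */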

static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
{
	size_t image_id_size;
	char *image_id;
	void *p;
	void *end;
	size_t size;
	void *reply_buf = NULL;
	size_t len = 0;
	char *image_name = NULL;
	int ret;

	rbd_assert(!rbd_dev->spec->image_name);

	len = strlen(rbd_dev->spec->image_id);
	image_id_size = sizeof (__le32) + len;
	image_id = kmalloc(image_id_size, GFP_KERNEL);
	if (!image_id)
		return NULL;

	p = image_id;
	end = image_id + image_id_size;
	ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);

	size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
	reply_buf = kmalloc(size, GFP_KERNEL);
	if (!reply_buf)
		goto out;

	ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
				"rbd", "dir_get_name",
				image_id, image_id_size,
				reply_buf, size, NULL);
	if (ret < 0)
		goto out;
	p = reply_buf;
	end = reply_buf + ret;

	image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
	if (IS_ERR(image_name))
		image_name = NULL;
	else
		dout("%s: name is %s len is %zd\n", __func__, image_name, len);
out:
	kfree(reply_buf);
	kfree(image_id);

	return image_name;
}

/*
 * When a parent image gets probed, we only have the pool, image,
 * and snapshot ids but not the names of any of them.  This call
 * is made later to fill in those names.  It has to be done after
 * rbd_dev_snaps_update() has completed because some of the
 * information (in particular, snapshot name) is not available
 * until then.
 */
static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
{
	struct ceph_osd_client *osdc;
	const char *name;
	int ret;

	if (rbd_dev->spec->pool_name)
		return 0;	/* Already have the names */

	/* Look up the pool name */

	osdc = &rbd_dev->rbd_client->client->osdc;
	name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
	if (!name) {
		rbd_warn(rbd_dev, "there is no pool with id %llu",
			rbd_dev->spec->pool_id);	/* Really a BUG() */
		return -EIO;
	}

	rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
	if (!rbd_dev->spec->pool_name)
		return -ENOMEM;

	/* Fetch the image name; tolerate failure here */

	name = rbd_dev_image_name(rbd_dev);
	if (name)
		rbd_dev->spec->image_name = (char *)name;
	else
		rbd_warn(rbd_dev, "unable to get image name");

	/* Look up the snapshot name. */

	name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
	if (!name) {
		rbd_warn(rbd_dev, "no snapshot with id %llu",
			rbd_dev->spec->snap_id);	/* Really a BUG() */
		ret = -EIO;
		goto out_err;
	}
	rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
	if (!rbd_dev->spec->snap_name) {
		ret = -ENOMEM;
		goto out_err;
	}

	return 0;
out_err:
	kfree(rbd_dev->spec->pool_name);
	rbd_dev->spec->pool_name = NULL;

	return ret;
}

static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
{
	size_t size;
	int ret;
	void *reply_buf;
	void *p;
	void *end;
	u64 seq;
	u32 snap_count;
	struct ceph_snap_context *snapc;
	u32 i;

	/*
	 * We'll need room for the seq value (maximum snapshot id),
	 * snapshot count, and array of that many snapshot ids.
	 * For now we have a fixed upper limit on the number we're
	 * prepared to receive.
	 */
	size = sizeof (__le64) + sizeof (__le32) +
			RBD_MAX_SNAP_COUNT * sizeof (__le64);
	reply_buf = kzalloc(size, GFP_KERNEL);
	if (!reply_buf)
		return -ENOMEM;

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_snapcontext", NULL, 0,
				reply_buf, size, ver);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;

	p = reply_buf;
	end = reply_buf + ret;
	ret = -ERANGE;
	ceph_decode_64_safe(&p, end, seq, out);
	ceph_decode_32_safe(&p, end, snap_count, out);

	/*
	 * Make sure the reported number of snapshot ids wouldn't go
	 * beyond the end of our buffer.  But before checking that,
	 * make sure the computed size of the snapshot context we
	 * allocate is representable in a size_t.
	 */
	if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
				/ sizeof (u64)) {
		ret = -EINVAL;
		goto out;
	}
	if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
		goto out;

	size = sizeof (struct ceph_snap_context) +
				snap_count * sizeof (snapc->snaps[0]);
	snapc = kmalloc(size, GFP_KERNEL);
	if (!snapc) {
		ret = -ENOMEM;
		goto out;
	}
	ret = 0;

	atomic_set(&snapc->nref, 1);
	snapc->seq = seq;
	snapc->num_snaps = snap_count;
	for (i = 0; i < snap_count; i++)
		snapc->snaps[i] = ceph_decode_64(&p);

	rbd_dev->header.snapc = snapc;

	dout("  snap context seq = %llu, snap_count = %u\n",
		(unsigned long long)seq, (unsigned int)snap_count);
out:
	kfree(reply_buf);

	return ret;
}
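
/*
 * Reply layout sketch (illustrative): "get_snapcontext" returns
 *
 *	__le64	seq			(highest snapshot id ever issued)
 *	__le32	snap_count
 *	__le64	snaps[snap_count]	(snapshot ids, highest id first)
 *
 * so for an image with two snapshots it might decode to seq = 7,
 * snap_count = 2, snaps[] = { 5, 3 }.
 */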

static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
{
	size_t size;
	void *reply_buf;
	__le64 snap_id;
	int ret;
	void *p;
	void *end;
	char *snap_name;

	size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
	reply_buf = kmalloc(size, GFP_KERNEL);
	if (!reply_buf)
		return ERR_PTR(-ENOMEM);

	rbd_assert(which < rbd_dev->header.snapc->num_snaps);
	snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_snapshot_name",
				&snap_id, sizeof (snap_id),
				reply_buf, size, NULL);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0) {
		snap_name = ERR_PTR(ret);
		goto out;
	}

	p = reply_buf;
	end = reply_buf + ret;
	snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
	if (IS_ERR(snap_name))
		goto out;

	dout("  snap_id 0x%016llx snap_name = %s\n",
		(unsigned long long)le64_to_cpu(snap_id), snap_name);
out:
	kfree(reply_buf);

	return snap_name;
}

static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
		u64 *snap_size, u64 *snap_features)
{
	u64 snap_id;
	u64 size;
	u64 features;
	char *snap_name;
	int ret;

	rbd_assert(which < rbd_dev->header.snapc->num_snaps);
	snap_id = rbd_dev->header.snapc->snaps[which];
	ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
	if (ret)
		goto out_err;

	ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
	if (ret)
		goto out_err;

	snap_name = rbd_dev_v2_snap_name(rbd_dev, which);
	if (!IS_ERR(snap_name)) {
		*snap_size = size;
		*snap_features = features;
	}

	return snap_name;
out_err:
	return ERR_PTR(ret);
}

static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
		u64 *snap_size, u64 *snap_features)
{
	if (rbd_dev->image_format == 1)
		return rbd_dev_v1_snap_info(rbd_dev, which,
					snap_size, snap_features);
	if (rbd_dev->image_format == 2)
		return rbd_dev_v2_snap_info(rbd_dev, which,
					snap_size, snap_features);
	return ERR_PTR(-EINVAL);
}

static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
{
	int ret;
	__u8 obj_order;

	down_write(&rbd_dev->header_rwsem);

	/* Grab old order first, to see if it changes */

	obj_order = rbd_dev->header.obj_order;
	ret = rbd_dev_v2_image_size(rbd_dev);
	if (ret)
		goto out;
	if (rbd_dev->header.obj_order != obj_order) {
		ret = -EIO;
		goto out;
	}
	rbd_update_mapping_size(rbd_dev);

	ret = rbd_dev_v2_snap_context(rbd_dev, hver);
	dout("rbd_dev_v2_snap_context returned %d\n", ret);
	if (ret)
		goto out;
	ret = rbd_dev_snaps_update(rbd_dev);
	dout("rbd_dev_snaps_update returned %d\n", ret);
out:
	up_write(&rbd_dev->header_rwsem);

	return ret;
}

/*
 * Scan the rbd device's current snapshot list and compare it to the
 * newly-received snapshot context.  Remove any existing snapshots
 * not present in the new snapshot context.  Add a new snapshot for
 * any snapshots in the snapshot context not in the current list.
 * And verify there are no changes to snapshots we already know
 * about.
 *
 * Assumes the snapshots in the snapshot context are sorted by
 * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
 * are also maintained in that order.)
 *
 * Note that if any error occurs while updating the snapshot list
 * the update is aborted and the entire list is cleared.  The
 * snapshot list becomes inconsistent at that point anyway, so it
 * might as well be empty.
 */
static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	const u32 snap_count = snapc->num_snaps;
	struct list_head *head = &rbd_dev->snaps;
	struct list_head *links = head->next;
	u32 index = 0;
	int ret = 0;

	dout("%s: snap count is %u\n", __func__, (unsigned int)snap_count);
	while (index < snap_count || links != head) {
		u64 snap_id;
		struct rbd_snap *snap;
		char *snap_name;
		u64 snap_size = 0;
		u64 snap_features = 0;

		snap_id = index < snap_count ? snapc->snaps[index]
					     : CEPH_NOSNAP;
		snap = links != head ? list_entry(links, struct rbd_snap, node)
				     : NULL;
		rbd_assert(!snap || snap->id != CEPH_NOSNAP);

		if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
			struct list_head *next = links->next;

			/*
			 * A previously-existing snapshot is not in
			 * the new snap context.
			 *
			 * If the now-missing snapshot is the one
			 * the image represents, clear its existence
			 * flag so we can avoid sending any more
			 * requests to it.
			 */
			if (rbd_dev->spec->snap_id == snap->id)
				clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
			dout("removing %ssnap id %llu\n",
				rbd_dev->spec->snap_id == snap->id ?
							"mapped " : "",
				(unsigned long long)snap->id);

			list_del(&snap->node);
			rbd_snap_destroy(snap);

			/* Done with this list entry; advance */

			links = next;
			continue;
		}

		snap_name = rbd_dev_snap_info(rbd_dev, index,
					&snap_size, &snap_features);
		if (IS_ERR(snap_name)) {
			ret = PTR_ERR(snap_name);
			dout("failed to get snap info, error %d\n", ret);
			goto out_err;
		}

		dout("entry %u: snap_id = %llu\n", (unsigned int)index,
			(unsigned long long)snap_id);
		if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
			struct rbd_snap *new_snap;

			/* We haven't seen this snapshot before */

			new_snap = rbd_snap_create(rbd_dev, snap_name,
					snap_id, snap_size, snap_features);
			if (IS_ERR(new_snap)) {
				ret = PTR_ERR(new_snap);
				kfree(snap_name);
				dout("  failed to add dev, error %d\n", ret);
				goto out_err;
			}

			/* New goes before existing, or at end of list */

			dout("  added dev%s\n", snap ? "" : " at end");
			if (snap)
				list_add_tail(&new_snap->node, &snap->node);
			else
				list_add_tail(&new_snap->node, head);
		} else {
			/* Already have this one */

			dout("  already present\n");

			rbd_assert(snap->size == snap_size);
			rbd_assert(!strcmp(snap->name, snap_name));
			rbd_assert(snap->features == snap_features);
			kfree(snap_name);

			/* Done with this list entry; advance */

			links = links->next;
		}

		/* Advance to the next entry in the snapshot context */

		index++;
	}
	dout("%s: done\n", __func__);

	return 0;
out_err:
	rbd_remove_all_snaps(rbd_dev);

	return ret;
}
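
/*
 * Walk sketch (illustrative): the update is a single merge pass over
 * two id-sorted sequences.  With an existing list { 8, 5, 2 } and a
 * new snapshot context { 8, 6, 2 }:
 *
 *	8 == 8	-> keep (verify size/name/features unchanged)
 *	5 <  6	-> 6 is new; insert it before 5
 *	5 >  2	-> 5 was deleted on the server; remove it
 *	2 == 2	-> keep
 *
 * Each comparison advances at least one of the two cursors, so the
 * pass is O(old + new).
 */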

static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
{
	struct device *dev;
	int ret;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	dev = &rbd_dev->dev;
	dev->bus = &rbd_bus_type;
	dev->type = &rbd_device_type;
	dev->parent = &rbd_root_dev;
	dev->release = rbd_dev_release;
	dev_set_name(dev, "%d", rbd_dev->dev_id);
	ret = device_register(dev);

	mutex_unlock(&ctl_mutex);

	return ret;
}

static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
	device_unregister(&rbd_dev->dev);
}

static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);

/*
 * Get a unique rbd identifier for the given new rbd_dev, and add
 * the rbd_dev to the global list.  The minimum rbd id is 1.
 */
static void rbd_dev_id_get(struct rbd_device *rbd_dev)
{
	rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);

	spin_lock(&rbd_dev_list_lock);
	list_add_tail(&rbd_dev->node, &rbd_dev_list);
	spin_unlock(&rbd_dev_list_lock);
	dout("rbd_dev %p given dev id %llu\n", rbd_dev,
		(unsigned long long) rbd_dev->dev_id);
}

/*
 * Remove an rbd_dev from the global list, and record that its
 * identifier is no longer in use.
 */
static void rbd_dev_id_put(struct rbd_device *rbd_dev)
{
	struct list_head *tmp;
	int rbd_id = rbd_dev->dev_id;
	int max_id;

	rbd_assert(rbd_id > 0);

	dout("rbd_dev %p released dev id %llu\n", rbd_dev,
		(unsigned long long) rbd_dev->dev_id);
	spin_lock(&rbd_dev_list_lock);
	list_del_init(&rbd_dev->node);

	/*
	 * If the id being "put" is not the current maximum, there
	 * is nothing special we need to do.
	 */
	if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
		spin_unlock(&rbd_dev_list_lock);
		return;
	}

	/*
	 * We need to update the current maximum id.  Search the
	 * list to find out what it is.  We're more likely to find
	 * the maximum at the end, so search the list backward.
	 */
	max_id = 0;
	list_for_each_prev(tmp, &rbd_dev_list) {
		struct rbd_device *rbd_dev;

		rbd_dev = list_entry(tmp, struct rbd_device, node);
		if (rbd_dev->dev_id > max_id)
			max_id = rbd_dev->dev_id;
	}
	spin_unlock(&rbd_dev_list_lock);

	/*
	 * The max id could have been updated by rbd_dev_id_get(), in
	 * which case it now accurately reflects the new maximum.
	 * Be careful not to overwrite the maximum value in that
	 * case.
	 */
	atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
	dout("  max dev id has been reset\n");
}
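
/*
 * Id reuse sketch (illustrative): ids are only reclaimed from the
 * top.  If devices 1, 2 and 3 exist and device 3 is removed, the
 * cmpxchg above drops rbd_dev_id_max back to 2 and the next map gets
 * id 3 again; if device 2 is removed instead, the maximum is
 * unchanged and the next map gets id 4.
 */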

/*
 * Skips over white space at *buf, and updates *buf to point to the
 * first found non-space character (if any).  Returns the length of
 * the token (string of non-white space characters) found.  Note
 * that *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * These are the characters that produce nonzero for
	 * isspace() in the "C" and "POSIX" locales.
	 */
	const char *spaces = " \f\n\r\t\v";

	*buf += strspn(*buf, spaces);	/* Find start of token */

	return strcspn(*buf, spaces);	/* Return token length */
}

/*
 * Finds the next token in *buf, and if the provided token buffer is
 * big enough, copies the found token into it.  The result, if
 * copied, is guaranteed to be terminated with '\0'.  Note that *buf
 * must be terminated with '\0' on entry.
 *
 * Returns the length of the token found (not including the '\0').
 * Return value will be 0 if no token is found, and it will be >=
 * token_size if the token would not fit.
 *
 * The *buf pointer will be updated to point beyond the end of the
 * found token.  Note that this occurs even if the token buffer is
 * too small to hold it.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t len;

	len = next_token(buf);
	if (len < token_size) {
		memcpy(token, *buf, len);
		*(token + len) = '\0';
	}
	*buf += len;

	return len;
}
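
/*
 * Usage sketch (illustrative):
 *
 *	const char *buf = "  pool image  snap";
 *	char token[8];
 *	size_t len = copy_token(&buf, token, sizeof (token));
 *
 * leaves len == 4, token == "pool" and buf pointing at " image  snap".
 * A token of 8 or more characters would return its full length but
 * leave token[] untouched, which is why callers must check the
 * return value against the buffer size.
 */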

/*
 * Finds the next token in *buf, dynamically allocates a buffer big
 * enough to hold a copy of it, and copies the token into the new
 * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
 * that a duplicate buffer is created even for a zero-length token.
 *
 * Returns a pointer to the newly-allocated duplicate, or a null
 * pointer if memory for the duplicate was not available.  If
 * the lenp argument is a non-null pointer, the length of the token
 * (not including the '\0') is returned in *lenp.
 *
 * If successful, the *buf pointer will be updated to point beyond
 * the end of the found token.
 *
 * Note: uses GFP_KERNEL for allocation.
 */
static inline char *dup_token(const char **buf, size_t *lenp)
{
	char *dup;
	size_t len;

	len = next_token(buf);
	dup = kmemdup(*buf, len + 1, GFP_KERNEL);
	if (!dup)
		return NULL;
	*(dup + len) = '\0';
	*buf += len;

	if (lenp)
		*lenp = len;

	return dup;
}

/*
 * Parse the options provided for an "rbd add" (i.e., rbd image
 * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
 * and the data written is passed here via a NUL-terminated buffer.
 * Returns 0 if successful or an error code otherwise.
 *
 * The information extracted from these options is recorded in
 * the other parameters which return dynamically-allocated
 * structures:
 *  ceph_opts
 *      The address of a pointer that will refer to a ceph options
 *      structure.  Caller must release the returned pointer using
 *      ceph_destroy_options() when it is no longer needed.
 *  rbd_opts
 *	Address of an rbd options pointer.  Fully initialized by
 *	this function; caller must release with kfree().
 *  spec
 *	Address of an rbd image specification pointer.  Fully
 *	initialized by this function based on parsed options.
 *	Caller must release with rbd_spec_put().
 *
 * The options passed take this form:
 *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
 * where:
 *  <mon_addrs>
 *      A comma-separated list of one or more monitor addresses.
 *      A monitor address is an ip address, optionally followed
 *      by a port number (separated by a colon).
 *        I.e.:  ip1[:port1][,ip2[:port2]...]
 *  <options>
 *      A comma-separated list of ceph and/or rbd options.
 *  <pool_name>
 *      The name of the rados pool containing the rbd image.
 *  <image_name>
 *      The name of the image in that pool to map.
 *  <snap_name>
 *      An optional snapshot name.  If provided, the mapping will
 *      present data from the image at the time that snapshot was
 *      created.  The image head is used if no snapshot name is
 *      provided.  Snapshot mappings are always read-only.
 */
static int rbd_add_parse_args(const char *buf,
				struct ceph_options **ceph_opts,
				struct rbd_options **opts,
				struct rbd_spec **rbd_spec)
{
	size_t len;
	char *options;
	const char *mon_addrs;
	size_t mon_addrs_size;
	struct rbd_spec *spec = NULL;
	struct rbd_options *rbd_opts = NULL;
	struct ceph_options *copts;
	int ret;

	/* The first four tokens are required */

	len = next_token(&buf);
	if (!len) {
		rbd_warn(NULL, "no monitor address(es) provided");
		return -EINVAL;
	}
	mon_addrs = buf;
	mon_addrs_size = len + 1;
	buf += len;

	ret = -EINVAL;
	options = dup_token(&buf, NULL);
	if (!options)
		return -ENOMEM;
	if (!*options) {
		rbd_warn(NULL, "no options provided");
		goto out_err;
	}

	spec = rbd_spec_alloc();
	if (!spec)
		goto out_mem;

	spec->pool_name = dup_token(&buf, NULL);
	if (!spec->pool_name)
		goto out_mem;
	if (!*spec->pool_name) {
		rbd_warn(NULL, "no pool name provided");
		goto out_err;
	}

	spec->image_name = dup_token(&buf, NULL);
	if (!spec->image_name)
		goto out_mem;
	if (!*spec->image_name) {
		rbd_warn(NULL, "no image name provided");
		goto out_err;
	}

	/*
	 * Snapshot name is optional; default is to use "-"
	 * (indicating the head/no snapshot).
	 */
	len = next_token(&buf);
	if (!len) {
		buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
		len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
	} else if (len > RBD_MAX_SNAP_NAME_LEN) {
		ret = -ENAMETOOLONG;
		goto out_err;
	}
	spec->snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
	if (!spec->snap_name)
		goto out_mem;
	*(spec->snap_name + len) = '\0';

	/* Initialize all rbd options to the defaults */

	rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
	if (!rbd_opts)
		goto out_mem;

	rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;

	copts = ceph_parse_options(options, mon_addrs,
					mon_addrs + mon_addrs_size - 1,
					parse_rbd_opts_token, rbd_opts);
	if (IS_ERR(copts)) {
		ret = PTR_ERR(copts);
		goto out_err;
	}
	kfree(options);

	*ceph_opts = copts;
	*opts = rbd_opts;
	*rbd_spec = spec;

	return 0;
out_mem:
	ret = -ENOMEM;
out_err:
	kfree(rbd_opts);
	rbd_spec_put(spec);
	kfree(options);

	return ret;
}
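
/*
 * Example (illustrative) of the string this parser accepts, as it
 * would be written to the sysfs control file:
 *
 *	echo "1.2.3.4:6789 name=admin,secret=AQB... rbd myimage mysnap" \
 *		> /sys/bus/rbd/add
 *
 * Here "1.2.3.4:6789" is the monitor list, "name=admin,secret=..."
 * the option list, "rbd" the pool, "myimage" the image, and the
 * trailing "mysnap" an optional snapshot name; the values are made
 * up for the sake of the example.
 */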

/*
 * An rbd format 2 image has a unique identifier, distinct from the
 * name given to it by the user.  Internally, that identifier is
 * what's used to specify the names of objects related to the image.
 *
 * A special "rbd id" object is used to map an rbd image name to its
 * id.  If that object doesn't exist, then there is no v2 rbd image
 * with the supplied name.
 *
 * This function will record the given rbd_dev's image_id field if
 * it can be determined, and in that case will return 0.  If any
 * errors occur a negative errno will be returned and the rbd_dev's
 * image_id field will be unchanged (and should be NULL).
 */
static int rbd_dev_image_id(struct rbd_device *rbd_dev)
{
	int ret;
	size_t size;
	char *object_name;
	void *response;
	char *image_id;

	/*
	 * When probing a parent image, the image id is already
	 * known (and the image name likely is not).  There's no
	 * need to fetch the image id again in this case.  We
	 * do still need to set the image format though.
	 */
	if (rbd_dev->spec->image_id) {
		rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;

		return 0;
	}

	/*
	 * First, see if the format 2 image id file exists, and if
	 * so, get the image's persistent id from it.
	 */
	size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
	object_name = kmalloc(size, GFP_NOIO);
	if (!object_name)
		return -ENOMEM;
	sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
	dout("rbd id object name is %s\n", object_name);

	/* Response will be an encoded string, which includes a length */

	size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
	response = kzalloc(size, GFP_NOIO);
	if (!response) {
		ret = -ENOMEM;
		goto out;
	}

	/* If it doesn't exist we'll assume it's a format 1 image */

	ret = rbd_obj_method_sync(rbd_dev, object_name,
				"rbd", "get_id", NULL, 0,
				response, RBD_IMAGE_ID_LEN_MAX, NULL);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret == -ENOENT) {
		image_id = kstrdup("", GFP_KERNEL);
		ret = image_id ? 0 : -ENOMEM;
		if (!ret)
			rbd_dev->image_format = 1;
	} else if (ret < 0) {
		goto out;
	} else if (ret > sizeof (__le32)) {
		void *p = response;

		image_id = ceph_extract_encoded_string(&p, p + ret,
						NULL, GFP_NOIO);
		ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
		if (!ret)
			rbd_dev->image_format = 2;
	} else {
		ret = -EINVAL;
	}

	if (!ret) {
		rbd_dev->spec->image_id = image_id;
		dout("image_id is %s\n", image_id);
	}
out:
	kfree(response);
	kfree(object_name);

	return ret;
}
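
/*
 * Naming sketch (illustrative): for a format 2 image named "myimage"
 * the id object probed above is "rbd_id.myimage" (RBD_ID_PREFIX plus
 * the user-visible name), and a "get_id" call on it might yield an
 * id such as "101a6b8b4567", after which the image's header object
 * name is derived from the id rather than the name.
 */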

static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
{
	int ret;
	size_t size;

	/* Record the header object name for this rbd image. */

	size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
	rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
	if (!rbd_dev->header_name) {
		ret = -ENOMEM;
		goto out_err;
	}
	sprintf(rbd_dev->header_name, "%s%s",
		rbd_dev->spec->image_name, RBD_SUFFIX);

	/* Populate rbd image metadata */

	ret = rbd_read_header(rbd_dev, &rbd_dev->header);
	if (ret < 0)
		goto out_err;

	/* Version 1 images have no parent (no layering) */

	rbd_dev->parent_spec = NULL;
	rbd_dev->parent_overlap = 0;

	dout("discovered version 1 image, header name is %s\n",
		rbd_dev->header_name);

	return 0;

out_err:
	kfree(rbd_dev->header_name);
	rbd_dev->header_name = NULL;
	kfree(rbd_dev->spec->image_id);
	rbd_dev->spec->image_id = NULL;

	return ret;
}

static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
{
	size_t size;
	int ret;
	u64 ver = 0;

	/*
	 * Image id was filled in by the caller.  Record the header
	 * object name for this rbd image.
	 */
	size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
	rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
	if (!rbd_dev->header_name)
		return -ENOMEM;
	sprintf(rbd_dev->header_name, "%s%s",
		RBD_HEADER_PREFIX, rbd_dev->spec->image_id);

	/* Get the size and object order for the image */

	ret = rbd_dev_v2_image_size(rbd_dev);
	if (ret)
		goto out_err;

	/* Get the object prefix (a.k.a. block_name) for the image */

	ret = rbd_dev_v2_object_prefix(rbd_dev);
	if (ret)
		goto out_err;

	/* Get and check the features for the image */

	ret = rbd_dev_v2_features(rbd_dev);
	if (ret)
		goto out_err;

	/* If the image supports layering, get the parent info */

	if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
		ret = rbd_dev_v2_parent_info(rbd_dev);
		if (ret)
			goto out_err;
		rbd_warn(rbd_dev, "WARNING: kernel support for "
				"layered rbd images is EXPERIMENTAL!");
	}

	/* If the image supports fancy striping, get its parameters */

	if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
		ret = rbd_dev_v2_striping_info(rbd_dev);
		if (ret < 0)
			goto out_err;
	}

	/* crypto and compression type aren't (yet) supported for v2 images */

	rbd_dev->header.crypt_type = 0;
	rbd_dev->header.comp_type = 0;

	/* Get the snapshot context, plus the header version */

	ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
	if (ret)
		goto out_err;
	rbd_dev->header.obj_version = ver;

	dout("discovered version 2 image, header name is %s\n",
		rbd_dev->header_name);

	return 0;
out_err:
	rbd_dev->parent_overlap = 0;
	rbd_spec_put(rbd_dev->parent_spec);
	rbd_dev->parent_spec = NULL;
	kfree(rbd_dev->header_name);
	rbd_dev->header_name = NULL;
	kfree(rbd_dev->header.object_prefix);
	rbd_dev->header.object_prefix = NULL;

	return ret;
}

static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
{
	struct rbd_device *parent = NULL;
	struct rbd_spec *parent_spec = NULL;
	struct rbd_client *rbdc = NULL;
	int ret;

	/* no need to lock here, as rbd_dev is not registered yet */
	ret = rbd_dev_snaps_update(rbd_dev);
	if (ret)
		return ret;

	ret = rbd_dev_probe_update_spec(rbd_dev);
	if (ret)
		goto err_out_snaps;

	ret = rbd_dev_set_mapping(rbd_dev);
	if (ret)
		goto err_out_snaps;

	/* generate unique id: find highest unique id, add one */
	rbd_dev_id_get(rbd_dev);

	/* Fill in the device name, now that we have its id. */
	BUILD_BUG_ON(DEV_NAME_LEN
			< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);

	/* Get our block major device number. */

	ret = register_blkdev(0, rbd_dev->name);
	if (ret < 0)
		goto err_out_id;
	rbd_dev->major = ret;

	/* Set up the blkdev mapping. */

	ret = rbd_init_disk(rbd_dev);
	if (ret)
		goto err_out_blkdev;

	ret = rbd_bus_add_dev(rbd_dev);
	if (ret)
		goto err_out_disk;

	/*
	 * At this point cleanup in the event of an error is the job
	 * of the sysfs code (initiated by rbd_bus_del_dev()).
	 */
	/* Probe the parent if there is one */

	if (rbd_dev->parent_spec) {
		/*
		 * We need to pass a reference to the client and the
		 * parent spec when creating the parent rbd_dev.
		 * Images related by parent/child relationships
		 * always share both.
		 */
		parent_spec = rbd_spec_get(rbd_dev->parent_spec);
		rbdc = __rbd_get_client(rbd_dev->rbd_client);

		parent = rbd_dev_create(rbdc, parent_spec);
		if (!parent) {
			ret = -ENOMEM;
			goto err_out_spec;
		}
		rbdc = NULL;		/* parent now owns reference */
		parent_spec = NULL;	/* parent now owns reference */
		ret = rbd_dev_probe(parent);
		if (ret < 0)
			goto err_out_parent;
		rbd_dev->parent = parent;
	}

	ret = rbd_dev_header_watch_sync(rbd_dev, 1);
	if (ret)
		goto err_out_bus;

	/* Everything's ready.  Announce the disk to the world. */

	add_disk(rbd_dev->disk);

	pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
		(unsigned long long) rbd_dev->mapping.size);

	return ret;

err_out_parent:
	rbd_dev_destroy(parent);
err_out_spec:
	rbd_spec_put(parent_spec);
	rbd_put_client(rbdc);
err_out_bus:
	/* this will also clean up rest of rbd_dev stuff */

	rbd_bus_del_dev(rbd_dev);

	return ret;
err_out_disk:
	rbd_free_disk(rbd_dev);
err_out_blkdev:
	unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_id:
	rbd_dev_id_put(rbd_dev);
err_out_snaps:
	rbd_remove_all_snaps(rbd_dev);

	return ret;
}

/*
 * Probe for the existence of the header object for the given rbd
 * device.  For format 2 images this includes determining the image
 * id.
 */
static int rbd_dev_probe(struct rbd_device *rbd_dev)
{
	int ret;

	/*
	 * Get the id from the image id object.  If it's not a
	 * format 2 image, we'll get ENOENT back, and we'll assume
	 * it's a format 1 image.
	 */
	ret = rbd_dev_image_id(rbd_dev);
	if (ret)
		return ret;
	rbd_assert(rbd_dev->spec->image_id);
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));

	if (rbd_dev->image_format == 1)
		ret = rbd_dev_v1_probe(rbd_dev);
	else
		ret = rbd_dev_v2_probe(rbd_dev);
	if (ret)
		goto out_err;

	ret = rbd_dev_probe_finish(rbd_dev);
	if (ret)
		rbd_header_free(&rbd_dev->header);

	return ret;
out_err:
	kfree(rbd_dev->spec->image_id);
	rbd_dev->spec->image_id = NULL;

	dout("probe failed, returning %d\n", ret);

	return ret;
}

static ssize_t rbd_add(struct bus_type *bus,
		       const char *buf,
		       size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	struct ceph_options *ceph_opts = NULL;
	struct rbd_options *rbd_opts = NULL;
	struct rbd_spec *spec = NULL;
	struct rbd_client *rbdc;
	struct ceph_osd_client *osdc;
	int rc = -ENOMEM;

	if (!try_module_get(THIS_MODULE))
		return -ENODEV;

	/* parse add command */
	rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
	if (rc < 0)
		goto err_out_module;

	rbdc = rbd_get_client(ceph_opts);
	if (IS_ERR(rbdc)) {
		rc = PTR_ERR(rbdc);
		goto err_out_args;
	}
	ceph_opts = NULL;	/* rbd_dev client now owns this */

	/* pick the pool */
	osdc = &rbdc->client->osdc;
	rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
	if (rc < 0)
		goto err_out_client;
	spec->pool_id = (u64) rc;

	/* The ceph file layout needs to fit pool id in 32 bits */

	if (WARN_ON(spec->pool_id > (u64) U32_MAX)) {
		rc = -EIO;
		goto err_out_client;
	}

	rbd_dev = rbd_dev_create(rbdc, spec);
	if (!rbd_dev) {
		rc = -ENOMEM;
		goto err_out_client;
	}
	rbdc = NULL;		/* rbd_dev now owns this */
	spec = NULL;		/* rbd_dev now owns this */

	rbd_dev->mapping.read_only = rbd_opts->read_only;
	kfree(rbd_opts);
	rbd_opts = NULL;	/* done with this */

	rc = rbd_dev_probe(rbd_dev);
	if (rc < 0)
		goto err_out_rbd_dev;

	return count;
err_out_rbd_dev:
	rbd_dev_destroy(rbd_dev);
err_out_client:
	rbd_put_client(rbdc);
err_out_args:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	kfree(rbd_opts);
	rbd_spec_put(spec);
err_out_module:
	module_put(THIS_MODULE);

	dout("Error adding device %s\n", buf);

	return (ssize_t) rc;
}

static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
{
	struct list_head *tmp;
	struct rbd_device *rbd_dev;

	spin_lock(&rbd_dev_list_lock);
	list_for_each(tmp, &rbd_dev_list) {
		rbd_dev = list_entry(tmp, struct rbd_device, node);
		if (rbd_dev->dev_id == dev_id) {
			spin_unlock(&rbd_dev_list_lock);
			return rbd_dev;
		}
	}
	spin_unlock(&rbd_dev_list_lock);
	return NULL;
}

static void rbd_dev_release(struct device *dev)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	if (rbd_dev->watch_event)
		rbd_dev_header_watch_sync(rbd_dev, 0);

	/* clean up and free blkdev */
	rbd_free_disk(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);

	/* release allocated disk header fields */
	rbd_header_free(&rbd_dev->header);

	/* done with the id, and with the rbd_dev */
	rbd_dev_id_put(rbd_dev);
	rbd_assert(rbd_dev->rbd_client != NULL);
	rbd_dev_destroy(rbd_dev);

	/* release module ref */
	module_put(THIS_MODULE);
}

static void __rbd_remove(struct rbd_device *rbd_dev)
{
	rbd_remove_all_snaps(rbd_dev);
	rbd_bus_del_dev(rbd_dev);
}

static ssize_t rbd_remove(struct bus_type *bus,
			  const char *buf,
			  size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	int target_id, rc;
	unsigned long ul;
	int ret = count;

	rc = strict_strtoul(buf, 10, &ul);
	if (rc)
		return rc;

	/* convert to int; abort if we lost anything in the conversion */
	target_id = (int) ul;
	if (target_id != ul)
		return -EINVAL;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbd_dev = __rbd_get_dev(target_id);
	if (!rbd_dev) {
		ret = -ENOENT;
		goto done;
	}

	spin_lock_irq(&rbd_dev->lock);
	if (rbd_dev->open_count)
		ret = -EBUSY;
	else
		set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
	spin_unlock_irq(&rbd_dev->lock);
	if (ret < 0)
		goto done;

	while (rbd_dev->parent_spec) {
		struct rbd_device *first = rbd_dev;
		struct rbd_device *second = first->parent;
		struct rbd_device *third;

		/*
		 * Follow to the parent with no grandparent and
		 * remove it.
		 */
		while (second && (third = second->parent)) {
			first = second;
			second = third;
		}
		__rbd_remove(second);
		rbd_spec_put(first->parent_spec);
		first->parent_spec = NULL;
		first->parent_overlap = 0;
		first->parent = NULL;
	}
	__rbd_remove(rbd_dev);

done:
	mutex_unlock(&ctl_mutex);

	return ret;
}
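
/*
 * Usage sketch (illustrative): removal is driven by writing the
 * device id (the N in /dev/rbdN) to the sysfs control file:
 *
 *	echo 1 > /sys/bus/rbd/remove
 *
 * The write fails with -EBUSY while the block device is open, and
 * for a layered image the whole parent chain is torn down first,
 * deepest ancestor before child.
 */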

/*
 * create control files in sysfs
 * /sys/bus/rbd/...
 */
static int rbd_sysfs_init(void)
{
	int ret;

	ret = device_register(&rbd_root_dev);
	if (ret < 0)
		return ret;

	ret = bus_register(&rbd_bus_type);
	if (ret < 0)
		device_unregister(&rbd_root_dev);

	return ret;
}

static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}

static int __init rbd_init(void)
{
	int rc;

	if (!libceph_compatible(NULL)) {
		rbd_warn(NULL, "libceph incompatibility (quitting)");

		return -EINVAL;
	}
	rc = rbd_sysfs_init();
	if (rc)
		return rc;
	pr_info("loaded " RBD_DRV_NAME_LONG "\n");
	return 0;
}

static void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
}

module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");