/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/slab.h>

#include "rbd_types.h"

#define RBD_DEBUG       /* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT    9
#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)

/*
 * Increment the given counter and return its updated value.
 * If the counter is already 0 it will not be incremented.
 * If the counter is already at its maximum value, -EINVAL is
 * returned without updating it.
 */
static int atomic_inc_return_safe(atomic_t *v)
{
        unsigned int counter;

        counter = (unsigned int)__atomic_add_unless(v, 1, 0);
        if (counter <= (unsigned int)INT_MAX)
                return (int)counter;

        atomic_dec(v);

        return -EINVAL;
}

/* Decrement the counter.  Return the resulting value, or -EINVAL */
static int atomic_dec_return_safe(atomic_t *v)
{
        int counter;

        counter = atomic_dec_return(v);
        if (counter >= 0)
                return counter;

        atomic_inc(v);

        return -EINVAL;
}
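
/*
 * Usage sketch (editorial, not original code): these helpers guard
 * reference counts such as rbd_dev->parent_ref, where 0 means "no
 * new reference may be taken".  A non-positive return from the
 * increment means no reference was taken (0: the counter was 0;
 * -EINVAL: it had hit its ceiling), so a caller can do:
 *
 *      if (atomic_inc_return_safe(&rbd_dev->parent_ref) <= 0)
 *              return;         /* no reference taken */
 */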

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
#define RBD_MAX_SNAP_NAME_LEN   \
                        (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME      "-"

#define BAD_SNAP_INDEX  U32_MAX         /* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX    64

#define RBD_OBJ_PREFIX_LEN_MAX  64

/* Feature bits */

#define RBD_FEATURE_LAYERING    (1<<0)
#define RBD_FEATURE_STRIPINGV2  (1<<1)
#define RBD_FEATURES_ALL \
            (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN            32
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
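
/*
 * Why the formula above suffices (editorial back-of-envelope): each
 * byte of an int contributes log10(256) ~= 2.41 decimal digits, so
 * 2.5 digits per byte over-estimates, and the +1 absorbs the integer
 * truncation.  With 4-byte ints this yields 11, comfortably holding
 * the 10 digits of INT_MAX.
 */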

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
        /* These six fields never change for a given rbd image */
        char *object_prefix;
        __u8 obj_order;
        __u8 crypt_type;
        __u8 comp_type;
        u64 stripe_unit;
        u64 stripe_count;
        u64 features;           /* Might be changeable someday? */

        /* The remaining fields need to be updated occasionally */
        u64 image_size;
        struct ceph_snap_context *snapc;
        char *snap_names;       /* format 1 only */
        u64 *snap_sizes;        /* format 1 only */
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the ids in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the ids associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
        u64             pool_id;
        const char      *pool_name;

        const char      *image_id;
        const char      *image_name;

        u64             snap_id;
        const char      *snap_name;

        struct kref     kref;
};

/*
 * An instance of the client; multiple devices may share an rbd client.
 */
struct rbd_client {
        struct ceph_client      *client;
        struct kref             kref;
        struct list_head        node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
        OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_req_flags {
        OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
        OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
        OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
        OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
        const char              *object_name;
        u64                     offset;         /* object start byte */
        u64                     length;         /* bytes from offset */
        unsigned long           flags;

        /*
         * An object request associated with an image will have its
         * img_data flag set; a standalone object request will not.
         *
         * A standalone object request will have which == BAD_WHICH
         * and a null obj_request pointer.
         *
         * An object request initiated in support of a layered image
         * object (to check for its existence before a write) will
         * have which == BAD_WHICH and a non-null obj_request pointer.
         *
         * Finally, an object request for rbd image data will have
         * which != BAD_WHICH, and will have a non-null img_request
         * pointer.  The value of which will be in the range
         * 0..(img_request->obj_request_count-1).
         */
        union {
                struct rbd_obj_request  *obj_request;   /* STAT op */
                struct {
                        struct rbd_img_request  *img_request;
                        u64                     img_offset;
                        /* links for img_request->obj_requests list */
                        struct list_head        links;
                };
        };
        u32                     which;  /* position in image request list */

        enum obj_request_type   type;
        union {
                struct bio      *bio_list;
                struct {
                        struct page     **pages;
                        u32             page_count;
                };
        };
        struct page             **copyup_pages;
        u32                     copyup_page_count;

        struct ceph_osd_request *osd_req;

        u64                     xferred;        /* bytes transferred */
        int                     result;

        rbd_obj_callback_t      callback;
        struct completion       completion;

        struct kref             kref;
};

enum img_req_flags {
        IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
        IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
        IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
        struct rbd_device       *rbd_dev;
        u64                     offset; /* starting image byte offset */
        u64                     length; /* byte count from offset */
        unsigned long           flags;
        union {
                u64                     snap_id;        /* for reads */
                struct ceph_snap_context *snapc;        /* for writes */
        };
        union {
                struct request          *rq;            /* block request */
                struct rbd_obj_request  *obj_request;   /* obj req initiator */
        };
        struct page             **copyup_pages;
        u32                     copyup_page_count;
        spinlock_t              completion_lock;/* protects next_completion */
        u32                     next_completion;
        rbd_img_callback_t      callback;
        u64                     xferred;/* aggregate bytes transferred */
        int                     result; /* first nonzero obj_request result */

        u32                     obj_request_count;
        struct list_head        obj_requests;   /* rbd_obj_request structs */

        struct kref             kref;
};

#define for_each_obj_request(ireq, oreq) \
        list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
        list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)

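/*
 * Illustrative use of the iterators above (editorial, not original
 * code), e.g. summing per-object transfer counts for an image request:
 *
 *      struct rbd_obj_request *obj_request;
 *      u64 xferred = 0;
 *
 *      for_each_obj_request(img_request, obj_request)
 *              xferred += obj_request->xferred;
 *
 * The _safe variant walks the list in reverse and tolerates removal
 * of the current entry, matching the teardown order that
 * rbd_img_obj_request_del() asserts.
 */
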
struct rbd_mapping {
        u64                     size;
        u64                     features;
        bool                    read_only;
};

/*
 * a single device
 */
struct rbd_device {
        int                     dev_id;         /* blkdev unique id */

        int                     major;          /* blkdev assigned major */
        struct gendisk          *disk;          /* blkdev's gendisk and rq */

        u32                     image_format;   /* Either 1 or 2 */
        struct rbd_client       *rbd_client;

        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t              lock;           /* queue, flags, open_count */

        struct rbd_image_header header;
        unsigned long           flags;          /* possibly lock protected */
        struct rbd_spec         *spec;

        char                    *header_name;

        struct ceph_file_layout layout;

        struct ceph_osd_event   *watch_event;
        struct rbd_obj_request  *watch_request;

        struct rbd_spec         *parent_spec;
        u64                     parent_overlap;
        atomic_t                parent_ref;
        struct rbd_device       *parent;

        /* protects updating the header */
        struct rw_semaphore     header_rwsem;

        struct rbd_mapping      mapping;

        struct list_head        node;

        /* sysfs related */
        struct device           dev;
        unsigned long           open_count;     /* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
        RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
        RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
};

static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);              /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache        *rbd_img_request_cache;
static struct kmem_cache        *rbd_obj_request_cache;
static struct kmem_cache        *rbd_segment_name_cache;

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static void rbd_dev_device_release(struct device *dev);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping);
static void rbd_spec_put(struct rbd_spec *spec);

static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
        __ATTR(remove, S_IWUSR, NULL, rbd_remove),
        __ATTR_NULL
};

static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_attrs      = rbd_bus_attrs,
};
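
/*
 * The attributes above appear as /sys/bus/rbd/add and
 * /sys/bus/rbd/remove.  A typical mapping from user space looks
 * roughly like this (illustrative only; the exact argument format is
 * specified in Documentation/ABI/testing/sysfs-bus-rbd):
 *
 *      $ echo "1.2.3.4:6789 name=admin rbd myimage -" > /sys/bus/rbd/add
 */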

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
        struct va_format vaf;
        va_list args;

        va_start(args, fmt);
        vaf.fmt = fmt;
        vaf.va = &args;

        if (!rbd_dev)
                printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
        else if (rbd_dev->disk)
                printk(KERN_WARNING "%s: %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_name)
                printk(KERN_WARNING "%s: image %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_id)
                printk(KERN_WARNING "%s: id %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
        else    /* punt */
                printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
                        RBD_DRV_NAME, rbd_dev, &vaf);
        va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)                                                \
                if (unlikely(!(expr))) {                                \
                        printk(KERN_ERR "\nAssertion failure in %s() "  \
                                                "at line %d:\n\n"       \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                        BUG();                                          \
                }
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */

static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                u64 *snap_features);
static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
        bool removing = false;

        if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
                return -EROFS;

        spin_lock_irq(&rbd_dev->lock);
        if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
                removing = true;
        else
                rbd_dev->open_count++;
        spin_unlock_irq(&rbd_dev->lock);
        if (removing)
                return -ENOENT;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        (void) get_device(&rbd_dev->dev);
        set_device_ro(bdev, rbd_dev->mapping.read_only);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
        struct rbd_device *rbd_dev = disk->private_data;
        unsigned long open_count_before;

        spin_lock_irq(&rbd_dev->lock);
        open_count_before = rbd_dev->open_count--;
        spin_unlock_irq(&rbd_dev->lock);
        rbd_assert(open_count_before > 0);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        put_device(&rbd_dev->dev);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;
        int ret = -ENOMEM;

        dout("%s:\n", __func__);
        rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
        if (!rbdc)
                goto out_opt;

        kref_init(&rbdc->kref);
        INIT_LIST_HEAD(&rbdc->node);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
        if (IS_ERR(rbdc->client))
                goto out_mutex;
        ceph_opts = NULL;       /* Now rbdc->client is responsible for ceph_opts */

        ret = ceph_open_session(rbdc->client);
        if (ret < 0)
                goto out_err;

        spin_lock(&rbd_client_list_lock);
        list_add_tail(&rbdc->node, &rbd_client_list);
        spin_unlock(&rbd_client_list_lock);

        mutex_unlock(&ctl_mutex);
        dout("%s: rbdc %p\n", __func__, rbdc);

        return rbdc;

out_err:
        ceph_destroy_client(rbdc->client);
out_mutex:
        mutex_unlock(&ctl_mutex);
        kfree(rbdc);
out_opt:
        if (ceph_opts)
                ceph_destroy_options(ceph_opts);
        dout("%s: error %d\n", __func__, ret);

        return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
        kref_get(&rbdc->kref);

        return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
        struct rbd_client *client_node;
        bool found = false;

        if (ceph_opts->flags & CEPH_OPT_NOSHARE)
                return NULL;

        spin_lock(&rbd_client_list_lock);
        list_for_each_entry(client_node, &rbd_client_list, node) {
                if (!ceph_compare_options(ceph_opts, client_node->client)) {
                        __rbd_get_client(client_node);

                        found = true;
                        break;
                }
        }
        spin_unlock(&rbd_client_list_lock);

        return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        /* Boolean args above */
        Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
        /* int args above */
        /* string args above */
        {Opt_read_only, "read_only"},
        {Opt_read_only, "ro"},          /* Alternate spelling */
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},         /* Alternate spelling */
        /* Boolean args above */
        {-1, NULL}
};

struct rbd_options {
        bool    read_only;
};

#define RBD_READ_ONLY_DEFAULT   false

static int parse_rbd_opts_token(char *c, void *private)
{
        struct rbd_options *rbd_opts = private;
        substring_t argstr[MAX_OPT_ARGS];
        int token, intval, ret;

        token = match_token(c, rbd_opts_tokens, argstr);
        if (token < 0)
                return -EINVAL;

        if (token < Opt_last_int) {
                ret = match_int(&argstr[0], &intval);
                if (ret < 0) {
                        pr_err("bad mount option arg (not int) "
                               "at '%s'\n", c);
                        return ret;
                }
                dout("got int token %d val %d\n", token, intval);
        } else if (token > Opt_last_int && token < Opt_last_string) {
                dout("got string token %d val %s\n", token,
                     argstr[0].from);
        } else if (token > Opt_last_string && token < Opt_last_bool) {
                dout("got Boolean token %d\n", token);
        } else {
                dout("got token %d\n", token);
        }

        switch (token) {
        case Opt_read_only:
                rbd_opts->read_only = true;
                break;
        case Opt_read_write:
                rbd_opts->read_only = false;
                break;
        default:
                rbd_assert(false);
                break;
        }
        return 0;
}

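/*
 * Editorial note: parse_rbd_opts_token() is called once per
 * comma-separated token that the generic ceph option parser does not
 * itself recognize, so "ro"/"read_only" in the mapping options flips
 * rbd_opts->read_only to true, and any unknown token fails the map
 * with -EINVAL.
 */
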
/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;

        rbdc = rbd_client_find(ceph_opts);
        if (rbdc)       /* using an existing client */
                ceph_destroy_options(ceph_opts);
        else
                rbdc = rbd_client_create(ceph_opts);

        return rbdc;
}
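
/*
 * Ownership note (editorial): rbd_get_client() always consumes
 * ceph_opts.  Either it is destroyed right here because an existing
 * client was reused, or rbd_client_create() takes it over (and
 * destroys it itself on failure).  Callers must not use ceph_opts
 * after this call.
 */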

/*
 * Destroy ceph client
 *
 * Takes and releases rbd_client_list_lock in order to remove the
 * client from the client list.
 */
static void rbd_client_release(struct kref *kref)
{
        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

        dout("%s: rbdc %p\n", __func__, rbdc);
        spin_lock(&rbd_client_list_lock);
        list_del(&rbdc->node);
        spin_unlock(&rbd_client_list_lock);

        ceph_destroy_client(rbdc->client);
        kfree(rbdc);
}

/*
 * Drop reference to ceph client node.  If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
        if (rbdc)
                kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
        return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
        size_t size;
        u32 snap_count;

        /* The header has to start with the magic rbd header text */
        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
                return false;

        /* The bio layer requires at least sector-sized I/O */

        if (ondisk->options.order < SECTOR_SHIFT)
                return false;

        /* If we use u64 in a few spots we may be able to loosen this */

        if (ondisk->options.order > 8 * sizeof (int) - 1)
                return false;

        /*
         * The size of a snapshot header has to fit in a size_t, and
         * that limits the number of snapshots.
         */
        snap_count = le32_to_cpu(ondisk->snap_count);
        size = SIZE_MAX - sizeof (struct ceph_snap_context);
        if (snap_count > size / sizeof (__le64))
                return false;

        /*
         * Not only that, but the size of the entire snapshot
         * header must also be representable in a size_t.
         */
        size -= snap_count * sizeof (__le64);
        if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
                return false;

        return true;
}

/*
 * Fill an rbd image header with information from the given format 1
 * on-disk header.
 */
static int rbd_header_from_disk(struct rbd_device *rbd_dev,
                                 struct rbd_image_header_ondisk *ondisk)
{
        struct rbd_image_header *header = &rbd_dev->header;
        bool first_time = header->object_prefix == NULL;
        struct ceph_snap_context *snapc;
        char *object_prefix = NULL;
        char *snap_names = NULL;
        u64 *snap_sizes = NULL;
        u32 snap_count;
        size_t size;
        int ret = -ENOMEM;
        u32 i;

        /* Allocate this now to avoid having to handle failure below */

        if (first_time) {
                size_t len;

                len = strnlen(ondisk->object_prefix,
                                sizeof (ondisk->object_prefix));
                object_prefix = kmalloc(len + 1, GFP_KERNEL);
                if (!object_prefix)
                        return -ENOMEM;
                memcpy(object_prefix, ondisk->object_prefix, len);
                object_prefix[len] = '\0';
        }

        /* Allocate the snapshot context and fill it in */

        snap_count = le32_to_cpu(ondisk->snap_count);
        snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
        if (!snapc)
                goto out_err;
        snapc->seq = le64_to_cpu(ondisk->snap_seq);
        if (snap_count) {
                struct rbd_image_snap_ondisk *snaps;
                u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

                /* We'll keep a copy of the snapshot names... */

                if (snap_names_len > (u64)SIZE_MAX)
                        goto out_2big;
                snap_names = kmalloc(snap_names_len, GFP_KERNEL);
                if (!snap_names)
                        goto out_err;

                /* ...as well as the array of their sizes. */

                size = snap_count * sizeof (*header->snap_sizes);
                snap_sizes = kmalloc(size, GFP_KERNEL);
                if (!snap_sizes)
                        goto out_err;

                /*
                 * Copy the names, and fill in each snapshot's id
                 * and size.
                 *
                 * Note that rbd_dev_v1_header_info() guarantees the
                 * ondisk buffer we're working with has
                 * snap_names_len bytes beyond the end of the
                 * snapshot id array, so this memcpy() is safe.
                 */
                memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
                snaps = ondisk->snaps;
                for (i = 0; i < snap_count; i++) {
                        snapc->snaps[i] = le64_to_cpu(snaps[i].id);
                        snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
                }
        }

        /* We won't fail any more, fill in the header */

        down_write(&rbd_dev->header_rwsem);
        if (first_time) {
                header->object_prefix = object_prefix;
                header->obj_order = ondisk->options.order;
                header->crypt_type = ondisk->options.crypt_type;
                header->comp_type = ondisk->options.comp_type;
                /* The rest aren't used for format 1 images */
                header->stripe_unit = 0;
                header->stripe_count = 0;
                header->features = 0;
        } else {
                ceph_put_snap_context(header->snapc);
                kfree(header->snap_names);
                kfree(header->snap_sizes);
        }

        /* The remaining fields always get updated (when we refresh) */

        header->image_size = le64_to_cpu(ondisk->image_size);
        header->snapc = snapc;
        header->snap_names = snap_names;
        header->snap_sizes = snap_sizes;

        /* Make sure mapping size is consistent with header info */

        if (rbd_dev->spec->snap_id == CEPH_NOSNAP || first_time)
                if (rbd_dev->mapping.size != header->image_size)
                        rbd_dev->mapping.size = header->image_size;

        up_write(&rbd_dev->header_rwsem);

        return 0;
out_2big:
        ret = -EIO;
out_err:
        kfree(snap_sizes);
        kfree(snap_names);
        ceph_put_snap_context(snapc);
        kfree(object_prefix);

        return ret;
}

static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
        const char *snap_name;

        rbd_assert(which < rbd_dev->header.snapc->num_snaps);

        /* Skip over names until we find the one we are looking for */

        snap_name = rbd_dev->header.snap_names;
        while (which--)
                snap_name += strlen(snap_name) + 1;

        return kstrdup(snap_name, GFP_KERNEL);
}
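
/*
 * Layout sketch (editorial): snap_names is one buffer of consecutive
 * NUL-terminated strings, one per snapshot, in the same descending-id
 * order as the snapshot context's id array:
 *
 *      "newest\0older\0oldest\0"
 *       which=0 which=1 which=2
 */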

/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that result is for snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
        u64 snap_id1 = *(u64 *)s1;
        u64 snap_id2 = *(u64 *)s2;

        if (snap_id1 < snap_id2)
                return 1;
        return snap_id1 == snap_id2 ? 0 : -1;
}

/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
        struct ceph_snap_context *snapc = rbd_dev->header.snapc;
        u64 *found;

        found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
                                sizeof (snap_id), snapid_compare_reverse);

        return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}
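
/*
 * Example (editorial): with snapc->snaps = { 9, 5, 2 }, looking up
 * snap_id 5 yields index 1, while looking up 7 yields BAD_SNAP_INDEX.
 */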

static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id)
{
        u32 which;

        which = rbd_dev_snap_index(rbd_dev, snap_id);
        if (which == BAD_SNAP_INDEX)
                return NULL;

        return _rbd_dev_v1_snap_name(rbd_dev, which);
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
        if (snap_id == CEPH_NOSNAP)
                return RBD_SNAP_HEAD_NAME;

        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (rbd_dev->image_format == 1)
                return rbd_dev_v1_snap_name(rbd_dev, snap_id);

        return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}

static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                                u64 *snap_size)
{
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (snap_id == CEPH_NOSNAP) {
                *snap_size = rbd_dev->header.image_size;
        } else if (rbd_dev->image_format == 1) {
                u32 which;

                which = rbd_dev_snap_index(rbd_dev, snap_id);
                if (which == BAD_SNAP_INDEX)
                        return -ENOENT;

                *snap_size = rbd_dev->header.snap_sizes[which];
        } else {
                u64 size = 0;
                int ret;

                ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
                if (ret)
                        return ret;

                *snap_size = size;
        }
        return 0;
}

static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                        u64 *snap_features)
{
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (snap_id == CEPH_NOSNAP) {
                *snap_features = rbd_dev->header.features;
        } else if (rbd_dev->image_format == 1) {
                *snap_features = 0;     /* No features for format 1 */
        } else {
                u64 features = 0;
                int ret;

                ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
                if (ret)
                        return ret;

                *snap_features = features;
        }
        return 0;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
        u64 snap_id = rbd_dev->spec->snap_id;
        u64 size = 0;
        u64 features = 0;
        int ret;

        ret = rbd_snap_size(rbd_dev, snap_id, &size);
        if (ret)
                return ret;
        ret = rbd_snap_features(rbd_dev, snap_id, &features);
        if (ret)
                return ret;

        rbd_dev->mapping.size = size;
        rbd_dev->mapping.features = features;

        return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
        rbd_dev->mapping.size = 0;
        rbd_dev->mapping.features = 0;
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
        char *name;
        u64 segment;
        int ret;

        name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
        if (!name)
                return NULL;
        segment = offset >> rbd_dev->header.obj_order;
        ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
                        rbd_dev->header.object_prefix, segment);
        if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
                pr_err("error formatting segment name for #%llu (%d)\n",
                        segment, ret);
                /* allocated from the slab cache above, so free it there */
                kmem_cache_free(rbd_segment_name_cache, name);
                name = NULL;
        }

        return name;
}

static void rbd_segment_name_free(const char *name)
{
        /* The explicit cast here is needed to drop the const qualifier */

        kmem_cache_free(rbd_segment_name_cache, (void *)name);
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
                                u64 offset, u64 length)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        offset &= segment_size - 1;

        rbd_assert(length <= U64_MAX - offset);
        if (offset + length > segment_size)
                length = segment_size - offset;

        return length;
}
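
/*
 * Worked example (editorial): with the common obj_order of 22
 * (4 MiB objects), image offset 0x500000 falls in segment
 * 0x500000 >> 22 == 1 at in-object offset 0x100000, and a 5 MiB
 * request starting there is clipped by rbd_segment_length() to the
 * 3 MiB remaining in that segment.
 */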

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
        return 1 << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
        struct bio *tmp;

        while (chain) {
                tmp = chain;
                chain = chain->bi_next;
                bio_put(tmp);
        }
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
        struct bio_vec *bv;
        unsigned long flags;
        void *buf;
        int i;
        int pos = 0;

        while (chain) {
                bio_for_each_segment(bv, chain, i) {
                        if (pos + bv->bv_len > start_ofs) {
                                int remainder = max(start_ofs - pos, 0);
                                buf = bvec_kmap_irq(bv, &flags);
                                memset(buf + remainder, 0,
                                       bv->bv_len - remainder);
                                bvec_kunmap_irq(buf, &flags);
                        }
                        pos += bv->bv_len;
                }

                chain = chain->bi_next;
        }
}

/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
        struct page **page = &pages[offset >> PAGE_SHIFT];

        rbd_assert(end > offset);
        rbd_assert(end - offset <= (u64)SIZE_MAX);
        while (offset < end) {
                size_t page_offset;
                size_t length;
                unsigned long flags;
                void *kaddr;

                page_offset = (size_t)(offset & ~PAGE_MASK);
                length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
                local_irq_save(flags);
                kaddr = kmap_atomic(*page);
                memset(kaddr + page_offset, 0, length);
                kunmap_atomic(kaddr);
                local_irq_restore(flags);

                offset += length;
                page++;
        }
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
                                        unsigned int offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio_vec *bv;
        unsigned int resid;
        unsigned short idx;
        unsigned int voff;
        unsigned short end_idx;
        unsigned short vcnt;
        struct bio *bio;

        /* Handle the easy case for the caller */

        if (!offset && len == bio_src->bi_size)
                return bio_clone(bio_src, gfpmask);

        if (WARN_ON_ONCE(!len))
                return NULL;
        if (WARN_ON_ONCE(len > bio_src->bi_size))
                return NULL;
        if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
                return NULL;

        /* Find first affected segment... */

        resid = offset;
        __bio_for_each_segment(bv, bio_src, idx, 0) {
                if (resid < bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        voff = resid;

        /* ...and the last affected segment */

        resid += len;
        __bio_for_each_segment(bv, bio_src, end_idx, idx) {
                if (resid <= bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        vcnt = end_idx - idx + 1;

        /* Build the clone */

        bio = bio_alloc(gfpmask, (unsigned int) vcnt);
        if (!bio)
                return NULL;    /* ENOMEM */

        bio->bi_bdev = bio_src->bi_bdev;
        bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
        bio->bi_rw = bio_src->bi_rw;
        bio->bi_flags |= 1 << BIO_CLONED;

        /*
         * Copy over our part of the bio_vec, then update the first
         * and last (or only) entries.
         */
        memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
                        vcnt * sizeof (struct bio_vec));
        bio->bi_io_vec[0].bv_offset += voff;
        if (vcnt > 1) {
                bio->bi_io_vec[0].bv_len -= voff;
                bio->bi_io_vec[vcnt - 1].bv_len = resid;
        } else {
                bio->bi_io_vec[0].bv_len = len;
        }

        bio->bi_vcnt = vcnt;
        bio->bi_size = len;
        bio->bi_idx = 0;

        return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
                                        unsigned int *offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio *bi = *bio_src;
        unsigned int off = *offset;
        struct bio *chain = NULL;
        struct bio **end;

        /* Build up a chain of clone bios up to the limit */

        if (!bi || off >= bi->bi_size || !len)
                return NULL;            /* Nothing to clone */

        end = &chain;
        while (len) {
                unsigned int bi_size;
                struct bio *bio;

                if (!bi) {
                        rbd_warn(NULL, "bio_chain exhausted with %u left", len);
                        goto out_err;   /* EINVAL; ran out of bio's */
                }
                bi_size = min_t(unsigned int, bi->bi_size - off, len);
                bio = bio_clone_range(bi, off, bi_size, gfpmask);
                if (!bio)
                        goto out_err;   /* ENOMEM */

                *end = bio;
                end = &bio->bi_next;

                off += bi_size;
                if (off == bi->bi_size) {
                        bi = bi->bi_next;
                        off = 0;
                }
                len -= bi_size;
        }
        *bio_src = bi;
        *offset = off;

        return chain;
out_err:
        bio_chain_put(chain);

        return NULL;
}
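
/*
 * Illustrative calling pattern (editorial, mirroring how request
 * building consumes a chain): each object request peels its bytes off
 * the front of the remaining chain, and bio_src/offset advance so the
 * next call resumes exactly where this one stopped:
 *
 *      struct bio *bio = rq->bio;      /* hypothetical request chain */
 *      unsigned int off = 0;
 *
 *      clone = bio_chain_clone_range(&bio, &off, seg_len, GFP_ATOMIC);
 */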

/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
                struct rbd_device *rbd_dev;

                rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
                        obj_request);
        }
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
                struct rbd_device *rbd_dev = NULL;

                if (obj_request_img_data_test(obj_request))
                        rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked done\n",
                        obj_request);
        }
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the responses from two existence
 * checks are separated by the creation of the target object, and
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the later-arriving
 * ("doesn't exist") response.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
                                bool exists)
{
        if (exists)
                set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
        set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
        smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request != NULL);
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static bool img_request_child_test(struct rbd_img_request *img_request);
static void rbd_parent_request_destroy(struct kref *kref);
static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
        rbd_assert(img_request != NULL);
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        if (img_request_child_test(img_request))
                kref_put(&img_request->kref, rbd_parent_request_destroy);
        else
                kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->img_request == NULL);

        /* Image request now owns object's original reference */
        obj_request->img_request = img_request;
        obj_request->which = img_request->obj_request_count;
        rbd_assert(!obj_request_img_data_test(obj_request));
        obj_request_img_data_set(obj_request);
        rbd_assert(obj_request->which != BAD_WHICH);
        img_request->obj_request_count++;
        list_add_tail(&obj_request->links, &img_request->obj_requests);
        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->which != BAD_WHICH);

        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
        list_del(&obj_request->links);
        rbd_assert(img_request->obj_request_count > 0);
        img_request->obj_request_count--;
        rbd_assert(obj_request->which == img_request->obj_request_count);
        obj_request->which = BAD_WHICH;
        rbd_assert(obj_request_img_data_test(obj_request));
        rbd_assert(obj_request->img_request == img_request);
        obj_request->img_request = NULL;
        obj_request->callback = NULL;
        rbd_obj_request_put(obj_request);
}
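
/*
 * Editorial note: the assertion above that "which" equals the
 * decremented obj_request_count means object requests must be removed
 * in the reverse of the order they were added, which is why
 * for_each_obj_request_safe() walks the list back to front.
 */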

static bool obj_request_type_valid(enum obj_request_type type)
{
        switch (type) {
        case OBJ_REQUEST_NODATA:
        case OBJ_REQUEST_BIO:
        case OBJ_REQUEST_PAGES:
                return true;
        default:
                return false;
        }
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
                                struct rbd_obj_request *obj_request)
{
        dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

        return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{

        dout("%s: img %p\n", __func__, img_request);

        /*
         * If no error occurred, compute the aggregate transfer
         * count for the image request.  We could instead use
         * atomic64_cmpxchg() to update it as each object request
         * completes; not clear which way is better off hand.
         */
        if (!img_request->result) {
                struct rbd_obj_request *obj_request;
                u64 xferred = 0;

                for_each_obj_request(img_request, obj_request)
                        xferred += obj_request->xferred;
                img_request->xferred = xferred;
        }

        if (img_request->callback)
                img_request->callback(img_request);
        else
                rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);

        return wait_for_completion_interruptible(&obj_request->completion);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time.
 * The child and layered flags can later be cleared; the others
 * currently never change thereafter.
 */
static void img_request_write_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_WRITE, &img_request->flags);
        smp_mb();
}

static bool img_request_write_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
}

static void img_request_child_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_CHILD, &img_request->flags);
        smp_mb();
}

static void img_request_child_clear(struct rbd_img_request *img_request)
{
        clear_bit(IMG_REQ_CHILD, &img_request->flags);
        smp_mb();
}

static bool img_request_child_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
}

static void img_request_layered_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_LAYERED, &img_request->flags);
        smp_mb();
}

static void img_request_layered_clear(struct rbd_img_request *img_request)
{
        clear_bit(IMG_REQ_LAYERED, &img_request->flags);
        smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}
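
/*
 * Editorial note on the barriers above: each setter issues smp_mb()
 * after modifying the bit and each tester issues smp_mb() before
 * reading it, the usual pairing that ensures a flag observed as set
 * implies the writer's earlier stores are visible too.
 */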

static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
        u64 xferred = obj_request->xferred;
        u64 length = obj_request->length;

        dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
                obj_request, obj_request->img_request, obj_request->result,
                xferred, length);
        /*
         * ENOENT means a hole in the image.  We zero-fill the
         * entire length of the request.  A short read also implies
         * zero-fill to the end of the request.  Either way we
         * update the xferred count to indicate the whole request
         * was satisfied.
         */
        rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
        if (obj_request->result == -ENOENT) {
                if (obj_request->type == OBJ_REQUEST_BIO)
                        zero_bio_chain(obj_request->bio_list, 0);
                else
                        zero_pages(obj_request->pages, 0, length);
                obj_request->result = 0;
                obj_request->xferred = length;
        } else if (xferred < length && !obj_request->result) {
                if (obj_request->type == OBJ_REQUEST_BIO)
                        zero_bio_chain(obj_request->bio_list, xferred);
                else
                        zero_pages(obj_request->pages, xferred, length);
                obj_request->xferred = length;
        }
        obj_request_done_set(obj_request);
}

static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p cb %p\n", __func__, obj_request,
                obj_request->callback);
        if (obj_request->callback)
                obj_request->callback(obj_request);
        else
                complete_all(&obj_request->completion);
}

static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);
        obj_request_done_set(obj_request);
}

static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
        struct rbd_img_request *img_request = NULL;
        struct rbd_device *rbd_dev = NULL;
        bool layered = false;

        if (obj_request_img_data_test(obj_request)) {
                img_request = obj_request->img_request;
                layered = img_request && img_request_layered_test(img_request);
                rbd_dev = img_request->rbd_dev;
        }

        dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
                obj_request, img_request, obj_request->result,
                obj_request->xferred, obj_request->length);
        if (layered && obj_request->result == -ENOENT &&
                        obj_request->img_offset < rbd_dev->parent_overlap)
                rbd_img_parent_read(obj_request);
        else if (img_request)
                rbd_img_obj_request_read_callback(obj_request);
        else
                obj_request_done_set(obj_request);
}

static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p result %d %llu\n", __func__, obj_request,
                obj_request->result, obj_request->length);
        /*
         * There is no such thing as a successful short write.  Set
         * it to our originally-requested length.
         */
        obj_request->xferred = obj_request->length;
        obj_request_done_set(obj_request);
}

/*
 * For a simple stat call there's nothing to do.  We'll do more if
 * this is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);
        obj_request_done_set(obj_request);
}

static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
                                struct ceph_msg *msg)
{
        struct rbd_obj_request *obj_request = osd_req->r_priv;
        u16 opcode;

        dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
        rbd_assert(osd_req == obj_request->osd_req);
        if (obj_request_img_data_test(obj_request)) {
                rbd_assert(obj_request->img_request);
                rbd_assert(obj_request->which != BAD_WHICH);
        } else {
                rbd_assert(obj_request->which == BAD_WHICH);
        }

        if (osd_req->r_result < 0)
                obj_request->result = osd_req->r_result;

        BUG_ON(osd_req->r_num_ops > 2);

        /*
         * We support a 64-bit length, but ultimately it has to be
         * passed to blk_end_request(), which takes an unsigned int.
         */
        obj_request->xferred = osd_req->r_reply_op_len[0];
        rbd_assert(obj_request->xferred < (u64)UINT_MAX);
        opcode = osd_req->r_ops[0].op;
        switch (opcode) {
        case CEPH_OSD_OP_READ:
                rbd_osd_read_callback(obj_request);
                break;
        case CEPH_OSD_OP_WRITE:
                rbd_osd_write_callback(obj_request);
                break;
        case CEPH_OSD_OP_STAT:
                rbd_osd_stat_callback(obj_request);
                break;
        case CEPH_OSD_OP_CALL:
        case CEPH_OSD_OP_NOTIFY_ACK:
        case CEPH_OSD_OP_WATCH:
                rbd_osd_trivial_callback(obj_request);
                break;
        default:
                rbd_warn(NULL, "%s: unsupported op %hu\n",
                        obj_request->object_name, (unsigned short) opcode);
                break;
        }

        if (obj_request_done_test(obj_request))
                rbd_obj_request_complete(obj_request);
}

static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
{
        struct rbd_img_request *img_request = obj_request->img_request;
        struct ceph_osd_request *osd_req = obj_request->osd_req;
        u64 snap_id;

        rbd_assert(osd_req != NULL);

        snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
        ceph_osdc_build_request(osd_req, obj_request->offset,
                        NULL, snap_id, NULL);
}

static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
{
        struct rbd_img_request *img_request = obj_request->img_request;
        struct ceph_osd_request *osd_req = obj_request->osd_req;
        struct ceph_snap_context *snapc;
        struct timespec mtime = CURRENT_TIME;

        rbd_assert(osd_req != NULL);

        snapc = img_request ? img_request->snapc : NULL;
        ceph_osdc_build_request(osd_req, obj_request->offset,
                        snapc, CEPH_NOSNAP, &mtime);
}
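
/*
 * Editorial note: reads are built with a snapshot id (which version
 * of the object to read) while writes are built with the full
 * snapshot context (which snapshots a write must preserve).  This is
 * also why struct rbd_img_request keeps snap_id and snapc in a union.
 */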

static struct ceph_osd_request *rbd_osd_req_create(
                                        struct rbd_device *rbd_dev,
                                        bool write_request,
                                        struct rbd_obj_request *obj_request)
{
        struct ceph_snap_context *snapc = NULL;
        struct ceph_osd_client *osdc;
        struct ceph_osd_request *osd_req;

        if (obj_request_img_data_test(obj_request)) {
                struct rbd_img_request *img_request = obj_request->img_request;

                rbd_assert(write_request ==
                                img_request_write_test(img_request));
                if (write_request)
                        snapc = img_request->snapc;
        }

        /* Allocate and initialize the request, for the single op */

        osdc = &rbd_dev->rbd_client->client->osdc;
        osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
        if (!osd_req)
                return NULL;    /* ENOMEM */

        if (write_request)
                osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
        else
                osd_req->r_flags = CEPH_OSD_FLAG_READ;

        osd_req->r_callback = rbd_osd_req_callback;
        osd_req->r_priv = obj_request;

        osd_req->r_oid_len = strlen(obj_request->object_name);
        rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
        memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

        osd_req->r_file_layout = rbd_dev->layout;       /* struct */

        return osd_req;
}

/*
 * Create a copyup osd request based on the information in the
 * object request supplied.  A copyup request has two osd ops,
 * a copyup method call, and a "normal" write request.
 */
static struct ceph_osd_request *
rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	struct ceph_snap_context *snapc;
	struct rbd_device *rbd_dev;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;
	rbd_assert(img_request);
	rbd_assert(img_request_write_test(img_request));

	/* Allocate and initialize the request, for the two ops */

	snapc = img_request->snapc;
	rbd_dev = img_request->rbd_dev;
	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
	if (!osd_req)
		return NULL;	/* ENOMEM */

	osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
	osd_req->r_callback = rbd_osd_req_callback;
	osd_req->r_priv = obj_request;

	osd_req->r_oid_len = strlen(obj_request->object_name);
	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

	osd_req->r_file_layout = rbd_dev->layout;	/* struct */

	return osd_req;
}


static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
}

/* object_name is assumed to be a non-null pointer and NUL-terminated */

static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
						u64 offset, u64 length,
						enum obj_request_type type)
{
	struct rbd_obj_request *obj_request;
	size_t size;
	char *name;

	rbd_assert(obj_request_type_valid(type));

	size = strlen(object_name) + 1;
	name = kmalloc(size, GFP_KERNEL);
	if (!name)
		return NULL;

	obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL);
	if (!obj_request) {
		kfree(name);
		return NULL;
	}

	obj_request->object_name = memcpy(name, object_name, size);
	obj_request->offset = offset;
	obj_request->length = length;
	obj_request->flags = 0;
	obj_request->which = BAD_WHICH;
	obj_request->type = type;
	INIT_LIST_HEAD(&obj_request->links);
	init_completion(&obj_request->completion);
	kref_init(&obj_request->kref);

	dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
		offset, length, (int)type, obj_request);

	return obj_request;
}

static void rbd_obj_request_destroy(struct kref *kref)
{
	struct rbd_obj_request *obj_request;

	obj_request = container_of(kref, struct rbd_obj_request, kref);

	dout("%s: obj %p\n", __func__, obj_request);

	rbd_assert(obj_request->img_request == NULL);
	rbd_assert(obj_request->which == BAD_WHICH);

	if (obj_request->osd_req)
		rbd_osd_req_destroy(obj_request->osd_req);

	rbd_assert(obj_request_type_valid(obj_request->type));
	switch (obj_request->type) {
	case OBJ_REQUEST_NODATA:
		break;		/* Nothing to do */
	case OBJ_REQUEST_BIO:
		if (obj_request->bio_list)
			bio_chain_put(obj_request->bio_list);
		break;
	case OBJ_REQUEST_PAGES:
		if (obj_request->pages)
			ceph_release_page_vector(obj_request->pages,
						obj_request->page_count);
		break;
	}

	kfree(obj_request->object_name);
	obj_request->object_name = NULL;
	kmem_cache_free(rbd_obj_request_cache, obj_request);
}

/* It's OK to call this for a device with no parent */

static void rbd_spec_put(struct rbd_spec *spec);
static void rbd_dev_unparent(struct rbd_device *rbd_dev)
{
	rbd_dev_remove_parent(rbd_dev);
	rbd_spec_put(rbd_dev->parent_spec);
	rbd_dev->parent_spec = NULL;
	rbd_dev->parent_overlap = 0;
}

/*
 * Parent image reference counting is used to determine when an
 * image's parent fields can be safely torn down--after there are no
 * more in-flight requests to the parent image.  When the last
 * reference is dropped, cleaning them up is safe.
 */
static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
{
	int counter;

	if (!rbd_dev->parent_spec)
		return;

	counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
	if (counter > 0)
		return;

	/* Last reference; clean up parent data structures */

	if (!counter)
		rbd_dev_unparent(rbd_dev);
	else
		rbd_warn(rbd_dev, "parent reference underflow\n");
}

/*
 * If an image has a non-zero parent overlap, get a reference to its
 * parent.
 *
 * We must get the reference before checking for the overlap to
 * coordinate properly with zeroing the parent overlap in
 * rbd_dev_v2_parent_info() when an image gets flattened.  We
 * drop it again if there is no overlap.
 *
 * Returns true if the rbd device has a parent with a non-zero
 * overlap and a reference for it was successfully taken, or
 * false otherwise.
 */
static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
{
	int counter;

	if (!rbd_dev->parent_spec)
		return false;

	counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
	if (counter > 0 && rbd_dev->parent_overlap)
		return true;

	/* Image was flattened, but parent is not yet torn down */

	if (counter < 0)
		rbd_warn(rbd_dev, "parent reference overflow\n");

	return false;
}

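/*
 * A note on the saturating counter (illustrative, not new mechanism):
 * atomic_inc_return_safe() pins parent_ref at 0 once it has dropped
 * to 0, so a flattened image whose last parent reference is gone can
 * never re-acquire one; rbd_dev_parent_get() just returns false and
 * the request proceeds as a plain, non-layered request.  Callers pair
 * the helpers around an image request's lifetime, as the create and
 * destroy paths below do:
 *
 *	if (rbd_dev_parent_get(rbd_dev))		(in create)
 *		img_request_layered_set(img_request);
 *	...
 *	rbd_dev_parent_put(img_request->rbd_dev);	(in destroy)
 */
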
/*
 * Caller is responsible for filling in the list of object requests
 * that comprises the image request, and the Linux request pointer
 * (if there is one).
 */
static struct rbd_img_request *rbd_img_request_create(
					struct rbd_device *rbd_dev,
					u64 offset, u64 length,
					bool write_request)
{
	struct rbd_img_request *img_request;

	img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC);
	if (!img_request)
		return NULL;

	if (write_request) {
		down_read(&rbd_dev->header_rwsem);
		ceph_get_snap_context(rbd_dev->header.snapc);
		up_read(&rbd_dev->header_rwsem);
	}

	img_request->rq = NULL;
	img_request->rbd_dev = rbd_dev;
	img_request->offset = offset;
	img_request->length = length;
	img_request->flags = 0;
	if (write_request) {
		img_request_write_set(img_request);
		img_request->snapc = rbd_dev->header.snapc;
	} else {
		img_request->snap_id = rbd_dev->spec->snap_id;
	}
	if (rbd_dev_parent_get(rbd_dev))
		img_request_layered_set(img_request);
	spin_lock_init(&img_request->completion_lock);
	img_request->next_completion = 0;
	img_request->callback = NULL;
	img_request->result = 0;
	img_request->obj_request_count = 0;
	INIT_LIST_HEAD(&img_request->obj_requests);
	kref_init(&img_request->kref);

	dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
		write_request ? "write" : "read", offset, length,
		img_request);

	return img_request;
}

static void rbd_img_request_destroy(struct kref *kref)
{
	struct rbd_img_request *img_request;
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;

	img_request = container_of(kref, struct rbd_img_request, kref);

	dout("%s: img %p\n", __func__, img_request);

	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_img_obj_request_del(img_request, obj_request);
	rbd_assert(img_request->obj_request_count == 0);

	if (img_request_layered_test(img_request)) {
		img_request_layered_clear(img_request);
		rbd_dev_parent_put(img_request->rbd_dev);
	}

	if (img_request_write_test(img_request))
		ceph_put_snap_context(img_request->snapc);

	kmem_cache_free(rbd_img_request_cache, img_request);
}

static struct rbd_img_request *rbd_parent_request_create(
					struct rbd_obj_request *obj_request,
					u64 img_offset, u64 length)
{
	struct rbd_img_request *parent_request;
	struct rbd_device *rbd_dev;

	rbd_assert(obj_request->img_request);
	rbd_dev = obj_request->img_request->rbd_dev;

	parent_request = rbd_img_request_create(rbd_dev->parent,
						img_offset, length, false);
	if (!parent_request)
		return NULL;

	img_request_child_set(parent_request);
	rbd_obj_request_get(obj_request);
	parent_request->obj_request = obj_request;

	return parent_request;
}

static void rbd_parent_request_destroy(struct kref *kref)
{
	struct rbd_img_request *parent_request;
	struct rbd_obj_request *orig_request;

	parent_request = container_of(kref, struct rbd_img_request, kref);
	orig_request = parent_request->obj_request;

	parent_request->obj_request = NULL;
	rbd_obj_request_put(orig_request);
	img_request_child_clear(parent_request);

	rbd_img_request_destroy(kref);
}

static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	unsigned int xferred;
	int result;
	bool more;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;

	rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
	xferred = (unsigned int)obj_request->xferred;
	result = obj_request->result;
	if (result) {
		struct rbd_device *rbd_dev = img_request->rbd_dev;

		rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
			img_request_write_test(img_request) ? "write" : "read",
			obj_request->length, obj_request->img_offset,
			obj_request->offset);
		rbd_warn(rbd_dev, " result %d xferred %x\n",
			result, xferred);
		if (!img_request->result)
			img_request->result = result;
	}

	/* Image object requests don't own their page array */

	if (obj_request->type == OBJ_REQUEST_PAGES) {
		obj_request->pages = NULL;
		obj_request->page_count = 0;
	}

	if (img_request_child_test(img_request)) {
		rbd_assert(img_request->obj_request != NULL);
		more = obj_request->which < img_request->obj_request_count - 1;
	} else {
		rbd_assert(img_request->rq != NULL);
		more = blk_end_request(img_request->rq, result, xferred);
	}

	return more;
}

static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	u32 which = obj_request->which;
	bool more = true;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;

	dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
	rbd_assert(img_request != NULL);
	rbd_assert(img_request->obj_request_count > 0);
	rbd_assert(which != BAD_WHICH);
	rbd_assert(which < img_request->obj_request_count);
	rbd_assert(which >= img_request->next_completion);

	spin_lock_irq(&img_request->completion_lock);
	if (which != img_request->next_completion)
		goto out;

	for_each_obj_request_from(img_request, obj_request) {
		rbd_assert(more);
		rbd_assert(which < img_request->obj_request_count);

		if (!obj_request_done_test(obj_request))
			break;
		more = rbd_img_obj_end_request(obj_request);
		which++;
	}

	rbd_assert(more ^ (which == img_request->obj_request_count));
	img_request->next_completion = which;
out:
	spin_unlock_irq(&img_request->completion_lock);

	if (!more)
		rbd_img_request_complete(img_request);
}

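/*
 * Completion-ordering sketch (illustrative): object requests may
 * complete out of order, but rbd_img_obj_end_request() must run in
 * "which" order so blk_end_request() sees the image request's bytes
 * in sequence.  Suppose an image request spans objects 0..2 and they
 * complete in the order 2, 0, 1:
 *
 *	obj 2 done: which (2) != next_completion (0) -> just return
 *	obj 0 done: end 0, stop at 1 (not yet done), next_completion = 1
 *	obj 1 done: end 1, then end the already-done 2; "more" is now
 *	            false, so the image request itself is completed
 */
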
/*
 * Split up an image request into one or more object requests, each
 * to a different object.  The "type" parameter indicates whether
 * "data_desc" is the pointer to the head of a list of bio
 * structures, or the base of a page array.  In either case this
 * function assumes data_desc describes memory sufficient to hold
 * all data described by the image request.
 */
static int rbd_img_request_fill(struct rbd_img_request *img_request,
					enum obj_request_type type,
					void *data_desc)
{
	struct rbd_device *rbd_dev = img_request->rbd_dev;
	struct rbd_obj_request *obj_request = NULL;
	struct rbd_obj_request *next_obj_request;
	bool write_request = img_request_write_test(img_request);
	struct bio *bio_list;
	unsigned int bio_offset = 0;
	struct page **pages;
	u64 img_offset;
	u64 resid;
	u16 opcode;

	dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
		(int)type, data_desc);

	opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
	img_offset = img_request->offset;
	resid = img_request->length;
	rbd_assert(resid > 0);

	if (type == OBJ_REQUEST_BIO) {
		bio_list = data_desc;
		rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
	} else {
		rbd_assert(type == OBJ_REQUEST_PAGES);
		pages = data_desc;
	}

	while (resid) {
		struct ceph_osd_request *osd_req;
		const char *object_name;
		u64 offset;
		u64 length;

		object_name = rbd_segment_name(rbd_dev, img_offset);
		if (!object_name)
			goto out_unwind;
		offset = rbd_segment_offset(rbd_dev, img_offset);
		length = rbd_segment_length(rbd_dev, img_offset, resid);
		obj_request = rbd_obj_request_create(object_name,
						offset, length, type);
		/* object request has its own copy of the object name */
		rbd_segment_name_free(object_name);
		if (!obj_request)
			goto out_unwind;

		if (type == OBJ_REQUEST_BIO) {
			unsigned int clone_size;

			rbd_assert(length <= (u64)UINT_MAX);
			clone_size = (unsigned int)length;
			obj_request->bio_list =
					bio_chain_clone_range(&bio_list,
								&bio_offset,
								clone_size,
								GFP_ATOMIC);
			if (!obj_request->bio_list)
				goto out_partial;
		} else {
			unsigned int page_count;

			obj_request->pages = pages;
			page_count = (u32)calc_pages_for(offset, length);
			obj_request->page_count = page_count;
			if ((offset + length) & ~PAGE_MASK)
				page_count--;	/* more on last page */
			pages += page_count;
		}

		osd_req = rbd_osd_req_create(rbd_dev, write_request,
						obj_request);
		if (!osd_req)
			goto out_partial;
		obj_request->osd_req = osd_req;
		obj_request->callback = rbd_img_obj_callback;

		osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
						0, 0);
		if (type == OBJ_REQUEST_BIO)
			osd_req_op_extent_osd_data_bio(osd_req, 0,
					obj_request->bio_list, length);
		else
			osd_req_op_extent_osd_data_pages(osd_req, 0,
					obj_request->pages, length,
					offset & ~PAGE_MASK, false, false);

		if (write_request)
			rbd_osd_req_format_write(obj_request);
		else
			rbd_osd_req_format_read(obj_request);

		obj_request->img_offset = img_offset;
		rbd_img_obj_request_add(img_request, obj_request);

		img_offset += length;
		resid -= length;
	}

	return 0;

out_partial:
	rbd_obj_request_put(obj_request);
out_unwind:
	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_obj_request_put(obj_request);

	return -ENOMEM;
}

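/*
 * Usage sketch (illustrative): rbd_request_fn() below is the real
 * caller of the BIO variant, building an image request straight from
 * a block request's bio chain, roughly:
 *
 *	img_request = rbd_img_request_create(rbd_dev, offset, length,
 *						write_request);
 *	if (img_request)
 *		result = rbd_img_request_fill(img_request,
 *						OBJ_REQUEST_BIO, rq->bio);
 *
 * The parent-read paths use the PAGES variant with a freshly
 * allocated page vector as the data descriptor.
 */
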
static void
rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	struct rbd_device *rbd_dev;
	struct page **pages;
	u32 page_count;

	rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;
	rbd_assert(img_request);

	rbd_dev = img_request->rbd_dev;
	rbd_assert(rbd_dev);

	pages = obj_request->copyup_pages;
	rbd_assert(pages != NULL);
	obj_request->copyup_pages = NULL;
	page_count = obj_request->copyup_page_count;
	rbd_assert(page_count);
	obj_request->copyup_page_count = 0;
	ceph_release_page_vector(pages, page_count);

	/*
	 * We want the transfer count to reflect the size of the
	 * original write request.  There is no such thing as a
	 * successful short write, so if the request was successful
	 * we can just set it to the originally-requested length.
	 */
	if (!obj_request->result)
		obj_request->xferred = obj_request->length;

	/* Finish up with the normal image object callback */

	rbd_img_obj_callback(obj_request);
}

static void
rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
{
	struct rbd_obj_request *orig_request;
	struct ceph_osd_request *osd_req;
	struct ceph_osd_client *osdc;
	struct rbd_device *rbd_dev;
	struct page **pages;
	u32 page_count;
	int img_result;
	u64 parent_length;
	u64 offset;
	u64 length;

	rbd_assert(img_request_child_test(img_request));

	/* First get what we need from the image request */

	pages = img_request->copyup_pages;
	rbd_assert(pages != NULL);
	img_request->copyup_pages = NULL;
	page_count = img_request->copyup_page_count;
	rbd_assert(page_count);
	img_request->copyup_page_count = 0;

	orig_request = img_request->obj_request;
	rbd_assert(orig_request != NULL);
	rbd_assert(obj_request_type_valid(orig_request->type));
	img_result = img_request->result;
	parent_length = img_request->length;
	rbd_assert(parent_length == img_request->xferred);
	rbd_img_request_put(img_request);

	rbd_assert(orig_request->img_request);
	rbd_dev = orig_request->img_request->rbd_dev;
	rbd_assert(rbd_dev);

	/*
	 * If the overlap has become 0 (most likely because the
	 * image has been flattened) we need to free the pages
	 * and re-submit the original write request.
	 */
	if (!rbd_dev->parent_overlap) {
		struct ceph_osd_client *osdc;

		ceph_release_page_vector(pages, page_count);
		osdc = &rbd_dev->rbd_client->client->osdc;
		img_result = rbd_obj_request_submit(osdc, orig_request);
		if (!img_result)
			return;
	}

	if (img_result)
		goto out_err;

	/*
	 * The original osd request is of no use to us anymore.
	 * We need a new one that can hold the two ops in a copyup
	 * request.  Allocate the new copyup osd request for the
	 * original request, and release the old one.
	 */
	img_result = -ENOMEM;
	osd_req = rbd_osd_req_create_copyup(orig_request);
	if (!osd_req)
		goto out_err;
	rbd_osd_req_destroy(orig_request->osd_req);
	orig_request->osd_req = osd_req;
	orig_request->copyup_pages = pages;
	orig_request->copyup_page_count = page_count;

	/* Initialize the copyup op */

	osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
	osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
						false, false);

	/* Then the original write request op */

	offset = orig_request->offset;
	length = orig_request->length;
	osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
					offset, length, 0, 0);
	if (orig_request->type == OBJ_REQUEST_BIO)
		osd_req_op_extent_osd_data_bio(osd_req, 1,
					orig_request->bio_list, length);
	else
		osd_req_op_extent_osd_data_pages(osd_req, 1,
					orig_request->pages, length,
					offset & ~PAGE_MASK, false, false);

	rbd_osd_req_format_write(orig_request);

	/* All set, send it off. */

	orig_request->callback = rbd_img_obj_copyup_callback;
	osdc = &rbd_dev->rbd_client->client->osdc;
	img_result = rbd_obj_request_submit(osdc, orig_request);
	if (!img_result)
		return;
out_err:
	/* Record the error code and complete the request */

	orig_request->result = img_result;
	orig_request->xferred = 0;
	obj_request_done_set(orig_request);
	rbd_obj_request_complete(orig_request);
}

/*
 * Read from the parent image the range of data that covers the
 * entire target of the given object request.  This is used for
 * satisfying a layered image write request when the target of an
 * object request from the image request does not exist.
 *
 * A page array big enough to hold the returned data is allocated
 * and supplied to rbd_img_request_fill() as the "data descriptor."
 * When the read completes, this page array will be transferred to
 * the original object request for the copyup operation.
 *
 * If an error occurs, record it as the result of the original
 * object request and mark it done so it gets completed.
 */
static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = NULL;
	struct rbd_img_request *parent_request = NULL;
	struct rbd_device *rbd_dev;
	u64 img_offset;
	u64 length;
	struct page **pages = NULL;
	u32 page_count;
	int result;

	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request_type_valid(obj_request->type));

	img_request = obj_request->img_request;
	rbd_assert(img_request != NULL);
	rbd_dev = img_request->rbd_dev;
	rbd_assert(rbd_dev->parent != NULL);

	/*
	 * Determine the byte range covered by the object in the
	 * child image to which the original request was to be sent.
	 */
	img_offset = obj_request->img_offset - obj_request->offset;
	length = (u64)1 << rbd_dev->header.obj_order;

	/*
	 * There is no defined parent data beyond the parent
	 * overlap, so limit what we read at that boundary if
	 * necessary.
	 */
	if (img_offset + length > rbd_dev->parent_overlap) {
		rbd_assert(img_offset < rbd_dev->parent_overlap);
		length = rbd_dev->parent_overlap - img_offset;
	}

	/*
	 * Allocate a page array big enough to receive the data read
	 * from the parent.
	 */
	page_count = (u32)calc_pages_for(0, length);
	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
	if (IS_ERR(pages)) {
		result = PTR_ERR(pages);
		pages = NULL;
		goto out_err;
	}

	result = -ENOMEM;
	parent_request = rbd_parent_request_create(obj_request,
						img_offset, length);
	if (!parent_request)
		goto out_err;

	result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
	if (result)
		goto out_err;
	parent_request->copyup_pages = pages;
	parent_request->copyup_page_count = page_count;

	parent_request->callback = rbd_img_obj_parent_read_full_callback;
	result = rbd_img_request_submit(parent_request);
	if (!result)
		return 0;

	parent_request->copyup_pages = NULL;
	parent_request->copyup_page_count = 0;
	parent_request->obj_request = NULL;
	rbd_obj_request_put(obj_request);
out_err:
	if (pages)
		ceph_release_page_vector(pages, page_count);
	if (parent_request)
		rbd_img_request_put(parent_request);
	obj_request->result = result;
	obj_request->xferred = 0;
	obj_request_done_set(obj_request);

	return result;
}

static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_obj_request *orig_request;
	struct rbd_device *rbd_dev;
	int result;

	rbd_assert(!obj_request_img_data_test(obj_request));

	/*
	 * All we need from the object request is the original
	 * request and the result of the STAT op.  Grab those, then
	 * we're done with the request.
	 */
	orig_request = obj_request->obj_request;
	obj_request->obj_request = NULL;
	rbd_assert(orig_request);
	rbd_assert(orig_request->img_request);

	result = obj_request->result;
	obj_request->result = 0;

	dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
		obj_request, orig_request, result,
		obj_request->xferred, obj_request->length);
	rbd_obj_request_put(obj_request);

	/*
	 * If the overlap has become 0 (most likely because the
	 * image has been flattened) we need to re-submit the
	 * original write request.
	 */
	rbd_dev = orig_request->img_request->rbd_dev;
	if (!rbd_dev->parent_overlap) {
		struct ceph_osd_client *osdc;

		rbd_obj_request_put(orig_request);
		osdc = &rbd_dev->rbd_client->client->osdc;
		result = rbd_obj_request_submit(osdc, orig_request);
		if (!result)
			return;
	}

	/*
	 * Our only purpose here is to determine whether the object
	 * exists, and we don't want to treat the non-existence as
	 * an error.  If something else comes back, transfer the
	 * error to the original request and complete it now.
	 */
	if (!result) {
		obj_request_existence_set(orig_request, true);
	} else if (result == -ENOENT) {
		obj_request_existence_set(orig_request, false);
	} else if (result) {
		orig_request->result = result;
		goto out;
	}

	/*
	 * Resubmit the original request now that we have recorded
	 * whether the target object exists.
	 */
	orig_request->result = rbd_img_obj_request_submit(orig_request);
out:
	if (orig_request->result)
		rbd_obj_request_complete(orig_request);
	rbd_obj_request_put(orig_request);
}

static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
{
	struct rbd_obj_request *stat_request;
	struct rbd_device *rbd_dev;
	struct ceph_osd_client *osdc;
	struct page **pages = NULL;
	u32 page_count;
	size_t size;
	int ret;

	/*
	 * The response data for a STAT call consists of:
	 *     le64 length;
	 *     struct {
	 *         le32 tv_sec;
	 *         le32 tv_nsec;
	 *     } mtime;
	 */
	size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
	page_count = (u32)calc_pages_for(0, size);
	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	ret = -ENOMEM;
	stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
						OBJ_REQUEST_PAGES);
	if (!stat_request)
		goto out;

	rbd_obj_request_get(obj_request);
	stat_request->obj_request = obj_request;
	stat_request->pages = pages;
	stat_request->page_count = page_count;

	rbd_assert(obj_request->img_request);
	rbd_dev = obj_request->img_request->rbd_dev;
	stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
						stat_request);
	if (!stat_request->osd_req)
		goto out;
	stat_request->callback = rbd_img_obj_exists_callback;

	osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
	osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
					false, false);
	rbd_osd_req_format_read(stat_request);

	osdc = &rbd_dev->rbd_client->client->osdc;
	ret = rbd_obj_request_submit(osdc, stat_request);
out:
	if (ret)
		rbd_obj_request_put(obj_request);

	return ret;
}

static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	struct rbd_device *rbd_dev;
	bool known;

	rbd_assert(obj_request_img_data_test(obj_request));

	img_request = obj_request->img_request;
	rbd_assert(img_request);
	rbd_dev = img_request->rbd_dev;

	/*
	 * Only writes to layered images need special handling.
	 * Reads and non-layered writes are simple object requests.
	 * Layered writes that start beyond the end of the overlap
	 * with the parent have no parent data, so they too are
	 * simple object requests.  Finally, if the target object is
	 * known to already exist, its parent data has already been
	 * copied, so a write to the object can also be handled as a
	 * simple object request.
	 */
	if (!img_request_write_test(img_request) ||
		!img_request_layered_test(img_request) ||
		rbd_dev->parent_overlap <= obj_request->img_offset ||
		((known = obj_request_known_test(obj_request)) &&
			obj_request_exists_test(obj_request))) {

		struct rbd_device *rbd_dev;
		struct ceph_osd_client *osdc;

		rbd_dev = obj_request->img_request->rbd_dev;
		osdc = &rbd_dev->rbd_client->client->osdc;

		return rbd_obj_request_submit(osdc, obj_request);
	}

	/*
	 * It's a layered write.  The target object might exist but
	 * we may not know that yet.  If we know it doesn't exist,
	 * start by reading the data for the full target object from
	 * the parent so we can use it for a copyup to the target.
	 */
	if (known)
		return rbd_img_obj_parent_read_full(obj_request);

	/* We don't know whether the target exists.  Go find out. */

	return rbd_img_obj_exists_submit(obj_request);
}

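/*
 * Layered-write flow at a glance (illustrative summary of the
 * functions above, not new mechanism):
 *
 *	write to clone -> target object existence unknown?
 *	    STAT (rbd_img_obj_exists_submit)
 *	        exists  -> plain write
 *	        -ENOENT -> rbd_img_obj_parent_read_full reads the
 *	                   covering range from the parent, then a
 *	                   two-op request (CALL "rbd" "copyup" plus
 *	                   the original WRITE) is built by
 *	                   rbd_osd_req_create_copyup() and submitted
 *
 * If the image is flattened while any of this is in flight, the
 * callbacks notice parent_overlap == 0 and fall back to re-submitting
 * the original request as a plain write.
 */
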
static int rbd_img_request_submit(struct rbd_img_request *img_request)
{
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;

	dout("%s: img %p\n", __func__, img_request);
	for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
		int ret;

		ret = rbd_img_obj_request_submit(obj_request);
		if (ret)
			return ret;
	}

	return 0;
}

static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
{
	struct rbd_obj_request *obj_request;
	struct rbd_device *rbd_dev;
	u64 obj_end;
	u64 img_xferred;
	int img_result;

	rbd_assert(img_request_child_test(img_request));

	/* First get what we need from the image request and release it */

	obj_request = img_request->obj_request;
	img_xferred = img_request->xferred;
	img_result = img_request->result;
	rbd_img_request_put(img_request);

	/*
	 * If the overlap has become 0 (most likely because the
	 * image has been flattened) we need to re-submit the
	 * original request.
	 */
	rbd_assert(obj_request);
	rbd_assert(obj_request->img_request);
	rbd_dev = obj_request->img_request->rbd_dev;
	if (!rbd_dev->parent_overlap) {
		struct ceph_osd_client *osdc;

		osdc = &rbd_dev->rbd_client->client->osdc;
		img_result = rbd_obj_request_submit(osdc, obj_request);
		if (!img_result)
			return;
	}

	obj_request->result = img_result;
	if (obj_request->result)
		goto out;

	/*
	 * We need to zero anything beyond the parent overlap
	 * boundary.  Since rbd_img_obj_request_read_callback()
	 * will zero anything beyond the end of a short read, an
	 * easy way to do this is to pretend the data from the
	 * parent came up short--ending at the overlap boundary.
	 */
	rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
	obj_end = obj_request->img_offset + obj_request->length;
	if (obj_end > rbd_dev->parent_overlap) {
		u64 xferred = 0;

		if (obj_request->img_offset < rbd_dev->parent_overlap)
			xferred = rbd_dev->parent_overlap -
					obj_request->img_offset;

		obj_request->xferred = min(img_xferred, xferred);
	} else {
		obj_request->xferred = img_xferred;
	}
out:
	rbd_img_obj_request_read_callback(obj_request);
	rbd_obj_request_complete(obj_request);
}

static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	int result;

	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request->img_request != NULL);
	rbd_assert(obj_request->result == (s32) -ENOENT);
	rbd_assert(obj_request_type_valid(obj_request->type));

	/* rbd_read_finish(obj_request, obj_request->length); */
	img_request = rbd_parent_request_create(obj_request,
						obj_request->img_offset,
						obj_request->length);
	result = -ENOMEM;
	if (!img_request)
		goto out_err;

	if (obj_request->type == OBJ_REQUEST_BIO)
		result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
						obj_request->bio_list);
	else
		result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
						obj_request->pages);
	if (result)
		goto out_err;

	img_request->callback = rbd_img_parent_read_callback;
	result = rbd_img_request_submit(img_request);
	if (result)
		goto out_err;

	return;
out_err:
	if (img_request)
		rbd_img_request_put(img_request);
	obj_request->result = result;
	obj_request->xferred = 0;
	obj_request_done_set(obj_request);
}

static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id)
{
	struct rbd_obj_request *obj_request;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	int ret;

	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
						OBJ_REQUEST_NODATA);
	if (!obj_request)
		return -ENOMEM;

	ret = -ENOMEM;
	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
	if (!obj_request->osd_req)
		goto out;
	obj_request->callback = rbd_obj_request_put;

	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
					notify_id, 0, 0);
	rbd_osd_req_format_read(obj_request);

	ret = rbd_obj_request_submit(osdc, obj_request);
out:
	if (ret)
		rbd_obj_request_put(obj_request);

	return ret;
}

2841
2842static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2843{
2844 struct rbd_device *rbd_dev = (struct rbd_device *)data;
e627db08 2845 int ret;
b8d70035
AE
2846
2847 if (!rbd_dev)
2848 return;
2849
37206ee5 2850 dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
cc4a38bd
AE
2851 rbd_dev->header_name, (unsigned long long)notify_id,
2852 (unsigned int)opcode);
e627db08
AE
2853 ret = rbd_dev_refresh(rbd_dev);
2854 if (ret)
2855 rbd_warn(rbd_dev, ": header refresh error (%d)\n", ret);
b8d70035 2856
cc4a38bd 2857 rbd_obj_notify_ack(rbd_dev, notify_id);
b8d70035
AE
2858}
2859
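/*
 * Notification flow (illustrative summary): another client changing
 * the image (resize, snapshot, flatten) sends a notify on the header
 * object.  Because we registered a watch on that object via
 * rbd_dev_header_watch_sync() below, the osd client invokes
 * rbd_watch_cb(), which re-reads the header through rbd_dev_refresh()
 * and then acknowledges the notify so the sender does not have to
 * wait for the full notify timeout.
 */
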
/*
 * Request sync osd watch/unwatch.  The value of "start" determines
 * whether a watch request is being initiated or torn down.
 */
static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_obj_request *obj_request;
	int ret;

	rbd_assert(start ^ !!rbd_dev->watch_event);
	rbd_assert(start ^ !!rbd_dev->watch_request);

	if (start) {
		ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
						&rbd_dev->watch_event);
		if (ret < 0)
			return ret;
		rbd_assert(rbd_dev->watch_event != NULL);
	}

	ret = -ENOMEM;
	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
						OBJ_REQUEST_NODATA);
	if (!obj_request)
		goto out_cancel;

	obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
	if (!obj_request->osd_req)
		goto out_cancel;

	if (start)
		ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
	else
		ceph_osdc_unregister_linger_request(osdc,
					rbd_dev->watch_request->osd_req);

	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
				rbd_dev->watch_event->cookie, 0, start ? 1 : 0);
	rbd_osd_req_format_write(obj_request);

	ret = rbd_obj_request_submit(osdc, obj_request);
	if (ret)
		goto out_cancel;
	ret = rbd_obj_request_wait(obj_request);
	if (ret)
		goto out_cancel;
	ret = obj_request->result;
	if (ret)
		goto out_cancel;

	/*
	 * A watch request is set to linger, so the underlying osd
	 * request won't go away until we unregister it.  We retain
	 * a pointer to the object request during that time (in
	 * rbd_dev->watch_request), so we'll keep a reference to
	 * it.  We'll drop that reference (below) after we've
	 * unregistered it.
	 */
	if (start) {
		rbd_dev->watch_request = obj_request;

		return 0;
	}

	/* We have successfully torn down the watch request */

	rbd_obj_request_put(rbd_dev->watch_request);
	rbd_dev->watch_request = NULL;
out_cancel:
	/* Cancel the event if we're tearing down, or on error */
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
	if (obj_request)
		rbd_obj_request_put(obj_request);

	return ret;
}

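/*
 * Usage sketch (illustrative; the call sites are elsewhere in this
 * file): the image-probe path calls
 * rbd_dev_header_watch_sync(rbd_dev, true) once the header object is
 * known, and the teardown path calls it again with false.  The linger
 * flag is what lets the osd client re-establish the watch if the osd
 * connection resets in between.
 */
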
/*
 * Synchronous osd object method call.  Returns the number of bytes
 * returned in the inbound buffer, or a negative error code.
 */
static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
			     const char *object_name,
			     const char *class_name,
			     const char *method_name,
			     const void *outbound,
			     size_t outbound_size,
			     void *inbound,
			     size_t inbound_size)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_obj_request *obj_request;
	struct page **pages;
	u32 page_count;
	int ret;

	/*
	 * Method calls are ultimately read operations.  The result
	 * should be placed into the inbound buffer provided.  They
	 * also supply outbound data--parameters for the object
	 * method.  Currently if this is present it will be a
	 * snapshot id.
	 */
	page_count = (u32)calc_pages_for(0, inbound_size);
	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	ret = -ENOMEM;
	obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
							OBJ_REQUEST_PAGES);
	if (!obj_request)
		goto out;

	obj_request->pages = pages;
	obj_request->page_count = page_count;

	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
	if (!obj_request->osd_req)
		goto out;

	osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
					class_name, method_name);
	if (outbound_size) {
		struct ceph_pagelist *pagelist;

		pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
		if (!pagelist)
			goto out;

		ceph_pagelist_init(pagelist);
		ceph_pagelist_append(pagelist, outbound, outbound_size);
		osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
						pagelist);
	}
	osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
					obj_request->pages, inbound_size,
					0, false, false);
	rbd_osd_req_format_read(obj_request);

	ret = rbd_obj_request_submit(osdc, obj_request);
	if (ret)
		goto out;
	ret = rbd_obj_request_wait(obj_request);
	if (ret)
		goto out;

	ret = obj_request->result;
	if (ret < 0)
		goto out;

	rbd_assert(obj_request->xferred < (u64)INT_MAX);
	ret = (int)obj_request->xferred;
	ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
out:
	if (obj_request)
		rbd_obj_request_put(obj_request);
	else
		ceph_release_page_vector(pages, page_count);

	return ret;
}

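/*
 * Usage sketch (illustrative): the format 2 metadata helpers invoke
 * class methods of the "rbd" object class through this function.
 * Fetching the image size for a snapshot looks roughly like this
 * (buffer layout per the rbd class's get_size method):
 *
 *	__le64 snapid = cpu_to_le64(snap_id);
 *	struct {
 *		u8 order;
 *		__le64 size;
 *	} __attribute__ ((packed)) size_buf = { 0 };
 *
 *	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
 *				"rbd", "get_size",
 *				&snapid, sizeof (snapid),
 *				&size_buf, sizeof (size_buf));
 */
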
static void rbd_request_fn(struct request_queue *q)
		__releases(q->queue_lock) __acquires(q->queue_lock)
{
	struct rbd_device *rbd_dev = q->queuedata;
	bool read_only = rbd_dev->mapping.read_only;
	struct request *rq;
	int result;

	while ((rq = blk_fetch_request(q))) {
		bool write_request = rq_data_dir(rq) == WRITE;
		struct rbd_img_request *img_request;
		u64 offset;
		u64 length;

		/* Ignore any non-FS requests that filter through. */

		if (rq->cmd_type != REQ_TYPE_FS) {
			dout("%s: non-fs request type %d\n", __func__,
				(int) rq->cmd_type);
			__blk_end_request_all(rq, 0);
			continue;
		}

		/* Ignore/skip any zero-length requests */

		offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
		length = (u64) blk_rq_bytes(rq);

		if (!length) {
			dout("%s: zero-length request\n", __func__);
			__blk_end_request_all(rq, 0);
			continue;
		}

		spin_unlock_irq(q->queue_lock);

		/* Disallow writes to a read-only device */

		if (write_request) {
			result = -EROFS;
			if (read_only)
				goto end_request;
			rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
		}

		/*
		 * Quit early if the mapped snapshot no longer
		 * exists.  It's still possible the snapshot will
		 * have disappeared by the time our request arrives
		 * at the osd, but there's no sense in sending it if
		 * we already know.
		 */
		if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
			dout("request for non-existent snapshot");
			rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
			result = -ENXIO;
			goto end_request;
		}

		result = -EINVAL;
		if (offset && length > U64_MAX - offset + 1) {
			rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
				offset, length);
			goto end_request;	/* Shouldn't happen */
		}

		result = -EIO;
		if (offset + length > rbd_dev->mapping.size) {
			rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n",
				offset, length, rbd_dev->mapping.size);
			goto end_request;
		}

		result = -ENOMEM;
		img_request = rbd_img_request_create(rbd_dev, offset, length,
							write_request);
		if (!img_request)
			goto end_request;

		img_request->rq = rq;

		result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
						rq->bio);
		if (!result)
			result = rbd_img_request_submit(img_request);
		if (result)
			rbd_img_request_put(img_request);
end_request:
		spin_lock_irq(q->queue_lock);
		if (result < 0) {
			rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
				write_request ? "write" : "read",
				length, offset, result);

			__blk_end_request_all(rq, result);
		}
	}
}
3123
602adf40
YS
3124/*
3125 * a queue callback. Makes sure that we don't create a bio that spans across
3126 * multiple osd objects. One exception would be with a single page bios,
f7760dad 3127 * which we handle later at bio_chain_clone_range()
602adf40
YS
3128 */
3129static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
3130 struct bio_vec *bvec)
3131{
3132 struct rbd_device *rbd_dev = q->queuedata;
e5cfeed2
AE
3133 sector_t sector_offset;
3134 sector_t sectors_per_obj;
3135 sector_t obj_sector_offset;
3136 int ret;
3137
3138 /*
3139 * Find how far into its rbd object the partition-relative
3140 * bio start sector is to offset relative to the enclosing
3141 * device.
3142 */
3143 sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
3144 sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
3145 obj_sector_offset = sector_offset & (sectors_per_obj - 1);
3146
3147 /*
3148 * Compute the number of bytes from that offset to the end
3149 * of the object. Account for what's already used by the bio.
3150 */
3151 ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
3152 if (ret > bmd->bi_size)
3153 ret -= bmd->bi_size;
3154 else
3155 ret = 0;
3156
3157 /*
3158 * Don't send back more than was asked for. And if the bio
3159 * was empty, let the whole thing through because: "Note
3160 * that a block device *must* allow a single page to be
3161 * added to an empty bio."
3162 */
3163 rbd_assert(bvec->bv_len <= PAGE_SIZE);
3164 if (ret > (int) bvec->bv_len || !bmd->bi_size)
3165 ret = (int) bvec->bv_len;
3166
3167 return ret;
602adf40
YS
3168}
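
/*
 * Worked example (assuming the default object order of 22, i.e.
 * 4 MiB objects): sectors_per_obj = 1 << (22 - 9) = 8192.  A bio
 * starting at device sector 8000 sits at obj_sector_offset 8000
 * within its object, so at most (8192 - 8000) << 9 = 98304 bytes can
 * still be merged before the bio would cross into the next object.
 */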

static void rbd_free_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk = rbd_dev->disk;

	if (!disk)
		return;

	rbd_dev->disk = NULL;
	if (disk->flags & GENHD_FL_UP) {
		del_gendisk(disk);
		if (disk->queue)
			blk_cleanup_queue(disk->queue);
	}
	put_disk(disk);
}

static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
				const char *object_name,
				u64 offset, u64 length, void *buf)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_obj_request *obj_request;
	struct page **pages = NULL;
	u32 page_count;
	size_t size;
	int ret;

	page_count = (u32) calc_pages_for(offset, length);
	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	ret = -ENOMEM;
	obj_request = rbd_obj_request_create(object_name, offset, length,
							OBJ_REQUEST_PAGES);
	if (!obj_request)
		goto out;

	obj_request->pages = pages;
	obj_request->page_count = page_count;

	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
	if (!obj_request->osd_req)
		goto out;

	osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
					offset, length, 0, 0);
	osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
					obj_request->pages,
					obj_request->length,
					obj_request->offset & ~PAGE_MASK,
					false, false);
	rbd_osd_req_format_read(obj_request);

	ret = rbd_obj_request_submit(osdc, obj_request);
	if (ret)
		goto out;
	ret = rbd_obj_request_wait(obj_request);
	if (ret)
		goto out;

	ret = obj_request->result;
	if (ret < 0)
		goto out;

	rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
	size = (size_t) obj_request->xferred;
	ceph_copy_from_page_vector(pages, buf, 0, size);
	rbd_assert(size <= (size_t)INT_MAX);
	ret = (int)size;
out:
	if (obj_request)
		rbd_obj_request_put(obj_request);
	else
		ceph_release_page_vector(pages, page_count);

	return ret;
}

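/*
 * Usage sketch (illustrative): this helper is the synchronous read
 * primitive behind format 1 metadata access; rbd_dev_v1_header_info()
 * below uses it to pull the on-disk header object into a buffer:
 *
 *	ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
 *				0, size, ondisk);
 */
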
/*
 * Read the complete header for the given rbd device.  On successful
 * return, the rbd_dev->header field will contain up-to-date
 * information about the image.
 */
static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
{
	struct rbd_image_header_ondisk *ondisk = NULL;
	u32 snap_count = 0;
	u64 names_size = 0;
	u32 want_count;
	int ret;

	/*
	 * The complete header will include an array of its 64-bit
	 * snapshot ids, followed by the names of those snapshots as
	 * a contiguous block of NUL-terminated strings.  Note that
	 * the number of snapshots could change by the time we read
	 * it in, in which case we re-read it.
	 */
	do {
		size_t size;

		kfree(ondisk);

		size = sizeof (*ondisk);
		size += snap_count * sizeof (struct rbd_image_snap_ondisk);
		size += names_size;
		ondisk = kmalloc(size, GFP_KERNEL);
		if (!ondisk)
			return -ENOMEM;

		ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
				       0, size, ondisk);
		if (ret < 0)
			goto out;
		if ((size_t)ret < size) {
			rbd_warn(rbd_dev, "short header read (want %zd got %d)",
				size, ret);
			ret = -ENXIO;
			goto out;
		}
		if (!rbd_dev_ondisk_valid(ondisk)) {
			ret = -ENXIO;
			rbd_warn(rbd_dev, "invalid header");
			goto out;
		}

		names_size = le64_to_cpu(ondisk->snap_names_len);
		want_count = snap_count;
		snap_count = le32_to_cpu(ondisk->snap_count);
	} while (snap_count != want_count);

	ret = rbd_header_from_disk(rbd_dev, ondisk);
out:
	kfree(ondisk);

	return ret;
}

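/*
 * Why the read loop above can iterate (illustrative): the first pass
 * reads with snap_count == 0, i.e. just sizeof (*ondisk) bytes.  If
 * the header reports, say, 3 snapshots with 40 bytes of names, the
 * next pass re-reads with room for 3 rbd_image_snap_ondisk entries
 * plus 40 name bytes, and only stops once the snapshot count seen
 * matches the count the buffer was sized for.
 */
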
/*
 * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
 * has disappeared from the (just updated) snapshot context.
 */
static void rbd_exists_validate(struct rbd_device *rbd_dev)
{
	u64 snap_id;

	if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
		return;

	snap_id = rbd_dev->spec->snap_id;
	if (snap_id == CEPH_NOSNAP)
		return;

	if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
		clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
}

static int rbd_dev_refresh(struct rbd_device *rbd_dev)
{
	u64 mapping_size;
	int ret;

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	mapping_size = rbd_dev->mapping.size;
	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	if (rbd_dev->image_format == 1)
		ret = rbd_dev_v1_header_info(rbd_dev);
	else
		ret = rbd_dev_v2_header_info(rbd_dev);

	/* If it's a mapped snapshot, validate its EXISTS flag */

	rbd_exists_validate(rbd_dev);
	mutex_unlock(&ctl_mutex);
	if (mapping_size != rbd_dev->mapping.size) {
		sector_t size;

		size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
		dout("setting size to %llu sectors", (unsigned long long)size);
		set_capacity(rbd_dev->disk, size);
		revalidate_disk(rbd_dev->disk);
	}

	return ret;
}

static int rbd_init_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk;
	struct request_queue *q;
	u64 segment_size;

	/* create gendisk info */
	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
	if (!disk)
		return -ENOMEM;

	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
		 rbd_dev->dev_id);
	disk->major = rbd_dev->major;
	disk->first_minor = 0;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;

	q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
	if (!q)
		goto out_disk;

	/* We use the default size, but let's be explicit about it. */
	blk_queue_physical_block_size(q, SECTOR_SIZE);

	/* set io sizes to object size */
	segment_size = rbd_obj_bytes(&rbd_dev->header);
	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
	blk_queue_max_segment_size(q, segment_size);
	blk_queue_io_min(q, segment_size);
	blk_queue_io_opt(q, segment_size);

	blk_queue_merge_bvec(q, rbd_merge_bvec);
	disk->queue = q;

	q->queuedata = rbd_dev;

	rbd_dev->disk = disk;

	return 0;
out_disk:
	put_disk(disk);

	return -ENOMEM;
}

3403
dfc5606d
YS
3404/*
3405 sysfs
3406*/
3407
593a9e7b
AE
3408static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3409{
3410 return container_of(dev, struct rbd_device, dev);
3411}
3412
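/*
 * The attributes below appear under sysfs, e.g. (paths illustrative):
 *
 *	/sys/bus/rbd/devices/<id>/size
 *	/sys/bus/rbd/devices/<id>/features
 *	/sys/bus/rbd/devices/<id>/pool
 *	/sys/bus/rbd/devices/<id>/current_snap
 *
 * See Documentation/ABI/testing/sysfs-bus-rbd for the full list.
 */
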
static ssize_t rbd_size_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%llu\n",
		(unsigned long long)rbd_dev->mapping.size);
}

34b13184
AE
3422/*
3423 * Note this shows the features for whatever's mapped, which is not
3424 * necessarily the base image.
3425 */
3426static ssize_t rbd_features_show(struct device *dev,
3427 struct device_attribute *attr, char *buf)
3428{
3429 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3430
3431 return sprintf(buf, "0x%016llx\n",
fc71d833 3432 (unsigned long long)rbd_dev->mapping.features);
34b13184
AE
3433}
3434
dfc5606d
YS
3435static ssize_t rbd_major_show(struct device *dev,
3436 struct device_attribute *attr, char *buf)
3437{
593a9e7b 3438 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 3439
fc71d833
AE
3440 if (rbd_dev->major)
3441 return sprintf(buf, "%d\n", rbd_dev->major);
3442
3443 return sprintf(buf, "(none)\n");
3444
dfc5606d
YS
3445}
3446
3447static ssize_t rbd_client_id_show(struct device *dev,
3448 struct device_attribute *attr, char *buf)
602adf40 3449{
593a9e7b 3450 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3451
1dbb4399
AE
3452 return sprintf(buf, "client%lld\n",
3453 ceph_client_id(rbd_dev->rbd_client->client));
602adf40
YS
3454}
3455
dfc5606d
YS
3456static ssize_t rbd_pool_show(struct device *dev,
3457 struct device_attribute *attr, char *buf)
602adf40 3458{
593a9e7b 3459 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3460
0d7dbfce 3461 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
dfc5606d
YS
3462}
3463
9bb2f334
AE
3464static ssize_t rbd_pool_id_show(struct device *dev,
3465 struct device_attribute *attr, char *buf)
3466{
3467 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3468
0d7dbfce 3469 return sprintf(buf, "%llu\n",
fc71d833 3470 (unsigned long long) rbd_dev->spec->pool_id);
9bb2f334
AE
3471}
3472
dfc5606d
YS
3473static ssize_t rbd_name_show(struct device *dev,
3474 struct device_attribute *attr, char *buf)
3475{
593a9e7b 3476 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3477
a92ffdf8
AE
3478 if (rbd_dev->spec->image_name)
3479 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3480
3481 return sprintf(buf, "(unknown)\n");
dfc5606d
YS
3482}
3483
589d30e0
AE
3484static ssize_t rbd_image_id_show(struct device *dev,
3485 struct device_attribute *attr, char *buf)
3486{
3487 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3488
0d7dbfce 3489 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
589d30e0
AE
3490}
3491
34b13184
AE
3492/*
3493 * Shows the name of the currently-mapped snapshot (or
3494 * RBD_SNAP_HEAD_NAME for the base image).
3495 */
dfc5606d
YS
3496static ssize_t rbd_snap_show(struct device *dev,
3497 struct device_attribute *attr,
3498 char *buf)
3499{
593a9e7b 3500 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3501
0d7dbfce 3502 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
dfc5606d
YS
3503}
3504
86b00e0d
AE
3505/*
3506 * For an rbd v2 image, shows the pool id, image id, and snapshot id
3507 * for the parent image. If there is no parent, simply shows
3508 * "(no parent image)".
3509 */
3510static ssize_t rbd_parent_show(struct device *dev,
3511 struct device_attribute *attr,
3512 char *buf)
3513{
3514 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3515 struct rbd_spec *spec = rbd_dev->parent_spec;
3516 int count;
3517 char *bufp = buf;
3518
3519 if (!spec)
3520 return sprintf(buf, "(no parent image)\n");
3521
3522 count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3523 (unsigned long long) spec->pool_id, spec->pool_name);
3524 if (count < 0)
3525 return count;
3526 bufp += count;
3527
3528 count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3529 spec->image_name ? spec->image_name : "(unknown)");
3530 if (count < 0)
3531 return count;
3532 bufp += count;
3533
3534 count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3535 (unsigned long long) spec->snap_id, spec->snap_name);
3536 if (count < 0)
3537 return count;
3538 bufp += count;
3539
3540 count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3541 if (count < 0)
3542 return count;
3543 bufp += count;
3544
3545 return (ssize_t) (bufp - buf);
3546}
3547
dfc5606d
YS
3548static ssize_t rbd_image_refresh(struct device *dev,
3549 struct device_attribute *attr,
3550 const char *buf,
3551 size_t size)
3552{
593a9e7b 3553 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
b813623a 3554 int ret;
602adf40 3555
cc4a38bd 3556 ret = rbd_dev_refresh(rbd_dev);
e627db08
AE
3557 if (ret)
 3558 rbd_warn(rbd_dev, "manual header refresh error (%d)", ret);
b813623a
AE
3559
3560 return ret < 0 ? ret : size;
dfc5606d 3561}
602adf40 3562
dfc5606d 3563static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
34b13184 3564static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
dfc5606d
YS
3565static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3566static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3567static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
9bb2f334 3568static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
dfc5606d 3569static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
589d30e0 3570static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
dfc5606d
YS
3571static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3572static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
86b00e0d 3573static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
dfc5606d
YS
3574
3575static struct attribute *rbd_attrs[] = {
3576 &dev_attr_size.attr,
34b13184 3577 &dev_attr_features.attr,
dfc5606d
YS
3578 &dev_attr_major.attr,
3579 &dev_attr_client_id.attr,
3580 &dev_attr_pool.attr,
9bb2f334 3581 &dev_attr_pool_id.attr,
dfc5606d 3582 &dev_attr_name.attr,
589d30e0 3583 &dev_attr_image_id.attr,
dfc5606d 3584 &dev_attr_current_snap.attr,
86b00e0d 3585 &dev_attr_parent.attr,
dfc5606d 3586 &dev_attr_refresh.attr,
dfc5606d
YS
3587 NULL
3588};
3589
3590static struct attribute_group rbd_attr_group = {
3591 .attrs = rbd_attrs,
3592};
3593
3594static const struct attribute_group *rbd_attr_groups[] = {
3595 &rbd_attr_group,
3596 NULL
3597};
3598
3599static void rbd_sysfs_dev_release(struct device *dev)
3600{
3601}
3602
3603static struct device_type rbd_device_type = {
3604 .name = "rbd",
3605 .groups = rbd_attr_groups,
3606 .release = rbd_sysfs_dev_release,
3607};
3608
8b8fb99c
AE
3609static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3610{
3611 kref_get(&spec->kref);
3612
3613 return spec;
3614}
3615
3616static void rbd_spec_free(struct kref *kref);
3617static void rbd_spec_put(struct rbd_spec *spec)
3618{
3619 if (spec)
3620 kref_put(&spec->kref, rbd_spec_free);
3621}
3622
3623static struct rbd_spec *rbd_spec_alloc(void)
3624{
3625 struct rbd_spec *spec;
3626
3627 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3628 if (!spec)
3629 return NULL;
3630 kref_init(&spec->kref);
3631
8b8fb99c
AE
3632 return spec;
3633}
3634
3635static void rbd_spec_free(struct kref *kref)
3636{
3637 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3638
3639 kfree(spec->pool_name);
3640 kfree(spec->image_id);
3641 kfree(spec->image_name);
3642 kfree(spec->snap_name);
3643 kfree(spec);
3644}
3645
cc344fa1 3646static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
c53d5893
AE
3647 struct rbd_spec *spec)
3648{
3649 struct rbd_device *rbd_dev;
3650
3651 rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3652 if (!rbd_dev)
3653 return NULL;
3654
3655 spin_lock_init(&rbd_dev->lock);
6d292906 3656 rbd_dev->flags = 0;
a2acd00e 3657 atomic_set(&rbd_dev->parent_ref, 0);
c53d5893 3658 INIT_LIST_HEAD(&rbd_dev->node);
c53d5893
AE
3659 init_rwsem(&rbd_dev->header_rwsem);
3660
3661 rbd_dev->spec = spec;
3662 rbd_dev->rbd_client = rbdc;
3663
0903e875
AE
3664 /* Initialize the layout used for all rbd requests */
3665
3666 rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3667 rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3668 rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3669 rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3670
c53d5893
AE
3671 return rbd_dev;
3672}
3673
3674static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3675{
c53d5893
AE
3676 rbd_put_client(rbd_dev->rbd_client);
3677 rbd_spec_put(rbd_dev->spec);
3678 kfree(rbd_dev);
3679}
3680
9d475de5
AE
3681/*
3682 * Get the size and object order for an image snapshot, or if
3683 * snap_id is CEPH_NOSNAP, gets this information for the base
3684 * image.
3685 */
3686static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3687 u8 *order, u64 *snap_size)
3688{
3689 __le64 snapid = cpu_to_le64(snap_id);
3690 int ret;
3691 struct {
3692 u8 order;
3693 __le64 size;
3694 } __attribute__ ((packed)) size_buf = { 0 };
3695
36be9a76 3696 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
9d475de5 3697 "rbd", "get_size",
4157976b 3698 &snapid, sizeof (snapid),
e2a58ee5 3699 &size_buf, sizeof (size_buf));
36be9a76 3700 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
9d475de5
AE
3701 if (ret < 0)
3702 return ret;
57385b51
AE
3703 if (ret < sizeof (size_buf))
3704 return -ERANGE;
9d475de5 3705
c86f86e9
AE
3706 if (order)
3707 *order = size_buf.order;
9d475de5
AE
3708 *snap_size = le64_to_cpu(size_buf.size);
3709
3710 dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n",
57385b51
AE
3711 (unsigned long long)snap_id, (unsigned int)*order,
3712 (unsigned long long)*snap_size);
9d475de5
AE
3713
3714 return 0;
3715}
3716
3717static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3718{
3719 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3720 &rbd_dev->header.obj_order,
3721 &rbd_dev->header.image_size);
3722}
3723
1e130199
AE
3724static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3725{
3726 void *reply_buf;
3727 int ret;
3728 void *p;
3729
3730 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3731 if (!reply_buf)
3732 return -ENOMEM;
3733
36be9a76 3734 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4157976b 3735 "rbd", "get_object_prefix", NULL, 0,
e2a58ee5 3736 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
36be9a76 3737 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
1e130199
AE
3738 if (ret < 0)
3739 goto out;
3740
3741 p = reply_buf;
3742 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
57385b51
AE
3743 p + ret, NULL, GFP_NOIO);
3744 ret = 0;
1e130199
AE
3745
3746 if (IS_ERR(rbd_dev->header.object_prefix)) {
3747 ret = PTR_ERR(rbd_dev->header.object_prefix);
3748 rbd_dev->header.object_prefix = NULL;
3749 } else {
3750 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
3751 }
1e130199
AE
3752out:
3753 kfree(reply_buf);
3754
3755 return ret;
3756}
3757
b1b5402a
AE
3758static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3759 u64 *snap_features)
3760{
3761 __le64 snapid = cpu_to_le64(snap_id);
3762 struct {
3763 __le64 features;
3764 __le64 incompat;
4157976b 3765 } __attribute__ ((packed)) features_buf = { 0 };
d889140c 3766 u64 incompat;
b1b5402a
AE
3767 int ret;
3768
36be9a76 3769 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
b1b5402a 3770 "rbd", "get_features",
4157976b 3771 &snapid, sizeof (snapid),
e2a58ee5 3772 &features_buf, sizeof (features_buf));
36be9a76 3773 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
b1b5402a
AE
3774 if (ret < 0)
3775 return ret;
57385b51
AE
3776 if (ret < sizeof (features_buf))
3777 return -ERANGE;
d889140c
AE
3778
3779 incompat = le64_to_cpu(features_buf.incompat);
5cbf6f12 3780 if (incompat & ~RBD_FEATURES_SUPPORTED)
b8f5c6ed 3781 return -ENXIO;
d889140c 3782
b1b5402a
AE
3783 *snap_features = le64_to_cpu(features_buf.features);
3784
3785 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
57385b51
AE
3786 (unsigned long long)snap_id,
3787 (unsigned long long)*snap_features,
3788 (unsigned long long)le64_to_cpu(features_buf.incompat));
b1b5402a
AE
3789
3790 return 0;
3791}
3792
3793static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3794{
3795 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3796 &rbd_dev->header.features);
3797}
3798
86b00e0d
AE
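/*
 * Fetch the layering information for a format 2 image: the parent's
 * pool id, image id, and snapshot id, plus the size of the overlap
 * with the parent. A parent that has gone away (the image was
 * flattened) is detected here and the parent reference is dropped.
 */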
3799static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3800{
3801 struct rbd_spec *parent_spec;
3802 size_t size;
3803 void *reply_buf = NULL;
3804 __le64 snapid;
3805 void *p;
3806 void *end;
642a2537 3807 u64 pool_id;
86b00e0d
AE
3808 char *image_id;
3809 u64 overlap;
86b00e0d
AE
3810 int ret;
3811
3812 parent_spec = rbd_spec_alloc();
3813 if (!parent_spec)
3814 return -ENOMEM;
3815
3816 size = sizeof (__le64) + /* pool_id */
3817 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
3818 sizeof (__le64) + /* snap_id */
3819 sizeof (__le64); /* overlap */
3820 reply_buf = kmalloc(size, GFP_KERNEL);
3821 if (!reply_buf) {
3822 ret = -ENOMEM;
3823 goto out_err;
3824 }
3825
3826 snapid = cpu_to_le64(CEPH_NOSNAP);
36be9a76 3827 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
86b00e0d 3828 "rbd", "get_parent",
4157976b 3829 &snapid, sizeof (snapid),
e2a58ee5 3830 reply_buf, size);
36be9a76 3831 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
86b00e0d
AE
3832 if (ret < 0)
3833 goto out_err;
3834
86b00e0d 3835 p = reply_buf;
57385b51
AE
3836 end = reply_buf + ret;
3837 ret = -ERANGE;
642a2537 3838 ceph_decode_64_safe(&p, end, pool_id, out_err);
392a9dad
AE
3839 if (pool_id == CEPH_NOPOOL) {
3840 /*
3841 * Either the parent never existed, or we have
3842 * record of it but the image got flattened so it no
3843 * longer has a parent. When the parent of a
3844 * layered image disappears we immediately set the
3845 * overlap to 0. The effect of this is that all new
3846 * requests will be treated as if the image had no
3847 * parent.
3848 */
3849 if (rbd_dev->parent_overlap) {
3850 rbd_dev->parent_overlap = 0;
3851 smp_mb();
3852 rbd_dev_parent_put(rbd_dev);
3853 pr_info("%s: clone image has been flattened\n",
3854 rbd_dev->disk->disk_name);
3855 }
3856
86b00e0d 3857 goto out; /* No parent? No problem. */
392a9dad 3858 }
86b00e0d 3859
0903e875
AE
3860 /* The ceph file layout needs to fit pool id in 32 bits */
3861
3862 ret = -EIO;
642a2537 3863 if (pool_id > (u64)U32_MAX) {
c0cd10db 3864 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
642a2537 3865 (unsigned long long)pool_id, U32_MAX);
57385b51 3866 goto out_err;
c0cd10db 3867 }
642a2537 3868 parent_spec->pool_id = pool_id;
0903e875 3869
979ed480 3870 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
86b00e0d
AE
3871 if (IS_ERR(image_id)) {
3872 ret = PTR_ERR(image_id);
3873 goto out_err;
3874 }
3875 parent_spec->image_id = image_id;
3876 ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3877 ceph_decode_64_safe(&p, end, overlap, out_err);
3878
70cf49cf 3879 if (overlap) {
642a2537 3880 rbd_spec_put(rbd_dev->parent_spec);
70cf49cf
AE
3881 rbd_dev->parent_spec = parent_spec;
3882 parent_spec = NULL; /* rbd_dev now owns this */
3883 rbd_dev->parent_overlap = overlap;
3884 } else {
3885 rbd_warn(rbd_dev, "ignoring parent of clone with overlap 0\n");
3886 }
86b00e0d
AE
3887out:
3888 ret = 0;
3889out_err:
3890 kfree(reply_buf);
3891 rbd_spec_put(parent_spec);
3892
3893 return ret;
3894}
3895
cc070d59
AE
3896static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3897{
3898 struct {
3899 __le64 stripe_unit;
3900 __le64 stripe_count;
3901 } __attribute__ ((packed)) striping_info_buf = { 0 };
3902 size_t size = sizeof (striping_info_buf);
3903 void *p;
3904 u64 obj_size;
3905 u64 stripe_unit;
3906 u64 stripe_count;
3907 int ret;
3908
3909 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3910 "rbd", "get_stripe_unit_count", NULL, 0,
e2a58ee5 3911 (char *)&striping_info_buf, size);
cc070d59
AE
3912 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3913 if (ret < 0)
3914 return ret;
3915 if (ret < size)
3916 return -ERANGE;
3917
3918 /*
3919 * We don't actually support the "fancy striping" feature
3920 * (STRIPINGV2) yet, but if the striping sizes are the
3921 * defaults the behavior is the same as before. So find
3922 * out, and only fail if the image has non-default values.
3923 */
3924 ret = -EINVAL;
3925 obj_size = (u64)1 << rbd_dev->header.obj_order;
3926 p = &striping_info_buf;
3927 stripe_unit = ceph_decode_64(&p);
3928 if (stripe_unit != obj_size) {
3929 rbd_warn(rbd_dev, "unsupported stripe unit "
3930 "(got %llu want %llu)",
3931 stripe_unit, obj_size);
3932 return -EINVAL;
3933 }
3934 stripe_count = ceph_decode_64(&p);
3935 if (stripe_count != 1) {
3936 rbd_warn(rbd_dev, "unsupported stripe count "
3937 "(got %llu want 1)", stripe_count);
3938 return -EINVAL;
3939 }
500d0c0f
AE
3940 rbd_dev->header.stripe_unit = stripe_unit;
3941 rbd_dev->header.stripe_count = stripe_count;
cc070d59
AE
3942
3943 return 0;
3944}
3945
9e15b77d
AE
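/*
 * Look up the name of the image with this rbd_dev's image id in the
 * pool's RBD_DIRECTORY object. Returns a dynamically-allocated copy
 * of the name, or a null pointer on failure; callers tolerate
 * failure because the name is not essential.
 */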
3946static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3947{
3948 size_t image_id_size;
3949 char *image_id;
3950 void *p;
3951 void *end;
3952 size_t size;
3953 void *reply_buf = NULL;
3954 size_t len = 0;
3955 char *image_name = NULL;
3956 int ret;
3957
3958 rbd_assert(!rbd_dev->spec->image_name);
3959
69e7a02f
AE
3960 len = strlen(rbd_dev->spec->image_id);
3961 image_id_size = sizeof (__le32) + len;
9e15b77d
AE
3962 image_id = kmalloc(image_id_size, GFP_KERNEL);
3963 if (!image_id)
3964 return NULL;
3965
3966 p = image_id;
4157976b 3967 end = image_id + image_id_size;
57385b51 3968 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
9e15b77d
AE
3969
3970 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3971 reply_buf = kmalloc(size, GFP_KERNEL);
3972 if (!reply_buf)
3973 goto out;
3974
36be9a76 3975 ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
9e15b77d
AE
3976 "rbd", "dir_get_name",
3977 image_id, image_id_size,
e2a58ee5 3978 reply_buf, size);
9e15b77d
AE
3979 if (ret < 0)
3980 goto out;
3981 p = reply_buf;
f40eb349
AE
3982 end = reply_buf + ret;
3983
9e15b77d
AE
3984 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3985 if (IS_ERR(image_name))
3986 image_name = NULL;
3987 else
3988 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3989out:
3990 kfree(reply_buf);
3991 kfree(image_id);
3992
3993 return image_name;
3994}
3995
2ad3d716
AE
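/*
 * A format 1 image keeps its snapshot names in a single buffer of
 * consecutive NUL-terminated strings, ordered to match the snapshot
 * id array in the snapshot context; walk the two in step to find
 * the id for the given name.
 */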
3996static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3997{
3998 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3999 const char *snap_name;
4000 u32 which = 0;
4001
4002 /* Skip over names until we find the one we are looking for */
4003
4004 snap_name = rbd_dev->header.snap_names;
4005 while (which < snapc->num_snaps) {
4006 if (!strcmp(name, snap_name))
4007 return snapc->snaps[which];
4008 snap_name += strlen(snap_name) + 1;
4009 which++;
4010 }
4011 return CEPH_NOSNAP;
4012}
4013
4014static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4015{
4016 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4017 u32 which;
4018 bool found = false;
4019 u64 snap_id;
4020
4021 for (which = 0; !found && which < snapc->num_snaps; which++) {
4022 const char *snap_name;
4023
4024 snap_id = snapc->snaps[which];
4025 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
4026 if (IS_ERR(snap_name))
4027 break;
4028 found = !strcmp(name, snap_name);
4029 kfree(snap_name);
4030 }
4031 return found ? snap_id : CEPH_NOSNAP;
4032}
4033
4034/*
4035 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
4036 * no snapshot by that name is found, or if an error occurs.
4037 */
4038static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4039{
4040 if (rbd_dev->image_format == 1)
4041 return rbd_v1_snap_id_by_name(rbd_dev, name);
4042
4043 return rbd_v2_snap_id_by_name(rbd_dev, name);
4044}
4045
9e15b77d 4046/*
2e9f7f1c
AE
4047 * When an rbd image has a parent image, it is identified by the
4048 * pool, image, and snapshot ids (not names). This function fills
4049 * in the names for those ids. (It's OK if we can't figure out the
4050 * name for an image id, but the pool and snapshot ids should always
4051 * exist and have names.) All names in an rbd spec are dynamically
4052 * allocated.
e1d4213f
AE
4053 *
4054 * When an image being mapped (not a parent) is probed, we have the
4055 * pool name and pool id, image name and image id, and the snapshot
4056 * name. The only thing we're missing is the snapshot id.
9e15b77d 4057 */
2e9f7f1c 4058static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
9e15b77d 4059{
2e9f7f1c
AE
4060 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4061 struct rbd_spec *spec = rbd_dev->spec;
4062 const char *pool_name;
4063 const char *image_name;
4064 const char *snap_name;
9e15b77d
AE
4065 int ret;
4066
e1d4213f
AE
4067 /*
4068 * An image being mapped will have the pool name (etc.), but
4069 * we need to look up the snapshot id.
4070 */
2e9f7f1c
AE
4071 if (spec->pool_name) {
4072 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
2ad3d716 4073 u64 snap_id;
e1d4213f 4074
2ad3d716
AE
4075 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
4076 if (snap_id == CEPH_NOSNAP)
e1d4213f 4077 return -ENOENT;
2ad3d716 4078 spec->snap_id = snap_id;
e1d4213f 4079 } else {
2e9f7f1c 4080 spec->snap_id = CEPH_NOSNAP;
e1d4213f
AE
4081 }
4082
4083 return 0;
4084 }
9e15b77d 4085
2e9f7f1c 4086 /* Get the pool name; we have to make our own copy of this */
9e15b77d 4087
2e9f7f1c
AE
4088 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
4089 if (!pool_name) {
4090 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
935dc89f
AE
4091 return -EIO;
4092 }
2e9f7f1c
AE
4093 pool_name = kstrdup(pool_name, GFP_KERNEL);
4094 if (!pool_name)
9e15b77d
AE
4095 return -ENOMEM;
4096
4097 /* Fetch the image name; tolerate failure here */
4098
2e9f7f1c
AE
4099 image_name = rbd_dev_image_name(rbd_dev);
4100 if (!image_name)
06ecc6cb 4101 rbd_warn(rbd_dev, "unable to get image name");
9e15b77d 4102
2e9f7f1c 4103 /* Look up the snapshot name, and make a copy */
9e15b77d 4104
2e9f7f1c 4105 snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
2e9f7f1c
AE
4106 if (!snap_name) {
4107 ret = -ENOMEM;
9e15b77d 4108 goto out_err;
2e9f7f1c
AE
4109 }
4110
4111 spec->pool_name = pool_name;
4112 spec->image_name = image_name;
4113 spec->snap_name = snap_name;
9e15b77d
AE
4114
4115 return 0;
4116out_err:
2e9f7f1c
AE
4117 kfree(image_name);
4118 kfree(pool_name);
9e15b77d
AE
4119
4120 return ret;
4121}
4122
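/*
 * Fetch the image's snapshot context (the maximum snapshot id plus
 * the array of snapshot ids) and install it in the in-core header,
 * dropping any previously-held context.
 */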
cc4a38bd 4123static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
35d489f9
AE
4124{
4125 size_t size;
4126 int ret;
4127 void *reply_buf;
4128 void *p;
4129 void *end;
4130 u64 seq;
4131 u32 snap_count;
4132 struct ceph_snap_context *snapc;
4133 u32 i;
4134
4135 /*
4136 * We'll need room for the seq value (maximum snapshot id),
4137 * snapshot count, and array of that many snapshot ids.
4138 * For now we have a fixed upper limit on the number we're
4139 * prepared to receive.
4140 */
4141 size = sizeof (__le64) + sizeof (__le32) +
4142 RBD_MAX_SNAP_COUNT * sizeof (__le64);
4143 reply_buf = kzalloc(size, GFP_KERNEL);
4144 if (!reply_buf)
4145 return -ENOMEM;
4146
36be9a76 4147 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4157976b 4148 "rbd", "get_snapcontext", NULL, 0,
e2a58ee5 4149 reply_buf, size);
36be9a76 4150 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
35d489f9
AE
4151 if (ret < 0)
4152 goto out;
4153
35d489f9 4154 p = reply_buf;
57385b51
AE
4155 end = reply_buf + ret;
4156 ret = -ERANGE;
35d489f9
AE
4157 ceph_decode_64_safe(&p, end, seq, out);
4158 ceph_decode_32_safe(&p, end, snap_count, out);
4159
4160 /*
4161 * Make sure the reported number of snapshot ids wouldn't go
4162 * beyond the end of our buffer. But before checking that,
4163 * make sure the computed size of the snapshot context we
4164 * allocate is representable in a size_t.
4165 */
4166 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
4167 / sizeof (u64)) {
4168 ret = -EINVAL;
4169 goto out;
4170 }
4171 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
4172 goto out;
468521c1 4173 ret = 0;
35d489f9 4174
812164f8 4175 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
35d489f9
AE
4176 if (!snapc) {
4177 ret = -ENOMEM;
4178 goto out;
4179 }
35d489f9 4180 snapc->seq = seq;
35d489f9
AE
4181 for (i = 0; i < snap_count; i++)
4182 snapc->snaps[i] = ceph_decode_64(&p);
4183
49ece554 4184 ceph_put_snap_context(rbd_dev->header.snapc);
35d489f9
AE
4185 rbd_dev->header.snapc = snapc;
4186
4187 dout(" snap context seq = %llu, snap_count = %u\n",
57385b51 4188 (unsigned long long)seq, (unsigned int)snap_count);
35d489f9
AE
4189out:
4190 kfree(reply_buf);
4191
57385b51 4192 return ret;
35d489f9
AE
4193}
4194
54cac61f
AE
4195static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
4196 u64 snap_id)
b8b1e2db
AE
4197{
4198 size_t size;
4199 void *reply_buf;
54cac61f 4200 __le64 snapid;
b8b1e2db
AE
4201 int ret;
4202 void *p;
4203 void *end;
b8b1e2db
AE
4204 char *snap_name;
4205
4206 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
4207 reply_buf = kmalloc(size, GFP_KERNEL);
4208 if (!reply_buf)
4209 return ERR_PTR(-ENOMEM);
4210
54cac61f 4211 snapid = cpu_to_le64(snap_id);
36be9a76 4212 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
b8b1e2db 4213 "rbd", "get_snapshot_name",
54cac61f 4214 &snapid, sizeof (snapid),
e2a58ee5 4215 reply_buf, size);
36be9a76 4216 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
f40eb349
AE
4217 if (ret < 0) {
4218 snap_name = ERR_PTR(ret);
b8b1e2db 4219 goto out;
f40eb349 4220 }
b8b1e2db
AE
4221
4222 p = reply_buf;
f40eb349 4223 end = reply_buf + ret;
e5c35534 4224 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
f40eb349 4225 if (IS_ERR(snap_name))
b8b1e2db 4226 goto out;
b8b1e2db 4227
f40eb349 4228 dout(" snap_id 0x%016llx snap_name = %s\n",
54cac61f 4229 (unsigned long long)snap_id, snap_name);
b8b1e2db
AE
4230out:
4231 kfree(reply_buf);
4232
f40eb349 4233 return snap_name;
b8b1e2db
AE
4234}
4235
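/*
 * (Re)read the complete header for a format 2 image: object prefix
 * and feature bits on the first call only, then parent info, image
 * size, and the snapshot context on every call. The header rwsem
 * is taken here, so callers must not hold it.
 */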
2df3fac7 4236static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
117973fb 4237{
2df3fac7 4238 bool first_time = rbd_dev->header.object_prefix == NULL;
117973fb 4239 int ret;
117973fb
AE
4240
4241 down_write(&rbd_dev->header_rwsem);
4242
2df3fac7
AE
4243 if (first_time) {
4244 ret = rbd_dev_v2_header_onetime(rbd_dev);
4245 if (ret)
4246 goto out;
4247 }
4248
642a2537
AE
4249 /*
4250 * If the image supports layering, get the parent info. We
4251 * need to probe the first time regardless. Thereafter we
 4252 * only need to do so if there's a parent, to see if it has
4253 * disappeared due to the mapped image getting flattened.
4254 */
4255 if (rbd_dev->header.features & RBD_FEATURE_LAYERING &&
4256 (first_time || rbd_dev->parent_spec)) {
4257 bool warn;
4258
4259 ret = rbd_dev_v2_parent_info(rbd_dev);
4260 if (ret)
4261 goto out;
4262
4263 /*
4264 * Print a warning if this is the initial probe and
4265 * the image has a parent. Don't print it if the
4266 * image now being probed is itself a parent. We
4267 * can tell at this point because we won't know its
4268 * pool name yet (just its pool id).
4269 */
4270 warn = rbd_dev->parent_spec && rbd_dev->spec->pool_name;
4271 if (first_time && warn)
4272 rbd_warn(rbd_dev, "WARNING: kernel layering "
4273 "is EXPERIMENTAL!");
4274 }
4275
117973fb
AE
4276 ret = rbd_dev_v2_image_size(rbd_dev);
4277 if (ret)
4278 goto out;
642a2537 4279
29334ba4
AE
4280 if (rbd_dev->spec->snap_id == CEPH_NOSNAP)
4281 if (rbd_dev->mapping.size != rbd_dev->header.image_size)
4282 rbd_dev->mapping.size = rbd_dev->header.image_size;
117973fb 4283
cc4a38bd 4284 ret = rbd_dev_v2_snap_context(rbd_dev);
117973fb 4285 dout("rbd_dev_v2_snap_context returned %d\n", ret);
117973fb
AE
4286out:
4287 up_write(&rbd_dev->header_rwsem);
4288
4289 return ret;
4290}
4291
dfc5606d
YS
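/* Register the device on the rbd bus, creating its sysfs entries. */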
4292static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4293{
dfc5606d 4294 struct device *dev;
cd789ab9 4295 int ret;
dfc5606d
YS
4296
4297 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
dfc5606d 4298
cd789ab9 4299 dev = &rbd_dev->dev;
dfc5606d
YS
4300 dev->bus = &rbd_bus_type;
4301 dev->type = &rbd_device_type;
4302 dev->parent = &rbd_root_dev;
200a6a8b 4303 dev->release = rbd_dev_device_release;
de71a297 4304 dev_set_name(dev, "%d", rbd_dev->dev_id);
dfc5606d 4305 ret = device_register(dev);
dfc5606d 4306
dfc5606d 4307 mutex_unlock(&ctl_mutex);
cd789ab9 4308
dfc5606d 4309 return ret;
602adf40
YS
4310}
4311
dfc5606d
YS
4312static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4313{
4314 device_unregister(&rbd_dev->dev);
4315}
4316
e2839308 4317static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
1ddbe94e
AE
4318
4319/*
499afd5b
AE
4320 * Get a unique rbd identifier for the given new rbd_dev, and add
4321 * the rbd_dev to the global list. The minimum rbd id is 1.
1ddbe94e 4322 */
e2839308 4323static void rbd_dev_id_get(struct rbd_device *rbd_dev)
b7f23c36 4324{
e2839308 4325 rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
499afd5b
AE
4326
4327 spin_lock(&rbd_dev_list_lock);
4328 list_add_tail(&rbd_dev->node, &rbd_dev_list);
4329 spin_unlock(&rbd_dev_list_lock);
e2839308
AE
4330 dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4331 (unsigned long long) rbd_dev->dev_id);
1ddbe94e 4332}
b7f23c36 4333
1ddbe94e 4334/*
499afd5b
AE
4335 * Remove an rbd_dev from the global list, and record that its
4336 * identifier is no longer in use.
1ddbe94e 4337 */
e2839308 4338static void rbd_dev_id_put(struct rbd_device *rbd_dev)
1ddbe94e 4339{
d184f6bf 4340 struct list_head *tmp;
de71a297 4341 int rbd_id = rbd_dev->dev_id;
d184f6bf
AE
4342 int max_id;
4343
aafb230e 4344 rbd_assert(rbd_id > 0);
499afd5b 4345
e2839308
AE
4346 dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4347 (unsigned long long) rbd_dev->dev_id);
499afd5b
AE
4348 spin_lock(&rbd_dev_list_lock);
4349 list_del_init(&rbd_dev->node);
d184f6bf
AE
4350
4351 /*
4352 * If the id being "put" is not the current maximum, there
4353 * is nothing special we need to do.
4354 */
e2839308 4355 if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
d184f6bf
AE
4356 spin_unlock(&rbd_dev_list_lock);
4357 return;
4358 }
4359
4360 /*
4361 * We need to update the current maximum id. Search the
4362 * list to find out what it is. We're more likely to find
4363 * the maximum at the end, so search the list backward.
4364 */
4365 max_id = 0;
4366 list_for_each_prev(tmp, &rbd_dev_list) {
4367 struct rbd_device *rbd_dev;
4368
4369 rbd_dev = list_entry(tmp, struct rbd_device, node);
b213e0b1
AE
4370 if (rbd_dev->dev_id > max_id)
4371 max_id = rbd_dev->dev_id;
d184f6bf 4372 }
499afd5b 4373 spin_unlock(&rbd_dev_list_lock);
b7f23c36 4374
1ddbe94e 4375 /*
e2839308 4376 * The max id could have been updated by rbd_dev_id_get(), in
d184f6bf
AE
4377 * which case it now accurately reflects the new maximum.
4378 * Be careful not to overwrite the maximum value in that
4379 * case.
1ddbe94e 4380 */
e2839308
AE
4381 atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4382 dout(" max dev id has been reset\n");
b7f23c36
AE
4383}
4384
e28fff26
AE
4385/*
4386 * Skips over white space at *buf, and updates *buf to point to the
4387 * first found non-space character (if any). Returns the length of
593a9e7b
AE
4388 * the token (string of non-white space characters) found. Note
4389 * that *buf must be terminated with '\0'.
e28fff26
AE
4390 */
4391static inline size_t next_token(const char **buf)
4392{
4393 /*
4394 * These are the characters that produce nonzero for
4395 * isspace() in the "C" and "POSIX" locales.
4396 */
4397 const char *spaces = " \f\n\r\t\v";
4398
4399 *buf += strspn(*buf, spaces); /* Find start of token */
4400
4401 return strcspn(*buf, spaces); /* Return token length */
4402}
4403
4404/*
4405 * Finds the next token in *buf, and if the provided token buffer is
4406 * big enough, copies the found token into it. The result, if
593a9e7b
AE
4407 * copied, is guaranteed to be terminated with '\0'. Note that *buf
4408 * must be terminated with '\0' on entry.
e28fff26
AE
4409 *
4410 * Returns the length of the token found (not including the '\0').
4411 * Return value will be 0 if no token is found, and it will be >=
4412 * token_size if the token would not fit.
4413 *
593a9e7b 4414 * The *buf pointer will be updated to point beyond the end of the
e28fff26
AE
4415 * found token. Note that this occurs even if the token buffer is
4416 * too small to hold it.
4417 */
4418static inline size_t copy_token(const char **buf,
4419 char *token,
4420 size_t token_size)
4421{
4422 size_t len;
4423
4424 len = next_token(buf);
4425 if (len < token_size) {
4426 memcpy(token, *buf, len);
4427 *(token + len) = '\0';
4428 }
4429 *buf += len;
4430
4431 return len;
4432}
4433
ea3352f4
AE
4434/*
4435 * Finds the next token in *buf, dynamically allocates a buffer big
4436 * enough to hold a copy of it, and copies the token into the new
4437 * buffer. The copy is guaranteed to be terminated with '\0'. Note
4438 * that a duplicate buffer is created even for a zero-length token.
4439 *
4440 * Returns a pointer to the newly-allocated duplicate, or a null
4441 * pointer if memory for the duplicate was not available. If
4442 * the lenp argument is a non-null pointer, the length of the token
4443 * (not including the '\0') is returned in *lenp.
4444 *
4445 * If successful, the *buf pointer will be updated to point beyond
4446 * the end of the found token.
4447 *
4448 * Note: uses GFP_KERNEL for allocation.
4449 */
4450static inline char *dup_token(const char **buf, size_t *lenp)
4451{
4452 char *dup;
4453 size_t len;
4454
4455 len = next_token(buf);
4caf35f9 4456 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
ea3352f4
AE
4457 if (!dup)
4458 return NULL;
ea3352f4
AE
4459 *(dup + len) = '\0';
4460 *buf += len;
4461
4462 if (lenp)
4463 *lenp = len;
4464
4465 return dup;
4466}
4467
a725f65e 4468/*
859c31df
AE
4469 * Parse the options provided for an "rbd add" (i.e., rbd image
4470 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
4471 * and the data written is passed here via a NUL-terminated buffer.
4472 * Returns 0 if successful or an error code otherwise.
d22f76e7 4473 *
859c31df
AE
4474 * The information extracted from these options is recorded in
4475 * the other parameters which return dynamically-allocated
4476 * structures:
4477 * ceph_opts
4478 * The address of a pointer that will refer to a ceph options
4479 * structure. Caller must release the returned pointer using
4480 * ceph_destroy_options() when it is no longer needed.
4481 * rbd_opts
4482 * Address of an rbd options pointer. Fully initialized by
4483 * this function; caller must release with kfree().
4484 * spec
4485 * Address of an rbd image specification pointer. Fully
4486 * initialized by this function based on parsed options.
4487 * Caller must release with rbd_spec_put().
4488 *
4489 * The options passed take this form:
 4490 * <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4491 * where:
4492 * <mon_addrs>
4493 * A comma-separated list of one or more monitor addresses.
4494 * A monitor address is an ip address, optionally followed
4495 * by a port number (separated by a colon).
4496 * I.e.: ip1[:port1][,ip2[:port2]...]
4497 * <options>
4498 * A comma-separated list of ceph and/or rbd options.
4499 * <pool_name>
4500 * The name of the rados pool containing the rbd image.
4501 * <image_name>
4502 * The name of the image in that pool to map.
 4503 * <snap_name>
 4504 * An optional snapshot name. If provided, the mapping will
 4505 * present data from the image at the time that snapshot was
 4506 * created. The image head is used if no snapshot name is
4507 * provided. Snapshot mappings are always read-only.
a725f65e 4508 */
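/*
 * For illustration only (all values below are hypothetical): a
 * mapping request written to /sys/bus/rbd/add might look like
 *
 *   1.2.3.4:6789 name=admin rbd myimage mysnap
 *
 * which maps snapshot "mysnap" of image "myimage" in pool "rbd",
 * using the monitor at 1.2.3.4:6789.
 */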
859c31df 4509static int rbd_add_parse_args(const char *buf,
dc79b113 4510 struct ceph_options **ceph_opts,
859c31df
AE
4511 struct rbd_options **opts,
4512 struct rbd_spec **rbd_spec)
e28fff26 4513{
d22f76e7 4514 size_t len;
859c31df 4515 char *options;
0ddebc0c 4516 const char *mon_addrs;
ecb4dc22 4517 char *snap_name;
0ddebc0c 4518 size_t mon_addrs_size;
859c31df 4519 struct rbd_spec *spec = NULL;
4e9afeba 4520 struct rbd_options *rbd_opts = NULL;
859c31df 4521 struct ceph_options *copts;
dc79b113 4522 int ret;
e28fff26
AE
4523
4524 /* The first four tokens are required */
4525
7ef3214a 4526 len = next_token(&buf);
4fb5d671
AE
4527 if (!len) {
4528 rbd_warn(NULL, "no monitor address(es) provided");
4529 return -EINVAL;
4530 }
0ddebc0c 4531 mon_addrs = buf;
f28e565a 4532 mon_addrs_size = len + 1;
7ef3214a 4533 buf += len;
a725f65e 4534
dc79b113 4535 ret = -EINVAL;
f28e565a
AE
4536 options = dup_token(&buf, NULL);
4537 if (!options)
dc79b113 4538 return -ENOMEM;
4fb5d671
AE
4539 if (!*options) {
4540 rbd_warn(NULL, "no options provided");
4541 goto out_err;
4542 }
e28fff26 4543
859c31df
AE
4544 spec = rbd_spec_alloc();
4545 if (!spec)
f28e565a 4546 goto out_mem;
859c31df
AE
4547
4548 spec->pool_name = dup_token(&buf, NULL);
4549 if (!spec->pool_name)
4550 goto out_mem;
4fb5d671
AE
4551 if (!*spec->pool_name) {
4552 rbd_warn(NULL, "no pool name provided");
4553 goto out_err;
4554 }
e28fff26 4555
69e7a02f 4556 spec->image_name = dup_token(&buf, NULL);
859c31df 4557 if (!spec->image_name)
f28e565a 4558 goto out_mem;
4fb5d671
AE
4559 if (!*spec->image_name) {
4560 rbd_warn(NULL, "no image name provided");
4561 goto out_err;
4562 }
d4b125e9 4563
f28e565a
AE
4564 /*
4565 * Snapshot name is optional; default is to use "-"
4566 * (indicating the head/no snapshot).
4567 */
3feeb894 4568 len = next_token(&buf);
820a5f3e 4569 if (!len) {
3feeb894
AE
4570 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4571 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
f28e565a 4572 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
dc79b113 4573 ret = -ENAMETOOLONG;
f28e565a 4574 goto out_err;
849b4260 4575 }
ecb4dc22
AE
4576 snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4577 if (!snap_name)
f28e565a 4578 goto out_mem;
ecb4dc22
AE
4579 *(snap_name + len) = '\0';
4580 spec->snap_name = snap_name;
e5c35534 4581
0ddebc0c 4582 /* Initialize all rbd options to the defaults */
e28fff26 4583
4e9afeba
AE
4584 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4585 if (!rbd_opts)
4586 goto out_mem;
4587
4588 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
d22f76e7 4589
859c31df 4590 copts = ceph_parse_options(options, mon_addrs,
0ddebc0c 4591 mon_addrs + mon_addrs_size - 1,
4e9afeba 4592 parse_rbd_opts_token, rbd_opts);
859c31df
AE
4593 if (IS_ERR(copts)) {
4594 ret = PTR_ERR(copts);
dc79b113
AE
4595 goto out_err;
4596 }
859c31df
AE
4597 kfree(options);
4598
4599 *ceph_opts = copts;
4e9afeba 4600 *opts = rbd_opts;
859c31df 4601 *rbd_spec = spec;
0ddebc0c 4602
dc79b113 4603 return 0;
f28e565a 4604out_mem:
dc79b113 4605 ret = -ENOMEM;
d22f76e7 4606out_err:
859c31df
AE
4607 kfree(rbd_opts);
4608 rbd_spec_put(spec);
f28e565a 4609 kfree(options);
d22f76e7 4610
dc79b113 4611 return ret;
a725f65e
AE
4612}
4613
589d30e0
AE
4614/*
4615 * An rbd format 2 image has a unique identifier, distinct from the
4616 * name given to it by the user. Internally, that identifier is
4617 * what's used to specify the names of objects related to the image.
4618 *
4619 * A special "rbd id" object is used to map an rbd image name to its
4620 * id. If that object doesn't exist, then there is no v2 rbd image
4621 * with the supplied name.
4622 *
4623 * This function will record the given rbd_dev's image_id field if
4624 * it can be determined, and in that case will return 0. If any
4625 * errors occur a negative errno will be returned and the rbd_dev's
4626 * image_id field will be unchanged (and should be NULL).
4627 */
4628static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4629{
4630 int ret;
4631 size_t size;
4632 char *object_name;
4633 void *response;
c0fba368 4634 char *image_id;
2f82ee54 4635
2c0d0a10
AE
4636 /*
4637 * When probing a parent image, the image id is already
4638 * known (and the image name likely is not). There's no
c0fba368
AE
4639 * need to fetch the image id again in this case. We
4640 * do still need to set the image format though.
2c0d0a10 4641 */
c0fba368
AE
4642 if (rbd_dev->spec->image_id) {
4643 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4644
2c0d0a10 4645 return 0;
c0fba368 4646 }
2c0d0a10 4647
589d30e0
AE
4648 /*
4649 * First, see if the format 2 image id file exists, and if
4650 * so, get the image's persistent id from it.
4651 */
69e7a02f 4652 size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
589d30e0
AE
4653 object_name = kmalloc(size, GFP_NOIO);
4654 if (!object_name)
4655 return -ENOMEM;
0d7dbfce 4656 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
589d30e0
AE
4657 dout("rbd id object name is %s\n", object_name);
4658
4659 /* Response will be an encoded string, which includes a length */
4660
4661 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4662 response = kzalloc(size, GFP_NOIO);
4663 if (!response) {
4664 ret = -ENOMEM;
4665 goto out;
4666 }
4667
c0fba368
AE
4668 /* If it doesn't exist we'll assume it's a format 1 image */
4669
36be9a76 4670 ret = rbd_obj_method_sync(rbd_dev, object_name,
4157976b 4671 "rbd", "get_id", NULL, 0,
e2a58ee5 4672 response, RBD_IMAGE_ID_LEN_MAX);
36be9a76 4673 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
c0fba368
AE
4674 if (ret == -ENOENT) {
4675 image_id = kstrdup("", GFP_KERNEL);
4676 ret = image_id ? 0 : -ENOMEM;
4677 if (!ret)
4678 rbd_dev->image_format = 1;
4679 } else if (ret > sizeof (__le32)) {
4680 void *p = response;
4681
4682 image_id = ceph_extract_encoded_string(&p, p + ret,
979ed480 4683 NULL, GFP_NOIO);
c0fba368
AE
4684 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4685 if (!ret)
4686 rbd_dev->image_format = 2;
589d30e0 4687 } else {
c0fba368
AE
4688 ret = -EINVAL;
4689 }
4690
4691 if (!ret) {
4692 rbd_dev->spec->image_id = image_id;
4693 dout("image_id is %s\n", image_id);
589d30e0
AE
4694 }
4695out:
4696 kfree(response);
4697 kfree(object_name);
4698
4699 return ret;
4700}
4701
6fd48b3b
AE
4702/* Undo whatever state changes are made by v1 or v2 image probe */
4703
4704static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4705{
4706 struct rbd_image_header *header;
4707
392a9dad
AE
4708 /* Drop parent reference unless it's already been done (or none) */
4709
4710 if (rbd_dev->parent_overlap)
4711 rbd_dev_parent_put(rbd_dev);
6fd48b3b
AE
4712
4713 /* Free dynamic fields from the header, then zero it out */
4714
4715 header = &rbd_dev->header;
812164f8 4716 ceph_put_snap_context(header->snapc);
6fd48b3b
AE
4717 kfree(header->snap_sizes);
4718 kfree(header->snap_names);
4719 kfree(header->object_prefix);
4720 memset(header, 0, sizeof (*header));
4721}
4722
2df3fac7 4723static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
a30b71b9 4724{
9d475de5 4725 int ret;
a30b71b9 4726
1e130199 4727 ret = rbd_dev_v2_object_prefix(rbd_dev);
57385b51 4728 if (ret)
b1b5402a
AE
4729 goto out_err;
4730
2df3fac7
AE
4731 /*
 4732 * Get and check the features for the image. Currently the
4733 * features are assumed to never change.
4734 */
b1b5402a 4735 ret = rbd_dev_v2_features(rbd_dev);
57385b51 4736 if (ret)
9d475de5 4737 goto out_err;
35d489f9 4738
cc070d59
AE
4739 /* If the image supports fancy striping, get its parameters */
4740
4741 if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4742 ret = rbd_dev_v2_striping_info(rbd_dev);
4743 if (ret < 0)
4744 goto out_err;
4745 }
2df3fac7 4746 /* No support for crypto and compression types in format 2 images */
6e14b1a6 4747
35152979 4748 return 0;
9d475de5 4749out_err:
642a2537 4750 rbd_dev->header.features = 0;
1e130199
AE
4751 kfree(rbd_dev->header.object_prefix);
4752 rbd_dev->header.object_prefix = NULL;
9d475de5
AE
4753
4754 return ret;
a30b71b9
AE
4755}
4756
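/*
 * If the image just probed is a clone, create and probe an rbd_dev
 * for its parent image as well (recursively, so a whole chain of
 * parents gets attached), sharing the client and parent spec
 * references along the way.
 */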
124afba2 4757static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
83a06263 4758{
2f82ee54 4759 struct rbd_device *parent = NULL;
124afba2
AE
4760 struct rbd_spec *parent_spec;
4761 struct rbd_client *rbdc;
4762 int ret;
4763
4764 if (!rbd_dev->parent_spec)
4765 return 0;
4766 /*
4767 * We need to pass a reference to the client and the parent
4768 * spec when creating the parent rbd_dev. Images related by
4769 * parent/child relationships always share both.
4770 */
4771 parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4772 rbdc = __rbd_get_client(rbd_dev->rbd_client);
4773
4774 ret = -ENOMEM;
4775 parent = rbd_dev_create(rbdc, parent_spec);
4776 if (!parent)
4777 goto out_err;
4778
1f3ef788 4779 ret = rbd_dev_image_probe(parent, false);
124afba2
AE
4780 if (ret < 0)
4781 goto out_err;
4782 rbd_dev->parent = parent;
a2acd00e 4783 atomic_set(&rbd_dev->parent_ref, 1);
124afba2
AE
4784
4785 return 0;
4786out_err:
4787 if (parent) {
fb65d228 4788 rbd_dev_unparent(rbd_dev);
124afba2
AE
4789 kfree(rbd_dev->header_name);
4790 rbd_dev_destroy(parent);
4791 } else {
4792 rbd_put_client(rbdc);
4793 rbd_spec_put(parent_spec);
4794 }
4795
4796 return ret;
4797}
4798
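/*
 * Set up the Linux-device side of a fully-probed image: device id,
 * block major, gendisk, mapping, and sysfs registration, then
 * announce the disk. Each step is undone in reverse order on
 * failure.
 */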
200a6a8b 4799static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
124afba2 4800{
83a06263 4801 int ret;
d1cf5788 4802
83a06263
AE
4803 /* generate unique id: find highest unique id, add one */
4804 rbd_dev_id_get(rbd_dev);
4805
4806 /* Fill in the device name, now that we have its id. */
4807 BUILD_BUG_ON(DEV_NAME_LEN
4808 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4809 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4810
4811 /* Get our block major device number. */
4812
4813 ret = register_blkdev(0, rbd_dev->name);
4814 if (ret < 0)
4815 goto err_out_id;
4816 rbd_dev->major = ret;
4817
4818 /* Set up the blkdev mapping. */
4819
4820 ret = rbd_init_disk(rbd_dev);
4821 if (ret)
4822 goto err_out_blkdev;
4823
f35a4dee 4824 ret = rbd_dev_mapping_set(rbd_dev);
83a06263
AE
4825 if (ret)
4826 goto err_out_disk;
f35a4dee
AE
4827 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
4828
4829 ret = rbd_bus_add_dev(rbd_dev);
4830 if (ret)
4831 goto err_out_mapping;
83a06263 4832
83a06263
AE
4833 /* Everything's ready. Announce the disk to the world. */
4834
129b79d4 4835 set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
83a06263
AE
4836 add_disk(rbd_dev->disk);
4837
4838 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4839 (unsigned long long) rbd_dev->mapping.size);
4840
4841 return ret;
2f82ee54 4842
f35a4dee
AE
4843err_out_mapping:
4844 rbd_dev_mapping_clear(rbd_dev);
83a06263
AE
4845err_out_disk:
4846 rbd_free_disk(rbd_dev);
4847err_out_blkdev:
4848 unregister_blkdev(rbd_dev->major, rbd_dev->name);
4849err_out_id:
4850 rbd_dev_id_put(rbd_dev);
83a06263
AE
4852
4853 return ret;
4854}
4855
332bb12d
AE
4856static int rbd_dev_header_name(struct rbd_device *rbd_dev)
4857{
4858 struct rbd_spec *spec = rbd_dev->spec;
4859 size_t size;
4860
4861 /* Record the header object name for this rbd image. */
4862
4863 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4864
4865 if (rbd_dev->image_format == 1)
4866 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
4867 else
4868 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
4869
4870 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4871 if (!rbd_dev->header_name)
4872 return -ENOMEM;
4873
4874 if (rbd_dev->image_format == 1)
4875 sprintf(rbd_dev->header_name, "%s%s",
4876 spec->image_name, RBD_SUFFIX);
4877 else
4878 sprintf(rbd_dev->header_name, "%s%s",
4879 RBD_HEADER_PREFIX, spec->image_id);
4880 return 0;
4881}
4882
200a6a8b
AE
4883static void rbd_dev_image_release(struct rbd_device *rbd_dev)
4884{
6fd48b3b 4885 rbd_dev_unprobe(rbd_dev);
200a6a8b 4886 kfree(rbd_dev->header_name);
6fd48b3b
AE
4887 rbd_dev->header_name = NULL;
4888 rbd_dev->image_format = 0;
4889 kfree(rbd_dev->spec->image_id);
4890 rbd_dev->spec->image_id = NULL;
4891
200a6a8b
AE
4892 rbd_dev_destroy(rbd_dev);
4893}
4894
a30b71b9
AE
4895/*
4896 * Probe for the existence of the header object for the given rbd
1f3ef788
AE
4897 * device. If this image is the one being mapped (i.e., not a
4898 * parent), initiate a watch on its header object before using that
4899 * object to get detailed information about the rbd image.
a30b71b9 4900 */
1f3ef788 4901static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
a30b71b9
AE
4902{
4903 int ret;
b644de2b 4904 int tmp;
a30b71b9
AE
4905
4906 /*
4907 * Get the id from the image id object. If it's not a
4908 * format 2 image, we'll get ENOENT back, and we'll assume
4909 * it's a format 1 image.
4910 */
4911 ret = rbd_dev_image_id(rbd_dev);
4912 if (ret)
c0fba368
AE
4913 return ret;
4914 rbd_assert(rbd_dev->spec->image_id);
4915 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4916
332bb12d
AE
4917 ret = rbd_dev_header_name(rbd_dev);
4918 if (ret)
4919 goto err_out_format;
4920
1f3ef788
AE
4921 if (mapping) {
4922 ret = rbd_dev_header_watch_sync(rbd_dev, true);
4923 if (ret)
4924 goto out_header_name;
4925 }
b644de2b 4926
c0fba368 4927 if (rbd_dev->image_format == 1)
99a41ebc 4928 ret = rbd_dev_v1_header_info(rbd_dev);
a30b71b9 4929 else
2df3fac7 4930 ret = rbd_dev_v2_header_info(rbd_dev);
5655c4d9 4931 if (ret)
b644de2b 4932 goto err_out_watch;
83a06263 4933
9bb81c9b
AE
4934 ret = rbd_dev_spec_update(rbd_dev);
4935 if (ret)
33dca39f 4936 goto err_out_probe;
9bb81c9b
AE
4937
4938 ret = rbd_dev_probe_parent(rbd_dev);
30d60ba2
AE
4939 if (ret)
4940 goto err_out_probe;
4941
4942 dout("discovered format %u image, header name is %s\n",
4943 rbd_dev->image_format, rbd_dev->header_name);
83a06263 4944
30d60ba2 4945 return 0;
6fd48b3b
AE
4946err_out_probe:
4947 rbd_dev_unprobe(rbd_dev);
b644de2b 4948err_out_watch:
1f3ef788
AE
4949 if (mapping) {
4950 tmp = rbd_dev_header_watch_sync(rbd_dev, false);
4951 if (tmp)
4952 rbd_warn(rbd_dev, "unable to tear down "
4953 "watch request (%d)\n", tmp);
4954 }
332bb12d
AE
4955out_header_name:
4956 kfree(rbd_dev->header_name);
4957 rbd_dev->header_name = NULL;
4958err_out_format:
4959 rbd_dev->image_format = 0;
5655c4d9
AE
4960 kfree(rbd_dev->spec->image_id);
4961 rbd_dev->spec->image_id = NULL;
4962
4963 dout("probe failed, returning %d\n", ret);
4964
a30b71b9
AE
4965 return ret;
4966}
4967
59c2be1e
YS
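/*
 * Handle a write to /sys/bus/rbd/add: parse the mapping request,
 * connect to the monitors, probe the image, and set up the block
 * device. On success the entire buffer is consumed.
 */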
4968static ssize_t rbd_add(struct bus_type *bus,
4969 const char *buf,
4970 size_t count)
602adf40 4971{
cb8627c7 4972 struct rbd_device *rbd_dev = NULL;
dc79b113 4973 struct ceph_options *ceph_opts = NULL;
4e9afeba 4974 struct rbd_options *rbd_opts = NULL;
859c31df 4975 struct rbd_spec *spec = NULL;
9d3997fd 4976 struct rbd_client *rbdc;
27cc2594 4977 struct ceph_osd_client *osdc;
51344a38 4978 bool read_only;
27cc2594 4979 int rc = -ENOMEM;
602adf40
YS
4980
4981 if (!try_module_get(THIS_MODULE))
4982 return -ENODEV;
4983
602adf40 4984 /* parse add command */
859c31df 4985 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
dc79b113 4986 if (rc < 0)
bd4ba655 4987 goto err_out_module;
51344a38
AE
4988 read_only = rbd_opts->read_only;
4989 kfree(rbd_opts);
4990 rbd_opts = NULL; /* done with this */
78cea76e 4991
9d3997fd
AE
4992 rbdc = rbd_get_client(ceph_opts);
4993 if (IS_ERR(rbdc)) {
4994 rc = PTR_ERR(rbdc);
0ddebc0c 4995 goto err_out_args;
9d3997fd 4996 }
c53d5893 4997 ceph_opts = NULL; /* rbd_dev client now owns this */
602adf40 4998
602adf40 4999 /* pick the pool */
9d3997fd 5000 osdc = &rbdc->client->osdc;
859c31df 5001 rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
602adf40
YS
5002 if (rc < 0)
5003 goto err_out_client;
c0cd10db 5004 spec->pool_id = (u64)rc;
859c31df 5005
0903e875
AE
5006 /* The ceph file layout needs to fit pool id in 32 bits */
5007
c0cd10db
AE
5008 if (spec->pool_id > (u64)U32_MAX) {
5009 rbd_warn(NULL, "pool id too large (%llu > %u)\n",
5010 (unsigned long long)spec->pool_id, U32_MAX);
0903e875
AE
5011 rc = -EIO;
5012 goto err_out_client;
5013 }
5014
c53d5893 5015 rbd_dev = rbd_dev_create(rbdc, spec);
bd4ba655
AE
5016 if (!rbd_dev)
5017 goto err_out_client;
c53d5893
AE
5018 rbdc = NULL; /* rbd_dev now owns this */
5019 spec = NULL; /* rbd_dev now owns this */
602adf40 5020
1f3ef788 5021 rc = rbd_dev_image_probe(rbd_dev, true);
a30b71b9 5022 if (rc < 0)
c53d5893 5023 goto err_out_rbd_dev;
05fd6f6f 5024
7ce4eef7
AE
5025 /* If we are mapping a snapshot it must be marked read-only */
5026
5027 if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
5028 read_only = true;
5029 rbd_dev->mapping.read_only = read_only;
5030
b536f69a
AE
5031 rc = rbd_dev_device_setup(rbd_dev);
5032 if (!rc)
5033 return count;
5034
5035 rbd_dev_image_release(rbd_dev);
c53d5893
AE
5036err_out_rbd_dev:
5037 rbd_dev_destroy(rbd_dev);
bd4ba655 5038err_out_client:
9d3997fd 5039 rbd_put_client(rbdc);
0ddebc0c 5040err_out_args:
78cea76e
AE
5041 if (ceph_opts)
5042 ceph_destroy_options(ceph_opts);
4e9afeba 5043 kfree(rbd_opts);
859c31df 5044 rbd_spec_put(spec);
bd4ba655
AE
5045err_out_module:
5046 module_put(THIS_MODULE);
27cc2594 5047
602adf40 5048 dout("Error adding device %s\n", buf);
27cc2594 5049
c0cd10db 5050 return (ssize_t)rc;
602adf40
YS
5051}
5052
de71a297 5053static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
602adf40
YS
5054{
5055 struct list_head *tmp;
5056 struct rbd_device *rbd_dev;
5057
e124a82f 5058 spin_lock(&rbd_dev_list_lock);
602adf40
YS
5059 list_for_each(tmp, &rbd_dev_list) {
5060 rbd_dev = list_entry(tmp, struct rbd_device, node);
de71a297 5061 if (rbd_dev->dev_id == dev_id) {
e124a82f 5062 spin_unlock(&rbd_dev_list_lock);
602adf40 5063 return rbd_dev;
e124a82f 5064 }
602adf40 5065 }
e124a82f 5066 spin_unlock(&rbd_dev_list_lock);
602adf40
YS
5067 return NULL;
5068}
5069
200a6a8b 5070static void rbd_dev_device_release(struct device *dev)
602adf40 5071{
593a9e7b 5072 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 5073
602adf40 5074 rbd_free_disk(rbd_dev);
200a6a8b 5075 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
6d80b130 5076 rbd_dev_mapping_clear(rbd_dev);
602adf40 5077 unregister_blkdev(rbd_dev->major, rbd_dev->name);
200a6a8b 5078 rbd_dev->major = 0;
e2839308 5079 rbd_dev_id_put(rbd_dev);
602adf40
YS
5081}
5082
05a46afd
AE
5083static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
5084{
ad945fc1 5085 while (rbd_dev->parent) {
05a46afd
AE
5086 struct rbd_device *first = rbd_dev;
5087 struct rbd_device *second = first->parent;
5088 struct rbd_device *third;
5089
5090 /*
5091 * Follow to the parent with no grandparent and
5092 * remove it.
5093 */
5094 while (second && (third = second->parent)) {
5095 first = second;
5096 second = third;
5097 }
ad945fc1 5098 rbd_assert(second);
8ad42cd0 5099 rbd_dev_image_release(second);
ad945fc1
AE
5100 first->parent = NULL;
5101 first->parent_overlap = 0;
5102
5103 rbd_assert(first->parent_spec);
05a46afd
AE
5104 rbd_spec_put(first->parent_spec);
5105 first->parent_spec = NULL;
05a46afd
AE
5106 }
5107}
5108
dfc5606d
YS
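/*
 * Handle a write to /sys/bus/rbd/remove. The value written is the
 * id of the device to unmap (e.g. writing "2" unmaps /dev/rbd2);
 * removal is refused with -EBUSY while the device is held open.
 */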
5109static ssize_t rbd_remove(struct bus_type *bus,
5110 const char *buf,
5111 size_t count)
602adf40
YS
5112{
5113 struct rbd_device *rbd_dev = NULL;
0d8189e1 5114 int target_id;
602adf40 5115 unsigned long ul;
0d8189e1 5116 int ret;
602adf40 5117
0d8189e1
AE
5118 ret = strict_strtoul(buf, 10, &ul);
5119 if (ret)
5120 return ret;
602adf40
YS
5121
5122 /* convert to int; abort if we lost anything in the conversion */
5123 target_id = (int) ul;
5124 if (target_id != ul)
5125 return -EINVAL;
5126
5127 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
5128
5129 rbd_dev = __rbd_get_dev(target_id);
5130 if (!rbd_dev) {
5131 ret = -ENOENT;
5132 goto done;
42382b70
AE
5133 }
5134
a14ea269 5135 spin_lock_irq(&rbd_dev->lock);
b82d167b 5136 if (rbd_dev->open_count)
42382b70 5137 ret = -EBUSY;
b82d167b
AE
5138 else
5139 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
a14ea269 5140 spin_unlock_irq(&rbd_dev->lock);
b82d167b 5141 if (ret < 0)
42382b70 5142 goto done;
b480815a 5143 rbd_bus_del_dev(rbd_dev);
1f3ef788
AE
5144 ret = rbd_dev_header_watch_sync(rbd_dev, false);
5145 if (ret)
5146 rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
8ad42cd0 5147 rbd_dev_image_release(rbd_dev);
79ab7558 5148 module_put(THIS_MODULE);
1f3ef788 5149 ret = count;
602adf40
YS
5150done:
5151 mutex_unlock(&ctl_mutex);
aafb230e 5152
602adf40
YS
5153 return ret;
5154}
5155
602adf40
YS
5156/*
5157 * create control files in sysfs
dfc5606d 5158 * /sys/bus/rbd/...
602adf40
YS
5159 */
5160static int rbd_sysfs_init(void)
5161{
dfc5606d 5162 int ret;
602adf40 5163
fed4c143 5164 ret = device_register(&rbd_root_dev);
21079786 5165 if (ret < 0)
dfc5606d 5166 return ret;
602adf40 5167
fed4c143
AE
5168 ret = bus_register(&rbd_bus_type);
5169 if (ret < 0)
5170 device_unregister(&rbd_root_dev);
602adf40 5171
602adf40
YS
5172 return ret;
5173}
5174
5175static void rbd_sysfs_cleanup(void)
5176{
dfc5606d 5177 bus_unregister(&rbd_bus_type);
fed4c143 5178 device_unregister(&rbd_root_dev);
602adf40
YS
5179}
5180
1c2a9dfe
AE
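/*
 * Create the slab caches used for image requests, object requests,
 * and segment names. All three must be created successfully or
 * module initialization fails.
 */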
5181static int rbd_slab_init(void)
5182{
5183 rbd_assert(!rbd_img_request_cache);
5184 rbd_img_request_cache = kmem_cache_create("rbd_img_request",
5185 sizeof (struct rbd_img_request),
5186 __alignof__(struct rbd_img_request),
5187 0, NULL);
868311b1
AE
5188 if (!rbd_img_request_cache)
5189 return -ENOMEM;
5190
5191 rbd_assert(!rbd_obj_request_cache);
5192 rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
5193 sizeof (struct rbd_obj_request),
5194 __alignof__(struct rbd_obj_request),
5195 0, NULL);
78c2a44a
AE
5196 if (!rbd_obj_request_cache)
5197 goto out_err;
5198
5199 rbd_assert(!rbd_segment_name_cache);
5200 rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
5201 MAX_OBJ_NAME_SIZE + 1, 1, 0, NULL);
5202 if (rbd_segment_name_cache)
1c2a9dfe 5203 return 0;
78c2a44a
AE
5204out_err:
5205 if (rbd_obj_request_cache) {
5206 kmem_cache_destroy(rbd_obj_request_cache);
5207 rbd_obj_request_cache = NULL;
5208 }
1c2a9dfe 5209
868311b1
AE
5210 kmem_cache_destroy(rbd_img_request_cache);
5211 rbd_img_request_cache = NULL;
5212
1c2a9dfe
AE
5213 return -ENOMEM;
5214}
5215
5216static void rbd_slab_exit(void)
5217{
78c2a44a
AE
5218 rbd_assert(rbd_segment_name_cache);
5219 kmem_cache_destroy(rbd_segment_name_cache);
5220 rbd_segment_name_cache = NULL;
5221
868311b1
AE
5222 rbd_assert(rbd_obj_request_cache);
5223 kmem_cache_destroy(rbd_obj_request_cache);
5224 rbd_obj_request_cache = NULL;
5225
1c2a9dfe
AE
5226 rbd_assert(rbd_img_request_cache);
5227 kmem_cache_destroy(rbd_img_request_cache);
5228 rbd_img_request_cache = NULL;
5229}
5230
cc344fa1 5231static int __init rbd_init(void)
602adf40
YS
5232{
5233 int rc;
5234
1e32d34c
AE
5235 if (!libceph_compatible(NULL)) {
5236 rbd_warn(NULL, "libceph incompatibility (quitting)");
5237
5238 return -EINVAL;
5239 }
1c2a9dfe 5240 rc = rbd_slab_init();
602adf40
YS
5241 if (rc)
5242 return rc;
1c2a9dfe
AE
5243 rc = rbd_sysfs_init();
5244 if (rc)
5245 rbd_slab_exit();
5246 else
5247 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
5248
5249 return rc;
602adf40
YS
5250}
5251
cc344fa1 5252static void __exit rbd_exit(void)
602adf40
YS
5253{
5254 rbd_sysfs_cleanup();
1c2a9dfe 5255 rbd_slab_exit();
602adf40
YS
5256}
5257
5258module_init(rbd_init);
5259module_exit(rbd_exit);
5260
5261MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5262MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5263MODULE_DESCRIPTION("rados block device");
5264
5265/* following authorship retained from original osdblk.c */
5266MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5267
5268MODULE_LICENSE("GPL");