/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/slab.h>
#include <linux/idr.h>

#include "rbd_types.h"

#define RBD_DEBUG       /* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT    9
#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)

/*
 * Increment the given counter and return its updated value.
 * If the counter is already 0 it will not be incremented.
 * If the counter is already at its maximum value, -EINVAL is
 * returned without updating it.
 */
static int atomic_inc_return_safe(atomic_t *v)
{
        unsigned int counter;

        counter = (unsigned int)__atomic_add_unless(v, 1, 0);
        if (counter <= (unsigned int)INT_MAX)
                return (int)counter;

        atomic_dec(v);

        return -EINVAL;
}

/* Decrement the counter.  Return the resulting value, or -EINVAL */
static int atomic_dec_return_safe(atomic_t *v)
{
        int counter;

        counter = atomic_dec_return(v);
        if (counter >= 0)
                return counter;

        atomic_inc(v);

        return -EINVAL;
}
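
/*
 * Added note (not part of the original source): these helpers back
 * reference counts such as rbd_dev->parent_ref below.  Because an
 * increment from 0 is refused, a count that has dropped to zero
 * stays there: atomic_inc_return_safe() simply returns 0 and leaves
 * the counter untouched, so a stale reference cannot resurrect an
 * object that is already being torn down.
 */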

#define RBD_DRV_NAME "rbd"

#define RBD_PART_SHIFT 8

#define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
#define RBD_MAX_SNAP_NAME_LEN   \
                        (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME      "-"

#define BAD_SNAP_INDEX  U32_MAX         /* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX    64

#define RBD_OBJ_PREFIX_LEN_MAX  64

/* Feature bits */

#define RBD_FEATURE_LAYERING    (1<<0)
#define RBD_FEATURE_STRIPINGV2  (1<<1)
#define RBD_FEATURES_ALL \
            (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)
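
/*
 * Added note (not part of the original source): during image probe
 * the driver compares an image's feature bits against
 * RBD_FEATURES_SUPPORTED and refuses to map images that need
 * anything more; a sketch of that check, with hypothetical names:
 *
 *      if (features & ~RBD_FEATURES_SUPPORTED)
 *              return -ENXIO;  // image needs unsupported features
 */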

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN            32
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
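
/*
 * Added note (not part of the original source): (5 * sizeof (int)) / 2
 * over-approximates the number of decimal digits an int can need
 * (2.5 digits per byte, vs. the true log10(256) ~= 2.41), and the +1
 * leaves room for a sign.  With 4-byte ints this evaluates to 11,
 * covering the 10 digits of INT_MAX plus a '-'.
 */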

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
        /* These six fields never change for a given rbd image */
        char *object_prefix;
        __u8 obj_order;
        __u8 crypt_type;
        __u8 comp_type;
        u64 stripe_unit;
        u64 stripe_count;
        u64 features;           /* Might be changeable someday? */

        /* The remaining fields need to be updated occasionally */
        u64 image_size;
        struct ceph_snap_context *snapc;
        char *snap_names;       /* format 1 only */
        u64 *snap_sizes;        /* format 1 only */
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
        u64             pool_id;
        const char      *pool_name;

        const char      *image_id;
        const char      *image_name;

        u64             snap_id;
        const char      *snap_name;

        struct kref     kref;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
        struct ceph_client      *client;
        struct kref             kref;
        struct list_head        node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
        OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_req_flags {
        OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
        OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
        OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
        OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
        const char              *object_name;
        u64                     offset;         /* object start byte */
        u64                     length;         /* bytes from offset */
        unsigned long           flags;

        /*
         * An object request associated with an image will have its
         * img_data flag set; a standalone object request will not.
         *
         * A standalone object request will have which == BAD_WHICH
         * and a null obj_request pointer.
         *
         * An object request initiated in support of a layered image
         * object (to check for its existence before a write) will
         * have which == BAD_WHICH and a non-null obj_request pointer.
         *
         * Finally, an object request for rbd image data will have
         * which != BAD_WHICH, and will have a non-null img_request
         * pointer.  The value of which will be in the range
         * 0..(img_request->obj_request_count-1).
         */
        union {
                struct rbd_obj_request  *obj_request;   /* STAT op */
                struct {
                        struct rbd_img_request  *img_request;
                        u64                     img_offset;
                        /* links for img_request->obj_requests list */
                        struct list_head        links;
                };
        };
        u32                     which;          /* posn image request list */

        enum obj_request_type   type;
        union {
                struct bio      *bio_list;
                struct {
                        struct page     **pages;
                        u32             page_count;
                };
        };
        struct page             **copyup_pages;
        u32                     copyup_page_count;

        struct ceph_osd_request *osd_req;

        u64                     xferred;        /* bytes transferred */
        int                     result;

        rbd_obj_callback_t      callback;
        struct completion       completion;

        struct kref             kref;
};

enum img_req_flags {
        IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
        IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
        IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
        struct rbd_device       *rbd_dev;
        u64                     offset; /* starting image byte offset */
        u64                     length; /* byte count from offset */
        unsigned long           flags;
        union {
                u64                     snap_id;        /* for reads */
                struct ceph_snap_context *snapc;        /* for writes */
        };
        union {
                struct request          *rq;            /* block request */
                struct rbd_obj_request  *obj_request;   /* obj req initiator */
        };
        struct page             **copyup_pages;
        u32                     copyup_page_count;
        spinlock_t              completion_lock;/* protects next_completion */
        u32                     next_completion;
        rbd_img_callback_t      callback;
        u64                     xferred;/* aggregate bytes transferred */
        int                     result; /* first nonzero obj_request result */

        u32                     obj_request_count;
        struct list_head        obj_requests;   /* rbd_obj_request structs */

        struct kref             kref;
};

#define for_each_obj_request(ireq, oreq) \
        list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
        list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)

struct rbd_mapping {
        u64                     size;
        u64                     features;
        bool                    read_only;
};

/*
 * a single device
 */
struct rbd_device {
        int                     dev_id;         /* blkdev unique id */

        int                     major;          /* blkdev assigned major */
        int                     minor;
        struct gendisk          *disk;          /* blkdev's gendisk and rq */

        u32                     image_format;   /* Either 1 or 2 */
        struct rbd_client       *rbd_client;

        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t              lock;           /* queue, flags, open_count */

        struct rbd_image_header header;
        unsigned long           flags;          /* possibly lock protected */
        struct rbd_spec         *spec;

        char                    *header_name;

        struct ceph_file_layout layout;

        struct ceph_osd_event   *watch_event;
        struct rbd_obj_request  *watch_request;

        struct rbd_spec         *parent_spec;
        u64                     parent_overlap;
        atomic_t                parent_ref;
        struct rbd_device       *parent;

        /* protects updating the header */
        struct rw_semaphore     header_rwsem;

        struct rbd_mapping      mapping;

        struct list_head        node;

        /* sysfs related */
        struct device           dev;
        unsigned long           open_count;     /* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
        RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
        RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
};

static DEFINE_MUTEX(client_mutex);      /* Serialize client creation */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);              /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache        *rbd_img_request_cache;
static struct kmem_cache        *rbd_obj_request_cache;
static struct kmem_cache        *rbd_segment_name_cache;

static int rbd_major;
static DEFINE_IDA(rbd_dev_id_ida);

/*
 * Default to false for now, as single-major requires >= 0.75 version of
 * userspace rbd utility.
 */
static bool single_major = false;
module_param(single_major, bool, S_IRUGO);
MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: false)");

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static void rbd_dev_device_release(struct device *dev);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);
static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf,
                                    size_t count);
static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
                                       size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping);
static void rbd_spec_put(struct rbd_spec *spec);

static int rbd_dev_id_to_minor(int dev_id)
{
        return dev_id << RBD_PART_SHIFT;
}

static int minor_to_rbd_dev_id(int minor)
{
        return minor >> RBD_PART_SHIFT;
}
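
/*
 * Added note (not part of the original source): with
 * RBD_PART_SHIFT == 8 each device claims a block of 256 minors,
 * i.e. room for partitions under a single major.  For example,
 * dev_id 3 maps to minor 3 << 8 == 768, and minor 770 (partition 2
 * of rbd3) maps back to dev_id 770 >> 8 == 3.
 */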

static BUS_ATTR(add, S_IWUSR, NULL, rbd_add);
static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove);
static BUS_ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major);
static BUS_ATTR(remove_single_major, S_IWUSR, NULL, rbd_remove_single_major);

static struct attribute *rbd_bus_attrs[] = {
        &bus_attr_add.attr,
        &bus_attr_remove.attr,
        &bus_attr_add_single_major.attr,
        &bus_attr_remove_single_major.attr,
        NULL,
};

static umode_t rbd_bus_is_visible(struct kobject *kobj,
                                  struct attribute *attr, int index)
{
        if (!single_major &&
            (attr == &bus_attr_add_single_major.attr ||
             attr == &bus_attr_remove_single_major.attr))
                return 0;

        return attr->mode;
}

static const struct attribute_group rbd_bus_group = {
        .attrs = rbd_bus_attrs,
        .is_visible = rbd_bus_is_visible,
};
__ATTRIBUTE_GROUPS(rbd_bus);

static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_groups     = rbd_bus_groups,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
        struct va_format vaf;
        va_list args;

        va_start(args, fmt);
        vaf.fmt = fmt;
        vaf.va = &args;

        if (!rbd_dev)
                printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
        else if (rbd_dev->disk)
                printk(KERN_WARNING "%s: %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_name)
                printk(KERN_WARNING "%s: image %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_id)
                printk(KERN_WARNING "%s: id %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
        else    /* punt */
                printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
                        RBD_DRV_NAME, rbd_dev, &vaf);
        va_end(args);
}
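
/*
 * Added note (not part of the original source): the branches above
 * pick the most specific identity available.  A call such as
 * rbd_warn(rbd_dev, "write failed") is logged as
 * "rbd: rbd3: write failed" once a disk exists, or as
 * "rbd: image foo: write failed" while the image is still being
 * probed.
 */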

#ifdef RBD_DEBUG
#define rbd_assert(expr)                                                \
                if (unlikely(!(expr))) {                                \
                        printk(KERN_ERR "\nAssertion failure in %s() "  \
                                                "at line %d:\n\n"       \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                        BUG();                                          \
                }
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */

static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                                u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                u64 *snap_features);
static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
        bool removing = false;

        if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
                return -EROFS;

        spin_lock_irq(&rbd_dev->lock);
        if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
                removing = true;
        else
                rbd_dev->open_count++;
        spin_unlock_irq(&rbd_dev->lock);
        if (removing)
                return -ENOENT;

        (void) get_device(&rbd_dev->dev);
        set_device_ro(bdev, rbd_dev->mapping.read_only);

        return 0;
}

static void rbd_release(struct gendisk *disk, fmode_t mode)
{
        struct rbd_device *rbd_dev = disk->private_data;
        unsigned long open_count_before;

        spin_lock_irq(&rbd_dev->lock);
        open_count_before = rbd_dev->open_count--;
        spin_unlock_irq(&rbd_dev->lock);
        rbd_assert(open_count_before > 0);

        put_device(&rbd_dev->dev);
}

static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
};

/*
 * Initialize an rbd client instance.  Success or not, this function
 * consumes ceph_opts.  Caller holds client_mutex.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;
        int ret = -ENOMEM;

        dout("%s:\n", __func__);
        rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
        if (!rbdc)
                goto out_opt;

        kref_init(&rbdc->kref);
        INIT_LIST_HEAD(&rbdc->node);

        rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
        if (IS_ERR(rbdc->client))
                goto out_rbdc;
        ceph_opts = NULL;       /* Now rbdc->client is responsible for ceph_opts */

        ret = ceph_open_session(rbdc->client);
        if (ret < 0)
                goto out_client;

        spin_lock(&rbd_client_list_lock);
        list_add_tail(&rbdc->node, &rbd_client_list);
        spin_unlock(&rbd_client_list_lock);

        dout("%s: rbdc %p\n", __func__, rbdc);

        return rbdc;
out_client:
        ceph_destroy_client(rbdc->client);
out_rbdc:
        kfree(rbdc);
out_opt:
        if (ceph_opts)
                ceph_destroy_options(ceph_opts);
        dout("%s: error %d\n", __func__, ret);

        return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
        kref_get(&rbdc->kref);

        return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
        struct rbd_client *client_node;
        bool found = false;

        if (ceph_opts->flags & CEPH_OPT_NOSHARE)
                return NULL;

        spin_lock(&rbd_client_list_lock);
        list_for_each_entry(client_node, &rbd_client_list, node) {
                if (!ceph_compare_options(ceph_opts, client_node->client)) {
                        __rbd_get_client(client_node);

                        found = true;
                        break;
                }
        }
        spin_unlock(&rbd_client_list_lock);

        return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        /* Boolean args above */
        Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
        /* int args above */
        /* string args above */
        {Opt_read_only, "read_only"},
        {Opt_read_only, "ro"},          /* Alternate spelling */
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},         /* Alternate spelling */
        /* Boolean args above */
        {-1, NULL}
};

struct rbd_options {
        bool    read_only;
};

#define RBD_READ_ONLY_DEFAULT   false

static int parse_rbd_opts_token(char *c, void *private)
{
        struct rbd_options *rbd_opts = private;
        substring_t argstr[MAX_OPT_ARGS];
        int token, intval, ret;

        token = match_token(c, rbd_opts_tokens, argstr);
        if (token < 0)
                return -EINVAL;

        if (token < Opt_last_int) {
                ret = match_int(&argstr[0], &intval);
                if (ret < 0) {
                        pr_err("bad mount option arg (not int) "
                               "at '%s'\n", c);
                        return ret;
                }
                dout("got int token %d val %d\n", token, intval);
        } else if (token > Opt_last_int && token < Opt_last_string) {
                dout("got string token %d val %s\n", token,
                     argstr[0].from);
        } else if (token > Opt_last_string && token < Opt_last_bool) {
                dout("got Boolean token %d\n", token);
        } else {
                dout("got token %d\n", token);
        }

        switch (token) {
        case Opt_read_only:
                rbd_opts->read_only = true;
                break;
        case Opt_read_write:
                rbd_opts->read_only = false;
                break;
        default:
                rbd_assert(false);
                break;
        }
        return 0;
}
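
/*
 * Added note (not part of the original source): this function is
 * handed to ceph_parse_options() as the callback for options the
 * ceph layer itself does not recognize.  Mapping an image with,
 * say, "-o ro" ends up here with c == "ro" and sets
 * rbd_opts->read_only; "rw" (the default) clears it again.
 */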

/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.  Either way, ceph_opts is consumed by this
 * function.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;

        mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
        rbdc = rbd_client_find(ceph_opts);
        if (rbdc)       /* using an existing client */
                ceph_destroy_options(ceph_opts);
        else
                rbdc = rbd_client_create(ceph_opts);
        mutex_unlock(&client_mutex);

        return rbdc;
}

/*
 * Destroy ceph client
 *
 * Caller must hold rbd_client_list_lock.
 */
static void rbd_client_release(struct kref *kref)
{
        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

        dout("%s: rbdc %p\n", __func__, rbdc);
        spin_lock(&rbd_client_list_lock);
        list_del(&rbdc->node);
        spin_unlock(&rbd_client_list_lock);

        ceph_destroy_client(rbdc->client);
        kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
        if (rbdc)
                kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
        return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
        size_t size;
        u32 snap_count;

        /* The header has to start with the magic rbd header text */
        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
                return false;

        /* The bio layer requires at least sector-sized I/O */

        if (ondisk->options.order < SECTOR_SHIFT)
                return false;

        /* If we use u64 in a few spots we may be able to loosen this */

        if (ondisk->options.order > 8 * sizeof (int) - 1)
                return false;

        /*
         * The size of a snapshot header has to fit in a size_t, and
         * that limits the number of snapshots.
         */
        snap_count = le32_to_cpu(ondisk->snap_count);
        size = SIZE_MAX - sizeof (struct ceph_snap_context);
        if (snap_count > size / sizeof (__le64))
                return false;

        /*
         * Not only that, but the size of the entire snapshot
         * header must also be representable in a size_t.
         */
        size -= snap_count * sizeof (__le64);
        if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
                return false;

        return true;
}

/*
 * Fill an rbd image header with information from the given format 1
 * on-disk header.
 */
static int rbd_header_from_disk(struct rbd_device *rbd_dev,
                                 struct rbd_image_header_ondisk *ondisk)
{
        struct rbd_image_header *header = &rbd_dev->header;
        bool first_time = header->object_prefix == NULL;
        struct ceph_snap_context *snapc;
        char *object_prefix = NULL;
        char *snap_names = NULL;
        u64 *snap_sizes = NULL;
        u32 snap_count;
        size_t size;
        int ret = -ENOMEM;
        u32 i;

        /* Allocate this now to avoid having to handle failure below */

        if (first_time) {
                size_t len;

                len = strnlen(ondisk->object_prefix,
                                sizeof (ondisk->object_prefix));
                object_prefix = kmalloc(len + 1, GFP_KERNEL);
                if (!object_prefix)
                        return -ENOMEM;
                memcpy(object_prefix, ondisk->object_prefix, len);
                object_prefix[len] = '\0';
        }

        /* Allocate the snapshot context and fill it in */

        snap_count = le32_to_cpu(ondisk->snap_count);
        snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
        if (!snapc)
                goto out_err;
        snapc->seq = le64_to_cpu(ondisk->snap_seq);
        if (snap_count) {
                struct rbd_image_snap_ondisk *snaps;
                u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

                /* We'll keep a copy of the snapshot names... */

                if (snap_names_len > (u64)SIZE_MAX)
                        goto out_2big;
                snap_names = kmalloc(snap_names_len, GFP_KERNEL);
                if (!snap_names)
                        goto out_err;

                /* ...as well as the array of their sizes. */

                size = snap_count * sizeof (*header->snap_sizes);
                snap_sizes = kmalloc(size, GFP_KERNEL);
                if (!snap_sizes)
                        goto out_err;

                /*
                 * Copy the names, and fill in each snapshot's id
                 * and size.
                 *
                 * Note that rbd_dev_v1_header_info() guarantees the
                 * ondisk buffer we're working with has
                 * snap_names_len bytes beyond the end of the
                 * snapshot id array, so this memcpy() is safe.
                 */
                memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
                snaps = ondisk->snaps;
                for (i = 0; i < snap_count; i++) {
                        snapc->snaps[i] = le64_to_cpu(snaps[i].id);
                        snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
                }
        }

        /* We won't fail any more, fill in the header */

        if (first_time) {
                header->object_prefix = object_prefix;
                header->obj_order = ondisk->options.order;
                header->crypt_type = ondisk->options.crypt_type;
                header->comp_type = ondisk->options.comp_type;
                /* The rest aren't used for format 1 images */
                header->stripe_unit = 0;
                header->stripe_count = 0;
                header->features = 0;
        } else {
                ceph_put_snap_context(header->snapc);
                kfree(header->snap_names);
                kfree(header->snap_sizes);
        }

        /* The remaining fields always get updated (when we refresh) */

        header->image_size = le64_to_cpu(ondisk->image_size);
        header->snapc = snapc;
        header->snap_names = snap_names;
        header->snap_sizes = snap_sizes;

        /* Make sure mapping size is consistent with header info */

        if (rbd_dev->spec->snap_id == CEPH_NOSNAP || first_time)
                if (rbd_dev->mapping.size != header->image_size)
                        rbd_dev->mapping.size = header->image_size;

        return 0;
out_2big:
        ret = -EIO;
out_err:
        kfree(snap_sizes);
        kfree(snap_names);
        ceph_put_snap_context(snapc);
        kfree(object_prefix);

        return ret;
}

static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
        const char *snap_name;

        rbd_assert(which < rbd_dev->header.snapc->num_snaps);

        /* Skip over names until we find the one we are looking for */

        snap_name = rbd_dev->header.snap_names;
        while (which--)
                snap_name += strlen(snap_name) + 1;

        return kstrdup(snap_name, GFP_KERNEL);
}

/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that result is for snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
        u64 snap_id1 = *(u64 *)s1;
        u64 snap_id2 = *(u64 *)s2;

        if (snap_id1 < snap_id2)
                return 1;
        return snap_id1 == snap_id2 ? 0 : -1;
}
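
/*
 * Added note (not part of the original source): the inverted return
 * values make bsearch() treat the array as descending.  For
 * snaps[] = { 12, 7, 3 }, comparing key 7 against 12 yields 1
 * (search right) and against 3 yields -1 (search left), so the
 * search converges on index 1.
 */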

/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
        struct ceph_snap_context *snapc = rbd_dev->header.snapc;
        u64 *found;

        found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
                                sizeof (snap_id), snapid_compare_reverse);

        return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}

static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id)
{
        u32 which;
        const char *snap_name;

        which = rbd_dev_snap_index(rbd_dev, snap_id);
        if (which == BAD_SNAP_INDEX)
                return ERR_PTR(-ENOENT);

        snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
        return snap_name ? snap_name : ERR_PTR(-ENOMEM);
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
        if (snap_id == CEPH_NOSNAP)
                return RBD_SNAP_HEAD_NAME;

        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (rbd_dev->image_format == 1)
                return rbd_dev_v1_snap_name(rbd_dev, snap_id);

        return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}

static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                                u64 *snap_size)
{
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (snap_id == CEPH_NOSNAP) {
                *snap_size = rbd_dev->header.image_size;
        } else if (rbd_dev->image_format == 1) {
                u32 which;

                which = rbd_dev_snap_index(rbd_dev, snap_id);
                if (which == BAD_SNAP_INDEX)
                        return -ENOENT;

                *snap_size = rbd_dev->header.snap_sizes[which];
        } else {
                u64 size = 0;
                int ret;

                ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
                if (ret)
                        return ret;

                *snap_size = size;
        }
        return 0;
}

static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                        u64 *snap_features)
{
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (snap_id == CEPH_NOSNAP) {
                *snap_features = rbd_dev->header.features;
        } else if (rbd_dev->image_format == 1) {
                *snap_features = 0;     /* No features for format 1 */
        } else {
                u64 features = 0;
                int ret;

                ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
                if (ret)
                        return ret;

                *snap_features = features;
        }
        return 0;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
        u64 snap_id = rbd_dev->spec->snap_id;
        u64 size = 0;
        u64 features = 0;
        int ret;

        ret = rbd_snap_size(rbd_dev, snap_id, &size);
        if (ret)
                return ret;
        ret = rbd_snap_features(rbd_dev, snap_id, &features);
        if (ret)
                return ret;

        rbd_dev->mapping.size = size;
        rbd_dev->mapping.features = features;

        return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
        rbd_dev->mapping.size = 0;
        rbd_dev->mapping.features = 0;
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
        char *name;
        u64 segment;
        int ret;
        char *name_format;

        name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
        if (!name)
                return NULL;
        segment = offset >> rbd_dev->header.obj_order;
        name_format = "%s.%012llx";
        if (rbd_dev->image_format == 2)
                name_format = "%s.%016llx";
        ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, name_format,
                        rbd_dev->header.object_prefix, segment);
        if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
                pr_err("error formatting segment name for #%llu (%d)\n",
                        segment, ret);
                /* allocated from rbd_segment_name_cache, not kmalloc() */
                kmem_cache_free(rbd_segment_name_cache, name);
                name = NULL;
        }

        return name;
}

static void rbd_segment_name_free(const char *name)
{
        /* The explicit cast here is needed to drop the const qualifier */

        kmem_cache_free(rbd_segment_name_cache, (void *)name);
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
                                u64 offset, u64 length)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        offset &= segment_size - 1;

        rbd_assert(length <= U64_MAX - offset);
        if (offset + length > segment_size)
                length = segment_size - offset;

        return length;
}
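
/*
 * Added worked example (not part of the original source): with an
 * object order of 22 (4 MiB objects, the usual rbd default), an
 * image request at offset 10 MiB for 3 MiB lands in segment 2;
 * rbd_segment_offset() gives 2 MiB into that segment, and
 * rbd_segment_length() clips the first piece to 2 MiB.  The
 * remaining 1 MiB is issued against segment 3.
 */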

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
        return 1 << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
        struct bio *tmp;

        while (chain) {
                tmp = chain;
                chain = chain->bi_next;
                bio_put(tmp);
        }
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
        struct bio_vec *bv;
        unsigned long flags;
        void *buf;
        int i;
        int pos = 0;

        while (chain) {
                bio_for_each_segment(bv, chain, i) {
                        if (pos + bv->bv_len > start_ofs) {
                                int remainder = max(start_ofs - pos, 0);
                                buf = bvec_kmap_irq(bv, &flags);
                                memset(buf + remainder, 0,
                                       bv->bv_len - remainder);
                                flush_dcache_page(bv->bv_page);
                                bvec_kunmap_irq(buf, &flags);
                        }
                        pos += bv->bv_len;
                }

                chain = chain->bi_next;
        }
}
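
/*
 * Added note (not part of the original source): this is what turns
 * a short read into a full-length, zero-padded one.  If only 512
 * bytes of a 4096-byte read came back, zero_bio_chain(chain, 512)
 * keeps the first 512 bytes and zeroes the rest of the chain.
 */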

/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
        struct page **page = &pages[offset >> PAGE_SHIFT];

        rbd_assert(end > offset);
        rbd_assert(end - offset <= (u64)SIZE_MAX);
        while (offset < end) {
                size_t page_offset;
                size_t length;
                unsigned long flags;
                void *kaddr;

                page_offset = offset & ~PAGE_MASK;
                length = min_t(size_t, PAGE_SIZE - page_offset, end - offset);
                local_irq_save(flags);
                kaddr = kmap_atomic(*page);
                memset(kaddr + page_offset, 0, length);
                flush_dcache_page(*page);
                kunmap_atomic(kaddr);
                local_irq_restore(flags);

                offset += length;
                page++;
        }
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
                                        unsigned int offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio_vec *bv;
        unsigned int resid;
        unsigned short idx;
        unsigned int voff;
        unsigned short end_idx;
        unsigned short vcnt;
        struct bio *bio;

        /* Handle the easy case for the caller */

        if (!offset && len == bio_src->bi_size)
                return bio_clone(bio_src, gfpmask);

        if (WARN_ON_ONCE(!len))
                return NULL;
        if (WARN_ON_ONCE(len > bio_src->bi_size))
                return NULL;
        if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
                return NULL;

        /* Find first affected segment... */

        resid = offset;
        bio_for_each_segment(bv, bio_src, idx) {
                if (resid < bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        voff = resid;

        /* ...and the last affected segment */

        resid += len;
        __bio_for_each_segment(bv, bio_src, end_idx, idx) {
                if (resid <= bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        vcnt = end_idx - idx + 1;

        /* Build the clone */

        bio = bio_alloc(gfpmask, (unsigned int) vcnt);
        if (!bio)
                return NULL;    /* ENOMEM */

        bio->bi_bdev = bio_src->bi_bdev;
        bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
        bio->bi_rw = bio_src->bi_rw;
        bio->bi_flags |= 1 << BIO_CLONED;

        /*
         * Copy over our part of the bio_vec, then update the first
         * and last (or only) entries.
         */
        memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
                        vcnt * sizeof (struct bio_vec));
        bio->bi_io_vec[0].bv_offset += voff;
        if (vcnt > 1) {
                bio->bi_io_vec[0].bv_len -= voff;
                bio->bi_io_vec[vcnt - 1].bv_len = resid;
        } else {
                bio->bi_io_vec[0].bv_len = len;
        }

        bio->bi_vcnt = vcnt;
        bio->bi_size = len;
        bio->bi_idx = 0;

        return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
                                        unsigned int *offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio *bi = *bio_src;
        unsigned int off = *offset;
        struct bio *chain = NULL;
        struct bio **end;

        /* Build up a chain of clone bios up to the limit */

        if (!bi || off >= bi->bi_size || !len)
                return NULL;            /* Nothing to clone */

        end = &chain;
        while (len) {
                unsigned int bi_size;
                struct bio *bio;

                if (!bi) {
                        rbd_warn(NULL, "bio_chain exhausted with %u left", len);
                        goto out_err;   /* EINVAL; ran out of bio's */
                }
                bi_size = min_t(unsigned int, bi->bi_size - off, len);
                bio = bio_clone_range(bi, off, bi_size, gfpmask);
                if (!bio)
                        goto out_err;   /* ENOMEM */

                *end = bio;
                end = &bio->bi_next;

                off += bi_size;
                if (off == bi->bi_size) {
                        bi = bi->bi_next;
                        off = 0;
                }
                len -= bi_size;
        }
        *bio_src = bi;
        *offset = off;

        return chain;
out_err:
        bio_chain_put(chain);

        return NULL;
}
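
/*
 * Added usage sketch (not part of the original source, hypothetical
 * names): callers use the in-out cursor to carve one source chain
 * into per-object pieces:
 *
 *      struct bio *bio = original_chain;
 *      unsigned int off = 0;
 *
 *      for each object touched by the request:
 *              piece = bio_chain_clone_range(&bio, &off, piece_len,
 *                                            GFP_ATOMIC);
 *              // piece covers piece_len bytes; bio/off now point
 *              // at the first byte not yet cloned
 */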
1356
926f9b3f
AE
1357/*
1358 * The default/initial value for all object request flags is 0. For
1359 * each flag, once its value is set to 1 it is never reset to 0
1360 * again.
1361 */
57acbaa7 1362static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
926f9b3f 1363{
57acbaa7 1364 if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
926f9b3f
AE
1365 struct rbd_device *rbd_dev;
1366
57acbaa7
AE
1367 rbd_dev = obj_request->img_request->rbd_dev;
1368 rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
926f9b3f
AE
1369 obj_request);
1370 }
1371}
1372
57acbaa7 1373static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
926f9b3f
AE
1374{
1375 smp_mb();
57acbaa7 1376 return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
926f9b3f
AE
1377}
1378
57acbaa7 1379static void obj_request_done_set(struct rbd_obj_request *obj_request)
6365d33a 1380{
57acbaa7
AE
1381 if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
1382 struct rbd_device *rbd_dev = NULL;
6365d33a 1383
57acbaa7
AE
1384 if (obj_request_img_data_test(obj_request))
1385 rbd_dev = obj_request->img_request->rbd_dev;
1386 rbd_warn(rbd_dev, "obj_request %p already marked done\n",
6365d33a
AE
1387 obj_request);
1388 }
1389}
1390
57acbaa7 1391static bool obj_request_done_test(struct rbd_obj_request *obj_request)
6365d33a
AE
1392{
1393 smp_mb();
57acbaa7 1394 return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
6365d33a
AE
1395}
1396
5679c59f
AE
1397/*
1398 * This sets the KNOWN flag after (possibly) setting the EXISTS
1399 * flag. The latter is set based on the "exists" value provided.
1400 *
1401 * Note that for our purposes once an object exists it never goes
1402 * away again. It's possible that the response from two existence
1403 * checks are separated by the creation of the target object, and
1404 * the first ("doesn't exist") response arrives *after* the second
1405 * ("does exist"). In that case we ignore the second one.
1406 */
1407static void obj_request_existence_set(struct rbd_obj_request *obj_request,
1408 bool exists)
1409{
1410 if (exists)
1411 set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
1412 set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
1413 smp_mb();
1414}
1415
1416static bool obj_request_known_test(struct rbd_obj_request *obj_request)
1417{
1418 smp_mb();
1419 return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
1420}
1421
1422static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
1423{
1424 smp_mb();
1425 return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
1426}
1427
bf0d5f50
AE
1428static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1429{
37206ee5
AE
1430 dout("%s: obj %p (was %d)\n", __func__, obj_request,
1431 atomic_read(&obj_request->kref.refcount));
bf0d5f50
AE
1432 kref_get(&obj_request->kref);
1433}
1434
1435static void rbd_obj_request_destroy(struct kref *kref);
1436static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1437{
1438 rbd_assert(obj_request != NULL);
37206ee5
AE
1439 dout("%s: obj %p (was %d)\n", __func__, obj_request,
1440 atomic_read(&obj_request->kref.refcount));
bf0d5f50
AE
1441 kref_put(&obj_request->kref, rbd_obj_request_destroy);
1442}
1443
e93f3152
AE
1444static bool img_request_child_test(struct rbd_img_request *img_request);
1445static void rbd_parent_request_destroy(struct kref *kref);
bf0d5f50
AE
1446static void rbd_img_request_destroy(struct kref *kref);
1447static void rbd_img_request_put(struct rbd_img_request *img_request)
1448{
1449 rbd_assert(img_request != NULL);
37206ee5
AE
1450 dout("%s: img %p (was %d)\n", __func__, img_request,
1451 atomic_read(&img_request->kref.refcount));
e93f3152
AE
1452 if (img_request_child_test(img_request))
1453 kref_put(&img_request->kref, rbd_parent_request_destroy);
1454 else
1455 kref_put(&img_request->kref, rbd_img_request_destroy);
bf0d5f50
AE
1456}
1457
1458static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1459 struct rbd_obj_request *obj_request)
1460{
25dcf954
AE
1461 rbd_assert(obj_request->img_request == NULL);
1462
b155e86c 1463 /* Image request now owns object's original reference */
bf0d5f50 1464 obj_request->img_request = img_request;
25dcf954 1465 obj_request->which = img_request->obj_request_count;
6365d33a
AE
1466 rbd_assert(!obj_request_img_data_test(obj_request));
1467 obj_request_img_data_set(obj_request);
bf0d5f50 1468 rbd_assert(obj_request->which != BAD_WHICH);
25dcf954
AE
1469 img_request->obj_request_count++;
1470 list_add_tail(&obj_request->links, &img_request->obj_requests);
37206ee5
AE
1471 dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1472 obj_request->which);
bf0d5f50
AE
1473}
1474
1475static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1476 struct rbd_obj_request *obj_request)
1477{
1478 rbd_assert(obj_request->which != BAD_WHICH);
25dcf954 1479
37206ee5
AE
1480 dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1481 obj_request->which);
bf0d5f50 1482 list_del(&obj_request->links);
25dcf954
AE
1483 rbd_assert(img_request->obj_request_count > 0);
1484 img_request->obj_request_count--;
1485 rbd_assert(obj_request->which == img_request->obj_request_count);
1486 obj_request->which = BAD_WHICH;
6365d33a 1487 rbd_assert(obj_request_img_data_test(obj_request));
bf0d5f50 1488 rbd_assert(obj_request->img_request == img_request);
bf0d5f50 1489 obj_request->img_request = NULL;
25dcf954 1490 obj_request->callback = NULL;
bf0d5f50
AE
1491 rbd_obj_request_put(obj_request);
1492}
1493
1494static bool obj_request_type_valid(enum obj_request_type type)
1495{
1496 switch (type) {
9969ebc5 1497 case OBJ_REQUEST_NODATA:
bf0d5f50 1498 case OBJ_REQUEST_BIO:
788e2df3 1499 case OBJ_REQUEST_PAGES:
bf0d5f50
AE
1500 return true;
1501 default:
1502 return false;
1503 }
1504}
1505
bf0d5f50
AE
1506static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
1507 struct rbd_obj_request *obj_request)
1508{
37206ee5
AE
1509 dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);
1510
bf0d5f50
AE
1511 return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
1512}
1513
1514static void rbd_img_request_complete(struct rbd_img_request *img_request)
1515{
55f27e09 1516
37206ee5 1517 dout("%s: img %p\n", __func__, img_request);
55f27e09
AE
1518
1519 /*
1520 * If no error occurred, compute the aggregate transfer
1521 * count for the image request. We could instead use
1522 * atomic64_cmpxchg() to update it as each object request
1523 * completes; not clear which way is better off hand.
1524 */
1525 if (!img_request->result) {
1526 struct rbd_obj_request *obj_request;
1527 u64 xferred = 0;
1528
1529 for_each_obj_request(img_request, obj_request)
1530 xferred += obj_request->xferred;
1531 img_request->xferred = xferred;
1532 }
1533
bf0d5f50
AE
1534 if (img_request->callback)
1535 img_request->callback(img_request);
1536 else
1537 rbd_img_request_put(img_request);
1538}
1539
788e2df3
AE
1540/* Caller is responsible for rbd_obj_request_destroy(obj_request) */
1541
1542static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
1543{
37206ee5
AE
1544 dout("%s: obj %p\n", __func__, obj_request);
1545
788e2df3
AE
1546 return wait_for_completion_interruptible(&obj_request->completion);
1547}
1548
0c425248
AE
1549/*
1550 * The default/initial value for all image request flags is 0. Each
1551 * is conditionally set to 1 at image request initialization time
1552 * and currently never change thereafter.
1553 */
1554static void img_request_write_set(struct rbd_img_request *img_request)
1555{
1556 set_bit(IMG_REQ_WRITE, &img_request->flags);
1557 smp_mb();
1558}
1559
1560static bool img_request_write_test(struct rbd_img_request *img_request)
1561{
1562 smp_mb();
1563 return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1564}
1565
9849e986
AE
1566static void img_request_child_set(struct rbd_img_request *img_request)
1567{
1568 set_bit(IMG_REQ_CHILD, &img_request->flags);
1569 smp_mb();
1570}
1571
e93f3152
AE
1572static void img_request_child_clear(struct rbd_img_request *img_request)
1573{
1574 clear_bit(IMG_REQ_CHILD, &img_request->flags);
1575 smp_mb();
1576}
1577
9849e986
AE
1578static bool img_request_child_test(struct rbd_img_request *img_request)
1579{
1580 smp_mb();
1581 return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1582}
1583
d0b2e944
AE
1584static void img_request_layered_set(struct rbd_img_request *img_request)
1585{
1586 set_bit(IMG_REQ_LAYERED, &img_request->flags);
1587 smp_mb();
1588}
1589
a2acd00e
AE
1590static void img_request_layered_clear(struct rbd_img_request *img_request)
1591{
1592 clear_bit(IMG_REQ_LAYERED, &img_request->flags);
1593 smp_mb();
1594}
1595
d0b2e944
AE
1596static bool img_request_layered_test(struct rbd_img_request *img_request)
1597{
1598 smp_mb();
1599 return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1600}
1601
6e2a4505
AE
1602static void
1603rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1604{
b9434c5b
AE
1605 u64 xferred = obj_request->xferred;
1606 u64 length = obj_request->length;
1607
6e2a4505
AE
1608 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1609 obj_request, obj_request->img_request, obj_request->result,
b9434c5b 1610 xferred, length);
6e2a4505 1611 /*
17c1cc1d
JD
1612 * ENOENT means a hole in the image. We zero-fill the entire
1613 * length of the request. A short read also implies zero-fill
1614 * to the end of the request. An error requires the whole
1615 * length of the request to be reported finished with an error
1616 * to the block layer. In each case we update the xferred
1617 * count to indicate the whole request was satisfied.
6e2a4505 1618 */
b9434c5b 1619 rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
6e2a4505 1620 if (obj_request->result == -ENOENT) {
b9434c5b
AE
1621 if (obj_request->type == OBJ_REQUEST_BIO)
1622 zero_bio_chain(obj_request->bio_list, 0);
1623 else
1624 zero_pages(obj_request->pages, 0, length);
6e2a4505 1625 obj_request->result = 0;
b9434c5b
AE
1626 } else if (xferred < length && !obj_request->result) {
1627 if (obj_request->type == OBJ_REQUEST_BIO)
1628 zero_bio_chain(obj_request->bio_list, xferred);
1629 else
1630 zero_pages(obj_request->pages, xferred, length);
6e2a4505 1631 }
17c1cc1d 1632 obj_request->xferred = length;
6e2a4505
AE
1633 obj_request_done_set(obj_request);
1634}
1635
bf0d5f50
AE
1636static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1637{
37206ee5
AE
1638 dout("%s: obj %p cb %p\n", __func__, obj_request,
1639 obj_request->callback);
bf0d5f50
AE
1640 if (obj_request->callback)
1641 obj_request->callback(obj_request);
788e2df3
AE
1642 else
1643 complete_all(&obj_request->completion);
bf0d5f50
AE
1644}
1645
c47f9371 1646static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
39bf2c5d
AE
1647{
1648 dout("%s: obj %p\n", __func__, obj_request);
1649 obj_request_done_set(obj_request);
1650}
1651
c47f9371 1652static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
bf0d5f50 1653{
57acbaa7 1654 struct rbd_img_request *img_request = NULL;
a9e8ba2c 1655 struct rbd_device *rbd_dev = NULL;
57acbaa7
AE
1656 bool layered = false;
1657
1658 if (obj_request_img_data_test(obj_request)) {
1659 img_request = obj_request->img_request;
1660 layered = img_request && img_request_layered_test(img_request);
a9e8ba2c 1661 rbd_dev = img_request->rbd_dev;
57acbaa7 1662 }
8b3e1a56
AE
1663
1664 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1665 obj_request, img_request, obj_request->result,
1666 obj_request->xferred, obj_request->length);
a9e8ba2c
AE
1667 if (layered && obj_request->result == -ENOENT &&
1668 obj_request->img_offset < rbd_dev->parent_overlap)
8b3e1a56
AE
1669 rbd_img_parent_read(obj_request);
1670 else if (img_request)
6e2a4505
AE
1671 rbd_img_obj_request_read_callback(obj_request);
1672 else
1673 obj_request_done_set(obj_request);
bf0d5f50
AE
1674}
1675
c47f9371 1676static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
bf0d5f50 1677{
1b83bef2
SW
1678 dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1679 obj_request->result, obj_request->length);
1680 /*
8b3e1a56
AE
1681 * There is no such thing as a successful short write. Set
1682 * it to our originally-requested length.
1b83bef2
SW
1683 */
1684 obj_request->xferred = obj_request->length;
07741308 1685 obj_request_done_set(obj_request);
bf0d5f50
AE
1686}
1687
fbfab539
AE
1688/*
1689 * For a simple stat call there's nothing to do. We'll do more if
1690 * this is part of a write sequence for a layered image.
1691 */
c47f9371 1692static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
fbfab539 1693{
37206ee5 1694 dout("%s: obj %p\n", __func__, obj_request);
fbfab539
AE
1695 obj_request_done_set(obj_request);
1696}

static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
				struct ceph_msg *msg)
{
	struct rbd_obj_request *obj_request = osd_req->r_priv;
	u16 opcode;

	dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
	rbd_assert(osd_req == obj_request->osd_req);
	if (obj_request_img_data_test(obj_request)) {
		rbd_assert(obj_request->img_request);
		rbd_assert(obj_request->which != BAD_WHICH);
	} else {
		rbd_assert(obj_request->which == BAD_WHICH);
	}

	if (osd_req->r_result < 0)
		obj_request->result = osd_req->r_result;

	BUG_ON(osd_req->r_num_ops > 2);

	/*
	 * We support a 64-bit length, but ultimately it has to be
	 * passed to blk_end_request(), which takes an unsigned int.
	 */
	obj_request->xferred = osd_req->r_reply_op_len[0];
	rbd_assert(obj_request->xferred < (u64)UINT_MAX);
	opcode = osd_req->r_ops[0].op;
	switch (opcode) {
	case CEPH_OSD_OP_READ:
		rbd_osd_read_callback(obj_request);
		break;
	case CEPH_OSD_OP_WRITE:
		rbd_osd_write_callback(obj_request);
		break;
	case CEPH_OSD_OP_STAT:
		rbd_osd_stat_callback(obj_request);
		break;
	case CEPH_OSD_OP_CALL:
	case CEPH_OSD_OP_NOTIFY_ACK:
	case CEPH_OSD_OP_WATCH:
		rbd_osd_trivial_callback(obj_request);
		break;
	default:
		rbd_warn(NULL, "%s: unsupported op %hu\n",
			obj_request->object_name, (unsigned short) opcode);
		break;
	}

	if (obj_request_done_test(obj_request))
		rbd_obj_request_complete(obj_request);
}

static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct ceph_osd_request *osd_req = obj_request->osd_req;
	u64 snap_id;

	rbd_assert(osd_req != NULL);

	snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
	ceph_osdc_build_request(osd_req, obj_request->offset,
			NULL, snap_id, NULL);
}

static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct ceph_osd_request *osd_req = obj_request->osd_req;
	struct ceph_snap_context *snapc;
	struct timespec mtime = CURRENT_TIME;

	rbd_assert(osd_req != NULL);

	snapc = img_request ? img_request->snapc : NULL;
	ceph_osdc_build_request(osd_req, obj_request->offset,
			snapc, CEPH_NOSNAP, &mtime);
}
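
/*
 * Note the asymmetry between the two format helpers above: a read is
 * executed against a single snapshot id (CEPH_NOSNAP when the head is
 * mapped), while a write must carry the image's whole snapshot context
 * plus a modification time, so that the OSD can preserve copy-on-write
 * clones for any snapshots taken earlier.
 */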

static struct ceph_osd_request *rbd_osd_req_create(
					struct rbd_device *rbd_dev,
					bool write_request,
					struct rbd_obj_request *obj_request)
{
	struct ceph_snap_context *snapc = NULL;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;

	if (obj_request_img_data_test(obj_request)) {
		struct rbd_img_request *img_request = obj_request->img_request;

		rbd_assert(write_request ==
				img_request_write_test(img_request));
		if (write_request)
			snapc = img_request->snapc;
	}

	/* Allocate and initialize the request, for the single op */

	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
	if (!osd_req)
		return NULL;	/* ENOMEM */

	if (write_request)
		osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
	else
		osd_req->r_flags = CEPH_OSD_FLAG_READ;

	osd_req->r_callback = rbd_osd_req_callback;
	osd_req->r_priv = obj_request;

	osd_req->r_oid_len = strlen(obj_request->object_name);
	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

	osd_req->r_file_layout = rbd_dev->layout;	/* struct */

	return osd_req;
}

/*
 * Create a copyup osd request based on the information in the
 * object request supplied.  A copyup request has two osd ops,
 * a copyup method call, and a "normal" write request.
 */
static struct ceph_osd_request *
rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	struct ceph_snap_context *snapc;
	struct rbd_device *rbd_dev;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;
	rbd_assert(img_request);
	rbd_assert(img_request_write_test(img_request));

	/* Allocate and initialize the request, for the two ops */

	snapc = img_request->snapc;
	rbd_dev = img_request->rbd_dev;
	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
	if (!osd_req)
		return NULL;	/* ENOMEM */

	osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
	osd_req->r_callback = rbd_osd_req_callback;
	osd_req->r_priv = obj_request;

	osd_req->r_oid_len = strlen(obj_request->object_name);
	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

	osd_req->r_file_layout = rbd_dev->layout;	/* struct */

	return osd_req;
}
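
/*
 * The caller is expected to populate the two ops of the request
 * returned above as follows: op 0 becomes a CEPH_OSD_OP_CALL of the
 * "rbd" class "copyup" method carrying the parent data, and op 1
 * becomes the original CEPH_OSD_OP_WRITE.  See
 * rbd_img_obj_parent_read_full_callback() below, which sets up both
 * ops before resubmitting the request.
 */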

static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
}

/* object_name is assumed to be a non-null pointer and NUL-terminated */

static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
						u64 offset, u64 length,
						enum obj_request_type type)
{
	struct rbd_obj_request *obj_request;
	size_t size;
	char *name;

	rbd_assert(obj_request_type_valid(type));

	size = strlen(object_name) + 1;
	name = kmalloc(size, GFP_KERNEL);
	if (!name)
		return NULL;

	obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL);
	if (!obj_request) {
		kfree(name);
		return NULL;
	}

	obj_request->object_name = memcpy(name, object_name, size);
	obj_request->offset = offset;
	obj_request->length = length;
	obj_request->flags = 0;
	obj_request->which = BAD_WHICH;
	obj_request->type = type;
	INIT_LIST_HEAD(&obj_request->links);
	init_completion(&obj_request->completion);
	kref_init(&obj_request->kref);

	dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
		offset, length, (int)type, obj_request);

	return obj_request;
}

static void rbd_obj_request_destroy(struct kref *kref)
{
	struct rbd_obj_request *obj_request;

	obj_request = container_of(kref, struct rbd_obj_request, kref);

	dout("%s: obj %p\n", __func__, obj_request);

	rbd_assert(obj_request->img_request == NULL);
	rbd_assert(obj_request->which == BAD_WHICH);

	if (obj_request->osd_req)
		rbd_osd_req_destroy(obj_request->osd_req);

	rbd_assert(obj_request_type_valid(obj_request->type));
	switch (obj_request->type) {
	case OBJ_REQUEST_NODATA:
		break;		/* Nothing to do */
	case OBJ_REQUEST_BIO:
		if (obj_request->bio_list)
			bio_chain_put(obj_request->bio_list);
		break;
	case OBJ_REQUEST_PAGES:
		if (obj_request->pages)
			ceph_release_page_vector(obj_request->pages,
						obj_request->page_count);
		break;
	}

	kfree(obj_request->object_name);
	obj_request->object_name = NULL;
	kmem_cache_free(rbd_obj_request_cache, obj_request);
}

/* It's OK to call this for a device with no parent */

static void rbd_spec_put(struct rbd_spec *spec);
static void rbd_dev_unparent(struct rbd_device *rbd_dev)
{
	rbd_dev_remove_parent(rbd_dev);
	rbd_spec_put(rbd_dev->parent_spec);
	rbd_dev->parent_spec = NULL;
	rbd_dev->parent_overlap = 0;
}

/*
 * Parent image reference counting is used to determine when an
 * image's parent fields can be safely torn down--after there are no
 * more in-flight requests to the parent image.  When the last
 * reference is dropped, cleaning them up is safe.
 */
static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
{
	int counter;

	if (!rbd_dev->parent_spec)
		return;

	counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
	if (counter > 0)
		return;

	/* Last reference; clean up parent data structures */

	if (!counter)
		rbd_dev_unparent(rbd_dev);
	else
		rbd_warn(rbd_dev, "parent reference underflow\n");
}

/*
 * If an image has a non-zero parent overlap, get a reference to its
 * parent.
 *
 * We must get the reference before checking for the overlap to
 * coordinate properly with zeroing the parent overlap in
 * rbd_dev_v2_parent_info() when an image gets flattened.  We
 * drop it again if there is no overlap.
 *
 * Returns true if the rbd device has a parent with a non-zero
 * overlap and a reference for it was successfully taken, or
 * false otherwise.
 */
static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
{
	int counter;

	if (!rbd_dev->parent_spec)
		return false;

	counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
	if (counter > 0 && rbd_dev->parent_overlap)
		return true;

	/* Image was flattened, but parent is not yet torn down */

	if (counter < 0)
		rbd_warn(rbd_dev, "parent reference overflow\n");

	return false;
}
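
/*
 * A condensed sketch of how the two helpers above pair up over the
 * life of an image request (this is what rbd_img_request_create()
 * and rbd_img_request_destroy() below actually do):
 *
 *	if (rbd_dev_parent_get(rbd_dev))
 *		img_request_layered_set(img_request);
 *	...
 *	if (img_request_layered_test(img_request)) {
 *		img_request_layered_clear(img_request);
 *		rbd_dev_parent_put(img_request->rbd_dev);
 *	}
 *
 * So the parent spec and overlap stay intact for as long as any
 * image request that depends on them is in flight.
 */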

/*
 * Caller is responsible for filling in the list of object requests
 * that comprises the image request, and the Linux request pointer
 * (if there is one).
 */
static struct rbd_img_request *rbd_img_request_create(
					struct rbd_device *rbd_dev,
					u64 offset, u64 length,
					bool write_request)
{
	struct rbd_img_request *img_request;

	img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC);
	if (!img_request)
		return NULL;

	if (write_request) {
		down_read(&rbd_dev->header_rwsem);
		ceph_get_snap_context(rbd_dev->header.snapc);
		up_read(&rbd_dev->header_rwsem);
	}

	img_request->rq = NULL;
	img_request->rbd_dev = rbd_dev;
	img_request->offset = offset;
	img_request->length = length;
	img_request->flags = 0;
	if (write_request) {
		img_request_write_set(img_request);
		img_request->snapc = rbd_dev->header.snapc;
	} else {
		img_request->snap_id = rbd_dev->spec->snap_id;
	}
	if (rbd_dev_parent_get(rbd_dev))
		img_request_layered_set(img_request);
	spin_lock_init(&img_request->completion_lock);
	img_request->next_completion = 0;
	img_request->callback = NULL;
	img_request->result = 0;
	img_request->obj_request_count = 0;
	INIT_LIST_HEAD(&img_request->obj_requests);
	kref_init(&img_request->kref);

	dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
		write_request ? "write" : "read", offset, length,
		img_request);

	return img_request;
}

static void rbd_img_request_destroy(struct kref *kref)
{
	struct rbd_img_request *img_request;
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;

	img_request = container_of(kref, struct rbd_img_request, kref);

	dout("%s: img %p\n", __func__, img_request);

	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_img_obj_request_del(img_request, obj_request);
	rbd_assert(img_request->obj_request_count == 0);

	if (img_request_layered_test(img_request)) {
		img_request_layered_clear(img_request);
		rbd_dev_parent_put(img_request->rbd_dev);
	}

	if (img_request_write_test(img_request))
		ceph_put_snap_context(img_request->snapc);

	kmem_cache_free(rbd_img_request_cache, img_request);
}

static struct rbd_img_request *rbd_parent_request_create(
					struct rbd_obj_request *obj_request,
					u64 img_offset, u64 length)
{
	struct rbd_img_request *parent_request;
	struct rbd_device *rbd_dev;

	rbd_assert(obj_request->img_request);
	rbd_dev = obj_request->img_request->rbd_dev;

	parent_request = rbd_img_request_create(rbd_dev->parent,
						img_offset, length, false);
	if (!parent_request)
		return NULL;

	img_request_child_set(parent_request);
	rbd_obj_request_get(obj_request);
	parent_request->obj_request = obj_request;

	return parent_request;
}

static void rbd_parent_request_destroy(struct kref *kref)
{
	struct rbd_img_request *parent_request;
	struct rbd_obj_request *orig_request;

	parent_request = container_of(kref, struct rbd_img_request, kref);
	orig_request = parent_request->obj_request;

	parent_request->obj_request = NULL;
	rbd_obj_request_put(orig_request);
	img_request_child_clear(parent_request);

	rbd_img_request_destroy(kref);
}

static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	unsigned int xferred;
	int result;
	bool more;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;

	rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
	xferred = (unsigned int)obj_request->xferred;
	result = obj_request->result;
	if (result) {
		struct rbd_device *rbd_dev = img_request->rbd_dev;

		rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
			img_request_write_test(img_request) ? "write" : "read",
			obj_request->length, obj_request->img_offset,
			obj_request->offset);
		rbd_warn(rbd_dev, "  result %d xferred %x\n",
			result, xferred);
		if (!img_request->result)
			img_request->result = result;
	}

	/* Image object requests don't own their page array */

	if (obj_request->type == OBJ_REQUEST_PAGES) {
		obj_request->pages = NULL;
		obj_request->page_count = 0;
	}

	if (img_request_child_test(img_request)) {
		rbd_assert(img_request->obj_request != NULL);
		more = obj_request->which < img_request->obj_request_count - 1;
	} else {
		rbd_assert(img_request->rq != NULL);
		more = blk_end_request(img_request->rq, result, xferred);
	}

	return more;
}

static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	u32 which = obj_request->which;
	bool more = true;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;

	dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
	rbd_assert(img_request != NULL);
	rbd_assert(img_request->obj_request_count > 0);
	rbd_assert(which != BAD_WHICH);
	rbd_assert(which < img_request->obj_request_count);
	rbd_assert(which >= img_request->next_completion);

	spin_lock_irq(&img_request->completion_lock);
	if (which != img_request->next_completion)
		goto out;

	for_each_obj_request_from(img_request, obj_request) {
		rbd_assert(more);
		rbd_assert(which < img_request->obj_request_count);

		if (!obj_request_done_test(obj_request))
			break;
		more = rbd_img_obj_end_request(obj_request);
		which++;
	}

	rbd_assert(more ^ (which == img_request->obj_request_count));
	img_request->next_completion = which;
out:
	spin_unlock_irq(&img_request->completion_lock);

	if (!more)
		rbd_img_request_complete(img_request);
}
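
/*
 * To restate the invariant the callback above maintains: object
 * requests complete toward the block layer strictly in order.
 * next_completion records how far the in-order sweep has advanced;
 * a completion arriving out of order only marks its request done
 * and returns, and the later completion of the earlier request
 * sweeps forward past everything already marked done.
 */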

/*
 * Split up an image request into one or more object requests, each
 * to a different object.  The "type" parameter indicates whether
 * "data_desc" is the pointer to the head of a list of bio
 * structures, or the base of a page array.  In either case this
 * function assumes data_desc describes memory sufficient to hold
 * all data described by the image request.
 */
static int rbd_img_request_fill(struct rbd_img_request *img_request,
					enum obj_request_type type,
					void *data_desc)
{
	struct rbd_device *rbd_dev = img_request->rbd_dev;
	struct rbd_obj_request *obj_request = NULL;
	struct rbd_obj_request *next_obj_request;
	bool write_request = img_request_write_test(img_request);
	struct bio *bio_list = NULL;
	unsigned int bio_offset = 0;
	struct page **pages = NULL;
	u64 img_offset;
	u64 resid;
	u16 opcode;

	dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
		(int)type, data_desc);

	opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
	img_offset = img_request->offset;
	resid = img_request->length;
	rbd_assert(resid > 0);

	if (type == OBJ_REQUEST_BIO) {
		bio_list = data_desc;
		rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
	} else {
		rbd_assert(type == OBJ_REQUEST_PAGES);
		pages = data_desc;
	}

	while (resid) {
		struct ceph_osd_request *osd_req;
		const char *object_name;
		u64 offset;
		u64 length;

		object_name = rbd_segment_name(rbd_dev, img_offset);
		if (!object_name)
			goto out_unwind;
		offset = rbd_segment_offset(rbd_dev, img_offset);
		length = rbd_segment_length(rbd_dev, img_offset, resid);
		obj_request = rbd_obj_request_create(object_name,
						offset, length, type);
		/* object request has its own copy of the object name */
		rbd_segment_name_free(object_name);
		if (!obj_request)
			goto out_unwind;
		/*
		 * set obj_request->img_request before creating the
		 * osd_request so that it gets the right snapc
		 */
		rbd_img_obj_request_add(img_request, obj_request);

		if (type == OBJ_REQUEST_BIO) {
			unsigned int clone_size;

			rbd_assert(length <= (u64)UINT_MAX);
			clone_size = (unsigned int)length;
			obj_request->bio_list =
					bio_chain_clone_range(&bio_list,
								&bio_offset,
								clone_size,
								GFP_ATOMIC);
			if (!obj_request->bio_list)
				goto out_partial;
		} else {
			unsigned int page_count;

			obj_request->pages = pages;
			page_count = (u32)calc_pages_for(offset, length);
			obj_request->page_count = page_count;
			if ((offset + length) & ~PAGE_MASK)
				page_count--;	/* more on last page */
			pages += page_count;
		}

		osd_req = rbd_osd_req_create(rbd_dev, write_request,
						obj_request);
		if (!osd_req)
			goto out_partial;
		obj_request->osd_req = osd_req;
		obj_request->callback = rbd_img_obj_callback;

		osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
						0, 0);
		if (type == OBJ_REQUEST_BIO)
			osd_req_op_extent_osd_data_bio(osd_req, 0,
					obj_request->bio_list, length);
		else
			osd_req_op_extent_osd_data_pages(osd_req, 0,
					obj_request->pages, length,
					offset & ~PAGE_MASK, false, false);

		if (write_request)
			rbd_osd_req_format_write(obj_request);
		else
			rbd_osd_req_format_read(obj_request);

		obj_request->img_offset = img_offset;

		img_offset += length;
		resid -= length;
	}

	return 0;

out_partial:
	rbd_obj_request_put(obj_request);
out_unwind:
	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_obj_request_put(obj_request);

	return -ENOMEM;
}
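
/*
 * Typical usage of the above, condensed from what rbd_request_fn()
 * below does for each block-layer request:
 *
 *	img_request = rbd_img_request_create(rbd_dev, offset, length,
 *						write_request);
 *	if (!img_request)
 *		goto end_request;
 *	img_request->rq = rq;
 *	result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
 *					rq->bio);
 *	if (!result)
 *		result = rbd_img_request_submit(img_request);
 *	if (result)
 *		rbd_img_request_put(img_request);
 */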

static void
rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	struct rbd_device *rbd_dev;
	struct page **pages;
	u32 page_count;

	rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;
	rbd_assert(img_request);

	rbd_dev = img_request->rbd_dev;
	rbd_assert(rbd_dev);

	pages = obj_request->copyup_pages;
	rbd_assert(pages != NULL);
	obj_request->copyup_pages = NULL;
	page_count = obj_request->copyup_page_count;
	rbd_assert(page_count);
	obj_request->copyup_page_count = 0;
	ceph_release_page_vector(pages, page_count);

	/*
	 * We want the transfer count to reflect the size of the
	 * original write request.  There is no such thing as a
	 * successful short write, so if the request was successful
	 * we can just set it to the originally-requested length.
	 */
	if (!obj_request->result)
		obj_request->xferred = obj_request->length;

	/* Finish up with the normal image object callback */

	rbd_img_obj_callback(obj_request);
}

static void
rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
{
	struct rbd_obj_request *orig_request;
	struct ceph_osd_request *osd_req;
	struct ceph_osd_client *osdc;
	struct rbd_device *rbd_dev;
	struct page **pages;
	u32 page_count;
	int img_result;
	u64 parent_length;
	u64 offset;
	u64 length;

	rbd_assert(img_request_child_test(img_request));

	/* First get what we need from the image request */

	pages = img_request->copyup_pages;
	rbd_assert(pages != NULL);
	img_request->copyup_pages = NULL;
	page_count = img_request->copyup_page_count;
	rbd_assert(page_count);
	img_request->copyup_page_count = 0;

	orig_request = img_request->obj_request;
	rbd_assert(orig_request != NULL);
	rbd_assert(obj_request_type_valid(orig_request->type));
	img_result = img_request->result;
	parent_length = img_request->length;
	rbd_assert(parent_length == img_request->xferred);
	rbd_img_request_put(img_request);

	rbd_assert(orig_request->img_request);
	rbd_dev = orig_request->img_request->rbd_dev;
	rbd_assert(rbd_dev);

	/*
	 * If the overlap has become 0 (most likely because the
	 * image has been flattened) we need to free the pages
	 * and re-submit the original write request.
	 */
	if (!rbd_dev->parent_overlap) {
		struct ceph_osd_client *osdc;

		ceph_release_page_vector(pages, page_count);
		osdc = &rbd_dev->rbd_client->client->osdc;
		img_result = rbd_obj_request_submit(osdc, orig_request);
		if (!img_result)
			return;
	}

	if (img_result)
		goto out_err;

	/*
	 * The original osd request is of no use to us any more.
	 * We need a new one that can hold the two ops in a copyup
	 * request.  Allocate the new copyup osd request for the
	 * original request, and release the old one.
	 */
	img_result = -ENOMEM;
	osd_req = rbd_osd_req_create_copyup(orig_request);
	if (!osd_req)
		goto out_err;
	rbd_osd_req_destroy(orig_request->osd_req);
	orig_request->osd_req = osd_req;
	orig_request->copyup_pages = pages;
	orig_request->copyup_page_count = page_count;

	/* Initialize the copyup op */

	osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
	osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
						false, false);

	/* Then the original write request op */

	offset = orig_request->offset;
	length = orig_request->length;
	osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
					offset, length, 0, 0);
	if (orig_request->type == OBJ_REQUEST_BIO)
		osd_req_op_extent_osd_data_bio(osd_req, 1,
					orig_request->bio_list, length);
	else
		osd_req_op_extent_osd_data_pages(osd_req, 1,
					orig_request->pages, length,
					offset & ~PAGE_MASK, false, false);

	rbd_osd_req_format_write(orig_request);

	/* All set, send it off. */

	orig_request->callback = rbd_img_obj_copyup_callback;
	osdc = &rbd_dev->rbd_client->client->osdc;
	img_result = rbd_obj_request_submit(osdc, orig_request);
	if (!img_result)
		return;
out_err:
	/* Record the error code and complete the request */

	orig_request->result = img_result;
	orig_request->xferred = 0;
	obj_request_done_set(orig_request);
	rbd_obj_request_complete(orig_request);
}

/*
 * Read from the parent image the range of data that covers the
 * entire target of the given object request.  This is used for
 * satisfying a layered image write request when the target of an
 * object request from the image request does not exist.
 *
 * A page array big enough to hold the returned data is allocated
 * and supplied to rbd_img_request_fill() as the "data descriptor."
 * When the read completes, this page array will be transferred to
 * the original object request for the copyup operation.
 *
 * If an error occurs, record it as the result of the original
 * object request and mark it done so it gets completed.
 */
static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = NULL;
	struct rbd_img_request *parent_request = NULL;
	struct rbd_device *rbd_dev;
	u64 img_offset;
	u64 length;
	struct page **pages = NULL;
	u32 page_count;
	int result;

	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request_type_valid(obj_request->type));

	img_request = obj_request->img_request;
	rbd_assert(img_request != NULL);
	rbd_dev = img_request->rbd_dev;
	rbd_assert(rbd_dev->parent != NULL);

	/*
	 * Determine the byte range covered by the object in the
	 * child image to which the original request was to be sent.
	 */
	img_offset = obj_request->img_offset - obj_request->offset;
	length = (u64)1 << rbd_dev->header.obj_order;

	/*
	 * There is no defined parent data beyond the parent
	 * overlap, so limit what we read at that boundary if
	 * necessary.
	 */
	if (img_offset + length > rbd_dev->parent_overlap) {
		rbd_assert(img_offset < rbd_dev->parent_overlap);
		length = rbd_dev->parent_overlap - img_offset;
	}

	/*
	 * Allocate a page array big enough to receive the data read
	 * from the parent.
	 */
	page_count = (u32)calc_pages_for(0, length);
	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
	if (IS_ERR(pages)) {
		result = PTR_ERR(pages);
		pages = NULL;
		goto out_err;
	}

	result = -ENOMEM;
	parent_request = rbd_parent_request_create(obj_request,
						img_offset, length);
	if (!parent_request)
		goto out_err;

	result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
	if (result)
		goto out_err;
	parent_request->copyup_pages = pages;
	parent_request->copyup_page_count = page_count;

	parent_request->callback = rbd_img_obj_parent_read_full_callback;
	result = rbd_img_request_submit(parent_request);
	if (!result)
		return 0;

	parent_request->copyup_pages = NULL;
	parent_request->copyup_page_count = 0;
	parent_request->obj_request = NULL;
	rbd_obj_request_put(obj_request);
out_err:
	if (pages)
		ceph_release_page_vector(pages, page_count);
	if (parent_request)
		rbd_img_request_put(parent_request);
	obj_request->result = result;
	obj_request->xferred = 0;
	obj_request_done_set(obj_request);

	return result;
}

static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_obj_request *orig_request;
	struct rbd_device *rbd_dev;
	int result;

	rbd_assert(!obj_request_img_data_test(obj_request));

	/*
	 * All we need from the object request is the original
	 * request and the result of the STAT op.  Grab those, then
	 * we're done with the request.
	 */
	orig_request = obj_request->obj_request;
	obj_request->obj_request = NULL;
	rbd_obj_request_put(orig_request);
	rbd_assert(orig_request);
	rbd_assert(orig_request->img_request);

	result = obj_request->result;
	obj_request->result = 0;

	dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
		obj_request, orig_request, result,
		obj_request->xferred, obj_request->length);
	rbd_obj_request_put(obj_request);

	/*
	 * If the overlap has become 0 (most likely because the
	 * image has been flattened) we need to free the pages
	 * and re-submit the original write request.
	 */
	rbd_dev = orig_request->img_request->rbd_dev;
	if (!rbd_dev->parent_overlap) {
		struct ceph_osd_client *osdc;

		osdc = &rbd_dev->rbd_client->client->osdc;
		result = rbd_obj_request_submit(osdc, orig_request);
		if (!result)
			return;
	}

	/*
	 * Our only purpose here is to determine whether the object
	 * exists, and we don't want to treat the non-existence as
	 * an error.  If something else comes back, transfer the
	 * error to the original request and complete it now.
	 */
	if (!result) {
		obj_request_existence_set(orig_request, true);
	} else if (result == -ENOENT) {
		obj_request_existence_set(orig_request, false);
	} else if (result) {
		orig_request->result = result;
		goto out;
	}

	/*
	 * Resubmit the original request now that we have recorded
	 * whether the target object exists.
	 */
	orig_request->result = rbd_img_obj_request_submit(orig_request);
out:
	if (orig_request->result)
		rbd_obj_request_complete(orig_request);
}

static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
{
	struct rbd_obj_request *stat_request;
	struct rbd_device *rbd_dev;
	struct ceph_osd_client *osdc;
	struct page **pages = NULL;
	u32 page_count;
	size_t size;
	int ret;

	/*
	 * The response data for a STAT call consists of:
	 *     le64 length;
	 *     struct {
	 *         le32 tv_sec;
	 *         le32 tv_nsec;
	 *     } mtime;
	 */
	size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
	page_count = (u32)calc_pages_for(0, size);
	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	ret = -ENOMEM;
	stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
						OBJ_REQUEST_PAGES);
	if (!stat_request)
		goto out;

	rbd_obj_request_get(obj_request);
	stat_request->obj_request = obj_request;
	stat_request->pages = pages;
	stat_request->page_count = page_count;

	rbd_assert(obj_request->img_request);
	rbd_dev = obj_request->img_request->rbd_dev;
	stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
						stat_request);
	if (!stat_request->osd_req)
		goto out;
	stat_request->callback = rbd_img_obj_exists_callback;

	osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
	osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
					false, false);
	rbd_osd_req_format_read(stat_request);

	osdc = &rbd_dev->rbd_client->client->osdc;
	ret = rbd_obj_request_submit(osdc, stat_request);
out:
	if (ret)
		rbd_obj_request_put(obj_request);

	return ret;
}
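
/*
 * For reference, the STAT reply buffer sized above corresponds to a
 * packed on-wire layout like the following (illustrative only; the
 * driver never declares such a struct, and only the op's return
 * code is actually consulted by the exists callback):
 *
 *	struct {
 *		__le64 length;
 *		struct {
 *			__le32 tv_sec;
 *			__le32 tv_nsec;
 *		} mtime;
 *	} __attribute__ ((packed));
 */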

static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	struct rbd_device *rbd_dev;
	bool known;

	rbd_assert(obj_request_img_data_test(obj_request));

	img_request = obj_request->img_request;
	rbd_assert(img_request);
	rbd_dev = img_request->rbd_dev;

	/*
	 * Only writes to layered images need special handling.
	 * Reads and non-layered writes are simple object requests.
	 * Layered writes that start beyond the end of the overlap
	 * with the parent have no parent data, so they too are
	 * simple object requests.  Finally, if the target object is
	 * known to already exist, its parent data has already been
	 * copied, so a write to the object can also be handled as a
	 * simple object request.
	 */
	if (!img_request_write_test(img_request) ||
		!img_request_layered_test(img_request) ||
		rbd_dev->parent_overlap <= obj_request->img_offset ||
		((known = obj_request_known_test(obj_request)) &&
			obj_request_exists_test(obj_request))) {

		struct rbd_device *rbd_dev;
		struct ceph_osd_client *osdc;

		rbd_dev = obj_request->img_request->rbd_dev;
		osdc = &rbd_dev->rbd_client->client->osdc;

		return rbd_obj_request_submit(osdc, obj_request);
	}

	/*
	 * It's a layered write.  The target object might exist but
	 * we may not know that yet.  If we know it doesn't exist,
	 * start by reading the data for the full target object from
	 * the parent so we can use it for a copyup to the target.
	 */
	if (known)
		return rbd_img_obj_parent_read_full(obj_request);

	/* We don't know whether the target exists.  Go find out. */

	return rbd_img_obj_exists_submit(obj_request);
}
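
/*
 * To summarize the dispatch above: reads, non-layered writes,
 * layered writes beyond the parent overlap, and writes to objects
 * already known to exist go straight to the OSD; a layered write to
 * an object known not to exist first reads the covering parent data
 * for a copyup; and when existence is unknown, a STAT is issued and
 * the request is resubmitted once the answer has been recorded.
 */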

static int rbd_img_request_submit(struct rbd_img_request *img_request)
{
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;

	dout("%s: img %p\n", __func__, img_request);
	for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
		int ret;

		ret = rbd_img_obj_request_submit(obj_request);
		if (ret)
			return ret;
	}

	return 0;
}

static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
{
	struct rbd_obj_request *obj_request;
	struct rbd_device *rbd_dev;
	u64 obj_end;
	u64 img_xferred;
	int img_result;

	rbd_assert(img_request_child_test(img_request));

	/* First get what we need from the image request and release it */

	obj_request = img_request->obj_request;
	img_xferred = img_request->xferred;
	img_result = img_request->result;
	rbd_img_request_put(img_request);

	/*
	 * If the overlap has become 0 (most likely because the
	 * image has been flattened) we need to re-submit the
	 * original request.
	 */
	rbd_assert(obj_request);
	rbd_assert(obj_request->img_request);
	rbd_dev = obj_request->img_request->rbd_dev;
	if (!rbd_dev->parent_overlap) {
		struct ceph_osd_client *osdc;

		osdc = &rbd_dev->rbd_client->client->osdc;
		img_result = rbd_obj_request_submit(osdc, obj_request);
		if (!img_result)
			return;
	}

	obj_request->result = img_result;
	if (obj_request->result)
		goto out;

	/*
	 * We need to zero anything beyond the parent overlap
	 * boundary.  Since rbd_img_obj_request_read_callback()
	 * will zero anything beyond the end of a short read, an
	 * easy way to do this is to pretend the data from the
	 * parent came up short--ending at the overlap boundary.
	 */
	rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
	obj_end = obj_request->img_offset + obj_request->length;
	if (obj_end > rbd_dev->parent_overlap) {
		u64 xferred = 0;

		if (obj_request->img_offset < rbd_dev->parent_overlap)
			xferred = rbd_dev->parent_overlap -
					obj_request->img_offset;

		obj_request->xferred = min(img_xferred, xferred);
	} else {
		obj_request->xferred = img_xferred;
	}
out:
	rbd_img_obj_request_read_callback(obj_request);
	rbd_obj_request_complete(obj_request);
}

static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	int result;

	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request->img_request != NULL);
	rbd_assert(obj_request->result == (s32) -ENOENT);
	rbd_assert(obj_request_type_valid(obj_request->type));

	/* rbd_read_finish(obj_request, obj_request->length); */
	img_request = rbd_parent_request_create(obj_request,
						obj_request->img_offset,
						obj_request->length);
	result = -ENOMEM;
	if (!img_request)
		goto out_err;

	if (obj_request->type == OBJ_REQUEST_BIO)
		result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
						obj_request->bio_list);
	else
		result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
						obj_request->pages);
	if (result)
		goto out_err;

	img_request->callback = rbd_img_parent_read_callback;
	result = rbd_img_request_submit(img_request);
	if (result)
		goto out_err;

	return;
out_err:
	if (img_request)
		rbd_img_request_put(img_request);
	obj_request->result = result;
	obj_request->xferred = 0;
	obj_request_done_set(obj_request);
}

static int rbd_obj_notify_ack_sync(struct rbd_device *rbd_dev, u64 notify_id)
{
	struct rbd_obj_request *obj_request;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	int ret;

	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
							OBJ_REQUEST_NODATA);
	if (!obj_request)
		return -ENOMEM;

	ret = -ENOMEM;
	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
	if (!obj_request->osd_req)
		goto out;

	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
					notify_id, 0, 0);
	rbd_osd_req_format_read(obj_request);

	ret = rbd_obj_request_submit(osdc, obj_request);
	if (ret)
		goto out;
	ret = rbd_obj_request_wait(obj_request);
out:
	rbd_obj_request_put(obj_request);

	return ret;
}

static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
	struct rbd_device *rbd_dev = (struct rbd_device *)data;
	int ret;

	if (!rbd_dev)
		return;

	dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
		rbd_dev->header_name, (unsigned long long)notify_id,
		(unsigned int)opcode);
	ret = rbd_dev_refresh(rbd_dev);
	if (ret)
		rbd_warn(rbd_dev, "header refresh error (%d)\n", ret);

	rbd_obj_notify_ack_sync(rbd_dev, notify_id);
}

/*
 * Request sync osd watch/unwatch.  The value of "start" determines
 * whether a watch request is being initiated or torn down.
 */
static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_obj_request *obj_request;
	int ret;

	rbd_assert(start ^ !!rbd_dev->watch_event);
	rbd_assert(start ^ !!rbd_dev->watch_request);

	if (start) {
		ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
						&rbd_dev->watch_event);
		if (ret < 0)
			return ret;
		rbd_assert(rbd_dev->watch_event != NULL);
	}

	ret = -ENOMEM;
	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
							OBJ_REQUEST_NODATA);
	if (!obj_request)
		goto out_cancel;

	obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
	if (!obj_request->osd_req)
		goto out_cancel;

	if (start)
		ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
	else
		ceph_osdc_unregister_linger_request(osdc,
					rbd_dev->watch_request->osd_req);

	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
				rbd_dev->watch_event->cookie, 0, start ? 1 : 0);
	rbd_osd_req_format_write(obj_request);

	ret = rbd_obj_request_submit(osdc, obj_request);
	if (ret)
		goto out_cancel;
	ret = rbd_obj_request_wait(obj_request);
	if (ret)
		goto out_cancel;
	ret = obj_request->result;
	if (ret)
		goto out_cancel;

	/*
	 * A watch request is set to linger, so the underlying osd
	 * request won't go away until we unregister it.  We retain
	 * a pointer to the object request during that time (in
	 * rbd_dev->watch_request), so we'll keep a reference to
	 * it.  We'll drop that reference (below) after we've
	 * unregistered it.
	 */
	if (start) {
		rbd_dev->watch_request = obj_request;

		return 0;
	}

	/* We have successfully torn down the watch request */

	rbd_obj_request_put(rbd_dev->watch_request);
	rbd_dev->watch_request = NULL;
out_cancel:
	/* Cancel the event if we're tearing down, or on error */
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
	if (obj_request)
		rbd_obj_request_put(obj_request);

	return ret;
}
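
/*
 * A minimal sketch of how the helper above is used around the life
 * of a mapping (error handling omitted):
 *
 *	ret = rbd_dev_header_watch_sync(rbd_dev, true);		(arm the watch)
 *	...
 *	ret = rbd_dev_header_watch_sync(rbd_dev, false);	(tear it down)
 *
 * While the watch is armed, a header update on the OSD triggers
 * rbd_watch_cb() above, which refreshes the device and then
 * acknowledges the notification via rbd_obj_notify_ack_sync().
 */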

/*
 * Synchronous osd object method call.  Returns the number of bytes
 * returned in the inbound buffer, or a negative error code.
 */
static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
			     const char *object_name,
			     const char *class_name,
			     const char *method_name,
			     const void *outbound,
			     size_t outbound_size,
			     void *inbound,
			     size_t inbound_size)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_obj_request *obj_request;
	struct page **pages;
	u32 page_count;
	int ret;

	/*
	 * Method calls are ultimately read operations.  The result
	 * should be placed into the inbound buffer provided.  They
	 * also supply outbound data--parameters for the object
	 * method.  Currently if this is present it will be a
	 * snapshot id.
	 */
	page_count = (u32)calc_pages_for(0, inbound_size);
	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	ret = -ENOMEM;
	obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
							OBJ_REQUEST_PAGES);
	if (!obj_request)
		goto out;

	obj_request->pages = pages;
	obj_request->page_count = page_count;

	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
	if (!obj_request->osd_req)
		goto out;

	osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
					class_name, method_name);
	if (outbound_size) {
		struct ceph_pagelist *pagelist;

		pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
		if (!pagelist)
			goto out;

		ceph_pagelist_init(pagelist);
		ceph_pagelist_append(pagelist, outbound, outbound_size);
		osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
						pagelist);
	}
	osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
					obj_request->pages, inbound_size,
					0, false, false);
	rbd_osd_req_format_read(obj_request);

	ret = rbd_obj_request_submit(osdc, obj_request);
	if (ret)
		goto out;
	ret = rbd_obj_request_wait(obj_request);
	if (ret)
		goto out;

	ret = obj_request->result;
	if (ret < 0)
		goto out;

	rbd_assert(obj_request->xferred < (u64)INT_MAX);
	ret = (int)obj_request->xferred;
	ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
out:
	if (obj_request)
		rbd_obj_request_put(obj_request);
	else
		ceph_release_page_vector(pages, page_count);

	return ret;
}
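
/*
 * Example of a typical method call through the helper above; this is
 * essentially what the format 2 image code elsewhere in this file
 * does to fetch the size of an image at a given snapshot:
 *
 *	__le64 snapid = cpu_to_le64(snap_id);
 *	struct {
 *		u8 order;
 *		__le64 size;
 *	} __attribute__ ((packed)) size_buf = { 0 };
 *
 *	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
 *				"rbd", "get_size",
 *				&snapid, sizeof (snapid),
 *				&size_buf, sizeof (size_buf));
 */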

static void rbd_request_fn(struct request_queue *q)
		__releases(q->queue_lock) __acquires(q->queue_lock)
{
	struct rbd_device *rbd_dev = q->queuedata;
	bool read_only = rbd_dev->mapping.read_only;
	struct request *rq;
	int result;

	while ((rq = blk_fetch_request(q))) {
		bool write_request = rq_data_dir(rq) == WRITE;
		struct rbd_img_request *img_request;
		u64 offset;
		u64 length;

		/* Ignore any non-FS requests that filter through. */

		if (rq->cmd_type != REQ_TYPE_FS) {
			dout("%s: non-fs request type %d\n", __func__,
				(int) rq->cmd_type);
			__blk_end_request_all(rq, 0);
			continue;
		}

		/* Ignore/skip any zero-length requests */

		offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
		length = (u64) blk_rq_bytes(rq);

		if (!length) {
			dout("%s: zero-length request\n", __func__);
			__blk_end_request_all(rq, 0);
			continue;
		}

		spin_unlock_irq(q->queue_lock);

		/* Disallow writes to a read-only device */

		if (write_request) {
			result = -EROFS;
			if (read_only)
				goto end_request;
			rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
		}

		/*
		 * Quit early if the mapped snapshot no longer
		 * exists.  It's still possible the snapshot will
		 * have disappeared by the time our request arrives
		 * at the osd, but there's no sense in sending it if
		 * we already know.
		 */
		if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
			dout("request for non-existent snapshot");
			rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
			result = -ENXIO;
			goto end_request;
		}

		result = -EINVAL;
		if (offset && length > U64_MAX - offset + 1) {
			rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
				offset, length);
			goto end_request;	/* Shouldn't happen */
		}

		result = -EIO;
		if (offset + length > rbd_dev->mapping.size) {
			rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n",
				offset, length, rbd_dev->mapping.size);
			goto end_request;
		}

		result = -ENOMEM;
		img_request = rbd_img_request_create(rbd_dev, offset, length,
							write_request);
		if (!img_request)
			goto end_request;

		img_request->rq = rq;

		result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
						rq->bio);
		if (!result)
			result = rbd_img_request_submit(img_request);
		if (result)
			rbd_img_request_put(img_request);
end_request:
		spin_lock_irq(q->queue_lock);
		if (result < 0) {
			rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
				write_request ? "write" : "read",
				length, offset, result);

			__blk_end_request_all(rq, result);
		}
	}
}

/*
 * A queue callback.  Makes sure that we don't create a bio that spans
 * across multiple osd objects.  The one exception is single-page bios,
 * which we handle later in bio_chain_clone_range().
 */
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
			  struct bio_vec *bvec)
{
	struct rbd_device *rbd_dev = q->queuedata;
	sector_t sector_offset;
	sector_t sectors_per_obj;
	sector_t obj_sector_offset;
	int ret;

	/*
	 * Convert the partition-relative starting sector of the bio
	 * to be relative to the enclosing device, then find how far
	 * into its rbd object that sector falls.
	 */
	sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
	sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
	obj_sector_offset = sector_offset & (sectors_per_obj - 1);

	/*
	 * Compute the number of bytes from that offset to the end
	 * of the object.  Account for what's already used by the bio.
	 */
	ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
	if (ret > bmd->bi_size)
		ret -= bmd->bi_size;
	else
		ret = 0;

	/*
	 * Don't send back more than was asked for.  And if the bio
	 * was empty, let the whole thing through because:  "Note
	 * that a block device *must* allow a single page to be
	 * added to an empty bio."
	 */
	rbd_assert(bvec->bv_len <= PAGE_SIZE);
	if (ret > (int) bvec->bv_len || !bmd->bi_size)
		ret = (int) bvec->bv_len;

	return ret;
}
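
/*
 * Worked example for the arithmetic above: with the default 4 MB
 * objects (obj_order 22) and 512-byte sectors, sectors_per_obj is
 * 1 << (22 - 9) = 8192.  A bio starting two sectors before an object
 * boundary (obj_sector_offset 8190) has 2 * 512 = 1024 bytes left in
 * its object, so at most 1024 bytes (less whatever the bio already
 * contains) of the proposed bio_vec may be accepted.
 */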

static void rbd_free_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk = rbd_dev->disk;

	if (!disk)
		return;

	rbd_dev->disk = NULL;
	if (disk->flags & GENHD_FL_UP) {
		del_gendisk(disk);
		if (disk->queue)
			blk_cleanup_queue(disk->queue);
	}
	put_disk(disk);
}

static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
				const char *object_name,
				u64 offset, u64 length, void *buf)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_obj_request *obj_request;
	struct page **pages = NULL;
	u32 page_count;
	size_t size;
	int ret;

	page_count = (u32) calc_pages_for(offset, length);
	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	ret = -ENOMEM;
	obj_request = rbd_obj_request_create(object_name, offset, length,
							OBJ_REQUEST_PAGES);
	if (!obj_request)
		goto out;

	obj_request->pages = pages;
	obj_request->page_count = page_count;

	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
	if (!obj_request->osd_req)
		goto out;

	osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
					offset, length, 0, 0);
	osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
					obj_request->pages,
					obj_request->length,
					obj_request->offset & ~PAGE_MASK,
					false, false);
	rbd_osd_req_format_read(obj_request);

	ret = rbd_obj_request_submit(osdc, obj_request);
	if (ret)
		goto out;
	ret = rbd_obj_request_wait(obj_request);
	if (ret)
		goto out;

	ret = obj_request->result;
	if (ret < 0)
		goto out;

	rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
	size = (size_t) obj_request->xferred;
	ceph_copy_from_page_vector(pages, buf, 0, size);
	rbd_assert(size <= (size_t)INT_MAX);
	ret = (int)size;
out:
	if (obj_request)
		rbd_obj_request_put(obj_request);
	else
		ceph_release_page_vector(pages, page_count);

	return ret;
}

/*
 * Read the complete header for the given rbd device.  On successful
 * return, the rbd_dev->header field will contain up-to-date
 * information about the image.
 */
static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
{
	struct rbd_image_header_ondisk *ondisk = NULL;
	u32 snap_count = 0;
	u64 names_size = 0;
	u32 want_count;
	int ret;

	/*
	 * The complete header will include an array of its 64-bit
	 * snapshot ids, followed by the names of those snapshots as
	 * a contiguous block of NUL-terminated strings.  Note that
	 * the number of snapshots could change by the time we read
	 * it in, in which case we re-read it.
	 */
	do {
		size_t size;

		kfree(ondisk);

		size = sizeof (*ondisk);
		size += snap_count * sizeof (struct rbd_image_snap_ondisk);
		size += names_size;
		ondisk = kmalloc(size, GFP_KERNEL);
		if (!ondisk)
			return -ENOMEM;

		ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
				       0, size, ondisk);
		if (ret < 0)
			goto out;
		if ((size_t)ret < size) {
			rbd_warn(rbd_dev, "short header read (want %zd got %d)",
				size, ret);
			ret = -ENXIO;
			goto out;
		}
		if (!rbd_dev_ondisk_valid(ondisk)) {
			ret = -ENXIO;
			rbd_warn(rbd_dev, "invalid header");
			goto out;
		}

		names_size = le64_to_cpu(ondisk->snap_names_len);
		want_count = snap_count;
		snap_count = le32_to_cpu(ondisk->snap_count);
	} while (snap_count != want_count);

	ret = rbd_header_from_disk(rbd_dev, ondisk);
out:
	kfree(ondisk);

	return ret;
}

/*
 * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
 * has disappeared from the (just updated) snapshot context.
 */
static void rbd_exists_validate(struct rbd_device *rbd_dev)
{
	u64 snap_id;

	if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
		return;

	snap_id = rbd_dev->spec->snap_id;
	if (snap_id == CEPH_NOSNAP)
		return;

	if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
		clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
}

static void rbd_dev_update_size(struct rbd_device *rbd_dev)
{
	sector_t size;
	bool removing;

	/*
	 * Don't hold the lock while doing disk operations,
	 * or lock ordering will conflict with the bdev mutex via:
	 * rbd_add() -> blkdev_get() -> rbd_open()
	 */
	spin_lock_irq(&rbd_dev->lock);
	removing = test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
	spin_unlock_irq(&rbd_dev->lock);
	/*
	 * If the device is being removed, rbd_dev->disk has
	 * been destroyed, so don't try to update its size
	 */
	if (!removing) {
		size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
		dout("setting size to %llu sectors", (unsigned long long)size);
		set_capacity(rbd_dev->disk, size);
		revalidate_disk(rbd_dev->disk);
	}
}
3404
static int rbd_dev_refresh(struct rbd_device *rbd_dev)
{
	u64 mapping_size;
	int ret;

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	down_write(&rbd_dev->header_rwsem);
	mapping_size = rbd_dev->mapping.size;
	if (rbd_dev->image_format == 1)
		ret = rbd_dev_v1_header_info(rbd_dev);
	else
		ret = rbd_dev_v2_header_info(rbd_dev);

	/* If it's a mapped snapshot, validate its EXISTS flag */

	rbd_exists_validate(rbd_dev);
	up_write(&rbd_dev->header_rwsem);

	if (mapping_size != rbd_dev->mapping.size)
		rbd_dev_update_size(rbd_dev);

	return ret;
}

static int rbd_init_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk;
	struct request_queue *q;
	u64 segment_size;

	/* create gendisk info */
	disk = alloc_disk(1 << RBD_PART_SHIFT);
	if (!disk)
		return -ENOMEM;

	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
		 rbd_dev->dev_id);
	disk->major = rbd_dev->major;
	disk->first_minor = rbd_dev->minor;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;

	q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
	if (!q)
		goto out_disk;

	/* We use the default size, but let's be explicit about it. */
	blk_queue_physical_block_size(q, SECTOR_SIZE);

	/* set io sizes to object size */
	segment_size = rbd_obj_bytes(&rbd_dev->header);
	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
	blk_queue_max_segment_size(q, segment_size);
	blk_queue_io_min(q, segment_size);
	blk_queue_io_opt(q, segment_size);

	blk_queue_merge_bvec(q, rbd_merge_bvec);
	disk->queue = q;

	q->queuedata = rbd_dev;

	rbd_dev->disk = disk;

	return 0;
out_disk:
	put_disk(disk);

	return -ENOMEM;
}

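/*
 * Illustrative numbers, not taken from this file: RBD images are
 * typically created with an object order of 22, in which case
 * rbd_obj_bytes() yields 4 MiB objects and the queue above advertises
 * 4 MiB (8192 512-byte sectors) for max_hw_sectors, max_segment_size,
 * io_min and io_opt, aligning block-layer I/O with RADOS object
 * boundaries.
 */
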
/*
 * sysfs
 */

static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
	return container_of(dev, struct rbd_device, dev);
}

static ssize_t rbd_size_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%llu\n",
		(unsigned long long)rbd_dev->mapping.size);
}

/*
 * Note this shows the features for whatever's mapped, which is not
 * necessarily the base image.
 */
static ssize_t rbd_features_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "0x%016llx\n",
			(unsigned long long)rbd_dev->mapping.features);
}

static ssize_t rbd_major_show(struct device *dev,
			      struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	if (rbd_dev->major)
		return sprintf(buf, "%d\n", rbd_dev->major);

	return sprintf(buf, "(none)\n");
}

static ssize_t rbd_minor_show(struct device *dev,
			      struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%d\n", rbd_dev->minor);
}

static ssize_t rbd_client_id_show(struct device *dev,
				  struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "client%lld\n",
			ceph_client_id(rbd_dev->rbd_client->client));
}

static ssize_t rbd_pool_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
}

static ssize_t rbd_pool_id_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%llu\n",
			(unsigned long long) rbd_dev->spec->pool_id);
}

static ssize_t rbd_name_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	if (rbd_dev->spec->image_name)
		return sprintf(buf, "%s\n", rbd_dev->spec->image_name);

	return sprintf(buf, "(unknown)\n");
}

static ssize_t rbd_image_id_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
}

/*
 * Shows the name of the currently-mapped snapshot (or
 * RBD_SNAP_HEAD_NAME for the base image).
 */
static ssize_t rbd_snap_show(struct device *dev,
			     struct device_attribute *attr,
			     char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
}

/*
 * For an rbd v2 image, shows the pool id, image id, and snapshot id
 * for the parent image.  If there is no parent, simply shows
 * "(no parent image)".
 */
static ssize_t rbd_parent_show(struct device *dev,
			     struct device_attribute *attr,
			     char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	struct rbd_spec *spec = rbd_dev->parent_spec;
	int count;
	char *bufp = buf;

	if (!spec)
		return sprintf(buf, "(no parent image)\n");

	count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
			(unsigned long long) spec->pool_id, spec->pool_name);
	if (count < 0)
		return count;
	bufp += count;

	count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
			spec->image_name ? spec->image_name : "(unknown)");
	if (count < 0)
		return count;
	bufp += count;

	count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
			(unsigned long long) spec->snap_id, spec->snap_name);
	if (count < 0)
		return count;
	bufp += count;

	count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
	if (count < 0)
		return count;
	bufp += count;

	return (ssize_t) (bufp - buf);
}

static ssize_t rbd_image_refresh(struct device *dev,
				 struct device_attribute *attr,
				 const char *buf,
				 size_t size)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	int ret;

	ret = rbd_dev_refresh(rbd_dev);
	if (ret)
		rbd_warn(rbd_dev, "manual header refresh error (%d)", ret);

	return ret < 0 ? ret : size;
}

static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);

static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_features.attr,
	&dev_attr_major.attr,
	&dev_attr_minor.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_pool_id.attr,
	&dev_attr_name.attr,
	&dev_attr_image_id.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_parent.attr,
	&dev_attr_refresh.attr,
	NULL
};

static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};

static void rbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_sysfs_dev_release,
};

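/*
 * Illustrative use of the attributes above (device id 0 is an example;
 * paths assume the standard sysfs layout for this bus):
 *
 *   $ cat /sys/bus/rbd/devices/0/size
 *   $ cat /sys/bus/rbd/devices/0/pool
 *   $ echo 1 > /sys/bus/rbd/devices/0/refresh   (runs rbd_dev_refresh())
 */
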
static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
{
	kref_get(&spec->kref);

	return spec;
}

static void rbd_spec_free(struct kref *kref);
static void rbd_spec_put(struct rbd_spec *spec)
{
	if (spec)
		kref_put(&spec->kref, rbd_spec_free);
}

static struct rbd_spec *rbd_spec_alloc(void)
{
	struct rbd_spec *spec;

	spec = kzalloc(sizeof (*spec), GFP_KERNEL);
	if (!spec)
		return NULL;
	kref_init(&spec->kref);

	return spec;
}

static void rbd_spec_free(struct kref *kref)
{
	struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);

	kfree(spec->pool_name);
	kfree(spec->image_id);
	kfree(spec->image_name);
	kfree(spec->snap_name);
	kfree(spec);
}

static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
					 struct rbd_spec *spec)
{
	struct rbd_device *rbd_dev;

	rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
	if (!rbd_dev)
		return NULL;

	spin_lock_init(&rbd_dev->lock);
	rbd_dev->flags = 0;
	atomic_set(&rbd_dev->parent_ref, 0);
	INIT_LIST_HEAD(&rbd_dev->node);
	init_rwsem(&rbd_dev->header_rwsem);

	rbd_dev->spec = spec;
	rbd_dev->rbd_client = rbdc;

	/* Initialize the layout used for all rbd requests */

	rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
	rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);

	return rbd_dev;
}

static void rbd_dev_destroy(struct rbd_device *rbd_dev)
{
	rbd_put_client(rbd_dev->rbd_client);
	rbd_spec_put(rbd_dev->spec);
	kfree(rbd_dev);
}

/*
 * Get the size and object order for an image snapshot, or if
 * snap_id is CEPH_NOSNAP, get this information for the base
 * image.
 */
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u8 *order, u64 *snap_size)
{
	__le64 snapid = cpu_to_le64(snap_id);
	int ret;
	struct {
		u8 order;
		__le64 size;
	} __attribute__ ((packed)) size_buf = { 0 };

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_size",
				&snapid, sizeof (snapid),
				&size_buf, sizeof (size_buf));
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		return ret;
	if (ret < sizeof (size_buf))
		return -ERANGE;

	if (order) {
		*order = size_buf.order;
		dout(" order %u", (unsigned int)*order);
	}
	*snap_size = le64_to_cpu(size_buf.size);

	dout(" snap_id 0x%016llx snap_size = %llu\n",
		(unsigned long long)snap_id,
		(unsigned long long)*snap_size);

	return 0;
}

static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
{
	return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
					&rbd_dev->header.obj_order,
					&rbd_dev->header.image_size);
}

static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
{
	void *reply_buf;
	int ret;
	void *p;

	reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
	if (!reply_buf)
		return -ENOMEM;

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_object_prefix", NULL, 0,
				reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;

	p = reply_buf;
	rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
						p + ret, NULL, GFP_NOIO);
	ret = 0;

	if (IS_ERR(rbd_dev->header.object_prefix)) {
		ret = PTR_ERR(rbd_dev->header.object_prefix);
		rbd_dev->header.object_prefix = NULL;
	} else {
		dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
	}
out:
	kfree(reply_buf);

	return ret;
}

static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
		u64 *snap_features)
{
	__le64 snapid = cpu_to_le64(snap_id);
	struct {
		__le64 features;
		__le64 incompat;
	} __attribute__ ((packed)) features_buf = { 0 };
	u64 incompat;
	int ret;

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_features",
				&snapid, sizeof (snapid),
				&features_buf, sizeof (features_buf));
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		return ret;
	if (ret < sizeof (features_buf))
		return -ERANGE;

	incompat = le64_to_cpu(features_buf.incompat);
	if (incompat & ~RBD_FEATURES_SUPPORTED)
		return -ENXIO;

	*snap_features = le64_to_cpu(features_buf.features);

	dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
		(unsigned long long)snap_id,
		(unsigned long long)*snap_features,
		(unsigned long long)le64_to_cpu(features_buf.incompat));

	return 0;
}

static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
{
	return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
						&rbd_dev->header.features);
}

static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
{
	struct rbd_spec *parent_spec;
	size_t size;
	void *reply_buf = NULL;
	__le64 snapid;
	void *p;
	void *end;
	u64 pool_id;
	char *image_id;
	u64 snap_id;
	u64 overlap;
	int ret;

	parent_spec = rbd_spec_alloc();
	if (!parent_spec)
		return -ENOMEM;

	size = sizeof (__le64) +				/* pool_id */
		sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +	/* image_id */
		sizeof (__le64) +				/* snap_id */
		sizeof (__le64);				/* overlap */
	reply_buf = kmalloc(size, GFP_KERNEL);
	if (!reply_buf) {
		ret = -ENOMEM;
		goto out_err;
	}

	snapid = cpu_to_le64(CEPH_NOSNAP);
	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_parent",
				&snapid, sizeof (snapid),
				reply_buf, size);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		goto out_err;

	p = reply_buf;
	end = reply_buf + ret;
	ret = -ERANGE;
	ceph_decode_64_safe(&p, end, pool_id, out_err);
	if (pool_id == CEPH_NOPOOL) {
		/*
		 * Either the parent never existed, or we have
		 * record of it but the image got flattened so it no
		 * longer has a parent.  When the parent of a
		 * layered image disappears we immediately set the
		 * overlap to 0.  The effect of this is that all new
		 * requests will be treated as if the image had no
		 * parent.
		 */
		if (rbd_dev->parent_overlap) {
			rbd_dev->parent_overlap = 0;
			smp_mb();
			rbd_dev_parent_put(rbd_dev);
			pr_info("%s: clone image has been flattened\n",
				rbd_dev->disk->disk_name);
		}

		goto out;	/* No parent?  No problem. */
	}

	/* The ceph file layout needs to fit pool id in 32 bits */

	ret = -EIO;
	if (pool_id > (u64)U32_MAX) {
		rbd_warn(NULL, "parent pool id too large (%llu > %u)",
			(unsigned long long)pool_id, U32_MAX);
		goto out_err;
	}

	image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
	if (IS_ERR(image_id)) {
		ret = PTR_ERR(image_id);
		goto out_err;
	}
	ceph_decode_64_safe(&p, end, snap_id, out_err);
	ceph_decode_64_safe(&p, end, overlap, out_err);

	/*
	 * The parent won't change (except when the clone is
	 * flattened, which is already handled above).  So we only
	 * need to record the parent spec if we have not already
	 * done so.
	 */
	if (!rbd_dev->parent_spec) {
		parent_spec->pool_id = pool_id;
		parent_spec->image_id = image_id;
		parent_spec->snap_id = snap_id;
		rbd_dev->parent_spec = parent_spec;
		parent_spec = NULL;	/* rbd_dev now owns this */
	}

	/*
	 * We always update the parent overlap.  If it's zero we
	 * treat it specially.
	 */
	rbd_dev->parent_overlap = overlap;
	smp_mb();
	if (!overlap) {

		/* A null parent_spec indicates it's the initial probe */

		if (parent_spec) {
			/*
			 * The overlap has become zero, so the clone
			 * must have been resized down to 0 at some
			 * point.  Treat this the same as a flatten.
			 */
			rbd_dev_parent_put(rbd_dev);
			pr_info("%s: clone image now standalone\n",
				rbd_dev->disk->disk_name);
		} else {
			/*
			 * For the initial probe, if we find the
			 * overlap is zero we just pretend there was
			 * no parent image.
			 */
			rbd_warn(rbd_dev,
				"ignoring parent of clone with overlap 0");
		}
	}
out:
	ret = 0;
out_err:
	kfree(reply_buf);
	rbd_spec_put(parent_spec);

	return ret;
}

static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
{
	struct {
		__le64 stripe_unit;
		__le64 stripe_count;
	} __attribute__ ((packed)) striping_info_buf = { 0 };
	size_t size = sizeof (striping_info_buf);
	void *p;
	u64 obj_size;
	u64 stripe_unit;
	u64 stripe_count;
	int ret;

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_stripe_unit_count", NULL, 0,
				(char *)&striping_info_buf, size);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		return ret;
	if (ret < size)
		return -ERANGE;

	/*
	 * We don't actually support the "fancy striping" feature
	 * (STRIPINGV2) yet, but if the striping sizes are the
	 * defaults the behavior is the same as before.  So find
	 * out, and only fail if the image has non-default values.
	 */
	ret = -EINVAL;
	obj_size = (u64)1 << rbd_dev->header.obj_order;
	p = &striping_info_buf;
	stripe_unit = ceph_decode_64(&p);
	if (stripe_unit != obj_size) {
		rbd_warn(rbd_dev, "unsupported stripe unit "
			"(got %llu want %llu)",
			stripe_unit, obj_size);
		return -EINVAL;
	}
	stripe_count = ceph_decode_64(&p);
	if (stripe_count != 1) {
		rbd_warn(rbd_dev, "unsupported stripe count "
			"(got %llu want 1)", stripe_count);
		return -EINVAL;
	}
	rbd_dev->header.stripe_unit = stripe_unit;
	rbd_dev->header.stripe_count = stripe_count;

	return 0;
}

static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
{
	size_t image_id_size;
	char *image_id;
	void *p;
	void *end;
	size_t size;
	void *reply_buf = NULL;
	size_t len = 0;
	char *image_name = NULL;
	int ret;

	rbd_assert(!rbd_dev->spec->image_name);

	len = strlen(rbd_dev->spec->image_id);
	image_id_size = sizeof (__le32) + len;
	image_id = kmalloc(image_id_size, GFP_KERNEL);
	if (!image_id)
		return NULL;

	p = image_id;
	end = image_id + image_id_size;
	ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);

	size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
	reply_buf = kmalloc(size, GFP_KERNEL);
	if (!reply_buf)
		goto out;

	ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
				"rbd", "dir_get_name",
				image_id, image_id_size,
				reply_buf, size);
	if (ret < 0)
		goto out;
	p = reply_buf;
	end = reply_buf + ret;

	image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
	if (IS_ERR(image_name))
		image_name = NULL;
	else
		dout("%s: name is %s len is %zd\n", __func__, image_name, len);
out:
	kfree(reply_buf);
	kfree(image_id);

	return image_name;
}

static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	const char *snap_name;
	u32 which = 0;

	/* Skip over names until we find the one we are looking for */

	snap_name = rbd_dev->header.snap_names;
	while (which < snapc->num_snaps) {
		if (!strcmp(name, snap_name))
			return snapc->snaps[which];
		snap_name += strlen(snap_name) + 1;
		which++;
	}
	return CEPH_NOSNAP;
}

static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	u32 which;
	bool found = false;
	u64 snap_id;

	for (which = 0; !found && which < snapc->num_snaps; which++) {
		const char *snap_name;

		snap_id = snapc->snaps[which];
		snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
		if (IS_ERR(snap_name)) {
			/* ignore no-longer existing snapshots */
			if (PTR_ERR(snap_name) == -ENOENT)
				continue;
			else
				break;
		}
		found = !strcmp(name, snap_name);
		kfree(snap_name);
	}
	return found ? snap_id : CEPH_NOSNAP;
}

/*
 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
 * no snapshot by that name is found, or if an error occurs.
 */
static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
{
	if (rbd_dev->image_format == 1)
		return rbd_v1_snap_id_by_name(rbd_dev, name);

	return rbd_v2_snap_id_by_name(rbd_dev, name);
}

/*
 * When an rbd image has a parent image, it is identified by the
 * pool, image, and snapshot ids (not names).  This function fills
 * in the names for those ids.  (It's OK if we can't figure out the
 * name for an image id, but the pool and snapshot ids should always
 * exist and have names.)  All names in an rbd spec are dynamically
 * allocated.
 *
 * When an image being mapped (not a parent) is probed, we have the
 * pool name and pool id, image name and image id, and the snapshot
 * name.  The only thing we're missing is the snapshot id.
 */
static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_spec *spec = rbd_dev->spec;
	const char *pool_name;
	const char *image_name;
	const char *snap_name;
	int ret;

	/*
	 * An image being mapped will have the pool name (etc.), but
	 * we need to look up the snapshot id.
	 */
	if (spec->pool_name) {
		if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
			u64 snap_id;

			snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
			if (snap_id == CEPH_NOSNAP)
				return -ENOENT;
			spec->snap_id = snap_id;
		} else {
			spec->snap_id = CEPH_NOSNAP;
		}

		return 0;
	}

	/* Get the pool name; we have to make our own copy of this */

	pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
	if (!pool_name) {
		rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
		return -EIO;
	}
	pool_name = kstrdup(pool_name, GFP_KERNEL);
	if (!pool_name)
		return -ENOMEM;

	/* Fetch the image name; tolerate failure here */

	image_name = rbd_dev_image_name(rbd_dev);
	if (!image_name)
		rbd_warn(rbd_dev, "unable to get image name");

	/* Look up the snapshot name, and make a copy */

	snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
	if (IS_ERR(snap_name)) {
		ret = PTR_ERR(snap_name);
		goto out_err;
	}

	spec->pool_name = pool_name;
	spec->image_name = image_name;
	spec->snap_name = snap_name;

	return 0;
out_err:
	kfree(image_name);
	kfree(pool_name);

	return ret;
}

static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
{
	size_t size;
	int ret;
	void *reply_buf;
	void *p;
	void *end;
	u64 seq;
	u32 snap_count;
	struct ceph_snap_context *snapc;
	u32 i;

	/*
	 * We'll need room for the seq value (maximum snapshot id),
	 * snapshot count, and array of that many snapshot ids.
	 * For now we have a fixed upper limit on the number we're
	 * prepared to receive.
	 */
	size = sizeof (__le64) + sizeof (__le32) +
			RBD_MAX_SNAP_COUNT * sizeof (__le64);
	reply_buf = kzalloc(size, GFP_KERNEL);
	if (!reply_buf)
		return -ENOMEM;

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_snapcontext", NULL, 0,
				reply_buf, size);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;

	p = reply_buf;
	end = reply_buf + ret;
	ret = -ERANGE;
	ceph_decode_64_safe(&p, end, seq, out);
	ceph_decode_32_safe(&p, end, snap_count, out);

	/*
	 * Make sure the reported number of snapshot ids wouldn't go
	 * beyond the end of our buffer.  But before checking that,
	 * make sure the computed size of the snapshot context we
	 * allocate is representable in a size_t.
	 */
	if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
				 / sizeof (u64)) {
		ret = -EINVAL;
		goto out;
	}
	if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
		goto out;
	ret = 0;

	snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
	if (!snapc) {
		ret = -ENOMEM;
		goto out;
	}
	snapc->seq = seq;
	for (i = 0; i < snap_count; i++)
		snapc->snaps[i] = ceph_decode_64(&p);

	ceph_put_snap_context(rbd_dev->header.snapc);
	rbd_dev->header.snapc = snapc;

	dout(" snap context seq = %llu, snap_count = %u\n",
		(unsigned long long)seq, (unsigned int)snap_count);
out:
	kfree(reply_buf);

	return ret;
}

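/*
 * For reference, the "get_snapcontext" reply decoded above is laid out
 * as (all little-endian, matching the decode calls):
 *
 *	__le64 seq;			highest snapshot id
 *	__le32 snap_count;
 *	__le64 snaps[snap_count];	snapshot ids
 *
 * This is an illustrative summary of the decoding done above, not a
 * definition taken from the Ceph headers.
 */
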
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id)
{
	size_t size;
	void *reply_buf;
	__le64 snapid;
	int ret;
	void *p;
	void *end;
	char *snap_name;

	size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
	reply_buf = kmalloc(size, GFP_KERNEL);
	if (!reply_buf)
		return ERR_PTR(-ENOMEM);

	snapid = cpu_to_le64(snap_id);
	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_snapshot_name",
				&snapid, sizeof (snapid),
				reply_buf, size);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0) {
		snap_name = ERR_PTR(ret);
		goto out;
	}

	p = reply_buf;
	end = reply_buf + ret;
	snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
	if (IS_ERR(snap_name))
		goto out;

	dout(" snap_id 0x%016llx snap_name = %s\n",
		(unsigned long long)snap_id, snap_name);
out:
	kfree(reply_buf);

	return snap_name;
}

static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
{
	bool first_time = rbd_dev->header.object_prefix == NULL;
	int ret;

	ret = rbd_dev_v2_image_size(rbd_dev);
	if (ret)
		return ret;

	if (first_time) {
		ret = rbd_dev_v2_header_onetime(rbd_dev);
		if (ret)
			return ret;
	}

	/*
	 * If the image supports layering, get the parent info.  We
	 * need to probe the first time regardless.  Thereafter we
	 * only need to if there's a parent, to see if it has
	 * disappeared due to the mapped image getting flattened.
	 */
	if (rbd_dev->header.features & RBD_FEATURE_LAYERING &&
			(first_time || rbd_dev->parent_spec)) {
		bool warn;

		ret = rbd_dev_v2_parent_info(rbd_dev);
		if (ret)
			return ret;

		/*
		 * Print a warning if this is the initial probe and
		 * the image has a parent.  Don't print it if the
		 * image now being probed is itself a parent.  We
		 * can tell at this point because we won't know its
		 * pool name yet (just its pool id).
		 */
		warn = rbd_dev->parent_spec && rbd_dev->spec->pool_name;
		if (first_time && warn)
			rbd_warn(rbd_dev, "WARNING: kernel layering "
					"is EXPERIMENTAL!");
	}

	if (rbd_dev->spec->snap_id == CEPH_NOSNAP)
		if (rbd_dev->mapping.size != rbd_dev->header.image_size)
			rbd_dev->mapping.size = rbd_dev->header.image_size;

	ret = rbd_dev_v2_snap_context(rbd_dev);
	dout("rbd_dev_v2_snap_context returned %d\n", ret);

	return ret;
}

static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
{
	struct device *dev;
	int ret;

	dev = &rbd_dev->dev;
	dev->bus = &rbd_bus_type;
	dev->type = &rbd_device_type;
	dev->parent = &rbd_root_dev;
	dev->release = rbd_dev_device_release;
	dev_set_name(dev, "%d", rbd_dev->dev_id);
	ret = device_register(dev);

	return ret;
}

static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
	device_unregister(&rbd_dev->dev);
}

/*
 * Get a unique rbd identifier for the given new rbd_dev, and add
 * the rbd_dev to the global list.
 */
static int rbd_dev_id_get(struct rbd_device *rbd_dev)
{
	int new_dev_id;

	new_dev_id = ida_simple_get(&rbd_dev_id_ida,
				    0, minor_to_rbd_dev_id(1 << MINORBITS),
				    GFP_KERNEL);
	if (new_dev_id < 0)
		return new_dev_id;

	rbd_dev->dev_id = new_dev_id;

	spin_lock(&rbd_dev_list_lock);
	list_add_tail(&rbd_dev->node, &rbd_dev_list);
	spin_unlock(&rbd_dev_list_lock);

	dout("rbd_dev %p given dev id %d\n", rbd_dev, rbd_dev->dev_id);

	return 0;
}

/*
 * Remove an rbd_dev from the global list, and record that its
 * identifier is no longer in use.
 */
static void rbd_dev_id_put(struct rbd_device *rbd_dev)
{
	spin_lock(&rbd_dev_list_lock);
	list_del_init(&rbd_dev->node);
	spin_unlock(&rbd_dev_list_lock);

	ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);

	dout("rbd_dev %p released dev id %d\n", rbd_dev, rbd_dev->dev_id);
}

/*
 * Skips over white space at *buf, and updates *buf to point to the
 * first found non-space character (if any).  Returns the length of
 * the token (string of non-white space characters) found.  Note
 * that *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * These are the characters that produce nonzero for
	 * isspace() in the "C" and "POSIX" locales.
	 */
	const char *spaces = " \f\n\r\t\v";

	*buf += strspn(*buf, spaces);	/* Find start of token */

	return strcspn(*buf, spaces);	/* Return token length */
}

/*
 * Finds the next token in *buf, and if the provided token buffer is
 * big enough, copies the found token into it.  The result, if
 * copied, is guaranteed to be terminated with '\0'.  Note that *buf
 * must be terminated with '\0' on entry.
 *
 * Returns the length of the token found (not including the '\0').
 * Return value will be 0 if no token is found, and it will be >=
 * token_size if the token would not fit.
 *
 * The *buf pointer will be updated to point beyond the end of the
 * found token.  Note that this occurs even if the token buffer is
 * too small to hold it.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t len;

	len = next_token(buf);
	if (len < token_size) {
		memcpy(token, *buf, len);
		*(token + len) = '\0';
	}
	*buf += len;

	return len;
}

/*
 * Finds the next token in *buf, dynamically allocates a buffer big
 * enough to hold a copy of it, and copies the token into the new
 * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
 * that a duplicate buffer is created even for a zero-length token.
 *
 * Returns a pointer to the newly-allocated duplicate, or a null
 * pointer if memory for the duplicate was not available.  If
 * the lenp argument is a non-null pointer, the length of the token
 * (not including the '\0') is returned in *lenp.
 *
 * If successful, the *buf pointer will be updated to point beyond
 * the end of the found token.
 *
 * Note: uses GFP_KERNEL for allocation.
 */
static inline char *dup_token(const char **buf, size_t *lenp)
{
	char *dup;
	size_t len;

	len = next_token(buf);
	dup = kmemdup(*buf, len + 1, GFP_KERNEL);
	if (!dup)
		return NULL;
	*(dup + len) = '\0';
	*buf += len;

	if (lenp)
		*lenp = len;

	return dup;
}

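/*
 * Illustrative walk-through of the helpers above (values are made up):
 * with buf pointing at "  rbd foo", next_token() advances buf to
 * "rbd foo" and returns 3; dup_token() then returns a kmalloc'd "rbd"
 * and leaves buf pointing at " foo".  copy_token() behaves the same
 * way but copies into a caller-provided buffer instead of allocating.
 */
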
/*
 * Parse the options provided for an "rbd add" (i.e., rbd image
 * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
 * and the data written is passed here via a NUL-terminated buffer.
 * Returns 0 if successful or an error code otherwise.
 *
 * The information extracted from these options is recorded in
 * the other parameters which return dynamically-allocated
 * structures:
 *  ceph_opts
 *      The address of a pointer that will refer to a ceph options
 *      structure.  Caller must release the returned pointer using
 *      ceph_destroy_options() when it is no longer needed.
 *  rbd_opts
 *      Address of an rbd options pointer.  Fully initialized by
 *      this function; caller must release with kfree().
 *  spec
 *      Address of an rbd image specification pointer.  Fully
 *      initialized by this function based on parsed options.
 *      Caller must release with rbd_spec_put().
 *
 * The options passed take this form:
 *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
 * where:
 *  <mon_addrs>
 *      A comma-separated list of one or more monitor addresses.
 *      A monitor address is an ip address, optionally followed
 *      by a port number (separated by a colon).
 *        I.e.:  ip1[:port1][,ip2[:port2]...]
 *  <options>
 *      A comma-separated list of ceph and/or rbd options.
 *  <pool_name>
 *      The name of the rados pool containing the rbd image.
 *  <image_name>
 *      The name of the image in that pool to map.
 *  <snap_name>
 *      An optional snapshot name.  If provided, the mapping will
 *      present data from the image at the time that snapshot was
 *      created.  The image head is used if no snapshot name is
 *      provided.  Snapshot mappings are always read-only.
 */
static int rbd_add_parse_args(const char *buf,
				struct ceph_options **ceph_opts,
				struct rbd_options **opts,
				struct rbd_spec **rbd_spec)
{
	size_t len;
	char *options;
	const char *mon_addrs;
	char *snap_name;
	size_t mon_addrs_size;
	struct rbd_spec *spec = NULL;
	struct rbd_options *rbd_opts = NULL;
	struct ceph_options *copts;
	int ret;

	/* The first four tokens are required */

	len = next_token(&buf);
	if (!len) {
		rbd_warn(NULL, "no monitor address(es) provided");
		return -EINVAL;
	}
	mon_addrs = buf;
	mon_addrs_size = len + 1;
	buf += len;

	ret = -EINVAL;
	options = dup_token(&buf, NULL);
	if (!options)
		return -ENOMEM;
	if (!*options) {
		rbd_warn(NULL, "no options provided");
		goto out_err;
	}

	spec = rbd_spec_alloc();
	if (!spec)
		goto out_mem;

	spec->pool_name = dup_token(&buf, NULL);
	if (!spec->pool_name)
		goto out_mem;
	if (!*spec->pool_name) {
		rbd_warn(NULL, "no pool name provided");
		goto out_err;
	}

	spec->image_name = dup_token(&buf, NULL);
	if (!spec->image_name)
		goto out_mem;
	if (!*spec->image_name) {
		rbd_warn(NULL, "no image name provided");
		goto out_err;
	}

	/*
	 * Snapshot name is optional; default is to use "-"
	 * (indicating the head/no snapshot).
	 */
	len = next_token(&buf);
	if (!len) {
		buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
		len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
	} else if (len > RBD_MAX_SNAP_NAME_LEN) {
		ret = -ENAMETOOLONG;
		goto out_err;
	}
	snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
	if (!snap_name)
		goto out_mem;
	*(snap_name + len) = '\0';
	spec->snap_name = snap_name;

	/* Initialize all rbd options to the defaults */

	rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
	if (!rbd_opts)
		goto out_mem;

	rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;

	copts = ceph_parse_options(options, mon_addrs,
					mon_addrs + mon_addrs_size - 1,
					parse_rbd_opts_token, rbd_opts);
	if (IS_ERR(copts)) {
		ret = PTR_ERR(copts);
		goto out_err;
	}
	kfree(options);

	*ceph_opts = copts;
	*opts = rbd_opts;
	*rbd_spec = spec;

	return 0;
out_mem:
	ret = -ENOMEM;
out_err:
	kfree(rbd_opts);
	rbd_spec_put(spec);
	kfree(options);

	return ret;
}

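/*
 * Example add request in the format parsed above (addresses, names
 * and key are made up):
 *
 *   $ echo "1.2.3.4:6789 name=admin,secret=AQB... rbd myimage mysnap" \
 *       > /sys/bus/rbd/add
 *
 * This maps snapshot "mysnap" of image "myimage" in pool "rbd",
 * talking to the monitor at 1.2.3.4:6789.
 */
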
/*
 * An rbd format 2 image has a unique identifier, distinct from the
 * name given to it by the user.  Internally, that identifier is
 * what's used to specify the names of objects related to the image.
 *
 * A special "rbd id" object is used to map an rbd image name to its
 * id.  If that object doesn't exist, then there is no v2 rbd image
 * with the supplied name.
 *
 * This function will record the given rbd_dev's image_id field if
 * it can be determined, and in that case will return 0.  If any
 * errors occur a negative errno will be returned and the rbd_dev's
 * image_id field will be unchanged (and should be NULL).
 */
static int rbd_dev_image_id(struct rbd_device *rbd_dev)
{
	int ret;
	size_t size;
	char *object_name;
	void *response;
	char *image_id;

	/*
	 * When probing a parent image, the image id is already
	 * known (and the image name likely is not).  There's no
	 * need to fetch the image id again in this case.  We
	 * do still need to set the image format though.
	 */
	if (rbd_dev->spec->image_id) {
		rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;

		return 0;
	}

	/*
	 * First, see if the format 2 image id file exists, and if
	 * so, get the image's persistent id from it.
	 */
	size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
	object_name = kmalloc(size, GFP_NOIO);
	if (!object_name)
		return -ENOMEM;
	sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
	dout("rbd id object name is %s\n", object_name);

	/* Response will be an encoded string, which includes a length */

	size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
	response = kzalloc(size, GFP_NOIO);
	if (!response) {
		ret = -ENOMEM;
		goto out;
	}

	/* If it doesn't exist we'll assume it's a format 1 image */

	ret = rbd_obj_method_sync(rbd_dev, object_name,
				"rbd", "get_id", NULL, 0,
				response, RBD_IMAGE_ID_LEN_MAX);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret == -ENOENT) {
		image_id = kstrdup("", GFP_KERNEL);
		ret = image_id ? 0 : -ENOMEM;
		if (!ret)
			rbd_dev->image_format = 1;
	} else if (ret > sizeof (__le32)) {
		void *p = response;

		image_id = ceph_extract_encoded_string(&p, p + ret,
						NULL, GFP_NOIO);
		ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
		if (!ret)
			rbd_dev->image_format = 2;
	} else {
		ret = -EINVAL;
	}

	if (!ret) {
		rbd_dev->spec->image_id = image_id;
		dout("image_id is %s\n", image_id);
	}
out:
	kfree(response);
	kfree(object_name);

	return ret;
}

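/*
 * Concrete example of the lookup above (assuming RBD_ID_PREFIX is
 * "rbd_id.", as defined in rbd_types.h): probing an image named
 * "myimage" reads the "rbd_id.myimage" object; if that object exists,
 * its "get_id" method returns the persistent id (say "1f2c3d4e5f"),
 * and the header and data objects are then named from that id rather
 * than from the user-visible image name.
 */
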
/*
 * Undo whatever state changes are made by a v1 or v2 header info
 * call.
 */
static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
{
	struct rbd_image_header *header;

	/* Drop parent reference unless it's already been done (or none) */

	if (rbd_dev->parent_overlap)
		rbd_dev_parent_put(rbd_dev);

	/* Free dynamic fields from the header, then zero it out */

	header = &rbd_dev->header;
	ceph_put_snap_context(header->snapc);
	kfree(header->snap_sizes);
	kfree(header->snap_names);
	kfree(header->object_prefix);
	memset(header, 0, sizeof (*header));
}

static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
{
	int ret;

	ret = rbd_dev_v2_object_prefix(rbd_dev);
	if (ret)
		goto out_err;

	/*
	 * Get and check the features for the image.  Currently the
	 * features are assumed to never change.
	 */
	ret = rbd_dev_v2_features(rbd_dev);
	if (ret)
		goto out_err;

	/* If the image supports fancy striping, get its parameters */

	if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
		ret = rbd_dev_v2_striping_info(rbd_dev);
		if (ret < 0)
			goto out_err;
	}
	/* No support for crypto and compression type format 2 images */

	return 0;
out_err:
	rbd_dev->header.features = 0;
	kfree(rbd_dev->header.object_prefix);
	rbd_dev->header.object_prefix = NULL;

	return ret;
}

static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
{
	struct rbd_device *parent = NULL;
	struct rbd_spec *parent_spec;
	struct rbd_client *rbdc;
	int ret;

	if (!rbd_dev->parent_spec)
		return 0;
	/*
	 * We need to pass a reference to the client and the parent
	 * spec when creating the parent rbd_dev.  Images related by
	 * parent/child relationships always share both.
	 */
	parent_spec = rbd_spec_get(rbd_dev->parent_spec);
	rbdc = __rbd_get_client(rbd_dev->rbd_client);

	ret = -ENOMEM;
	parent = rbd_dev_create(rbdc, parent_spec);
	if (!parent)
		goto out_err;

	ret = rbd_dev_image_probe(parent, false);
	if (ret < 0)
		goto out_err;
	rbd_dev->parent = parent;
	atomic_set(&rbd_dev->parent_ref, 1);

	return 0;
out_err:
	if (parent) {
		rbd_dev_unparent(rbd_dev);
		kfree(rbd_dev->header_name);
		rbd_dev_destroy(parent);
	} else {
		rbd_put_client(rbdc);
		rbd_spec_put(parent_spec);
	}

	return ret;
}

static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
{
	int ret;

	/* Get an id and fill in device name. */

	ret = rbd_dev_id_get(rbd_dev);
	if (ret)
		return ret;

	BUILD_BUG_ON(DEV_NAME_LEN
			< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);

	/* Record our major and minor device numbers. */

	if (!single_major) {
		ret = register_blkdev(0, rbd_dev->name);
		if (ret < 0)
			goto err_out_id;

		rbd_dev->major = ret;
		rbd_dev->minor = 0;
	} else {
		rbd_dev->major = rbd_major;
		rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
	}

	/* Set up the blkdev mapping. */

	ret = rbd_init_disk(rbd_dev);
	if (ret)
		goto err_out_blkdev;

	ret = rbd_dev_mapping_set(rbd_dev);
	if (ret)
		goto err_out_disk;
	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);

	ret = rbd_bus_add_dev(rbd_dev);
	if (ret)
		goto err_out_mapping;

	/* Everything's ready.  Announce the disk to the world. */

	set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
	add_disk(rbd_dev->disk);

	pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
		(unsigned long long) rbd_dev->mapping.size);

	return ret;

err_out_mapping:
	rbd_dev_mapping_clear(rbd_dev);
err_out_disk:
	rbd_free_disk(rbd_dev);
err_out_blkdev:
	if (!single_major)
		unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_id:
	rbd_dev_id_put(rbd_dev);
	rbd_dev_mapping_clear(rbd_dev);

	return ret;
}

static int rbd_dev_header_name(struct rbd_device *rbd_dev)
{
	struct rbd_spec *spec = rbd_dev->spec;
	size_t size;

	/* Record the header object name for this rbd image. */

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));

	if (rbd_dev->image_format == 1)
		size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
	else
		size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);

	rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
	if (!rbd_dev->header_name)
		return -ENOMEM;

	if (rbd_dev->image_format == 1)
		sprintf(rbd_dev->header_name, "%s%s",
			spec->image_name, RBD_SUFFIX);
	else
		sprintf(rbd_dev->header_name, "%s%s",
			RBD_HEADER_PREFIX, spec->image_id);
	return 0;
}

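/*
 * Resulting names, assuming RBD_SUFFIX is ".rbd" and RBD_HEADER_PREFIX
 * is "rbd_header." (both from rbd_types.h): a format 1 image "myimage"
 * gets header object "myimage.rbd", while a format 2 image with id
 * "1f2c3d4e5f" gets "rbd_header.1f2c3d4e5f".
 */
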
static void rbd_dev_image_release(struct rbd_device *rbd_dev)
{
	rbd_dev_unprobe(rbd_dev);
	kfree(rbd_dev->header_name);
	rbd_dev->header_name = NULL;
	rbd_dev->image_format = 0;
	kfree(rbd_dev->spec->image_id);
	rbd_dev->spec->image_id = NULL;

	rbd_dev_destroy(rbd_dev);
}

/*
 * Probe for the existence of the header object for the given rbd
 * device.  If this image is the one being mapped (i.e., not a
 * parent), initiate a watch on its header object before using that
 * object to get detailed information about the rbd image.
 */
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
{
	int ret;
	int tmp;

	/*
	 * Get the id from the image id object.  Unless there's an
	 * error, rbd_dev->spec->image_id will be filled in with
	 * a dynamically-allocated string, and rbd_dev->image_format
	 * will be set to either 1 or 2.
	 */
	ret = rbd_dev_image_id(rbd_dev);
	if (ret)
		return ret;
	rbd_assert(rbd_dev->spec->image_id);
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));

	ret = rbd_dev_header_name(rbd_dev);
	if (ret)
		goto err_out_format;

	if (mapping) {
		ret = rbd_dev_header_watch_sync(rbd_dev, true);
		if (ret)
			goto out_header_name;
	}

	if (rbd_dev->image_format == 1)
		ret = rbd_dev_v1_header_info(rbd_dev);
	else
		ret = rbd_dev_v2_header_info(rbd_dev);
	if (ret)
		goto err_out_watch;

	ret = rbd_dev_spec_update(rbd_dev);
	if (ret)
		goto err_out_probe;

	ret = rbd_dev_probe_parent(rbd_dev);
	if (ret)
		goto err_out_probe;

	dout("discovered format %u image, header name is %s\n",
		rbd_dev->image_format, rbd_dev->header_name);

	return 0;
err_out_probe:
	rbd_dev_unprobe(rbd_dev);
err_out_watch:
	if (mapping) {
		tmp = rbd_dev_header_watch_sync(rbd_dev, false);
		if (tmp)
			rbd_warn(rbd_dev, "unable to tear down "
					"watch request (%d)", tmp);
	}
out_header_name:
	kfree(rbd_dev->header_name);
	rbd_dev->header_name = NULL;
err_out_format:
	rbd_dev->image_format = 0;
	kfree(rbd_dev->spec->image_id);
	rbd_dev->spec->image_id = NULL;

	dout("probe failed, returning %d\n", ret);

	return ret;
}

static ssize_t do_rbd_add(struct bus_type *bus,
			  const char *buf,
			  size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	struct ceph_options *ceph_opts = NULL;
	struct rbd_options *rbd_opts = NULL;
	struct rbd_spec *spec = NULL;
	struct rbd_client *rbdc;
	struct ceph_osd_client *osdc;
	bool read_only;
	int rc = -ENOMEM;

	if (!try_module_get(THIS_MODULE))
		return -ENODEV;

	/* parse add command */
	rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
	if (rc < 0)
		goto err_out_module;
	read_only = rbd_opts->read_only;
	kfree(rbd_opts);
	rbd_opts = NULL;	/* done with this */

	rbdc = rbd_get_client(ceph_opts);
	if (IS_ERR(rbdc)) {
		rc = PTR_ERR(rbdc);
		goto err_out_args;
	}

	/* pick the pool */
	osdc = &rbdc->client->osdc;
	rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
	if (rc < 0)
		goto err_out_client;
	spec->pool_id = (u64)rc;

	/* The ceph file layout needs to fit pool id in 32 bits */

	if (spec->pool_id > (u64)U32_MAX) {
		rbd_warn(NULL, "pool id too large (%llu > %u)",
			(unsigned long long)spec->pool_id, U32_MAX);
		rc = -EIO;
		goto err_out_client;
	}

	rbd_dev = rbd_dev_create(rbdc, spec);
	if (!rbd_dev)
		goto err_out_client;
	rbdc = NULL;		/* rbd_dev now owns this */
	spec = NULL;		/* rbd_dev now owns this */

	rc = rbd_dev_image_probe(rbd_dev, true);
	if (rc < 0)
		goto err_out_rbd_dev;

	/* If we are mapping a snapshot it must be marked read-only */

	if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
		read_only = true;
	rbd_dev->mapping.read_only = read_only;

	rc = rbd_dev_device_setup(rbd_dev);
	if (rc) {
		rbd_dev_image_release(rbd_dev);
		goto err_out_module;
	}

	return count;

err_out_rbd_dev:
	rbd_dev_destroy(rbd_dev);
err_out_client:
	rbd_put_client(rbdc);
err_out_args:
	rbd_spec_put(spec);
err_out_module:
	module_put(THIS_MODULE);

	dout("Error adding device %s\n", buf);

	return (ssize_t)rc;
}

static ssize_t rbd_add(struct bus_type *bus,
		       const char *buf,
		       size_t count)
{
	if (single_major)
		return -EINVAL;

	return do_rbd_add(bus, buf, count);
}

static ssize_t rbd_add_single_major(struct bus_type *bus,
				    const char *buf,
				    size_t count)
{
	return do_rbd_add(bus, buf, count);
}

static void rbd_dev_device_release(struct device *dev)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	rbd_free_disk(rbd_dev);
	clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
	rbd_dev_mapping_clear(rbd_dev);
	if (!single_major)
		unregister_blkdev(rbd_dev->major, rbd_dev->name);
	rbd_dev_id_put(rbd_dev);
}

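/*
 * Strip an image's layering parent chain, releasing ancestors from
 * the deepest level up so each parent is gone before its child.
 */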
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
{
	while (rbd_dev->parent) {
		struct rbd_device *first = rbd_dev;
		struct rbd_device *second = first->parent;
		struct rbd_device *third;

		/*
		 * Follow to the parent with no grandparent and
		 * remove it.
		 */
		while (second && (third = second->parent)) {
			first = second;
			second = third;
		}
		rbd_assert(second);
		rbd_dev_image_release(second);
		first->parent = NULL;
		first->parent_overlap = 0;

		rbd_assert(first->parent_spec);
		rbd_spec_put(first->parent_spec);
		first->parent_spec = NULL;
	}
}

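/*
 * Process a "bus remove" request.  @buf carries the decimal id of the
 * device to unmap, e.g. (illustrative, for /dev/rbd2):
 *
 *	$ echo 2 > /sys/bus/rbd/remove
 */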
static ssize_t do_rbd_remove(struct bus_type *bus,
			     const char *buf,
			     size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	struct list_head *tmp;
	int dev_id;
	unsigned long ul;
	bool already = false;
	int ret;

	ret = kstrtoul(buf, 10, &ul);
	if (ret)
		return ret;

	/* convert to int; abort if we lost anything in the conversion */
	dev_id = (int)ul;
	if (dev_id != ul)
		return -EINVAL;

	ret = -ENOENT;
	spin_lock(&rbd_dev_list_lock);
	list_for_each(tmp, &rbd_dev_list) {
		rbd_dev = list_entry(tmp, struct rbd_device, node);
		if (rbd_dev->dev_id == dev_id) {
			ret = 0;
			break;
		}
	}
	if (!ret) {
		spin_lock_irq(&rbd_dev->lock);
		if (rbd_dev->open_count)
			ret = -EBUSY;
		else
			already = test_and_set_bit(RBD_DEV_FLAG_REMOVING,
						   &rbd_dev->flags);
		spin_unlock_irq(&rbd_dev->lock);
	}
	spin_unlock(&rbd_dev_list_lock);
	if (ret < 0 || already)
		return ret;

	ret = rbd_dev_header_watch_sync(rbd_dev, false);
	if (ret)
		rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);

	/*
	 * Flush remaining watch callbacks - these must be complete
	 * before the osd_client is shut down.
	 */
	dout("%s: flushing notifies\n", __func__);
	ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);

	/*
	 * Don't free anything from rbd_dev->disk until after all
	 * notifies are completely processed.  Otherwise
	 * rbd_bus_del_dev() will race with rbd_watch_cb(), resulting
	 * in a potential use after free of rbd_dev->disk or rbd_dev.
	 */
	rbd_bus_del_dev(rbd_dev);
	rbd_dev_image_release(rbd_dev);
	module_put(THIS_MODULE);

	return count;
}

static ssize_t rbd_remove(struct bus_type *bus,
			  const char *buf,
			  size_t count)
{
	if (single_major)
		return -EINVAL;

	return do_rbd_remove(bus, buf, count);
}

static ssize_t rbd_remove_single_major(struct bus_type *bus,
				       const char *buf,
				       size_t count)
{
	return do_rbd_remove(bus, buf, count);
}

/*
 * Create control files in sysfs under /sys/bus/rbd/...
 */
static int rbd_sysfs_init(void)
{
	int ret;

	ret = device_register(&rbd_root_dev);
	if (ret < 0)
		return ret;

	ret = bus_register(&rbd_bus_type);
	if (ret < 0)
		device_unregister(&rbd_root_dev);

	return ret;
}

static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}

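/*
 * Slab caches for the structures allocated on the I/O path: image
 * requests, the object requests that implement them, and the buffers
 * used to format per-segment object names.
 */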
static int rbd_slab_init(void)
{
	rbd_assert(!rbd_img_request_cache);
	rbd_img_request_cache = kmem_cache_create("rbd_img_request",
					sizeof (struct rbd_img_request),
					__alignof__(struct rbd_img_request),
					0, NULL);
	if (!rbd_img_request_cache)
		return -ENOMEM;

	rbd_assert(!rbd_obj_request_cache);
	rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
					sizeof (struct rbd_obj_request),
					__alignof__(struct rbd_obj_request),
					0, NULL);
	if (!rbd_obj_request_cache)
		goto out_err;

	rbd_assert(!rbd_segment_name_cache);
	rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
					MAX_OBJ_NAME_SIZE + 1, 1, 0, NULL);
	if (rbd_segment_name_cache)
		return 0;
out_err:
	if (rbd_obj_request_cache) {
		kmem_cache_destroy(rbd_obj_request_cache);
		rbd_obj_request_cache = NULL;
	}

	kmem_cache_destroy(rbd_img_request_cache);
	rbd_img_request_cache = NULL;

	return -ENOMEM;
}

static void rbd_slab_exit(void)
{
	rbd_assert(rbd_segment_name_cache);
	kmem_cache_destroy(rbd_segment_name_cache);
	rbd_segment_name_cache = NULL;

	rbd_assert(rbd_obj_request_cache);
	kmem_cache_destroy(rbd_obj_request_cache);
	rbd_obj_request_cache = NULL;

	rbd_assert(rbd_img_request_cache);
	kmem_cache_destroy(rbd_img_request_cache);
	rbd_img_request_cache = NULL;
}

static int __init rbd_init(void)
{
	int rc;

	if (!libceph_compatible(NULL)) {
		rbd_warn(NULL, "libceph incompatibility (quitting)");
		return -EINVAL;
	}

	rc = rbd_slab_init();
	if (rc)
		return rc;

	if (single_major) {
		rbd_major = register_blkdev(0, RBD_DRV_NAME);
		if (rbd_major < 0) {
			rc = rbd_major;
			goto err_out_slab;
		}
	}

	rc = rbd_sysfs_init();
	if (rc)
		goto err_out_blkdev;

	if (single_major)
		pr_info("loaded (major %d)\n", rbd_major);
	else
		pr_info("loaded\n");

	return 0;

err_out_blkdev:
	if (single_major)
		unregister_blkdev(rbd_major, RBD_DRV_NAME);
err_out_slab:
	rbd_slab_exit();
	return rc;
}

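/* Module unload: tear everything down in the reverse of rbd_init(). */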
static void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
	if (single_major)
		unregister_blkdev(rbd_major, RBD_DRV_NAME);
	rbd_slab_exit();
}

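/*
 * Load-time example (illustrative; single_major is the module
 * parameter tested in rbd_init() above):
 *
 *	# modprobe rbd single_major=Y
 */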
module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
MODULE_LICENSE("GPL");