/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/slab.h>
#include <linux/idr.h>

#include "rbd_types.h"

#define RBD_DEBUG       /* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT    9
#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)

/*
 * Increment the given counter and return its updated value.
 * If the counter is already 0 it will not be incremented.
 * If the counter is already at its maximum value returns
 * -EINVAL without updating it.
 */
static int atomic_inc_return_safe(atomic_t *v)
{
        unsigned int counter;

        counter = (unsigned int)__atomic_add_unless(v, 1, 0);
        if (counter <= (unsigned int)INT_MAX)
                return (int)counter;

        atomic_dec(v);

        return -EINVAL;
}
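/*
 * Note that __atomic_add_unless() returns the value the counter held
 * before the add, so a successful increment reports the pre-increment
 * count; only a counter that has already run past INT_MAX is backed
 * out and reported as -EINVAL.
 */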

/* Decrement the counter.  Return the resulting value, or -EINVAL */
static int atomic_dec_return_safe(atomic_t *v)
{
        int counter;

        counter = atomic_dec_return(v);
        if (counter >= 0)
                return counter;

        atomic_inc(v);

        return -EINVAL;
}

#define RBD_DRV_NAME "rbd"

#define RBD_MINORS_PER_MAJOR            256
#define RBD_SINGLE_MAJOR_PART_SHIFT     4

#define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
#define RBD_MAX_SNAP_NAME_LEN \
                        (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
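/*
 * (510 ids at sizeof (__le64) == 8 bytes each is 4080 bytes, which
 * leaves room for the snapshot context header within a single 4 KB
 * allocation.)
 */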

#define RBD_SNAP_HEAD_NAME      "-"

#define BAD_SNAP_INDEX  U32_MAX         /* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX    64

#define RBD_OBJ_PREFIX_LEN_MAX  64

/* Feature bits */

#define RBD_FEATURE_LAYERING    (1<<0)
#define RBD_FEATURE_STRIPINGV2  (1<<1)
#define RBD_FEATURES_ALL \
            (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN            32
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
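/*
 * E.g. for 4-byte ints this works out to (5 * 4) / 2 + 1 == 11
 * characters, enough for "-2147483648": roughly 2.5 decimal digits
 * per byte, plus one for a leading sign.
 */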

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
        /* These six fields never change for a given rbd image */
        char *object_prefix;
        __u8 obj_order;
        __u8 crypt_type;
        __u8 comp_type;
        u64 stripe_unit;
        u64 stripe_count;
        u64 features;           /* Might be changeable someday? */

        /* The remaining fields need to be updated occasionally */
        u64 image_size;
        struct ceph_snap_context *snapc;
        char *snap_names;       /* format 1 only */
        u64 *snap_sizes;        /* format 1 only */
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
        u64             pool_id;
        const char      *pool_name;

        const char      *image_id;
        const char      *image_name;

        u64             snap_id;
        const char      *snap_name;

        struct kref     kref;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
        struct ceph_client      *client;
        struct kref             kref;
        struct list_head        node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
        OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_req_flags {
        OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
        OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
        OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
        OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
        const char              *object_name;
        u64                     offset;         /* object start byte */
        u64                     length;         /* bytes from offset */
        unsigned long           flags;

        /*
         * An object request associated with an image will have its
         * img_data flag set; a standalone object request will not.
         *
         * A standalone object request will have which == BAD_WHICH
         * and a null obj_request pointer.
         *
         * An object request initiated in support of a layered image
         * object (to check for its existence before a write) will
         * have which == BAD_WHICH and a non-null obj_request pointer.
         *
         * Finally, an object request for rbd image data will have
         * which != BAD_WHICH, and will have a non-null img_request
         * pointer.  The value of which will be in the range
         * 0..(img_request->obj_request_count-1).
         */
        union {
                struct rbd_obj_request  *obj_request;   /* STAT op */
                struct {
                        struct rbd_img_request  *img_request;
                        u64                     img_offset;
                        /* links for img_request->obj_requests list */
                        struct list_head        links;
                };
        };
        u32                     which;          /* posn image request list */

        enum obj_request_type   type;
        union {
                struct bio      *bio_list;
                struct {
                        struct page     **pages;
                        u32             page_count;
                };
        };
        struct page             **copyup_pages;
        u32                     copyup_page_count;

        struct ceph_osd_request *osd_req;

        u64                     xferred;        /* bytes transferred */
        int                     result;

        rbd_obj_callback_t      callback;
        struct completion       completion;

        struct kref             kref;
};

enum img_req_flags {
        IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
        IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
        IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
        struct rbd_device       *rbd_dev;
        u64                     offset; /* starting image byte offset */
        u64                     length; /* byte count from offset */
        unsigned long           flags;
        union {
                u64                     snap_id;        /* for reads */
                struct ceph_snap_context *snapc;        /* for writes */
        };
        union {
                struct request          *rq;            /* block request */
                struct rbd_obj_request  *obj_request;   /* obj req initiator */
        };
        struct page             **copyup_pages;
        u32                     copyup_page_count;
        spinlock_t              completion_lock;/* protects next_completion */
        u32                     next_completion;
        rbd_img_callback_t      callback;
        u64                     xferred;        /* aggregate bytes transferred */
        int                     result;         /* first nonzero obj_request result */

        u32                     obj_request_count;
        struct list_head        obj_requests;   /* rbd_obj_request structs */

        struct kref             kref;
};

#define for_each_obj_request(ireq, oreq) \
        list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
        list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)

struct rbd_mapping {
        u64                     size;
        u64                     features;
        bool                    read_only;
};

/*
 * a single device
 */
struct rbd_device {
        int                     dev_id;         /* blkdev unique id */

        int                     major;          /* blkdev assigned major */
        int                     minor;
        struct gendisk          *disk;          /* blkdev's gendisk and rq */

        u32                     image_format;   /* Either 1 or 2 */
        struct rbd_client       *rbd_client;

        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t              lock;           /* queue, flags, open_count */

        struct rbd_image_header header;
        unsigned long           flags;          /* possibly lock protected */
        struct rbd_spec         *spec;

        char                    *header_name;

        struct ceph_file_layout layout;

        struct ceph_osd_event   *watch_event;
        struct rbd_obj_request  *watch_request;

        struct rbd_spec         *parent_spec;
        u64                     parent_overlap;
        atomic_t                parent_ref;
        struct rbd_device       *parent;

        /* protects updating the header */
        struct rw_semaphore     header_rwsem;

        struct rbd_mapping      mapping;

        struct list_head        node;

        /* sysfs related */
        struct device           dev;
        unsigned long           open_count;     /* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
        RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
        RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
};

static DEFINE_MUTEX(client_mutex);      /* Serialize client creation */

static LIST_HEAD(rbd_dev_list);         /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);      /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache *rbd_img_request_cache;
static struct kmem_cache *rbd_obj_request_cache;
static struct kmem_cache *rbd_segment_name_cache;

static int rbd_major;
static DEFINE_IDA(rbd_dev_id_ida);

/*
 * Default to false for now, as single-major requires >= 0.75 version of
 * userspace rbd utility.
 */
static bool single_major = false;
module_param(single_major, bool, S_IRUGO);
MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: false)");

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static void rbd_dev_device_release(struct device *dev);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);
static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf,
                                    size_t count);
static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
                                       size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping);
static void rbd_spec_put(struct rbd_spec *spec);

static int rbd_dev_id_to_minor(int dev_id)
{
        return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
}

static int minor_to_rbd_dev_id(int minor)
{
        return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
}
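/*
 * E.g. with RBD_SINGLE_MAJOR_PART_SHIFT == 4, dev_id 2 maps to minor
 * 32, with minors 32..47 left for that device's partitions.
 */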

static BUS_ATTR(add, S_IWUSR, NULL, rbd_add);
static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove);
static BUS_ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major);
static BUS_ATTR(remove_single_major, S_IWUSR, NULL, rbd_remove_single_major);

static struct attribute *rbd_bus_attrs[] = {
        &bus_attr_add.attr,
        &bus_attr_remove.attr,
        &bus_attr_add_single_major.attr,
        &bus_attr_remove_single_major.attr,
        NULL,
};

static umode_t rbd_bus_is_visible(struct kobject *kobj,
                                  struct attribute *attr, int index)
{
        if (!single_major &&
            (attr == &bus_attr_add_single_major.attr ||
             attr == &bus_attr_remove_single_major.attr))
                return 0;

        return attr->mode;
}

static const struct attribute_group rbd_bus_group = {
        .attrs = rbd_bus_attrs,
        .is_visible = rbd_bus_is_visible,
};
__ATTRIBUTE_GROUPS(rbd_bus);

static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_groups     = rbd_bus_groups,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
        struct va_format vaf;
        va_list args;

        va_start(args, fmt);
        vaf.fmt = fmt;
        vaf.va = &args;

        if (!rbd_dev)
                printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
        else if (rbd_dev->disk)
                printk(KERN_WARNING "%s: %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_name)
                printk(KERN_WARNING "%s: image %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_id)
                printk(KERN_WARNING "%s: id %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
        else    /* punt */
                printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
                        RBD_DRV_NAME, rbd_dev, &vaf);
        va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)                                                \
                if (unlikely(!(expr))) {                                \
                        printk(KERN_ERR "\nAssertion failure in %s() "  \
                                                "at line %d:\n\n"       \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                        BUG();                                          \
                }
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */

static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                u64 *snap_features);
static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
        bool removing = false;

        if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
                return -EROFS;

        spin_lock_irq(&rbd_dev->lock);
        if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
                removing = true;
        else
                rbd_dev->open_count++;
        spin_unlock_irq(&rbd_dev->lock);
        if (removing)
                return -ENOENT;

        (void) get_device(&rbd_dev->dev);
        set_device_ro(bdev, rbd_dev->mapping.read_only);

        return 0;
}

static void rbd_release(struct gendisk *disk, fmode_t mode)
{
        struct rbd_device *rbd_dev = disk->private_data;
        unsigned long open_count_before;

        spin_lock_irq(&rbd_dev->lock);
        open_count_before = rbd_dev->open_count--;
        spin_unlock_irq(&rbd_dev->lock);
        rbd_assert(open_count_before > 0);

        put_device(&rbd_dev->dev);
}

static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
};

/*
 * Initialize an rbd client instance.  Success or not, this function
 * consumes ceph_opts.  Caller holds client_mutex.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;
        int ret = -ENOMEM;

        dout("%s:\n", __func__);
        rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
        if (!rbdc)
                goto out_opt;

        kref_init(&rbdc->kref);
        INIT_LIST_HEAD(&rbdc->node);

        rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
        if (IS_ERR(rbdc->client))
                goto out_rbdc;
        ceph_opts = NULL;       /* Now rbdc->client is responsible for ceph_opts */

        ret = ceph_open_session(rbdc->client);
        if (ret < 0)
                goto out_client;

        spin_lock(&rbd_client_list_lock);
        list_add_tail(&rbdc->node, &rbd_client_list);
        spin_unlock(&rbd_client_list_lock);

        dout("%s: rbdc %p\n", __func__, rbdc);

        return rbdc;
out_client:
        ceph_destroy_client(rbdc->client);
out_rbdc:
        kfree(rbdc);
out_opt:
        if (ceph_opts)
                ceph_destroy_options(ceph_opts);
        dout("%s: error %d\n", __func__, ret);

        return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
        kref_get(&rbdc->kref);

        return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
        struct rbd_client *client_node;
        bool found = false;

        if (ceph_opts->flags & CEPH_OPT_NOSHARE)
                return NULL;

        spin_lock(&rbd_client_list_lock);
        list_for_each_entry(client_node, &rbd_client_list, node) {
                if (!ceph_compare_options(ceph_opts, client_node->client)) {
                        __rbd_get_client(client_node);

                        found = true;
                        break;
                }
        }
        spin_unlock(&rbd_client_list_lock);

        return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        /* Boolean args above */
        Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
        /* int args above */
        /* string args above */
        {Opt_read_only, "read_only"},
        {Opt_read_only, "ro"},          /* Alternate spelling */
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},         /* Alternate spelling */
        /* Boolean args above */
        {-1, NULL}
};

struct rbd_options {
        bool    read_only;
};

#define RBD_READ_ONLY_DEFAULT   false

static int parse_rbd_opts_token(char *c, void *private)
{
        struct rbd_options *rbd_opts = private;
        substring_t argstr[MAX_OPT_ARGS];
        int token, intval, ret;

        token = match_token(c, rbd_opts_tokens, argstr);
        if (token < 0)
                return -EINVAL;

        if (token < Opt_last_int) {
                ret = match_int(&argstr[0], &intval);
                if (ret < 0) {
                        pr_err("bad mount option arg (not int) "
                               "at '%s'\n", c);
                        return ret;
                }
                dout("got int token %d val %d\n", token, intval);
        } else if (token > Opt_last_int && token < Opt_last_string) {
                dout("got string token %d val %s\n", token,
                     argstr[0].from);
        } else if (token > Opt_last_string && token < Opt_last_bool) {
                dout("got Boolean token %d\n", token);
        } else {
                dout("got token %d\n", token);
        }

        switch (token) {
        case Opt_read_only:
                rbd_opts->read_only = true;
                break;
        case Opt_read_write:
                rbd_opts->read_only = false;
                break;
        default:
                rbd_assert(false);
                break;
        }
        return 0;
}
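/*
 * So an option string such as "ro" (or the long form "read_only")
 * flips rbd_opts->read_only to true, "rw"/"read_write" flips it
 * back, and any token not in rbd_opts_tokens fails the parse with
 * -EINVAL.
 */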

/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.  Either way, ceph_opts is consumed by this
 * function.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;

        mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
        rbdc = rbd_client_find(ceph_opts);
        if (rbdc)       /* using an existing client */
                ceph_destroy_options(ceph_opts);
        else
                rbdc = rbd_client_create(ceph_opts);
        mutex_unlock(&client_mutex);

        return rbdc;
}

/*
 * Destroy ceph client.  Removes it from the client list under
 * rbd_client_list_lock before tearing it down.
 */
static void rbd_client_release(struct kref *kref)
{
        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

        dout("%s: rbdc %p\n", __func__, rbdc);
        spin_lock(&rbd_client_list_lock);
        list_del(&rbdc->node);
        spin_unlock(&rbd_client_list_lock);

        ceph_destroy_client(rbdc->client);
        kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
        if (rbdc)
                kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
        return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
        size_t size;
        u32 snap_count;

        /* The header has to start with the magic rbd header text */
        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
                return false;

        /* The bio layer requires at least sector-sized I/O */

        if (ondisk->options.order < SECTOR_SHIFT)
                return false;

        /* If we use u64 in a few spots we may be able to loosen this */

        if (ondisk->options.order > 8 * sizeof (int) - 1)
                return false;

        /*
         * The size of a snapshot header has to fit in a size_t, and
         * that limits the number of snapshots.
         */
        snap_count = le32_to_cpu(ondisk->snap_count);
        size = SIZE_MAX - sizeof (struct ceph_snap_context);
        if (snap_count > size / sizeof (__le64))
                return false;

        /*
         * Not only that, but the size of the entire snapshot
         * header must also be representable in a size_t.
         */
        size -= snap_count * sizeof (__le64);
        if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
                return false;

        return true;
}
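/*
 * E.g. with 8-byte snapshot ids the first bound above caps snap_count
 * near SIZE_MAX / 8; the second then requires the id array plus
 * snap_names_len to fit below SIZE_MAX as well.
 */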

/*
 * Fill an rbd image header with information from the given format 1
 * on-disk header.
 */
static int rbd_header_from_disk(struct rbd_device *rbd_dev,
                                 struct rbd_image_header_ondisk *ondisk)
{
        struct rbd_image_header *header = &rbd_dev->header;
        bool first_time = header->object_prefix == NULL;
        struct ceph_snap_context *snapc;
        char *object_prefix = NULL;
        char *snap_names = NULL;
        u64 *snap_sizes = NULL;
        u32 snap_count;
        size_t size;
        int ret = -ENOMEM;
        u32 i;

        /* Allocate this now to avoid having to handle failure below */

        if (first_time) {
                size_t len;

                len = strnlen(ondisk->object_prefix,
                                sizeof (ondisk->object_prefix));
                object_prefix = kmalloc(len + 1, GFP_KERNEL);
                if (!object_prefix)
                        return -ENOMEM;
                memcpy(object_prefix, ondisk->object_prefix, len);
                object_prefix[len] = '\0';
        }

        /* Allocate the snapshot context and fill it in */

        snap_count = le32_to_cpu(ondisk->snap_count);
        snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
        if (!snapc)
                goto out_err;
        snapc->seq = le64_to_cpu(ondisk->snap_seq);
        if (snap_count) {
                struct rbd_image_snap_ondisk *snaps;
                u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

                /* We'll keep a copy of the snapshot names... */

                if (snap_names_len > (u64)SIZE_MAX)
                        goto out_2big;
                snap_names = kmalloc(snap_names_len, GFP_KERNEL);
                if (!snap_names)
                        goto out_err;

                /* ...as well as the array of their sizes. */

                size = snap_count * sizeof (*header->snap_sizes);
                snap_sizes = kmalloc(size, GFP_KERNEL);
                if (!snap_sizes)
                        goto out_err;

                /*
                 * Copy the names, and fill in each snapshot's id
                 * and size.
                 *
                 * Note that rbd_dev_v1_header_info() guarantees the
                 * ondisk buffer we're working with has
                 * snap_names_len bytes beyond the end of the
                 * snapshot id array, so this memcpy() is safe.
                 */
                memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
                snaps = ondisk->snaps;
                for (i = 0; i < snap_count; i++) {
                        snapc->snaps[i] = le64_to_cpu(snaps[i].id);
                        snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
                }
        }

        /* We won't fail any more, fill in the header */

        if (first_time) {
                header->object_prefix = object_prefix;
                header->obj_order = ondisk->options.order;
                header->crypt_type = ondisk->options.crypt_type;
                header->comp_type = ondisk->options.comp_type;
                /* The rest aren't used for format 1 images */
                header->stripe_unit = 0;
                header->stripe_count = 0;
                header->features = 0;
        } else {
                ceph_put_snap_context(header->snapc);
                kfree(header->snap_names);
                kfree(header->snap_sizes);
        }

        /* The remaining fields always get updated (when we refresh) */

        header->image_size = le64_to_cpu(ondisk->image_size);
        header->snapc = snapc;
        header->snap_names = snap_names;
        header->snap_sizes = snap_sizes;

        /* Make sure mapping size is consistent with header info */

        if (rbd_dev->spec->snap_id == CEPH_NOSNAP || first_time)
                if (rbd_dev->mapping.size != header->image_size)
                        rbd_dev->mapping.size = header->image_size;

        return 0;
out_2big:
        ret = -EIO;
out_err:
        kfree(snap_sizes);
        kfree(snap_names);
        ceph_put_snap_context(snapc);
        kfree(object_prefix);

        return ret;
}

static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
        const char *snap_name;

        rbd_assert(which < rbd_dev->header.snapc->num_snaps);

        /* Skip over names until we find the one we are looking for */

        snap_name = rbd_dev->header.snap_names;
        while (which--)
                snap_name += strlen(snap_name) + 1;

        return kstrdup(snap_name, GFP_KERNEL);
}

/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that result is for snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
        u64 snap_id1 = *(u64 *)s1;
        u64 snap_id2 = *(u64 *)s2;

        if (snap_id1 < snap_id2)
                return 1;
        return snap_id1 == snap_id2 ? 0 : -1;
}

/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
        struct ceph_snap_context *snapc = rbd_dev->header.snapc;
        u64 *found;

        found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
                                sizeof (snap_id), snapid_compare_reverse);

        return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}
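/*
 * E.g. for snapc->snaps[] = { 8, 5, 2 } (descending, as the osd keeps
 * it), looking up snap_id 5 yields index 1, while snap_id 4 yields
 * BAD_SNAP_INDEX.
 */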

static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id)
{
        u32 which;
        const char *snap_name;

        which = rbd_dev_snap_index(rbd_dev, snap_id);
        if (which == BAD_SNAP_INDEX)
                return ERR_PTR(-ENOENT);

        snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
        return snap_name ? snap_name : ERR_PTR(-ENOMEM);
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
        if (snap_id == CEPH_NOSNAP)
                return RBD_SNAP_HEAD_NAME;

        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (rbd_dev->image_format == 1)
                return rbd_dev_v1_snap_name(rbd_dev, snap_id);

        return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}

static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                                u64 *snap_size)
{
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (snap_id == CEPH_NOSNAP) {
                *snap_size = rbd_dev->header.image_size;
        } else if (rbd_dev->image_format == 1) {
                u32 which;

                which = rbd_dev_snap_index(rbd_dev, snap_id);
                if (which == BAD_SNAP_INDEX)
                        return -ENOENT;

                *snap_size = rbd_dev->header.snap_sizes[which];
        } else {
                u64 size = 0;
                int ret;

                ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
                if (ret)
                        return ret;

                *snap_size = size;
        }
        return 0;
}

static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                        u64 *snap_features)
{
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (snap_id == CEPH_NOSNAP) {
                *snap_features = rbd_dev->header.features;
        } else if (rbd_dev->image_format == 1) {
                *snap_features = 0;     /* No features for format 1 */
        } else {
                u64 features = 0;
                int ret;

                ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
                if (ret)
                        return ret;

                *snap_features = features;
        }
        return 0;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
        u64 snap_id = rbd_dev->spec->snap_id;
        u64 size = 0;
        u64 features = 0;
        int ret;

        ret = rbd_snap_size(rbd_dev, snap_id, &size);
        if (ret)
                return ret;
        ret = rbd_snap_features(rbd_dev, snap_id, &features);
        if (ret)
                return ret;

        rbd_dev->mapping.size = size;
        rbd_dev->mapping.features = features;

        return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
        rbd_dev->mapping.size = 0;
        rbd_dev->mapping.features = 0;
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
        char *name;
        u64 segment;
        int ret;
        char *name_format;

        name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
        if (!name)
                return NULL;
        segment = offset >> rbd_dev->header.obj_order;
        name_format = "%s.%012llx";
        if (rbd_dev->image_format == 2)
                name_format = "%s.%016llx";
        ret = snprintf(name, CEPH_MAX_OID_NAME_LEN + 1, name_format,
                        rbd_dev->header.object_prefix, segment);
        if (ret < 0 || ret > CEPH_MAX_OID_NAME_LEN) {
                pr_err("error formatting segment name for #%llu (%d)\n",
                        segment, ret);
                kfree(name);
                name = NULL;
        }

        return name;
}
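/*
 * E.g. with obj_order 22 (4 MB objects) an image offset of 20 MB
 * falls in segment 5, named "<object_prefix>.0000000000000005" for a
 * format 2 image ("<object_prefix>.000000000005" for format 1).
 */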

static void rbd_segment_name_free(const char *name)
{
        /* The explicit cast here is needed to drop the const qualifier */

        kmem_cache_free(rbd_segment_name_cache, (void *)name);
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
                                u64 offset, u64 length)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        offset &= segment_size - 1;

        rbd_assert(length <= U64_MAX - offset);
        if (offset + length > segment_size)
                length = segment_size - offset;

        return length;
}
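/*
 * I.e. a request is clipped at the segment boundary: with 4 MB
 * segments, offset (4 MB - 1) and length 10 yields 1, leaving the
 * remaining bytes to be issued against the following object.
 */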

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
        return 1 << header->obj_order;
}
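/* E.g. the common default obj_order of 22 gives 1 << 22 == 4 MB objects. */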

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
        struct bio *tmp;

        while (chain) {
                tmp = chain;
                chain = chain->bi_next;
                bio_put(tmp);
        }
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
        struct bio_vec bv;
        struct bvec_iter iter;
        unsigned long flags;
        void *buf;
        int pos = 0;

        while (chain) {
                bio_for_each_segment(bv, chain, iter) {
                        if (pos + bv.bv_len > start_ofs) {
                                int remainder = max(start_ofs - pos, 0);
                                buf = bvec_kmap_irq(&bv, &flags);
                                memset(buf + remainder, 0,
                                       bv.bv_len - remainder);
                                flush_dcache_page(bv.bv_page);
                                bvec_kunmap_irq(buf, &flags);
                        }
                        pos += bv.bv_len;
                }

                chain = chain->bi_next;
        }
}

/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
        struct page **page = &pages[offset >> PAGE_SHIFT];

        rbd_assert(end > offset);
        rbd_assert(end - offset <= (u64)SIZE_MAX);
        while (offset < end) {
                size_t page_offset;
                size_t length;
                unsigned long flags;
                void *kaddr;

                page_offset = offset & ~PAGE_MASK;
                length = min_t(size_t, PAGE_SIZE - page_offset, end - offset);
                local_irq_save(flags);
                kaddr = kmap_atomic(*page);
                memset(kaddr + page_offset, 0, length);
                flush_dcache_page(*page);
                kunmap_atomic(kaddr);
                local_irq_restore(flags);

                offset += length;
                page++;
        }
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
                                        unsigned int offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio *bio;

        bio = bio_clone(bio_src, gfpmask);
        if (!bio)
                return NULL;    /* ENOMEM */

        bio_advance(bio, offset);
        bio->bi_iter.bi_size = len;

        return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
                                        unsigned int *offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio *bi = *bio_src;
        unsigned int off = *offset;
        struct bio *chain = NULL;
        struct bio **end;

        /* Build up a chain of clone bios up to the limit */

        if (!bi || off >= bi->bi_iter.bi_size || !len)
                return NULL;            /* Nothing to clone */

        end = &chain;
        while (len) {
                unsigned int bi_size;
                struct bio *bio;

                if (!bi) {
                        rbd_warn(NULL, "bio_chain exhausted with %u left", len);
                        goto out_err;   /* EINVAL; ran out of bio's */
                }
                bi_size = min_t(unsigned int, bi->bi_iter.bi_size - off, len);
                bio = bio_clone_range(bi, off, bi_size, gfpmask);
                if (!bio)
                        goto out_err;   /* ENOMEM */

                *end = bio;
                end = &bio->bi_next;

                off += bi_size;
                if (off == bi->bi_iter.bi_size) {
                        bi = bi->bi_next;
                        off = 0;
                }
                len -= bi_size;
        }
        *bio_src = bi;
        *offset = off;

        return chain;
out_err:
        bio_chain_put(chain);

        return NULL;
}

/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
                struct rbd_device *rbd_dev;

                rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
                        obj_request);
        }
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
                struct rbd_device *rbd_dev = NULL;

                if (obj_request_img_data_test(obj_request))
                        rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked done\n",
                        obj_request);
        }
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the responses from two existence
 * checks are separated by the creation of the target object, and
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the second one.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
                                bool exists)
{
        if (exists)
                set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
        set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
        smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request != NULL);
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static bool img_request_child_test(struct rbd_img_request *img_request);
static void rbd_parent_request_destroy(struct kref *kref);
static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
        rbd_assert(img_request != NULL);
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        if (img_request_child_test(img_request))
                kref_put(&img_request->kref, rbd_parent_request_destroy);
        else
                kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->img_request == NULL);

        /* Image request now owns object's original reference */
        obj_request->img_request = img_request;
        obj_request->which = img_request->obj_request_count;
        rbd_assert(!obj_request_img_data_test(obj_request));
        obj_request_img_data_set(obj_request);
        rbd_assert(obj_request->which != BAD_WHICH);
        img_request->obj_request_count++;
        list_add_tail(&obj_request->links, &img_request->obj_requests);
        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->which != BAD_WHICH);

        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
        list_del(&obj_request->links);
        rbd_assert(img_request->obj_request_count > 0);
        img_request->obj_request_count--;
        rbd_assert(obj_request->which == img_request->obj_request_count);
        obj_request->which = BAD_WHICH;
        rbd_assert(obj_request_img_data_test(obj_request));
        rbd_assert(obj_request->img_request == img_request);
        obj_request->img_request = NULL;
        obj_request->callback = NULL;
        rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
        switch (type) {
        case OBJ_REQUEST_NODATA:
        case OBJ_REQUEST_BIO:
        case OBJ_REQUEST_PAGES:
                return true;
        default:
                return false;
        }
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
                                struct rbd_obj_request *obj_request)
{
        dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

        return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{
        dout("%s: img %p\n", __func__, img_request);

        /*
         * If no error occurred, compute the aggregate transfer
         * count for the image request.  We could instead use
         * atomic64_cmpxchg() to update it as each object request
         * completes; not clear which way is better off hand.
         */
        if (!img_request->result) {
                struct rbd_obj_request *obj_request;
                u64 xferred = 0;

                for_each_obj_request(img_request, obj_request)
                        xferred += obj_request->xferred;
                img_request->xferred = xferred;
        }

        if (img_request->callback)
                img_request->callback(img_request);
        else
                rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);

        return wait_for_completion_interruptible(&obj_request->completion);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_write_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_WRITE, &img_request->flags);
        smp_mb();
}

static bool img_request_write_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
}

static void img_request_child_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_CHILD, &img_request->flags);
        smp_mb();
}

static void img_request_child_clear(struct rbd_img_request *img_request)
{
        clear_bit(IMG_REQ_CHILD, &img_request->flags);
        smp_mb();
}

static bool img_request_child_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
}

static void img_request_layered_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_LAYERED, &img_request->flags);
        smp_mb();
}

static void img_request_layered_clear(struct rbd_img_request *img_request)
{
        clear_bit(IMG_REQ_LAYERED, &img_request->flags);
        smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}

static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
        u64 xferred = obj_request->xferred;
        u64 length = obj_request->length;

        dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
                obj_request, obj_request->img_request, obj_request->result,
                xferred, length);
        /*
         * ENOENT means a hole in the image.  We zero-fill the entire
         * length of the request.  A short read also implies zero-fill
         * to the end of the request.  An error requires the whole
         * length of the request to be reported finished with an error
         * to the block layer.  In each case we update the xferred
         * count to indicate the whole request was satisfied.
         */
        rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
        if (obj_request->result == -ENOENT) {
                if (obj_request->type == OBJ_REQUEST_BIO)
                        zero_bio_chain(obj_request->bio_list, 0);
                else
                        zero_pages(obj_request->pages, 0, length);
                obj_request->result = 0;
        } else if (xferred < length && !obj_request->result) {
                if (obj_request->type == OBJ_REQUEST_BIO)
                        zero_bio_chain(obj_request->bio_list, xferred);
                else
                        zero_pages(obj_request->pages, xferred, length);
        }
        obj_request->xferred = length;
        obj_request_done_set(obj_request);
}

static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p cb %p\n", __func__, obj_request,
                obj_request->callback);
        if (obj_request->callback)
                obj_request->callback(obj_request);
        else
                complete_all(&obj_request->completion);
}

static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);
        obj_request_done_set(obj_request);
}

static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
        struct rbd_img_request *img_request = NULL;
        struct rbd_device *rbd_dev = NULL;
        bool layered = false;

        if (obj_request_img_data_test(obj_request)) {
                img_request = obj_request->img_request;
                layered = img_request && img_request_layered_test(img_request);
                rbd_dev = img_request->rbd_dev;
        }

        dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
                obj_request, img_request, obj_request->result,
                obj_request->xferred, obj_request->length);
        if (layered && obj_request->result == -ENOENT &&
                        obj_request->img_offset < rbd_dev->parent_overlap)
                rbd_img_parent_read(obj_request);
        else if (img_request)
                rbd_img_obj_request_read_callback(obj_request);
        else
                obj_request_done_set(obj_request);
}

static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p result %d %llu\n", __func__, obj_request,
                obj_request->result, obj_request->length);
        /*
         * There is no such thing as a successful short write.  Set
         * it to our originally-requested length.
         */
        obj_request->xferred = obj_request->length;
        obj_request_done_set(obj_request);
}

/*
 * For a simple stat call there's nothing to do.  We'll do more if
 * this is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);
        obj_request_done_set(obj_request);
}

static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
                                struct ceph_msg *msg)
{
        struct rbd_obj_request *obj_request = osd_req->r_priv;
        u16 opcode;

        dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
        rbd_assert(osd_req == obj_request->osd_req);
        if (obj_request_img_data_test(obj_request)) {
                rbd_assert(obj_request->img_request);
                rbd_assert(obj_request->which != BAD_WHICH);
        } else {
                rbd_assert(obj_request->which == BAD_WHICH);
        }

        if (osd_req->r_result < 0)
                obj_request->result = osd_req->r_result;

        BUG_ON(osd_req->r_num_ops > 2);

        /*
         * We support a 64-bit length, but ultimately it has to be
         * passed to blk_end_request(), which takes an unsigned int.
         */
        obj_request->xferred = osd_req->r_reply_op_len[0];
        rbd_assert(obj_request->xferred < (u64)UINT_MAX);
        opcode = osd_req->r_ops[0].op;
        switch (opcode) {
        case CEPH_OSD_OP_READ:
                rbd_osd_read_callback(obj_request);
                break;
        case CEPH_OSD_OP_WRITE:
                rbd_osd_write_callback(obj_request);
                break;
        case CEPH_OSD_OP_STAT:
                rbd_osd_stat_callback(obj_request);
                break;
        case CEPH_OSD_OP_CALL:
        case CEPH_OSD_OP_NOTIFY_ACK:
        case CEPH_OSD_OP_WATCH:
                rbd_osd_trivial_callback(obj_request);
                break;
        default:
                rbd_warn(NULL, "%s: unsupported op %hu\n",
                        obj_request->object_name, (unsigned short) opcode);
                break;
        }

        if (obj_request_done_test(obj_request))
                rbd_obj_request_complete(obj_request);
}

static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
{
        struct rbd_img_request *img_request = obj_request->img_request;
        struct ceph_osd_request *osd_req = obj_request->osd_req;
        u64 snap_id;

        rbd_assert(osd_req != NULL);

        snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
        ceph_osdc_build_request(osd_req, obj_request->offset,
                        NULL, snap_id, NULL);
}

static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
{
        struct rbd_img_request *img_request = obj_request->img_request;
        struct ceph_osd_request *osd_req = obj_request->osd_req;
        struct ceph_snap_context *snapc;
        struct timespec mtime = CURRENT_TIME;

        rbd_assert(osd_req != NULL);

        snapc = img_request ? img_request->snapc : NULL;
        ceph_osdc_build_request(osd_req, obj_request->offset,
                        snapc, CEPH_NOSNAP, &mtime);
}
1717
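/*
 * Editorial note (a sketch, not driver code): the two format helpers
 * above differ only in what they hand to ceph_osdc_build_request().
 * A read is tagged with the snapshot id being read from; a write
 * carries the write snapshot context and a modification time:
 *
 *	read:	ceph_osdc_build_request(req, off, NULL, snap_id, NULL);
 *	write:	ceph_osdc_build_request(req, off, snapc, CEPH_NOSNAP, &mtime);
 */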
bf0d5f50
AE
1718static struct ceph_osd_request *rbd_osd_req_create(
1719 struct rbd_device *rbd_dev,
1720 bool write_request,
430c28c3 1721 struct rbd_obj_request *obj_request)
bf0d5f50 1722{
bf0d5f50
AE
1723 struct ceph_snap_context *snapc = NULL;
1724 struct ceph_osd_client *osdc;
1725 struct ceph_osd_request *osd_req;
bf0d5f50 1726
6365d33a
AE
1727 if (obj_request_img_data_test(obj_request)) {
1728 struct rbd_img_request *img_request = obj_request->img_request;
1729
0c425248
AE
1730 rbd_assert(write_request ==
1731 img_request_write_test(img_request));
1732 if (write_request)
bf0d5f50 1733 snapc = img_request->snapc;
bf0d5f50
AE
1734 }
1735
1736 /* Allocate and initialize the request, for the single op */
1737
1738 osdc = &rbd_dev->rbd_client->client->osdc;
1739 osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1740 if (!osd_req)
1741 return NULL; /* ENOMEM */
bf0d5f50 1742
430c28c3 1743 if (write_request)
bf0d5f50 1744 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
430c28c3 1745 else
bf0d5f50 1746 osd_req->r_flags = CEPH_OSD_FLAG_READ;
bf0d5f50
AE
1747
1748 osd_req->r_callback = rbd_osd_req_callback;
1749 osd_req->r_priv = obj_request;
1750
3c972c95
ID
1751 osd_req->r_base_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout);
1752 ceph_oid_set_name(&osd_req->r_base_oid, obj_request->object_name);
bf0d5f50 1753
bf0d5f50
AE
1754 return osd_req;
1755}
1756
0eefd470
AE
1757/*
1758 * Create a copyup osd request based on the information in the
 1759 * object request supplied. A copyup request has two osd ops:
 1760 * a copyup method call and a "normal" write request.
1761 */
1762static struct ceph_osd_request *
1763rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1764{
1765 struct rbd_img_request *img_request;
1766 struct ceph_snap_context *snapc;
1767 struct rbd_device *rbd_dev;
1768 struct ceph_osd_client *osdc;
1769 struct ceph_osd_request *osd_req;
1770
1771 rbd_assert(obj_request_img_data_test(obj_request));
1772 img_request = obj_request->img_request;
1773 rbd_assert(img_request);
1774 rbd_assert(img_request_write_test(img_request));
1775
1776 /* Allocate and initialize the request, for the two ops */
1777
1778 snapc = img_request->snapc;
1779 rbd_dev = img_request->rbd_dev;
1780 osdc = &rbd_dev->rbd_client->client->osdc;
1781 osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1782 if (!osd_req)
1783 return NULL; /* ENOMEM */
1784
1785 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1786 osd_req->r_callback = rbd_osd_req_callback;
1787 osd_req->r_priv = obj_request;
1788
3c972c95
ID
1789 osd_req->r_base_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout);
1790 ceph_oid_set_name(&osd_req->r_base_oid, obj_request->object_name);
0eefd470 1791
0eefd470
AE
1792 return osd_req;
1793}
1794
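/*
 * Editorial note: once rbd_img_obj_parent_read_full_callback() below
 * fills in a copyup request, its two ops look like this:
 *
 *	op 0: CEPH_OSD_OP_CALL, class "rbd", method "copyup",
 *	      request data = the object's content read from the parent
 *	op 1: CEPH_OSD_OP_WRITE, the original write's offset/length/data
 */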
1795
bf0d5f50
AE
1796static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1797{
1798 ceph_osdc_put_request(osd_req);
1799}
1800
1801/* object_name is assumed to be a non-null pointer and NUL-terminated */
1802
1803static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1804 u64 offset, u64 length,
1805 enum obj_request_type type)
1806{
1807 struct rbd_obj_request *obj_request;
1808 size_t size;
1809 char *name;
1810
1811 rbd_assert(obj_request_type_valid(type));
1812
1813 size = strlen(object_name) + 1;
f907ad55
AE
1814 name = kmalloc(size, GFP_KERNEL);
1815 if (!name)
bf0d5f50
AE
1816 return NULL;
1817
868311b1 1818 obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL);
f907ad55
AE
1819 if (!obj_request) {
1820 kfree(name);
1821 return NULL;
1822 }
1823
bf0d5f50
AE
1824 obj_request->object_name = memcpy(name, object_name, size);
1825 obj_request->offset = offset;
1826 obj_request->length = length;
926f9b3f 1827 obj_request->flags = 0;
bf0d5f50
AE
1828 obj_request->which = BAD_WHICH;
1829 obj_request->type = type;
1830 INIT_LIST_HEAD(&obj_request->links);
788e2df3 1831 init_completion(&obj_request->completion);
bf0d5f50
AE
1832 kref_init(&obj_request->kref);
1833
37206ee5
AE
1834 dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1835 offset, length, (int)type, obj_request);
1836
bf0d5f50
AE
1837 return obj_request;
1838}
1839
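/*
 * Editorial note: callers of this constructor follow a common
 * pattern, visible in the synchronous helpers later in this file:
 *
 *	obj_request = rbd_obj_request_create(name, off, len, type);
 *	obj_request->osd_req = rbd_osd_req_create(rbd_dev, ..., obj_request);
 *	<initialize op 0, attach data, format as read or write>
 *	rbd_obj_request_submit(osdc, obj_request);
 *	rbd_obj_request_wait(obj_request);	(synchronous callers only)
 *	rbd_obj_request_put(obj_request);
 *
 * The final put also frees the osd request and any owned data; see
 * rbd_obj_request_destroy() below.
 */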
1840static void rbd_obj_request_destroy(struct kref *kref)
1841{
1842 struct rbd_obj_request *obj_request;
1843
1844 obj_request = container_of(kref, struct rbd_obj_request, kref);
1845
37206ee5
AE
1846 dout("%s: obj %p\n", __func__, obj_request);
1847
bf0d5f50
AE
1848 rbd_assert(obj_request->img_request == NULL);
1849 rbd_assert(obj_request->which == BAD_WHICH);
1850
1851 if (obj_request->osd_req)
1852 rbd_osd_req_destroy(obj_request->osd_req);
1853
1854 rbd_assert(obj_request_type_valid(obj_request->type));
1855 switch (obj_request->type) {
9969ebc5
AE
1856 case OBJ_REQUEST_NODATA:
1857 break; /* Nothing to do */
bf0d5f50
AE
1858 case OBJ_REQUEST_BIO:
1859 if (obj_request->bio_list)
1860 bio_chain_put(obj_request->bio_list);
1861 break;
788e2df3
AE
1862 case OBJ_REQUEST_PAGES:
1863 if (obj_request->pages)
1864 ceph_release_page_vector(obj_request->pages,
1865 obj_request->page_count);
1866 break;
bf0d5f50
AE
1867 }
1868
f907ad55 1869 kfree(obj_request->object_name);
868311b1
AE
1870 obj_request->object_name = NULL;
1871 kmem_cache_free(rbd_obj_request_cache, obj_request);
bf0d5f50
AE
1872}
1873
fb65d228
AE
1874/* It's OK to call this for a device with no parent */
1875
1876static void rbd_spec_put(struct rbd_spec *spec);
1877static void rbd_dev_unparent(struct rbd_device *rbd_dev)
1878{
1879 rbd_dev_remove_parent(rbd_dev);
1880 rbd_spec_put(rbd_dev->parent_spec);
1881 rbd_dev->parent_spec = NULL;
1882 rbd_dev->parent_overlap = 0;
1883}
1884
a2acd00e
AE
1885/*
1886 * Parent image reference counting is used to determine when an
1887 * image's parent fields can be safely torn down--after there are no
1888 * more in-flight requests to the parent image. When the last
1889 * reference is dropped, cleaning them up is safe.
1890 */
1891static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
1892{
1893 int counter;
1894
1895 if (!rbd_dev->parent_spec)
1896 return;
1897
1898 counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
1899 if (counter > 0)
1900 return;
1901
1902 /* Last reference; clean up parent data structures */
1903
1904 if (!counter)
1905 rbd_dev_unparent(rbd_dev);
1906 else
1907 rbd_warn(rbd_dev, "parent reference underflow\n");
1908}
1909
1910/*
1911 * If an image has a non-zero parent overlap, get a reference to its
1912 * parent.
1913 *
392a9dad
AE
1914 * We must get the reference before checking for the overlap to
1915 * coordinate properly with zeroing the parent overlap in
1916 * rbd_dev_v2_parent_info() when an image gets flattened. We
1917 * drop it again if there is no overlap.
1918 *
a2acd00e
AE
1919 * Returns true if the rbd device has a parent with a non-zero
1920 * overlap and a reference for it was successfully taken, or
1921 * false otherwise.
1922 */
1923static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
1924{
1925 int counter;
1926
1927 if (!rbd_dev->parent_spec)
1928 return false;
1929
1930 counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
1931 if (counter > 0 && rbd_dev->parent_overlap)
1932 return true;
1933
1934 /* Image was flattened, but parent is not yet torn down */
1935
1936 if (counter < 0)
1937 rbd_warn(rbd_dev, "parent reference overflow\n");
1938
1939 return false;
1940}
1941
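/*
 * Editorial note: the two helpers above are used as a get/put pair
 * bracketing the life of an image request, as in
 * rbd_img_request_create() and rbd_img_request_destroy() below:
 *
 *	if (rbd_dev_parent_get(rbd_dev))
 *		img_request_layered_set(img_request);
 *	...
 *	if (img_request_layered_test(img_request)) {
 *		img_request_layered_clear(img_request);
 *		rbd_dev_parent_put(img_request->rbd_dev);
 *	}
 */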
bf0d5f50
AE
1942/*
1943 * Caller is responsible for filling in the list of object requests
1944 * that comprises the image request, and the Linux request pointer
1945 * (if there is one).
1946 */
cc344fa1
AE
1947static struct rbd_img_request *rbd_img_request_create(
1948 struct rbd_device *rbd_dev,
bf0d5f50 1949 u64 offset, u64 length,
e93f3152 1950 bool write_request)
bf0d5f50
AE
1951{
1952 struct rbd_img_request *img_request;
bf0d5f50 1953
1c2a9dfe 1954 img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC);
bf0d5f50
AE
1955 if (!img_request)
1956 return NULL;
1957
1958 if (write_request) {
1959 down_read(&rbd_dev->header_rwsem);
812164f8 1960 ceph_get_snap_context(rbd_dev->header.snapc);
bf0d5f50 1961 up_read(&rbd_dev->header_rwsem);
bf0d5f50
AE
1962 }
1963
1964 img_request->rq = NULL;
1965 img_request->rbd_dev = rbd_dev;
1966 img_request->offset = offset;
1967 img_request->length = length;
0c425248
AE
1968 img_request->flags = 0;
1969 if (write_request) {
1970 img_request_write_set(img_request);
468521c1 1971 img_request->snapc = rbd_dev->header.snapc;
0c425248 1972 } else {
bf0d5f50 1973 img_request->snap_id = rbd_dev->spec->snap_id;
0c425248 1974 }
a2acd00e 1975 if (rbd_dev_parent_get(rbd_dev))
d0b2e944 1976 img_request_layered_set(img_request);
bf0d5f50
AE
1977 spin_lock_init(&img_request->completion_lock);
1978 img_request->next_completion = 0;
1979 img_request->callback = NULL;
a5a337d4 1980 img_request->result = 0;
bf0d5f50
AE
1981 img_request->obj_request_count = 0;
1982 INIT_LIST_HEAD(&img_request->obj_requests);
1983 kref_init(&img_request->kref);
1984
37206ee5
AE
1985 dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1986 write_request ? "write" : "read", offset, length,
1987 img_request);
1988
bf0d5f50
AE
1989 return img_request;
1990}
1991
1992static void rbd_img_request_destroy(struct kref *kref)
1993{
1994 struct rbd_img_request *img_request;
1995 struct rbd_obj_request *obj_request;
1996 struct rbd_obj_request *next_obj_request;
1997
1998 img_request = container_of(kref, struct rbd_img_request, kref);
1999
37206ee5
AE
2000 dout("%s: img %p\n", __func__, img_request);
2001
bf0d5f50
AE
2002 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2003 rbd_img_obj_request_del(img_request, obj_request);
25dcf954 2004 rbd_assert(img_request->obj_request_count == 0);
bf0d5f50 2005
a2acd00e
AE
2006 if (img_request_layered_test(img_request)) {
2007 img_request_layered_clear(img_request);
2008 rbd_dev_parent_put(img_request->rbd_dev);
2009 }
2010
0c425248 2011 if (img_request_write_test(img_request))
812164f8 2012 ceph_put_snap_context(img_request->snapc);
bf0d5f50 2013
1c2a9dfe 2014 kmem_cache_free(rbd_img_request_cache, img_request);
bf0d5f50
AE
2015}
2016
e93f3152
AE
2017static struct rbd_img_request *rbd_parent_request_create(
2018 struct rbd_obj_request *obj_request,
2019 u64 img_offset, u64 length)
2020{
2021 struct rbd_img_request *parent_request;
2022 struct rbd_device *rbd_dev;
2023
2024 rbd_assert(obj_request->img_request);
2025 rbd_dev = obj_request->img_request->rbd_dev;
2026
2027 parent_request = rbd_img_request_create(rbd_dev->parent,
2028 img_offset, length, false);
2029 if (!parent_request)
2030 return NULL;
2031
2032 img_request_child_set(parent_request);
2033 rbd_obj_request_get(obj_request);
2034 parent_request->obj_request = obj_request;
2035
2036 return parent_request;
2037}
2038
2039static void rbd_parent_request_destroy(struct kref *kref)
2040{
2041 struct rbd_img_request *parent_request;
2042 struct rbd_obj_request *orig_request;
2043
2044 parent_request = container_of(kref, struct rbd_img_request, kref);
2045 orig_request = parent_request->obj_request;
2046
2047 parent_request->obj_request = NULL;
2048 rbd_obj_request_put(orig_request);
2049 img_request_child_clear(parent_request);
2050
2051 rbd_img_request_destroy(kref);
2052}
2053
1217857f
AE
2054static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
2055{
6365d33a 2056 struct rbd_img_request *img_request;
1217857f
AE
2057 unsigned int xferred;
2058 int result;
8b3e1a56 2059 bool more;
1217857f 2060
6365d33a
AE
2061 rbd_assert(obj_request_img_data_test(obj_request));
2062 img_request = obj_request->img_request;
2063
1217857f
AE
2064 rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
2065 xferred = (unsigned int)obj_request->xferred;
2066 result = obj_request->result;
2067 if (result) {
2068 struct rbd_device *rbd_dev = img_request->rbd_dev;
2069
2070 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
2071 img_request_write_test(img_request) ? "write" : "read",
2072 obj_request->length, obj_request->img_offset,
2073 obj_request->offset);
2074 rbd_warn(rbd_dev, " result %d xferred %x\n",
2075 result, xferred);
2076 if (!img_request->result)
2077 img_request->result = result;
2078 }
2079
f1a4739f
AE
2080 /* Image object requests don't own their page array */
2081
2082 if (obj_request->type == OBJ_REQUEST_PAGES) {
2083 obj_request->pages = NULL;
2084 obj_request->page_count = 0;
2085 }
2086
8b3e1a56
AE
2087 if (img_request_child_test(img_request)) {
2088 rbd_assert(img_request->obj_request != NULL);
2089 more = obj_request->which < img_request->obj_request_count - 1;
2090 } else {
2091 rbd_assert(img_request->rq != NULL);
2092 more = blk_end_request(img_request->rq, result, xferred);
2093 }
2094
2095 return more;
1217857f
AE
2096}
2097
2169238d
AE
2098static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
2099{
2100 struct rbd_img_request *img_request;
2101 u32 which = obj_request->which;
2102 bool more = true;
2103
6365d33a 2104 rbd_assert(obj_request_img_data_test(obj_request));
2169238d
AE
2105 img_request = obj_request->img_request;
2106
2107 dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
2108 rbd_assert(img_request != NULL);
2169238d
AE
2109 rbd_assert(img_request->obj_request_count > 0);
2110 rbd_assert(which != BAD_WHICH);
2111 rbd_assert(which < img_request->obj_request_count);
2169238d
AE
2112
2113 spin_lock_irq(&img_request->completion_lock);
2114 if (which != img_request->next_completion)
2115 goto out;
2116
2117 for_each_obj_request_from(img_request, obj_request) {
2169238d
AE
2118 rbd_assert(more);
2119 rbd_assert(which < img_request->obj_request_count);
2120
2121 if (!obj_request_done_test(obj_request))
2122 break;
1217857f 2123 more = rbd_img_obj_end_request(obj_request);
2169238d
AE
2124 which++;
2125 }
2126
2127 rbd_assert(more ^ (which == img_request->obj_request_count));
2128 img_request->next_completion = which;
2129out:
2130 spin_unlock_irq(&img_request->completion_lock);
2131
2132 if (!more)
2133 rbd_img_request_complete(img_request);
2134}
2135
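/*
 * Editorial worked example for the in-order completion logic above:
 * suppose an image request holds four object requests that complete
 * in the order 2, 0, 3, 1.  The callbacks for 2 and 3 find
 * which != next_completion and return early.  The callback for 0
 * ends request 0, stops at the not-yet-done request 1, and leaves
 * next_completion == 1.  The callback for 1 then ends requests 1, 2
 * and 3 in sequence, sees more become false, and completes the
 * whole image request.
 */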
f1a4739f
AE
2136/*
2137 * Split up an image request into one or more object requests, each
2138 * to a different object. The "type" parameter indicates whether
2139 * "data_desc" is the pointer to the head of a list of bio
2140 * structures, or the base of a page array. In either case this
2141 * function assumes data_desc describes memory sufficient to hold
2142 * all data described by the image request.
2143 */
2144static int rbd_img_request_fill(struct rbd_img_request *img_request,
2145 enum obj_request_type type,
2146 void *data_desc)
bf0d5f50
AE
2147{
2148 struct rbd_device *rbd_dev = img_request->rbd_dev;
2149 struct rbd_obj_request *obj_request = NULL;
2150 struct rbd_obj_request *next_obj_request;
0c425248 2151 bool write_request = img_request_write_test(img_request);
a158073c 2152 struct bio *bio_list = NULL;
f1a4739f 2153 unsigned int bio_offset = 0;
a158073c 2154 struct page **pages = NULL;
7da22d29 2155 u64 img_offset;
bf0d5f50
AE
2156 u64 resid;
2157 u16 opcode;
2158
f1a4739f
AE
2159 dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2160 (int)type, data_desc);
37206ee5 2161
430c28c3 2162 opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
7da22d29 2163 img_offset = img_request->offset;
bf0d5f50 2164 resid = img_request->length;
4dda41d3 2165 rbd_assert(resid > 0);
f1a4739f
AE
2166
2167 if (type == OBJ_REQUEST_BIO) {
2168 bio_list = data_desc;
4f024f37
KO
2169 rbd_assert(img_offset ==
2170 bio_list->bi_iter.bi_sector << SECTOR_SHIFT);
f1a4739f
AE
2171 } else {
2172 rbd_assert(type == OBJ_REQUEST_PAGES);
2173 pages = data_desc;
2174 }
2175
bf0d5f50 2176 while (resid) {
2fa12320 2177 struct ceph_osd_request *osd_req;
bf0d5f50 2178 const char *object_name;
bf0d5f50
AE
2179 u64 offset;
2180 u64 length;
2181
7da22d29 2182 object_name = rbd_segment_name(rbd_dev, img_offset);
bf0d5f50
AE
2183 if (!object_name)
2184 goto out_unwind;
7da22d29
AE
2185 offset = rbd_segment_offset(rbd_dev, img_offset);
2186 length = rbd_segment_length(rbd_dev, img_offset, resid);
bf0d5f50 2187 obj_request = rbd_obj_request_create(object_name,
f1a4739f 2188 offset, length, type);
78c2a44a
AE
2189 /* object request has its own copy of the object name */
2190 rbd_segment_name_free(object_name);
bf0d5f50
AE
2191 if (!obj_request)
2192 goto out_unwind;
03507db6
JD
2193 /*
2194 * set obj_request->img_request before creating the
2195 * osd_request so that it gets the right snapc
2196 */
2197 rbd_img_obj_request_add(img_request, obj_request);
bf0d5f50 2198
f1a4739f
AE
2199 if (type == OBJ_REQUEST_BIO) {
2200 unsigned int clone_size;
2201
2202 rbd_assert(length <= (u64)UINT_MAX);
2203 clone_size = (unsigned int)length;
2204 obj_request->bio_list =
2205 bio_chain_clone_range(&bio_list,
2206 &bio_offset,
2207 clone_size,
2208 GFP_ATOMIC);
2209 if (!obj_request->bio_list)
 2210 goto out_unwind;
2211 } else {
2212 unsigned int page_count;
2213
2214 obj_request->pages = pages;
2215 page_count = (u32)calc_pages_for(offset, length);
2216 obj_request->page_count = page_count;
2217 if ((offset + length) & ~PAGE_MASK)
2218 page_count--; /* more on last page */
2219 pages += page_count;
2220 }
bf0d5f50 2221
2fa12320
AE
2222 osd_req = rbd_osd_req_create(rbd_dev, write_request,
2223 obj_request);
2224 if (!osd_req)
bf0d5f50 2225 goto out_unwind;
2fa12320 2226 obj_request->osd_req = osd_req;
2169238d 2227 obj_request->callback = rbd_img_obj_callback;
430c28c3 2228
2fa12320
AE
2229 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
2230 0, 0);
f1a4739f
AE
2231 if (type == OBJ_REQUEST_BIO)
2232 osd_req_op_extent_osd_data_bio(osd_req, 0,
2233 obj_request->bio_list, length);
2234 else
2235 osd_req_op_extent_osd_data_pages(osd_req, 0,
2236 obj_request->pages, length,
2237 offset & ~PAGE_MASK, false, false);
9d4df01f
AE
2238
2239 if (write_request)
2240 rbd_osd_req_format_write(obj_request);
2241 else
2242 rbd_osd_req_format_read(obj_request);
430c28c3 2243
7da22d29 2244 obj_request->img_offset = img_offset;
bf0d5f50 2245
7da22d29 2246 img_offset += length;
bf0d5f50
AE
2247 resid -= length;
2248 }
2249
2250 return 0;
2251
 2252out_unwind:
 2253 /* del unwinds the add above and drops the image's reference */
 2254 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
 2255 rbd_img_obj_request_del(img_request, obj_request);
2257
2258 return -ENOMEM;
2259}
2260
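/*
 * Editorial worked example for the splitting loop above, assuming
 * the default 4 MiB objects (obj_order == 22): an image request at
 * offset 0x3ff000 with length 0x2000 straddles an object boundary,
 * so two object requests are emitted:
 *
 *	object N:   object offset 0x3ff000, length 0x1000
 *	object N+1: object offset 0x000000, length 0x1000
 *
 * rbd_segment_name() names the object covering the current image
 * offset, rbd_segment_offset() masks that offset with the object
 * size minus one, and rbd_segment_length() clamps the length at the
 * object's end, which is what forces the second iteration.
 */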
0eefd470
AE
2261static void
2262rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2263{
2264 struct rbd_img_request *img_request;
2265 struct rbd_device *rbd_dev;
ebda6408 2266 struct page **pages;
0eefd470
AE
2267 u32 page_count;
2268
2269 rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2270 rbd_assert(obj_request_img_data_test(obj_request));
2271 img_request = obj_request->img_request;
2272 rbd_assert(img_request);
2273
2274 rbd_dev = img_request->rbd_dev;
2275 rbd_assert(rbd_dev);
0eefd470 2276
ebda6408
AE
2277 pages = obj_request->copyup_pages;
2278 rbd_assert(pages != NULL);
0eefd470 2279 obj_request->copyup_pages = NULL;
ebda6408
AE
2280 page_count = obj_request->copyup_page_count;
2281 rbd_assert(page_count);
2282 obj_request->copyup_page_count = 0;
2283 ceph_release_page_vector(pages, page_count);
0eefd470
AE
2284
2285 /*
2286 * We want the transfer count to reflect the size of the
2287 * original write request. There is no such thing as a
2288 * successful short write, so if the request was successful
2289 * we can just set it to the originally-requested length.
2290 */
2291 if (!obj_request->result)
2292 obj_request->xferred = obj_request->length;
2293
2294 /* Finish up with the normal image object callback */
2295
2296 rbd_img_obj_callback(obj_request);
2297}
2298
3d7efd18
AE
2299static void
2300rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2301{
2302 struct rbd_obj_request *orig_request;
0eefd470
AE
2303 struct ceph_osd_request *osd_req;
2304 struct ceph_osd_client *osdc;
2305 struct rbd_device *rbd_dev;
3d7efd18 2306 struct page **pages;
ebda6408 2307 u32 page_count;
bbea1c1a 2308 int img_result;
ebda6408 2309 u64 parent_length;
b91f09f1
AE
2310 u64 offset;
2311 u64 length;
3d7efd18
AE
2312
2313 rbd_assert(img_request_child_test(img_request));
2314
2315 /* First get what we need from the image request */
2316
2317 pages = img_request->copyup_pages;
2318 rbd_assert(pages != NULL);
2319 img_request->copyup_pages = NULL;
ebda6408
AE
2320 page_count = img_request->copyup_page_count;
2321 rbd_assert(page_count);
2322 img_request->copyup_page_count = 0;
3d7efd18
AE
2323
2324 orig_request = img_request->obj_request;
2325 rbd_assert(orig_request != NULL);
b91f09f1 2326 rbd_assert(obj_request_type_valid(orig_request->type));
bbea1c1a 2327 img_result = img_request->result;
ebda6408
AE
2328 parent_length = img_request->length;
2329 rbd_assert(parent_length == img_request->xferred);
91c6febb 2330 rbd_img_request_put(img_request);
3d7efd18 2331
91c6febb
AE
2332 rbd_assert(orig_request->img_request);
2333 rbd_dev = orig_request->img_request->rbd_dev;
0eefd470 2334 rbd_assert(rbd_dev);
0eefd470 2335
bbea1c1a
AE
2336 /*
2337 * If the overlap has become 0 (most likely because the
2338 * image has been flattened) we need to free the pages
2339 * and re-submit the original write request.
2340 */
2341 if (!rbd_dev->parent_overlap) {
2342 struct ceph_osd_client *osdc;
3d7efd18 2343
bbea1c1a
AE
2344 ceph_release_page_vector(pages, page_count);
2345 osdc = &rbd_dev->rbd_client->client->osdc;
2346 img_result = rbd_obj_request_submit(osdc, orig_request);
2347 if (!img_result)
2348 return;
2349 }
0eefd470 2350
bbea1c1a 2351 if (img_result)
0eefd470 2352 goto out_err;
0eefd470 2353
8785b1d4
AE
2354 /*
 2355 * The original osd request is of no use to us any more.
2356 * We need a new one that can hold the two ops in a copyup
2357 * request. Allocate the new copyup osd request for the
2358 * original request, and release the old one.
2359 */
bbea1c1a 2360 img_result = -ENOMEM;
0eefd470
AE
2361 osd_req = rbd_osd_req_create_copyup(orig_request);
2362 if (!osd_req)
2363 goto out_err;
8785b1d4 2364 rbd_osd_req_destroy(orig_request->osd_req);
0eefd470
AE
2365 orig_request->osd_req = osd_req;
2366 orig_request->copyup_pages = pages;
ebda6408 2367 orig_request->copyup_page_count = page_count;
3d7efd18 2368
0eefd470 2369 /* Initialize the copyup op */
3d7efd18 2370
0eefd470 2371 osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
ebda6408 2372 osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
0eefd470 2373 false, false);
3d7efd18 2374
0eefd470
AE
2375 /* Then the original write request op */
2376
b91f09f1
AE
2377 offset = orig_request->offset;
2378 length = orig_request->length;
0eefd470 2379 osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
b91f09f1
AE
2380 offset, length, 0, 0);
2381 if (orig_request->type == OBJ_REQUEST_BIO)
2382 osd_req_op_extent_osd_data_bio(osd_req, 1,
2383 orig_request->bio_list, length);
2384 else
2385 osd_req_op_extent_osd_data_pages(osd_req, 1,
2386 orig_request->pages, length,
2387 offset & ~PAGE_MASK, false, false);
0eefd470
AE
2388
2389 rbd_osd_req_format_write(orig_request);
2390
2391 /* All set, send it off. */
2392
2393 orig_request->callback = rbd_img_obj_copyup_callback;
2394 osdc = &rbd_dev->rbd_client->client->osdc;
bbea1c1a
AE
2395 img_result = rbd_obj_request_submit(osdc, orig_request);
2396 if (!img_result)
0eefd470
AE
2397 return;
2398out_err:
2399 /* Record the error code and complete the request */
2400
bbea1c1a 2401 orig_request->result = img_result;
0eefd470
AE
2402 orig_request->xferred = 0;
2403 obj_request_done_set(orig_request);
2404 rbd_obj_request_complete(orig_request);
3d7efd18
AE
2405}
2406
2407/*
2408 * Read from the parent image the range of data that covers the
2409 * entire target of the given object request. This is used for
2410 * satisfying a layered image write request when the target of an
2411 * object request from the image request does not exist.
2412 *
2413 * A page array big enough to hold the returned data is allocated
2414 * and supplied to rbd_img_request_fill() as the "data descriptor."
2415 * When the read completes, this page array will be transferred to
2416 * the original object request for the copyup operation.
2417 *
2418 * If an error occurs, record it as the result of the original
2419 * object request and mark it done so it gets completed.
2420 */
2421static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2422{
2423 struct rbd_img_request *img_request = NULL;
2424 struct rbd_img_request *parent_request = NULL;
2425 struct rbd_device *rbd_dev;
2426 u64 img_offset;
2427 u64 length;
2428 struct page **pages = NULL;
2429 u32 page_count;
2430 int result;
2431
2432 rbd_assert(obj_request_img_data_test(obj_request));
b91f09f1 2433 rbd_assert(obj_request_type_valid(obj_request->type));
3d7efd18
AE
2434
2435 img_request = obj_request->img_request;
2436 rbd_assert(img_request != NULL);
2437 rbd_dev = img_request->rbd_dev;
2438 rbd_assert(rbd_dev->parent != NULL);
2439
2440 /*
2441 * Determine the byte range covered by the object in the
2442 * child image to which the original request was to be sent.
2443 */
2444 img_offset = obj_request->img_offset - obj_request->offset;
2445 length = (u64)1 << rbd_dev->header.obj_order;
2446
a9e8ba2c
AE
2447 /*
2448 * There is no defined parent data beyond the parent
2449 * overlap, so limit what we read at that boundary if
2450 * necessary.
2451 */
2452 if (img_offset + length > rbd_dev->parent_overlap) {
2453 rbd_assert(img_offset < rbd_dev->parent_overlap);
2454 length = rbd_dev->parent_overlap - img_offset;
2455 }
2456
3d7efd18
AE
2457 /*
2458 * Allocate a page array big enough to receive the data read
2459 * from the parent.
2460 */
2461 page_count = (u32)calc_pages_for(0, length);
2462 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2463 if (IS_ERR(pages)) {
2464 result = PTR_ERR(pages);
2465 pages = NULL;
2466 goto out_err;
2467 }
2468
2469 result = -ENOMEM;
e93f3152
AE
2470 parent_request = rbd_parent_request_create(obj_request,
2471 img_offset, length);
3d7efd18
AE
2472 if (!parent_request)
2473 goto out_err;
3d7efd18
AE
2474
2475 result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2476 if (result)
2477 goto out_err;
2478 parent_request->copyup_pages = pages;
ebda6408 2479 parent_request->copyup_page_count = page_count;
3d7efd18
AE
2480
2481 parent_request->callback = rbd_img_obj_parent_read_full_callback;
2482 result = rbd_img_request_submit(parent_request);
2483 if (!result)
2484 return 0;
2485
2486 parent_request->copyup_pages = NULL;
ebda6408 2487 parent_request->copyup_page_count = 0;
3d7efd18
AE
2488 parent_request->obj_request = NULL;
2489 rbd_obj_request_put(obj_request);
2490out_err:
2491 if (pages)
2492 ceph_release_page_vector(pages, page_count);
2493 if (parent_request)
2494 rbd_img_request_put(parent_request);
2495 obj_request->result = result;
2496 obj_request->xferred = 0;
2497 obj_request_done_set(obj_request);
2498
2499 return result;
2500}
2501
c5b5ef6c
AE
2502static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2503{
c5b5ef6c 2504 struct rbd_obj_request *orig_request;
638f5abe 2505 struct rbd_device *rbd_dev;
c5b5ef6c
AE
2506 int result;
2507
2508 rbd_assert(!obj_request_img_data_test(obj_request));
2509
2510 /*
2511 * All we need from the object request is the original
2512 * request and the result of the STAT op. Grab those, then
2513 * we're done with the request.
2514 */
2515 orig_request = obj_request->obj_request;
2516 obj_request->obj_request = NULL;
912c317d 2517 rbd_obj_request_put(orig_request);
c5b5ef6c
AE
2518 rbd_assert(orig_request);
2519 rbd_assert(orig_request->img_request);
2520
2521 result = obj_request->result;
2522 obj_request->result = 0;
2523
2524 dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2525 obj_request, orig_request, result,
2526 obj_request->xferred, obj_request->length);
2527 rbd_obj_request_put(obj_request);
2528
638f5abe
AE
2529 /*
2530 * If the overlap has become 0 (most likely because the
2531 * image has been flattened) we need to free the pages
2532 * and re-submit the original write request.
2533 */
2534 rbd_dev = orig_request->img_request->rbd_dev;
2535 if (!rbd_dev->parent_overlap) {
2536 struct ceph_osd_client *osdc;
2537
638f5abe
AE
2538 osdc = &rbd_dev->rbd_client->client->osdc;
2539 result = rbd_obj_request_submit(osdc, orig_request);
2540 if (!result)
2541 return;
2542 }
c5b5ef6c
AE
2543
2544 /*
2545 * Our only purpose here is to determine whether the object
2546 * exists, and we don't want to treat the non-existence as
2547 * an error. If something else comes back, transfer the
2548 * error to the original request and complete it now.
2549 */
2550 if (!result) {
2551 obj_request_existence_set(orig_request, true);
2552 } else if (result == -ENOENT) {
2553 obj_request_existence_set(orig_request, false);
2554 } else if (result) {
2555 orig_request->result = result;
3d7efd18 2556 goto out;
c5b5ef6c
AE
2557 }
2558
2559 /*
2560 * Resubmit the original request now that we have recorded
2561 * whether the target object exists.
2562 */
b454e36d 2563 orig_request->result = rbd_img_obj_request_submit(orig_request);
3d7efd18 2564out:
c5b5ef6c
AE
2565 if (orig_request->result)
2566 rbd_obj_request_complete(orig_request);
c5b5ef6c
AE
2567}
2568
2569static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2570{
2571 struct rbd_obj_request *stat_request;
2572 struct rbd_device *rbd_dev;
2573 struct ceph_osd_client *osdc;
2574 struct page **pages = NULL;
2575 u32 page_count;
2576 size_t size;
2577 int ret;
2578
2579 /*
2580 * The response data for a STAT call consists of:
2581 * le64 length;
2582 * struct {
2583 * le32 tv_sec;
2584 * le32 tv_nsec;
2585 * } mtime;
2586 */
2587 size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2588 page_count = (u32)calc_pages_for(0, size);
2589 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2590 if (IS_ERR(pages))
2591 return PTR_ERR(pages);
2592
2593 ret = -ENOMEM;
2594 stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2595 OBJ_REQUEST_PAGES);
2596 if (!stat_request)
2597 goto out;
2598
2599 rbd_obj_request_get(obj_request);
2600 stat_request->obj_request = obj_request;
2601 stat_request->pages = pages;
2602 stat_request->page_count = page_count;
2603
2604 rbd_assert(obj_request->img_request);
2605 rbd_dev = obj_request->img_request->rbd_dev;
2606 stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2607 stat_request);
2608 if (!stat_request->osd_req)
2609 goto out;
2610 stat_request->callback = rbd_img_obj_exists_callback;
2611
2612 osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2613 osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2614 false, false);
9d4df01f 2615 rbd_osd_req_format_read(stat_request);
c5b5ef6c
AE
2616
2617 osdc = &rbd_dev->rbd_client->client->osdc;
2618 ret = rbd_obj_request_submit(osdc, stat_request);
2619out:
2620 if (ret)
2621 rbd_obj_request_put(obj_request);
2622
2623 return ret;
2624}
2625
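/*
 * Editorial sketch (hypothetical helper, not part of the driver): a
 * synchronous variant of the STAT submission above, showing how the
 * response layout documented in rbd_img_obj_exists_submit() could be
 * decoded.  Modeled on that function plus the submit/wait pattern of
 * rbd_obj_method_sync() below.
 */
static int example_stat_object_sync(struct rbd_device *rbd_dev,
				const char *object_name, u64 *psize)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_obj_request *obj_request;
	size_t size = sizeof (__le64) + 2 * sizeof (__le32);
	u32 page_count = (u32)calc_pages_for(0, size);
	struct page **pages;
	int ret;

	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	obj_request = rbd_obj_request_create(object_name, 0, 0,
						OBJ_REQUEST_PAGES);
	if (!obj_request) {
		ceph_release_page_vector(pages, page_count);
		return -ENOMEM;
	}
	obj_request->pages = pages;
	obj_request->page_count = page_count;

	ret = -ENOMEM;
	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
	if (!obj_request->osd_req)
		goto out;

	osd_req_op_init(obj_request->osd_req, 0, CEPH_OSD_OP_STAT);
	osd_req_op_raw_data_in_pages(obj_request->osd_req, 0, pages, size, 0,
					false, false);
	rbd_osd_req_format_read(obj_request);

	ret = rbd_obj_request_submit(osdc, obj_request);
	if (!ret)
		ret = rbd_obj_request_wait(obj_request);
	if (!ret)
		ret = obj_request->result;
	if (!ret) {
		__le64 size_le;

		/* First 8 bytes of the reply are the object size */
		ceph_copy_from_page_vector(pages, &size_le, 0,
						sizeof (size_le));
		*psize = le64_to_cpu(size_le);
	}
out:
	rbd_obj_request_put(obj_request);	/* also releases the pages */
	return ret;
}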
b454e36d
AE
2626static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2627{
2628 struct rbd_img_request *img_request;
a9e8ba2c 2629 struct rbd_device *rbd_dev;
3d7efd18 2630 bool known;
b454e36d
AE
2631
2632 rbd_assert(obj_request_img_data_test(obj_request));
2633
2634 img_request = obj_request->img_request;
2635 rbd_assert(img_request);
a9e8ba2c 2636 rbd_dev = img_request->rbd_dev;
b454e36d 2637
b454e36d 2638 /*
a9e8ba2c
AE
2639 * Only writes to layered images need special handling.
2640 * Reads and non-layered writes are simple object requests.
2641 * Layered writes that start beyond the end of the overlap
2642 * with the parent have no parent data, so they too are
2643 * simple object requests. Finally, if the target object is
2644 * known to already exist, its parent data has already been
2645 * copied, so a write to the object can also be handled as a
2646 * simple object request.
b454e36d
AE
2647 */
2648 if (!img_request_write_test(img_request) ||
2649 !img_request_layered_test(img_request) ||
a9e8ba2c 2650 rbd_dev->parent_overlap <= obj_request->img_offset ||
3d7efd18
AE
2651 ((known = obj_request_known_test(obj_request)) &&
2652 obj_request_exists_test(obj_request))) {
b454e36d
AE
2653
2654 struct rbd_device *rbd_dev;
2655 struct ceph_osd_client *osdc;
2656
2657 rbd_dev = obj_request->img_request->rbd_dev;
2658 osdc = &rbd_dev->rbd_client->client->osdc;
2659
2660 return rbd_obj_request_submit(osdc, obj_request);
2661 }
2662
2663 /*
3d7efd18
AE
2664 * It's a layered write. The target object might exist but
2665 * we may not know that yet. If we know it doesn't exist,
2666 * start by reading the data for the full target object from
2667 * the parent so we can use it for a copyup to the target.
b454e36d 2668 */
3d7efd18
AE
2669 if (known)
2670 return rbd_img_obj_parent_read_full(obj_request);
2671
2672 /* We don't know whether the target exists. Go find out. */
b454e36d
AE
2673
2674 return rbd_img_obj_exists_submit(obj_request);
2675}
2676
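/*
 * Editorial summary of the layered-write routing above:
 *
 *	1. If the target object's existence is unknown, issue a STAT
 *	   first (rbd_img_obj_exists_submit() above) and resubmit from
 *	   its callback once the answer is recorded.
 *	2. If the object is known not to exist, read its full extent
 *	   from the parent, clipped to the parent overlap
 *	   (rbd_img_obj_parent_read_full()).
 *	3. The parent data then goes out in a single two-op OSD
 *	   request: CALL rbd.copyup followed by the original WRITE.
 */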
bf0d5f50
AE
2677static int rbd_img_request_submit(struct rbd_img_request *img_request)
2678{
bf0d5f50 2679 struct rbd_obj_request *obj_request;
46faeed4 2680 struct rbd_obj_request *next_obj_request;
bf0d5f50 2681
37206ee5 2682 dout("%s: img %p\n", __func__, img_request);
46faeed4 2683 for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
bf0d5f50
AE
2684 int ret;
2685
b454e36d 2686 ret = rbd_img_obj_request_submit(obj_request);
bf0d5f50
AE
2687 if (ret)
2688 return ret;
bf0d5f50
AE
2689 }
2690
2691 return 0;
2692}
8b3e1a56
AE
2693
2694static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2695{
2696 struct rbd_obj_request *obj_request;
a9e8ba2c
AE
2697 struct rbd_device *rbd_dev;
2698 u64 obj_end;
02c74fba
AE
2699 u64 img_xferred;
2700 int img_result;
8b3e1a56
AE
2701
2702 rbd_assert(img_request_child_test(img_request));
2703
02c74fba
AE
2704 /* First get what we need from the image request and release it */
2705
8b3e1a56 2706 obj_request = img_request->obj_request;
02c74fba
AE
2707 img_xferred = img_request->xferred;
2708 img_result = img_request->result;
2709 rbd_img_request_put(img_request);
2710
2711 /*
2712 * If the overlap has become 0 (most likely because the
2713 * image has been flattened) we need to re-submit the
2714 * original request.
2715 */
a9e8ba2c
AE
2716 rbd_assert(obj_request);
2717 rbd_assert(obj_request->img_request);
02c74fba
AE
2718 rbd_dev = obj_request->img_request->rbd_dev;
2719 if (!rbd_dev->parent_overlap) {
2720 struct ceph_osd_client *osdc;
2721
2722 osdc = &rbd_dev->rbd_client->client->osdc;
2723 img_result = rbd_obj_request_submit(osdc, obj_request);
2724 if (!img_result)
2725 return;
2726 }
a9e8ba2c 2727
02c74fba 2728 obj_request->result = img_result;
a9e8ba2c
AE
2729 if (obj_request->result)
2730 goto out;
2731
2732 /*
2733 * We need to zero anything beyond the parent overlap
2734 * boundary. Since rbd_img_obj_request_read_callback()
2735 * will zero anything beyond the end of a short read, an
2736 * easy way to do this is to pretend the data from the
2737 * parent came up short--ending at the overlap boundary.
2738 */
2739 rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2740 obj_end = obj_request->img_offset + obj_request->length;
a9e8ba2c
AE
2741 if (obj_end > rbd_dev->parent_overlap) {
2742 u64 xferred = 0;
2743
2744 if (obj_request->img_offset < rbd_dev->parent_overlap)
2745 xferred = rbd_dev->parent_overlap -
2746 obj_request->img_offset;
8b3e1a56 2747
02c74fba 2748 obj_request->xferred = min(img_xferred, xferred);
a9e8ba2c 2749 } else {
02c74fba 2750 obj_request->xferred = img_xferred;
a9e8ba2c
AE
2751 }
2752out:
8b3e1a56
AE
2753 rbd_img_obj_request_read_callback(obj_request);
2754 rbd_obj_request_complete(obj_request);
2755}
2756
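/*
 * Editorial worked example for the clipping above: with a 1 MiB
 * parent overlap (0x100000), a child read at img_offset 0xe0000 for
 * length 0x40000 has obj_end == 0x120000, beyond the overlap.  Only
 * 0x100000 - 0xe0000 = 0x20000 bytes of parent data are defined, so
 * xferred is clamped to at most 0x20000 and
 * rbd_img_obj_request_read_callback() zeroes the remainder of the
 * buffer.
 */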
2757static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2758{
8b3e1a56
AE
2759 struct rbd_img_request *img_request;
2760 int result;
2761
2762 rbd_assert(obj_request_img_data_test(obj_request));
2763 rbd_assert(obj_request->img_request != NULL);
2764 rbd_assert(obj_request->result == (s32) -ENOENT);
5b2ab72d 2765 rbd_assert(obj_request_type_valid(obj_request->type));
8b3e1a56 2766
8b3e1a56 2767 /* rbd_read_finish(obj_request, obj_request->length); */
e93f3152 2768 img_request = rbd_parent_request_create(obj_request,
8b3e1a56 2769 obj_request->img_offset,
e93f3152 2770 obj_request->length);
8b3e1a56
AE
2771 result = -ENOMEM;
2772 if (!img_request)
2773 goto out_err;
2774
5b2ab72d
AE
2775 if (obj_request->type == OBJ_REQUEST_BIO)
2776 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2777 obj_request->bio_list);
2778 else
2779 result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
2780 obj_request->pages);
8b3e1a56
AE
2781 if (result)
2782 goto out_err;
2783
2784 img_request->callback = rbd_img_parent_read_callback;
2785 result = rbd_img_request_submit(img_request);
2786 if (result)
2787 goto out_err;
2788
2789 return;
2790out_err:
2791 if (img_request)
2792 rbd_img_request_put(img_request);
2793 obj_request->result = result;
2794 obj_request->xferred = 0;
2795 obj_request_done_set(obj_request);
2796}
bf0d5f50 2797
20e0af67 2798static int rbd_obj_notify_ack_sync(struct rbd_device *rbd_dev, u64 notify_id)
b8d70035
AE
2799{
2800 struct rbd_obj_request *obj_request;
2169238d 2801 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
b8d70035
AE
2802 int ret;
2803
2804 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2805 OBJ_REQUEST_NODATA);
2806 if (!obj_request)
2807 return -ENOMEM;
2808
2809 ret = -ENOMEM;
430c28c3 2810 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
b8d70035
AE
2811 if (!obj_request->osd_req)
2812 goto out;
2813
c99d2d4a 2814 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
cc4a38bd 2815 notify_id, 0, 0);
9d4df01f 2816 rbd_osd_req_format_read(obj_request);
430c28c3 2817
b8d70035 2818 ret = rbd_obj_request_submit(osdc, obj_request);
cf81b60e 2819 if (ret)
20e0af67
JD
2820 goto out;
2821 ret = rbd_obj_request_wait(obj_request);
2822out:
2823 rbd_obj_request_put(obj_request);
b8d70035
AE
2824
2825 return ret;
2826}
2827
2828static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2829{
2830 struct rbd_device *rbd_dev = (struct rbd_device *)data;
e627db08 2831 int ret;
b8d70035
AE
2832
2833 if (!rbd_dev)
2834 return;
2835
37206ee5 2836 dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
cc4a38bd
AE
2837 rbd_dev->header_name, (unsigned long long)notify_id,
2838 (unsigned int)opcode);
e627db08
AE
2839 ret = rbd_dev_refresh(rbd_dev);
2840 if (ret)
3b5cf2a2 2841 rbd_warn(rbd_dev, "header refresh error (%d)\n", ret);
b8d70035 2842
20e0af67 2843 rbd_obj_notify_ack_sync(rbd_dev, notify_id);
b8d70035
AE
2844}
2845
9969ebc5
AE
2846/*
2847 * Request sync osd watch/unwatch. The value of "start" determines
2848 * whether a watch request is being initiated or torn down.
2849 */
fca27065 2850static int __rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start)
9969ebc5
AE
2851{
2852 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2853 struct rbd_obj_request *obj_request;
9969ebc5
AE
2854 int ret;
2855
2856 rbd_assert(start ^ !!rbd_dev->watch_event);
2857 rbd_assert(start ^ !!rbd_dev->watch_request);
2858
2859 if (start) {
3c663bbd 2860 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
9969ebc5
AE
2861 &rbd_dev->watch_event);
2862 if (ret < 0)
2863 return ret;
8eb87565 2864 rbd_assert(rbd_dev->watch_event != NULL);
9969ebc5
AE
2865 }
2866
2867 ret = -ENOMEM;
2868 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2869 OBJ_REQUEST_NODATA);
2870 if (!obj_request)
2871 goto out_cancel;
2872
430c28c3
AE
2873 obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2874 if (!obj_request->osd_req)
2875 goto out_cancel;
2876
8eb87565 2877 if (start)
975241af 2878 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
8eb87565 2879 else
6977c3f9 2880 ceph_osdc_unregister_linger_request(osdc,
975241af 2881 rbd_dev->watch_request->osd_req);
2169238d
AE
2882
2883 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
1f3ef788 2884 rbd_dev->watch_event->cookie, 0, start ? 1 : 0);
9d4df01f 2885 rbd_osd_req_format_write(obj_request);
2169238d 2886
9969ebc5
AE
2887 ret = rbd_obj_request_submit(osdc, obj_request);
2888 if (ret)
2889 goto out_cancel;
2890 ret = rbd_obj_request_wait(obj_request);
2891 if (ret)
2892 goto out_cancel;
9969ebc5
AE
2893 ret = obj_request->result;
2894 if (ret)
2895 goto out_cancel;
2896
8eb87565
AE
2897 /*
2898 * A watch request is set to linger, so the underlying osd
2899 * request won't go away until we unregister it. We retain
2900 * a pointer to the object request during that time (in
2901 * rbd_dev->watch_request), so we'll keep a reference to
2902 * it. We'll drop that reference (below) after we've
2903 * unregistered it.
2904 */
2905 if (start) {
2906 rbd_dev->watch_request = obj_request;
2907
2908 return 0;
2909 }
2910
2911 /* We have successfully torn down the watch request */
2912
2913 rbd_obj_request_put(rbd_dev->watch_request);
2914 rbd_dev->watch_request = NULL;
9969ebc5
AE
2915out_cancel:
2916 /* Cancel the event if we're tearing down, or on error */
2917 ceph_osdc_cancel_event(rbd_dev->watch_event);
2918 rbd_dev->watch_event = NULL;
9969ebc5
AE
2919 if (obj_request)
2920 rbd_obj_request_put(obj_request);
2921
2922 return ret;
2923}
2924
fca27065
ID
2925static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev)
2926{
2927 return __rbd_dev_header_watch_sync(rbd_dev, true);
2928}
2929
2930static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
2931{
2932 int ret;
2933
2934 ret = __rbd_dev_header_watch_sync(rbd_dev, false);
2935 if (ret) {
2936 rbd_warn(rbd_dev, "unable to tear down watch request: %d\n",
2937 ret);
2938 }
2939}
2940
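/*
 * Editorial note: the pair above brackets the life of a mapped
 * device -- the watch is established when the image is mapped and
 * torn down on unmap (or on a failed probe) -- so that the OSD can
 * invoke rbd_watch_cb() whenever the header object changes, e.g. on
 * resize or snapshot create/delete.
 */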
36be9a76 2941/*
f40eb349
AE
2942 * Synchronous osd object method call. Returns the number of bytes
 2943 * returned in the inbound buffer, or a negative error code.
36be9a76
AE
2944 */
2945static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2946 const char *object_name,
2947 const char *class_name,
2948 const char *method_name,
4157976b 2949 const void *outbound,
36be9a76 2950 size_t outbound_size,
4157976b 2951 void *inbound,
e2a58ee5 2952 size_t inbound_size)
36be9a76 2953{
2169238d 2954 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
36be9a76 2955 struct rbd_obj_request *obj_request;
36be9a76
AE
2956 struct page **pages;
2957 u32 page_count;
2958 int ret;
2959
2960 /*
6010a451
AE
2961 * Method calls are ultimately read operations. The result
 2962 * should be placed into the inbound buffer provided. They
 2963 * may also supply outbound data--parameters for the object
 2964 * method. Currently, if outbound data is present, it will
 2965 * be a snapshot id.
36be9a76 2966 */
57385b51 2967 page_count = (u32)calc_pages_for(0, inbound_size);
36be9a76
AE
2968 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2969 if (IS_ERR(pages))
2970 return PTR_ERR(pages);
2971
2972 ret = -ENOMEM;
6010a451 2973 obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
36be9a76
AE
2974 OBJ_REQUEST_PAGES);
2975 if (!obj_request)
2976 goto out;
2977
2978 obj_request->pages = pages;
2979 obj_request->page_count = page_count;
2980
430c28c3 2981 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
36be9a76
AE
2982 if (!obj_request->osd_req)
2983 goto out;
2984
c99d2d4a 2985 osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
04017e29
AE
2986 class_name, method_name);
2987 if (outbound_size) {
2988 struct ceph_pagelist *pagelist;
2989
2990 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2991 if (!pagelist)
2992 goto out;
2993
2994 ceph_pagelist_init(pagelist);
2995 ceph_pagelist_append(pagelist, outbound, outbound_size);
2996 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2997 pagelist);
2998 }
a4ce40a9
AE
2999 osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
3000 obj_request->pages, inbound_size,
44cd188d 3001 0, false, false);
9d4df01f 3002 rbd_osd_req_format_read(obj_request);
430c28c3 3003
36be9a76
AE
3004 ret = rbd_obj_request_submit(osdc, obj_request);
3005 if (ret)
3006 goto out;
3007 ret = rbd_obj_request_wait(obj_request);
3008 if (ret)
3009 goto out;
3010
3011 ret = obj_request->result;
3012 if (ret < 0)
3013 goto out;
57385b51
AE
3014
3015 rbd_assert(obj_request->xferred < (u64)INT_MAX);
3016 ret = (int)obj_request->xferred;
903bb32e 3017 ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
36be9a76
AE
3018out:
3019 if (obj_request)
3020 rbd_obj_request_put(obj_request);
3021 else
3022 ceph_release_page_vector(pages, page_count);
3023
3024 return ret;
3025}
3026
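/*
 * Editorial usage sketch for the helper above, modeled on the
 * format 2 metadata queries later in this file: ask the "rbd"
 * object class for the image size at a given snapshot.  The
 * function name is hypothetical; the wire format (le64 snapshot id
 * out; u8 order plus le64 size back) follows the v2 image format.
 */
static int example_get_size(struct rbd_device *rbd_dev, u64 snap_id,
				u64 *psize)
{
	__le64 snapid = cpu_to_le64(snap_id);
	struct {
		u8 order;
		__le64 size;
	} __attribute__ ((packed)) size_buf = { 0 };
	int ret;

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_size",
				&snapid, sizeof (snapid),
				&size_buf, sizeof (size_buf));
	if (ret < 0)
		return ret;
	if (ret < (int)sizeof (size_buf))
		return -ERANGE;

	*psize = le64_to_cpu(size_buf.size);
	return 0;
}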
bf0d5f50 3027static void rbd_request_fn(struct request_queue *q)
cc344fa1 3028 __releases(q->queue_lock) __acquires(q->queue_lock)
bf0d5f50
AE
3029{
3030 struct rbd_device *rbd_dev = q->queuedata;
3031 bool read_only = rbd_dev->mapping.read_only;
3032 struct request *rq;
3033 int result;
3034
3035 while ((rq = blk_fetch_request(q))) {
3036 bool write_request = rq_data_dir(rq) == WRITE;
3037 struct rbd_img_request *img_request;
3038 u64 offset;
3039 u64 length;
3040
3041 /* Ignore any non-FS requests that filter through. */
3042
3043 if (rq->cmd_type != REQ_TYPE_FS) {
4dda41d3
AE
3044 dout("%s: non-fs request type %d\n", __func__,
3045 (int) rq->cmd_type);
3046 __blk_end_request_all(rq, 0);
3047 continue;
3048 }
3049
3050 /* Ignore/skip any zero-length requests */
3051
3052 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
3053 length = (u64) blk_rq_bytes(rq);
3054
3055 if (!length) {
3056 dout("%s: zero-length request\n", __func__);
bf0d5f50
AE
3057 __blk_end_request_all(rq, 0);
3058 continue;
3059 }
3060
3061 spin_unlock_irq(q->queue_lock);
3062
3063 /* Disallow writes to a read-only device */
3064
3065 if (write_request) {
3066 result = -EROFS;
3067 if (read_only)
3068 goto end_request;
3069 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
3070 }
3071
6d292906
AE
3072 /*
3073 * Quit early if the mapped snapshot no longer
3074 * exists. It's still possible the snapshot will
3075 * have disappeared by the time our request arrives
3076 * at the osd, but there's no sense in sending it if
3077 * we already know.
3078 */
3079 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
bf0d5f50
AE
3080 dout("request for non-existent snapshot");
3081 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
3082 result = -ENXIO;
3083 goto end_request;
3084 }
3085
bf0d5f50 3086 result = -EINVAL;
c0cd10db
AE
3087 if (offset && length > U64_MAX - offset + 1) {
3088 rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
3089 offset, length);
bf0d5f50 3090 goto end_request; /* Shouldn't happen */
c0cd10db 3091 }
bf0d5f50 3092
00a653e2
AE
3093 result = -EIO;
3094 if (offset + length > rbd_dev->mapping.size) {
3095 rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n",
3096 offset, length, rbd_dev->mapping.size);
3097 goto end_request;
3098 }
3099
bf0d5f50
AE
3100 result = -ENOMEM;
3101 img_request = rbd_img_request_create(rbd_dev, offset, length,
e93f3152 3102 write_request);
bf0d5f50
AE
3103 if (!img_request)
3104 goto end_request;
3105
3106 img_request->rq = rq;
3107
f1a4739f
AE
3108 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
3109 rq->bio);
bf0d5f50
AE
3110 if (!result)
3111 result = rbd_img_request_submit(img_request);
3112 if (result)
3113 rbd_img_request_put(img_request);
3114end_request:
3115 spin_lock_irq(q->queue_lock);
3116 if (result < 0) {
7da22d29
AE
3117 rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
3118 write_request ? "write" : "read",
3119 length, offset, result);
3120
bf0d5f50
AE
3121 __blk_end_request_all(rq, result);
3122 }
3123 }
3124}
3125
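/*
 * Editorial summary of the data path rooted above: rbd_request_fn()
 * builds one rbd_img_request per block request,
 * rbd_img_request_fill() splits it into per-object requests,
 * rbd_img_obj_request_submit() routes each one (plain request,
 * existence check, or copyup), and completion bubbles back up
 * through rbd_img_obj_callback() to blk_end_request().
 */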
602adf40
YS
3126/*
 3127 * A queue callback. Makes sure that we don't create a bio that spans
 3128 * multiple osd objects. One exception would be single-page bios,
f7760dad 3129 * which we handle later in bio_chain_clone_range()
602adf40
YS
3130 */
3131static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
3132 struct bio_vec *bvec)
3133{
3134 struct rbd_device *rbd_dev = q->queuedata;
e5cfeed2
AE
3135 sector_t sector_offset;
3136 sector_t sectors_per_obj;
3137 sector_t obj_sector_offset;
3138 int ret;
3139
3140 /*
3141 * Find how far into its rbd object the partition-relative
3142 * bio start sector is to offset relative to the enclosing
3143 * device.
3144 */
3145 sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
3146 sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
3147 obj_sector_offset = sector_offset & (sectors_per_obj - 1);
3148
3149 /*
3150 * Compute the number of bytes from that offset to the end
3151 * of the object. Account for what's already used by the bio.
3152 */
3153 ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
3154 if (ret > bmd->bi_size)
3155 ret -= bmd->bi_size;
3156 else
3157 ret = 0;
3158
3159 /*
3160 * Don't send back more than was asked for. And if the bio
3161 * was empty, let the whole thing through because: "Note
3162 * that a block device *must* allow a single page to be
3163 * added to an empty bio."
3164 */
3165 rbd_assert(bvec->bv_len <= PAGE_SIZE);
3166 if (ret > (int) bvec->bv_len || !bmd->bi_size)
3167 ret = (int) bvec->bv_len;
3168
3169 return ret;
602adf40
YS
3170}
3171
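/*
 * Editorial worked example for the helper above, assuming 4 MiB
 * objects (obj_order == 22): sectors_per_obj = 1 << (22 - 9) = 8192.
 * For a bio starting at absolute sector 8000 with 98304 bytes
 * already queued, obj_sector_offset = 8000 & 8191 = 8000, leaving
 * (8192 - 8000) << 9 = 98304 bytes in the object; subtracting
 * bi_size leaves 0, so the bvec is refused and the caller must
 * start a new bio at the object boundary.
 */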
3172static void rbd_free_disk(struct rbd_device *rbd_dev)
3173{
3174 struct gendisk *disk = rbd_dev->disk;
3175
3176 if (!disk)
3177 return;
3178
a0cab924
AE
3179 rbd_dev->disk = NULL;
3180 if (disk->flags & GENHD_FL_UP) {
602adf40 3181 del_gendisk(disk);
a0cab924
AE
3182 if (disk->queue)
3183 blk_cleanup_queue(disk->queue);
3184 }
602adf40
YS
3185 put_disk(disk);
3186}
3187
788e2df3
AE
3188static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
3189 const char *object_name,
7097f8df 3190 u64 offset, u64 length, void *buf)
788e2df3
AE
3191
3192{
2169238d 3193 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
788e2df3 3194 struct rbd_obj_request *obj_request;
788e2df3
AE
3195 struct page **pages = NULL;
3196 u32 page_count;
1ceae7ef 3197 size_t size;
788e2df3
AE
3198 int ret;
3199
3200 page_count = (u32) calc_pages_for(offset, length);
3201 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
 3202 if (IS_ERR(pages))
 3203 return PTR_ERR(pages);
3204
3205 ret = -ENOMEM;
3206 obj_request = rbd_obj_request_create(object_name, offset, length,
36be9a76 3207 OBJ_REQUEST_PAGES);
788e2df3
AE
3208 if (!obj_request)
3209 goto out;
3210
3211 obj_request->pages = pages;
3212 obj_request->page_count = page_count;
3213
430c28c3 3214 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
788e2df3
AE
3215 if (!obj_request->osd_req)
3216 goto out;
3217
c99d2d4a
AE
3218 osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
3219 offset, length, 0, 0);
406e2c9f 3220 osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
a4ce40a9 3221 obj_request->pages,
44cd188d
AE
3222 obj_request->length,
3223 obj_request->offset & ~PAGE_MASK,
3224 false, false);
9d4df01f 3225 rbd_osd_req_format_read(obj_request);
430c28c3 3226
788e2df3
AE
3227 ret = rbd_obj_request_submit(osdc, obj_request);
3228 if (ret)
3229 goto out;
3230 ret = rbd_obj_request_wait(obj_request);
3231 if (ret)
3232 goto out;
3233
3234 ret = obj_request->result;
3235 if (ret < 0)
3236 goto out;
1ceae7ef
AE
3237
3238 rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
3239 size = (size_t) obj_request->xferred;
903bb32e 3240 ceph_copy_from_page_vector(pages, buf, 0, size);
7097f8df
AE
3241 rbd_assert(size <= (size_t)INT_MAX);
3242 ret = (int)size;
788e2df3
AE
3243out:
3244 if (obj_request)
3245 rbd_obj_request_put(obj_request);
3246 else
3247 ceph_release_page_vector(pages, page_count);
3248
3249 return ret;
3250}
3251
602adf40 3252/*
662518b1
AE
3253 * Read the complete header for the given rbd device. On successful
3254 * return, the rbd_dev->header field will contain up-to-date
3255 * information about the image.
602adf40 3256 */
99a41ebc 3257static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
602adf40 3258{
4156d998 3259 struct rbd_image_header_ondisk *ondisk = NULL;
50f7c4c9 3260 u32 snap_count = 0;
4156d998
AE
3261 u64 names_size = 0;
3262 u32 want_count;
3263 int ret;
602adf40 3264
00f1f36f 3265 /*
4156d998
AE
3266 * The complete header will include an array of its 64-bit
3267 * snapshot ids, followed by the names of those snapshots as
3268 * a contiguous block of NUL-terminated strings. Note that
3269 * the number of snapshots could change by the time we read
3270 * it in, in which case we re-read it.
00f1f36f 3271 */
4156d998
AE
3272 do {
3273 size_t size;
3274
3275 kfree(ondisk);
3276
3277 size = sizeof (*ondisk);
3278 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3279 size += names_size;
3280 ondisk = kmalloc(size, GFP_KERNEL);
3281 if (!ondisk)
662518b1 3282 return -ENOMEM;
4156d998 3283
788e2df3 3284 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
7097f8df 3285 0, size, ondisk);
4156d998 3286 if (ret < 0)
662518b1 3287 goto out;
c0cd10db 3288 if ((size_t)ret < size) {
4156d998 3289 ret = -ENXIO;
06ecc6cb
AE
3290 rbd_warn(rbd_dev, "short header read (want %zd got %d)",
3291 size, ret);
662518b1 3292 goto out;
4156d998
AE
3293 }
3294 if (!rbd_dev_ondisk_valid(ondisk)) {
3295 ret = -ENXIO;
06ecc6cb 3296 rbd_warn(rbd_dev, "invalid header");
662518b1 3297 goto out;
81e759fb 3298 }
602adf40 3299
4156d998
AE
3300 names_size = le64_to_cpu(ondisk->snap_names_len);
3301 want_count = snap_count;
3302 snap_count = le32_to_cpu(ondisk->snap_count);
3303 } while (snap_count != want_count);
00f1f36f 3304
662518b1
AE
3305 ret = rbd_header_from_disk(rbd_dev, ondisk);
3306out:
4156d998
AE
3307 kfree(ondisk);
3308
3309 return ret;
602adf40
YS
3310}
3311
15228ede
AE
3312/*
3313 * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3314 * has disappeared from the (just updated) snapshot context.
3315 */
3316static void rbd_exists_validate(struct rbd_device *rbd_dev)
3317{
3318 u64 snap_id;
3319
3320 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3321 return;
3322
3323 snap_id = rbd_dev->spec->snap_id;
3324 if (snap_id == CEPH_NOSNAP)
3325 return;
3326
3327 if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3328 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3329}
3330
9875201e
JD
3331static void rbd_dev_update_size(struct rbd_device *rbd_dev)
3332{
3333 sector_t size;
3334 bool removing;
3335
3336 /*
3337 * Don't hold the lock while doing disk operations,
3338 * or lock ordering will conflict with the bdev mutex via:
3339 * rbd_add() -> blkdev_get() -> rbd_open()
3340 */
3341 spin_lock_irq(&rbd_dev->lock);
3342 removing = test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
3343 spin_unlock_irq(&rbd_dev->lock);
3344 /*
3345 * If the device is being removed, rbd_dev->disk has
3346 * been destroyed, so don't try to update its size
3347 */
3348 if (!removing) {
3349 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3350 dout("setting size to %llu sectors", (unsigned long long)size);
3351 set_capacity(rbd_dev->disk, size);
3352 revalidate_disk(rbd_dev->disk);
3353 }
3354}
3355
cc4a38bd 3356static int rbd_dev_refresh(struct rbd_device *rbd_dev)
1fe5e993 3357{
e627db08 3358 u64 mapping_size;
1fe5e993
AE
3359 int ret;
3360
117973fb 3361 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
cfbf6377 3362 down_write(&rbd_dev->header_rwsem);
3b5cf2a2 3363 mapping_size = rbd_dev->mapping.size;
117973fb 3364 if (rbd_dev->image_format == 1)
99a41ebc 3365 ret = rbd_dev_v1_header_info(rbd_dev);
117973fb 3366 else
2df3fac7 3367 ret = rbd_dev_v2_header_info(rbd_dev);
15228ede
AE
3368
3369 /* If it's a mapped snapshot, validate its EXISTS flag */
3370
3371 rbd_exists_validate(rbd_dev);
cfbf6377
AE
3372 up_write(&rbd_dev->header_rwsem);
3373
00a653e2 3374 if (mapping_size != rbd_dev->mapping.size) {
9875201e 3375 rbd_dev_update_size(rbd_dev);
00a653e2 3376 }
1fe5e993
AE
3377
3378 return ret;
3379}
3380
602adf40
YS
3381static int rbd_init_disk(struct rbd_device *rbd_dev)
3382{
3383 struct gendisk *disk;
3384 struct request_queue *q;
593a9e7b 3385 u64 segment_size;
602adf40 3386
602adf40 3387 /* create gendisk info */
7e513d43
ID
3388 disk = alloc_disk(single_major ?
3389 (1 << RBD_SINGLE_MAJOR_PART_SHIFT) :
3390 RBD_MINORS_PER_MAJOR);
602adf40 3391 if (!disk)
1fcdb8aa 3392 return -ENOMEM;
602adf40 3393
f0f8cef5 3394 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
de71a297 3395 rbd_dev->dev_id);
602adf40 3396 disk->major = rbd_dev->major;
dd82fff1 3397 disk->first_minor = rbd_dev->minor;
7e513d43
ID
3398 if (single_major)
3399 disk->flags |= GENHD_FL_EXT_DEVT;
602adf40
YS
3400 disk->fops = &rbd_bd_ops;
3401 disk->private_data = rbd_dev;
3402
bf0d5f50 3403 q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
602adf40
YS
3404 if (!q)
3405 goto out_disk;
029bcbd8 3406
593a9e7b
AE
3407 /* We use the default size, but let's be explicit about it. */
3408 blk_queue_physical_block_size(q, SECTOR_SIZE);
3409
029bcbd8 3410 /* set io sizes to object size */
593a9e7b
AE
3411 segment_size = rbd_obj_bytes(&rbd_dev->header);
3412 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3413 blk_queue_max_segment_size(q, segment_size);
3414 blk_queue_io_min(q, segment_size);
3415 blk_queue_io_opt(q, segment_size);
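 /*
 * Illustrative: with the default object order of 22 (4 MiB
 * objects) this caps requests at 8192 512-byte sectors.
 */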
029bcbd8 3416
602adf40
YS
3417 blk_queue_merge_bvec(q, rbd_merge_bvec);
3418 disk->queue = q;
3419
3420 q->queuedata = rbd_dev;
3421
3422 rbd_dev->disk = disk;
602adf40 3423
602adf40 3424 return 0;
602adf40
YS
3425out_disk:
3426 put_disk(disk);
1fcdb8aa
AE
3427
3428 return -ENOMEM;
602adf40
YS
3429}
3430
dfc5606d
YS
3431/*
3432 sysfs
3433*/
3434
593a9e7b
AE
3435static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3436{
3437 return container_of(dev, struct rbd_device, dev);
3438}
3439
dfc5606d
YS
3440static ssize_t rbd_size_show(struct device *dev,
3441 struct device_attribute *attr, char *buf)
3442{
593a9e7b 3443 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
a51aa0c0 3444
fc71d833
AE
3445 return sprintf(buf, "%llu\n",
3446 (unsigned long long)rbd_dev->mapping.size);
dfc5606d
YS
3447}
3448
34b13184
AE
3449/*
3450 * Note this shows the features for whatever's mapped, which is not
3451 * necessarily the base image.
3452 */
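/*
 * For example, a format 2 image with only layering enabled
 * reads back as 0x0000000000000001 (RBD_FEATURE_LAYERING is
 * bit 0).
 */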
3453static ssize_t rbd_features_show(struct device *dev,
3454 struct device_attribute *attr, char *buf)
3455{
3456 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3457
3458 return sprintf(buf, "0x%016llx\n",
fc71d833 3459 (unsigned long long)rbd_dev->mapping.features);
34b13184
AE
3460}
3461
dfc5606d
YS
3462static ssize_t rbd_major_show(struct device *dev,
3463 struct device_attribute *attr, char *buf)
3464{
593a9e7b 3465 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 3466
fc71d833
AE
3467 if (rbd_dev->major)
3468 return sprintf(buf, "%d\n", rbd_dev->major);
3469
3470 return sprintf(buf, "(none)\n");
dd82fff1
ID
3471}
3472
3473static ssize_t rbd_minor_show(struct device *dev,
3474 struct device_attribute *attr, char *buf)
3475{
3476 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
fc71d833 3477
dd82fff1 3478 return sprintf(buf, "%d\n", rbd_dev->minor);
dfc5606d
YS
3479}
3480
3481static ssize_t rbd_client_id_show(struct device *dev,
3482 struct device_attribute *attr, char *buf)
602adf40 3483{
593a9e7b 3484 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3485
1dbb4399
AE
3486 return sprintf(buf, "client%lld\n",
3487 ceph_client_id(rbd_dev->rbd_client->client));
602adf40
YS
3488}
3489
dfc5606d
YS
3490static ssize_t rbd_pool_show(struct device *dev,
3491 struct device_attribute *attr, char *buf)
602adf40 3492{
593a9e7b 3493 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3494
0d7dbfce 3495 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
dfc5606d
YS
3496}
3497
9bb2f334
AE
3498static ssize_t rbd_pool_id_show(struct device *dev,
3499 struct device_attribute *attr, char *buf)
3500{
3501 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3502
0d7dbfce 3503 return sprintf(buf, "%llu\n",
fc71d833 3504 (unsigned long long) rbd_dev->spec->pool_id);
9bb2f334
AE
3505}
3506
dfc5606d
YS
3507static ssize_t rbd_name_show(struct device *dev,
3508 struct device_attribute *attr, char *buf)
3509{
593a9e7b 3510 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3511
a92ffdf8
AE
3512 if (rbd_dev->spec->image_name)
3513 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3514
3515 return sprintf(buf, "(unknown)\n");
dfc5606d
YS
3516}
3517
589d30e0
AE
3518static ssize_t rbd_image_id_show(struct device *dev,
3519 struct device_attribute *attr, char *buf)
3520{
3521 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3522
0d7dbfce 3523 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
589d30e0
AE
3524}
3525
34b13184
AE
3526/*
3527 * Shows the name of the currently-mapped snapshot (or
3528 * RBD_SNAP_HEAD_NAME for the base image).
3529 */
dfc5606d
YS
3530static ssize_t rbd_snap_show(struct device *dev,
3531 struct device_attribute *attr,
3532 char *buf)
3533{
593a9e7b 3534 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3535
0d7dbfce 3536 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
dfc5606d
YS
3537}
3538
86b00e0d
AE
3539/*
3540 * For an rbd v2 image, shows the pool id, image id, and snapshot id
3541 * for the parent image. If there is no parent, simply shows
3542 * "(no parent image)".
3543 */
3544static ssize_t rbd_parent_show(struct device *dev,
3545 struct device_attribute *attr,
3546 char *buf)
3547{
3548 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3549 struct rbd_spec *spec = rbd_dev->parent_spec;
3550 int count;
3551 char *bufp = buf;
3552
3553 if (!spec)
3554 return sprintf(buf, "(no parent image)\n");
3555
3556 count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3557 (unsigned long long) spec->pool_id, spec->pool_name);
3558 if (count < 0)
3559 return count;
3560 bufp += count;
3561
3562 count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3563 spec->image_name ? spec->image_name : "(unknown)");
3564 if (count < 0)
3565 return count;
3566 bufp += count;
3567
3568 count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3569 (unsigned long long) spec->snap_id, spec->snap_name);
3570 if (count < 0)
3571 return count;
3572 bufp += count;
3573
3574 count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3575 if (count < 0)
3576 return count;
3577 bufp += count;
3578
3579 return (ssize_t) (bufp - buf);
3580}
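/*
 * Reading the parent attribute of a mapped clone thus produces
 * output of the form (values here are purely illustrative):
 *
 * pool_id 2
 * pool_name rbd
 * image_id 1f82746b0863
 * image_name parent-image
 * snap_id 4
 * snap_name base
 * overlap 10737418240
 */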
3581
dfc5606d
YS
3582static ssize_t rbd_image_refresh(struct device *dev,
3583 struct device_attribute *attr,
3584 const char *buf,
3585 size_t size)
3586{
593a9e7b 3587 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
b813623a 3588 int ret;
602adf40 3589
cc4a38bd 3590 ret = rbd_dev_refresh(rbd_dev);
e627db08
AE
3591 if (ret)
3592 rbd_warn(rbd_dev, ": manual header refresh error (%d)\n", ret);
b813623a
AE
3593
3594 return ret < 0 ? ret : size;
dfc5606d 3595}
602adf40 3596
dfc5606d 3597static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
34b13184 3598static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
dfc5606d 3599static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
dd82fff1 3600static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL);
dfc5606d
YS
3601static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3602static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
9bb2f334 3603static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
dfc5606d 3604static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
589d30e0 3605static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
dfc5606d
YS
3606static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3607static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
86b00e0d 3608static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
dfc5606d
YS
3609
3610static struct attribute *rbd_attrs[] = {
3611 &dev_attr_size.attr,
34b13184 3612 &dev_attr_features.attr,
dfc5606d 3613 &dev_attr_major.attr,
dd82fff1 3614 &dev_attr_minor.attr,
dfc5606d
YS
3615 &dev_attr_client_id.attr,
3616 &dev_attr_pool.attr,
9bb2f334 3617 &dev_attr_pool_id.attr,
dfc5606d 3618 &dev_attr_name.attr,
589d30e0 3619 &dev_attr_image_id.attr,
dfc5606d 3620 &dev_attr_current_snap.attr,
86b00e0d 3621 &dev_attr_parent.attr,
dfc5606d 3622 &dev_attr_refresh.attr,
dfc5606d
YS
3623 NULL
3624};
3625
3626static struct attribute_group rbd_attr_group = {
3627 .attrs = rbd_attrs,
3628};
3629
3630static const struct attribute_group *rbd_attr_groups[] = {
3631 &rbd_attr_group,
3632 NULL
3633};
3634
3635static void rbd_sysfs_dev_release(struct device *dev)
3636{
3637}
3638
3639static struct device_type rbd_device_type = {
3640 .name = "rbd",
3641 .groups = rbd_attr_groups,
3642 .release = rbd_sysfs_dev_release,
3643};
3644
8b8fb99c
AE
3645static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3646{
3647 kref_get(&spec->kref);
3648
3649 return spec;
3650}
3651
3652static void rbd_spec_free(struct kref *kref);
3653static void rbd_spec_put(struct rbd_spec *spec)
3654{
3655 if (spec)
3656 kref_put(&spec->kref, rbd_spec_free);
3657}
3658
3659static struct rbd_spec *rbd_spec_alloc(void)
3660{
3661 struct rbd_spec *spec;
3662
3663 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3664 if (!spec)
3665 return NULL;
3666 kref_init(&spec->kref);
3667
8b8fb99c
AE
3668 return spec;
3669}
3670
3671static void rbd_spec_free(struct kref *kref)
3672{
3673 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3674
3675 kfree(spec->pool_name);
3676 kfree(spec->image_id);
3677 kfree(spec->image_name);
3678 kfree(spec->snap_name);
3679 kfree(spec);
3680}
3681
cc344fa1 3682static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
c53d5893
AE
3683 struct rbd_spec *spec)
3684{
3685 struct rbd_device *rbd_dev;
3686
3687 rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3688 if (!rbd_dev)
3689 return NULL;
3690
3691 spin_lock_init(&rbd_dev->lock);
6d292906 3692 rbd_dev->flags = 0;
a2acd00e 3693 atomic_set(&rbd_dev->parent_ref, 0);
c53d5893 3694 INIT_LIST_HEAD(&rbd_dev->node);
c53d5893
AE
3695 init_rwsem(&rbd_dev->header_rwsem);
3696
3697 rbd_dev->spec = spec;
3698 rbd_dev->rbd_client = rbdc;
3699
0903e875
AE
3700 /* Initialize the layout used for all rbd requests */
3701
3702 rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3703 rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3704 rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3705 rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3706
c53d5893
AE
3707 return rbd_dev;
3708}
3709
3710static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3711{
c53d5893
AE
3712 rbd_put_client(rbd_dev->rbd_client);
3713 rbd_spec_put(rbd_dev->spec);
3714 kfree(rbd_dev);
3715}
3716
9d475de5
AE
3717/*
3718 * Get the size and object order for an image snapshot, or if
3719 * snap_id is CEPH_NOSNAP, gets this information for the base
3720 * image.
3721 */
3722static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3723 u8 *order, u64 *snap_size)
3724{
3725 __le64 snapid = cpu_to_le64(snap_id);
3726 int ret;
3727 struct {
3728 u8 order;
3729 __le64 size;
3730 } __attribute__ ((packed)) size_buf = { 0 };
3731
36be9a76 3732 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
9d475de5 3733 "rbd", "get_size",
4157976b 3734 &snapid, sizeof (snapid),
e2a58ee5 3735 &size_buf, sizeof (size_buf));
36be9a76 3736 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
9d475de5
AE
3737 if (ret < 0)
3738 return ret;
57385b51
AE
3739 if (ret < sizeof (size_buf))
3740 return -ERANGE;
9d475de5 3741
c3545579 3742 if (order) {
c86f86e9 3743 *order = size_buf.order;
c3545579
JD
3744 dout(" order %u", (unsigned int)*order);
3745 }
9d475de5
AE
3746 *snap_size = le64_to_cpu(size_buf.size);
3747
c3545579
JD
3748 dout(" snap_id 0x%016llx snap_size = %llu\n",
3749 (unsigned long long)snap_id,
57385b51 3750 (unsigned long long)*snap_size);
9d475de5
AE
3751
3752 return 0;
3753}
3754
3755static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3756{
3757 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3758 &rbd_dev->header.obj_order,
3759 &rbd_dev->header.image_size);
3760}
3761
1e130199
AE
3762static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3763{
3764 void *reply_buf;
3765 int ret;
3766 void *p;
3767
3768 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3769 if (!reply_buf)
3770 return -ENOMEM;
3771
36be9a76 3772 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4157976b 3773 "rbd", "get_object_prefix", NULL, 0,
e2a58ee5 3774 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
36be9a76 3775 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
1e130199
AE
3776 if (ret < 0)
3777 goto out;
3778
3779 p = reply_buf;
3780 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
57385b51
AE
3781 p + ret, NULL, GFP_NOIO);
3782 ret = 0;
1e130199
AE
3783
3784 if (IS_ERR(rbd_dev->header.object_prefix)) {
3785 ret = PTR_ERR(rbd_dev->header.object_prefix);
3786 rbd_dev->header.object_prefix = NULL;
3787 } else {
3788 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
3789 }
1e130199
AE
3790out:
3791 kfree(reply_buf);
3792
3793 return ret;
3794}
3795
b1b5402a
AE
3796static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3797 u64 *snap_features)
3798{
3799 __le64 snapid = cpu_to_le64(snap_id);
3800 struct {
3801 __le64 features;
3802 __le64 incompat;
4157976b 3803 } __attribute__ ((packed)) features_buf = { 0 };
d889140c 3804 u64 incompat;
b1b5402a
AE
3805 int ret;
3806
36be9a76 3807 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
b1b5402a 3808 "rbd", "get_features",
4157976b 3809 &snapid, sizeof (snapid),
e2a58ee5 3810 &features_buf, sizeof (features_buf));
36be9a76 3811 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
b1b5402a
AE
3812 if (ret < 0)
3813 return ret;
57385b51
AE
3814 if (ret < sizeof (features_buf))
3815 return -ERANGE;
d889140c
AE
3816
3817 incompat = le64_to_cpu(features_buf.incompat);
5cbf6f12 3818 if (incompat & ~RBD_FEATURES_SUPPORTED)
b8f5c6ed 3819 return -ENXIO;
d889140c 3820
b1b5402a
AE
3821 *snap_features = le64_to_cpu(features_buf.features);
3822
3823 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
57385b51
AE
3824 (unsigned long long)snap_id,
3825 (unsigned long long)*snap_features,
3826 (unsigned long long)le64_to_cpu(features_buf.incompat));
b1b5402a
AE
3827
3828 return 0;
3829}
3830
3831static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3832{
3833 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3834 &rbd_dev->header.features);
3835}
3836
86b00e0d
AE
3837static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3838{
3839 struct rbd_spec *parent_spec;
3840 size_t size;
3841 void *reply_buf = NULL;
3842 __le64 snapid;
3843 void *p;
3844 void *end;
642a2537 3845 u64 pool_id;
86b00e0d 3846 char *image_id;
3b5cf2a2 3847 u64 snap_id;
86b00e0d 3848 u64 overlap;
86b00e0d
AE
3849 int ret;
3850
3851 parent_spec = rbd_spec_alloc();
3852 if (!parent_spec)
3853 return -ENOMEM;
3854
3855 size = sizeof (__le64) + /* pool_id */
3856 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
3857 sizeof (__le64) + /* snap_id */
3858 sizeof (__le64); /* overlap */
3859 reply_buf = kmalloc(size, GFP_KERNEL);
3860 if (!reply_buf) {
3861 ret = -ENOMEM;
3862 goto out_err;
3863 }
3864
3865 snapid = cpu_to_le64(CEPH_NOSNAP);
36be9a76 3866 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
86b00e0d 3867 "rbd", "get_parent",
4157976b 3868 &snapid, sizeof (snapid),
e2a58ee5 3869 reply_buf, size);
36be9a76 3870 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
86b00e0d
AE
3871 if (ret < 0)
3872 goto out_err;
3873
86b00e0d 3874 p = reply_buf;
57385b51
AE
3875 end = reply_buf + ret;
3876 ret = -ERANGE;
642a2537 3877 ceph_decode_64_safe(&p, end, pool_id, out_err);
392a9dad
AE
3878 if (pool_id == CEPH_NOPOOL) {
3879 /*
3880 * Either the parent never existed, or we have a
3881 * record of it but the image got flattened so it no
3882 * longer has a parent. When the parent of a
3883 * layered image disappears we immediately set the
3884 * overlap to 0. The effect of this is that all new
3885 * requests will be treated as if the image had no
3886 * parent.
3887 */
3888 if (rbd_dev->parent_overlap) {
3889 rbd_dev->parent_overlap = 0;
3890 smp_mb();
3891 rbd_dev_parent_put(rbd_dev);
3892 pr_info("%s: clone image has been flattened\n",
3893 rbd_dev->disk->disk_name);
3894 }
3895
86b00e0d 3896 goto out; /* No parent? No problem. */
392a9dad 3897 }
86b00e0d 3898
0903e875
AE
3899 /* The ceph file layout needs to fit pool id in 32 bits */
3900
3901 ret = -EIO;
642a2537 3902 if (pool_id > (u64)U32_MAX) {
c0cd10db 3903 rbd_warn(NULL, "parent pool id too large (%llu > %u)",
642a2537 3904 (unsigned long long)pool_id, U32_MAX);
57385b51 3905 goto out_err;
c0cd10db 3906 }
0903e875 3907
979ed480 3908 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
86b00e0d
AE
3909 if (IS_ERR(image_id)) {
3910 ret = PTR_ERR(image_id);
3911 goto out_err;
3912 }
3b5cf2a2 3913 ceph_decode_64_safe(&p, end, snap_id, out_err);
86b00e0d
AE
3914 ceph_decode_64_safe(&p, end, overlap, out_err);
3915
3b5cf2a2
AE
3916 /*
3917 * The parent won't change (except when the clone is
3918 * flattened, which was handled above). So we only need to
3919 * record the parent spec if we have not already done so.
3920 */
3921 if (!rbd_dev->parent_spec) {
3922 parent_spec->pool_id = pool_id;
3923 parent_spec->image_id = image_id;
3924 parent_spec->snap_id = snap_id;
70cf49cf
AE
3925 rbd_dev->parent_spec = parent_spec;
3926 parent_spec = NULL; /* rbd_dev now owns this */
3b5cf2a2
AE
3927 }
3928
3929 /*
3930 * We always update the parent overlap. If it's zero we
3931 * treat it specially.
3932 */
3933 rbd_dev->parent_overlap = overlap;
3934 smp_mb();
3935 if (!overlap) {
3936
3937 /* A null parent_spec indicates it's the initial probe */
3938
3939 if (parent_spec) {
3940 /*
3941 * The overlap has become zero, so the clone
3942 * must have been resized down to 0 at some
3943 * point. Treat this the same as a flatten.
3944 */
3945 rbd_dev_parent_put(rbd_dev);
3946 pr_info("%s: clone image now standalone\n",
3947 rbd_dev->disk->disk_name);
3948 } else {
3949 /*
3950 * For the initial probe, if we find the
3951 * overlap is zero we just pretend there was
3952 * no parent image.
3953 */
3954 rbd_warn(rbd_dev, "ignoring parent of "
3955 "clone with overlap 0\n");
3956 }
70cf49cf 3957 }
86b00e0d
AE
3958out:
3959 ret = 0;
3960out_err:
3961 kfree(reply_buf);
3962 rbd_spec_put(parent_spec);
3963
3964 return ret;
3965}
3966
cc070d59
AE
3967static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3968{
3969 struct {
3970 __le64 stripe_unit;
3971 __le64 stripe_count;
3972 } __attribute__ ((packed)) striping_info_buf = { 0 };
3973 size_t size = sizeof (striping_info_buf);
3974 void *p;
3975 u64 obj_size;
3976 u64 stripe_unit;
3977 u64 stripe_count;
3978 int ret;
3979
3980 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3981 "rbd", "get_stripe_unit_count", NULL, 0,
e2a58ee5 3982 (char *)&striping_info_buf, size);
cc070d59
AE
3983 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3984 if (ret < 0)
3985 return ret;
3986 if (ret < size)
3987 return -ERANGE;
3988
3989 /*
3990 * We don't actually support the "fancy striping" feature
3991 * (STRIPINGV2) yet, but if the striping sizes are the
3992 * defaults the behavior is the same as before. So find
3993 * out, and only fail if the image has non-default values.
3994 */
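 /*
 * For a default image (object order 22, so 4 MiB objects) that
 * means stripe_unit == 4 MiB and stripe_count == 1; anything
 * else is rejected with -EINVAL below.
 */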
3995 ret = -EINVAL;
3996 obj_size = (u64)1 << rbd_dev->header.obj_order;
3997 p = &striping_info_buf;
3998 stripe_unit = ceph_decode_64(&p);
3999 if (stripe_unit != obj_size) {
4000 rbd_warn(rbd_dev, "unsupported stripe unit "
4001 "(got %llu want %llu)",
4002 stripe_unit, obj_size);
4003 return -EINVAL;
4004 }
4005 stripe_count = ceph_decode_64(&p);
4006 if (stripe_count != 1) {
4007 rbd_warn(rbd_dev, "unsupported stripe count "
4008 "(got %llu want 1)", stripe_count);
4009 return -EINVAL;
4010 }
500d0c0f
AE
4011 rbd_dev->header.stripe_unit = stripe_unit;
4012 rbd_dev->header.stripe_count = stripe_count;
cc070d59
AE
4013
4014 return 0;
4015}
4016
9e15b77d
AE
4017static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
4018{
4019 size_t image_id_size;
4020 char *image_id;
4021 void *p;
4022 void *end;
4023 size_t size;
4024 void *reply_buf = NULL;
4025 size_t len = 0;
4026 char *image_name = NULL;
4027 int ret;
4028
4029 rbd_assert(!rbd_dev->spec->image_name);
4030
69e7a02f
AE
4031 len = strlen(rbd_dev->spec->image_id);
4032 image_id_size = sizeof (__le32) + len;
9e15b77d
AE
4033 image_id = kmalloc(image_id_size, GFP_KERNEL);
4034 if (!image_id)
4035 return NULL;
4036
4037 p = image_id;
4157976b 4038 end = image_id + image_id_size;
57385b51 4039 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
9e15b77d
AE
4040
4041 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
4042 reply_buf = kmalloc(size, GFP_KERNEL);
4043 if (!reply_buf)
4044 goto out;
4045
36be9a76 4046 ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
9e15b77d
AE
4047 "rbd", "dir_get_name",
4048 image_id, image_id_size,
e2a58ee5 4049 reply_buf, size);
9e15b77d
AE
4050 if (ret < 0)
4051 goto out;
4052 p = reply_buf;
f40eb349
AE
4053 end = reply_buf + ret;
4054
9e15b77d
AE
4055 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
4056 if (IS_ERR(image_name))
4057 image_name = NULL;
4058 else
4059 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
4060out:
4061 kfree(reply_buf);
4062 kfree(image_id);
4063
4064 return image_name;
4065}
4066
2ad3d716
AE
4067static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4068{
4069 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4070 const char *snap_name;
4071 u32 which = 0;
4072
4073 /* Skip over names until we find the one we are looking for */
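 /*
 * The names are packed back to back, e.g. "snap1\0snap2\0snap3\0",
 * parallel to snapc->snaps[], so each iteration advances by
 * strlen(snap_name) + 1 bytes.
 */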
4074
4075 snap_name = rbd_dev->header.snap_names;
4076 while (which < snapc->num_snaps) {
4077 if (!strcmp(name, snap_name))
4078 return snapc->snaps[which];
4079 snap_name += strlen(snap_name) + 1;
4080 which++;
4081 }
4082 return CEPH_NOSNAP;
4083}
4084
4085static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4086{
4087 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4088 u32 which;
4089 bool found = false;
4090 u64 snap_id;
4091
4092 for (which = 0; !found && which < snapc->num_snaps; which++) {
4093 const char *snap_name;
4094
4095 snap_id = snapc->snaps[which];
4096 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
efadc98a
JD
4097 if (IS_ERR(snap_name)) {
4098 /* ignore no-longer existing snapshots */
4099 if (PTR_ERR(snap_name) == -ENOENT)
4100 continue;
4101 else
4102 break;
4103 }
2ad3d716
AE
4104 found = !strcmp(name, snap_name);
4105 kfree(snap_name);
4106 }
4107 return found ? snap_id : CEPH_NOSNAP;
4108}
4109
4110/*
4111 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
4112 * no snapshot by that name is found, or if an error occurs.
4113 */
4114static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4115{
4116 if (rbd_dev->image_format == 1)
4117 return rbd_v1_snap_id_by_name(rbd_dev, name);
4118
4119 return rbd_v2_snap_id_by_name(rbd_dev, name);
4120}
4121
9e15b77d 4122/*
2e9f7f1c
AE
4123 * When an rbd image has a parent image, it is identified by the
4124 * pool, image, and snapshot ids (not names). This function fills
4125 * in the names for those ids. (It's OK if we can't figure out the
4126 * name for an image id, but the pool and snapshot ids should always
4127 * exist and have names.) All names in an rbd spec are dynamically
4128 * allocated.
e1d4213f
AE
4129 *
4130 * When an image being mapped (not a parent) is probed, we have the
4131 * pool name and pool id, image name and image id, and the snapshot
4132 * name. The only thing we're missing is the snapshot id.
9e15b77d 4133 */
2e9f7f1c 4134static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
9e15b77d 4135{
2e9f7f1c
AE
4136 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4137 struct rbd_spec *spec = rbd_dev->spec;
4138 const char *pool_name;
4139 const char *image_name;
4140 const char *snap_name;
9e15b77d
AE
4141 int ret;
4142
e1d4213f
AE
4143 /*
4144 * An image being mapped will have the pool name (etc.), but
4145 * we need to look up the snapshot id.
4146 */
2e9f7f1c
AE
4147 if (spec->pool_name) {
4148 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
2ad3d716 4149 u64 snap_id;
e1d4213f 4150
2ad3d716
AE
4151 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
4152 if (snap_id == CEPH_NOSNAP)
e1d4213f 4153 return -ENOENT;
2ad3d716 4154 spec->snap_id = snap_id;
e1d4213f 4155 } else {
2e9f7f1c 4156 spec->snap_id = CEPH_NOSNAP;
e1d4213f
AE
4157 }
4158
4159 return 0;
4160 }
9e15b77d 4161
2e9f7f1c 4162 /* Get the pool name; we have to make our own copy of this */
9e15b77d 4163
2e9f7f1c
AE
4164 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
4165 if (!pool_name) {
4166 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
935dc89f
AE
4167 return -EIO;
4168 }
2e9f7f1c
AE
4169 pool_name = kstrdup(pool_name, GFP_KERNEL);
4170 if (!pool_name)
9e15b77d
AE
4171 return -ENOMEM;
4172
4173 /* Fetch the image name; tolerate failure here */
4174
2e9f7f1c
AE
4175 image_name = rbd_dev_image_name(rbd_dev);
4176 if (!image_name)
06ecc6cb 4177 rbd_warn(rbd_dev, "unable to get image name");
9e15b77d 4178
2e9f7f1c 4179 /* Look up the snapshot name, and make a copy */
9e15b77d 4180
2e9f7f1c 4181 snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
da6a6b63
JD
4182 if (IS_ERR(snap_name)) {
4183 ret = PTR_ERR(snap_name);
9e15b77d 4184 goto out_err;
2e9f7f1c
AE
4185 }
4186
4187 spec->pool_name = pool_name;
4188 spec->image_name = image_name;
4189 spec->snap_name = snap_name;
9e15b77d
AE
4190
4191 return 0;
4192out_err:
2e9f7f1c
AE
4193 kfree(image_name);
4194 kfree(pool_name);
9e15b77d
AE
4195
4196 return ret;
4197}
4198
cc4a38bd 4199static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
35d489f9
AE
4200{
4201 size_t size;
4202 int ret;
4203 void *reply_buf;
4204 void *p;
4205 void *end;
4206 u64 seq;
4207 u32 snap_count;
4208 struct ceph_snap_context *snapc;
4209 u32 i;
4210
4211 /*
4212 * We'll need room for the seq value (maximum snapshot id),
4213 * snapshot count, and array of that many snapshot ids.
4214 * For now we have a fixed upper limit on the number we're
4215 * prepared to receive.
4216 */
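 /*
 * Sketch of the expected reply layout (little-endian on the wire):
 *
 * __le64 seq; maximum snapshot id
 * __le32 snap_count;
 * __le64 snaps[snap_count]; snapshot ids
 */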
4217 size = sizeof (__le64) + sizeof (__le32) +
4218 RBD_MAX_SNAP_COUNT * sizeof (__le64);
4219 reply_buf = kzalloc(size, GFP_KERNEL);
4220 if (!reply_buf)
4221 return -ENOMEM;
4222
36be9a76 4223 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4157976b 4224 "rbd", "get_snapcontext", NULL, 0,
e2a58ee5 4225 reply_buf, size);
36be9a76 4226 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
35d489f9
AE
4227 if (ret < 0)
4228 goto out;
4229
35d489f9 4230 p = reply_buf;
57385b51
AE
4231 end = reply_buf + ret;
4232 ret = -ERANGE;
35d489f9
AE
4233 ceph_decode_64_safe(&p, end, seq, out);
4234 ceph_decode_32_safe(&p, end, snap_count, out);
4235
4236 /*
4237 * Make sure the reported number of snapshot ids wouldn't go
4238 * beyond the end of our buffer. But before checking that,
4239 * make sure the computed size of the snapshot context we
4240 * allocate is representable in a size_t.
4241 */
4242 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
4243 / sizeof (u64)) {
4244 ret = -EINVAL;
4245 goto out;
4246 }
4247 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
4248 goto out;
468521c1 4249 ret = 0;
35d489f9 4250
812164f8 4251 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
35d489f9
AE
4252 if (!snapc) {
4253 ret = -ENOMEM;
4254 goto out;
4255 }
35d489f9 4256 snapc->seq = seq;
35d489f9
AE
4257 for (i = 0; i < snap_count; i++)
4258 snapc->snaps[i] = ceph_decode_64(&p);
4259
49ece554 4260 ceph_put_snap_context(rbd_dev->header.snapc);
35d489f9
AE
4261 rbd_dev->header.snapc = snapc;
4262
4263 dout(" snap context seq = %llu, snap_count = %u\n",
57385b51 4264 (unsigned long long)seq, (unsigned int)snap_count);
35d489f9
AE
4265out:
4266 kfree(reply_buf);
4267
57385b51 4268 return ret;
35d489f9
AE
4269}
4270
54cac61f
AE
4271static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
4272 u64 snap_id)
b8b1e2db
AE
4273{
4274 size_t size;
4275 void *reply_buf;
54cac61f 4276 __le64 snapid;
b8b1e2db
AE
4277 int ret;
4278 void *p;
4279 void *end;
b8b1e2db
AE
4280 char *snap_name;
4281
4282 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
4283 reply_buf = kmalloc(size, GFP_KERNEL);
4284 if (!reply_buf)
4285 return ERR_PTR(-ENOMEM);
4286
54cac61f 4287 snapid = cpu_to_le64(snap_id);
36be9a76 4288 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
b8b1e2db 4289 "rbd", "get_snapshot_name",
54cac61f 4290 &snapid, sizeof (snapid),
e2a58ee5 4291 reply_buf, size);
36be9a76 4292 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
f40eb349
AE
4293 if (ret < 0) {
4294 snap_name = ERR_PTR(ret);
b8b1e2db 4295 goto out;
f40eb349 4296 }
b8b1e2db
AE
4297
4298 p = reply_buf;
f40eb349 4299 end = reply_buf + ret;
e5c35534 4300 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
f40eb349 4301 if (IS_ERR(snap_name))
b8b1e2db 4302 goto out;
b8b1e2db 4303
f40eb349 4304 dout(" snap_id 0x%016llx snap_name = %s\n",
54cac61f 4305 (unsigned long long)snap_id, snap_name);
b8b1e2db
AE
4306out:
4307 kfree(reply_buf);
4308
f40eb349 4309 return snap_name;
b8b1e2db
AE
4310}
4311
2df3fac7 4312static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
117973fb 4313{
2df3fac7 4314 bool first_time = rbd_dev->header.object_prefix == NULL;
117973fb 4315 int ret;
117973fb 4316
1617e40c
JD
4317 ret = rbd_dev_v2_image_size(rbd_dev);
4318 if (ret)
cfbf6377 4319 return ret;
1617e40c 4320
2df3fac7
AE
4321 if (first_time) {
4322 ret = rbd_dev_v2_header_onetime(rbd_dev);
4323 if (ret)
cfbf6377 4324 return ret;
2df3fac7
AE
4325 }
4326
642a2537
AE
4327 /*
4328 * If the image supports layering, get the parent info. We
4329 * need to probe the first time regardless. Thereafter we
4330 * only need to do so if there's a parent, to see if it has
4331 * disappeared due to the mapped image getting flattened.
4332 */
4333 if (rbd_dev->header.features & RBD_FEATURE_LAYERING &&
4334 (first_time || rbd_dev->parent_spec)) {
4335 bool warn;
4336
4337 ret = rbd_dev_v2_parent_info(rbd_dev);
4338 if (ret)
cfbf6377 4339 return ret;
642a2537
AE
4340
4341 /*
4342 * Print a warning if this is the initial probe and
4343 * the image has a parent. Don't print it if the
4344 * image now being probed is itself a parent. We
4345 * can tell at this point because we won't know its
4346 * pool name yet (just its pool id).
4347 */
4348 warn = rbd_dev->parent_spec && rbd_dev->spec->pool_name;
4349 if (first_time && warn)
4350 rbd_warn(rbd_dev, "WARNING: kernel layering "
4351 "is EXPERIMENTAL!");
4352 }
4353
29334ba4
AE
4354 if (rbd_dev->spec->snap_id == CEPH_NOSNAP)
4355 if (rbd_dev->mapping.size != rbd_dev->header.image_size)
4356 rbd_dev->mapping.size = rbd_dev->header.image_size;
117973fb 4357
cc4a38bd 4358 ret = rbd_dev_v2_snap_context(rbd_dev);
117973fb 4359 dout("rbd_dev_v2_snap_context returned %d\n", ret);
117973fb
AE
4360
4361 return ret;
4362}
4363
dfc5606d
YS
4364static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4365{
dfc5606d 4366 struct device *dev;
cd789ab9 4367 int ret;
dfc5606d 4368
cd789ab9 4369 dev = &rbd_dev->dev;
dfc5606d
YS
4370 dev->bus = &rbd_bus_type;
4371 dev->type = &rbd_device_type;
4372 dev->parent = &rbd_root_dev;
200a6a8b 4373 dev->release = rbd_dev_device_release;
de71a297 4374 dev_set_name(dev, "%d", rbd_dev->dev_id);
dfc5606d 4375 ret = device_register(dev);
dfc5606d 4376
dfc5606d 4377 return ret;
602adf40
YS
4378}
4379
dfc5606d
YS
4380static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4381{
4382 device_unregister(&rbd_dev->dev);
4383}
4384
1ddbe94e 4385/*
499afd5b 4386 * Get a unique rbd identifier for the given new rbd_dev, and add
f8a22fc2 4387 * the rbd_dev to the global list.
1ddbe94e 4388 */
f8a22fc2 4389static int rbd_dev_id_get(struct rbd_device *rbd_dev)
b7f23c36 4390{
f8a22fc2
ID
4391 int new_dev_id;
4392
9b60e70b
ID
4393 new_dev_id = ida_simple_get(&rbd_dev_id_ida,
4394 0, minor_to_rbd_dev_id(1 << MINORBITS),
4395 GFP_KERNEL);
f8a22fc2
ID
4396 if (new_dev_id < 0)
4397 return new_dev_id;
4398
4399 rbd_dev->dev_id = new_dev_id;
499afd5b
AE
4400
4401 spin_lock(&rbd_dev_list_lock);
4402 list_add_tail(&rbd_dev->node, &rbd_dev_list);
4403 spin_unlock(&rbd_dev_list_lock);
f8a22fc2 4404
70eebd20 4405 dout("rbd_dev %p given dev id %d\n", rbd_dev, rbd_dev->dev_id);
f8a22fc2
ID
4406
4407 return 0;
1ddbe94e 4408}
b7f23c36 4409
1ddbe94e 4410/*
499afd5b
AE
4411 * Remove an rbd_dev from the global list, and record that its
4412 * identifier is no longer in use.
1ddbe94e 4413 */
e2839308 4414static void rbd_dev_id_put(struct rbd_device *rbd_dev)
1ddbe94e 4415{
499afd5b
AE
4416 spin_lock(&rbd_dev_list_lock);
4417 list_del_init(&rbd_dev->node);
4418 spin_unlock(&rbd_dev_list_lock);
b7f23c36 4419
f8a22fc2
ID
4420 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4421
4422 dout("rbd_dev %p released dev id %d\n", rbd_dev, rbd_dev->dev_id);
b7f23c36
AE
4423}
4424
e28fff26
AE
4425/*
4426 * Skips over white space at *buf, and updates *buf to point to the
4427 * first found non-space character (if any). Returns the length of
593a9e7b
AE
4428 * the token (string of non-white space characters) found. Note
4429 * that *buf must be terminated with '\0'.
e28fff26
AE
4430 */
4431static inline size_t next_token(const char **buf)
4432{
4433 /*
4434 * These are the characters that produce nonzero for
4435 * isspace() in the "C" and "POSIX" locales.
4436 */
4437 const char *spaces = " \f\n\r\t\v";
4438
4439 *buf += strspn(*buf, spaces); /* Find start of token */
4440
4441 return strcspn(*buf, spaces); /* Return token length */
4442}
4443
4444/*
4445 * Finds the next token in *buf, and if the provided token buffer is
4446 * big enough, copies the found token into it. The result, if
593a9e7b
AE
4447 * copied, is guaranteed to be terminated with '\0'. Note that *buf
4448 * must be terminated with '\0' on entry.
e28fff26
AE
4449 *
4450 * Returns the length of the token found (not including the '\0').
4451 * Return value will be 0 if no token is found, and it will be >=
4452 * token_size if the token would not fit.
4453 *
593a9e7b 4454 * The *buf pointer will be updated to point beyond the end of the
e28fff26
AE
4455 * found token. Note that this occurs even if the token buffer is
4456 * too small to hold it.
4457 */
4458static inline size_t copy_token(const char **buf,
4459 char *token,
4460 size_t token_size)
4461{
4462 size_t len;
4463
4464 len = next_token(buf);
4465 if (len < token_size) {
4466 memcpy(token, *buf, len);
4467 *(token + len) = '\0';
4468 }
4469 *buf += len;
4470
4471 return len;
4472}
4473
ea3352f4
AE
4474/*
4475 * Finds the next token in *buf, dynamically allocates a buffer big
4476 * enough to hold a copy of it, and copies the token into the new
4477 * buffer. The copy is guaranteed to be terminated with '\0'. Note
4478 * that a duplicate buffer is created even for a zero-length token.
4479 *
4480 * Returns a pointer to the newly-allocated duplicate, or a null
4481 * pointer if memory for the duplicate was not available. If
4482 * the lenp argument is a non-null pointer, the length of the token
4483 * (not including the '\0') is returned in *lenp.
4484 *
4485 * If successful, the *buf pointer will be updated to point beyond
4486 * the end of the found token.
4487 *
4488 * Note: uses GFP_KERNEL for allocation.
4489 */
4490static inline char *dup_token(const char **buf, size_t *lenp)
4491{
4492 char *dup;
4493 size_t len;
4494
4495 len = next_token(buf);
4caf35f9 4496 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
ea3352f4
AE
4497 if (!dup)
4498 return NULL;
ea3352f4
AE
4499 *(dup + len) = '\0';
4500 *buf += len;
4501
4502 if (lenp)
4503 *lenp = len;
4504
4505 return dup;
4506}
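/*
 * For example, with *buf pointing at " rbd foo", next_token()
 * advances *buf to "rbd foo" and returns 3, while dup_token()
 * would return a newly-allocated "rbd" and leave *buf pointing
 * at " foo".
 */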
4507
a725f65e 4508/*
859c31df
AE
4509 * Parse the options provided for an "rbd add" (i.e., rbd image
4510 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
4511 * and the data written is passed here via a NUL-terminated buffer.
4512 * Returns 0 if successful or an error code otherwise.
d22f76e7 4513 *
859c31df
AE
4514 * The information extracted from these options is recorded in
4515 * the other parameters which return dynamically-allocated
4516 * structures:
4517 * ceph_opts
4518 * The address of a pointer that will refer to a ceph options
4519 * structure. Caller must release the returned pointer using
4520 * ceph_destroy_options() when it is no longer needed.
4521 * rbd_opts
4522 * Address of an rbd options pointer. Fully initialized by
4523 * this function; caller must release with kfree().
4524 * spec
4525 * Address of an rbd image specification pointer. Fully
4526 * initialized by this function based on parsed options.
4527 * Caller must release with rbd_spec_put().
4528 *
4529 * The options passed take this form:
4530 * <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4531 * where:
4532 * <mon_addrs>
4533 * A comma-separated list of one or more monitor addresses.
4534 * A monitor address is an ip address, optionally followed
4535 * by a port number (separated by a colon).
4536 * I.e.: ip1[:port1][,ip2[:port2]...]
4537 * <options>
4538 * A comma-separated list of ceph and/or rbd options.
4539 * <pool_name>
4540 * The name of the rados pool containing the rbd image.
4541 * <image_name>
4542 * The name of the image in that pool to map.
4543 * <snap_name>
4544 * An optional snapshot name. If provided, the mapping will
4545 * present data from the image at the time that snapshot was
4546 * created. The image head is used if no snapshot name is
4547 * provided. Snapshot mappings are always read-only.
a725f65e 4548 */
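/*
 * For example (monitor addresses and key are illustrative), writing
 *
 * 1.2.3.4:6789,1.2.3.5:6789 name=admin,secret=<key> rbd foo snap1
 *
 * to /sys/bus/rbd/add maps snapshot "snap1" of image "foo" in pool
 * "rbd", using the two listed monitors.
 */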
859c31df 4549static int rbd_add_parse_args(const char *buf,
dc79b113 4550 struct ceph_options **ceph_opts,
859c31df
AE
4551 struct rbd_options **opts,
4552 struct rbd_spec **rbd_spec)
e28fff26 4553{
d22f76e7 4554 size_t len;
859c31df 4555 char *options;
0ddebc0c 4556 const char *mon_addrs;
ecb4dc22 4557 char *snap_name;
0ddebc0c 4558 size_t mon_addrs_size;
859c31df 4559 struct rbd_spec *spec = NULL;
4e9afeba 4560 struct rbd_options *rbd_opts = NULL;
859c31df 4561 struct ceph_options *copts;
dc79b113 4562 int ret;
e28fff26
AE
4563
4564 /* The first four tokens are required */
4565
7ef3214a 4566 len = next_token(&buf);
4fb5d671
AE
4567 if (!len) {
4568 rbd_warn(NULL, "no monitor address(es) provided");
4569 return -EINVAL;
4570 }
0ddebc0c 4571 mon_addrs = buf;
f28e565a 4572 mon_addrs_size = len + 1;
7ef3214a 4573 buf += len;
a725f65e 4574
dc79b113 4575 ret = -EINVAL;
f28e565a
AE
4576 options = dup_token(&buf, NULL);
4577 if (!options)
dc79b113 4578 return -ENOMEM;
4fb5d671
AE
4579 if (!*options) {
4580 rbd_warn(NULL, "no options provided");
4581 goto out_err;
4582 }
e28fff26 4583
859c31df
AE
4584 spec = rbd_spec_alloc();
4585 if (!spec)
f28e565a 4586 goto out_mem;
859c31df
AE
4587
4588 spec->pool_name = dup_token(&buf, NULL);
4589 if (!spec->pool_name)
4590 goto out_mem;
4fb5d671
AE
4591 if (!*spec->pool_name) {
4592 rbd_warn(NULL, "no pool name provided");
4593 goto out_err;
4594 }
e28fff26 4595
69e7a02f 4596 spec->image_name = dup_token(&buf, NULL);
859c31df 4597 if (!spec->image_name)
f28e565a 4598 goto out_mem;
4fb5d671
AE
4599 if (!*spec->image_name) {
4600 rbd_warn(NULL, "no image name provided");
4601 goto out_err;
4602 }
d4b125e9 4603
f28e565a
AE
4604 /*
4605 * Snapshot name is optional; default is to use "-"
4606 * (indicating the head/no snapshot).
4607 */
3feeb894 4608 len = next_token(&buf);
820a5f3e 4609 if (!len) {
3feeb894
AE
4610 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4611 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
f28e565a 4612 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
dc79b113 4613 ret = -ENAMETOOLONG;
f28e565a 4614 goto out_err;
849b4260 4615 }
ecb4dc22
AE
4616 snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4617 if (!snap_name)
f28e565a 4618 goto out_mem;
ecb4dc22
AE
4619 *(snap_name + len) = '\0';
4620 spec->snap_name = snap_name;
e5c35534 4621
0ddebc0c 4622 /* Initialize all rbd options to the defaults */
e28fff26 4623
4e9afeba
AE
4624 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4625 if (!rbd_opts)
4626 goto out_mem;
4627
4628 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
d22f76e7 4629
859c31df 4630 copts = ceph_parse_options(options, mon_addrs,
0ddebc0c 4631 mon_addrs + mon_addrs_size - 1,
4e9afeba 4632 parse_rbd_opts_token, rbd_opts);
859c31df
AE
4633 if (IS_ERR(copts)) {
4634 ret = PTR_ERR(copts);
dc79b113
AE
4635 goto out_err;
4636 }
859c31df
AE
4637 kfree(options);
4638
4639 *ceph_opts = copts;
4e9afeba 4640 *opts = rbd_opts;
859c31df 4641 *rbd_spec = spec;
0ddebc0c 4642
dc79b113 4643 return 0;
f28e565a 4644out_mem:
dc79b113 4645 ret = -ENOMEM;
d22f76e7 4646out_err:
859c31df
AE
4647 kfree(rbd_opts);
4648 rbd_spec_put(spec);
f28e565a 4649 kfree(options);
d22f76e7 4650
dc79b113 4651 return ret;
a725f65e
AE
4652}
4653
589d30e0
AE
4654/*
4655 * An rbd format 2 image has a unique identifier, distinct from the
4656 * name given to it by the user. Internally, that identifier is
4657 * what's used to specify the names of objects related to the image.
4658 *
4659 * A special "rbd id" object is used to map an rbd image name to its
4660 * id. If that object doesn't exist, then there is no v2 rbd image
4661 * with the supplied name.
4662 *
4663 * This function will record the given rbd_dev's image_id field if
4664 * it can be determined, and in that case will return 0. If any
4665 * errors occur a negative errno will be returned and the rbd_dev's
4666 * image_id field will be unchanged (and should be NULL).
4667 */
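/*
 * For example, assuming the usual RBD_ID_PREFIX of "rbd_id." from
 * rbd_types.h, a format 2 image named "foo" keeps its id in an
 * object called "rbd_id.foo"; the "get_id" class method on that
 * object returns the id string.
 */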
4668static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4669{
4670 int ret;
4671 size_t size;
4672 char *object_name;
4673 void *response;
c0fba368 4674 char *image_id;
2f82ee54 4675
2c0d0a10
AE
4676 /*
4677 * When probing a parent image, the image id is already
4678 * known (and the image name likely is not). There's no
c0fba368
AE
4679 * need to fetch the image id again in this case. We
4680 * do still need to set the image format though.
2c0d0a10 4681 */
c0fba368
AE
4682 if (rbd_dev->spec->image_id) {
4683 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4684
2c0d0a10 4685 return 0;
c0fba368 4686 }
2c0d0a10 4687
589d30e0
AE
4688 /*
4689 * First, see if the format 2 image id file exists, and if
4690 * so, get the image's persistent id from it.
4691 */
69e7a02f 4692 size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
589d30e0
AE
4693 object_name = kmalloc(size, GFP_NOIO);
4694 if (!object_name)
4695 return -ENOMEM;
0d7dbfce 4696 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
589d30e0
AE
4697 dout("rbd id object name is %s\n", object_name);
4698
4699 /* Response will be an encoded string, which includes a length */
4700
4701 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4702 response = kzalloc(size, GFP_NOIO);
4703 if (!response) {
4704 ret = -ENOMEM;
4705 goto out;
4706 }
4707
c0fba368
AE
4708 /* If it doesn't exist we'll assume it's a format 1 image */
4709
36be9a76 4710 ret = rbd_obj_method_sync(rbd_dev, object_name,
4157976b 4711 "rbd", "get_id", NULL, 0,
e2a58ee5 4712 response, RBD_IMAGE_ID_LEN_MAX);
36be9a76 4713 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
c0fba368
AE
4714 if (ret == -ENOENT) {
4715 image_id = kstrdup("", GFP_KERNEL);
4716 ret = image_id ? 0 : -ENOMEM;
4717 if (!ret)
4718 rbd_dev->image_format = 1;
4719 } else if (ret > sizeof (__le32)) {
4720 void *p = response;
4721
4722 image_id = ceph_extract_encoded_string(&p, p + ret,
979ed480 4723 NULL, GFP_NOIO);
c0fba368
AE
4724 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4725 if (!ret)
4726 rbd_dev->image_format = 2;
589d30e0 4727 } else {
c0fba368
AE
4728 ret = -EINVAL;
4729 }
4730
4731 if (!ret) {
4732 rbd_dev->spec->image_id = image_id;
4733 dout("image_id is %s\n", image_id);
589d30e0
AE
4734 }
4735out:
4736 kfree(response);
4737 kfree(object_name);
4738
4739 return ret;
4740}
4741
3abef3b3
AE
4742/*
4743 * Undo whatever state changes are made by v1 or v2 header info
4744 * call.
4745 */
6fd48b3b
AE
4746static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4747{
4748 struct rbd_image_header *header;
4749
392a9dad
AE
4750 /* Drop parent reference unless it's already been done (or none) */
4751
4752 if (rbd_dev->parent_overlap)
4753 rbd_dev_parent_put(rbd_dev);
6fd48b3b
AE
4754
4755 /* Free dynamic fields from the header, then zero it out */
4756
4757 header = &rbd_dev->header;
812164f8 4758 ceph_put_snap_context(header->snapc);
6fd48b3b
AE
4759 kfree(header->snap_sizes);
4760 kfree(header->snap_names);
4761 kfree(header->object_prefix);
4762 memset(header, 0, sizeof (*header));
4763}
4764
2df3fac7 4765static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
a30b71b9
AE
4766{
4767 int ret;
a30b71b9 4768
1e130199 4769 ret = rbd_dev_v2_object_prefix(rbd_dev);
57385b51 4770 if (ret)
b1b5402a
AE
4771 goto out_err;
4772
2df3fac7
AE
4773 /*
4774 * Get and check the features for the image. Currently the
4775 * features are assumed to never change.
4776 */
b1b5402a 4777 ret = rbd_dev_v2_features(rbd_dev);
57385b51 4778 if (ret)
9d475de5 4779 goto out_err;
35d489f9 4780
cc070d59
AE
4781 /* If the image supports fancy striping, get its parameters */
4782
4783 if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4784 ret = rbd_dev_v2_striping_info(rbd_dev);
4785 if (ret < 0)
4786 goto out_err;
4787 }
2df3fac7 4788 /* No support for crypto and compression type format 2 images */
a30b71b9 4789
35152979 4790 return 0;
9d475de5 4791out_err:
642a2537 4792 rbd_dev->header.features = 0;
1e130199
AE
4793 kfree(rbd_dev->header.object_prefix);
4794 rbd_dev->header.object_prefix = NULL;
9d475de5
AE
4795
4796 return ret;
a30b71b9
AE
4797}
4798
124afba2 4799static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
83a06263 4800{
2f82ee54 4801 struct rbd_device *parent = NULL;
124afba2
AE
4802 struct rbd_spec *parent_spec;
4803 struct rbd_client *rbdc;
4804 int ret;
4805
4806 if (!rbd_dev->parent_spec)
4807 return 0;
4808 /*
4809 * We need to pass a reference to the client and the parent
4810 * spec when creating the parent rbd_dev. Images related by
4811 * parent/child relationships always share both.
4812 */
4813 parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4814 rbdc = __rbd_get_client(rbd_dev->rbd_client);
4815
4816 ret = -ENOMEM;
4817 parent = rbd_dev_create(rbdc, parent_spec);
4818 if (!parent)
4819 goto out_err;
4820
1f3ef788 4821 ret = rbd_dev_image_probe(parent, false);
124afba2
AE
4822 if (ret < 0)
4823 goto out_err;
4824 rbd_dev->parent = parent;
a2acd00e 4825 atomic_set(&rbd_dev->parent_ref, 1);
124afba2
AE
4826
4827 return 0;
4828out_err:
4829 if (parent) {
fb65d228 4830 rbd_dev_unparent(rbd_dev);
124afba2
AE
4831 kfree(rbd_dev->header_name);
4832 rbd_dev_destroy(parent);
4833 } else {
4834 rbd_put_client(rbdc);
4835 rbd_spec_put(parent_spec);
4836 }
4837
4838 return ret;
4839}
4840
200a6a8b 4841static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
124afba2 4842{
83a06263 4843 int ret;
d1cf5788 4844
f8a22fc2
ID
4845 /* Get an id and fill in device name. */
4846
4847 ret = rbd_dev_id_get(rbd_dev);
4848 if (ret)
4849 return ret;
83a06263 4850
83a06263
AE
4851 BUILD_BUG_ON(DEV_NAME_LEN
4852 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4853 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4854
9b60e70b 4855 /* Record our major and minor device numbers. */
83a06263 4856
9b60e70b
ID
4857 if (!single_major) {
4858 ret = register_blkdev(0, rbd_dev->name);
4859 if (ret < 0)
4860 goto err_out_id;
4861
4862 rbd_dev->major = ret;
4863 rbd_dev->minor = 0;
4864 } else {
4865 rbd_dev->major = rbd_major;
4866 rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
4867 }
83a06263
AE
4868
4869 /* Set up the blkdev mapping. */
4870
4871 ret = rbd_init_disk(rbd_dev);
4872 if (ret)
4873 goto err_out_blkdev;
4874
f35a4dee 4875 ret = rbd_dev_mapping_set(rbd_dev);
83a06263
AE
4876 if (ret)
4877 goto err_out_disk;
f35a4dee
AE
4878 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
4879
4880 ret = rbd_bus_add_dev(rbd_dev);
4881 if (ret)
4882 goto err_out_mapping;
83a06263 4883
83a06263
AE
4884 /* Everything's ready. Announce the disk to the world. */
4885
129b79d4 4886 set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
83a06263
AE
4887 add_disk(rbd_dev->disk);
4888
4889 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4890 (unsigned long long) rbd_dev->mapping.size);
4891
4892 return ret;
2f82ee54 4893
f35a4dee
AE
4894err_out_mapping:
4895 rbd_dev_mapping_clear(rbd_dev);
83a06263
AE
4896err_out_disk:
4897 rbd_free_disk(rbd_dev);
4898err_out_blkdev:
9b60e70b
ID
4899 if (!single_major)
4900 unregister_blkdev(rbd_dev->major, rbd_dev->name);
83a06263
AE
4901err_out_id:
4902 rbd_dev_id_put(rbd_dev);
83a06263
AE
4904
4905 return ret;
4906}
4907
332bb12d
AE
4908static int rbd_dev_header_name(struct rbd_device *rbd_dev)
4909{
4910 struct rbd_spec *spec = rbd_dev->spec;
4911 size_t size;
4912
4913 /* Record the header object name for this rbd image. */
4914
4915 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4916
4917 if (rbd_dev->image_format == 1)
4918 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
4919 else
4920 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
4921
4922 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4923 if (!rbd_dev->header_name)
4924 return -ENOMEM;
4925
4926 if (rbd_dev->image_format == 1)
4927 sprintf(rbd_dev->header_name, "%s%s",
4928 spec->image_name, RBD_SUFFIX);
4929 else
4930 sprintf(rbd_dev->header_name, "%s%s",
4931 RBD_HEADER_PREFIX, spec->image_id);
4932 return 0;
4933}
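/*
 * So, assuming the usual RBD_SUFFIX (".rbd") and RBD_HEADER_PREFIX
 * ("rbd_header.") values from rbd_types.h, a format 1 image named
 * "foo" has header object "foo.rbd", while a format 2 image with
 * id "1f82746b0863" has header object "rbd_header.1f82746b0863".
 */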
4934
200a6a8b
AE
4935static void rbd_dev_image_release(struct rbd_device *rbd_dev)
4936{
6fd48b3b 4937 rbd_dev_unprobe(rbd_dev);
200a6a8b 4938 kfree(rbd_dev->header_name);
6fd48b3b
AE
4939 rbd_dev->header_name = NULL;
4940 rbd_dev->image_format = 0;
4941 kfree(rbd_dev->spec->image_id);
4942 rbd_dev->spec->image_id = NULL;
4943
200a6a8b
AE
4944 rbd_dev_destroy(rbd_dev);
4945}
4946
a30b71b9
AE
4947/*
4948 * Probe for the existence of the header object for the given rbd
1f3ef788
AE
4949 * device. If this image is the one being mapped (i.e., not a
4950 * parent), initiate a watch on its header object before using that
4951 * object to get detailed information about the rbd image.
a30b71b9 4952 */
1f3ef788 4953static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
a30b71b9
AE
4954{
4955 int ret;
4956
4957 /*
3abef3b3
AE
4958 * Get the id from the image id object. Unless there's an
4959 * error, rbd_dev->spec->image_id will be filled in with
4960 * a dynamically-allocated string, and rbd_dev->image_format
4961 * will be set to either 1 or 2.
a30b71b9
AE
4962 */
4963 ret = rbd_dev_image_id(rbd_dev);
4964 if (ret)
c0fba368
AE
4965 return ret;
4966 rbd_assert(rbd_dev->spec->image_id);
4967 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4968
332bb12d
AE
4969 ret = rbd_dev_header_name(rbd_dev);
4970 if (ret)
4971 goto err_out_format;
4972
1f3ef788 4973 if (mapping) {
fca27065 4974 ret = rbd_dev_header_watch_sync(rbd_dev);
1f3ef788
AE
4975 if (ret)
4976 goto out_header_name;
4977 }
b644de2b 4978
c0fba368 4979 if (rbd_dev->image_format == 1)
99a41ebc 4980 ret = rbd_dev_v1_header_info(rbd_dev);
a30b71b9 4981 else
2df3fac7 4982 ret = rbd_dev_v2_header_info(rbd_dev);
5655c4d9 4983 if (ret)
b644de2b 4984 goto err_out_watch;
83a06263 4985
9bb81c9b
AE
4986 ret = rbd_dev_spec_update(rbd_dev);
4987 if (ret)
33dca39f 4988 goto err_out_probe;
9bb81c9b
AE
4989
4990 ret = rbd_dev_probe_parent(rbd_dev);
30d60ba2
AE
4991 if (ret)
4992 goto err_out_probe;
4993
4994 dout("discovered format %u image, header name is %s\n",
4995 rbd_dev->image_format, rbd_dev->header_name);
83a06263 4996
30d60ba2 4997 return 0;
6fd48b3b
AE
4998err_out_probe:
4999 rbd_dev_unprobe(rbd_dev);
b644de2b 5000err_out_watch:
fca27065
ID
5001 if (mapping)
5002 rbd_dev_header_unwatch_sync(rbd_dev);
332bb12d
AE
5003out_header_name:
5004 kfree(rbd_dev->header_name);
5005 rbd_dev->header_name = NULL;
5006err_out_format:
5007 rbd_dev->image_format = 0;
5655c4d9
AE
5008 kfree(rbd_dev->spec->image_id);
5009 rbd_dev->spec->image_id = NULL;
5010
5011 dout("probe failed, returning %d\n", ret);
5012
a30b71b9
AE
5013 return ret;
5014}
5015
9b60e70b
ID
5016static ssize_t do_rbd_add(struct bus_type *bus,
5017 const char *buf,
5018 size_t count)
602adf40 5019{
cb8627c7 5020 struct rbd_device *rbd_dev = NULL;
dc79b113 5021 struct ceph_options *ceph_opts = NULL;
4e9afeba 5022 struct rbd_options *rbd_opts = NULL;
859c31df 5023 struct rbd_spec *spec = NULL;
9d3997fd 5024 struct rbd_client *rbdc;
27cc2594 5025 struct ceph_osd_client *osdc;
51344a38 5026 bool read_only;
27cc2594 5027 int rc = -ENOMEM;
602adf40
YS
5028
5029 if (!try_module_get(THIS_MODULE))
5030 return -ENODEV;
5031
602adf40 5032 /* parse add command */
859c31df 5033 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
dc79b113 5034 if (rc < 0)
bd4ba655 5035 goto err_out_module;
51344a38
AE
5036 read_only = rbd_opts->read_only;
5037 kfree(rbd_opts);
5038 rbd_opts = NULL; /* done with this */
78cea76e 5039
9d3997fd
AE
5040 rbdc = rbd_get_client(ceph_opts);
5041 if (IS_ERR(rbdc)) {
5042 rc = PTR_ERR(rbdc);
0ddebc0c 5043 goto err_out_args;
9d3997fd 5044 }
602adf40 5045
602adf40 5046 /* pick the pool */
9d3997fd 5047 osdc = &rbdc->client->osdc;
859c31df 5048 rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
602adf40
YS
5049 if (rc < 0)
5050 goto err_out_client;
c0cd10db 5051 spec->pool_id = (u64)rc;
859c31df 5052
0903e875
AE
5053 /* The ceph file layout needs to fit pool id in 32 bits */
5054
c0cd10db
AE
5055 if (spec->pool_id > (u64)U32_MAX) {
5056 rbd_warn(NULL, "pool id too large (%llu > %u)",
5057 (unsigned long long)spec->pool_id, U32_MAX);
0903e875
AE
5058 rc = -EIO;
5059 goto err_out_client;
5060 }
5061
c53d5893 5062 rbd_dev = rbd_dev_create(rbdc, spec);
bd4ba655
AE
5063 if (!rbd_dev)
5064 goto err_out_client;
c53d5893
AE
5065 rbdc = NULL; /* rbd_dev now owns this */
5066 spec = NULL; /* rbd_dev now owns this */
602adf40 5067
1f3ef788 5068 rc = rbd_dev_image_probe(rbd_dev, true);
a30b71b9 5069 if (rc < 0)
c53d5893 5070 goto err_out_rbd_dev;
05fd6f6f 5071
7ce4eef7
AE
5072 /* If we are mapping a snapshot it must be marked read-only */
5073
5074 if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
5075 read_only = true;
5076 rbd_dev->mapping.read_only = read_only;
5077
b536f69a 5078 rc = rbd_dev_device_setup(rbd_dev);
3abef3b3 5079 if (rc) {
e37180c0
ID
5080 /*
5081 * rbd_dev_header_unwatch_sync() can't be moved into
5082 * rbd_dev_image_release() without refactoring, see
5083 * commit 1f3ef78861ac.
5084 */
5085 rbd_dev_header_unwatch_sync(rbd_dev);
3abef3b3
AE
5086 rbd_dev_image_release(rbd_dev);
5087 goto err_out_module;
5088 }
5089
5090 return count;
b536f69a 5091
c53d5893
AE
5092err_out_rbd_dev:
5093 rbd_dev_destroy(rbd_dev);
bd4ba655 5094err_out_client:
9d3997fd 5095 rbd_put_client(rbdc);
0ddebc0c 5096err_out_args:
859c31df 5097 rbd_spec_put(spec);
bd4ba655
AE
5098err_out_module:
5099 module_put(THIS_MODULE);
27cc2594 5100
602adf40 5101 dout("Error adding device %s\n", buf);
27cc2594 5102
c0cd10db 5103 return (ssize_t)rc;
602adf40
YS
5104}
5105
9b60e70b
ID
5106static ssize_t rbd_add(struct bus_type *bus,
5107 const char *buf,
5108 size_t count)
5109{
5110 if (single_major)
5111 return -EINVAL;
5112
5113 return do_rbd_add(bus, buf, count);
5114}
5115
5116static ssize_t rbd_add_single_major(struct bus_type *bus,
5117 const char *buf,
5118 size_t count)
5119{
5120 return do_rbd_add(bus, buf, count);
5121}
5122
200a6a8b 5123static void rbd_dev_device_release(struct device *dev)
602adf40 5124{
593a9e7b 5125 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 5126
602adf40 5127 rbd_free_disk(rbd_dev);
200a6a8b 5128 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
6d80b130 5129 rbd_dev_mapping_clear(rbd_dev);
9b60e70b
ID
5130 if (!single_major)
5131 unregister_blkdev(rbd_dev->major, rbd_dev->name);
e2839308 5132 rbd_dev_id_put(rbd_dev);
602adf40
YS
5134}
5135
05a46afd
AE
5136static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
5137{
ad945fc1 5138 while (rbd_dev->parent) {
05a46afd
AE
5139 struct rbd_device *first = rbd_dev;
5140 struct rbd_device *second = first->parent;
5141 struct rbd_device *third;
5142
5143 /*
5144 * Follow to the parent with no grandparent and
5145 * remove it.
5146 */
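 /*
 * For a chain image -> parent -> grandparent this removes the
 * grandparent on the first pass and the parent on the next,
 * unwinding from the deepest ancestor inward.
 */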
5147 while (second && (third = second->parent)) {
5148 first = second;
5149 second = third;
5150 }
ad945fc1 5151 rbd_assert(second);
8ad42cd0 5152 rbd_dev_image_release(second);
ad945fc1
AE
5153 first->parent = NULL;
5154 first->parent_overlap = 0;
5155
5156 rbd_assert(first->parent_spec);
05a46afd
AE
5157 rbd_spec_put(first->parent_spec);
5158 first->parent_spec = NULL;
05a46afd
AE
5159 }
5160}
5161
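/*
 * Handle a write of a decimal device id to /sys/bus/rbd/remove (or
 * remove_single_major), e.g. "echo 0 > /sys/bus/rbd/remove" to unmap
 * /dev/rbd0.  Removal is refused with -EBUSY while the device is open
 * and with -ENOENT if the id does not match a mapped device.
 */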
static ssize_t do_rbd_remove(struct bus_type *bus,
                             const char *buf,
                             size_t count)
{
        struct rbd_device *rbd_dev = NULL;
        struct list_head *tmp;
        int dev_id;
        unsigned long ul;
        bool already = false;
        int ret;

        ret = kstrtoul(buf, 10, &ul);
        if (ret)
                return ret;

        /* convert to int; abort if we lost anything in the conversion */
        dev_id = (int)ul;
        if (dev_id != ul)
                return -EINVAL;

        ret = -ENOENT;
        spin_lock(&rbd_dev_list_lock);
        list_for_each(tmp, &rbd_dev_list) {
                rbd_dev = list_entry(tmp, struct rbd_device, node);
                if (rbd_dev->dev_id == dev_id) {
                        ret = 0;
                        break;
                }
        }
        if (!ret) {
                spin_lock_irq(&rbd_dev->lock);
                if (rbd_dev->open_count)
                        ret = -EBUSY;
                else
                        already = test_and_set_bit(RBD_DEV_FLAG_REMOVING,
                                                   &rbd_dev->flags);
                spin_unlock_irq(&rbd_dev->lock);
        }
        spin_unlock(&rbd_dev_list_lock);
        if (ret < 0 || already)
                return ret;

        rbd_dev_header_unwatch_sync(rbd_dev);
        /*
         * flush remaining watch callbacks - these must be complete
         * before the osd_client is shutdown
         */
        dout("%s: flushing notifies", __func__);
        ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);

        /*
         * Don't free anything from rbd_dev->disk until after all
         * notifies are completely processed. Otherwise
         * rbd_bus_del_dev() will race with rbd_watch_cb(), resulting
         * in a potential use after free of rbd_dev->disk or rbd_dev.
         */
        rbd_bus_del_dev(rbd_dev);
        rbd_dev_image_release(rbd_dev);
        module_put(THIS_MODULE);

        return count;
}

static ssize_t rbd_remove(struct bus_type *bus,
                          const char *buf,
                          size_t count)
{
        if (single_major)
                return -EINVAL;

        return do_rbd_remove(bus, buf, count);
}

static ssize_t rbd_remove_single_major(struct bus_type *bus,
                                       const char *buf,
                                       size_t count)
{
        return do_rbd_remove(bus, buf, count);
}

/*
 * create control files in sysfs
 * /sys/bus/rbd/...
 */
static int rbd_sysfs_init(void)
{
        int ret;

        ret = device_register(&rbd_root_dev);
        if (ret < 0)
                return ret;

        ret = bus_register(&rbd_bus_type);
        if (ret < 0)
                device_unregister(&rbd_root_dev);

        return ret;
}

static void rbd_sysfs_cleanup(void)
{
        bus_unregister(&rbd_bus_type);
        device_unregister(&rbd_root_dev);
}

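/*
 * Slab caches for the structures allocated on every I/O: an image
 * request per block-layer request, an object request for each RADOS
 * object it touches, and a name buffer for each of those objects.
 */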
static int rbd_slab_init(void)
{
        rbd_assert(!rbd_img_request_cache);
        rbd_img_request_cache = kmem_cache_create("rbd_img_request",
                                        sizeof (struct rbd_img_request),
                                        __alignof__(struct rbd_img_request),
                                        0, NULL);
        if (!rbd_img_request_cache)
                return -ENOMEM;

        rbd_assert(!rbd_obj_request_cache);
        rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
                                        sizeof (struct rbd_obj_request),
                                        __alignof__(struct rbd_obj_request),
                                        0, NULL);
        if (!rbd_obj_request_cache)
                goto out_err;

        rbd_assert(!rbd_segment_name_cache);
        rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
                                        CEPH_MAX_OID_NAME_LEN + 1, 1, 0, NULL);
        if (rbd_segment_name_cache)
                return 0;
out_err:
        if (rbd_obj_request_cache) {
                kmem_cache_destroy(rbd_obj_request_cache);
                rbd_obj_request_cache = NULL;
        }

        kmem_cache_destroy(rbd_img_request_cache);
        rbd_img_request_cache = NULL;

        return -ENOMEM;
}

static void rbd_slab_exit(void)
{
        rbd_assert(rbd_segment_name_cache);
        kmem_cache_destroy(rbd_segment_name_cache);
        rbd_segment_name_cache = NULL;

        rbd_assert(rbd_obj_request_cache);
        kmem_cache_destroy(rbd_obj_request_cache);
        rbd_obj_request_cache = NULL;

        rbd_assert(rbd_img_request_cache);
        kmem_cache_destroy(rbd_img_request_cache);
        rbd_img_request_cache = NULL;
}

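/*
 * Init order matters: slab caches come first, then (in single_major
 * mode) the shared block device major, and the sysfs interface last,
 * since registering the bus is what lets userspace start issuing
 * add/remove requests.  rbd_exit() unwinds in the reverse order.
 */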
static int __init rbd_init(void)
{
        int rc;

        if (!libceph_compatible(NULL)) {
                rbd_warn(NULL, "libceph incompatibility (quitting)");
                return -EINVAL;
        }

        rc = rbd_slab_init();
        if (rc)
                return rc;

        if (single_major) {
                rbd_major = register_blkdev(0, RBD_DRV_NAME);
                if (rbd_major < 0) {
                        rc = rbd_major;
                        goto err_out_slab;
                }
        }

        rc = rbd_sysfs_init();
        if (rc)
                goto err_out_blkdev;

        if (single_major)
                pr_info("loaded (major %d)\n", rbd_major);
        else
                pr_info("loaded\n");

        return 0;

err_out_blkdev:
        if (single_major)
                unregister_blkdev(rbd_major, RBD_DRV_NAME);
err_out_slab:
        rbd_slab_exit();
        return rc;
}

static void __exit rbd_exit(void)
{
        rbd_sysfs_cleanup();
        if (single_major)
                unregister_blkdev(rbd_major, RBD_DRV_NAME);
        rbd_slab_exit();
}

module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
MODULE_LICENSE("GPL");