rbd: send notify ack asynchronously
[deliverable/linux.git] / drivers / block / rbd.c
CommitLineData
602adf40
YS
1/*
2 rbd.c -- Export ceph rados objects as a Linux block device
3
4
5 based on drivers/block/osdblk.c:
6
7 Copyright 2009 Red Hat, Inc.
8
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
12
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
17
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
dfc5606d 24 For usage instructions, please refer to:
602adf40 25
dfc5606d 26 Documentation/ABI/testing/sysfs-bus-rbd
602adf40
YS
27
28 */
29
30#include <linux/ceph/libceph.h>
31#include <linux/ceph/osd_client.h>
32#include <linux/ceph/mon_client.h>
33#include <linux/ceph/decode.h>
59c2be1e 34#include <linux/parser.h>
602adf40
YS
35
36#include <linux/kernel.h>
37#include <linux/device.h>
38#include <linux/module.h>
39#include <linux/fs.h>
40#include <linux/blkdev.h>
41
42#include "rbd_types.h"
43
aafb230e
AE
44#define RBD_DEBUG /* Activate rbd_assert() calls */
45
593a9e7b
AE
46/*
47 * The basic unit of block I/O is a sector. It is interpreted in a
48 * number of contexts in Linux (blk, bio, genhd), but the default is
49 * universally 512 bytes. These symbols are just slightly more
50 * meaningful than the bare numbers they represent.
51 */
52#define SECTOR_SHIFT 9
53#define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
54
2647ba38 55/* It might be useful to have these defined elsewhere */
df111be6 56
2647ba38
AE
57#define U8_MAX ((u8) (~0U))
58#define U16_MAX ((u16) (~0U))
59#define U32_MAX ((u32) (~0U))
60#define U64_MAX ((u64) (~0ULL))
df111be6 61
f0f8cef5
AE
62#define RBD_DRV_NAME "rbd"
63#define RBD_DRV_NAME_LONG "rbd (rados block device)"
602adf40
YS
64
65#define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
66
d4b125e9
AE
67#define RBD_SNAP_DEV_NAME_PREFIX "snap_"
68#define RBD_MAX_SNAP_NAME_LEN \
69 (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
70
35d489f9 71#define RBD_MAX_SNAP_COUNT 510 /* allows max snapc to fit in 4KB */
602adf40
YS
72
73#define RBD_SNAP_HEAD_NAME "-"
74
9e15b77d
AE
75/* This allows a single page to hold an image name sent by OSD */
76#define RBD_IMAGE_NAME_LEN_MAX (PAGE_SIZE - sizeof (__le32) - 1)
1e130199 77#define RBD_IMAGE_ID_LEN_MAX 64
9e15b77d 78
1e130199 79#define RBD_OBJ_PREFIX_LEN_MAX 64
589d30e0 80
d889140c
AE
81/* Feature bits */
82
83#define RBD_FEATURE_LAYERING 1
84
85/* Features supported by this (client software) implementation. */
86
87#define RBD_FEATURES_ALL (0)
88
81a89793
AE
89/*
90 * An RBD device name will be "rbd#", where the "rbd" comes from
91 * RBD_DRV_NAME above, and # is a unique integer identifier.
92 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
93 * enough to hold all possible device names.
94 */
602adf40 95#define DEV_NAME_LEN 32
81a89793 96#define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
602adf40
YS
97
/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These fields never change for a given rbd image */
	char *object_prefix;	/* prefix for the names of data objects */
	u64 features;		/* RBD_FEATURE_* bit mask */
	__u8 obj_order;		/* log2 of the object size */
	__u8 crypt_type;
	__u8 comp_type;

	/* The remaining fields need to be updated occasionally */
	u64 image_size;		/* image size, bytes */
	struct ceph_snap_context *snapc;
	char *snap_names;	/* snapshot names, see rbd_header_from_disk() */
	u64 *snap_sizes;	/* one entry per snapshot */

	u64 obj_version;
};
117
0d7dbfce
AE
118/*
119 * An rbd image specification.
120 *
121 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
c66c6e0c
AE
122 * identify an image. Each rbd_dev structure includes a pointer to
123 * an rbd_spec structure that encapsulates this identity.
124 *
125 * Each of the id's in an rbd_spec has an associated name. For a
126 * user-mapped image, the names are supplied and the id's associated
127 * with them are looked up. For a layered image, a parent image is
128 * defined by the tuple, and the names are looked up.
129 *
130 * An rbd_dev structure contains a parent_spec pointer which is
131 * non-null if the image it represents is a child in a layered
132 * image. This pointer will refer to the rbd_spec structure used
133 * by the parent rbd_dev for its own identity (i.e., the structure
134 * is shared between the parent and child).
135 *
136 * Since these structures are populated once, during the discovery
137 * phase of image construction, they are effectively immutable so
138 * we make no effort to synchronize access to them.
139 *
140 * Note that code herein does not assume the image name is known (it
141 * could be a null pointer).
0d7dbfce
AE
142 */
struct rbd_spec {
	u64 pool_id;
	char *pool_name;

	char *image_id;
	char *image_name;	/* may be NULL, see comment above */

	u64 snap_id;
	char *snap_name;

	struct kref kref;	/* may be shared by parent and child rbd_dev */
};
155
/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client *client;
	struct kref kref;	/* dropped via rbd_put_client() */
	struct list_head node;	/* entry on rbd_client_list */
};
164
bf0d5f50
AE
165struct rbd_img_request;
166typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
167
168#define BAD_WHICH U32_MAX /* Good which or bad which, which? */
169
170struct rbd_obj_request;
171typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
172
9969ebc5
AE
173enum obj_request_type {
174 OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
175};
bf0d5f50
AE
176
/*
 * A request for I/O against a single object; normally one of many
 * making up an image (rbd_img_request) I/O.
 */
struct rbd_obj_request {
	const char *object_name;
	u64 offset;		/* object start byte */
	u64 length;		/* bytes from offset */

	struct rbd_img_request *img_request;
	struct list_head links;	/* img_request->obj_requests */
	u32 which;		/* posn image request list */

	enum obj_request_type type;
	union {
		struct bio *bio_list;		/* OBJ_REQUEST_BIO */
		struct {			/* OBJ_REQUEST_PAGES */
			struct page **pages;
			u32 page_count;
		};
	};

	struct ceph_osd_request *osd_req;

	u64 xferred;		/* bytes transferred */
	u64 version;
	s32 result;
	atomic_t done;		/* nonzero once the OSD request completed */

	rbd_obj_callback_t callback;
	struct completion completion;	/* for synchronous waiters */

	struct kref kref;
};
207
/*
 * An image ("rectangular") I/O request, carried out as a list of
 * object requests.
 */
struct rbd_img_request {
	struct request *rq;	/* originating block-layer request */
	struct rbd_device *rbd_dev;
	u64 offset;	/* starting image byte offset */
	u64 length;	/* byte count from offset */
	bool write_request;	/* false for read */
	union {
		struct ceph_snap_context *snapc;	/* for writes */
		u64 snap_id;		/* for reads */
	};
	spinlock_t completion_lock;/* protects next_completion */
	u32 next_completion;
	rbd_img_callback_t callback;

	u32 obj_request_count;
	struct list_head obj_requests;	/* rbd_obj_request structs */

	struct kref kref;
};
227
228#define for_each_obj_request(ireq, oreq) \
229 list_for_each_entry(oreq, &ireq->obj_requests, links)
230#define for_each_obj_request_from(ireq, oreq) \
231 list_for_each_entry_from(oreq, &ireq->obj_requests, links)
232#define for_each_obj_request_safe(ireq, oreq, n) \
233 list_for_each_entry_safe_reverse(oreq, n, &ireq->obj_requests, links)
234
dfc5606d
YS
/* In-memory record of one image snapshot, also exposed via sysfs */
struct rbd_snap {
	struct device dev;	/* sysfs device */
	const char *name;
	u64 size;		/* image size at snapshot time, bytes */
	struct list_head node;	/* entry on rbd_dev->snaps */
	u64 id;
	u64 features;		/* RBD_FEATURE_* bit mask */
};
243
/* Attributes of whichever snapshot (or head) is currently mapped */
struct rbd_mapping {
	u64 size;		/* mapped size, bytes */
	u64 features;		/* RBD_FEATURE_* bit mask */
	bool read_only;
};
249
602adf40
YS
/*
 * a single device
 */
struct rbd_device {
	int dev_id;		/* blkdev unique id */

	int major;		/* blkdev assigned major */
	struct gendisk *disk;	/* blkdev's gendisk and rq */

	u32 image_format;	/* Either 1 or 2 */
	struct rbd_client *rbd_client;

	char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t lock;	/* queue lock */

	struct rbd_image_header header;
	atomic_t exists;	/* nonzero once mapping is established */
	struct rbd_spec *spec;

	char *header_name;

	struct ceph_file_layout layout;

	struct ceph_osd_event *watch_event;
	struct ceph_osd_request *watch_request;

	struct rbd_spec *parent_spec;	/* non-NULL for layered images */
	u64 parent_overlap;

	/* protects updating the header */
	struct rw_semaphore header_rwsem;

	struct rbd_mapping mapping;

	struct list_head node;	/* entry on rbd_dev_list */

	/* list of snapshots */
	struct list_head snaps;

	/* sysfs related */
	struct device dev;
	unsigned long open_count;	/* updated under ctl_mutex */
};
294
602adf40 295static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
e124a82f 296
602adf40 297static LIST_HEAD(rbd_dev_list); /* devices */
e124a82f
AE
298static DEFINE_SPINLOCK(rbd_dev_list_lock);
299
432b8587
AE
300static LIST_HEAD(rbd_client_list); /* clients */
301static DEFINE_SPINLOCK(rbd_client_list_lock);
602adf40 302
304f6808
AE
303static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
304static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
305
dfc5606d 306static void rbd_dev_release(struct device *dev);
41f38c2b 307static void rbd_remove_snap_dev(struct rbd_snap *snap);
dfc5606d 308
f0f8cef5
AE
309static ssize_t rbd_add(struct bus_type *bus, const char *buf,
310 size_t count);
311static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
312 size_t count);
313
314static struct bus_attribute rbd_bus_attrs[] = {
315 __ATTR(add, S_IWUSR, NULL, rbd_add),
316 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
317 __ATTR_NULL
318};
319
320static struct bus_type rbd_bus_type = {
321 .name = "rbd",
322 .bus_attrs = rbd_bus_attrs,
323};
324
325static void rbd_root_dev_release(struct device *dev)
326{
327}
328
329static struct device rbd_root_dev = {
330 .init_name = "rbd",
331 .release = rbd_root_dev_release,
332};
333
06ecc6cb
AE
/*
 * Emit a warning, prefixed by the most specific identification
 * available for @rbd_dev: its disk name, its image name, its image
 * id, or (as a last resort) the structure pointer itself.  A NULL
 * @rbd_dev is allowed.
 */
static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else	/* punt */
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}
360
aafb230e
AE
361#ifdef RBD_DEBUG
362#define rbd_assert(expr) \
363 if (unlikely(!(expr))) { \
364 printk(KERN_ERR "\nAssertion failure in %s() " \
365 "at line %d:\n\n" \
366 "\trbd_assert(%s);\n\n", \
367 __func__, __LINE__, #expr); \
368 BUG(); \
369 }
370#else /* !RBD_DEBUG */
371# define rbd_assert(expr) ((void) 0)
372#endif /* !RBD_DEBUG */
dfc5606d 373
117973fb
AE
374static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
375static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
59c2be1e 376
602adf40
YS
/*
 * Block device open callback.  Refuses writable opens of a read-only
 * mapping, takes a device reference to hold off removal while the
 * device is open, and bumps open_count (under ctl_mutex).
 */
static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	(void) get_device(&rbd_dev->dev);
	set_device_ro(bdev, rbd_dev->mapping.read_only);
	rbd_dev->open_count++;
	mutex_unlock(&ctl_mutex);

	return 0;
}
392
dfc5606d
YS
/*
 * Block device release callback; undoes rbd_open(): drops open_count
 * and the device reference taken there.
 */
static int rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	rbd_assert(rbd_dev->open_count > 0);
	rbd_dev->open_count--;
	put_device(&rbd_dev->dev);
	mutex_unlock(&ctl_mutex);

	return 0;
}
405
602adf40
YS
/* Block device operations for rbd disks */
static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};
411
412/*
413 * Initialize an rbd client instance.
43ae4701 414 * We own *ceph_opts.
602adf40 415 */
f8c38929 416static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
602adf40
YS
417{
418 struct rbd_client *rbdc;
419 int ret = -ENOMEM;
420
421 dout("rbd_client_create\n");
422 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
423 if (!rbdc)
424 goto out_opt;
425
426 kref_init(&rbdc->kref);
427 INIT_LIST_HEAD(&rbdc->node);
428
bc534d86
AE
429 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
430
43ae4701 431 rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
602adf40 432 if (IS_ERR(rbdc->client))
bc534d86 433 goto out_mutex;
43ae4701 434 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
602adf40
YS
435
436 ret = ceph_open_session(rbdc->client);
437 if (ret < 0)
438 goto out_err;
439
432b8587 440 spin_lock(&rbd_client_list_lock);
602adf40 441 list_add_tail(&rbdc->node, &rbd_client_list);
432b8587 442 spin_unlock(&rbd_client_list_lock);
602adf40 443
bc534d86
AE
444 mutex_unlock(&ctl_mutex);
445
602adf40
YS
446 dout("rbd_client_create created %p\n", rbdc);
447 return rbdc;
448
449out_err:
450 ceph_destroy_client(rbdc->client);
bc534d86
AE
451out_mutex:
452 mutex_unlock(&ctl_mutex);
602adf40
YS
453 kfree(rbdc);
454out_opt:
43ae4701
AE
455 if (ceph_opts)
456 ceph_destroy_options(ceph_opts);
28f259b7 457 return ERR_PTR(ret);
602adf40
YS
458}
459
/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.  Returns NULL when no match
 * exists or when sharing is disabled (CEPH_OPT_NOSHARE).
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		/* ceph_compare_options() returns 0 on a match */
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			kref_get(&client_node->kref);
			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}
484
59c2be1e
YS
485/*
486 * mount options
487 */
488enum {
59c2be1e
YS
489 Opt_last_int,
490 /* int args above */
491 Opt_last_string,
492 /* string args above */
cc0538b6
AE
493 Opt_read_only,
494 Opt_read_write,
495 /* Boolean args above */
496 Opt_last_bool,
59c2be1e
YS
497};
498
43ae4701 499static match_table_t rbd_opts_tokens = {
59c2be1e
YS
500 /* int args above */
501 /* string args above */
be466c1c 502 {Opt_read_only, "read_only"},
cc0538b6
AE
503 {Opt_read_only, "ro"}, /* Alternate spelling */
504 {Opt_read_write, "read_write"},
505 {Opt_read_write, "rw"}, /* Alternate spelling */
506 /* Boolean args above */
59c2be1e
YS
507 {-1, NULL}
508};
509
98571b5a
AE
510struct rbd_options {
511 bool read_only;
512};
513
514#define RBD_READ_ONLY_DEFAULT false
515
59c2be1e
YS
/*
 * Parse one mount option token into @private (a struct rbd_options).
 * Returns 0 on success, -EINVAL for an unrecognized token, or the
 * match_int() error for a malformed integer argument.
 */
static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	/* Extract (and log) the argument by its positional class */
	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else if (token > Opt_last_string && token < Opt_last_bool) {
		dout("got Boolean token %d\n", token);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	default:
		/* every token in rbd_opts_tokens must be handled above */
		rbd_assert(false);
		break;
	}
	return 0;
}
556
602adf40
YS
/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.  In either case ownership of @ceph_opts is
 * consumed: a matched client means the options are no longer needed,
 * otherwise rbd_client_create() takes them over.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;

	rbdc = rbd_client_find(ceph_opts);
	if (rbdc)	/* using an existing client */
		ceph_destroy_options(ceph_opts);
	else
		rbdc = rbd_client_create(ceph_opts);

	return rbdc;
}
574/*
575 * Destroy ceph client
d23a4b3f 576 *
432b8587 577 * Caller must hold rbd_client_list_lock.
602adf40
YS
578 */
579static void rbd_client_release(struct kref *kref)
580{
581 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
582
583 dout("rbd_release_client %p\n", rbdc);
cd9d9f5d 584 spin_lock(&rbd_client_list_lock);
602adf40 585 list_del(&rbdc->node);
cd9d9f5d 586 spin_unlock(&rbd_client_list_lock);
602adf40
YS
587
588 ceph_destroy_client(rbdc->client);
589 kfree(rbdc);
590}
591
/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.  A NULL @rbdc is allowed (no-op).
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}
601
a30b71b9
AE
/* An rbd image uses on-disk format 1 (old-style) or 2; nothing else. */
static bool rbd_image_format_valid(u32 image_format)
{
	switch (image_format) {
	case 1:
	case 2:
		return true;
	default:
		return false;
	}
}
606
8e94af8e
AE
/*
 * Sanity-check an on-disk (format 1) image header: magic text,
 * usable object order, and snapshot metadata sizes that fit in a
 * size_t.  Returns true when the header is safe to translate.
 */
static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire the snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}
645
602adf40
YS
646/*
647 * Create a new header structure, translate header format from the on-disk
648 * header.
649 */
650static int rbd_header_from_disk(struct rbd_image_header *header,
4156d998 651 struct rbd_image_header_ondisk *ondisk)
602adf40 652{
ccece235 653 u32 snap_count;
58c17b0e 654 size_t len;
d2bb24e5 655 size_t size;
621901d6 656 u32 i;
602adf40 657
6a52325f
AE
658 memset(header, 0, sizeof (*header));
659
103a150f
AE
660 snap_count = le32_to_cpu(ondisk->snap_count);
661
58c17b0e
AE
662 len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
663 header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
6a52325f 664 if (!header->object_prefix)
602adf40 665 return -ENOMEM;
58c17b0e
AE
666 memcpy(header->object_prefix, ondisk->object_prefix, len);
667 header->object_prefix[len] = '\0';
00f1f36f 668
602adf40 669 if (snap_count) {
f785cc1d
AE
670 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
671
621901d6
AE
672 /* Save a copy of the snapshot names */
673
f785cc1d
AE
674 if (snap_names_len > (u64) SIZE_MAX)
675 return -EIO;
676 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
602adf40 677 if (!header->snap_names)
6a52325f 678 goto out_err;
f785cc1d
AE
679 /*
680 * Note that rbd_dev_v1_header_read() guarantees
681 * the ondisk buffer we're working with has
682 * snap_names_len bytes beyond the end of the
683 * snapshot id array, this memcpy() is safe.
684 */
685 memcpy(header->snap_names, &ondisk->snaps[snap_count],
686 snap_names_len);
6a52325f 687
621901d6
AE
688 /* Record each snapshot's size */
689
d2bb24e5
AE
690 size = snap_count * sizeof (*header->snap_sizes);
691 header->snap_sizes = kmalloc(size, GFP_KERNEL);
602adf40 692 if (!header->snap_sizes)
6a52325f 693 goto out_err;
621901d6
AE
694 for (i = 0; i < snap_count; i++)
695 header->snap_sizes[i] =
696 le64_to_cpu(ondisk->snaps[i].image_size);
602adf40 697 } else {
ccece235 698 WARN_ON(ondisk->snap_names_len);
602adf40
YS
699 header->snap_names = NULL;
700 header->snap_sizes = NULL;
701 }
849b4260 702
34b13184 703 header->features = 0; /* No features support in v1 images */
602adf40
YS
704 header->obj_order = ondisk->options.order;
705 header->crypt_type = ondisk->options.crypt_type;
706 header->comp_type = ondisk->options.comp_type;
6a52325f 707
621901d6
AE
708 /* Allocate and fill in the snapshot context */
709
f84344f3 710 header->image_size = le64_to_cpu(ondisk->image_size);
6a52325f
AE
711 size = sizeof (struct ceph_snap_context);
712 size += snap_count * sizeof (header->snapc->snaps[0]);
713 header->snapc = kzalloc(size, GFP_KERNEL);
714 if (!header->snapc)
715 goto out_err;
602adf40
YS
716
717 atomic_set(&header->snapc->nref, 1);
505cbb9b 718 header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
602adf40 719 header->snapc->num_snaps = snap_count;
621901d6
AE
720 for (i = 0; i < snap_count; i++)
721 header->snapc->snaps[i] =
722 le64_to_cpu(ondisk->snaps[i].id);
602adf40
YS
723
724 return 0;
725
6a52325f 726out_err:
849b4260 727 kfree(header->snap_sizes);
ccece235 728 header->snap_sizes = NULL;
602adf40 729 kfree(header->snap_names);
ccece235 730 header->snap_names = NULL;
6a52325f
AE
731 kfree(header->object_prefix);
732 header->object_prefix = NULL;
ccece235 733
00f1f36f 734 return -ENOMEM;
602adf40
YS
735}
736
9e15b77d
AE
/*
 * Map a snapshot id to its name.  CEPH_NOSNAP denotes the "head"
 * (unsnapshotted) image; otherwise the device's snapshot list is
 * searched.  Returns NULL if the id is unknown.
 */
static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct rbd_snap *snap;

	if (snap_id == CEPH_NOSNAP)
		return RBD_SNAP_HEAD_NAME;

	list_for_each_entry(snap, &rbd_dev->snaps, node)
		if (snap_id == snap->id)
			return snap->name;

	return NULL;
}
750
/*
 * Look up a snapshot by name and, if found, record its id, size and
 * features in the device's mapping/spec.  Returns 0 on success or
 * -ENOENT if no snapshot with that name exists.
 */
static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
{

	struct rbd_snap *snap;

	list_for_each_entry(snap, &rbd_dev->snaps, node) {
		if (!strcmp(snap_name, snap->name)) {
			rbd_dev->spec->snap_id = snap->id;
			rbd_dev->mapping.size = snap->size;
			rbd_dev->mapping.features = snap->features;

			return 0;
		}
	}

	return -ENOENT;
}
768
/*
 * Set up the device mapping for the requested snapshot name.  Mapping
 * the special head name uses the base image's size/features; mapping
 * a real snapshot forces the device read-only.  Marks the device as
 * existing on success.
 */
static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
{
	int ret;

	if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
		    sizeof (RBD_SNAP_HEAD_NAME))) {
		rbd_dev->spec->snap_id = CEPH_NOSNAP;
		rbd_dev->mapping.size = rbd_dev->header.image_size;
		rbd_dev->mapping.features = rbd_dev->header.features;
		ret = 0;
	} else {
		ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
		if (ret < 0)
			goto done;
		rbd_dev->mapping.read_only = true;
	}
	atomic_set(&rbd_dev->exists, 1);
done:
	return ret;
}
789
/*
 * Release everything rbd_header_from_disk() allocated.  Pointers are
 * reset to NULL so a double free is harmless.
 */
static void rbd_header_free(struct rbd_image_header *header)
{
	kfree(header->object_prefix);
	header->object_prefix = NULL;
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	ceph_put_snap_context(header->snapc);
	header->snapc = NULL;
}
801
/*
 * Build the name of the object backing the image segment that
 * contains image byte @offset ("<prefix>.<segment number>").
 * Returns a kmalloc()ed string the caller must free, or NULL on
 * allocation or formatting failure.
 */
static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
	char *name;
	u64 segment;
	int ret;

	name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
	if (!name)
		return NULL;
	segment = offset >> rbd_dev->header.obj_order;
	ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
			rbd_dev->header.object_prefix, segment);
	if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
		pr_err("error formatting segment name for #%llu (%d)\n",
			segment, ret);
		kfree(name);
		name = NULL;
	}

	return name;
}
602adf40 823
65ccfe21
AE
/* Byte offset of image byte @offset within its containing segment */
static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	return offset & (segment_size - 1);
}
830
/*
 * Number of bytes of the range [@offset, @offset + @length) that lie
 * within @offset's segment (i.e. @length clipped to the segment
 * boundary).
 */
static u64 rbd_segment_length(struct rbd_device *rbd_dev,
				u64 offset, u64 length)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	offset &= segment_size - 1;

	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}
844
029bcbd8
JD
845/*
846 * returns the size of an object in the image
847 */
848static u64 rbd_obj_bytes(struct rbd_image_header *header)
849{
850 return 1 << header->obj_order;
851}
852
602adf40
YS
853/*
854 * bio helpers
855 */
856
/* Drop a reference on every bio in the chain starting at @chain */
static void bio_chain_put(struct bio *chain)
{
	struct bio *tmp;

	while (chain) {
		tmp = chain;
		chain = chain->bi_next;
		bio_put(tmp);
	}
}
867
/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;	/* byte position within the whole chain */

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				/* zero only the part of this segment
				 * at or beyond start_ofs */
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}
894
/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 *
 * Returns NULL on bad arguments or allocation failure.  The clone
 * shares pages with the source bio; only the bio_vec array is copied
 * and its first/last entries trimmed to the requested range.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
					unsigned int offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio_vec *bv;
	unsigned int resid;
	unsigned short idx;
	unsigned int voff;		/* byte offset into first segment */
	unsigned short end_idx;
	unsigned short vcnt;
	struct bio *bio;

	/* Handle the easy case for the caller */

	if (!offset && len == bio_src->bi_size)
		return bio_clone(bio_src, gfpmask);

	if (WARN_ON_ONCE(!len))
		return NULL;
	if (WARN_ON_ONCE(len > bio_src->bi_size))
		return NULL;
	if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
		return NULL;

	/* Find first affected segment... */

	resid = offset;
	__bio_for_each_segment(bv, bio_src, idx, 0) {
		if (resid < bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	voff = resid;

	/* ...and the last affected segment */

	resid += len;
	__bio_for_each_segment(bv, bio_src, end_idx, idx) {
		if (resid <= bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	vcnt = end_idx - idx + 1;

	/* Build the clone */

	bio = bio_alloc(gfpmask, (unsigned int) vcnt);
	if (!bio)
		return NULL;	/* ENOMEM */

	bio->bi_bdev = bio_src->bi_bdev;
	bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
	bio->bi_rw = bio_src->bi_rw;
	bio->bi_flags |= 1 << BIO_CLONED;

	/*
	 * Copy over our part of the bio_vec, then update the first
	 * and last (or only) entries.
	 */
	memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
		vcnt * sizeof (struct bio_vec));
	bio->bi_io_vec[0].bv_offset += voff;
	if (vcnt > 1) {
		bio->bi_io_vec[0].bv_len -= voff;
		bio->bi_io_vec[vcnt - 1].bv_len = resid;
	} else {
		bio->bi_io_vec[0].bv_len = len;
	}

	bio->bi_vcnt = vcnt;
	bio->bi_size = len;
	bio->bi_idx = 0;

	return bio;
}
975
/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
					unsigned int *offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bi = *bio_src;
	unsigned int off = *offset;
	struct bio *chain = NULL;
	struct bio **end;		/* tail pointer of the clone chain */

	/* Build up a chain of clone bios up to the limit */

	if (!bi || off >= bi->bi_size || !len)
		return NULL;		/* Nothing to clone */

	end = &chain;
	while (len) {
		unsigned int bi_size;
		struct bio *bio;

		if (!bi) {
			rbd_warn(NULL, "bio_chain exhausted with %u left", len);
			goto out_err;	/* EINVAL; ran out of bio's */
		}
		/* Clone as much of this bio as remains or is needed */
		bi_size = min_t(unsigned int, bi->bi_size - off, len);
		bio = bio_clone_range(bi, off, bi_size, gfpmask);
		if (!bio)
			goto out_err;	/* ENOMEM */

		*end = bio;
		end = &bio->bi_next;

		off += bi_size;
		if (off == bi->bi_size) {
			bi = bi->bi_next;
			off = 0;
		}
		len -= bi_size;
	}
	*bio_src = bi;
	*offset = off;

	return chain;
out_err:
	bio_chain_put(chain);

	return NULL;
}
1038
bf0d5f50
AE
/* Take a reference on an object request */
static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
	kref_get(&obj_request->kref);
}
1043
static void rbd_obj_request_destroy(struct kref *kref);
/* Drop a reference on an object request; destroys it on last put */
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request != NULL);
	kref_put(&obj_request->kref, rbd_obj_request_destroy);
}
1050
/* Take a reference on an image request */
static void rbd_img_request_get(struct rbd_img_request *img_request)
{
	kref_get(&img_request->kref);
}
1055
static void rbd_img_request_destroy(struct kref *kref);
/* Drop a reference on an image request; destroys it on last put */
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
	rbd_assert(img_request != NULL);
	kref_put(&img_request->kref, rbd_img_request_destroy);
}
1062
1063static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1064 struct rbd_obj_request *obj_request)
1065{
1066 rbd_obj_request_get(obj_request);
1067 obj_request->img_request = img_request;
1068 list_add_tail(&obj_request->links, &img_request->obj_requests);
1069 obj_request->which = img_request->obj_request_count++;
1070 rbd_assert(obj_request->which != BAD_WHICH);
1071}
1072
1073static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1074 struct rbd_obj_request *obj_request)
1075{
1076 rbd_assert(obj_request->which != BAD_WHICH);
1077 obj_request->which = BAD_WHICH;
1078 list_del(&obj_request->links);
1079 rbd_assert(obj_request->img_request == img_request);
1080 obj_request->callback = NULL;
1081 obj_request->img_request = NULL;
1082 rbd_obj_request_put(obj_request);
1083}
1084
1085static bool obj_request_type_valid(enum obj_request_type type)
1086{
1087 switch (type) {
9969ebc5 1088 case OBJ_REQUEST_NODATA:
bf0d5f50 1089 case OBJ_REQUEST_BIO:
788e2df3 1090 case OBJ_REQUEST_PAGES:
bf0d5f50
AE
1091 return true;
1092 default:
1093 return false;
1094 }
1095}
1096
8d23bf29
AE
1097struct ceph_osd_req_op *rbd_osd_req_op_create(u16 opcode, ...)
1098{
1099 struct ceph_osd_req_op *op;
1100 va_list args;
2647ba38 1101 size_t size;
8d23bf29
AE
1102
1103 op = kzalloc(sizeof (*op), GFP_NOIO);
1104 if (!op)
1105 return NULL;
1106 op->op = opcode;
1107 va_start(args, opcode);
1108 switch (opcode) {
1109 case CEPH_OSD_OP_READ:
1110 case CEPH_OSD_OP_WRITE:
1111 /* rbd_osd_req_op_create(READ, offset, length) */
1112 /* rbd_osd_req_op_create(WRITE, offset, length) */
1113 op->extent.offset = va_arg(args, u64);
1114 op->extent.length = va_arg(args, u64);
1115 if (opcode == CEPH_OSD_OP_WRITE)
1116 op->payload_len = op->extent.length;
1117 break;
2647ba38
AE
1118 case CEPH_OSD_OP_CALL:
1119 /* rbd_osd_req_op_create(CALL, class, method, data, datalen) */
1120 op->cls.class_name = va_arg(args, char *);
1121 size = strlen(op->cls.class_name);
1122 rbd_assert(size <= (size_t) U8_MAX);
1123 op->cls.class_len = size;
1124 op->payload_len = size;
1125
1126 op->cls.method_name = va_arg(args, char *);
1127 size = strlen(op->cls.method_name);
1128 rbd_assert(size <= (size_t) U8_MAX);
1129 op->cls.method_len = size;
1130 op->payload_len += size;
1131
1132 op->cls.argc = 0;
1133 op->cls.indata = va_arg(args, void *);
1134 size = va_arg(args, size_t);
1135 rbd_assert(size <= (size_t) U32_MAX);
1136 op->cls.indata_len = (u32) size;
1137 op->payload_len += size;
1138 break;
5efea49a
AE
1139 case CEPH_OSD_OP_NOTIFY_ACK:
1140 case CEPH_OSD_OP_WATCH:
1141 /* rbd_osd_req_op_create(NOTIFY_ACK, cookie, version) */
1142 /* rbd_osd_req_op_create(WATCH, cookie, version, flag) */
1143 op->watch.cookie = va_arg(args, u64);
1144 op->watch.ver = va_arg(args, u64);
1145 op->watch.ver = cpu_to_le64(op->watch.ver);
1146 if (opcode == CEPH_OSD_OP_WATCH && va_arg(args, int))
1147 op->watch.flag = (u8) 1;
1148 break;
8d23bf29
AE
1149 default:
1150 rbd_warn(NULL, "unsupported opcode %hu\n", opcode);
1151 kfree(op);
1152 op = NULL;
1153 break;
1154 }
1155 va_end(args);
1156
1157 return op;
1158}
1159
/* Free an osd request op created by rbd_osd_req_op_create(). */
static void rbd_osd_req_op_destroy(struct ceph_osd_req_op *op)
{
	kfree(op);
}

602adf40
YS
1165/*
1166 * Send ceph osd request
1167 */
1168static int rbd_do_request(struct request *rq,
0ce1a794 1169 struct rbd_device *rbd_dev,
602adf40
YS
1170 struct ceph_snap_context *snapc,
1171 u64 snapid,
aded07ea 1172 const char *object_name, u64 ofs, u64 len,
602adf40
YS
1173 struct bio *bio,
1174 struct page **pages,
1175 int num_pages,
1176 int flags,
30573d68 1177 struct ceph_osd_req_op *op,
5f29ddd4
AE
1178 void (*rbd_cb)(struct ceph_osd_request *,
1179 struct ceph_msg *),
59c2be1e 1180 u64 *ver)
602adf40 1181{
2e53c6c3 1182 struct ceph_osd_client *osdc;
5f29ddd4 1183 struct ceph_osd_request *osd_req;
602adf40 1184 struct timespec mtime = CURRENT_TIME;
2e53c6c3 1185 int ret;
602adf40 1186
7d250b94 1187 dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n",
f7760dad 1188 object_name, (unsigned long long) ofs,
7d250b94 1189 (unsigned long long) len);
602adf40 1190
0ce1a794 1191 osdc = &rbd_dev->rbd_client->client->osdc;
30573d68 1192 osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_NOIO);
2e53c6c3
AE
1193 if (!osd_req)
1194 return -ENOMEM;
602adf40 1195
d178a9e7 1196 osd_req->r_flags = flags;
54a54007
AE
1197 osd_req->r_pages = pages;
1198 if (bio) {
1199 osd_req->r_bio = bio;
1200 bio_get(osd_req->r_bio);
1201 }
602adf40 1202
2e53c6c3 1203 osd_req->r_callback = rbd_cb;
7d250b94 1204 osd_req->r_priv = NULL;
602adf40 1205
5f29ddd4
AE
1206 strncpy(osd_req->r_oid, object_name, sizeof(osd_req->r_oid));
1207 osd_req->r_oid_len = strlen(osd_req->r_oid);
602adf40 1208
0903e875 1209 osd_req->r_file_layout = rbd_dev->layout; /* struct */
e01e7927
AE
1210 osd_req->r_num_pages = calc_pages_for(ofs, len);
1211 osd_req->r_page_alignment = ofs & ~PAGE_MASK;
602adf40 1212
30573d68 1213 ceph_osdc_build_request(osd_req, ofs, len, 1, op,
ae7ca4a3 1214 snapc, snapid, &mtime);
602adf40 1215
8b84de79 1216 if (op->op == CEPH_OSD_OP_WATCH && op->watch.flag) {
5f29ddd4 1217 ceph_osdc_set_request_linger(osdc, osd_req);
8b84de79 1218 rbd_dev->watch_request = osd_req;
59c2be1e
YS
1219 }
1220
5f29ddd4 1221 ret = ceph_osdc_start_request(osdc, osd_req, false);
602adf40
YS
1222 if (ret < 0)
1223 goto done_err;
1224
1225 if (!rbd_cb) {
5f29ddd4
AE
1226 u64 version;
1227
1228 ret = ceph_osdc_wait_request(osdc, osd_req);
1229 version = le64_to_cpu(osd_req->r_reassert_version.version);
59c2be1e 1230 if (ver)
5f29ddd4
AE
1231 *ver = version;
1232 dout("reassert_ver=%llu\n", (unsigned long long) version);
1233 ceph_osdc_put_request(osd_req);
602adf40
YS
1234 }
1235 return ret;
1236
1237done_err:
2e53c6c3
AE
1238 if (bio)
1239 bio_chain_put(osd_req->r_bio);
2e53c6c3
AE
1240 ceph_osdc_put_request(osd_req);
1241
602adf40
YS
1242 return ret;
1243}
1244
602adf40
YS
1245/*
1246 * Do a synchronous ceph osd operation
1247 */
0ce1a794 1248static int rbd_req_sync_op(struct rbd_device *rbd_dev,
602adf40 1249 int flags,
30573d68 1250 struct ceph_osd_req_op *op,
aded07ea 1251 const char *object_name,
f8d4de6e
AE
1252 u64 ofs, u64 inbound_size,
1253 char *inbound,
59c2be1e 1254 u64 *ver)
602adf40
YS
1255{
1256 int ret;
1257 struct page **pages;
1258 int num_pages;
913d2fdc 1259
30573d68 1260 rbd_assert(op != NULL);
602adf40 1261
f8d4de6e 1262 num_pages = calc_pages_for(ofs, inbound_size);
602adf40 1263 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
b8d0638a
DC
1264 if (IS_ERR(pages))
1265 return PTR_ERR(pages);
602adf40 1266
25704ac9 1267 ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
f8d4de6e 1268 object_name, ofs, inbound_size, NULL,
602adf40
YS
1269 pages, num_pages,
1270 flags,
30573d68 1271 op,
59c2be1e 1272 NULL,
8b84de79 1273 ver);
602adf40 1274 if (ret < 0)
913d2fdc 1275 goto done;
602adf40 1276
f8d4de6e
AE
1277 if ((flags & CEPH_OSD_FLAG_READ) && inbound)
1278 ret = ceph_copy_from_page_vector(pages, inbound, ofs, ret);
602adf40 1279
602adf40
YS
1280done:
1281 ceph_release_page_vector(pages, num_pages);
1282 return ret;
1283}
1284
bf0d5f50
AE
1285static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
1286 struct rbd_obj_request *obj_request)
1287{
1288 return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
1289}
1290
1291static void rbd_img_request_complete(struct rbd_img_request *img_request)
1292{
1293 if (img_request->callback)
1294 img_request->callback(img_request);
1295 else
1296 rbd_img_request_put(img_request);
1297}
1298
788e2df3
AE
1299/* Caller is responsible for rbd_obj_request_destroy(obj_request) */
1300
1301static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
1302{
1303 return wait_for_completion_interruptible(&obj_request->completion);
1304}
1305
9969ebc5
AE
1306static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request,
1307 struct ceph_osd_op *op)
1308{
1309 atomic_set(&obj_request->done, 1);
1310}
1311
bf0d5f50
AE
1312static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1313{
1314 if (obj_request->callback)
1315 obj_request->callback(obj_request);
788e2df3
AE
1316 else
1317 complete_all(&obj_request->completion);
bf0d5f50
AE
1318}
1319
602adf40 1320/*
3cb4a687 1321 * Synchronous osd object method call
602adf40 1322 */
0ce1a794 1323static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
aded07ea
AE
1324 const char *object_name,
1325 const char *class_name,
1326 const char *method_name,
3cb4a687
AE
1327 const char *outbound,
1328 size_t outbound_size,
f8d4de6e
AE
1329 char *inbound,
1330 size_t inbound_size,
59c2be1e 1331 u64 *ver)
602adf40 1332{
139b4318 1333 struct ceph_osd_req_op *op;
57cfc106
AE
1334 int ret;
1335
3cb4a687
AE
1336 /*
1337 * Any input parameters required by the method we're calling
1338 * will be sent along with the class and method names as
1339 * part of the message payload. That data and its size are
1340 * supplied via the indata and indata_len fields (named from
1341 * the perspective of the server side) in the OSD request
1342 * operation.
1343 */
2647ba38
AE
1344 op = rbd_osd_req_op_create(CEPH_OSD_OP_CALL, class_name,
1345 method_name, outbound, outbound_size);
139b4318 1346 if (!op)
57cfc106 1347 return -ENOMEM;
602adf40 1348
30573d68 1349 ret = rbd_req_sync_op(rbd_dev, CEPH_OSD_FLAG_READ, op,
f8d4de6e 1350 object_name, 0, inbound_size, inbound,
8b84de79 1351 ver);
602adf40 1352
2647ba38 1353 rbd_osd_req_op_destroy(op);
602adf40
YS
1354
1355 dout("cls_exec returned %d\n", ret);
1356 return ret;
1357}
1358
bf0d5f50
AE
1359static void rbd_osd_read_callback(struct rbd_obj_request *obj_request,
1360 struct ceph_osd_op *op)
1361{
1362 u64 xferred;
1363
1364 /*
1365 * We support a 64-bit length, but ultimately it has to be
1366 * passed to blk_end_request(), which takes an unsigned int.
1367 */
1368 xferred = le64_to_cpu(op->extent.length);
1369 rbd_assert(xferred < (u64) UINT_MAX);
1370 if (obj_request->result == (s32) -ENOENT) {
1371 zero_bio_chain(obj_request->bio_list, 0);
1372 obj_request->result = 0;
1373 } else if (xferred < obj_request->length && !obj_request->result) {
1374 zero_bio_chain(obj_request->bio_list, xferred);
1375 xferred = obj_request->length;
1376 }
1377 obj_request->xferred = xferred;
1378 atomic_set(&obj_request->done, 1);
1379}
1380
1381static void rbd_osd_write_callback(struct rbd_obj_request *obj_request,
1382 struct ceph_osd_op *op)
1383{
1384 obj_request->xferred = le64_to_cpu(op->extent.length);
1385 atomic_set(&obj_request->done, 1);
1386}
1387
1388static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1389 struct ceph_msg *msg)
1390{
1391 struct rbd_obj_request *obj_request = osd_req->r_priv;
1392 struct ceph_osd_reply_head *reply_head;
1393 struct ceph_osd_op *op;
1394 u32 num_ops;
1395 u16 opcode;
1396
1397 rbd_assert(osd_req == obj_request->osd_req);
1398 rbd_assert(!!obj_request->img_request ^
1399 (obj_request->which == BAD_WHICH));
1400
1401 obj_request->xferred = le32_to_cpu(msg->hdr.data_len);
1402 reply_head = msg->front.iov_base;
1403 obj_request->result = (s32) le32_to_cpu(reply_head->result);
1404 obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);
1405
1406 num_ops = le32_to_cpu(reply_head->num_ops);
1407 WARN_ON(num_ops != 1); /* For now */
1408
1409 op = &reply_head->ops[0];
1410 opcode = le16_to_cpu(op->op);
1411 switch (opcode) {
1412 case CEPH_OSD_OP_READ:
1413 rbd_osd_read_callback(obj_request, op);
1414 break;
1415 case CEPH_OSD_OP_WRITE:
1416 rbd_osd_write_callback(obj_request, op);
1417 break;
b8d70035 1418 case CEPH_OSD_OP_NOTIFY_ACK:
9969ebc5
AE
1419 case CEPH_OSD_OP_WATCH:
1420 rbd_osd_trivial_callback(obj_request, op);
1421 break;
bf0d5f50
AE
1422 default:
1423 rbd_warn(NULL, "%s: unsupported op %hu\n",
1424 obj_request->object_name, (unsigned short) opcode);
1425 break;
1426 }
1427
1428 if (atomic_read(&obj_request->done))
1429 rbd_obj_request_complete(obj_request);
1430}
1431
1432static struct ceph_osd_request *rbd_osd_req_create(
1433 struct rbd_device *rbd_dev,
1434 bool write_request,
1435 struct rbd_obj_request *obj_request,
1436 struct ceph_osd_req_op *op)
1437{
1438 struct rbd_img_request *img_request = obj_request->img_request;
1439 struct ceph_snap_context *snapc = NULL;
1440 struct ceph_osd_client *osdc;
1441 struct ceph_osd_request *osd_req;
1442 struct timespec now;
1443 struct timespec *mtime;
1444 u64 snap_id = CEPH_NOSNAP;
1445 u64 offset = obj_request->offset;
1446 u64 length = obj_request->length;
1447
1448 if (img_request) {
1449 rbd_assert(img_request->write_request == write_request);
1450 if (img_request->write_request)
1451 snapc = img_request->snapc;
1452 else
1453 snap_id = img_request->snap_id;
1454 }
1455
1456 /* Allocate and initialize the request, for the single op */
1457
1458 osdc = &rbd_dev->rbd_client->client->osdc;
1459 osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1460 if (!osd_req)
1461 return NULL; /* ENOMEM */
1462
1463 rbd_assert(obj_request_type_valid(obj_request->type));
1464 switch (obj_request->type) {
9969ebc5
AE
1465 case OBJ_REQUEST_NODATA:
1466 break; /* Nothing to do */
bf0d5f50
AE
1467 case OBJ_REQUEST_BIO:
1468 rbd_assert(obj_request->bio_list != NULL);
1469 osd_req->r_bio = obj_request->bio_list;
1470 bio_get(osd_req->r_bio);
1471 /* osd client requires "num pages" even for bio */
1472 osd_req->r_num_pages = calc_pages_for(offset, length);
1473 break;
788e2df3
AE
1474 case OBJ_REQUEST_PAGES:
1475 osd_req->r_pages = obj_request->pages;
1476 osd_req->r_num_pages = obj_request->page_count;
1477 osd_req->r_page_alignment = offset & ~PAGE_MASK;
1478 break;
bf0d5f50
AE
1479 }
1480
1481 if (write_request) {
1482 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1483 now = CURRENT_TIME;
1484 mtime = &now;
1485 } else {
1486 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1487 mtime = NULL; /* not needed for reads */
1488 offset = 0; /* These are not used... */
1489 length = 0; /* ...for osd read requests */
1490 }
1491
1492 osd_req->r_callback = rbd_osd_req_callback;
1493 osd_req->r_priv = obj_request;
1494
1495 osd_req->r_oid_len = strlen(obj_request->object_name);
1496 rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1497 memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1498
1499 osd_req->r_file_layout = rbd_dev->layout; /* struct */
1500
1501 /* osd_req will get its own reference to snapc (if non-null) */
1502
1503 ceph_osdc_build_request(osd_req, offset, length, 1, op,
1504 snapc, snap_id, mtime);
1505
1506 return osd_req;
1507}
1508
/* Release an osd request built by rbd_osd_req_create(). */
static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
}

1514/* object_name is assumed to be a non-null pointer and NUL-terminated */
1515
1516static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1517 u64 offset, u64 length,
1518 enum obj_request_type type)
1519{
1520 struct rbd_obj_request *obj_request;
1521 size_t size;
1522 char *name;
1523
1524 rbd_assert(obj_request_type_valid(type));
1525
1526 size = strlen(object_name) + 1;
1527 obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
1528 if (!obj_request)
1529 return NULL;
1530
1531 name = (char *)(obj_request + 1);
1532 obj_request->object_name = memcpy(name, object_name, size);
1533 obj_request->offset = offset;
1534 obj_request->length = length;
1535 obj_request->which = BAD_WHICH;
1536 obj_request->type = type;
1537 INIT_LIST_HEAD(&obj_request->links);
1538 atomic_set(&obj_request->done, 0);
788e2df3 1539 init_completion(&obj_request->completion);
bf0d5f50
AE
1540 kref_init(&obj_request->kref);
1541
1542 return obj_request;
1543}
1544
1545static void rbd_obj_request_destroy(struct kref *kref)
1546{
1547 struct rbd_obj_request *obj_request;
1548
1549 obj_request = container_of(kref, struct rbd_obj_request, kref);
1550
1551 rbd_assert(obj_request->img_request == NULL);
1552 rbd_assert(obj_request->which == BAD_WHICH);
1553
1554 if (obj_request->osd_req)
1555 rbd_osd_req_destroy(obj_request->osd_req);
1556
1557 rbd_assert(obj_request_type_valid(obj_request->type));
1558 switch (obj_request->type) {
9969ebc5
AE
1559 case OBJ_REQUEST_NODATA:
1560 break; /* Nothing to do */
bf0d5f50
AE
1561 case OBJ_REQUEST_BIO:
1562 if (obj_request->bio_list)
1563 bio_chain_put(obj_request->bio_list);
1564 break;
788e2df3
AE
1565 case OBJ_REQUEST_PAGES:
1566 if (obj_request->pages)
1567 ceph_release_page_vector(obj_request->pages,
1568 obj_request->page_count);
1569 break;
bf0d5f50
AE
1570 }
1571
1572 kfree(obj_request);
1573}
1574
1575/*
1576 * Caller is responsible for filling in the list of object requests
1577 * that comprises the image request, and the Linux request pointer
1578 * (if there is one).
1579 */
1580struct rbd_img_request *rbd_img_request_create(struct rbd_device *rbd_dev,
1581 u64 offset, u64 length,
1582 bool write_request)
1583{
1584 struct rbd_img_request *img_request;
1585 struct ceph_snap_context *snapc = NULL;
1586
1587 img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
1588 if (!img_request)
1589 return NULL;
1590
1591 if (write_request) {
1592 down_read(&rbd_dev->header_rwsem);
1593 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1594 up_read(&rbd_dev->header_rwsem);
1595 if (WARN_ON(!snapc)) {
1596 kfree(img_request);
1597 return NULL; /* Shouldn't happen */
1598 }
1599 }
1600
1601 img_request->rq = NULL;
1602 img_request->rbd_dev = rbd_dev;
1603 img_request->offset = offset;
1604 img_request->length = length;
1605 img_request->write_request = write_request;
1606 if (write_request)
1607 img_request->snapc = snapc;
1608 else
1609 img_request->snap_id = rbd_dev->spec->snap_id;
1610 spin_lock_init(&img_request->completion_lock);
1611 img_request->next_completion = 0;
1612 img_request->callback = NULL;
1613 img_request->obj_request_count = 0;
1614 INIT_LIST_HEAD(&img_request->obj_requests);
1615 kref_init(&img_request->kref);
1616
1617 rbd_img_request_get(img_request); /* Avoid a warning */
1618 rbd_img_request_put(img_request); /* TEMPORARY */
1619
1620 return img_request;
1621}
1622
1623static void rbd_img_request_destroy(struct kref *kref)
1624{
1625 struct rbd_img_request *img_request;
1626 struct rbd_obj_request *obj_request;
1627 struct rbd_obj_request *next_obj_request;
1628
1629 img_request = container_of(kref, struct rbd_img_request, kref);
1630
1631 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1632 rbd_img_obj_request_del(img_request, obj_request);
1633
1634 if (img_request->write_request)
1635 ceph_put_snap_context(img_request->snapc);
1636
1637 kfree(img_request);
1638}
1639
1640static int rbd_img_request_fill_bio(struct rbd_img_request *img_request,
1641 struct bio *bio_list)
1642{
1643 struct rbd_device *rbd_dev = img_request->rbd_dev;
1644 struct rbd_obj_request *obj_request = NULL;
1645 struct rbd_obj_request *next_obj_request;
1646 unsigned int bio_offset;
1647 u64 image_offset;
1648 u64 resid;
1649 u16 opcode;
1650
1651 opcode = img_request->write_request ? CEPH_OSD_OP_WRITE
1652 : CEPH_OSD_OP_READ;
1653 bio_offset = 0;
1654 image_offset = img_request->offset;
1655 rbd_assert(image_offset == bio_list->bi_sector << SECTOR_SHIFT);
1656 resid = img_request->length;
1657 while (resid) {
1658 const char *object_name;
1659 unsigned int clone_size;
1660 struct ceph_osd_req_op *op;
1661 u64 offset;
1662 u64 length;
1663
1664 object_name = rbd_segment_name(rbd_dev, image_offset);
1665 if (!object_name)
1666 goto out_unwind;
1667 offset = rbd_segment_offset(rbd_dev, image_offset);
1668 length = rbd_segment_length(rbd_dev, image_offset, resid);
1669 obj_request = rbd_obj_request_create(object_name,
1670 offset, length,
1671 OBJ_REQUEST_BIO);
1672 kfree(object_name); /* object request has its own copy */
1673 if (!obj_request)
1674 goto out_unwind;
1675
1676 rbd_assert(length <= (u64) UINT_MAX);
1677 clone_size = (unsigned int) length;
1678 obj_request->bio_list = bio_chain_clone_range(&bio_list,
1679 &bio_offset, clone_size,
1680 GFP_ATOMIC);
1681 if (!obj_request->bio_list)
1682 goto out_partial;
1683
1684 /*
1685 * Build up the op to use in building the osd
1686 * request. Note that the contents of the op are
1687 * copied by rbd_osd_req_create().
1688 */
1689 op = rbd_osd_req_op_create(opcode, offset, length);
1690 if (!op)
1691 goto out_partial;
1692 obj_request->osd_req = rbd_osd_req_create(rbd_dev,
1693 img_request->write_request,
1694 obj_request, op);
1695 rbd_osd_req_op_destroy(op);
1696 if (!obj_request->osd_req)
1697 goto out_partial;
1698 /* status and version are initially zero-filled */
1699
1700 rbd_img_obj_request_add(img_request, obj_request);
1701
1702 image_offset += length;
1703 resid -= length;
1704 }
1705
1706 return 0;
1707
1708out_partial:
1709 rbd_obj_request_put(obj_request);
1710out_unwind:
1711 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1712 rbd_obj_request_put(obj_request);
1713
1714 return -ENOMEM;
1715}
1716
1717static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1718{
1719 struct rbd_img_request *img_request;
1720 u32 which = obj_request->which;
1721 bool more = true;
1722
1723 img_request = obj_request->img_request;
1724 rbd_assert(img_request != NULL);
1725 rbd_assert(img_request->rq != NULL);
1726 rbd_assert(which != BAD_WHICH);
1727 rbd_assert(which < img_request->obj_request_count);
1728 rbd_assert(which >= img_request->next_completion);
1729
1730 spin_lock_irq(&img_request->completion_lock);
1731 if (which != img_request->next_completion)
1732 goto out;
1733
1734 for_each_obj_request_from(img_request, obj_request) {
1735 unsigned int xferred;
1736 int result;
1737
1738 rbd_assert(more);
1739 rbd_assert(which < img_request->obj_request_count);
1740
1741 if (!atomic_read(&obj_request->done))
1742 break;
1743
1744 rbd_assert(obj_request->xferred <= (u64) UINT_MAX);
1745 xferred = (unsigned int) obj_request->xferred;
1746 result = (int) obj_request->result;
1747 if (result)
1748 rbd_warn(NULL, "obj_request %s result %d xferred %u\n",
1749 img_request->write_request ? "write" : "read",
1750 result, xferred);
1751
1752 more = blk_end_request(img_request->rq, result, xferred);
1753 which++;
1754 }
1755 rbd_assert(more ^ (which == img_request->obj_request_count));
1756 img_request->next_completion = which;
1757out:
1758 spin_unlock_irq(&img_request->completion_lock);
1759
1760 if (!more)
1761 rbd_img_request_complete(img_request);
1762}
1763
1764static int rbd_img_request_submit(struct rbd_img_request *img_request)
1765{
1766 struct rbd_device *rbd_dev = img_request->rbd_dev;
1767 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1768 struct rbd_obj_request *obj_request;
1769
1770 for_each_obj_request(img_request, obj_request) {
1771 int ret;
1772
1773 obj_request->callback = rbd_img_obj_callback;
1774 ret = rbd_obj_request_submit(osdc, obj_request);
1775 if (ret)
1776 return ret;
1777 /*
1778 * The image request has its own reference to each
1779 * of its object requests, so we can safely drop the
1780 * initial one here.
1781 */
1782 rbd_obj_request_put(obj_request);
1783 }
1784
1785 return 0;
1786}
1787
cf81b60e 1788static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
b8d70035
AE
1789 u64 ver, u64 notify_id)
1790{
1791 struct rbd_obj_request *obj_request;
1792 struct ceph_osd_req_op *op;
1793 struct ceph_osd_client *osdc;
1794 int ret;
1795
1796 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
1797 OBJ_REQUEST_NODATA);
1798 if (!obj_request)
1799 return -ENOMEM;
1800
1801 ret = -ENOMEM;
1802 op = rbd_osd_req_op_create(CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver);
1803 if (!op)
1804 goto out;
1805 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
1806 obj_request, op);
1807 rbd_osd_req_op_destroy(op);
1808 if (!obj_request->osd_req)
1809 goto out;
1810
1811 osdc = &rbd_dev->rbd_client->client->osdc;
cf81b60e 1812 obj_request->callback = rbd_obj_request_put;
b8d70035 1813 ret = rbd_obj_request_submit(osdc, obj_request);
b8d70035 1814out:
cf81b60e
AE
1815 if (ret)
1816 rbd_obj_request_put(obj_request);
b8d70035
AE
1817
1818 return ret;
1819}
1820
1821static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1822{
1823 struct rbd_device *rbd_dev = (struct rbd_device *)data;
1824 u64 hver;
1825 int rc;
1826
1827 if (!rbd_dev)
1828 return;
1829
1830 dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1831 rbd_dev->header_name, (unsigned long long) notify_id,
1832 (unsigned int) opcode);
1833 rc = rbd_dev_refresh(rbd_dev, &hver);
1834 if (rc)
1835 rbd_warn(rbd_dev, "got notification but failed to "
1836 " update snaps: %d\n", rc);
1837
cf81b60e 1838 rbd_obj_notify_ack(rbd_dev, hver, notify_id);
b8d70035
AE
1839}
1840
9969ebc5
AE
1841/*
1842 * Request sync osd watch/unwatch. The value of "start" determines
1843 * whether a watch request is being initiated or torn down.
1844 */
1845static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
1846{
1847 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1848 struct rbd_obj_request *obj_request;
1849 struct ceph_osd_req_op *op;
1850 int ret;
1851
1852 rbd_assert(start ^ !!rbd_dev->watch_event);
1853 rbd_assert(start ^ !!rbd_dev->watch_request);
1854
1855 if (start) {
1856 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0, rbd_dev,
1857 &rbd_dev->watch_event);
1858 if (ret < 0)
1859 return ret;
1860 }
1861
1862 ret = -ENOMEM;
1863 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
1864 OBJ_REQUEST_NODATA);
1865 if (!obj_request)
1866 goto out_cancel;
1867
1868 op = rbd_osd_req_op_create(CEPH_OSD_OP_WATCH,
1869 rbd_dev->watch_event->cookie,
1870 rbd_dev->header.obj_version, start);
1871 if (!op)
1872 goto out_cancel;
1873 obj_request->osd_req = rbd_osd_req_create(rbd_dev, true,
1874 obj_request, op);
1875 rbd_osd_req_op_destroy(op);
1876 if (!obj_request->osd_req)
1877 goto out_cancel;
1878
1879 if (start) {
1880 rbd_dev->watch_request = obj_request->osd_req;
1881 ceph_osdc_set_request_linger(osdc, rbd_dev->watch_request);
1882 }
1883 ret = rbd_obj_request_submit(osdc, obj_request);
1884 if (ret)
1885 goto out_cancel;
1886 ret = rbd_obj_request_wait(obj_request);
1887 if (ret)
1888 goto out_cancel;
1889
1890 ret = obj_request->result;
1891 if (ret)
1892 goto out_cancel;
1893
1894 if (start)
1895 goto done; /* Done if setting up the watch request */
1896out_cancel:
1897 /* Cancel the event if we're tearing down, or on error */
1898 ceph_osdc_cancel_event(rbd_dev->watch_event);
1899 rbd_dev->watch_event = NULL;
1900done:
1901 if (obj_request)
1902 rbd_obj_request_put(obj_request);
1903
1904 return ret;
1905}
1906
bf0d5f50
AE
1907static void rbd_request_fn(struct request_queue *q)
1908{
1909 struct rbd_device *rbd_dev = q->queuedata;
1910 bool read_only = rbd_dev->mapping.read_only;
1911 struct request *rq;
1912 int result;
1913
1914 while ((rq = blk_fetch_request(q))) {
1915 bool write_request = rq_data_dir(rq) == WRITE;
1916 struct rbd_img_request *img_request;
1917 u64 offset;
1918 u64 length;
1919
1920 /* Ignore any non-FS requests that filter through. */
1921
1922 if (rq->cmd_type != REQ_TYPE_FS) {
1923 __blk_end_request_all(rq, 0);
1924 continue;
1925 }
1926
1927 spin_unlock_irq(q->queue_lock);
1928
1929 /* Disallow writes to a read-only device */
1930
1931 if (write_request) {
1932 result = -EROFS;
1933 if (read_only)
1934 goto end_request;
1935 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
1936 }
1937
1938 /* Quit early if the snapshot has disappeared */
1939
1940 if (!atomic_read(&rbd_dev->exists)) {
1941 dout("request for non-existent snapshot");
1942 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
1943 result = -ENXIO;
1944 goto end_request;
1945 }
1946
1947 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
1948 length = (u64) blk_rq_bytes(rq);
1949
1950 result = -EINVAL;
1951 if (WARN_ON(offset && length > U64_MAX - offset + 1))
1952 goto end_request; /* Shouldn't happen */
1953
1954 result = -ENOMEM;
1955 img_request = rbd_img_request_create(rbd_dev, offset, length,
1956 write_request);
1957 if (!img_request)
1958 goto end_request;
1959
1960 img_request->rq = rq;
1961
1962 result = rbd_img_request_fill_bio(img_request, rq->bio);
1963 if (!result)
1964 result = rbd_img_request_submit(img_request);
1965 if (result)
1966 rbd_img_request_put(img_request);
1967end_request:
1968 spin_lock_irq(q->queue_lock);
1969 if (result < 0) {
1970 rbd_warn(rbd_dev, "obj_request %s result %d\n",
1971 write_request ? "write" : "read", result);
1972 __blk_end_request_all(rq, result);
1973 }
1974 }
1975}
1976
602adf40
YS
1977/*
1978 * a queue callback. Makes sure that we don't create a bio that spans across
1979 * multiple osd objects. One exception would be with a single page bios,
f7760dad 1980 * which we handle later at bio_chain_clone_range()
602adf40
YS
1981 */
1982static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1983 struct bio_vec *bvec)
1984{
1985 struct rbd_device *rbd_dev = q->queuedata;
e5cfeed2
AE
1986 sector_t sector_offset;
1987 sector_t sectors_per_obj;
1988 sector_t obj_sector_offset;
1989 int ret;
1990
1991 /*
1992 * Find how far into its rbd object the partition-relative
1993 * bio start sector is to offset relative to the enclosing
1994 * device.
1995 */
1996 sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
1997 sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1998 obj_sector_offset = sector_offset & (sectors_per_obj - 1);
1999
2000 /*
2001 * Compute the number of bytes from that offset to the end
2002 * of the object. Account for what's already used by the bio.
2003 */
2004 ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2005 if (ret > bmd->bi_size)
2006 ret -= bmd->bi_size;
2007 else
2008 ret = 0;
2009
2010 /*
2011 * Don't send back more than was asked for. And if the bio
2012 * was empty, let the whole thing through because: "Note
2013 * that a block device *must* allow a single page to be
2014 * added to an empty bio."
2015 */
2016 rbd_assert(bvec->bv_len <= PAGE_SIZE);
2017 if (ret > (int) bvec->bv_len || !bmd->bi_size)
2018 ret = (int) bvec->bv_len;
2019
2020 return ret;
602adf40
YS
2021}
2022
2023static void rbd_free_disk(struct rbd_device *rbd_dev)
2024{
2025 struct gendisk *disk = rbd_dev->disk;
2026
2027 if (!disk)
2028 return;
2029
602adf40
YS
2030 if (disk->flags & GENHD_FL_UP)
2031 del_gendisk(disk);
2032 if (disk->queue)
2033 blk_cleanup_queue(disk->queue);
2034 put_disk(disk);
2035}
2036
788e2df3
AE
2037static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2038 const char *object_name,
2039 u64 offset, u64 length,
2040 char *buf, u64 *version)
2041
2042{
2043 struct ceph_osd_req_op *op;
2044 struct rbd_obj_request *obj_request;
2045 struct ceph_osd_client *osdc;
2046 struct page **pages = NULL;
2047 u32 page_count;
2048 int ret;
2049
2050 page_count = (u32) calc_pages_for(offset, length);
2051 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2052 if (IS_ERR(pages))
2053 ret = PTR_ERR(pages);
2054
2055 ret = -ENOMEM;
2056 obj_request = rbd_obj_request_create(object_name, offset, length,
2057 OBJ_REQUEST_PAGES);
2058 if (!obj_request)
2059 goto out;
2060
2061 obj_request->pages = pages;
2062 obj_request->page_count = page_count;
2063
2064 op = rbd_osd_req_op_create(CEPH_OSD_OP_READ, offset, length);
2065 if (!op)
2066 goto out;
2067 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2068 obj_request, op);
2069 rbd_osd_req_op_destroy(op);
2070 if (!obj_request->osd_req)
2071 goto out;
2072
2073 osdc = &rbd_dev->rbd_client->client->osdc;
2074 ret = rbd_obj_request_submit(osdc, obj_request);
2075 if (ret)
2076 goto out;
2077 ret = rbd_obj_request_wait(obj_request);
2078 if (ret)
2079 goto out;
2080
2081 ret = obj_request->result;
2082 if (ret < 0)
2083 goto out;
2084 ret = ceph_copy_from_page_vector(pages, buf, 0, obj_request->xferred);
2085 if (version)
2086 *version = obj_request->version;
2087out:
2088 if (obj_request)
2089 rbd_obj_request_put(obj_request);
2090 else
2091 ceph_release_page_vector(pages, page_count);
2092
2093 return ret;
2094}
2095
602adf40 2096/*
4156d998
AE
2097 * Read the complete header for the given rbd device.
2098 *
2099 * Returns a pointer to a dynamically-allocated buffer containing
2100 * the complete and validated header. Caller can pass the address
2101 * of a variable that will be filled in with the version of the
2102 * header object at the time it was read.
2103 *
2104 * Returns a pointer-coded errno if a failure occurs.
602adf40 2105 */
4156d998
AE
2106static struct rbd_image_header_ondisk *
2107rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
602adf40 2108{
4156d998 2109 struct rbd_image_header_ondisk *ondisk = NULL;
50f7c4c9 2110 u32 snap_count = 0;
4156d998
AE
2111 u64 names_size = 0;
2112 u32 want_count;
2113 int ret;
602adf40 2114
00f1f36f 2115 /*
4156d998
AE
2116 * The complete header will include an array of its 64-bit
2117 * snapshot ids, followed by the names of those snapshots as
2118 * a contiguous block of NUL-terminated strings. Note that
2119 * the number of snapshots could change by the time we read
2120 * it in, in which case we re-read it.
00f1f36f 2121 */
4156d998
AE
2122 do {
2123 size_t size;
2124
2125 kfree(ondisk);
2126
2127 size = sizeof (*ondisk);
2128 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
2129 size += names_size;
2130 ondisk = kmalloc(size, GFP_KERNEL);
2131 if (!ondisk)
2132 return ERR_PTR(-ENOMEM);
2133
788e2df3 2134 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
4156d998
AE
2135 0, size,
2136 (char *) ondisk, version);
2137
2138 if (ret < 0)
2139 goto out_err;
2140 if (WARN_ON((size_t) ret < size)) {
2141 ret = -ENXIO;
06ecc6cb
AE
2142 rbd_warn(rbd_dev, "short header read (want %zd got %d)",
2143 size, ret);
4156d998
AE
2144 goto out_err;
2145 }
2146 if (!rbd_dev_ondisk_valid(ondisk)) {
2147 ret = -ENXIO;
06ecc6cb 2148 rbd_warn(rbd_dev, "invalid header");
4156d998 2149 goto out_err;
81e759fb 2150 }
602adf40 2151
4156d998
AE
2152 names_size = le64_to_cpu(ondisk->snap_names_len);
2153 want_count = snap_count;
2154 snap_count = le32_to_cpu(ondisk->snap_count);
2155 } while (snap_count != want_count);
00f1f36f 2156
4156d998 2157 return ondisk;
00f1f36f 2158
4156d998
AE
2159out_err:
2160 kfree(ondisk);
2161
2162 return ERR_PTR(ret);
2163}
2164
2165/*
2166 * reload the ondisk the header
2167 */
2168static int rbd_read_header(struct rbd_device *rbd_dev,
2169 struct rbd_image_header *header)
2170{
2171 struct rbd_image_header_ondisk *ondisk;
2172 u64 ver = 0;
2173 int ret;
602adf40 2174
4156d998
AE
2175 ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
2176 if (IS_ERR(ondisk))
2177 return PTR_ERR(ondisk);
2178 ret = rbd_header_from_disk(header, ondisk);
2179 if (ret >= 0)
2180 header->obj_version = ver;
2181 kfree(ondisk);
2182
2183 return ret;
602adf40
YS
2184}
2185
41f38c2b 2186static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
dfc5606d
YS
2187{
2188 struct rbd_snap *snap;
a0593290 2189 struct rbd_snap *next;
dfc5606d 2190
a0593290 2191 list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
41f38c2b 2192 rbd_remove_snap_dev(snap);
dfc5606d
YS
2193}
2194
9478554a
AE
2195static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
2196{
2197 sector_t size;
2198
0d7dbfce 2199 if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
9478554a
AE
2200 return;
2201
2202 size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
2203 dout("setting size to %llu sectors", (unsigned long long) size);
2204 rbd_dev->mapping.size = (u64) size;
2205 set_capacity(rbd_dev->disk, size);
2206}
2207
602adf40
YS
/*
 * only read the first part of the ondisk header, without the snaps info
 */
static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
{
        int ret;
        struct rbd_image_header h;

        /* Read the new header outside the lock; failure leaves state alone */
        ret = rbd_read_header(rbd_dev, &h);
        if (ret < 0)
                return ret;

        down_write(&rbd_dev->header_rwsem);

        /* Update image size, and check for resize of mapped image */
        rbd_dev->header.image_size = h.image_size;
        rbd_update_mapping_size(rbd_dev);

        /* rbd_dev->header.object_prefix shouldn't change */
        kfree(rbd_dev->header.snap_sizes);
        kfree(rbd_dev->header.snap_names);
        /* osd requests may still refer to snapc */
        ceph_put_snap_context(rbd_dev->header.snapc);

        if (hver)
                *hver = h.obj_version;
        rbd_dev->header.obj_version = h.obj_version;
        rbd_dev->header.image_size = h.image_size;
        /* Take ownership of the freshly-read snapshot data */
        rbd_dev->header.snapc = h.snapc;
        rbd_dev->header.snap_names = h.snap_names;
        rbd_dev->header.snap_sizes = h.snap_sizes;
        /* Free the extra copy of the object prefix */
        WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
        kfree(h.object_prefix);

        /* Reconcile the snapshot device list with the new context */
        ret = rbd_dev_snaps_update(rbd_dev);
        if (!ret)
                ret = rbd_dev_snaps_register(rbd_dev);

        up_write(&rbd_dev->header_rwsem);

        return ret;
}
2251
117973fb 2252static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
1fe5e993
AE
2253{
2254 int ret;
2255
117973fb 2256 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1fe5e993 2257 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
117973fb
AE
2258 if (rbd_dev->image_format == 1)
2259 ret = rbd_dev_v1_refresh(rbd_dev, hver);
2260 else
2261 ret = rbd_dev_v2_refresh(rbd_dev, hver);
1fe5e993
AE
2262 mutex_unlock(&ctl_mutex);
2263
2264 return ret;
2265}
2266
602adf40
YS
/*
 * Allocate and configure the gendisk and request queue for an rbd
 * device.  On success rbd_dev->disk is set and the disk capacity
 * reflects the current mapping size.  Returns 0 or -ENOMEM.
 */
static int rbd_init_disk(struct rbd_device *rbd_dev)
{
        struct gendisk *disk;
        struct request_queue *q;
        u64 segment_size;

        /* create gendisk info */
        disk = alloc_disk(RBD_MINORS_PER_MAJOR);
        if (!disk)
                return -ENOMEM;

        snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
                 rbd_dev->dev_id);
        disk->major = rbd_dev->major;
        disk->first_minor = 0;
        disk->fops = &rbd_bd_ops;
        disk->private_data = rbd_dev;

        q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
        if (!q)
                goto out_disk;

        /* We use the default size, but let's be explicit about it. */
        blk_queue_physical_block_size(q, SECTOR_SIZE);

        /* set io sizes to object size */
        segment_size = rbd_obj_bytes(&rbd_dev->header);
        blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
        blk_queue_max_segment_size(q, segment_size);
        blk_queue_io_min(q, segment_size);
        blk_queue_io_opt(q, segment_size);

        blk_queue_merge_bvec(q, rbd_merge_bvec);
        disk->queue = q;

        q->queuedata = rbd_dev;

        rbd_dev->disk = disk;

        set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);

        return 0;
out_disk:
        put_disk(disk);

        /* blk_init_queue() failure is treated as an allocation failure */
        return -ENOMEM;
}
2314
dfc5606d
YS
2315/*
2316 sysfs
2317*/
2318
593a9e7b
AE
/* Map a sysfs struct device back to its containing rbd_device. */
static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
        return container_of(dev, struct rbd_device, dev);
}
2323
dfc5606d
YS
2324static ssize_t rbd_size_show(struct device *dev,
2325 struct device_attribute *attr, char *buf)
2326{
593a9e7b 2327 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
a51aa0c0
JD
2328 sector_t size;
2329
2330 down_read(&rbd_dev->header_rwsem);
2331 size = get_capacity(rbd_dev->disk);
2332 up_read(&rbd_dev->header_rwsem);
dfc5606d 2333
a51aa0c0 2334 return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
dfc5606d
YS
2335}
2336
34b13184
AE
2337/*
2338 * Note this shows the features for whatever's mapped, which is not
2339 * necessarily the base image.
2340 */
2341static ssize_t rbd_features_show(struct device *dev,
2342 struct device_attribute *attr, char *buf)
2343{
2344 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2345
2346 return sprintf(buf, "0x%016llx\n",
2347 (unsigned long long) rbd_dev->mapping.features);
2348}
2349
dfc5606d
YS
2350static ssize_t rbd_major_show(struct device *dev,
2351 struct device_attribute *attr, char *buf)
2352{
593a9e7b 2353 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 2354
dfc5606d
YS
2355 return sprintf(buf, "%d\n", rbd_dev->major);
2356}
2357
2358static ssize_t rbd_client_id_show(struct device *dev,
2359 struct device_attribute *attr, char *buf)
602adf40 2360{
593a9e7b 2361 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 2362
1dbb4399
AE
2363 return sprintf(buf, "client%lld\n",
2364 ceph_client_id(rbd_dev->rbd_client->client));
602adf40
YS
2365}
2366
dfc5606d
YS
2367static ssize_t rbd_pool_show(struct device *dev,
2368 struct device_attribute *attr, char *buf)
602adf40 2369{
593a9e7b 2370 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 2371
0d7dbfce 2372 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
dfc5606d
YS
2373}
2374
9bb2f334
AE
2375static ssize_t rbd_pool_id_show(struct device *dev,
2376 struct device_attribute *attr, char *buf)
2377{
2378 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2379
0d7dbfce
AE
2380 return sprintf(buf, "%llu\n",
2381 (unsigned long long) rbd_dev->spec->pool_id);
9bb2f334
AE
2382}
2383
dfc5606d
YS
2384static ssize_t rbd_name_show(struct device *dev,
2385 struct device_attribute *attr, char *buf)
2386{
593a9e7b 2387 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 2388
a92ffdf8
AE
2389 if (rbd_dev->spec->image_name)
2390 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
2391
2392 return sprintf(buf, "(unknown)\n");
dfc5606d
YS
2393}
2394
589d30e0
AE
2395static ssize_t rbd_image_id_show(struct device *dev,
2396 struct device_attribute *attr, char *buf)
2397{
2398 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2399
0d7dbfce 2400 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
589d30e0
AE
2401}
2402
34b13184
AE
2403/*
2404 * Shows the name of the currently-mapped snapshot (or
2405 * RBD_SNAP_HEAD_NAME for the base image).
2406 */
dfc5606d
YS
2407static ssize_t rbd_snap_show(struct device *dev,
2408 struct device_attribute *attr,
2409 char *buf)
2410{
593a9e7b 2411 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 2412
0d7dbfce 2413 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
dfc5606d
YS
2414}
2415
86b00e0d
AE
/*
 * For an rbd v2 image, shows the pool id, image id, and snapshot id
 * for the parent image.  If there is no parent, simply shows
 * "(no parent image)".
 */
static ssize_t rbd_parent_show(struct device *dev,
                             struct device_attribute *attr,
                             char *buf)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
        struct rbd_spec *spec = rbd_dev->parent_spec;
        int count;
        char *bufp = buf;

        if (!spec)
                return sprintf(buf, "(no parent image)\n");

        /* Emit one key/value section at a time, advancing bufp */
        count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
                        (unsigned long long) spec->pool_id, spec->pool_name);
        if (count < 0)
                return count;
        bufp += count;

        count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
                        spec->image_name ? spec->image_name : "(unknown)");
        if (count < 0)
                return count;
        bufp += count;

        count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
                        (unsigned long long) spec->snap_id, spec->snap_name);
        if (count < 0)
                return count;
        bufp += count;

        count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
        if (count < 0)
                return count;
        bufp += count;

        return (ssize_t) (bufp - buf);
}
2458
dfc5606d
YS
2459static ssize_t rbd_image_refresh(struct device *dev,
2460 struct device_attribute *attr,
2461 const char *buf,
2462 size_t size)
2463{
593a9e7b 2464 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
b813623a 2465 int ret;
602adf40 2466
117973fb 2467 ret = rbd_dev_refresh(rbd_dev, NULL);
b813623a
AE
2468
2469 return ret < 0 ? ret : size;
dfc5606d 2470}
602adf40 2471
/* Per-device sysfs attributes; all read-only except "refresh". */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);

static struct attribute *rbd_attrs[] = {
        &dev_attr_size.attr,
        &dev_attr_features.attr,
        &dev_attr_major.attr,
        &dev_attr_client_id.attr,
        &dev_attr_pool.attr,
        &dev_attr_pool_id.attr,
        &dev_attr_name.attr,
        &dev_attr_image_id.attr,
        &dev_attr_current_snap.attr,
        &dev_attr_parent.attr,
        &dev_attr_refresh.attr,
        NULL
};

static struct attribute_group rbd_attr_group = {
        .attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
        &rbd_attr_group,
        NULL
};

/* Device lifetime is managed elsewhere; nothing to free here. */
static void rbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type rbd_device_type = {
        .name = "rbd",
        .groups = rbd_attr_groups,
        .release = rbd_sysfs_dev_release,
};
2517
2518
2519/*
2520 sysfs - snapshots
2521*/
2522
2523static ssize_t rbd_snap_size_show(struct device *dev,
2524 struct device_attribute *attr,
2525 char *buf)
2526{
2527 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2528
3591538f 2529 return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
dfc5606d
YS
2530}
2531
2532static ssize_t rbd_snap_id_show(struct device *dev,
2533 struct device_attribute *attr,
2534 char *buf)
2535{
2536 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2537
3591538f 2538 return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
dfc5606d
YS
2539}
2540
34b13184
AE
2541static ssize_t rbd_snap_features_show(struct device *dev,
2542 struct device_attribute *attr,
2543 char *buf)
2544{
2545 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2546
2547 return sprintf(buf, "0x%016llx\n",
2548 (unsigned long long) snap->features);
2549}
2550
dfc5606d
YS
/* Per-snapshot sysfs attributes (all read-only). */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);

static struct attribute *rbd_snap_attrs[] = {
        &dev_attr_snap_size.attr,
        &dev_attr_snap_id.attr,
        &dev_attr_snap_features.attr,
        NULL,
};

static struct attribute_group rbd_snap_attr_group = {
        .attrs = rbd_snap_attrs,
};

/* Called when the snapshot device's last reference is dropped. */
static void rbd_snap_dev_release(struct device *dev)
{
        struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
        kfree(snap->name);
        kfree(snap);
}

static const struct attribute_group *rbd_snap_attr_groups[] = {
        &rbd_snap_attr_group,
        NULL
};

static struct device_type rbd_snap_device_type = {
        .groups = rbd_snap_attr_groups,
        .release = rbd_snap_dev_release,
};
2582
8b8fb99c
AE
/*
 * Take and return a reference on an rbd_spec.  The caller must
 * already hold a valid reference.
 */
static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
{
        kref_get(&spec->kref);

        return spec;
}
2589
static void rbd_spec_free(struct kref *kref);
/* Drop a reference; frees the spec at zero.  NULL is a no-op. */
static void rbd_spec_put(struct rbd_spec *spec)
{
        if (spec)
                kref_put(&spec->kref, rbd_spec_free);
}
2596
2597static struct rbd_spec *rbd_spec_alloc(void)
2598{
2599 struct rbd_spec *spec;
2600
2601 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
2602 if (!spec)
2603 return NULL;
2604 kref_init(&spec->kref);
2605
2606 rbd_spec_put(rbd_spec_get(spec)); /* TEMPORARY */
2607
2608 return spec;
2609}
2610
2611static void rbd_spec_free(struct kref *kref)
2612{
2613 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
2614
2615 kfree(spec->pool_name);
2616 kfree(spec->image_id);
2617 kfree(spec->image_name);
2618 kfree(spec->snap_name);
2619 kfree(spec);
2620}
2621
c53d5893
AE
/*
 * Allocate and initialize an rbd_device.  On success the new device
 * holds the caller's references to @rbdc and @spec (ownership passes
 * to the device and is released by rbd_dev_destroy()).  Returns NULL
 * on allocation failure.
 */
struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
                                struct rbd_spec *spec)
{
        struct rbd_device *rbd_dev;

        rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
        if (!rbd_dev)
                return NULL;

        spin_lock_init(&rbd_dev->lock);
        atomic_set(&rbd_dev->exists, 0);
        INIT_LIST_HEAD(&rbd_dev->node);
        INIT_LIST_HEAD(&rbd_dev->snaps);
        init_rwsem(&rbd_dev->header_rwsem);

        rbd_dev->spec = spec;
        rbd_dev->rbd_client = rbdc;

        /* Initialize the layout used for all rbd requests */

        rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
        rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
        rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
        rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);

        return rbd_dev;
}
2649
/* Release everything an rbd_device owns, then the device itself. */
static void rbd_dev_destroy(struct rbd_device *rbd_dev)
{
        rbd_spec_put(rbd_dev->parent_spec);
        kfree(rbd_dev->header_name);
        rbd_put_client(rbd_dev->rbd_client);
        rbd_spec_put(rbd_dev->spec);
        kfree(rbd_dev);
}
2658
304f6808
AE
/*
 * A snapshot is considered registered when its device type has been
 * set (done at registration time).  The assertion checks that the
 * type flag and the driver-core registration state always agree.
 */
static bool rbd_snap_registered(struct rbd_snap *snap)
{
        bool ret = snap->dev.type == &rbd_snap_device_type;
        bool reg = device_is_registered(&snap->dev);

        rbd_assert(!ret ^ reg);

        return ret;
}
2668
/*
 * Unlink a snapshot from its device's list and unregister its sysfs
 * device if it was registered (unregistration drops the reference
 * that frees it via rbd_snap_dev_release()).
 */
static void rbd_remove_snap_dev(struct rbd_snap *snap)
{
        list_del(&snap->node);
        if (device_is_registered(&snap->dev))
                device_unregister(&snap->dev);
}
2675
14e7085d 2676static int rbd_register_snap_dev(struct rbd_snap *snap,
dfc5606d
YS
2677 struct device *parent)
2678{
2679 struct device *dev = &snap->dev;
2680 int ret;
2681
2682 dev->type = &rbd_snap_device_type;
2683 dev->parent = parent;
2684 dev->release = rbd_snap_dev_release;
d4b125e9 2685 dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
304f6808
AE
2686 dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2687
dfc5606d
YS
2688 ret = device_register(dev);
2689
2690 return ret;
2691}
2692
/*
 * Allocate and fill in an rbd_snap (name is duplicated).  Returns the
 * new snapshot or a pointer-coded errno.  The caller owns the result;
 * it is not yet linked anywhere or registered with sysfs.
 */
static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
                                           const char *snap_name,
                                           u64 snap_id, u64 snap_size,
                                           u64 snap_features)
{
        struct rbd_snap *snap;
        int ret;

        snap = kzalloc(sizeof (*snap), GFP_KERNEL);
        if (!snap)
                return ERR_PTR(-ENOMEM);

        ret = -ENOMEM;
        snap->name = kstrdup(snap_name, GFP_KERNEL);
        if (!snap->name)
                goto err;

        snap->id = snap_id;
        snap->size = snap_size;
        snap->features = snap_features;

        return snap;

err:
        /* kfree(NULL) is a no-op, so this is safe when kstrdup failed */
        kfree(snap->name);
        kfree(snap);

        return ERR_PTR(ret);
}
2722
cd892126
AE
2723static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
2724 u64 *snap_size, u64 *snap_features)
2725{
2726 char *snap_name;
2727
2728 rbd_assert(which < rbd_dev->header.snapc->num_snaps);
2729
2730 *snap_size = rbd_dev->header.snap_sizes[which];
2731 *snap_features = 0; /* No features for v1 */
2732
2733 /* Skip over names until we find the one we are looking for */
2734
2735 snap_name = rbd_dev->header.snap_names;
2736 while (which--)
2737 snap_name += strlen(snap_name) + 1;
2738
2739 return snap_name;
2740}
2741
9d475de5
AE
/*
 * Get the size and object order for an image snapshot, or if
 * snap_id is CEPH_NOSNAP, gets this information for the base
 * image.
 */
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                                u8 *order, u64 *snap_size)
{
        __le64 snapid = cpu_to_le64(snap_id);
        int ret;
        /* Wire format of the "get_size" reply: order byte then size */
        struct {
                u8 order;
                __le64 size;
        } __attribute__ ((packed)) size_buf = { 0 };

        ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
                                "rbd", "get_size",
                                (char *) &snapid, sizeof (snapid),
                                (char *) &size_buf, sizeof (size_buf), NULL);
        dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
        if (ret < 0)
                return ret;

        *order = size_buf.order;
        *snap_size = le64_to_cpu(size_buf.size);

        dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n",
                (unsigned long long) snap_id, (unsigned int) *order,
                (unsigned long long) *snap_size);

        return 0;
}
2774
2775static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
2776{
2777 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
2778 &rbd_dev->header.obj_order,
2779 &rbd_dev->header.image_size);
2780}
2781
1e130199
AE
/*
 * Fetch the object name prefix for a format 2 image and store it in
 * rbd_dev->header.object_prefix (allocated; left NULL on failure).
 */
static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
{
        void *reply_buf;
        int ret;
        void *p;

        reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
        if (!reply_buf)
                return -ENOMEM;

        ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
                                "rbd", "get_object_prefix",
                                NULL, 0,
                                reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
        dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
        if (ret < 0)
                goto out;
        ret = 0;    /* rbd_req_sync_exec() can return positive */

        p = reply_buf;
        rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
                                                p + RBD_OBJ_PREFIX_LEN_MAX,
                                                NULL, GFP_NOIO);

        if (IS_ERR(rbd_dev->header.object_prefix)) {
                ret = PTR_ERR(rbd_dev->header.object_prefix);
                rbd_dev->header.object_prefix = NULL;
        } else {
                dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
        }

out:
        kfree(reply_buf);

        return ret;
}
2818
b1b5402a
AE
/*
 * Get the feature bits for the given snapshot (or for the base image
 * when snap_id is CEPH_NOSNAP).  Fails with -ENXIO if the image uses
 * incompatible features this driver does not support.
 */
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                                u64 *snap_features)
{
        __le64 snapid = cpu_to_le64(snap_id);
        /* Wire format of the "get_features" reply */
        struct {
                __le64 features;
                __le64 incompat;
        } features_buf = { 0 };
        u64 incompat;
        int ret;

        ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
                                "rbd", "get_features",
                                (char *) &snapid, sizeof (snapid),
                                (char *) &features_buf, sizeof (features_buf),
                                NULL);
        dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
        if (ret < 0)
                return ret;

        /* Refuse to map an image needing features we don't implement */
        incompat = le64_to_cpu(features_buf.incompat);
        if (incompat & ~RBD_FEATURES_ALL)
                return -ENXIO;

        *snap_features = le64_to_cpu(features_buf.features);

        dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
                (unsigned long long) snap_id,
                (unsigned long long) *snap_features,
                (unsigned long long) le64_to_cpu(features_buf.incompat));

        return 0;
}
2851}
2852
/* Fetch the feature bits for the base image (CEPH_NOSNAP). */
static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
{
        return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
                                                &rbd_dev->header.features);
}
2858
86b00e0d
AE
2859static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
2860{
2861 struct rbd_spec *parent_spec;
2862 size_t size;
2863 void *reply_buf = NULL;
2864 __le64 snapid;
2865 void *p;
2866 void *end;
2867 char *image_id;
2868 u64 overlap;
86b00e0d
AE
2869 int ret;
2870
2871 parent_spec = rbd_spec_alloc();
2872 if (!parent_spec)
2873 return -ENOMEM;
2874
2875 size = sizeof (__le64) + /* pool_id */
2876 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
2877 sizeof (__le64) + /* snap_id */
2878 sizeof (__le64); /* overlap */
2879 reply_buf = kmalloc(size, GFP_KERNEL);
2880 if (!reply_buf) {
2881 ret = -ENOMEM;
2882 goto out_err;
2883 }
2884
2885 snapid = cpu_to_le64(CEPH_NOSNAP);
2886 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2887 "rbd", "get_parent",
2888 (char *) &snapid, sizeof (snapid),
07b2391f 2889 (char *) reply_buf, size, NULL);
86b00e0d
AE
2890 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2891 if (ret < 0)
2892 goto out_err;
2893
2894 ret = -ERANGE;
2895 p = reply_buf;
2896 end = (char *) reply_buf + size;
2897 ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
2898 if (parent_spec->pool_id == CEPH_NOPOOL)
2899 goto out; /* No parent? No problem. */
2900
0903e875
AE
2901 /* The ceph file layout needs to fit pool id in 32 bits */
2902
2903 ret = -EIO;
2904 if (WARN_ON(parent_spec->pool_id > (u64) U32_MAX))
2905 goto out;
2906
979ed480 2907 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
86b00e0d
AE
2908 if (IS_ERR(image_id)) {
2909 ret = PTR_ERR(image_id);
2910 goto out_err;
2911 }
2912 parent_spec->image_id = image_id;
2913 ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
2914 ceph_decode_64_safe(&p, end, overlap, out_err);
2915
2916 rbd_dev->parent_overlap = overlap;
2917 rbd_dev->parent_spec = parent_spec;
2918 parent_spec = NULL; /* rbd_dev now owns this */
2919out:
2920 ret = 0;
2921out_err:
2922 kfree(reply_buf);
2923 rbd_spec_put(parent_spec);
2924
2925 return ret;
2926}
2927
9e15b77d
AE
/*
 * Look up the image name for this device's image id in the rbd
 * directory object.  Returns a newly-allocated name on success or
 * NULL on any failure (lookup is best-effort; errors are not
 * propagated).  Caller frees the result.
 */
static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
{
        size_t image_id_size;
        char *image_id;
        void *p;
        void *end;
        size_t size;
        void *reply_buf = NULL;
        size_t len = 0;
        char *image_name = NULL;
        int ret;

        rbd_assert(!rbd_dev->spec->image_name);

        /* Build the request payload: length-prefixed image id string */
        len = strlen(rbd_dev->spec->image_id);
        image_id_size = sizeof (__le32) + len;
        image_id = kmalloc(image_id_size, GFP_KERNEL);
        if (!image_id)
                return NULL;

        p = image_id;
        end = (char *) image_id + image_id_size;
        ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32) len);

        size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
        reply_buf = kmalloc(size, GFP_KERNEL);
        if (!reply_buf)
                goto out;

        ret = rbd_req_sync_exec(rbd_dev, RBD_DIRECTORY,
                                "rbd", "dir_get_name",
                                image_id, image_id_size,
                                (char *) reply_buf, size, NULL);
        if (ret < 0)
                goto out;
        p = reply_buf;
        end = (char *) reply_buf + size;
        image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
        if (IS_ERR(image_name))
                image_name = NULL;      /* treat decode failure as "no name" */
        else
                dout("%s: name is %s len is %zd\n", __func__, image_name, len);
out:
        kfree(reply_buf);
        kfree(image_id);

        return image_name;
}
2976
2977/*
2978 * When a parent image gets probed, we only have the pool, image,
2979 * and snapshot ids but not the names of any of them. This call
2980 * is made later to fill in those names. It has to be done after
2981 * rbd_dev_snaps_update() has completed because some of the
2982 * information (in particular, snapshot name) is not available
2983 * until then.
2984 */
2985static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
2986{
2987 struct ceph_osd_client *osdc;
2988 const char *name;
2989 void *reply_buf = NULL;
2990 int ret;
2991
2992 if (rbd_dev->spec->pool_name)
2993 return 0; /* Already have the names */
2994
2995 /* Look up the pool name */
2996
2997 osdc = &rbd_dev->rbd_client->client->osdc;
2998 name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
935dc89f
AE
2999 if (!name) {
3000 rbd_warn(rbd_dev, "there is no pool with id %llu",
3001 rbd_dev->spec->pool_id); /* Really a BUG() */
3002 return -EIO;
3003 }
9e15b77d
AE
3004
3005 rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
3006 if (!rbd_dev->spec->pool_name)
3007 return -ENOMEM;
3008
3009 /* Fetch the image name; tolerate failure here */
3010
3011 name = rbd_dev_image_name(rbd_dev);
69e7a02f 3012 if (name)
9e15b77d 3013 rbd_dev->spec->image_name = (char *) name;
69e7a02f 3014 else
06ecc6cb 3015 rbd_warn(rbd_dev, "unable to get image name");
9e15b77d
AE
3016
3017 /* Look up the snapshot name. */
3018
3019 name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
3020 if (!name) {
935dc89f
AE
3021 rbd_warn(rbd_dev, "no snapshot with id %llu",
3022 rbd_dev->spec->snap_id); /* Really a BUG() */
9e15b77d
AE
3023 ret = -EIO;
3024 goto out_err;
3025 }
3026 rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
3027 if(!rbd_dev->spec->snap_name)
3028 goto out_err;
3029
3030 return 0;
3031out_err:
3032 kfree(reply_buf);
3033 kfree(rbd_dev->spec->pool_name);
3034 rbd_dev->spec->pool_name = NULL;
3035
3036 return ret;
3037}
3038
6e14b1a6 3039static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
35d489f9
AE
3040{
3041 size_t size;
3042 int ret;
3043 void *reply_buf;
3044 void *p;
3045 void *end;
3046 u64 seq;
3047 u32 snap_count;
3048 struct ceph_snap_context *snapc;
3049 u32 i;
3050
3051 /*
3052 * We'll need room for the seq value (maximum snapshot id),
3053 * snapshot count, and array of that many snapshot ids.
3054 * For now we have a fixed upper limit on the number we're
3055 * prepared to receive.
3056 */
3057 size = sizeof (__le64) + sizeof (__le32) +
3058 RBD_MAX_SNAP_COUNT * sizeof (__le64);
3059 reply_buf = kzalloc(size, GFP_KERNEL);
3060 if (!reply_buf)
3061 return -ENOMEM;
3062
3063 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
3064 "rbd", "get_snapcontext",
3065 NULL, 0,
07b2391f 3066 reply_buf, size, ver);
35d489f9
AE
3067 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
3068 if (ret < 0)
3069 goto out;
3070
3071 ret = -ERANGE;
3072 p = reply_buf;
3073 end = (char *) reply_buf + size;
3074 ceph_decode_64_safe(&p, end, seq, out);
3075 ceph_decode_32_safe(&p, end, snap_count, out);
3076
3077 /*
3078 * Make sure the reported number of snapshot ids wouldn't go
3079 * beyond the end of our buffer. But before checking that,
3080 * make sure the computed size of the snapshot context we
3081 * allocate is representable in a size_t.
3082 */
3083 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3084 / sizeof (u64)) {
3085 ret = -EINVAL;
3086 goto out;
3087 }
3088 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3089 goto out;
3090
3091 size = sizeof (struct ceph_snap_context) +
3092 snap_count * sizeof (snapc->snaps[0]);
3093 snapc = kmalloc(size, GFP_KERNEL);
3094 if (!snapc) {
3095 ret = -ENOMEM;
3096 goto out;
3097 }
3098
3099 atomic_set(&snapc->nref, 1);
3100 snapc->seq = seq;
3101 snapc->num_snaps = snap_count;
3102 for (i = 0; i < snap_count; i++)
3103 snapc->snaps[i] = ceph_decode_64(&p);
3104
3105 rbd_dev->header.snapc = snapc;
3106
3107 dout(" snap context seq = %llu, snap_count = %u\n",
3108 (unsigned long long) seq, (unsigned int) snap_count);
3109
3110out:
3111 kfree(reply_buf);
3112
3113 return 0;
3114}
3115
b8b1e2db
AE
/*
 * Fetch the name of snapshot index @which from the format 2 header.
 * Returns a newly-allocated string (caller frees) or a pointer-coded
 * errno on failure.
 */
static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
{
        size_t size;
        void *reply_buf;
        __le64 snap_id;
        int ret;
        void *p;
        void *end;
        char *snap_name;

        size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
        reply_buf = kmalloc(size, GFP_KERNEL);
        if (!reply_buf)
                return ERR_PTR(-ENOMEM);

        snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
        ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
                                "rbd", "get_snapshot_name",
                                (char *) &snap_id, sizeof (snap_id),
                                reply_buf, size, NULL);
        dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
        if (ret < 0)
                goto out;

        p = reply_buf;
        end = (char *) reply_buf + size;
        snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
        if (IS_ERR(snap_name)) {
                ret = PTR_ERR(snap_name);
                goto out;
        } else {
                dout(" snap_id 0x%016llx snap_name = %s\n",
                        (unsigned long long) le64_to_cpu(snap_id), snap_name);
        }
        kfree(reply_buf);

        return snap_name;
out:
        kfree(reply_buf);

        return ERR_PTR(ret);
}
3158
3159static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
3160 u64 *snap_size, u64 *snap_features)
3161{
e0b49868 3162 u64 snap_id;
b8b1e2db
AE
3163 u8 order;
3164 int ret;
3165
3166 snap_id = rbd_dev->header.snapc->snaps[which];
3167 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
3168 if (ret)
3169 return ERR_PTR(ret);
3170 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
3171 if (ret)
3172 return ERR_PTR(ret);
3173
3174 return rbd_dev_v2_snap_name(rbd_dev, which);
3175}
3176
3177static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
3178 u64 *snap_size, u64 *snap_features)
3179{
3180 if (rbd_dev->image_format == 1)
3181 return rbd_dev_v1_snap_info(rbd_dev, which,
3182 snap_size, snap_features);
3183 if (rbd_dev->image_format == 2)
3184 return rbd_dev_v2_snap_info(rbd_dev, which,
3185 snap_size, snap_features);
3186 return ERR_PTR(-EINVAL);
3187}
3188
117973fb
AE
3189static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
3190{
3191 int ret;
3192 __u8 obj_order;
3193
3194 down_write(&rbd_dev->header_rwsem);
3195
3196 /* Grab old order first, to see if it changes */
3197
3198 obj_order = rbd_dev->header.obj_order,
3199 ret = rbd_dev_v2_image_size(rbd_dev);
3200 if (ret)
3201 goto out;
3202 if (rbd_dev->header.obj_order != obj_order) {
3203 ret = -EIO;
3204 goto out;
3205 }
3206 rbd_update_mapping_size(rbd_dev);
3207
3208 ret = rbd_dev_v2_snap_context(rbd_dev, hver);
3209 dout("rbd_dev_v2_snap_context returned %d\n", ret);
3210 if (ret)
3211 goto out;
3212 ret = rbd_dev_snaps_update(rbd_dev);
3213 dout("rbd_dev_snaps_update returned %d\n", ret);
3214 if (ret)
3215 goto out;
3216 ret = rbd_dev_snaps_register(rbd_dev);
3217 dout("rbd_dev_snaps_register returned %d\n", ret);
3218out:
3219 up_write(&rbd_dev->header_rwsem);
3220
3221 return ret;
3222}
3223
dfc5606d 3224/*
35938150
AE
3225 * Scan the rbd device's current snapshot list and compare it to the
3226 * newly-received snapshot context. Remove any existing snapshots
3227 * not present in the new snapshot context. Add a new snapshot for
3228 * any snaphots in the snapshot context not in the current list.
3229 * And verify there are no changes to snapshots we already know
3230 * about.
3231 *
3232 * Assumes the snapshots in the snapshot context are sorted by
3233 * snapshot id, highest id first. (Snapshots in the rbd_dev's list
3234 * are also maintained in that order.)
dfc5606d 3235 */
304f6808 3236static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
dfc5606d 3237{
35938150
AE
3238 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3239 const u32 snap_count = snapc->num_snaps;
35938150
AE
3240 struct list_head *head = &rbd_dev->snaps;
3241 struct list_head *links = head->next;
3242 u32 index = 0;
dfc5606d 3243
9fcbb800 3244 dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
35938150
AE
3245 while (index < snap_count || links != head) {
3246 u64 snap_id;
3247 struct rbd_snap *snap;
cd892126
AE
3248 char *snap_name;
3249 u64 snap_size = 0;
3250 u64 snap_features = 0;
dfc5606d 3251
35938150
AE
3252 snap_id = index < snap_count ? snapc->snaps[index]
3253 : CEPH_NOSNAP;
3254 snap = links != head ? list_entry(links, struct rbd_snap, node)
3255 : NULL;
aafb230e 3256 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
dfc5606d 3257
35938150
AE
3258 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
3259 struct list_head *next = links->next;
dfc5606d 3260
35938150 3261 /* Existing snapshot not in the new snap context */
dfc5606d 3262
0d7dbfce 3263 if (rbd_dev->spec->snap_id == snap->id)
d78b650a 3264 atomic_set(&rbd_dev->exists, 0);
41f38c2b 3265 rbd_remove_snap_dev(snap);
9fcbb800 3266 dout("%ssnap id %llu has been removed\n",
0d7dbfce
AE
3267 rbd_dev->spec->snap_id == snap->id ?
3268 "mapped " : "",
9fcbb800 3269 (unsigned long long) snap->id);
35938150
AE
3270
3271 /* Done with this list entry; advance */
3272
3273 links = next;
dfc5606d
YS
3274 continue;
3275 }
35938150 3276
b8b1e2db
AE
3277 snap_name = rbd_dev_snap_info(rbd_dev, index,
3278 &snap_size, &snap_features);
cd892126
AE
3279 if (IS_ERR(snap_name))
3280 return PTR_ERR(snap_name);
3281
9fcbb800
AE
3282 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
3283 (unsigned long long) snap_id);
35938150
AE
3284 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
3285 struct rbd_snap *new_snap;
3286
3287 /* We haven't seen this snapshot before */
3288
c8d18425 3289 new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
cd892126 3290 snap_id, snap_size, snap_features);
9fcbb800
AE
3291 if (IS_ERR(new_snap)) {
3292 int err = PTR_ERR(new_snap);
3293
3294 dout(" failed to add dev, error %d\n", err);
3295
3296 return err;
3297 }
35938150
AE
3298
3299 /* New goes before existing, or at end of list */
3300
9fcbb800 3301 dout(" added dev%s\n", snap ? "" : " at end\n");
35938150
AE
3302 if (snap)
3303 list_add_tail(&new_snap->node, &snap->node);
3304 else
523f3258 3305 list_add_tail(&new_snap->node, head);
35938150
AE
3306 } else {
3307 /* Already have this one */
3308
9fcbb800
AE
3309 dout(" already present\n");
3310
cd892126 3311 rbd_assert(snap->size == snap_size);
aafb230e 3312 rbd_assert(!strcmp(snap->name, snap_name));
cd892126 3313 rbd_assert(snap->features == snap_features);
35938150
AE
3314
3315 /* Done with this list entry; advance */
3316
3317 links = links->next;
dfc5606d 3318 }
35938150
AE
3319
3320 /* Advance to the next entry in the snapshot context */
3321
3322 index++;
dfc5606d 3323 }
9fcbb800 3324 dout("%s: done\n", __func__);
dfc5606d
YS
3325
3326 return 0;
3327}
3328
304f6808
AE
3329/*
3330 * Scan the list of snapshots and register the devices for any that
3331 * have not already been registered.
3332 */
3333static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
3334{
3335 struct rbd_snap *snap;
3336 int ret = 0;
3337
3338 dout("%s called\n", __func__);
86ff77bb
AE
3339 if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
3340 return -EIO;
304f6808
AE
3341
3342 list_for_each_entry(snap, &rbd_dev->snaps, node) {
3343 if (!rbd_snap_registered(snap)) {
3344 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
3345 if (ret < 0)
3346 break;
3347 }
3348 }
3349 dout("%s: returning %d\n", __func__, ret);
3350
3351 return ret;
3352}
3353
dfc5606d
YS
3354static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
3355{
dfc5606d 3356 struct device *dev;
cd789ab9 3357 int ret;
dfc5606d
YS
3358
3359 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
dfc5606d 3360
cd789ab9 3361 dev = &rbd_dev->dev;
dfc5606d
YS
3362 dev->bus = &rbd_bus_type;
3363 dev->type = &rbd_device_type;
3364 dev->parent = &rbd_root_dev;
3365 dev->release = rbd_dev_release;
de71a297 3366 dev_set_name(dev, "%d", rbd_dev->dev_id);
dfc5606d 3367 ret = device_register(dev);
dfc5606d 3368
dfc5606d 3369 mutex_unlock(&ctl_mutex);
cd789ab9 3370
dfc5606d 3371 return ret;
602adf40
YS
3372}
3373
dfc5606d
YS
3374static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
3375{
3376 device_unregister(&rbd_dev->dev);
3377}
3378
e2839308 3379static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
1ddbe94e
AE
3380
3381/*
499afd5b
AE
3382 * Get a unique rbd identifier for the given new rbd_dev, and add
3383 * the rbd_dev to the global list. The minimum rbd id is 1.
1ddbe94e 3384 */
e2839308 3385static void rbd_dev_id_get(struct rbd_device *rbd_dev)
b7f23c36 3386{
e2839308 3387 rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
499afd5b
AE
3388
3389 spin_lock(&rbd_dev_list_lock);
3390 list_add_tail(&rbd_dev->node, &rbd_dev_list);
3391 spin_unlock(&rbd_dev_list_lock);
e2839308
AE
3392 dout("rbd_dev %p given dev id %llu\n", rbd_dev,
3393 (unsigned long long) rbd_dev->dev_id);
1ddbe94e 3394}
b7f23c36 3395
1ddbe94e 3396/*
499afd5b
AE
3397 * Remove an rbd_dev from the global list, and record that its
3398 * identifier is no longer in use.
1ddbe94e 3399 */
e2839308 3400static void rbd_dev_id_put(struct rbd_device *rbd_dev)
1ddbe94e 3401{
d184f6bf 3402 struct list_head *tmp;
de71a297 3403 int rbd_id = rbd_dev->dev_id;
d184f6bf
AE
3404 int max_id;
3405
aafb230e 3406 rbd_assert(rbd_id > 0);
499afd5b 3407
e2839308
AE
3408 dout("rbd_dev %p released dev id %llu\n", rbd_dev,
3409 (unsigned long long) rbd_dev->dev_id);
499afd5b
AE
3410 spin_lock(&rbd_dev_list_lock);
3411 list_del_init(&rbd_dev->node);
d184f6bf
AE
3412
3413 /*
3414 * If the id being "put" is not the current maximum, there
3415 * is nothing special we need to do.
3416 */
e2839308 3417 if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
d184f6bf
AE
3418 spin_unlock(&rbd_dev_list_lock);
3419 return;
3420 }
3421
3422 /*
3423 * We need to update the current maximum id. Search the
3424 * list to find out what it is. We're more likely to find
3425 * the maximum at the end, so search the list backward.
3426 */
3427 max_id = 0;
3428 list_for_each_prev(tmp, &rbd_dev_list) {
3429 struct rbd_device *rbd_dev;
3430
3431 rbd_dev = list_entry(tmp, struct rbd_device, node);
b213e0b1
AE
3432 if (rbd_dev->dev_id > max_id)
3433 max_id = rbd_dev->dev_id;
d184f6bf 3434 }
499afd5b 3435 spin_unlock(&rbd_dev_list_lock);
b7f23c36 3436
1ddbe94e 3437 /*
e2839308 3438 * The max id could have been updated by rbd_dev_id_get(), in
d184f6bf
AE
3439 * which case it now accurately reflects the new maximum.
3440 * Be careful not to overwrite the maximum value in that
3441 * case.
1ddbe94e 3442 */
e2839308
AE
3443 atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
3444 dout(" max dev id has been reset\n");
b7f23c36
AE
3445}
3446
/*
 * Skips over white space at *buf, and updates *buf to point to the
 * first found non-space character (if any).  Returns the length of
 * the token (string of non-white space characters) found.  Note
 * that *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/* Characters for which isspace() is nonzero in "C"/"POSIX" locales */
	static const char spaces[] = " \f\n\r\t\v";

	*buf += strspn(*buf, spaces);	/* Find start of token */

	return strcspn(*buf, spaces);	/* Return token length */
}
3465
/*
 * Finds the next token in *buf and, when the supplied buffer is
 * big enough, copies it there NUL-terminated.  *buf is advanced
 * past the token even when it does not fit.
 *
 * Returns the length of the token found (not including the '\0');
 * 0 means no token, and a value >= token_size means the token was
 * too large for the buffer (nothing was copied in that case).
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t len = next_token(buf);

	if (len < token_size) {
		memcpy(token, *buf, len);
		token[len] = '\0';
	}
	*buf += len;

	return len;
}
3495
ea3352f4
AE
3496/*
3497 * Finds the next token in *buf, dynamically allocates a buffer big
3498 * enough to hold a copy of it, and copies the token into the new
3499 * buffer. The copy is guaranteed to be terminated with '\0'. Note
3500 * that a duplicate buffer is created even for a zero-length token.
3501 *
3502 * Returns a pointer to the newly-allocated duplicate, or a null
3503 * pointer if memory for the duplicate was not available. If
3504 * the lenp argument is a non-null pointer, the length of the token
3505 * (not including the '\0') is returned in *lenp.
3506 *
3507 * If successful, the *buf pointer will be updated to point beyond
3508 * the end of the found token.
3509 *
3510 * Note: uses GFP_KERNEL for allocation.
3511 */
3512static inline char *dup_token(const char **buf, size_t *lenp)
3513{
3514 char *dup;
3515 size_t len;
3516
3517 len = next_token(buf);
4caf35f9 3518 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
ea3352f4
AE
3519 if (!dup)
3520 return NULL;
ea3352f4
AE
3521 *(dup + len) = '\0';
3522 *buf += len;
3523
3524 if (lenp)
3525 *lenp = len;
3526
3527 return dup;
3528}
3529
a725f65e 3530/*
859c31df
AE
3531 * Parse the options provided for an "rbd add" (i.e., rbd image
3532 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
3533 * and the data written is passed here via a NUL-terminated buffer.
3534 * Returns 0 if successful or an error code otherwise.
d22f76e7 3535 *
859c31df
AE
3536 * The information extracted from these options is recorded in
3537 * the other parameters which return dynamically-allocated
3538 * structures:
3539 * ceph_opts
3540 * The address of a pointer that will refer to a ceph options
3541 * structure. Caller must release the returned pointer using
3542 * ceph_destroy_options() when it is no longer needed.
3543 * rbd_opts
3544 * Address of an rbd options pointer. Fully initialized by
3545 * this function; caller must release with kfree().
3546 * spec
3547 * Address of an rbd image specification pointer. Fully
3548 * initialized by this function based on parsed options.
3549 * Caller must release with rbd_spec_put().
3550 *
3551 * The options passed take this form:
3552 * <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
3553 * where:
3554 * <mon_addrs>
3555 * A comma-separated list of one or more monitor addresses.
3556 * A monitor address is an ip address, optionally followed
3557 * by a port number (separated by a colon).
3558 * I.e.: ip1[:port1][,ip2[:port2]...]
3559 * <options>
3560 * A comma-separated list of ceph and/or rbd options.
3561 * <pool_name>
3562 * The name of the rados pool containing the rbd image.
3563 * <image_name>
3564 * The name of the image in that pool to map.
3565 * <snap_id>
3566 * An optional snapshot id. If provided, the mapping will
3567 * present data from the image at the time that snapshot was
3568 * created. The image head is used if no snapshot id is
3569 * provided. Snapshot mappings are always read-only.
a725f65e 3570 */
859c31df 3571static int rbd_add_parse_args(const char *buf,
dc79b113 3572 struct ceph_options **ceph_opts,
859c31df
AE
3573 struct rbd_options **opts,
3574 struct rbd_spec **rbd_spec)
e28fff26 3575{
d22f76e7 3576 size_t len;
859c31df 3577 char *options;
0ddebc0c
AE
3578 const char *mon_addrs;
3579 size_t mon_addrs_size;
859c31df 3580 struct rbd_spec *spec = NULL;
4e9afeba 3581 struct rbd_options *rbd_opts = NULL;
859c31df 3582 struct ceph_options *copts;
dc79b113 3583 int ret;
e28fff26
AE
3584
3585 /* The first four tokens are required */
3586
7ef3214a 3587 len = next_token(&buf);
4fb5d671
AE
3588 if (!len) {
3589 rbd_warn(NULL, "no monitor address(es) provided");
3590 return -EINVAL;
3591 }
0ddebc0c 3592 mon_addrs = buf;
f28e565a 3593 mon_addrs_size = len + 1;
7ef3214a 3594 buf += len;
a725f65e 3595
dc79b113 3596 ret = -EINVAL;
f28e565a
AE
3597 options = dup_token(&buf, NULL);
3598 if (!options)
dc79b113 3599 return -ENOMEM;
4fb5d671
AE
3600 if (!*options) {
3601 rbd_warn(NULL, "no options provided");
3602 goto out_err;
3603 }
e28fff26 3604
859c31df
AE
3605 spec = rbd_spec_alloc();
3606 if (!spec)
f28e565a 3607 goto out_mem;
859c31df
AE
3608
3609 spec->pool_name = dup_token(&buf, NULL);
3610 if (!spec->pool_name)
3611 goto out_mem;
4fb5d671
AE
3612 if (!*spec->pool_name) {
3613 rbd_warn(NULL, "no pool name provided");
3614 goto out_err;
3615 }
e28fff26 3616
69e7a02f 3617 spec->image_name = dup_token(&buf, NULL);
859c31df 3618 if (!spec->image_name)
f28e565a 3619 goto out_mem;
4fb5d671
AE
3620 if (!*spec->image_name) {
3621 rbd_warn(NULL, "no image name provided");
3622 goto out_err;
3623 }
d4b125e9 3624
f28e565a
AE
3625 /*
3626 * Snapshot name is optional; default is to use "-"
3627 * (indicating the head/no snapshot).
3628 */
3feeb894 3629 len = next_token(&buf);
820a5f3e 3630 if (!len) {
3feeb894
AE
3631 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
3632 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
f28e565a 3633 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
dc79b113 3634 ret = -ENAMETOOLONG;
f28e565a 3635 goto out_err;
849b4260 3636 }
4caf35f9 3637 spec->snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
859c31df 3638 if (!spec->snap_name)
f28e565a 3639 goto out_mem;
859c31df 3640 *(spec->snap_name + len) = '\0';
e5c35534 3641
0ddebc0c 3642 /* Initialize all rbd options to the defaults */
e28fff26 3643
4e9afeba
AE
3644 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
3645 if (!rbd_opts)
3646 goto out_mem;
3647
3648 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
d22f76e7 3649
859c31df 3650 copts = ceph_parse_options(options, mon_addrs,
0ddebc0c 3651 mon_addrs + mon_addrs_size - 1,
4e9afeba 3652 parse_rbd_opts_token, rbd_opts);
859c31df
AE
3653 if (IS_ERR(copts)) {
3654 ret = PTR_ERR(copts);
dc79b113
AE
3655 goto out_err;
3656 }
859c31df
AE
3657 kfree(options);
3658
3659 *ceph_opts = copts;
4e9afeba 3660 *opts = rbd_opts;
859c31df 3661 *rbd_spec = spec;
0ddebc0c 3662
dc79b113 3663 return 0;
f28e565a 3664out_mem:
dc79b113 3665 ret = -ENOMEM;
d22f76e7 3666out_err:
859c31df
AE
3667 kfree(rbd_opts);
3668 rbd_spec_put(spec);
f28e565a 3669 kfree(options);
d22f76e7 3670
dc79b113 3671 return ret;
a725f65e
AE
3672}
3673
589d30e0
AE
3674/*
3675 * An rbd format 2 image has a unique identifier, distinct from the
3676 * name given to it by the user. Internally, that identifier is
3677 * what's used to specify the names of objects related to the image.
3678 *
3679 * A special "rbd id" object is used to map an rbd image name to its
3680 * id. If that object doesn't exist, then there is no v2 rbd image
3681 * with the supplied name.
3682 *
3683 * This function will record the given rbd_dev's image_id field if
3684 * it can be determined, and in that case will return 0. If any
3685 * errors occur a negative errno will be returned and the rbd_dev's
3686 * image_id field will be unchanged (and should be NULL).
3687 */
3688static int rbd_dev_image_id(struct rbd_device *rbd_dev)
3689{
3690 int ret;
3691 size_t size;
3692 char *object_name;
3693 void *response;
3694 void *p;
3695
2c0d0a10
AE
3696 /*
3697 * When probing a parent image, the image id is already
3698 * known (and the image name likely is not). There's no
3699 * need to fetch the image id again in this case.
3700 */
3701 if (rbd_dev->spec->image_id)
3702 return 0;
3703
589d30e0
AE
3704 /*
3705 * First, see if the format 2 image id file exists, and if
3706 * so, get the image's persistent id from it.
3707 */
69e7a02f 3708 size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
589d30e0
AE
3709 object_name = kmalloc(size, GFP_NOIO);
3710 if (!object_name)
3711 return -ENOMEM;
0d7dbfce 3712 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
589d30e0
AE
3713 dout("rbd id object name is %s\n", object_name);
3714
3715 /* Response will be an encoded string, which includes a length */
3716
3717 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
3718 response = kzalloc(size, GFP_NOIO);
3719 if (!response) {
3720 ret = -ENOMEM;
3721 goto out;
3722 }
3723
3724 ret = rbd_req_sync_exec(rbd_dev, object_name,
3725 "rbd", "get_id",
3726 NULL, 0,
07b2391f 3727 response, RBD_IMAGE_ID_LEN_MAX, NULL);
589d30e0
AE
3728 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
3729 if (ret < 0)
3730 goto out;
a0ea3a40 3731 ret = 0; /* rbd_req_sync_exec() can return positive */
589d30e0
AE
3732
3733 p = response;
0d7dbfce 3734 rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
589d30e0 3735 p + RBD_IMAGE_ID_LEN_MAX,
979ed480 3736 NULL, GFP_NOIO);
0d7dbfce
AE
3737 if (IS_ERR(rbd_dev->spec->image_id)) {
3738 ret = PTR_ERR(rbd_dev->spec->image_id);
3739 rbd_dev->spec->image_id = NULL;
589d30e0 3740 } else {
0d7dbfce 3741 dout("image_id is %s\n", rbd_dev->spec->image_id);
589d30e0
AE
3742 }
3743out:
3744 kfree(response);
3745 kfree(object_name);
3746
3747 return ret;
3748}
3749
a30b71b9
AE
3750static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
3751{
3752 int ret;
3753 size_t size;
3754
3755 /* Version 1 images have no id; empty string is used */
3756
0d7dbfce
AE
3757 rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
3758 if (!rbd_dev->spec->image_id)
a30b71b9 3759 return -ENOMEM;
a30b71b9
AE
3760
3761 /* Record the header object name for this rbd image. */
3762
69e7a02f 3763 size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
a30b71b9
AE
3764 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3765 if (!rbd_dev->header_name) {
3766 ret = -ENOMEM;
3767 goto out_err;
3768 }
0d7dbfce
AE
3769 sprintf(rbd_dev->header_name, "%s%s",
3770 rbd_dev->spec->image_name, RBD_SUFFIX);
a30b71b9
AE
3771
3772 /* Populate rbd image metadata */
3773
3774 ret = rbd_read_header(rbd_dev, &rbd_dev->header);
3775 if (ret < 0)
3776 goto out_err;
86b00e0d
AE
3777
3778 /* Version 1 images have no parent (no layering) */
3779
3780 rbd_dev->parent_spec = NULL;
3781 rbd_dev->parent_overlap = 0;
3782
a30b71b9
AE
3783 rbd_dev->image_format = 1;
3784
3785 dout("discovered version 1 image, header name is %s\n",
3786 rbd_dev->header_name);
3787
3788 return 0;
3789
3790out_err:
3791 kfree(rbd_dev->header_name);
3792 rbd_dev->header_name = NULL;
0d7dbfce
AE
3793 kfree(rbd_dev->spec->image_id);
3794 rbd_dev->spec->image_id = NULL;
a30b71b9
AE
3795
3796 return ret;
3797}
3798
3799static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
3800{
3801 size_t size;
9d475de5 3802 int ret;
6e14b1a6 3803 u64 ver = 0;
a30b71b9
AE
3804
3805 /*
3806 * Image id was filled in by the caller. Record the header
3807 * object name for this rbd image.
3808 */
979ed480 3809 size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
a30b71b9
AE
3810 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3811 if (!rbd_dev->header_name)
3812 return -ENOMEM;
3813 sprintf(rbd_dev->header_name, "%s%s",
0d7dbfce 3814 RBD_HEADER_PREFIX, rbd_dev->spec->image_id);
9d475de5
AE
3815
3816 /* Get the size and object order for the image */
3817
3818 ret = rbd_dev_v2_image_size(rbd_dev);
1e130199
AE
3819 if (ret < 0)
3820 goto out_err;
3821
3822 /* Get the object prefix (a.k.a. block_name) for the image */
3823
3824 ret = rbd_dev_v2_object_prefix(rbd_dev);
b1b5402a
AE
3825 if (ret < 0)
3826 goto out_err;
3827
d889140c 3828 /* Get the and check features for the image */
b1b5402a
AE
3829
3830 ret = rbd_dev_v2_features(rbd_dev);
9d475de5
AE
3831 if (ret < 0)
3832 goto out_err;
35d489f9 3833
86b00e0d
AE
3834 /* If the image supports layering, get the parent info */
3835
3836 if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
3837 ret = rbd_dev_v2_parent_info(rbd_dev);
3838 if (ret < 0)
3839 goto out_err;
3840 }
3841
6e14b1a6
AE
3842 /* crypto and compression type aren't (yet) supported for v2 images */
3843
3844 rbd_dev->header.crypt_type = 0;
3845 rbd_dev->header.comp_type = 0;
35d489f9 3846
6e14b1a6
AE
3847 /* Get the snapshot context, plus the header version */
3848
3849 ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
35d489f9
AE
3850 if (ret)
3851 goto out_err;
6e14b1a6
AE
3852 rbd_dev->header.obj_version = ver;
3853
a30b71b9
AE
3854 rbd_dev->image_format = 2;
3855
3856 dout("discovered version 2 image, header name is %s\n",
3857 rbd_dev->header_name);
3858
35152979 3859 return 0;
9d475de5 3860out_err:
86b00e0d
AE
3861 rbd_dev->parent_overlap = 0;
3862 rbd_spec_put(rbd_dev->parent_spec);
3863 rbd_dev->parent_spec = NULL;
9d475de5
AE
3864 kfree(rbd_dev->header_name);
3865 rbd_dev->header_name = NULL;
1e130199
AE
3866 kfree(rbd_dev->header.object_prefix);
3867 rbd_dev->header.object_prefix = NULL;
9d475de5
AE
3868
3869 return ret;
a30b71b9
AE
3870}
3871
83a06263
AE
3872static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
3873{
3874 int ret;
3875
3876 /* no need to lock here, as rbd_dev is not registered yet */
3877 ret = rbd_dev_snaps_update(rbd_dev);
3878 if (ret)
3879 return ret;
3880
9e15b77d
AE
3881 ret = rbd_dev_probe_update_spec(rbd_dev);
3882 if (ret)
3883 goto err_out_snaps;
3884
83a06263
AE
3885 ret = rbd_dev_set_mapping(rbd_dev);
3886 if (ret)
3887 goto err_out_snaps;
3888
3889 /* generate unique id: find highest unique id, add one */
3890 rbd_dev_id_get(rbd_dev);
3891
3892 /* Fill in the device name, now that we have its id. */
3893 BUILD_BUG_ON(DEV_NAME_LEN
3894 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
3895 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
3896
3897 /* Get our block major device number. */
3898
3899 ret = register_blkdev(0, rbd_dev->name);
3900 if (ret < 0)
3901 goto err_out_id;
3902 rbd_dev->major = ret;
3903
3904 /* Set up the blkdev mapping. */
3905
3906 ret = rbd_init_disk(rbd_dev);
3907 if (ret)
3908 goto err_out_blkdev;
3909
3910 ret = rbd_bus_add_dev(rbd_dev);
3911 if (ret)
3912 goto err_out_disk;
3913
3914 /*
3915 * At this point cleanup in the event of an error is the job
3916 * of the sysfs code (initiated by rbd_bus_del_dev()).
3917 */
3918 down_write(&rbd_dev->header_rwsem);
3919 ret = rbd_dev_snaps_register(rbd_dev);
3920 up_write(&rbd_dev->header_rwsem);
3921 if (ret)
3922 goto err_out_bus;
3923
9969ebc5 3924 ret = rbd_dev_header_watch_sync(rbd_dev, 1);
83a06263
AE
3925 if (ret)
3926 goto err_out_bus;
3927
3928 /* Everything's ready. Announce the disk to the world. */
3929
3930 add_disk(rbd_dev->disk);
3931
3932 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
3933 (unsigned long long) rbd_dev->mapping.size);
3934
3935 return ret;
3936err_out_bus:
3937 /* this will also clean up rest of rbd_dev stuff */
3938
3939 rbd_bus_del_dev(rbd_dev);
3940
3941 return ret;
3942err_out_disk:
3943 rbd_free_disk(rbd_dev);
3944err_out_blkdev:
3945 unregister_blkdev(rbd_dev->major, rbd_dev->name);
3946err_out_id:
3947 rbd_dev_id_put(rbd_dev);
3948err_out_snaps:
3949 rbd_remove_all_snaps(rbd_dev);
3950
3951 return ret;
3952}
3953
a30b71b9
AE
3954/*
3955 * Probe for the existence of the header object for the given rbd
3956 * device. For format 2 images this includes determining the image
3957 * id.
3958 */
3959static int rbd_dev_probe(struct rbd_device *rbd_dev)
3960{
3961 int ret;
3962
3963 /*
3964 * Get the id from the image id object. If it's not a
3965 * format 2 image, we'll get ENOENT back, and we'll assume
3966 * it's a format 1 image.
3967 */
3968 ret = rbd_dev_image_id(rbd_dev);
3969 if (ret)
3970 ret = rbd_dev_v1_probe(rbd_dev);
3971 else
3972 ret = rbd_dev_v2_probe(rbd_dev);
83a06263 3973 if (ret) {
a30b71b9
AE
3974 dout("probe failed, returning %d\n", ret);
3975
83a06263
AE
3976 return ret;
3977 }
3978
3979 ret = rbd_dev_probe_finish(rbd_dev);
3980 if (ret)
3981 rbd_header_free(&rbd_dev->header);
3982
a30b71b9
AE
3983 return ret;
3984}
3985
59c2be1e
YS
3986static ssize_t rbd_add(struct bus_type *bus,
3987 const char *buf,
3988 size_t count)
602adf40 3989{
cb8627c7 3990 struct rbd_device *rbd_dev = NULL;
dc79b113 3991 struct ceph_options *ceph_opts = NULL;
4e9afeba 3992 struct rbd_options *rbd_opts = NULL;
859c31df 3993 struct rbd_spec *spec = NULL;
9d3997fd 3994 struct rbd_client *rbdc;
27cc2594
AE
3995 struct ceph_osd_client *osdc;
3996 int rc = -ENOMEM;
602adf40
YS
3997
3998 if (!try_module_get(THIS_MODULE))
3999 return -ENODEV;
4000
602adf40 4001 /* parse add command */
859c31df 4002 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
dc79b113 4003 if (rc < 0)
bd4ba655 4004 goto err_out_module;
78cea76e 4005
9d3997fd
AE
4006 rbdc = rbd_get_client(ceph_opts);
4007 if (IS_ERR(rbdc)) {
4008 rc = PTR_ERR(rbdc);
0ddebc0c 4009 goto err_out_args;
9d3997fd 4010 }
c53d5893 4011 ceph_opts = NULL; /* rbd_dev client now owns this */
602adf40 4012
602adf40 4013 /* pick the pool */
9d3997fd 4014 osdc = &rbdc->client->osdc;
859c31df 4015 rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
602adf40
YS
4016 if (rc < 0)
4017 goto err_out_client;
859c31df
AE
4018 spec->pool_id = (u64) rc;
4019
0903e875
AE
4020 /* The ceph file layout needs to fit pool id in 32 bits */
4021
4022 if (WARN_ON(spec->pool_id > (u64) U32_MAX)) {
4023 rc = -EIO;
4024 goto err_out_client;
4025 }
4026
c53d5893 4027 rbd_dev = rbd_dev_create(rbdc, spec);
bd4ba655
AE
4028 if (!rbd_dev)
4029 goto err_out_client;
c53d5893
AE
4030 rbdc = NULL; /* rbd_dev now owns this */
4031 spec = NULL; /* rbd_dev now owns this */
602adf40 4032
bd4ba655 4033 rbd_dev->mapping.read_only = rbd_opts->read_only;
c53d5893
AE
4034 kfree(rbd_opts);
4035 rbd_opts = NULL; /* done with this */
bd4ba655 4036
a30b71b9
AE
4037 rc = rbd_dev_probe(rbd_dev);
4038 if (rc < 0)
c53d5893 4039 goto err_out_rbd_dev;
05fd6f6f 4040
602adf40 4041 return count;
c53d5893
AE
4042err_out_rbd_dev:
4043 rbd_dev_destroy(rbd_dev);
bd4ba655 4044err_out_client:
9d3997fd 4045 rbd_put_client(rbdc);
0ddebc0c 4046err_out_args:
78cea76e
AE
4047 if (ceph_opts)
4048 ceph_destroy_options(ceph_opts);
4e9afeba 4049 kfree(rbd_opts);
859c31df 4050 rbd_spec_put(spec);
bd4ba655
AE
4051err_out_module:
4052 module_put(THIS_MODULE);
27cc2594 4053
602adf40 4054 dout("Error adding device %s\n", buf);
27cc2594
AE
4055
4056 return (ssize_t) rc;
602adf40
YS
4057}
4058
de71a297 4059static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
602adf40
YS
4060{
4061 struct list_head *tmp;
4062 struct rbd_device *rbd_dev;
4063
e124a82f 4064 spin_lock(&rbd_dev_list_lock);
602adf40
YS
4065 list_for_each(tmp, &rbd_dev_list) {
4066 rbd_dev = list_entry(tmp, struct rbd_device, node);
de71a297 4067 if (rbd_dev->dev_id == dev_id) {
e124a82f 4068 spin_unlock(&rbd_dev_list_lock);
602adf40 4069 return rbd_dev;
e124a82f 4070 }
602adf40 4071 }
e124a82f 4072 spin_unlock(&rbd_dev_list_lock);
602adf40
YS
4073 return NULL;
4074}
4075
dfc5606d 4076static void rbd_dev_release(struct device *dev)
602adf40 4077{
593a9e7b 4078 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 4079
1dbb4399
AE
4080 if (rbd_dev->watch_request) {
4081 struct ceph_client *client = rbd_dev->rbd_client->client;
4082
4083 ceph_osdc_unregister_linger_request(&client->osdc,
59c2be1e 4084 rbd_dev->watch_request);
1dbb4399 4085 }
59c2be1e 4086 if (rbd_dev->watch_event)
9969ebc5 4087 rbd_dev_header_watch_sync(rbd_dev, 0);
602adf40
YS
4088
4089 /* clean up and free blkdev */
4090 rbd_free_disk(rbd_dev);
4091 unregister_blkdev(rbd_dev->major, rbd_dev->name);
32eec68d 4092
2ac4e75d
AE
4093 /* release allocated disk header fields */
4094 rbd_header_free(&rbd_dev->header);
4095
32eec68d 4096 /* done with the id, and with the rbd_dev */
e2839308 4097 rbd_dev_id_put(rbd_dev);
c53d5893
AE
4098 rbd_assert(rbd_dev->rbd_client != NULL);
4099 rbd_dev_destroy(rbd_dev);
602adf40
YS
4100
4101 /* release module ref */
4102 module_put(THIS_MODULE);
602adf40
YS
4103}
4104
dfc5606d
YS
4105static ssize_t rbd_remove(struct bus_type *bus,
4106 const char *buf,
4107 size_t count)
602adf40
YS
4108{
4109 struct rbd_device *rbd_dev = NULL;
4110 int target_id, rc;
4111 unsigned long ul;
4112 int ret = count;
4113
4114 rc = strict_strtoul(buf, 10, &ul);
4115 if (rc)
4116 return rc;
4117
4118 /* convert to int; abort if we lost anything in the conversion */
4119 target_id = (int) ul;
4120 if (target_id != ul)
4121 return -EINVAL;
4122
4123 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4124
4125 rbd_dev = __rbd_get_dev(target_id);
4126 if (!rbd_dev) {
4127 ret = -ENOENT;
4128 goto done;
42382b70
AE
4129 }
4130
4131 if (rbd_dev->open_count) {
4132 ret = -EBUSY;
4133 goto done;
602adf40
YS
4134 }
4135
41f38c2b 4136 rbd_remove_all_snaps(rbd_dev);
dfc5606d 4137 rbd_bus_del_dev(rbd_dev);
602adf40
YS
4138
4139done:
4140 mutex_unlock(&ctl_mutex);
aafb230e 4141
602adf40
YS
4142 return ret;
4143}
4144
602adf40
YS
4145/*
4146 * create control files in sysfs
dfc5606d 4147 * /sys/bus/rbd/...
602adf40
YS
4148 */
4149static int rbd_sysfs_init(void)
4150{
dfc5606d 4151 int ret;
602adf40 4152
fed4c143 4153 ret = device_register(&rbd_root_dev);
21079786 4154 if (ret < 0)
dfc5606d 4155 return ret;
602adf40 4156
fed4c143
AE
4157 ret = bus_register(&rbd_bus_type);
4158 if (ret < 0)
4159 device_unregister(&rbd_root_dev);
602adf40 4160
602adf40
YS
4161 return ret;
4162}
4163
4164static void rbd_sysfs_cleanup(void)
4165{
dfc5606d 4166 bus_unregister(&rbd_bus_type);
fed4c143 4167 device_unregister(&rbd_root_dev);
602adf40
YS
4168}
4169
4170int __init rbd_init(void)
4171{
4172 int rc;
4173
4174 rc = rbd_sysfs_init();
4175 if (rc)
4176 return rc;
f0f8cef5 4177 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
602adf40
YS
4178 return 0;
4179}
4180
4181void __exit rbd_exit(void)
4182{
4183 rbd_sysfs_cleanup();
4184}
4185
4186module_init(rbd_init);
4187module_exit(rbd_exit);
4188
4189MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
4190MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
4191MODULE_DESCRIPTION("rados block device");
4192
4193/* following authorship retained from original osdblk.c */
4194MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
4195
4196MODULE_LICENSE("GPL");
This page took 0.390852 seconds and 5 git commands to generate.