rbd: get rid of rbd_req_sync_watch()
[deliverable/linux.git] / drivers / block / rbd.c
CommitLineData
602adf40
YS
1/*
2 rbd.c -- Export ceph rados objects as a Linux block device
3
4
5 based on drivers/block/osdblk.c:
6
7 Copyright 2009 Red Hat, Inc.
8
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
12
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
17
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
dfc5606d 24 For usage instructions, please refer to:
602adf40 25
dfc5606d 26 Documentation/ABI/testing/sysfs-bus-rbd
602adf40
YS
27
28 */
29
30#include <linux/ceph/libceph.h>
31#include <linux/ceph/osd_client.h>
32#include <linux/ceph/mon_client.h>
33#include <linux/ceph/decode.h>
59c2be1e 34#include <linux/parser.h>
602adf40
YS
35
36#include <linux/kernel.h>
37#include <linux/device.h>
38#include <linux/module.h>
39#include <linux/fs.h>
40#include <linux/blkdev.h>
41
42#include "rbd_types.h"
43
aafb230e
AE
44#define RBD_DEBUG /* Activate rbd_assert() calls */
45
593a9e7b
AE
46/*
47 * The basic unit of block I/O is a sector. It is interpreted in a
48 * number of contexts in Linux (blk, bio, genhd), but the default is
49 * universally 512 bytes. These symbols are just slightly more
50 * meaningful than the bare numbers they represent.
51 */
52#define SECTOR_SHIFT 9
53#define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
54
2647ba38 55/* It might be useful to have these defined elsewhere */
df111be6 56
2647ba38
AE
57#define U8_MAX ((u8) (~0U))
58#define U16_MAX ((u16) (~0U))
59#define U32_MAX ((u32) (~0U))
60#define U64_MAX ((u64) (~0ULL))
df111be6 61
f0f8cef5
AE
62#define RBD_DRV_NAME "rbd"
63#define RBD_DRV_NAME_LONG "rbd (rados block device)"
602adf40
YS
64
65#define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
66
d4b125e9
AE
67#define RBD_SNAP_DEV_NAME_PREFIX "snap_"
68#define RBD_MAX_SNAP_NAME_LEN \
69 (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
70
35d489f9 71#define RBD_MAX_SNAP_COUNT 510 /* allows max snapc to fit in 4KB */
602adf40
YS
72
73#define RBD_SNAP_HEAD_NAME "-"
74
9e15b77d
AE
75/* This allows a single page to hold an image name sent by OSD */
76#define RBD_IMAGE_NAME_LEN_MAX (PAGE_SIZE - sizeof (__le32) - 1)
1e130199 77#define RBD_IMAGE_ID_LEN_MAX 64
9e15b77d 78
1e130199 79#define RBD_OBJ_PREFIX_LEN_MAX 64
589d30e0 80
d889140c
AE
81/* Feature bits */
82
83#define RBD_FEATURE_LAYERING 1
84
85/* Features supported by this (client software) implementation. */
86
87#define RBD_FEATURES_ALL (0)
88
81a89793
AE
89/*
90 * An RBD device name will be "rbd#", where the "rbd" comes from
91 * RBD_DRV_NAME above, and # is a unique integer identifier.
92 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
93 * enough to hold all possible device names.
94 */
602adf40 95#define DEV_NAME_LEN 32
81a89793 96#define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
602adf40
YS
97
/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These four fields never change for a given rbd image */
	char *object_prefix;	/* prefix for data object names (see rbd_segment_name()) */
	u64 features;
	__u8 obj_order;		/* log2 of the object (segment) size in bytes */
	__u8 crypt_type;
	__u8 comp_type;

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;	/* snapshot name blob copied from the on-disk header */
	u64 *snap_sizes;	/* one entry per snapshot */

	u64 obj_version;
};
117
0d7dbfce
AE
118/*
119 * An rbd image specification.
120 *
121 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
c66c6e0c
AE
122 * identify an image. Each rbd_dev structure includes a pointer to
123 * an rbd_spec structure that encapsulates this identity.
124 *
125 * Each of the id's in an rbd_spec has an associated name. For a
126 * user-mapped image, the names are supplied and the id's associated
127 * with them are looked up. For a layered image, a parent image is
128 * defined by the tuple, and the names are looked up.
129 *
130 * An rbd_dev structure contains a parent_spec pointer which is
131 * non-null if the image it represents is a child in a layered
132 * image. This pointer will refer to the rbd_spec structure used
133 * by the parent rbd_dev for its own identity (i.e., the structure
134 * is shared between the parent and child).
135 *
136 * Since these structures are populated once, during the discovery
137 * phase of image construction, they are effectively immutable so
138 * we make no effort to synchronize access to them.
139 *
140 * Note that code herein does not assume the image name is known (it
141 * could be a null pointer).
0d7dbfce
AE
142 */
struct rbd_spec {
	u64 pool_id;
	char *pool_name;

	char *image_id;
	char *image_name;	/* may be a null pointer (see block comment above) */

	u64 snap_id;
	char *snap_name;

	struct kref kref;	/* a spec may be shared by parent and child devices */
};
155
/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client *client;
	struct kref kref;	/* see rbd_get_client()/rbd_put_client() */
	struct list_head node;	/* entry on rbd_client_list */
};
164
bf0d5f50
AE
165struct rbd_img_request;
166typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
167
168#define BAD_WHICH U32_MAX /* Good which or bad which, which? */
169
170struct rbd_obj_request;
171typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
172
9969ebc5
AE
173enum obj_request_type {
174 OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
175};
bf0d5f50
AE
176
struct rbd_obj_request {
	const char *object_name;
	u64 offset;		/* object start byte */
	u64 length;		/* bytes from offset */

	struct rbd_img_request *img_request;	/* owning image request, if any */
	struct list_head links;	/* img_request->obj_requests */
	u32 which;		/* posn image request list */

	enum obj_request_type type;
	/* Payload depends on type (OBJ_REQUEST_NODATA carries none) */
	union {
		struct bio *bio_list;		/* OBJ_REQUEST_BIO */
		struct {			/* OBJ_REQUEST_PAGES */
			struct page **pages;
			u32 page_count;
		};
	};

	struct ceph_osd_request *osd_req;

	u64 xferred;		/* bytes transferred */
	u64 version;
	s32 result;
	atomic_t done;

	rbd_obj_callback_t callback;
	struct completion completion;

	struct kref kref;
};
207
/*
 * An image request covers a byte range of an image and is carried
 * out by a list of per-object requests (struct rbd_obj_request).
 */
struct rbd_img_request {
	struct request *rq;
	struct rbd_device *rbd_dev;
	u64 offset;	/* starting image byte offset */
	u64 length;	/* byte count from offset */
	bool write_request;	/* false for read */
	union {
		struct ceph_snap_context *snapc;	/* for writes */
		u64 snap_id;		/* for reads */
	};
	spinlock_t completion_lock;/* protects next_completion */
	u32 next_completion;
	rbd_img_callback_t callback;

	u32 obj_request_count;
	struct list_head obj_requests;	/* rbd_obj_request structs */

	struct kref kref;
};
227
228#define for_each_obj_request(ireq, oreq) \
229 list_for_each_entry(oreq, &ireq->obj_requests, links)
230#define for_each_obj_request_from(ireq, oreq) \
231 list_for_each_entry_from(oreq, &ireq->obj_requests, links)
232#define for_each_obj_request_safe(ireq, oreq, n) \
233 list_for_each_entry_safe_reverse(oreq, n, &ireq->obj_requests, links)
234
dfc5606d
YS
/* In-core record of one snapshot of an image (also a sysfs device). */
struct rbd_snap {
	struct device dev;
	const char *name;
	u64 size;
	struct list_head node;	/* entry on rbd_dev->snaps */
	u64 id;
	u64 features;
};
243
/* What this rbd device currently maps: a snapshot or the image head. */
struct rbd_mapping {
	u64 size;	/* mapped size in bytes */
	u64 features;
	bool read_only;
};
249
602adf40
YS
/*
 * a single device
 */
struct rbd_device {
	int dev_id;		/* blkdev unique id */

	int major;		/* blkdev assigned major */
	struct gendisk *disk;		/* blkdev's gendisk and rq */

	u32 image_format;	/* Either 1 or 2 */
	struct rbd_client *rbd_client;

	char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t lock;		/* queue lock */

	struct rbd_image_header header;
	atomic_t exists;	/* nonzero once the mapping is established */
	struct rbd_spec *spec;	/* identity of the mapped image */

	char *header_name;

	struct ceph_file_layout layout;

	struct ceph_osd_event *watch_event;
	struct ceph_osd_request *watch_request;

	struct rbd_spec *parent_spec;	/* non-NULL for a layered child image */
	u64 parent_overlap;

	/* protects updating the header */
	struct rw_semaphore header_rwsem;

	struct rbd_mapping mapping;

	struct list_head node;

	/* list of snapshots */
	struct list_head snaps;

	/* sysfs related */
	struct device dev;
	unsigned long open_count;	/* guarded by ctl_mutex */
};
294
602adf40 295static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
e124a82f 296
602adf40 297static LIST_HEAD(rbd_dev_list); /* devices */
e124a82f
AE
298static DEFINE_SPINLOCK(rbd_dev_list_lock);
299
432b8587
AE
300static LIST_HEAD(rbd_client_list); /* clients */
301static DEFINE_SPINLOCK(rbd_client_list_lock);
602adf40 302
304f6808
AE
303static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
304static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
305
dfc5606d 306static void rbd_dev_release(struct device *dev);
41f38c2b 307static void rbd_remove_snap_dev(struct rbd_snap *snap);
dfc5606d 308
f0f8cef5
AE
309static ssize_t rbd_add(struct bus_type *bus, const char *buf,
310 size_t count);
311static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
312 size_t count);
313
314static struct bus_attribute rbd_bus_attrs[] = {
315 __ATTR(add, S_IWUSR, NULL, rbd_add),
316 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
317 __ATTR_NULL
318};
319
320static struct bus_type rbd_bus_type = {
321 .name = "rbd",
322 .bus_attrs = rbd_bus_attrs,
323};
324
/* No-op release: rbd_root_dev is statically allocated, nothing to free. */
static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =	"rbd",
	.release =	rbd_root_dev_release,
};
333
06ecc6cb
AE
/*
 * Emit a warning, identifying the device as specifically as we can:
 * by disk name, then image name, then image id, then (as a last
 * resort) the raw rbd_dev pointer.  A NULL rbd_dev is allowed.
 */
static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else	/* punt */
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}
360
aafb230e
AE
361#ifdef RBD_DEBUG
362#define rbd_assert(expr) \
363 if (unlikely(!(expr))) { \
364 printk(KERN_ERR "\nAssertion failure in %s() " \
365 "at line %d:\n\n" \
366 "\trbd_assert(%s);\n\n", \
367 __func__, __LINE__, #expr); \
368 BUG(); \
369 }
370#else /* !RBD_DEBUG */
371# define rbd_assert(expr) ((void) 0)
372#endif /* !RBD_DEBUG */
dfc5606d 373
117973fb
AE
374static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
375static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
59c2be1e 376
602adf40
YS
/*
 * Open the block device.  Write opens of a read-only mapping are
 * refused with -EROFS.  Otherwise, pin the device (get_device) and
 * bump open_count under ctl_mutex; rbd_release() undoes both.
 */
static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	(void) get_device(&rbd_dev->dev);
	set_device_ro(bdev, rbd_dev->mapping.read_only);
	rbd_dev->open_count++;
	mutex_unlock(&ctl_mutex);

	return 0;
}
392
dfc5606d
YS
/*
 * Release the block device: drop the open_count and the device
 * reference taken in rbd_open(), under the same ctl_mutex.
 */
static int rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	rbd_assert(rbd_dev->open_count > 0);
	rbd_dev->open_count--;
	put_device(&rbd_dev->dev);
	mutex_unlock(&ctl_mutex);

	return 0;
}
405
602adf40
YS
406static const struct block_device_operations rbd_bd_ops = {
407 .owner = THIS_MODULE,
408 .open = rbd_open,
dfc5606d 409 .release = rbd_release,
602adf40
YS
410};
411
412/*
413 * Initialize an rbd client instance.
43ae4701 414 * We own *ceph_opts.
602adf40 415 */
f8c38929 416static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
602adf40
YS
417{
418 struct rbd_client *rbdc;
419 int ret = -ENOMEM;
420
421 dout("rbd_client_create\n");
422 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
423 if (!rbdc)
424 goto out_opt;
425
426 kref_init(&rbdc->kref);
427 INIT_LIST_HEAD(&rbdc->node);
428
bc534d86
AE
429 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
430
43ae4701 431 rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
602adf40 432 if (IS_ERR(rbdc->client))
bc534d86 433 goto out_mutex;
43ae4701 434 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
602adf40
YS
435
436 ret = ceph_open_session(rbdc->client);
437 if (ret < 0)
438 goto out_err;
439
432b8587 440 spin_lock(&rbd_client_list_lock);
602adf40 441 list_add_tail(&rbdc->node, &rbd_client_list);
432b8587 442 spin_unlock(&rbd_client_list_lock);
602adf40 443
bc534d86
AE
444 mutex_unlock(&ctl_mutex);
445
602adf40
YS
446 dout("rbd_client_create created %p\n", rbdc);
447 return rbdc;
448
449out_err:
450 ceph_destroy_client(rbdc->client);
bc534d86
AE
451out_mutex:
452 mutex_unlock(&ctl_mutex);
602adf40
YS
453 kfree(rbdc);
454out_opt:
43ae4701
AE
455 if (ceph_opts)
456 ceph_destroy_options(ceph_opts);
28f259b7 457 return ERR_PTR(ret);
602adf40
YS
458}
459
460/*
1f7ba331
AE
461 * Find a ceph client with specific addr and configuration. If
462 * found, bump its reference count.
602adf40 463 */
1f7ba331 464static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
602adf40
YS
465{
466 struct rbd_client *client_node;
1f7ba331 467 bool found = false;
602adf40 468
43ae4701 469 if (ceph_opts->flags & CEPH_OPT_NOSHARE)
602adf40
YS
470 return NULL;
471
1f7ba331
AE
472 spin_lock(&rbd_client_list_lock);
473 list_for_each_entry(client_node, &rbd_client_list, node) {
474 if (!ceph_compare_options(ceph_opts, client_node->client)) {
475 kref_get(&client_node->kref);
476 found = true;
477 break;
478 }
479 }
480 spin_unlock(&rbd_client_list_lock);
481
482 return found ? client_node : NULL;
602adf40
YS
483}
484
59c2be1e
YS
485/*
486 * mount options
487 */
488enum {
59c2be1e
YS
489 Opt_last_int,
490 /* int args above */
491 Opt_last_string,
492 /* string args above */
cc0538b6
AE
493 Opt_read_only,
494 Opt_read_write,
495 /* Boolean args above */
496 Opt_last_bool,
59c2be1e
YS
497};
498
43ae4701 499static match_table_t rbd_opts_tokens = {
59c2be1e
YS
500 /* int args above */
501 /* string args above */
be466c1c 502 {Opt_read_only, "read_only"},
cc0538b6
AE
503 {Opt_read_only, "ro"}, /* Alternate spelling */
504 {Opt_read_write, "read_write"},
505 {Opt_read_write, "rw"}, /* Alternate spelling */
506 /* Boolean args above */
59c2be1e
YS
507 {-1, NULL}
508};
509
98571b5a
AE
510struct rbd_options {
511 bool read_only;
512};
513
514#define RBD_READ_ONLY_DEFAULT false
515
59c2be1e
YS
/*
 * Parse a single rbd mount option token into *private (a struct
 * rbd_options).  Only the Boolean read_only/read_write options (and
 * their aliases) are currently defined; the int/string token ranges
 * are placeholders.  Returns 0 or a negative errno.
 */
static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else if (token > Opt_last_string && token < Opt_last_bool) {
		dout("got Boolean token %d\n", token);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	default:
		/* Unreachable: the token table only defines the cases above */
		rbd_assert(false);
		break;
	}
	return 0;
}
556
602adf40
YS
/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc = rbd_client_find(ceph_opts);

	if (rbdc) {
		/* Using an existing client; it keeps its own options copy */
		ceph_destroy_options(ceph_opts);
		return rbdc;
	}

	return rbd_client_create(ceph_opts);
}
573
574/*
575 * Destroy ceph client
d23a4b3f 576 *
 577 * Takes rbd_client_list_lock itself; the caller must NOT hold it.
602adf40
YS
578 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("rbd_release_client %p\n", rbdc);
	/* Unlink from the shared client list before tearing down */
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}
591
592/*
593 * Drop reference to ceph client node. If it's not referenced anymore, release
594 * it.
595 */
9d3997fd 596static void rbd_put_client(struct rbd_client *rbdc)
602adf40 597{
c53d5893
AE
598 if (rbdc)
599 kref_put(&rbdc->kref, rbd_client_release);
602adf40
YS
600}
601
a30b71b9
AE
602static bool rbd_image_format_valid(u32 image_format)
603{
604 return image_format == 1 || image_format == 2;
605}
606
8e94af8e
AE
/*
 * Sanity-check an on-disk (format 1) image header before trusting
 * any of its contents.
 */
static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire the snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}
645
602adf40
YS
646/*
647 * Create a new header structure, translate header format from the on-disk
648 * header.
649 */
650static int rbd_header_from_disk(struct rbd_image_header *header,
4156d998 651 struct rbd_image_header_ondisk *ondisk)
602adf40 652{
ccece235 653 u32 snap_count;
58c17b0e 654 size_t len;
d2bb24e5 655 size_t size;
621901d6 656 u32 i;
602adf40 657
6a52325f
AE
658 memset(header, 0, sizeof (*header));
659
103a150f
AE
660 snap_count = le32_to_cpu(ondisk->snap_count);
661
58c17b0e
AE
662 len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
663 header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
6a52325f 664 if (!header->object_prefix)
602adf40 665 return -ENOMEM;
58c17b0e
AE
666 memcpy(header->object_prefix, ondisk->object_prefix, len);
667 header->object_prefix[len] = '\0';
00f1f36f 668
602adf40 669 if (snap_count) {
f785cc1d
AE
670 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
671
621901d6
AE
672 /* Save a copy of the snapshot names */
673
f785cc1d
AE
674 if (snap_names_len > (u64) SIZE_MAX)
675 return -EIO;
676 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
602adf40 677 if (!header->snap_names)
6a52325f 678 goto out_err;
f785cc1d
AE
679 /*
680 * Note that rbd_dev_v1_header_read() guarantees
681 * the ondisk buffer we're working with has
682 * snap_names_len bytes beyond the end of the
683 * snapshot id array, this memcpy() is safe.
684 */
685 memcpy(header->snap_names, &ondisk->snaps[snap_count],
686 snap_names_len);
6a52325f 687
621901d6
AE
688 /* Record each snapshot's size */
689
d2bb24e5
AE
690 size = snap_count * sizeof (*header->snap_sizes);
691 header->snap_sizes = kmalloc(size, GFP_KERNEL);
602adf40 692 if (!header->snap_sizes)
6a52325f 693 goto out_err;
621901d6
AE
694 for (i = 0; i < snap_count; i++)
695 header->snap_sizes[i] =
696 le64_to_cpu(ondisk->snaps[i].image_size);
602adf40 697 } else {
ccece235 698 WARN_ON(ondisk->snap_names_len);
602adf40
YS
699 header->snap_names = NULL;
700 header->snap_sizes = NULL;
701 }
849b4260 702
34b13184 703 header->features = 0; /* No features support in v1 images */
602adf40
YS
704 header->obj_order = ondisk->options.order;
705 header->crypt_type = ondisk->options.crypt_type;
706 header->comp_type = ondisk->options.comp_type;
6a52325f 707
621901d6
AE
708 /* Allocate and fill in the snapshot context */
709
f84344f3 710 header->image_size = le64_to_cpu(ondisk->image_size);
6a52325f
AE
711 size = sizeof (struct ceph_snap_context);
712 size += snap_count * sizeof (header->snapc->snaps[0]);
713 header->snapc = kzalloc(size, GFP_KERNEL);
714 if (!header->snapc)
715 goto out_err;
602adf40
YS
716
717 atomic_set(&header->snapc->nref, 1);
505cbb9b 718 header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
602adf40 719 header->snapc->num_snaps = snap_count;
621901d6
AE
720 for (i = 0; i < snap_count; i++)
721 header->snapc->snaps[i] =
722 le64_to_cpu(ondisk->snaps[i].id);
602adf40
YS
723
724 return 0;
725
6a52325f 726out_err:
849b4260 727 kfree(header->snap_sizes);
ccece235 728 header->snap_sizes = NULL;
602adf40 729 kfree(header->snap_names);
ccece235 730 header->snap_names = NULL;
6a52325f
AE
731 kfree(header->object_prefix);
732 header->object_prefix = NULL;
ccece235 733
00f1f36f 734 return -ENOMEM;
602adf40
YS
735}
736
9e15b77d
AE
737static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
738{
739 struct rbd_snap *snap;
740
741 if (snap_id == CEPH_NOSNAP)
742 return RBD_SNAP_HEAD_NAME;
743
744 list_for_each_entry(snap, &rbd_dev->snaps, node)
745 if (snap_id == snap->id)
746 return snap->name;
747
748 return NULL;
749}
750
8836b995 751static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
602adf40 752{
602adf40 753
e86924a8 754 struct rbd_snap *snap;
602adf40 755
e86924a8
AE
756 list_for_each_entry(snap, &rbd_dev->snaps, node) {
757 if (!strcmp(snap_name, snap->name)) {
0d7dbfce 758 rbd_dev->spec->snap_id = snap->id;
e86924a8 759 rbd_dev->mapping.size = snap->size;
34b13184 760 rbd_dev->mapping.features = snap->features;
602adf40 761
e86924a8 762 return 0;
00f1f36f 763 }
00f1f36f 764 }
e86924a8 765
00f1f36f 766 return -ENOENT;
602adf40
YS
767}
768
/*
 * Establish the size/features/read-only state of the mapping, for
 * either the image head or a named snapshot.  Marks the device as
 * existing on success.  Returns 0 or -ENOENT (unknown snapshot).
 */
static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
{
	int ret;

	if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
		    sizeof (RBD_SNAP_HEAD_NAME))) {
		/* Mapping the whole image, no snapshot */
		rbd_dev->spec->snap_id = CEPH_NOSNAP;
		rbd_dev->mapping.size = rbd_dev->header.image_size;
		rbd_dev->mapping.features = rbd_dev->header.features;
		ret = 0;
	} else {
		ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
		if (ret < 0)
			goto done;
		/* A snapshot is immutable, so map it read-only */
		rbd_dev->mapping.read_only = true;
	}
	atomic_set(&rbd_dev->exists, 1);
done:
	return ret;
}
789
790static void rbd_header_free(struct rbd_image_header *header)
791{
849b4260 792 kfree(header->object_prefix);
d78fd7ae 793 header->object_prefix = NULL;
602adf40 794 kfree(header->snap_sizes);
d78fd7ae 795 header->snap_sizes = NULL;
849b4260 796 kfree(header->snap_names);
d78fd7ae 797 header->snap_names = NULL;
d1d25646 798 ceph_put_snap_context(header->snapc);
d78fd7ae 799 header->snapc = NULL;
602adf40
YS
800}
801
/*
 * Build the name of the object backing the segment that contains
 * image byte @offset: "<object_prefix>.<segment #, 12 hex digits>".
 * Returns a kmalloc'd string the caller must kfree, or NULL on
 * allocation/formatting failure.
 */
static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
	char *name;
	u64 segment;
	int ret;

	/* GFP_NOIO: this can be called on the I/O path */
	name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
	if (!name)
		return NULL;
	segment = offset >> rbd_dev->header.obj_order;
	ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
			rbd_dev->header.object_prefix, segment);
	if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
		pr_err("error formatting segment name for #%llu (%d)\n",
			segment, ret);
		kfree(name);
		name = NULL;
	}

	return name;
}
602adf40 823
65ccfe21
AE
824static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
825{
826 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
602adf40 827
65ccfe21
AE
828 return offset & (segment_size - 1);
829}
830
/*
 * Number of bytes of [@offset, @offset + @length) that fall inside
 * the single segment containing @offset (never more than @length).
 */
static u64 rbd_segment_length(struct rbd_device *rbd_dev,
				u64 offset, u64 length)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	offset &= segment_size - 1;	/* offset within the segment */

	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;	/* clip to segment end */

	return length;
}
844
029bcbd8
JD
845/*
846 * returns the size of an object in the image
847 */
848static u64 rbd_obj_bytes(struct rbd_image_header *header)
849{
850 return 1 << header->obj_order;
851}
852
602adf40
YS
853/*
854 * bio helpers
855 */
856
857static void bio_chain_put(struct bio *chain)
858{
859 struct bio *tmp;
860
861 while (chain) {
862 tmp = chain;
863 chain = chain->bi_next;
864 bio_put(tmp);
865 }
866}
867
/*
 * zeros a bio chain, starting at specific offset
 * (data before start_ofs bytes into the chain is left intact)
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;	/* running byte position within the chain */

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				/* Zero only the tail of a partially-kept segment */
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}
894
/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 * Returns the clone, or NULL on bad arguments or allocation failure.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
					unsigned int offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio_vec *bv;
	unsigned int resid;
	unsigned short idx;
	unsigned int voff;	/* byte offset into the first cloned segment */
	unsigned short end_idx;
	unsigned short vcnt;
	struct bio *bio;

	/* Handle the easy case for the caller */

	if (!offset && len == bio_src->bi_size)
		return bio_clone(bio_src, gfpmask);

	if (WARN_ON_ONCE(!len))
		return NULL;
	if (WARN_ON_ONCE(len > bio_src->bi_size))
		return NULL;
	if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
		return NULL;

	/* Find first affected segment... */

	resid = offset;
	__bio_for_each_segment(bv, bio_src, idx, 0) {
		if (resid < bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	voff = resid;

	/* ...and the last affected segment */

	resid += len;
	__bio_for_each_segment(bv, bio_src, end_idx, idx) {
		if (resid <= bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	vcnt = end_idx - idx + 1;

	/* Build the clone */

	bio = bio_alloc(gfpmask, (unsigned int) vcnt);
	if (!bio)
		return NULL;	/* ENOMEM */

	bio->bi_bdev = bio_src->bi_bdev;
	bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
	bio->bi_rw = bio_src->bi_rw;
	bio->bi_flags |= 1 << BIO_CLONED;

	/*
	 * Copy over our part of the bio_vec, then update the first
	 * and last (or only) entries.
	 */
	memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
		vcnt * sizeof (struct bio_vec));
	bio->bi_io_vec[0].bv_offset += voff;
	if (vcnt > 1) {
		bio->bi_io_vec[0].bv_len -= voff;
		/* resid is the length used in the final segment */
		bio->bi_io_vec[vcnt - 1].bv_len = resid;
	} else {
		bio->bi_io_vec[0].bv_len = len;
	}

	bio->bi_vcnt = vcnt;
	bio->bi_size = len;
	bio->bi_idx = 0;

	return bio;
}
975
/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
					unsigned int *offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bi = *bio_src;
	unsigned int off = *offset;
	struct bio *chain = NULL;
	struct bio **end;

	/* Build up a chain of clone bios up to the limit */

	if (!bi || off >= bi->bi_size || !len)
		return NULL;		/* Nothing to clone */

	end = &chain;
	while (len) {
		unsigned int bi_size;
		struct bio *bio;

		if (!bi) {
			rbd_warn(NULL, "bio_chain exhausted with %u left", len);
			goto out_err;	/* EINVAL; ran out of bio's */
		}
		/* Clone no more than remains of this source bio */
		bi_size = min_t(unsigned int, bi->bi_size - off, len);
		bio = bio_clone_range(bi, off, bi_size, gfpmask);
		if (!bio)
			goto out_err;	/* ENOMEM */

		*end = bio;
		end = &bio->bi_next;

		off += bi_size;
		if (off == bi->bi_size) {
			/* Consumed this source bio; advance to the next */
			bi = bi->bi_next;
			off = 0;
		}
		len -= bi_size;
	}
	*bio_src = bi;
	*offset = off;

	return chain;
out_err:
	bio_chain_put(chain);

	return NULL;
}
1038
bf0d5f50
AE
/* Take a reference on an object request. */
static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
	kref_get(&obj_request->kref);
}
1043
static void rbd_obj_request_destroy(struct kref *kref);
/* Drop a reference; the last put frees via rbd_obj_request_destroy(). */
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request != NULL);
	kref_put(&obj_request->kref, rbd_obj_request_destroy);
}
1050
/* Take a reference on an image request. */
static void rbd_img_request_get(struct rbd_img_request *img_request)
{
	kref_get(&img_request->kref);
}
1055
static void rbd_img_request_destroy(struct kref *kref);
/* Drop a reference; the last put frees via rbd_img_request_destroy(). */
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
	rbd_assert(img_request != NULL);
	kref_put(&img_request->kref, rbd_img_request_destroy);
}
1062
1063static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1064 struct rbd_obj_request *obj_request)
1065{
1066 rbd_obj_request_get(obj_request);
1067 obj_request->img_request = img_request;
1068 list_add_tail(&obj_request->links, &img_request->obj_requests);
1069 obj_request->which = img_request->obj_request_count++;
1070 rbd_assert(obj_request->which != BAD_WHICH);
1071}
1072
1073static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1074 struct rbd_obj_request *obj_request)
1075{
1076 rbd_assert(obj_request->which != BAD_WHICH);
1077 obj_request->which = BAD_WHICH;
1078 list_del(&obj_request->links);
1079 rbd_assert(obj_request->img_request == img_request);
1080 obj_request->callback = NULL;
1081 obj_request->img_request = NULL;
1082 rbd_obj_request_put(obj_request);
1083}
1084
1085static bool obj_request_type_valid(enum obj_request_type type)
1086{
1087 switch (type) {
9969ebc5 1088 case OBJ_REQUEST_NODATA:
bf0d5f50 1089 case OBJ_REQUEST_BIO:
788e2df3 1090 case OBJ_REQUEST_PAGES:
bf0d5f50
AE
1091 return true;
1092 default:
1093 return false;
1094 }
1095}
1096
8d23bf29
AE
1097struct ceph_osd_req_op *rbd_osd_req_op_create(u16 opcode, ...)
1098{
1099 struct ceph_osd_req_op *op;
1100 va_list args;
2647ba38 1101 size_t size;
8d23bf29
AE
1102
1103 op = kzalloc(sizeof (*op), GFP_NOIO);
1104 if (!op)
1105 return NULL;
1106 op->op = opcode;
1107 va_start(args, opcode);
1108 switch (opcode) {
1109 case CEPH_OSD_OP_READ:
1110 case CEPH_OSD_OP_WRITE:
1111 /* rbd_osd_req_op_create(READ, offset, length) */
1112 /* rbd_osd_req_op_create(WRITE, offset, length) */
1113 op->extent.offset = va_arg(args, u64);
1114 op->extent.length = va_arg(args, u64);
1115 if (opcode == CEPH_OSD_OP_WRITE)
1116 op->payload_len = op->extent.length;
1117 break;
2647ba38
AE
1118 case CEPH_OSD_OP_CALL:
1119 /* rbd_osd_req_op_create(CALL, class, method, data, datalen) */
1120 op->cls.class_name = va_arg(args, char *);
1121 size = strlen(op->cls.class_name);
1122 rbd_assert(size <= (size_t) U8_MAX);
1123 op->cls.class_len = size;
1124 op->payload_len = size;
1125
1126 op->cls.method_name = va_arg(args, char *);
1127 size = strlen(op->cls.method_name);
1128 rbd_assert(size <= (size_t) U8_MAX);
1129 op->cls.method_len = size;
1130 op->payload_len += size;
1131
1132 op->cls.argc = 0;
1133 op->cls.indata = va_arg(args, void *);
1134 size = va_arg(args, size_t);
1135 rbd_assert(size <= (size_t) U32_MAX);
1136 op->cls.indata_len = (u32) size;
1137 op->payload_len += size;
1138 break;
5efea49a
AE
1139 case CEPH_OSD_OP_NOTIFY_ACK:
1140 case CEPH_OSD_OP_WATCH:
1141 /* rbd_osd_req_op_create(NOTIFY_ACK, cookie, version) */
1142 /* rbd_osd_req_op_create(WATCH, cookie, version, flag) */
1143 op->watch.cookie = va_arg(args, u64);
1144 op->watch.ver = va_arg(args, u64);
1145 op->watch.ver = cpu_to_le64(op->watch.ver);
1146 if (opcode == CEPH_OSD_OP_WATCH && va_arg(args, int))
1147 op->watch.flag = (u8) 1;
1148 break;
8d23bf29
AE
1149 default:
1150 rbd_warn(NULL, "unsupported opcode %hu\n", opcode);
1151 kfree(op);
1152 op = NULL;
1153 break;
1154 }
1155 va_end(args);
1156
1157 return op;
1158}
1159
/* Free an op allocated by rbd_osd_req_op_create(). */
static void rbd_osd_req_op_destroy(struct ceph_osd_req_op *op)
{
	kfree(op);
}
1164
602adf40
YS
1165/*
1166 * Send ceph osd request
1167 */
1168static int rbd_do_request(struct request *rq,
0ce1a794 1169 struct rbd_device *rbd_dev,
602adf40
YS
1170 struct ceph_snap_context *snapc,
1171 u64 snapid,
aded07ea 1172 const char *object_name, u64 ofs, u64 len,
602adf40
YS
1173 struct bio *bio,
1174 struct page **pages,
1175 int num_pages,
1176 int flags,
30573d68 1177 struct ceph_osd_req_op *op,
5f29ddd4
AE
1178 void (*rbd_cb)(struct ceph_osd_request *,
1179 struct ceph_msg *),
59c2be1e 1180 u64 *ver)
602adf40 1181{
2e53c6c3 1182 struct ceph_osd_client *osdc;
5f29ddd4 1183 struct ceph_osd_request *osd_req;
602adf40 1184 struct timespec mtime = CURRENT_TIME;
2e53c6c3 1185 int ret;
602adf40 1186
7d250b94 1187 dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n",
f7760dad 1188 object_name, (unsigned long long) ofs,
7d250b94 1189 (unsigned long long) len);
602adf40 1190
0ce1a794 1191 osdc = &rbd_dev->rbd_client->client->osdc;
30573d68 1192 osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_NOIO);
2e53c6c3
AE
1193 if (!osd_req)
1194 return -ENOMEM;
602adf40 1195
d178a9e7 1196 osd_req->r_flags = flags;
54a54007
AE
1197 osd_req->r_pages = pages;
1198 if (bio) {
1199 osd_req->r_bio = bio;
1200 bio_get(osd_req->r_bio);
1201 }
602adf40 1202
2e53c6c3 1203 osd_req->r_callback = rbd_cb;
7d250b94 1204 osd_req->r_priv = NULL;
602adf40 1205
5f29ddd4
AE
1206 strncpy(osd_req->r_oid, object_name, sizeof(osd_req->r_oid));
1207 osd_req->r_oid_len = strlen(osd_req->r_oid);
602adf40 1208
0903e875 1209 osd_req->r_file_layout = rbd_dev->layout; /* struct */
e01e7927
AE
1210 osd_req->r_num_pages = calc_pages_for(ofs, len);
1211 osd_req->r_page_alignment = ofs & ~PAGE_MASK;
602adf40 1212
30573d68 1213 ceph_osdc_build_request(osd_req, ofs, len, 1, op,
ae7ca4a3 1214 snapc, snapid, &mtime);
602adf40 1215
8b84de79 1216 if (op->op == CEPH_OSD_OP_WATCH && op->watch.flag) {
5f29ddd4 1217 ceph_osdc_set_request_linger(osdc, osd_req);
8b84de79 1218 rbd_dev->watch_request = osd_req;
59c2be1e
YS
1219 }
1220
5f29ddd4 1221 ret = ceph_osdc_start_request(osdc, osd_req, false);
602adf40
YS
1222 if (ret < 0)
1223 goto done_err;
1224
1225 if (!rbd_cb) {
5f29ddd4
AE
1226 u64 version;
1227
1228 ret = ceph_osdc_wait_request(osdc, osd_req);
1229 version = le64_to_cpu(osd_req->r_reassert_version.version);
59c2be1e 1230 if (ver)
5f29ddd4
AE
1231 *ver = version;
1232 dout("reassert_ver=%llu\n", (unsigned long long) version);
1233 ceph_osdc_put_request(osd_req);
602adf40
YS
1234 }
1235 return ret;
1236
1237done_err:
2e53c6c3
AE
1238 if (bio)
1239 bio_chain_put(osd_req->r_bio);
2e53c6c3
AE
1240 ceph_osdc_put_request(osd_req);
1241
602adf40
YS
1242 return ret;
1243}
1244
5f29ddd4
AE
/* Minimal completion callback: just drop the request reference. */
static void rbd_simple_req_cb(struct ceph_osd_request *osd_req,
				struct ceph_msg *msg)
{
	ceph_osdc_put_request(osd_req);
}
1250
602adf40
YS
1251/*
1252 * Do a synchronous ceph osd operation
1253 */
0ce1a794 1254static int rbd_req_sync_op(struct rbd_device *rbd_dev,
602adf40 1255 int flags,
30573d68 1256 struct ceph_osd_req_op *op,
aded07ea 1257 const char *object_name,
f8d4de6e
AE
1258 u64 ofs, u64 inbound_size,
1259 char *inbound,
59c2be1e 1260 u64 *ver)
602adf40
YS
1261{
1262 int ret;
1263 struct page **pages;
1264 int num_pages;
913d2fdc 1265
30573d68 1266 rbd_assert(op != NULL);
602adf40 1267
f8d4de6e 1268 num_pages = calc_pages_for(ofs, inbound_size);
602adf40 1269 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
b8d0638a
DC
1270 if (IS_ERR(pages))
1271 return PTR_ERR(pages);
602adf40 1272
25704ac9 1273 ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
f8d4de6e 1274 object_name, ofs, inbound_size, NULL,
602adf40
YS
1275 pages, num_pages,
1276 flags,
30573d68 1277 op,
59c2be1e 1278 NULL,
8b84de79 1279 ver);
602adf40 1280 if (ret < 0)
913d2fdc 1281 goto done;
602adf40 1282
f8d4de6e
AE
1283 if ((flags & CEPH_OSD_FLAG_READ) && inbound)
1284 ret = ceph_copy_from_page_vector(pages, inbound, ofs, ret);
602adf40 1285
602adf40
YS
1286done:
1287 ceph_release_page_vector(pages, num_pages);
1288 return ret;
1289}
1290
bf0d5f50
AE
1291static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
1292 struct rbd_obj_request *obj_request)
1293{
1294 return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
1295}
1296
1297static void rbd_img_request_complete(struct rbd_img_request *img_request)
1298{
1299 if (img_request->callback)
1300 img_request->callback(img_request);
1301 else
1302 rbd_img_request_put(img_request);
1303}
1304
788e2df3
AE
1305/* Caller is responsible for rbd_obj_request_destroy(obj_request) */
1306
1307static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
1308{
1309 return wait_for_completion_interruptible(&obj_request->completion);
1310}
1311
9969ebc5
AE
1312static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request,
1313 struct ceph_osd_op *op)
1314{
1315 atomic_set(&obj_request->done, 1);
1316}
1317
bf0d5f50
AE
1318static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1319{
1320 if (obj_request->callback)
1321 obj_request->callback(obj_request);
788e2df3
AE
1322 else
1323 complete_all(&obj_request->completion);
bf0d5f50
AE
1324}
1325
602adf40 1326/*
59c2be1e
YS
1327 * Request sync osd watch
1328 */
0ce1a794 1329static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
59c2be1e 1330 u64 ver,
7f0a24d8 1331 u64 notify_id)
59c2be1e 1332{
139b4318 1333 struct ceph_osd_req_op *op;
11f77002
SW
1334 int ret;
1335
5efea49a 1336 op = rbd_osd_req_op_create(CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver);
139b4318 1337 if (!op)
57cfc106 1338 return -ENOMEM;
59c2be1e 1339
0ce1a794 1340 ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
7f0a24d8 1341 rbd_dev->header_name, 0, 0, NULL,
ad4f232f 1342 NULL, 0,
59c2be1e 1343 CEPH_OSD_FLAG_READ,
30573d68 1344 op,
8b84de79 1345 rbd_simple_req_cb, NULL);
59c2be1e 1346
5efea49a
AE
1347 rbd_osd_req_op_destroy(op);
1348
59c2be1e
YS
1349 return ret;
1350}
1351
1352static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1353{
0ce1a794 1354 struct rbd_device *rbd_dev = (struct rbd_device *)data;
a71b891b 1355 u64 hver;
13143d2d
SW
1356 int rc;
1357
0ce1a794 1358 if (!rbd_dev)
59c2be1e
YS
1359 return;
1360
bd919d45
AE
1361 dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1362 rbd_dev->header_name, (unsigned long long) notify_id,
1363 (unsigned int) opcode);
117973fb 1364 rc = rbd_dev_refresh(rbd_dev, &hver);
13143d2d 1365 if (rc)
06ecc6cb
AE
1366 rbd_warn(rbd_dev, "got notification but failed to "
1367 " update snaps: %d\n", rc);
59c2be1e 1368
7f0a24d8 1369 rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
59c2be1e
YS
1370}
1371
79e3057c 1372
602adf40 1373/*
3cb4a687 1374 * Synchronous osd object method call
602adf40 1375 */
0ce1a794 1376static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
aded07ea
AE
1377 const char *object_name,
1378 const char *class_name,
1379 const char *method_name,
3cb4a687
AE
1380 const char *outbound,
1381 size_t outbound_size,
f8d4de6e
AE
1382 char *inbound,
1383 size_t inbound_size,
59c2be1e 1384 u64 *ver)
602adf40 1385{
139b4318 1386 struct ceph_osd_req_op *op;
57cfc106
AE
1387 int ret;
1388
3cb4a687
AE
1389 /*
1390 * Any input parameters required by the method we're calling
1391 * will be sent along with the class and method names as
1392 * part of the message payload. That data and its size are
1393 * supplied via the indata and indata_len fields (named from
1394 * the perspective of the server side) in the OSD request
1395 * operation.
1396 */
2647ba38
AE
1397 op = rbd_osd_req_op_create(CEPH_OSD_OP_CALL, class_name,
1398 method_name, outbound, outbound_size);
139b4318 1399 if (!op)
57cfc106 1400 return -ENOMEM;
602adf40 1401
30573d68 1402 ret = rbd_req_sync_op(rbd_dev, CEPH_OSD_FLAG_READ, op,
f8d4de6e 1403 object_name, 0, inbound_size, inbound,
8b84de79 1404 ver);
602adf40 1405
2647ba38 1406 rbd_osd_req_op_destroy(op);
602adf40
YS
1407
1408 dout("cls_exec returned %d\n", ret);
1409 return ret;
1410}
1411
bf0d5f50
AE
1412static void rbd_osd_read_callback(struct rbd_obj_request *obj_request,
1413 struct ceph_osd_op *op)
1414{
1415 u64 xferred;
1416
1417 /*
1418 * We support a 64-bit length, but ultimately it has to be
1419 * passed to blk_end_request(), which takes an unsigned int.
1420 */
1421 xferred = le64_to_cpu(op->extent.length);
1422 rbd_assert(xferred < (u64) UINT_MAX);
1423 if (obj_request->result == (s32) -ENOENT) {
1424 zero_bio_chain(obj_request->bio_list, 0);
1425 obj_request->result = 0;
1426 } else if (xferred < obj_request->length && !obj_request->result) {
1427 zero_bio_chain(obj_request->bio_list, xferred);
1428 xferred = obj_request->length;
1429 }
1430 obj_request->xferred = xferred;
1431 atomic_set(&obj_request->done, 1);
1432}
1433
1434static void rbd_osd_write_callback(struct rbd_obj_request *obj_request,
1435 struct ceph_osd_op *op)
1436{
1437 obj_request->xferred = le64_to_cpu(op->extent.length);
1438 atomic_set(&obj_request->done, 1);
1439}
1440
1441static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1442 struct ceph_msg *msg)
1443{
1444 struct rbd_obj_request *obj_request = osd_req->r_priv;
1445 struct ceph_osd_reply_head *reply_head;
1446 struct ceph_osd_op *op;
1447 u32 num_ops;
1448 u16 opcode;
1449
1450 rbd_assert(osd_req == obj_request->osd_req);
1451 rbd_assert(!!obj_request->img_request ^
1452 (obj_request->which == BAD_WHICH));
1453
1454 obj_request->xferred = le32_to_cpu(msg->hdr.data_len);
1455 reply_head = msg->front.iov_base;
1456 obj_request->result = (s32) le32_to_cpu(reply_head->result);
1457 obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);
1458
1459 num_ops = le32_to_cpu(reply_head->num_ops);
1460 WARN_ON(num_ops != 1); /* For now */
1461
1462 op = &reply_head->ops[0];
1463 opcode = le16_to_cpu(op->op);
1464 switch (opcode) {
1465 case CEPH_OSD_OP_READ:
1466 rbd_osd_read_callback(obj_request, op);
1467 break;
1468 case CEPH_OSD_OP_WRITE:
1469 rbd_osd_write_callback(obj_request, op);
1470 break;
9969ebc5
AE
1471 case CEPH_OSD_OP_WATCH:
1472 rbd_osd_trivial_callback(obj_request, op);
1473 break;
bf0d5f50
AE
1474 default:
1475 rbd_warn(NULL, "%s: unsupported op %hu\n",
1476 obj_request->object_name, (unsigned short) opcode);
1477 break;
1478 }
1479
1480 if (atomic_read(&obj_request->done))
1481 rbd_obj_request_complete(obj_request);
1482}
1483
1484static struct ceph_osd_request *rbd_osd_req_create(
1485 struct rbd_device *rbd_dev,
1486 bool write_request,
1487 struct rbd_obj_request *obj_request,
1488 struct ceph_osd_req_op *op)
1489{
1490 struct rbd_img_request *img_request = obj_request->img_request;
1491 struct ceph_snap_context *snapc = NULL;
1492 struct ceph_osd_client *osdc;
1493 struct ceph_osd_request *osd_req;
1494 struct timespec now;
1495 struct timespec *mtime;
1496 u64 snap_id = CEPH_NOSNAP;
1497 u64 offset = obj_request->offset;
1498 u64 length = obj_request->length;
1499
1500 if (img_request) {
1501 rbd_assert(img_request->write_request == write_request);
1502 if (img_request->write_request)
1503 snapc = img_request->snapc;
1504 else
1505 snap_id = img_request->snap_id;
1506 }
1507
1508 /* Allocate and initialize the request, for the single op */
1509
1510 osdc = &rbd_dev->rbd_client->client->osdc;
1511 osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1512 if (!osd_req)
1513 return NULL; /* ENOMEM */
1514
1515 rbd_assert(obj_request_type_valid(obj_request->type));
1516 switch (obj_request->type) {
9969ebc5
AE
1517 case OBJ_REQUEST_NODATA:
1518 break; /* Nothing to do */
bf0d5f50
AE
1519 case OBJ_REQUEST_BIO:
1520 rbd_assert(obj_request->bio_list != NULL);
1521 osd_req->r_bio = obj_request->bio_list;
1522 bio_get(osd_req->r_bio);
1523 /* osd client requires "num pages" even for bio */
1524 osd_req->r_num_pages = calc_pages_for(offset, length);
1525 break;
788e2df3
AE
1526 case OBJ_REQUEST_PAGES:
1527 osd_req->r_pages = obj_request->pages;
1528 osd_req->r_num_pages = obj_request->page_count;
1529 osd_req->r_page_alignment = offset & ~PAGE_MASK;
1530 break;
bf0d5f50
AE
1531 }
1532
1533 if (write_request) {
1534 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1535 now = CURRENT_TIME;
1536 mtime = &now;
1537 } else {
1538 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1539 mtime = NULL; /* not needed for reads */
1540 offset = 0; /* These are not used... */
1541 length = 0; /* ...for osd read requests */
1542 }
1543
1544 osd_req->r_callback = rbd_osd_req_callback;
1545 osd_req->r_priv = obj_request;
1546
1547 osd_req->r_oid_len = strlen(obj_request->object_name);
1548 rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1549 memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1550
1551 osd_req->r_file_layout = rbd_dev->layout; /* struct */
1552
1553 /* osd_req will get its own reference to snapc (if non-null) */
1554
1555 ceph_osdc_build_request(osd_req, offset, length, 1, op,
1556 snapc, snap_id, mtime);
1557
1558 return osd_req;
1559}
1560
/* Drop the reference taken when the osd request was created. */
static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
}
1565
1566/* object_name is assumed to be a non-null pointer and NUL-terminated */
1567
1568static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1569 u64 offset, u64 length,
1570 enum obj_request_type type)
1571{
1572 struct rbd_obj_request *obj_request;
1573 size_t size;
1574 char *name;
1575
1576 rbd_assert(obj_request_type_valid(type));
1577
1578 size = strlen(object_name) + 1;
1579 obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
1580 if (!obj_request)
1581 return NULL;
1582
1583 name = (char *)(obj_request + 1);
1584 obj_request->object_name = memcpy(name, object_name, size);
1585 obj_request->offset = offset;
1586 obj_request->length = length;
1587 obj_request->which = BAD_WHICH;
1588 obj_request->type = type;
1589 INIT_LIST_HEAD(&obj_request->links);
1590 atomic_set(&obj_request->done, 0);
788e2df3 1591 init_completion(&obj_request->completion);
bf0d5f50
AE
1592 kref_init(&obj_request->kref);
1593
1594 return obj_request;
1595}
1596
1597static void rbd_obj_request_destroy(struct kref *kref)
1598{
1599 struct rbd_obj_request *obj_request;
1600
1601 obj_request = container_of(kref, struct rbd_obj_request, kref);
1602
1603 rbd_assert(obj_request->img_request == NULL);
1604 rbd_assert(obj_request->which == BAD_WHICH);
1605
1606 if (obj_request->osd_req)
1607 rbd_osd_req_destroy(obj_request->osd_req);
1608
1609 rbd_assert(obj_request_type_valid(obj_request->type));
1610 switch (obj_request->type) {
9969ebc5
AE
1611 case OBJ_REQUEST_NODATA:
1612 break; /* Nothing to do */
bf0d5f50
AE
1613 case OBJ_REQUEST_BIO:
1614 if (obj_request->bio_list)
1615 bio_chain_put(obj_request->bio_list);
1616 break;
788e2df3
AE
1617 case OBJ_REQUEST_PAGES:
1618 if (obj_request->pages)
1619 ceph_release_page_vector(obj_request->pages,
1620 obj_request->page_count);
1621 break;
bf0d5f50
AE
1622 }
1623
1624 kfree(obj_request);
1625}
1626
1627/*
1628 * Caller is responsible for filling in the list of object requests
1629 * that comprises the image request, and the Linux request pointer
1630 * (if there is one).
1631 */
1632struct rbd_img_request *rbd_img_request_create(struct rbd_device *rbd_dev,
1633 u64 offset, u64 length,
1634 bool write_request)
1635{
1636 struct rbd_img_request *img_request;
1637 struct ceph_snap_context *snapc = NULL;
1638
1639 img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
1640 if (!img_request)
1641 return NULL;
1642
1643 if (write_request) {
1644 down_read(&rbd_dev->header_rwsem);
1645 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1646 up_read(&rbd_dev->header_rwsem);
1647 if (WARN_ON(!snapc)) {
1648 kfree(img_request);
1649 return NULL; /* Shouldn't happen */
1650 }
1651 }
1652
1653 img_request->rq = NULL;
1654 img_request->rbd_dev = rbd_dev;
1655 img_request->offset = offset;
1656 img_request->length = length;
1657 img_request->write_request = write_request;
1658 if (write_request)
1659 img_request->snapc = snapc;
1660 else
1661 img_request->snap_id = rbd_dev->spec->snap_id;
1662 spin_lock_init(&img_request->completion_lock);
1663 img_request->next_completion = 0;
1664 img_request->callback = NULL;
1665 img_request->obj_request_count = 0;
1666 INIT_LIST_HEAD(&img_request->obj_requests);
1667 kref_init(&img_request->kref);
1668
1669 rbd_img_request_get(img_request); /* Avoid a warning */
1670 rbd_img_request_put(img_request); /* TEMPORARY */
1671
1672 return img_request;
1673}
1674
1675static void rbd_img_request_destroy(struct kref *kref)
1676{
1677 struct rbd_img_request *img_request;
1678 struct rbd_obj_request *obj_request;
1679 struct rbd_obj_request *next_obj_request;
1680
1681 img_request = container_of(kref, struct rbd_img_request, kref);
1682
1683 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1684 rbd_img_obj_request_del(img_request, obj_request);
1685
1686 if (img_request->write_request)
1687 ceph_put_snap_context(img_request->snapc);
1688
1689 kfree(img_request);
1690}
1691
1692static int rbd_img_request_fill_bio(struct rbd_img_request *img_request,
1693 struct bio *bio_list)
1694{
1695 struct rbd_device *rbd_dev = img_request->rbd_dev;
1696 struct rbd_obj_request *obj_request = NULL;
1697 struct rbd_obj_request *next_obj_request;
1698 unsigned int bio_offset;
1699 u64 image_offset;
1700 u64 resid;
1701 u16 opcode;
1702
1703 opcode = img_request->write_request ? CEPH_OSD_OP_WRITE
1704 : CEPH_OSD_OP_READ;
1705 bio_offset = 0;
1706 image_offset = img_request->offset;
1707 rbd_assert(image_offset == bio_list->bi_sector << SECTOR_SHIFT);
1708 resid = img_request->length;
1709 while (resid) {
1710 const char *object_name;
1711 unsigned int clone_size;
1712 struct ceph_osd_req_op *op;
1713 u64 offset;
1714 u64 length;
1715
1716 object_name = rbd_segment_name(rbd_dev, image_offset);
1717 if (!object_name)
1718 goto out_unwind;
1719 offset = rbd_segment_offset(rbd_dev, image_offset);
1720 length = rbd_segment_length(rbd_dev, image_offset, resid);
1721 obj_request = rbd_obj_request_create(object_name,
1722 offset, length,
1723 OBJ_REQUEST_BIO);
1724 kfree(object_name); /* object request has its own copy */
1725 if (!obj_request)
1726 goto out_unwind;
1727
1728 rbd_assert(length <= (u64) UINT_MAX);
1729 clone_size = (unsigned int) length;
1730 obj_request->bio_list = bio_chain_clone_range(&bio_list,
1731 &bio_offset, clone_size,
1732 GFP_ATOMIC);
1733 if (!obj_request->bio_list)
1734 goto out_partial;
1735
1736 /*
1737 * Build up the op to use in building the osd
1738 * request. Note that the contents of the op are
1739 * copied by rbd_osd_req_create().
1740 */
1741 op = rbd_osd_req_op_create(opcode, offset, length);
1742 if (!op)
1743 goto out_partial;
1744 obj_request->osd_req = rbd_osd_req_create(rbd_dev,
1745 img_request->write_request,
1746 obj_request, op);
1747 rbd_osd_req_op_destroy(op);
1748 if (!obj_request->osd_req)
1749 goto out_partial;
1750 /* status and version are initially zero-filled */
1751
1752 rbd_img_obj_request_add(img_request, obj_request);
1753
1754 image_offset += length;
1755 resid -= length;
1756 }
1757
1758 return 0;
1759
1760out_partial:
1761 rbd_obj_request_put(obj_request);
1762out_unwind:
1763 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1764 rbd_obj_request_put(obj_request);
1765
1766 return -ENOMEM;
1767}
1768
1769static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1770{
1771 struct rbd_img_request *img_request;
1772 u32 which = obj_request->which;
1773 bool more = true;
1774
1775 img_request = obj_request->img_request;
1776 rbd_assert(img_request != NULL);
1777 rbd_assert(img_request->rq != NULL);
1778 rbd_assert(which != BAD_WHICH);
1779 rbd_assert(which < img_request->obj_request_count);
1780 rbd_assert(which >= img_request->next_completion);
1781
1782 spin_lock_irq(&img_request->completion_lock);
1783 if (which != img_request->next_completion)
1784 goto out;
1785
1786 for_each_obj_request_from(img_request, obj_request) {
1787 unsigned int xferred;
1788 int result;
1789
1790 rbd_assert(more);
1791 rbd_assert(which < img_request->obj_request_count);
1792
1793 if (!atomic_read(&obj_request->done))
1794 break;
1795
1796 rbd_assert(obj_request->xferred <= (u64) UINT_MAX);
1797 xferred = (unsigned int) obj_request->xferred;
1798 result = (int) obj_request->result;
1799 if (result)
1800 rbd_warn(NULL, "obj_request %s result %d xferred %u\n",
1801 img_request->write_request ? "write" : "read",
1802 result, xferred);
1803
1804 more = blk_end_request(img_request->rq, result, xferred);
1805 which++;
1806 }
1807 rbd_assert(more ^ (which == img_request->obj_request_count));
1808 img_request->next_completion = which;
1809out:
1810 spin_unlock_irq(&img_request->completion_lock);
1811
1812 if (!more)
1813 rbd_img_request_complete(img_request);
1814}
1815
1816static int rbd_img_request_submit(struct rbd_img_request *img_request)
1817{
1818 struct rbd_device *rbd_dev = img_request->rbd_dev;
1819 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1820 struct rbd_obj_request *obj_request;
1821
1822 for_each_obj_request(img_request, obj_request) {
1823 int ret;
1824
1825 obj_request->callback = rbd_img_obj_callback;
1826 ret = rbd_obj_request_submit(osdc, obj_request);
1827 if (ret)
1828 return ret;
1829 /*
1830 * The image request has its own reference to each
1831 * of its object requests, so we can safely drop the
1832 * initial one here.
1833 */
1834 rbd_obj_request_put(obj_request);
1835 }
1836
1837 return 0;
1838}
1839
9969ebc5
AE
1840/*
1841 * Request sync osd watch/unwatch. The value of "start" determines
1842 * whether a watch request is being initiated or torn down.
1843 */
1844static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
1845{
1846 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1847 struct rbd_obj_request *obj_request;
1848 struct ceph_osd_req_op *op;
1849 int ret;
1850
1851 rbd_assert(start ^ !!rbd_dev->watch_event);
1852 rbd_assert(start ^ !!rbd_dev->watch_request);
1853
1854 if (start) {
1855 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0, rbd_dev,
1856 &rbd_dev->watch_event);
1857 if (ret < 0)
1858 return ret;
1859 }
1860
1861 ret = -ENOMEM;
1862 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
1863 OBJ_REQUEST_NODATA);
1864 if (!obj_request)
1865 goto out_cancel;
1866
1867 op = rbd_osd_req_op_create(CEPH_OSD_OP_WATCH,
1868 rbd_dev->watch_event->cookie,
1869 rbd_dev->header.obj_version, start);
1870 if (!op)
1871 goto out_cancel;
1872 obj_request->osd_req = rbd_osd_req_create(rbd_dev, true,
1873 obj_request, op);
1874 rbd_osd_req_op_destroy(op);
1875 if (!obj_request->osd_req)
1876 goto out_cancel;
1877
1878 if (start) {
1879 rbd_dev->watch_request = obj_request->osd_req;
1880 ceph_osdc_set_request_linger(osdc, rbd_dev->watch_request);
1881 }
1882 ret = rbd_obj_request_submit(osdc, obj_request);
1883 if (ret)
1884 goto out_cancel;
1885 ret = rbd_obj_request_wait(obj_request);
1886 if (ret)
1887 goto out_cancel;
1888
1889 ret = obj_request->result;
1890 if (ret)
1891 goto out_cancel;
1892
1893 if (start)
1894 goto done; /* Done if setting up the watch request */
1895out_cancel:
1896 /* Cancel the event if we're tearing down, or on error */
1897 ceph_osdc_cancel_event(rbd_dev->watch_event);
1898 rbd_dev->watch_event = NULL;
1899done:
1900 if (obj_request)
1901 rbd_obj_request_put(obj_request);
1902
1903 return ret;
1904}
1905
bf0d5f50
AE
1906static void rbd_request_fn(struct request_queue *q)
1907{
1908 struct rbd_device *rbd_dev = q->queuedata;
1909 bool read_only = rbd_dev->mapping.read_only;
1910 struct request *rq;
1911 int result;
1912
1913 while ((rq = blk_fetch_request(q))) {
1914 bool write_request = rq_data_dir(rq) == WRITE;
1915 struct rbd_img_request *img_request;
1916 u64 offset;
1917 u64 length;
1918
1919 /* Ignore any non-FS requests that filter through. */
1920
1921 if (rq->cmd_type != REQ_TYPE_FS) {
1922 __blk_end_request_all(rq, 0);
1923 continue;
1924 }
1925
1926 spin_unlock_irq(q->queue_lock);
1927
1928 /* Disallow writes to a read-only device */
1929
1930 if (write_request) {
1931 result = -EROFS;
1932 if (read_only)
1933 goto end_request;
1934 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
1935 }
1936
1937 /* Quit early if the snapshot has disappeared */
1938
1939 if (!atomic_read(&rbd_dev->exists)) {
1940 dout("request for non-existent snapshot");
1941 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
1942 result = -ENXIO;
1943 goto end_request;
1944 }
1945
1946 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
1947 length = (u64) blk_rq_bytes(rq);
1948
1949 result = -EINVAL;
1950 if (WARN_ON(offset && length > U64_MAX - offset + 1))
1951 goto end_request; /* Shouldn't happen */
1952
1953 result = -ENOMEM;
1954 img_request = rbd_img_request_create(rbd_dev, offset, length,
1955 write_request);
1956 if (!img_request)
1957 goto end_request;
1958
1959 img_request->rq = rq;
1960
1961 result = rbd_img_request_fill_bio(img_request, rq->bio);
1962 if (!result)
1963 result = rbd_img_request_submit(img_request);
1964 if (result)
1965 rbd_img_request_put(img_request);
1966end_request:
1967 spin_lock_irq(q->queue_lock);
1968 if (result < 0) {
1969 rbd_warn(rbd_dev, "obj_request %s result %d\n",
1970 write_request ? "write" : "read", result);
1971 __blk_end_request_all(rq, result);
1972 }
1973 }
1974}
1975
602adf40
YS
1976/*
1977 * a queue callback. Makes sure that we don't create a bio that spans across
1978 * multiple osd objects. One exception would be with a single page bios,
f7760dad 1979 * which we handle later at bio_chain_clone_range()
602adf40
YS
1980 */
1981static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1982 struct bio_vec *bvec)
1983{
1984 struct rbd_device *rbd_dev = q->queuedata;
e5cfeed2
AE
1985 sector_t sector_offset;
1986 sector_t sectors_per_obj;
1987 sector_t obj_sector_offset;
1988 int ret;
1989
1990 /*
1991 * Find how far into its rbd object the partition-relative
1992 * bio start sector is to offset relative to the enclosing
1993 * device.
1994 */
1995 sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
1996 sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1997 obj_sector_offset = sector_offset & (sectors_per_obj - 1);
1998
1999 /*
2000 * Compute the number of bytes from that offset to the end
2001 * of the object. Account for what's already used by the bio.
2002 */
2003 ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2004 if (ret > bmd->bi_size)
2005 ret -= bmd->bi_size;
2006 else
2007 ret = 0;
2008
2009 /*
2010 * Don't send back more than was asked for. And if the bio
2011 * was empty, let the whole thing through because: "Note
2012 * that a block device *must* allow a single page to be
2013 * added to an empty bio."
2014 */
2015 rbd_assert(bvec->bv_len <= PAGE_SIZE);
2016 if (ret > (int) bvec->bv_len || !bmd->bi_size)
2017 ret = (int) bvec->bv_len;
2018
2019 return ret;
602adf40
YS
2020}
2021
2022static void rbd_free_disk(struct rbd_device *rbd_dev)
2023{
2024 struct gendisk *disk = rbd_dev->disk;
2025
2026 if (!disk)
2027 return;
2028
602adf40
YS
2029 if (disk->flags & GENHD_FL_UP)
2030 del_gendisk(disk);
2031 if (disk->queue)
2032 blk_cleanup_queue(disk->queue);
2033 put_disk(disk);
2034}
2035
788e2df3
AE
2036static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2037 const char *object_name,
2038 u64 offset, u64 length,
2039 char *buf, u64 *version)
2040
2041{
2042 struct ceph_osd_req_op *op;
2043 struct rbd_obj_request *obj_request;
2044 struct ceph_osd_client *osdc;
2045 struct page **pages = NULL;
2046 u32 page_count;
2047 int ret;
2048
2049 page_count = (u32) calc_pages_for(offset, length);
2050 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2051 if (IS_ERR(pages))
2052 ret = PTR_ERR(pages);
2053
2054 ret = -ENOMEM;
2055 obj_request = rbd_obj_request_create(object_name, offset, length,
2056 OBJ_REQUEST_PAGES);
2057 if (!obj_request)
2058 goto out;
2059
2060 obj_request->pages = pages;
2061 obj_request->page_count = page_count;
2062
2063 op = rbd_osd_req_op_create(CEPH_OSD_OP_READ, offset, length);
2064 if (!op)
2065 goto out;
2066 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2067 obj_request, op);
2068 rbd_osd_req_op_destroy(op);
2069 if (!obj_request->osd_req)
2070 goto out;
2071
2072 osdc = &rbd_dev->rbd_client->client->osdc;
2073 ret = rbd_obj_request_submit(osdc, obj_request);
2074 if (ret)
2075 goto out;
2076 ret = rbd_obj_request_wait(obj_request);
2077 if (ret)
2078 goto out;
2079
2080 ret = obj_request->result;
2081 if (ret < 0)
2082 goto out;
2083 ret = ceph_copy_from_page_vector(pages, buf, 0, obj_request->xferred);
2084 if (version)
2085 *version = obj_request->version;
2086out:
2087 if (obj_request)
2088 rbd_obj_request_put(obj_request);
2089 else
2090 ceph_release_page_vector(pages, page_count);
2091
2092 return ret;
2093}
2094
602adf40 2095/*
4156d998
AE
2096 * Read the complete header for the given rbd device.
2097 *
2098 * Returns a pointer to a dynamically-allocated buffer containing
2099 * the complete and validated header. Caller can pass the address
2100 * of a variable that will be filled in with the version of the
2101 * header object at the time it was read.
2102 *
2103 * Returns a pointer-coded errno if a failure occurs.
602adf40 2104 */
4156d998
AE
2105static struct rbd_image_header_ondisk *
2106rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
602adf40 2107{
4156d998 2108 struct rbd_image_header_ondisk *ondisk = NULL;
50f7c4c9 2109 u32 snap_count = 0;
4156d998
AE
2110 u64 names_size = 0;
2111 u32 want_count;
2112 int ret;
602adf40 2113
00f1f36f 2114 /*
4156d998
AE
2115 * The complete header will include an array of its 64-bit
2116 * snapshot ids, followed by the names of those snapshots as
2117 * a contiguous block of NUL-terminated strings. Note that
2118 * the number of snapshots could change by the time we read
2119 * it in, in which case we re-read it.
00f1f36f 2120 */
4156d998
AE
2121 do {
2122 size_t size;
2123
2124 kfree(ondisk);
2125
2126 size = sizeof (*ondisk);
2127 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
2128 size += names_size;
2129 ondisk = kmalloc(size, GFP_KERNEL);
2130 if (!ondisk)
2131 return ERR_PTR(-ENOMEM);
2132
788e2df3 2133 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
4156d998
AE
2134 0, size,
2135 (char *) ondisk, version);
2136
2137 if (ret < 0)
2138 goto out_err;
2139 if (WARN_ON((size_t) ret < size)) {
2140 ret = -ENXIO;
06ecc6cb
AE
2141 rbd_warn(rbd_dev, "short header read (want %zd got %d)",
2142 size, ret);
4156d998
AE
2143 goto out_err;
2144 }
2145 if (!rbd_dev_ondisk_valid(ondisk)) {
2146 ret = -ENXIO;
06ecc6cb 2147 rbd_warn(rbd_dev, "invalid header");
4156d998 2148 goto out_err;
81e759fb 2149 }
602adf40 2150
4156d998
AE
2151 names_size = le64_to_cpu(ondisk->snap_names_len);
2152 want_count = snap_count;
2153 snap_count = le32_to_cpu(ondisk->snap_count);
2154 } while (snap_count != want_count);
00f1f36f 2155
4156d998 2156 return ondisk;
00f1f36f 2157
4156d998
AE
2158out_err:
2159 kfree(ondisk);
2160
2161 return ERR_PTR(ret);
2162}
2163
/*
 * Re-read the on-disk (format 1) header and rebuild the in-memory
 * header from it.  On success the header object's version is
 * recorded in header->obj_version.
 */
static int rbd_read_header(struct rbd_device *rbd_dev,
			   struct rbd_image_header *header)
{
	struct rbd_image_header_ondisk *ondisk;
	u64 ver = 0;
	int ret;

	ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
	if (IS_ERR(ondisk))
		return PTR_ERR(ondisk);
	ret = rbd_header_from_disk(header, ondisk);
	if (ret >= 0)
		header->obj_version = ver;
	kfree(ondisk);	/* the on-disk copy is no longer needed */

	return ret;
}
2184
/* Remove every snapshot device attached to this rbd device. */
static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
{
	struct rbd_snap *snap;
	struct rbd_snap *next;

	/* _safe variant: rbd_remove_snap_dev() unlinks snap from the list */
	list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
		rbd_remove_snap_dev(snap);
}
2193
9478554a
AE
2194static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
2195{
2196 sector_t size;
2197
0d7dbfce 2198 if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
9478554a
AE
2199 return;
2200
2201 size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
2202 dout("setting size to %llu sectors", (unsigned long long) size);
2203 rbd_dev->mapping.size = (u64) size;
2204 set_capacity(rbd_dev->disk, size);
2205}
2206
602adf40
YS
2207/*
2208 * only read the first part of the ondisk header, without the snaps info
2209 */
117973fb 2210static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
602adf40
YS
2211{
2212 int ret;
2213 struct rbd_image_header h;
602adf40
YS
2214
2215 ret = rbd_read_header(rbd_dev, &h);
2216 if (ret < 0)
2217 return ret;
2218
a51aa0c0
JD
2219 down_write(&rbd_dev->header_rwsem);
2220
9478554a
AE
2221 /* Update image size, and check for resize of mapped image */
2222 rbd_dev->header.image_size = h.image_size;
2223 rbd_update_mapping_size(rbd_dev);
9db4b3e3 2224
849b4260 2225 /* rbd_dev->header.object_prefix shouldn't change */
602adf40 2226 kfree(rbd_dev->header.snap_sizes);
849b4260 2227 kfree(rbd_dev->header.snap_names);
d1d25646
JD
2228 /* osd requests may still refer to snapc */
2229 ceph_put_snap_context(rbd_dev->header.snapc);
602adf40 2230
b813623a
AE
2231 if (hver)
2232 *hver = h.obj_version;
a71b891b 2233 rbd_dev->header.obj_version = h.obj_version;
93a24e08 2234 rbd_dev->header.image_size = h.image_size;
602adf40
YS
2235 rbd_dev->header.snapc = h.snapc;
2236 rbd_dev->header.snap_names = h.snap_names;
2237 rbd_dev->header.snap_sizes = h.snap_sizes;
849b4260
AE
2238 /* Free the extra copy of the object prefix */
2239 WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
2240 kfree(h.object_prefix);
2241
304f6808
AE
2242 ret = rbd_dev_snaps_update(rbd_dev);
2243 if (!ret)
2244 ret = rbd_dev_snaps_register(rbd_dev);
dfc5606d 2245
c666601a 2246 up_write(&rbd_dev->header_rwsem);
602adf40 2247
dfc5606d 2248 return ret;
602adf40
YS
2249}
2250
117973fb 2251static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
1fe5e993
AE
2252{
2253 int ret;
2254
117973fb 2255 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1fe5e993 2256 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
117973fb
AE
2257 if (rbd_dev->image_format == 1)
2258 ret = rbd_dev_v1_refresh(rbd_dev, hver);
2259 else
2260 ret = rbd_dev_v2_refresh(rbd_dev, hver);
1fe5e993
AE
2261 mutex_unlock(&ctl_mutex);
2262
2263 return ret;
2264}
2265
/*
 * Allocate and configure the gendisk and its request queue for the
 * mapped image.  Returns 0 on success, -ENOMEM on failure.
 */
static int rbd_init_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk;
	struct request_queue *q;
	u64 segment_size;

	/* create gendisk info */
	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
	if (!disk)
		return -ENOMEM;

	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
		 rbd_dev->dev_id);
	disk->major = rbd_dev->major;
	disk->first_minor = 0;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;

	q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
	if (!q)
		goto out_disk;

	/* We use the default size, but let's be explicit about it. */
	blk_queue_physical_block_size(q, SECTOR_SIZE);

	/* set io sizes to object size */
	segment_size = rbd_obj_bytes(&rbd_dev->header);
	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
	blk_queue_max_segment_size(q, segment_size);
	blk_queue_io_min(q, segment_size);
	blk_queue_io_opt(q, segment_size);

	blk_queue_merge_bvec(q, rbd_merge_bvec);
	disk->queue = q;

	q->queuedata = rbd_dev;

	rbd_dev->disk = disk;

	/* Mapping size was established earlier; expose it as capacity */
	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);

	return 0;
out_disk:
	put_disk(disk);

	return -ENOMEM;
}
2313
/*
 sysfs
*/

/* Map a sysfs struct device back to its containing rbd_device. */
static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
	return container_of(dev, struct rbd_device, dev);
}
2322
dfc5606d
YS
2323static ssize_t rbd_size_show(struct device *dev,
2324 struct device_attribute *attr, char *buf)
2325{
593a9e7b 2326 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
a51aa0c0
JD
2327 sector_t size;
2328
2329 down_read(&rbd_dev->header_rwsem);
2330 size = get_capacity(rbd_dev->disk);
2331 up_read(&rbd_dev->header_rwsem);
dfc5606d 2332
a51aa0c0 2333 return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
dfc5606d
YS
2334}
2335
34b13184
AE
2336/*
2337 * Note this shows the features for whatever's mapped, which is not
2338 * necessarily the base image.
2339 */
2340static ssize_t rbd_features_show(struct device *dev,
2341 struct device_attribute *attr, char *buf)
2342{
2343 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2344
2345 return sprintf(buf, "0x%016llx\n",
2346 (unsigned long long) rbd_dev->mapping.features);
2347}
2348
dfc5606d
YS
2349static ssize_t rbd_major_show(struct device *dev,
2350 struct device_attribute *attr, char *buf)
2351{
593a9e7b 2352 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 2353
dfc5606d
YS
2354 return sprintf(buf, "%d\n", rbd_dev->major);
2355}
2356
2357static ssize_t rbd_client_id_show(struct device *dev,
2358 struct device_attribute *attr, char *buf)
602adf40 2359{
593a9e7b 2360 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 2361
1dbb4399
AE
2362 return sprintf(buf, "client%lld\n",
2363 ceph_client_id(rbd_dev->rbd_client->client));
602adf40
YS
2364}
2365
dfc5606d
YS
2366static ssize_t rbd_pool_show(struct device *dev,
2367 struct device_attribute *attr, char *buf)
602adf40 2368{
593a9e7b 2369 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 2370
0d7dbfce 2371 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
dfc5606d
YS
2372}
2373
9bb2f334
AE
2374static ssize_t rbd_pool_id_show(struct device *dev,
2375 struct device_attribute *attr, char *buf)
2376{
2377 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2378
0d7dbfce
AE
2379 return sprintf(buf, "%llu\n",
2380 (unsigned long long) rbd_dev->spec->pool_id);
9bb2f334
AE
2381}
2382
dfc5606d
YS
2383static ssize_t rbd_name_show(struct device *dev,
2384 struct device_attribute *attr, char *buf)
2385{
593a9e7b 2386 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 2387
a92ffdf8
AE
2388 if (rbd_dev->spec->image_name)
2389 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
2390
2391 return sprintf(buf, "(unknown)\n");
dfc5606d
YS
2392}
2393
589d30e0
AE
2394static ssize_t rbd_image_id_show(struct device *dev,
2395 struct device_attribute *attr, char *buf)
2396{
2397 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2398
0d7dbfce 2399 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
589d30e0
AE
2400}
2401
34b13184
AE
2402/*
2403 * Shows the name of the currently-mapped snapshot (or
2404 * RBD_SNAP_HEAD_NAME for the base image).
2405 */
dfc5606d
YS
2406static ssize_t rbd_snap_show(struct device *dev,
2407 struct device_attribute *attr,
2408 char *buf)
2409{
593a9e7b 2410 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 2411
0d7dbfce 2412 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
dfc5606d
YS
2413}
2414
/*
 * For an rbd v2 image, shows the pool id, image id, and snapshot id
 * for the parent image.  If there is no parent, simply shows
 * "(no parent image)".
 */
static ssize_t rbd_parent_show(struct device *dev,
			     struct device_attribute *attr,
			     char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	struct rbd_spec *spec = rbd_dev->parent_spec;
	int count;
	char *bufp = buf;	/* advances past each appended line */

	if (!spec)
		return sprintf(buf, "(no parent image)\n");

	count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
			(unsigned long long) spec->pool_id, spec->pool_name);
	if (count < 0)
		return count;
	bufp += count;

	/* image_name is best-effort and may be absent for a parent */
	count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
			spec->image_name ? spec->image_name : "(unknown)");
	if (count < 0)
		return count;
	bufp += count;

	count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
			(unsigned long long) spec->snap_id, spec->snap_name);
	if (count < 0)
		return count;
	bufp += count;

	count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
	if (count < 0)
		return count;
	bufp += count;

	return (ssize_t) (bufp - buf);
}
2457
dfc5606d
YS
2458static ssize_t rbd_image_refresh(struct device *dev,
2459 struct device_attribute *attr,
2460 const char *buf,
2461 size_t size)
2462{
593a9e7b 2463 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
b813623a 2464 int ret;
602adf40 2465
117973fb 2466 ret = rbd_dev_refresh(rbd_dev, NULL);
b813623a
AE
2467
2468 return ret < 0 ? ret : size;
dfc5606d 2469}
602adf40 2470
/* Per-device sysfs attributes (all read-only except "refresh") */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);

static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_features.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_pool_id.attr,
	&dev_attr_name.attr,
	&dev_attr_image_id.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_parent.attr,
	&dev_attr_refresh.attr,
	NULL
};

static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};

/* Empty release: rbd_device lifetime is managed elsewhere */
static void rbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_sysfs_dev_release,
};
2516

/*
 sysfs - snapshots
*/

/* Show a snapshot's size in bytes. */
static ssize_t rbd_snap_size_show(struct device *dev,
				  struct device_attribute *attr,
				  char *buf)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

	return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
}

/* Show a snapshot's numeric id. */
static ssize_t rbd_snap_id_show(struct device *dev,
				struct device_attribute *attr,
				char *buf)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

	return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
}
2539
/* Show a snapshot's feature bit mask. */
static ssize_t rbd_snap_features_show(struct device *dev,
				struct device_attribute *attr,
				char *buf)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

	return sprintf(buf, "0x%016llx\n",
			(unsigned long long) snap->features);
}
2549
/* Per-snapshot sysfs attributes (all read-only) */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);

static struct attribute *rbd_snap_attrs[] = {
	&dev_attr_snap_size.attr,
	&dev_attr_snap_id.attr,
	&dev_attr_snap_features.attr,
	NULL,
};

static struct attribute_group rbd_snap_attr_group = {
	.attrs = rbd_snap_attrs,
};
2564
/* Device release callback: frees the rbd_snap and its name string. */
static void rbd_snap_dev_release(struct device *dev)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
	kfree(snap->name);
	kfree(snap);
}
2571
static const struct attribute_group *rbd_snap_attr_groups[] = {
	&rbd_snap_attr_group,
	NULL
};

/* Device type shared by all snapshot devices */
static struct device_type rbd_snap_device_type = {
	.groups = rbd_snap_attr_groups,
	.release = rbd_snap_dev_release,
};
2581
/* Take a reference on a spec; returns the spec for call chaining. */
static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
{
	kref_get(&spec->kref);

	return spec;
}
2588
static void rbd_spec_free(struct kref *kref);
/* Drop a reference on a spec; a NULL spec is a no-op. */
static void rbd_spec_put(struct rbd_spec *spec)
{
	if (spec)
		kref_put(&spec->kref, rbd_spec_free);
}
2595
2596static struct rbd_spec *rbd_spec_alloc(void)
2597{
2598 struct rbd_spec *spec;
2599
2600 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
2601 if (!spec)
2602 return NULL;
2603 kref_init(&spec->kref);
2604
2605 rbd_spec_put(rbd_spec_get(spec)); /* TEMPORARY */
2606
2607 return spec;
2608}
2609
/* kref release callback: free a spec and all of its name strings. */
static void rbd_spec_free(struct kref *kref)
{
	struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);

	kfree(spec->pool_name);
	kfree(spec->image_id);
	kfree(spec->image_name);
	kfree(spec->snap_name);
	kfree(spec);
}
2620
/*
 * Allocate and initialize a new rbd_device.  The caller's references
 * on "rbdc" and "spec" are stored (and later dropped by
 * rbd_dev_destroy()).  Returns NULL on allocation failure.
 */
struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
				struct rbd_spec *spec)
{
	struct rbd_device *rbd_dev;

	rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
	if (!rbd_dev)
		return NULL;

	spin_lock_init(&rbd_dev->lock);
	atomic_set(&rbd_dev->exists, 0);
	INIT_LIST_HEAD(&rbd_dev->node);
	INIT_LIST_HEAD(&rbd_dev->snaps);
	init_rwsem(&rbd_dev->header_rwsem);

	rbd_dev->spec = spec;
	rbd_dev->rbd_client = rbdc;

	/* Initialize the layout used for all rbd requests */

	rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
	rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);

	return rbd_dev;
}
2648
/* Drop everything an rbd_device holds, then free the device itself. */
static void rbd_dev_destroy(struct rbd_device *rbd_dev)
{
	rbd_spec_put(rbd_dev->parent_spec);
	kfree(rbd_dev->header_name);
	rbd_put_client(rbd_dev->rbd_client);
	rbd_spec_put(rbd_dev->spec);
	kfree(rbd_dev);
}
2657
/*
 * Return true if the snapshot's device has been registered.  The
 * device type is only assigned at registration time, so the two
 * conditions must agree (asserted: !ret ^ reg is true iff ret == reg).
 */
static bool rbd_snap_registered(struct rbd_snap *snap)
{
	bool ret = snap->dev.type == &rbd_snap_device_type;
	bool reg = device_is_registered(&snap->dev);

	rbd_assert(!ret ^ reg);

	return ret;
}
2667
/* Unlink a snapshot from the device's list and unregister its device. */
static void rbd_remove_snap_dev(struct rbd_snap *snap)
{
	list_del(&snap->node);
	/* an unregistered snap is freed elsewhere; release frees the rest */
	if (device_is_registered(&snap->dev))
		device_unregister(&snap->dev);
}
2674
/*
 * Register a snapshot's device under "parent" in sysfs.  The device
 * name is the snapshot name with RBD_SNAP_DEV_NAME_PREFIX prepended.
 */
static int rbd_register_snap_dev(struct rbd_snap *snap,
				  struct device *parent)
{
	struct device *dev = &snap->dev;
	int ret;

	dev->type = &rbd_snap_device_type;
	dev->parent = parent;
	dev->release = rbd_snap_dev_release;
	dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
	dout("%s: registering device for snapshot %s\n", __func__, snap->name);

	ret = device_register(dev);

	return ret;
}
2691
/*
 * Allocate and fill in a new rbd_snap (the name is duplicated).
 * Returns the new snapshot, or a pointer-coded errno on failure.
 */
static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
					const char *snap_name,
					u64 snap_id, u64 snap_size,
					u64 snap_features)
{
	struct rbd_snap *snap;
	int ret;

	snap = kzalloc(sizeof (*snap), GFP_KERNEL);
	if (!snap)
		return ERR_PTR(-ENOMEM);

	ret = -ENOMEM;
	snap->name = kstrdup(snap_name, GFP_KERNEL);
	if (!snap->name)
		goto err;

	snap->id = snap_id;
	snap->size = snap_size;
	snap->features = snap_features;

	return snap;

err:
	kfree(snap->name);	/* NULL here; kfree(NULL) is a no-op */
	kfree(snap);

	return ERR_PTR(ret);
}
2721
cd892126
AE
2722static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
2723 u64 *snap_size, u64 *snap_features)
2724{
2725 char *snap_name;
2726
2727 rbd_assert(which < rbd_dev->header.snapc->num_snaps);
2728
2729 *snap_size = rbd_dev->header.snap_sizes[which];
2730 *snap_features = 0; /* No features for v1 */
2731
2732 /* Skip over names until we find the one we are looking for */
2733
2734 snap_name = rbd_dev->header.snap_names;
2735 while (which--)
2736 snap_name += strlen(snap_name) + 1;
2737
2738 return snap_name;
2739}
2740
/*
 * Get the size and object order for an image snapshot, or if
 * snap_id is CEPH_NOSNAP, gets this information for the base
 * image.
 */
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u8 *order, u64 *snap_size)
{
	__le64 snapid = cpu_to_le64(snap_id);
	int ret;
	/* wire format of the "get_size" class method reply */
	struct {
		u8 order;
		__le64 size;
	} __attribute__ ((packed)) size_buf = { 0 };

	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
				"rbd", "get_size",
				(char *) &snapid, sizeof (snapid),
				(char *) &size_buf, sizeof (size_buf), NULL);
	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
	if (ret < 0)
		return ret;

	*order = size_buf.order;
	*snap_size = le64_to_cpu(size_buf.size);

	dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n",
		(unsigned long long) snap_id, (unsigned int) *order,
		(unsigned long long) *snap_size);

	return 0;
}
2773
/* Size and object order of the base image (snap_id == CEPH_NOSNAP). */
static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
{
	return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
					&rbd_dev->header.obj_order,
					&rbd_dev->header.image_size);
}
2780
/*
 * Fetch the object name prefix used for a v2 image's data objects
 * and store it in rbd_dev->header.object_prefix.
 */
static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
{
	void *reply_buf;
	int ret;
	void *p;

	reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
	if (!reply_buf)
		return -ENOMEM;

	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
				"rbd", "get_object_prefix",
				NULL, 0,
				reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;
	ret = 0;    /* rbd_req_sync_exec() can return positive */

	p = reply_buf;
	rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
						p + RBD_OBJ_PREFIX_LEN_MAX,
						NULL, GFP_NOIO);

	if (IS_ERR(rbd_dev->header.object_prefix)) {
		ret = PTR_ERR(rbd_dev->header.object_prefix);
		rbd_dev->header.object_prefix = NULL;
	} else {
		dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
	}

out:
	kfree(reply_buf);

	return ret;
}
2817
/*
 * Fetch the feature bits for a snapshot, or for the base image if
 * snap_id is CEPH_NOSNAP.  Fails with -ENXIO when the image uses
 * incompatible features this driver does not support.
 */
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
				u64 *snap_features)
{
	__le64 snapid = cpu_to_le64(snap_id);
	/* wire format of the "get_features" class method reply */
	struct {
		__le64 features;
		__le64 incompat;
	} features_buf = { 0 };
	u64 incompat;
	int ret;

	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
				"rbd", "get_features",
				(char *) &snapid, sizeof (snapid),
				(char *) &features_buf, sizeof (features_buf),
				NULL);
	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
	if (ret < 0)
		return ret;

	incompat = le64_to_cpu(features_buf.incompat);
	if (incompat & ~RBD_FEATURES_ALL)
		return -ENXIO;

	*snap_features = le64_to_cpu(features_buf.features);

	dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
		(unsigned long long) snap_id,
		(unsigned long long) *snap_features,
		(unsigned long long) le64_to_cpu(features_buf.incompat));

	return 0;
}
2851
/* Feature bits of the base image (snap_id == CEPH_NOSNAP). */
static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
{
	return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
						&rbd_dev->header.features);
}
2857
86b00e0d
AE
2858static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
2859{
2860 struct rbd_spec *parent_spec;
2861 size_t size;
2862 void *reply_buf = NULL;
2863 __le64 snapid;
2864 void *p;
2865 void *end;
2866 char *image_id;
2867 u64 overlap;
86b00e0d
AE
2868 int ret;
2869
2870 parent_spec = rbd_spec_alloc();
2871 if (!parent_spec)
2872 return -ENOMEM;
2873
2874 size = sizeof (__le64) + /* pool_id */
2875 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
2876 sizeof (__le64) + /* snap_id */
2877 sizeof (__le64); /* overlap */
2878 reply_buf = kmalloc(size, GFP_KERNEL);
2879 if (!reply_buf) {
2880 ret = -ENOMEM;
2881 goto out_err;
2882 }
2883
2884 snapid = cpu_to_le64(CEPH_NOSNAP);
2885 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2886 "rbd", "get_parent",
2887 (char *) &snapid, sizeof (snapid),
07b2391f 2888 (char *) reply_buf, size, NULL);
86b00e0d
AE
2889 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2890 if (ret < 0)
2891 goto out_err;
2892
2893 ret = -ERANGE;
2894 p = reply_buf;
2895 end = (char *) reply_buf + size;
2896 ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
2897 if (parent_spec->pool_id == CEPH_NOPOOL)
2898 goto out; /* No parent? No problem. */
2899
0903e875
AE
2900 /* The ceph file layout needs to fit pool id in 32 bits */
2901
2902 ret = -EIO;
2903 if (WARN_ON(parent_spec->pool_id > (u64) U32_MAX))
2904 goto out;
2905
979ed480 2906 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
86b00e0d
AE
2907 if (IS_ERR(image_id)) {
2908 ret = PTR_ERR(image_id);
2909 goto out_err;
2910 }
2911 parent_spec->image_id = image_id;
2912 ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
2913 ceph_decode_64_safe(&p, end, overlap, out_err);
2914
2915 rbd_dev->parent_overlap = overlap;
2916 rbd_dev->parent_spec = parent_spec;
2917 parent_spec = NULL; /* rbd_dev now owns this */
2918out:
2919 ret = 0;
2920out_err:
2921 kfree(reply_buf);
2922 rbd_spec_put(parent_spec);
2923
2924 return ret;
2925}
2926
/*
 * Look up an image's name by its image id via the rbd directory
 * object.  Returns a newly-allocated name string, or NULL on any
 * failure -- callers treat the name as optional.
 */
static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
{
	size_t image_id_size;
	char *image_id;
	void *p;
	void *end;
	size_t size;
	void *reply_buf = NULL;
	size_t len = 0;
	char *image_name = NULL;
	int ret;

	rbd_assert(!rbd_dev->spec->image_name);

	/* Build the encoded (length-prefixed) image id argument */
	len = strlen(rbd_dev->spec->image_id);
	image_id_size = sizeof (__le32) + len;
	image_id = kmalloc(image_id_size, GFP_KERNEL);
	if (!image_id)
		return NULL;

	p = image_id;
	end = (char *) image_id + image_id_size;
	ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32) len);

	size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
	reply_buf = kmalloc(size, GFP_KERNEL);
	if (!reply_buf)
		goto out;

	ret = rbd_req_sync_exec(rbd_dev, RBD_DIRECTORY,
				"rbd", "dir_get_name",
				image_id, image_id_size,
				(char *) reply_buf, size, NULL);
	if (ret < 0)
		goto out;
	p = reply_buf;
	end = (char *) reply_buf + size;
	image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
	if (IS_ERR(image_name))
		image_name = NULL;	/* failure is tolerated */
	else
		dout("%s: name is %s len is %zd\n", __func__, image_name, len);
out:
	kfree(reply_buf);
	kfree(image_id);

	return image_name;
}
2975
2976/*
2977 * When a parent image gets probed, we only have the pool, image,
2978 * and snapshot ids but not the names of any of them. This call
2979 * is made later to fill in those names. It has to be done after
2980 * rbd_dev_snaps_update() has completed because some of the
2981 * information (in particular, snapshot name) is not available
2982 * until then.
2983 */
2984static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
2985{
2986 struct ceph_osd_client *osdc;
2987 const char *name;
2988 void *reply_buf = NULL;
2989 int ret;
2990
2991 if (rbd_dev->spec->pool_name)
2992 return 0; /* Already have the names */
2993
2994 /* Look up the pool name */
2995
2996 osdc = &rbd_dev->rbd_client->client->osdc;
2997 name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
935dc89f
AE
2998 if (!name) {
2999 rbd_warn(rbd_dev, "there is no pool with id %llu",
3000 rbd_dev->spec->pool_id); /* Really a BUG() */
3001 return -EIO;
3002 }
9e15b77d
AE
3003
3004 rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
3005 if (!rbd_dev->spec->pool_name)
3006 return -ENOMEM;
3007
3008 /* Fetch the image name; tolerate failure here */
3009
3010 name = rbd_dev_image_name(rbd_dev);
69e7a02f 3011 if (name)
9e15b77d 3012 rbd_dev->spec->image_name = (char *) name;
69e7a02f 3013 else
06ecc6cb 3014 rbd_warn(rbd_dev, "unable to get image name");
9e15b77d
AE
3015
3016 /* Look up the snapshot name. */
3017
3018 name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
3019 if (!name) {
935dc89f
AE
3020 rbd_warn(rbd_dev, "no snapshot with id %llu",
3021 rbd_dev->spec->snap_id); /* Really a BUG() */
9e15b77d
AE
3022 ret = -EIO;
3023 goto out_err;
3024 }
3025 rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
3026 if(!rbd_dev->spec->snap_name)
3027 goto out_err;
3028
3029 return 0;
3030out_err:
3031 kfree(reply_buf);
3032 kfree(rbd_dev->spec->pool_name);
3033 rbd_dev->spec->pool_name = NULL;
3034
3035 return ret;
3036}
3037
6e14b1a6 3038static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
35d489f9
AE
3039{
3040 size_t size;
3041 int ret;
3042 void *reply_buf;
3043 void *p;
3044 void *end;
3045 u64 seq;
3046 u32 snap_count;
3047 struct ceph_snap_context *snapc;
3048 u32 i;
3049
3050 /*
3051 * We'll need room for the seq value (maximum snapshot id),
3052 * snapshot count, and array of that many snapshot ids.
3053 * For now we have a fixed upper limit on the number we're
3054 * prepared to receive.
3055 */
3056 size = sizeof (__le64) + sizeof (__le32) +
3057 RBD_MAX_SNAP_COUNT * sizeof (__le64);
3058 reply_buf = kzalloc(size, GFP_KERNEL);
3059 if (!reply_buf)
3060 return -ENOMEM;
3061
3062 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
3063 "rbd", "get_snapcontext",
3064 NULL, 0,
07b2391f 3065 reply_buf, size, ver);
35d489f9
AE
3066 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
3067 if (ret < 0)
3068 goto out;
3069
3070 ret = -ERANGE;
3071 p = reply_buf;
3072 end = (char *) reply_buf + size;
3073 ceph_decode_64_safe(&p, end, seq, out);
3074 ceph_decode_32_safe(&p, end, snap_count, out);
3075
3076 /*
3077 * Make sure the reported number of snapshot ids wouldn't go
3078 * beyond the end of our buffer. But before checking that,
3079 * make sure the computed size of the snapshot context we
3080 * allocate is representable in a size_t.
3081 */
3082 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3083 / sizeof (u64)) {
3084 ret = -EINVAL;
3085 goto out;
3086 }
3087 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3088 goto out;
3089
3090 size = sizeof (struct ceph_snap_context) +
3091 snap_count * sizeof (snapc->snaps[0]);
3092 snapc = kmalloc(size, GFP_KERNEL);
3093 if (!snapc) {
3094 ret = -ENOMEM;
3095 goto out;
3096 }
3097
3098 atomic_set(&snapc->nref, 1);
3099 snapc->seq = seq;
3100 snapc->num_snaps = snap_count;
3101 for (i = 0; i < snap_count; i++)
3102 snapc->snaps[i] = ceph_decode_64(&p);
3103
3104 rbd_dev->header.snapc = snapc;
3105
3106 dout(" snap context seq = %llu, snap_count = %u\n",
3107 (unsigned long long) seq, (unsigned int) snap_count);
3108
3109out:
3110 kfree(reply_buf);
3111
3112 return 0;
3113}
3114
/*
 * Fetch the name of the snapshot at index "which" in a v2 image's
 * snapshot context.  Returns a newly-allocated string, or a
 * pointer-coded errno on failure.
 */
static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
{
	size_t size;
	void *reply_buf;
	__le64 snap_id;
	int ret;
	void *p;
	void *end;
	char *snap_name;

	size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
	reply_buf = kmalloc(size, GFP_KERNEL);
	if (!reply_buf)
		return ERR_PTR(-ENOMEM);

	snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
				"rbd", "get_snapshot_name",
				(char *) &snap_id, sizeof (snap_id),
				reply_buf, size, NULL);
	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;

	p = reply_buf;
	end = (char *) reply_buf + size;
	snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
	if (IS_ERR(snap_name)) {
		ret = PTR_ERR(snap_name);
		goto out;
	} else {
		dout(" snap_id 0x%016llx snap_name = %s\n",
			(unsigned long long) le64_to_cpu(snap_id), snap_name);
	}
	kfree(reply_buf);

	return snap_name;
out:
	kfree(reply_buf);

	return ERR_PTR(ret);
}
3157
3158static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
3159 u64 *snap_size, u64 *snap_features)
3160{
e0b49868 3161 u64 snap_id;
b8b1e2db
AE
3162 u8 order;
3163 int ret;
3164
3165 snap_id = rbd_dev->header.snapc->snaps[which];
3166 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
3167 if (ret)
3168 return ERR_PTR(ret);
3169 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
3170 if (ret)
3171 return ERR_PTR(ret);
3172
3173 return rbd_dev_v2_snap_name(rbd_dev, which);
3174}
3175
3176static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
3177 u64 *snap_size, u64 *snap_features)
3178{
3179 if (rbd_dev->image_format == 1)
3180 return rbd_dev_v1_snap_info(rbd_dev, which,
3181 snap_size, snap_features);
3182 if (rbd_dev->image_format == 2)
3183 return rbd_dev_v2_snap_info(rbd_dev, which,
3184 snap_size, snap_features);
3185 return ERR_PTR(-EINVAL);
3186}
3187
117973fb
AE
3188static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
3189{
3190 int ret;
3191 __u8 obj_order;
3192
3193 down_write(&rbd_dev->header_rwsem);
3194
3195 /* Grab old order first, to see if it changes */
3196
3197 obj_order = rbd_dev->header.obj_order,
3198 ret = rbd_dev_v2_image_size(rbd_dev);
3199 if (ret)
3200 goto out;
3201 if (rbd_dev->header.obj_order != obj_order) {
3202 ret = -EIO;
3203 goto out;
3204 }
3205 rbd_update_mapping_size(rbd_dev);
3206
3207 ret = rbd_dev_v2_snap_context(rbd_dev, hver);
3208 dout("rbd_dev_v2_snap_context returned %d\n", ret);
3209 if (ret)
3210 goto out;
3211 ret = rbd_dev_snaps_update(rbd_dev);
3212 dout("rbd_dev_snaps_update returned %d\n", ret);
3213 if (ret)
3214 goto out;
3215 ret = rbd_dev_snaps_register(rbd_dev);
3216 dout("rbd_dev_snaps_register returned %d\n", ret);
3217out:
3218 up_write(&rbd_dev->header_rwsem);
3219
3220 return ret;
3221}
3222
dfc5606d 3223/*
35938150
AE
3224 * Scan the rbd device's current snapshot list and compare it to the
3225 * newly-received snapshot context. Remove any existing snapshots
3226 * not present in the new snapshot context. Add a new snapshot for
3227 * any snaphots in the snapshot context not in the current list.
3228 * And verify there are no changes to snapshots we already know
3229 * about.
3230 *
3231 * Assumes the snapshots in the snapshot context are sorted by
3232 * snapshot id, highest id first. (Snapshots in the rbd_dev's list
3233 * are also maintained in that order.)
dfc5606d 3234 */
304f6808 3235static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
dfc5606d 3236{
35938150
AE
3237 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3238 const u32 snap_count = snapc->num_snaps;
35938150
AE
3239 struct list_head *head = &rbd_dev->snaps;
3240 struct list_head *links = head->next;
3241 u32 index = 0;
dfc5606d 3242
9fcbb800 3243 dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
35938150
AE
3244 while (index < snap_count || links != head) {
3245 u64 snap_id;
3246 struct rbd_snap *snap;
cd892126
AE
3247 char *snap_name;
3248 u64 snap_size = 0;
3249 u64 snap_features = 0;
dfc5606d 3250
35938150
AE
3251 snap_id = index < snap_count ? snapc->snaps[index]
3252 : CEPH_NOSNAP;
3253 snap = links != head ? list_entry(links, struct rbd_snap, node)
3254 : NULL;
aafb230e 3255 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
dfc5606d 3256
35938150
AE
3257 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
3258 struct list_head *next = links->next;
dfc5606d 3259
35938150 3260 /* Existing snapshot not in the new snap context */
dfc5606d 3261
0d7dbfce 3262 if (rbd_dev->spec->snap_id == snap->id)
d78b650a 3263 atomic_set(&rbd_dev->exists, 0);
41f38c2b 3264 rbd_remove_snap_dev(snap);
9fcbb800 3265 dout("%ssnap id %llu has been removed\n",
0d7dbfce
AE
3266 rbd_dev->spec->snap_id == snap->id ?
3267 "mapped " : "",
9fcbb800 3268 (unsigned long long) snap->id);
35938150
AE
3269
3270 /* Done with this list entry; advance */
3271
3272 links = next;
dfc5606d
YS
3273 continue;
3274 }
35938150 3275
b8b1e2db
AE
3276 snap_name = rbd_dev_snap_info(rbd_dev, index,
3277 &snap_size, &snap_features);
cd892126
AE
3278 if (IS_ERR(snap_name))
3279 return PTR_ERR(snap_name);
3280
9fcbb800
AE
3281 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
3282 (unsigned long long) snap_id);
35938150
AE
3283 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
3284 struct rbd_snap *new_snap;
3285
3286 /* We haven't seen this snapshot before */
3287
c8d18425 3288 new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
cd892126 3289 snap_id, snap_size, snap_features);
9fcbb800
AE
3290 if (IS_ERR(new_snap)) {
3291 int err = PTR_ERR(new_snap);
3292
3293 dout(" failed to add dev, error %d\n", err);
3294
3295 return err;
3296 }
35938150
AE
3297
3298 /* New goes before existing, or at end of list */
3299
9fcbb800 3300 dout(" added dev%s\n", snap ? "" : " at end\n");
35938150
AE
3301 if (snap)
3302 list_add_tail(&new_snap->node, &snap->node);
3303 else
523f3258 3304 list_add_tail(&new_snap->node, head);
35938150
AE
3305 } else {
3306 /* Already have this one */
3307
9fcbb800
AE
3308 dout(" already present\n");
3309
cd892126 3310 rbd_assert(snap->size == snap_size);
aafb230e 3311 rbd_assert(!strcmp(snap->name, snap_name));
cd892126 3312 rbd_assert(snap->features == snap_features);
35938150
AE
3313
3314 /* Done with this list entry; advance */
3315
3316 links = links->next;
dfc5606d 3317 }
35938150
AE
3318
3319 /* Advance to the next entry in the snapshot context */
3320
3321 index++;
dfc5606d 3322 }
9fcbb800 3323 dout("%s: done\n", __func__);
dfc5606d
YS
3324
3325 return 0;
3326}
3327
304f6808
AE
3328/*
3329 * Scan the list of snapshots and register the devices for any that
3330 * have not already been registered.
3331 */
3332static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
3333{
3334 struct rbd_snap *snap;
3335 int ret = 0;
3336
3337 dout("%s called\n", __func__);
86ff77bb
AE
3338 if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
3339 return -EIO;
304f6808
AE
3340
3341 list_for_each_entry(snap, &rbd_dev->snaps, node) {
3342 if (!rbd_snap_registered(snap)) {
3343 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
3344 if (ret < 0)
3345 break;
3346 }
3347 }
3348 dout("%s: returning %d\n", __func__, ret);
3349
3350 return ret;
3351}
3352
dfc5606d
YS
3353static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
3354{
dfc5606d 3355 struct device *dev;
cd789ab9 3356 int ret;
dfc5606d
YS
3357
3358 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
dfc5606d 3359
cd789ab9 3360 dev = &rbd_dev->dev;
dfc5606d
YS
3361 dev->bus = &rbd_bus_type;
3362 dev->type = &rbd_device_type;
3363 dev->parent = &rbd_root_dev;
3364 dev->release = rbd_dev_release;
de71a297 3365 dev_set_name(dev, "%d", rbd_dev->dev_id);
dfc5606d 3366 ret = device_register(dev);
dfc5606d 3367
dfc5606d 3368 mutex_unlock(&ctl_mutex);
cd789ab9 3369
dfc5606d 3370 return ret;
602adf40
YS
3371}
3372
dfc5606d
YS
/*
 * Unregister the rbd device from sysfs; the device's release
 * callback (rbd_dev_release, set in rbd_bus_add_dev()) performs the
 * remaining cleanup once the last reference is dropped.
 */
static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
	device_unregister(&rbd_dev->dev);
}
3377
/* Highest device id handed out so far; ids start at 1. */
static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);

/*
 * Get a unique rbd identifier for the given new rbd_dev, and add
 * the rbd_dev to the global list.  The minimum rbd id is 1.
 */
static void rbd_dev_id_get(struct rbd_device *rbd_dev)
{
	/* atomic64_inc_return() hands out ids 1, 2, 3, ... */
	rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);

	spin_lock(&rbd_dev_list_lock);
	list_add_tail(&rbd_dev->node, &rbd_dev_list);
	spin_unlock(&rbd_dev_list_lock);
	dout("rbd_dev %p given dev id %llu\n", rbd_dev,
		(unsigned long long) rbd_dev->dev_id);
}
b7f23c36 3394
/*
 * Remove an rbd_dev from the global list, and record that its
 * identifier is no longer in use.
 */
static void rbd_dev_id_put(struct rbd_device *rbd_dev)
{
	struct list_head *tmp;
	int rbd_id = rbd_dev->dev_id;
	int max_id;

	rbd_assert(rbd_id > 0);

	dout("rbd_dev %p released dev id %llu\n", rbd_dev,
		(unsigned long long) rbd_dev->dev_id);
	spin_lock(&rbd_dev_list_lock);
	list_del_init(&rbd_dev->node);

	/*
	 * If the id being "put" is not the current maximum, there
	 * is nothing special we need to do.
	 */
	if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
		spin_unlock(&rbd_dev_list_lock);
		return;
	}

	/*
	 * We need to update the current maximum id.  Search the
	 * list to find out what it is.  We're more likely to find
	 * the maximum at the end, so search the list backward.
	 */
	max_id = 0;
	list_for_each_prev(tmp, &rbd_dev_list) {
		struct rbd_device *rbd_dev;	/* shadows the parameter */

		rbd_dev = list_entry(tmp, struct rbd_device, node);
		if (rbd_dev->dev_id > max_id)
			max_id = rbd_dev->dev_id;
	}
	spin_unlock(&rbd_dev_list_lock);

	/*
	 * The max id could have been updated by rbd_dev_id_get(), in
	 * which case it now accurately reflects the new maximum.
	 * Be careful not to overwrite the maximum value in that
	 * case.
	 */
	atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
	/* NOTE(review): printed even when the cmpxchg did not swap */
	dout(" max dev id has been reset\n");
}
3445
e28fff26
AE
/*
 * Skips over white space at *buf, and updates *buf to point to the
 * first found non-space character (if any).  Returns the length of
 * the token (string of non-white space characters) found.  Note
 * that *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/* Characters for which isspace() is nonzero in "C"/"POSIX" locales */
	const char *delims = " \f\n\r\t\v";

	*buf += strspn(*buf, delims);	/* skip to start of token */

	return strcspn(*buf, delims);	/* length of token at *buf */
}
3464
/*
 * Finds the next token in *buf, and if the provided token buffer is
 * big enough, copies the found token into it.  The result, if
 * copied, is guaranteed to be terminated with '\0'.  Note that *buf
 * must be terminated with '\0' on entry.
 *
 * Returns the length of the token found (not including the '\0').
 * Return value will be 0 if no token is found, and it will be >=
 * token_size if the token would not fit.
 *
 * The *buf pointer will be updated to point beyond the end of the
 * found token.  Note that this occurs even if the token buffer is
 * too small to hold it.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t len = next_token(buf);

	if (len < token_size) {
		memcpy(token, *buf, len);
		token[len] = '\0';
	}
	*buf += len;

	return len;
}
3494
ea3352f4
AE
3495/*
3496 * Finds the next token in *buf, dynamically allocates a buffer big
3497 * enough to hold a copy of it, and copies the token into the new
3498 * buffer. The copy is guaranteed to be terminated with '\0'. Note
3499 * that a duplicate buffer is created even for a zero-length token.
3500 *
3501 * Returns a pointer to the newly-allocated duplicate, or a null
3502 * pointer if memory for the duplicate was not available. If
3503 * the lenp argument is a non-null pointer, the length of the token
3504 * (not including the '\0') is returned in *lenp.
3505 *
3506 * If successful, the *buf pointer will be updated to point beyond
3507 * the end of the found token.
3508 *
3509 * Note: uses GFP_KERNEL for allocation.
3510 */
3511static inline char *dup_token(const char **buf, size_t *lenp)
3512{
3513 char *dup;
3514 size_t len;
3515
3516 len = next_token(buf);
4caf35f9 3517 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
ea3352f4
AE
3518 if (!dup)
3519 return NULL;
ea3352f4
AE
3520 *(dup + len) = '\0';
3521 *buf += len;
3522
3523 if (lenp)
3524 *lenp = len;
3525
3526 return dup;
3527}
3528
/*
 * Parse the options provided for an "rbd add" (i.e., rbd image
 * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
 * and the data written is passed here via a NUL-terminated buffer.
 * Returns 0 if successful or an error code otherwise.
 *
 * The information extracted from these options is recorded in
 * the other parameters which return dynamically-allocated
 * structures:
 *  ceph_opts
 *      The address of a pointer that will refer to a ceph options
 *      structure.  Caller must release the returned pointer using
 *      ceph_destroy_options() when it is no longer needed.
 *  rbd_opts
 *      Address of an rbd options pointer.  Fully initialized by
 *      this function; caller must release with kfree().
 *  spec
 *      Address of an rbd image specification pointer.  Fully
 *      initialized by this function based on parsed options.
 *      Caller must release with rbd_spec_put().
 *
 * The options passed take this form:
 *  <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
 * where:
 *  <mon_addrs>
 *      A comma-separated list of one or more monitor addresses.
 *      A monitor address is an ip address, optionally followed
 *      by a port number (separated by a colon).
 *        I.e.:  ip1[:port1][,ip2[:port2]...]
 *  <options>
 *      A comma-separated list of ceph and/or rbd options.
 *  <pool_name>
 *      The name of the rados pool containing the rbd image.
 *  <image_name>
 *      The name of the image in that pool to map.
 *  <snap_id>
 *      An optional snapshot id.  If provided, the mapping will
 *      present data from the image at the time that snapshot was
 *      created.  The image head is used if no snapshot id is
 *      provided.  Snapshot mappings are always read-only.
 */
static int rbd_add_parse_args(const char *buf,
				struct ceph_options **ceph_opts,
				struct rbd_options **opts,
				struct rbd_spec **rbd_spec)
{
	size_t len;
	char *options;
	const char *mon_addrs;
	size_t mon_addrs_size;
	struct rbd_spec *spec = NULL;
	struct rbd_options *rbd_opts = NULL;
	struct ceph_options *copts;
	int ret;

	/* The first four tokens are required */

	len = next_token(&buf);
	if (!len) {
		rbd_warn(NULL, "no monitor address(es) provided");
		return -EINVAL;
	}
	/* Not copied; points into the caller's buffer until parsed below */
	mon_addrs = buf;
	mon_addrs_size = len + 1;
	buf += len;

	ret = -EINVAL;
	options = dup_token(&buf, NULL);
	if (!options)
		return -ENOMEM;
	if (!*options) {
		rbd_warn(NULL, "no options provided");
		goto out_err;
	}

	spec = rbd_spec_alloc();
	if (!spec)
		goto out_mem;

	spec->pool_name = dup_token(&buf, NULL);
	if (!spec->pool_name)
		goto out_mem;
	if (!*spec->pool_name) {
		rbd_warn(NULL, "no pool name provided");
		goto out_err;
	}

	spec->image_name = dup_token(&buf, NULL);
	if (!spec->image_name)
		goto out_mem;
	if (!*spec->image_name) {
		rbd_warn(NULL, "no image name provided");
		goto out_err;
	}

	/*
	 * Snapshot name is optional; default is to use "-"
	 * (indicating the head/no snapshot).
	 */
	len = next_token(&buf);
	if (!len) {
		buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
		len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
	} else if (len > RBD_MAX_SNAP_NAME_LEN) {
		ret = -ENAMETOOLONG;
		goto out_err;
	}
	/* Copies one byte past the token; overwritten with '\0' just below */
	spec->snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
	if (!spec->snap_name)
		goto out_mem;
	*(spec->snap_name + len) = '\0';

	/* Initialize all rbd options to the defaults */

	rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
	if (!rbd_opts)
		goto out_mem;

	rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;

	copts = ceph_parse_options(options, mon_addrs,
					mon_addrs + mon_addrs_size - 1,
					parse_rbd_opts_token, rbd_opts);
	if (IS_ERR(copts)) {
		ret = PTR_ERR(copts);
		goto out_err;
	}
	kfree(options);

	/* Success: transfer ownership of all three results to the caller */

	*ceph_opts = copts;
	*opts = rbd_opts;
	*rbd_spec = spec;

	return 0;
out_mem:
	ret = -ENOMEM;
out_err:
	kfree(rbd_opts);
	rbd_spec_put(spec);
	kfree(options);

	return ret;
}
3672
589d30e0
AE
/*
 * An rbd format 2 image has a unique identifier, distinct from the
 * name given to it by the user.  Internally, that identifier is
 * what's used to specify the names of objects related to the image.
 *
 * A special "rbd id" object is used to map an rbd image name to its
 * id.  If that object doesn't exist, then there is no v2 rbd image
 * with the supplied name.
 *
 * This function will record the given rbd_dev's image_id field if
 * it can be determined, and in that case will return 0.  If any
 * errors occur a negative errno will be returned and the rbd_dev's
 * image_id field will be unchanged (and should be NULL).
 */
static int rbd_dev_image_id(struct rbd_device *rbd_dev)
{
	int ret;
	size_t size;
	char *object_name;
	void *response;
	void *p;

	/*
	 * When probing a parent image, the image id is already
	 * known (and the image name likely is not).  There's no
	 * need to fetch the image id again in this case.
	 */
	if (rbd_dev->spec->image_id)
		return 0;

	/*
	 * First, see if the format 2 image id file exists, and if
	 * so, get the image's persistent id from it.
	 */
	size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
	object_name = kmalloc(size, GFP_NOIO);
	if (!object_name)
		return -ENOMEM;
	sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
	dout("rbd id object name is %s\n", object_name);

	/* Response will be an encoded string, which includes a length */

	size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
	response = kzalloc(size, GFP_NOIO);
	if (!response) {
		ret = -ENOMEM;
		goto out;
	}

	ret = rbd_req_sync_exec(rbd_dev, object_name,
				"rbd", "get_id",
				NULL, 0,
				response, RBD_IMAGE_ID_LEN_MAX, NULL);
	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;
	ret = 0;    /* rbd_req_sync_exec() can return positive */

	p = response;
	rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
						p + RBD_IMAGE_ID_LEN_MAX,
						NULL, GFP_NOIO);
	if (IS_ERR(rbd_dev->spec->image_id)) {
		/* Reset the field so it satisfies the "unchanged" contract */
		ret = PTR_ERR(rbd_dev->spec->image_id);
		rbd_dev->spec->image_id = NULL;
	} else {
		dout("image_id is %s\n", rbd_dev->spec->image_id);
	}
out:
	kfree(response);
	kfree(object_name);

	return ret;
}
3748
a30b71b9
AE
3749static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
3750{
3751 int ret;
3752 size_t size;
3753
3754 /* Version 1 images have no id; empty string is used */
3755
0d7dbfce
AE
3756 rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
3757 if (!rbd_dev->spec->image_id)
a30b71b9 3758 return -ENOMEM;
a30b71b9
AE
3759
3760 /* Record the header object name for this rbd image. */
3761
69e7a02f 3762 size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
a30b71b9
AE
3763 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3764 if (!rbd_dev->header_name) {
3765 ret = -ENOMEM;
3766 goto out_err;
3767 }
0d7dbfce
AE
3768 sprintf(rbd_dev->header_name, "%s%s",
3769 rbd_dev->spec->image_name, RBD_SUFFIX);
a30b71b9
AE
3770
3771 /* Populate rbd image metadata */
3772
3773 ret = rbd_read_header(rbd_dev, &rbd_dev->header);
3774 if (ret < 0)
3775 goto out_err;
86b00e0d
AE
3776
3777 /* Version 1 images have no parent (no layering) */
3778
3779 rbd_dev->parent_spec = NULL;
3780 rbd_dev->parent_overlap = 0;
3781
a30b71b9
AE
3782 rbd_dev->image_format = 1;
3783
3784 dout("discovered version 1 image, header name is %s\n",
3785 rbd_dev->header_name);
3786
3787 return 0;
3788
3789out_err:
3790 kfree(rbd_dev->header_name);
3791 rbd_dev->header_name = NULL;
0d7dbfce
AE
3792 kfree(rbd_dev->spec->image_id);
3793 rbd_dev->spec->image_id = NULL;
a30b71b9
AE
3794
3795 return ret;
3796}
3797
/*
 * Finish probing an rbd format 2 image: derive the header object
 * name from the (already-known) image id, then fetch the image's
 * size/order, object prefix, features, optional parent info, and
 * snapshot context from the header object.  Returns 0 or a negative
 * errno; on failure all state recorded here is released.
 */
static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
{
	size_t size;
	int ret;
	u64 ver = 0;

	/*
	 * Image id was filled in by the caller.  Record the header
	 * object name for this rbd image.
	 */
	size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
	rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
	if (!rbd_dev->header_name)
		return -ENOMEM;
	sprintf(rbd_dev->header_name, "%s%s",
		RBD_HEADER_PREFIX, rbd_dev->spec->image_id);

	/* Get the size and object order for the image */

	ret = rbd_dev_v2_image_size(rbd_dev);
	if (ret < 0)
		goto out_err;

	/* Get the object prefix (a.k.a. block_name) for the image */

	ret = rbd_dev_v2_object_prefix(rbd_dev);
	if (ret < 0)
		goto out_err;

	/* Get and check the features for the image */

	ret = rbd_dev_v2_features(rbd_dev);
	if (ret < 0)
		goto out_err;

	/* If the image supports layering, get the parent info */

	if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
		ret = rbd_dev_v2_parent_info(rbd_dev);
		if (ret < 0)
			goto out_err;
	}

	/* crypto and compression type aren't (yet) supported for v2 images */

	rbd_dev->header.crypt_type = 0;
	rbd_dev->header.comp_type = 0;

	/* Get the snapshot context, plus the header version */

	ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
	if (ret)
		goto out_err;
	rbd_dev->header.obj_version = ver;

	rbd_dev->image_format = 2;

	dout("discovered version 2 image, header name is %s\n",
		rbd_dev->header_name);

	return 0;
out_err:
	/* Undo everything recorded above, parent info included */
	rbd_dev->parent_overlap = 0;
	rbd_spec_put(rbd_dev->parent_spec);
	rbd_dev->parent_spec = NULL;
	kfree(rbd_dev->header_name);
	rbd_dev->header_name = NULL;
	kfree(rbd_dev->header.object_prefix);
	rbd_dev->header.object_prefix = NULL;

	return ret;
}
3870
83a06263
AE
/*
 * Final, format-independent stage of probing: build the snapshot
 * list, assign a device id, register the block device and sysfs
 * entries, start watching the header object, and announce the disk.
 * Returns 0 or a negative errno.  Note the error-unwind labels
 * below must stay in reverse order of the setup steps.
 */
static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
{
	int ret;

	/* no need to lock here, as rbd_dev is not registered yet */
	ret = rbd_dev_snaps_update(rbd_dev);
	if (ret)
		return ret;

	ret = rbd_dev_probe_update_spec(rbd_dev);
	if (ret)
		goto err_out_snaps;

	ret = rbd_dev_set_mapping(rbd_dev);
	if (ret)
		goto err_out_snaps;

	/* generate unique id: find highest unique id, add one */
	rbd_dev_id_get(rbd_dev);

	/* Fill in the device name, now that we have its id. */
	BUILD_BUG_ON(DEV_NAME_LEN
			< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);

	/* Get our block major device number. */

	ret = register_blkdev(0, rbd_dev->name);
	if (ret < 0)
		goto err_out_id;
	rbd_dev->major = ret;	/* register_blkdev(0, ...) returns the major */

	/* Set up the blkdev mapping. */

	ret = rbd_init_disk(rbd_dev);
	if (ret)
		goto err_out_blkdev;

	ret = rbd_bus_add_dev(rbd_dev);
	if (ret)
		goto err_out_disk;

	/*
	 * At this point cleanup in the event of an error is the job
	 * of the sysfs code (initiated by rbd_bus_del_dev()).
	 */
	down_write(&rbd_dev->header_rwsem);
	ret = rbd_dev_snaps_register(rbd_dev);
	up_write(&rbd_dev->header_rwsem);
	if (ret)
		goto err_out_bus;

	ret = rbd_dev_header_watch_sync(rbd_dev, 1);
	if (ret)
		goto err_out_bus;

	/* Everything's ready.  Announce the disk to the world. */

	add_disk(rbd_dev->disk);

	pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
		(unsigned long long) rbd_dev->mapping.size);

	return ret;
err_out_bus:
	/* this will also clean up rest of rbd_dev stuff */

	rbd_bus_del_dev(rbd_dev);

	return ret;
err_out_disk:
	rbd_free_disk(rbd_dev);
err_out_blkdev:
	unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_id:
	rbd_dev_id_put(rbd_dev);
err_out_snaps:
	rbd_remove_all_snaps(rbd_dev);

	return ret;
}
3952
a30b71b9
AE
3953/*
3954 * Probe for the existence of the header object for the given rbd
3955 * device. For format 2 images this includes determining the image
3956 * id.
3957 */
3958static int rbd_dev_probe(struct rbd_device *rbd_dev)
3959{
3960 int ret;
3961
3962 /*
3963 * Get the id from the image id object. If it's not a
3964 * format 2 image, we'll get ENOENT back, and we'll assume
3965 * it's a format 1 image.
3966 */
3967 ret = rbd_dev_image_id(rbd_dev);
3968 if (ret)
3969 ret = rbd_dev_v1_probe(rbd_dev);
3970 else
3971 ret = rbd_dev_v2_probe(rbd_dev);
83a06263 3972 if (ret) {
a30b71b9
AE
3973 dout("probe failed, returning %d\n", ret);
3974
83a06263
AE
3975 return ret;
3976 }
3977
3978 ret = rbd_dev_probe_finish(rbd_dev);
3979 if (ret)
3980 rbd_header_free(&rbd_dev->header);
3981
a30b71b9
AE
3982 return ret;
3983}
3984
59c2be1e
YS
/*
 * Handle a write to /sys/bus/rbd/add: parse the request, set up the
 * ceph client, and probe and register the mapped image.  Returns
 * the number of bytes consumed (count) on success or a negative
 * errno.  Note the careful ownership handoffs below: once a
 * resource is owned by rbd_dev, the local pointer is cleared so the
 * error path won't release it twice.
 */
static ssize_t rbd_add(struct bus_type *bus,
			const char *buf,
			size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	struct ceph_options *ceph_opts = NULL;
	struct rbd_options *rbd_opts = NULL;
	struct rbd_spec *spec = NULL;
	struct rbd_client *rbdc;
	struct ceph_osd_client *osdc;
	int rc = -ENOMEM;

	if (!try_module_get(THIS_MODULE))
		return -ENODEV;

	/* parse add command */
	rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
	if (rc < 0)
		goto err_out_module;

	rbdc = rbd_get_client(ceph_opts);
	if (IS_ERR(rbdc)) {
		rc = PTR_ERR(rbdc);
		goto err_out_args;
	}
	ceph_opts = NULL;	/* rbd_dev client now owns this */

	/* pick the pool */
	osdc = &rbdc->client->osdc;
	rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
	if (rc < 0)
		goto err_out_client;
	spec->pool_id = (u64) rc;

	/* The ceph file layout needs to fit pool id in 32 bits */

	if (WARN_ON(spec->pool_id > (u64) U32_MAX)) {
		rc = -EIO;
		goto err_out_client;
	}

	rbd_dev = rbd_dev_create(rbdc, spec);
	if (!rbd_dev)
		goto err_out_client;	/* rc is still -ENOMEM here */
	rbdc = NULL;		/* rbd_dev now owns this */
	spec = NULL;		/* rbd_dev now owns this */

	rbd_dev->mapping.read_only = rbd_opts->read_only;
	kfree(rbd_opts);
	rbd_opts = NULL;	/* done with this */

	rc = rbd_dev_probe(rbd_dev);
	if (rc < 0)
		goto err_out_rbd_dev;

	return count;
err_out_rbd_dev:
	rbd_dev_destroy(rbd_dev);
err_out_client:
	rbd_put_client(rbdc);
err_out_args:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	kfree(rbd_opts);
	rbd_spec_put(spec);
err_out_module:
	module_put(THIS_MODULE);

	dout("Error adding device %s\n", buf);

	return (ssize_t) rc;
}
4057
de71a297 4058static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
602adf40
YS
4059{
4060 struct list_head *tmp;
4061 struct rbd_device *rbd_dev;
4062
e124a82f 4063 spin_lock(&rbd_dev_list_lock);
602adf40
YS
4064 list_for_each(tmp, &rbd_dev_list) {
4065 rbd_dev = list_entry(tmp, struct rbd_device, node);
de71a297 4066 if (rbd_dev->dev_id == dev_id) {
e124a82f 4067 spin_unlock(&rbd_dev_list_lock);
602adf40 4068 return rbd_dev;
e124a82f 4069 }
602adf40 4070 }
e124a82f 4071 spin_unlock(&rbd_dev_list_lock);
602adf40
YS
4072 return NULL;
4073}
4074
/*
 * Device-model release callback, invoked when the last reference to
 * the rbd device's struct device is dropped (see rbd_bus_add_dev()).
 * Tears down the header watch, the block device, the header data,
 * and finally the rbd_dev itself and the module reference.
 */
static void rbd_dev_release(struct device *dev)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	if (rbd_dev->watch_request) {
		struct ceph_client *client = rbd_dev->rbd_client->client;

		ceph_osdc_unregister_linger_request(&client->osdc,
						rbd_dev->watch_request);
	}
	if (rbd_dev->watch_event)
		rbd_dev_header_watch_sync(rbd_dev, 0);	/* 0 = tear down */

	/* clean up and free blkdev */
	rbd_free_disk(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);

	/* release allocated disk header fields */
	rbd_header_free(&rbd_dev->header);

	/* done with the id, and with the rbd_dev */
	rbd_dev_id_put(rbd_dev);
	rbd_assert(rbd_dev->rbd_client != NULL);
	rbd_dev_destroy(rbd_dev);

	/* release module ref */
	module_put(THIS_MODULE);
}
4103
dfc5606d
YS
/*
 * Handle a write to /sys/bus/rbd/remove: the buffer holds a decimal
 * device id.  Refuses (-EBUSY) if the device is still open, -ENOENT
 * if no such device.  Returns count on success.  Serialized against
 * add/remove by ctl_mutex.
 */
static ssize_t rbd_remove(struct bus_type *bus,
			const char *buf,
			size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	int target_id, rc;
	unsigned long ul;
	int ret = count;

	rc = strict_strtoul(buf, 10, &ul);
	if (rc)
		return rc;

	/* convert to int; abort if we lost anything in the conversion */
	target_id = (int) ul;
	if (target_id != ul)
		return -EINVAL;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbd_dev = __rbd_get_dev(target_id);
	if (!rbd_dev) {
		ret = -ENOENT;
		goto done;
	}

	if (rbd_dev->open_count) {
		ret = -EBUSY;
		goto done;
	}

	rbd_remove_all_snaps(rbd_dev);
	rbd_bus_del_dev(rbd_dev);	/* triggers rbd_dev_release() */

done:
	mutex_unlock(&ctl_mutex);

	return ret;
}
4143
602adf40
YS
4144/*
4145 * create control files in sysfs
dfc5606d 4146 * /sys/bus/rbd/...
602adf40
YS
4147 */
4148static int rbd_sysfs_init(void)
4149{
dfc5606d 4150 int ret;
602adf40 4151
fed4c143 4152 ret = device_register(&rbd_root_dev);
21079786 4153 if (ret < 0)
dfc5606d 4154 return ret;
602adf40 4155
fed4c143
AE
4156 ret = bus_register(&rbd_bus_type);
4157 if (ret < 0)
4158 device_unregister(&rbd_root_dev);
602adf40 4159
602adf40
YS
4160 return ret;
4161}
4162
/* Undo rbd_sysfs_init(): unregister the bus, then the root device. */
static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}
4168
4169int __init rbd_init(void)
4170{
4171 int rc;
4172
4173 rc = rbd_sysfs_init();
4174 if (rc)
4175 return rc;
f0f8cef5 4176 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
602adf40
YS
4177 return 0;
4178}
4179
/* Module exit: remove the sysfs bus and root device. */
void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
}
4184
4185module_init(rbd_init);
4186module_exit(rbd_exit);
4187
4188MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
4189MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
4190MODULE_DESCRIPTION("rados block device");
4191
4192/* following authorship retained from original osdblk.c */
4193MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
4194
4195MODULE_LICENSE("GPL");
This page took 0.366461 seconds and 5 git commands to generate.