/*
   rbd.c -- Export ceph rados objects as a Linux block device

   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.

   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */
#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>

#include <linux/blkdev.h>

#include "rbd_types.h"

#define RBD_DEBUG	/* Activate rbd_assert() calls */
/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define	SECTOR_SHIFT	9
#define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

/* It might be useful to have these defined elsewhere */

#define	U8_MAX	((u8) (~0U))
#define	U16_MAX	((u16) (~0U))
#define	U32_MAX	((u32) (~0U))
#define	U64_MAX	((u64) (~0ULL))
#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN	\
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME	"-"

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

#define RBD_FEATURE_LAYERING	1

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_ALL	(0)
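/*
 * Illustrative sketch (added for clarity, not part of the original
 * source): the driver can only map an image whose feature bits it
 * understands, so image-probing code would typically reject anything
 * outside RBD_FEATURES_ALL with a check along the lines of:
 *
 *	if (features & ~(u64) RBD_FEATURES_ALL)
 *		return -ENXIO;
 *
 * With RBD_FEATURES_ALL currently 0, any set feature bit (including
 * RBD_FEATURE_LAYERING) makes an image unmappable by this client.
 */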
/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)
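/*
 * Worked example (added for clarity, not in the original): with a
 * 4-byte int, MAX_INT_FORMAT_WIDTH is (5 * 4) / 2 + 1 = 11, exactly
 * enough for the longest decimal rendering "-2147483648".  The 5/2
 * factor over-approximates the 2.41 decimal digits per byte, so the
 * bound holds for larger int sizes too, and DEV_NAME_LEN (32) leaves
 * ample room for "rbd" plus the formatted id and a terminating NUL.
 */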
/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These four fields never change for a given rbd image */
	char *object_prefix;
	u64 features;
	__u8 obj_order;
	__u8 crypt_type;
	__u8 comp_type;

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;
	u64 *snap_sizes;

	u64 obj_version;
};
/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
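/*
 * Hedged sketch (added for illustration; the definition itself is not
 * part of this excerpt): based on the description above and on the
 * fields dereferenced elsewhere in this file (spec->pool_id,
 * spec->pool_name, spec->image_id, spec->image_name, spec->snap_id,
 * spec->snap_name), an rbd_spec presumably pairs each id with its name
 * and carries a kref so a parent and child rbd_dev can share one
 * instance, roughly:
 *
 *	struct rbd_spec {
 *		u64		pool_id;
 *		char		*pool_name;
 *		char		*image_id;
 *		char		*image_name;
 *		u64		snap_id;
 *		char		*snap_name;
 *		struct kref	kref;
 *	};
 */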
/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;
};
struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define	BAD_WHICH	U32_MAX		/* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};
struct rbd_obj_request {
	const char		*object_name;
	u64			offset;		/* object start byte */
	u64			length;		/* bytes from offset */

	struct rbd_img_request	*img_request;
	struct list_head	links;		/* img_request->obj_requests */
	u32			which;		/* posn in image request list */

	enum obj_request_type	type;
	union {
		struct bio	*bio_list;
		struct {
			struct page	**pages;
			u32		page_count;
		};
	};

	struct ceph_osd_request	*osd_req;

	u64			xferred;	/* bytes transferred */
	u64			version;
	s32			result;
	atomic_t		done;

	rbd_obj_callback_t	callback;
	struct completion	completion;

	struct kref		kref;
};
struct rbd_img_request {
	struct request		*rq;
	struct rbd_device	*rbd_dev;
	u64			offset;	/* starting image byte offset */
	u64			length;	/* byte count from offset */
	bool			write_request;	/* false for read */
	union {
		struct ceph_snap_context *snapc;	/* for writes */
		u64		snap_id;		/* for reads */
	};
	spinlock_t		completion_lock;/* protects next_completion */
	u32			next_completion;
	rbd_img_callback_t	callback;

	u32			obj_request_count;
	struct list_head	obj_requests;	/* rbd_obj_request structs */

	struct kref		kref;
};
#define	for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &ireq->obj_requests, links)
#define	for_each_obj_request_from(ireq, oreq) \
	list_for_each_entry_from(oreq, &ireq->obj_requests, links)
#define	for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe_reverse(oreq, n, &ireq->obj_requests, links)
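/*
 * Illustrative use (added for clarity, not part of the original): the
 * "safe" variant walks the object-request list in reverse while the
 * loop body tears each entry down, e.g.:
 *
 *	struct rbd_obj_request *obj_request;
 *	struct rbd_obj_request *next_obj_request;
 *
 *	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
 *		rbd_img_obj_request_del(img_request, obj_request);
 *
 * which matches how rbd_img_request_destroy() later in this file
 * releases an image request's object requests.
 */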
struct rbd_snap {
	struct device		dev;
	const char		*name;
	u64			size;
	struct list_head	node;
	u64			id;
	u64			features;
};

struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue lock */

	struct rbd_image_header	header;
	atomic_t		exists;
	struct rbd_spec		*spec;

	char			*header_name;

	struct ceph_file_layout	layout;

	struct ceph_osd_event   *watch_event;
	struct ceph_osd_request	*watch_request;

	struct rbd_spec		*parent_spec;
	u64			parent_overlap;

	/* protects updating the header */
	struct rw_semaphore     header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* list of snapshots */
	struct list_head	snaps;

	/* sysfs related */
	struct device		dev;

	unsigned long		open_count;
};
static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);
static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);

static void rbd_dev_release(struct device *dev);
static void rbd_remove_snap_dev(struct rbd_snap *snap);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);
static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_attrs	= rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =    "rbd",
	.release =      rbd_root_dev_release,
};
static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else	/* punt if we have no name or id */
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}
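/*
 * Example call (added for illustration): callers hand rbd_warn() a
 * device (or NULL when none is at hand) plus a printf-style message,
 * e.g.:
 *
 *	rbd_warn(rbd_dev, "short header read (want %zd got %d)",
 *		 size, ret);
 *
 * and the helper picks the most descriptive prefix available: the
 * disk name, the image name, or the image id, whichever the device
 * has at that point in its setup.
 */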
#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */
static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	(void) get_device(&rbd_dev->dev);
	set_device_ro(bdev, rbd_dev->mapping.read_only);
	rbd_dev->open_count++;
	mutex_unlock(&ctl_mutex);

	return 0;
}
static int rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	rbd_assert(rbd_dev->open_count > 0);
	rbd_dev->open_count--;
	put_device(&rbd_dev->dev);
	mutex_unlock(&ctl_mutex);

	return 0;
}
static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};
413 * Initialize an rbd client instance.
416 static struct rbd_client
*rbd_client_create(struct ceph_options
*ceph_opts
)
418 struct rbd_client
*rbdc
;
421 dout("rbd_client_create\n");
422 rbdc
= kmalloc(sizeof(struct rbd_client
), GFP_KERNEL
);
426 kref_init(&rbdc
->kref
);
427 INIT_LIST_HEAD(&rbdc
->node
);
429 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
431 rbdc
->client
= ceph_create_client(ceph_opts
, rbdc
, 0, 0);
432 if (IS_ERR(rbdc
->client
))
434 ceph_opts
= NULL
; /* Now rbdc->client is responsible for ceph_opts */
436 ret
= ceph_open_session(rbdc
->client
);
440 spin_lock(&rbd_client_list_lock
);
441 list_add_tail(&rbdc
->node
, &rbd_client_list
);
442 spin_unlock(&rbd_client_list_lock
);
444 mutex_unlock(&ctl_mutex
);
446 dout("rbd_client_create created %p\n", rbdc
);
450 ceph_destroy_client(rbdc
->client
);
452 mutex_unlock(&ctl_mutex
);
456 ceph_destroy_options(ceph_opts
);
461 * Find a ceph client with specific addr and configuration. If
462 * found, bump its reference count.
464 static struct rbd_client
*rbd_client_find(struct ceph_options
*ceph_opts
)
466 struct rbd_client
*client_node
;
469 if (ceph_opts
->flags
& CEPH_OPT_NOSHARE
)
472 spin_lock(&rbd_client_list_lock
);
473 list_for_each_entry(client_node
, &rbd_client_list
, node
) {
474 if (!ceph_compare_options(ceph_opts
, client_node
->client
)) {
475 kref_get(&client_node
->kref
);
480 spin_unlock(&rbd_client_list_lock
);
482 return found
? client_node
: NULL
;
enum {
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	/* Boolean args above */
	Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	/* Boolean args above */
	{-1, NULL}
};

struct rbd_options {
	bool	read_only;
};

#define RBD_READ_ONLY_DEFAULT	false
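/*
 * Rough usage sketch (added for clarity, not in the original): the
 * options field of the string written to /sys/bus/rbd/add is parsed
 * by libceph, and any token it does not recognize -- such as "ro",
 * "rw", "read_only" or "read_write" above -- is handed to
 * parse_rbd_opts_token() below, which updates rbd_opts->read_only.
 * When no such token is present the mapping keeps
 * RBD_READ_ONLY_DEFAULT (false).
 */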
516 static int parse_rbd_opts_token(char *c
, void *private)
518 struct rbd_options
*rbd_opts
= private;
519 substring_t argstr
[MAX_OPT_ARGS
];
520 int token
, intval
, ret
;
522 token
= match_token(c
, rbd_opts_tokens
, argstr
);
526 if (token
< Opt_last_int
) {
527 ret
= match_int(&argstr
[0], &intval
);
529 pr_err("bad mount option arg (not int) "
533 dout("got int token %d val %d\n", token
, intval
);
534 } else if (token
> Opt_last_int
&& token
< Opt_last_string
) {
535 dout("got string token %d val %s\n", token
,
537 } else if (token
> Opt_last_string
&& token
< Opt_last_bool
) {
538 dout("got Boolean token %d\n", token
);
540 dout("got token %d\n", token
);
545 rbd_opts
->read_only
= true;
548 rbd_opts
->read_only
= false;
558 * Get a ceph client with specific addr and configuration, if one does
559 * not exist create it.
561 static struct rbd_client
*rbd_get_client(struct ceph_options
*ceph_opts
)
563 struct rbd_client
*rbdc
;
565 rbdc
= rbd_client_find(ceph_opts
);
566 if (rbdc
) /* using an existing client */
567 ceph_destroy_options(ceph_opts
);
569 rbdc
= rbd_client_create(ceph_opts
);
575 * Destroy ceph client
577 * Caller must hold rbd_client_list_lock.
579 static void rbd_client_release(struct kref
*kref
)
581 struct rbd_client
*rbdc
= container_of(kref
, struct rbd_client
, kref
);
583 dout("rbd_release_client %p\n", rbdc
);
584 spin_lock(&rbd_client_list_lock
);
585 list_del(&rbdc
->node
);
586 spin_unlock(&rbd_client_list_lock
);
588 ceph_destroy_client(rbdc
->client
);
593 * Drop reference to ceph client node. If it's not referenced anymore, release
596 static void rbd_put_client(struct rbd_client
*rbdc
)
599 kref_put(&rbdc
->kref
, rbd_client_release
);
602 static bool rbd_image_format_valid(u32 image_format
)
604 return image_format
== 1 || image_format
== 2;
607 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk
*ondisk
)
612 /* The header has to start with the magic rbd header text */
613 if (memcmp(&ondisk
->text
, RBD_HEADER_TEXT
, sizeof (RBD_HEADER_TEXT
)))
616 /* The bio layer requires at least sector-sized I/O */
618 if (ondisk
->options
.order
< SECTOR_SHIFT
)
621 /* If we use u64 in a few spots we may be able to loosen this */
623 if (ondisk
->options
.order
> 8 * sizeof (int) - 1)
627 * The size of a snapshot header has to fit in a size_t, and
628 * that limits the number of snapshots.
630 snap_count
= le32_to_cpu(ondisk
->snap_count
);
631 size
= SIZE_MAX
- sizeof (struct ceph_snap_context
);
632 if (snap_count
> size
/ sizeof (__le64
))
636 * Not only that, but the size of the entire the snapshot
637 * header must also be representable in a size_t.
639 size
-= snap_count
* sizeof (__le64
);
640 if ((u64
) size
< le64_to_cpu(ondisk
->snap_names_len
))
647 * Create a new header structure, translate header format from the on-disk
650 static int rbd_header_from_disk(struct rbd_image_header
*header
,
651 struct rbd_image_header_ondisk
*ondisk
)
658 memset(header
, 0, sizeof (*header
));
660 snap_count
= le32_to_cpu(ondisk
->snap_count
);
662 len
= strnlen(ondisk
->object_prefix
, sizeof (ondisk
->object_prefix
));
663 header
->object_prefix
= kmalloc(len
+ 1, GFP_KERNEL
);
664 if (!header
->object_prefix
)
666 memcpy(header
->object_prefix
, ondisk
->object_prefix
, len
);
667 header
->object_prefix
[len
] = '\0';
670 u64 snap_names_len
= le64_to_cpu(ondisk
->snap_names_len
);
672 /* Save a copy of the snapshot names */
674 if (snap_names_len
> (u64
) SIZE_MAX
)
676 header
->snap_names
= kmalloc(snap_names_len
, GFP_KERNEL
);
677 if (!header
->snap_names
)
680 * Note that rbd_dev_v1_header_read() guarantees
681 * the ondisk buffer we're working with has
682 * snap_names_len bytes beyond the end of the
683 * snapshot id array, this memcpy() is safe.
685 memcpy(header
->snap_names
, &ondisk
->snaps
[snap_count
],
688 /* Record each snapshot's size */
690 size
= snap_count
* sizeof (*header
->snap_sizes
);
691 header
->snap_sizes
= kmalloc(size
, GFP_KERNEL
);
692 if (!header
->snap_sizes
)
694 for (i
= 0; i
< snap_count
; i
++)
695 header
->snap_sizes
[i
] =
696 le64_to_cpu(ondisk
->snaps
[i
].image_size
);
698 WARN_ON(ondisk
->snap_names_len
);
699 header
->snap_names
= NULL
;
700 header
->snap_sizes
= NULL
;
703 header
->features
= 0; /* No features support in v1 images */
704 header
->obj_order
= ondisk
->options
.order
;
705 header
->crypt_type
= ondisk
->options
.crypt_type
;
706 header
->comp_type
= ondisk
->options
.comp_type
;
708 /* Allocate and fill in the snapshot context */
710 header
->image_size
= le64_to_cpu(ondisk
->image_size
);
711 size
= sizeof (struct ceph_snap_context
);
712 size
+= snap_count
* sizeof (header
->snapc
->snaps
[0]);
713 header
->snapc
= kzalloc(size
, GFP_KERNEL
);
717 atomic_set(&header
->snapc
->nref
, 1);
718 header
->snapc
->seq
= le64_to_cpu(ondisk
->snap_seq
);
719 header
->snapc
->num_snaps
= snap_count
;
720 for (i
= 0; i
< snap_count
; i
++)
721 header
->snapc
->snaps
[i
] =
722 le64_to_cpu(ondisk
->snaps
[i
].id
);
727 kfree(header
->snap_sizes
);
728 header
->snap_sizes
= NULL
;
729 kfree(header
->snap_names
);
730 header
->snap_names
= NULL
;
731 kfree(header
->object_prefix
);
732 header
->object_prefix
= NULL
;
737 static const char *rbd_snap_name(struct rbd_device
*rbd_dev
, u64 snap_id
)
739 struct rbd_snap
*snap
;
741 if (snap_id
== CEPH_NOSNAP
)
742 return RBD_SNAP_HEAD_NAME
;
744 list_for_each_entry(snap
, &rbd_dev
->snaps
, node
)
745 if (snap_id
== snap
->id
)
751 static int snap_by_name(struct rbd_device
*rbd_dev
, const char *snap_name
)
754 struct rbd_snap
*snap
;
756 list_for_each_entry(snap
, &rbd_dev
->snaps
, node
) {
757 if (!strcmp(snap_name
, snap
->name
)) {
758 rbd_dev
->spec
->snap_id
= snap
->id
;
759 rbd_dev
->mapping
.size
= snap
->size
;
760 rbd_dev
->mapping
.features
= snap
->features
;
769 static int rbd_dev_set_mapping(struct rbd_device
*rbd_dev
)
773 if (!memcmp(rbd_dev
->spec
->snap_name
, RBD_SNAP_HEAD_NAME
,
774 sizeof (RBD_SNAP_HEAD_NAME
))) {
775 rbd_dev
->spec
->snap_id
= CEPH_NOSNAP
;
776 rbd_dev
->mapping
.size
= rbd_dev
->header
.image_size
;
777 rbd_dev
->mapping
.features
= rbd_dev
->header
.features
;
780 ret
= snap_by_name(rbd_dev
, rbd_dev
->spec
->snap_name
);
783 rbd_dev
->mapping
.read_only
= true;
785 atomic_set(&rbd_dev
->exists
, 1);
790 static void rbd_header_free(struct rbd_image_header
*header
)
792 kfree(header
->object_prefix
);
793 header
->object_prefix
= NULL
;
794 kfree(header
->snap_sizes
);
795 header
->snap_sizes
= NULL
;
796 kfree(header
->snap_names
);
797 header
->snap_names
= NULL
;
798 ceph_put_snap_context(header
->snapc
);
799 header
->snapc
= NULL
;
802 static const char *rbd_segment_name(struct rbd_device
*rbd_dev
, u64 offset
)
808 name
= kmalloc(MAX_OBJ_NAME_SIZE
+ 1, GFP_NOIO
);
811 segment
= offset
>> rbd_dev
->header
.obj_order
;
812 ret
= snprintf(name
, MAX_OBJ_NAME_SIZE
+ 1, "%s.%012llx",
813 rbd_dev
->header
.object_prefix
, segment
);
814 if (ret
< 0 || ret
> MAX_OBJ_NAME_SIZE
) {
815 pr_err("error formatting segment name for #%llu (%d)\n",
static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
				u64 offset, u64 length)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	offset &= segment_size - 1;

	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1 << header->obj_order;
}
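/*
 * Worked example (added for clarity, not in the original): with the
 * default object order of 22, rbd_obj_bytes() reports 1 << 22 = 4 MiB
 * objects.  An image byte offset of 13 MiB therefore falls in segment
 * 3 (offset >> obj_order), at byte offset 1 MiB within that object
 * (rbd_segment_offset()), and a 5 MiB request starting there would be
 * clipped by rbd_segment_length() to the 3 MiB remaining in that
 * segment; the caller loops to cover the rest from the next object.
 */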
857 static void bio_chain_put(struct bio
*chain
)
863 chain
= chain
->bi_next
;
869 * zeros a bio chain, starting at specific offset
871 static void zero_bio_chain(struct bio
*chain
, int start_ofs
)
880 bio_for_each_segment(bv
, chain
, i
) {
881 if (pos
+ bv
->bv_len
> start_ofs
) {
882 int remainder
= max(start_ofs
- pos
, 0);
883 buf
= bvec_kmap_irq(bv
, &flags
);
884 memset(buf
+ remainder
, 0,
885 bv
->bv_len
- remainder
);
886 bvec_kunmap_irq(buf
, &flags
);
891 chain
= chain
->bi_next
;
896 * Clone a portion of a bio, starting at the given byte offset
897 * and continuing for the number of bytes indicated.
899 static struct bio
*bio_clone_range(struct bio
*bio_src
,
908 unsigned short end_idx
;
912 /* Handle the easy case for the caller */
914 if (!offset
&& len
== bio_src
->bi_size
)
915 return bio_clone(bio_src
, gfpmask
);
917 if (WARN_ON_ONCE(!len
))
919 if (WARN_ON_ONCE(len
> bio_src
->bi_size
))
921 if (WARN_ON_ONCE(offset
> bio_src
->bi_size
- len
))
924 /* Find first affected segment... */
927 __bio_for_each_segment(bv
, bio_src
, idx
, 0) {
928 if (resid
< bv
->bv_len
)
934 /* ...and the last affected segment */
937 __bio_for_each_segment(bv
, bio_src
, end_idx
, idx
) {
938 if (resid
<= bv
->bv_len
)
942 vcnt
= end_idx
- idx
+ 1;
944 /* Build the clone */
946 bio
= bio_alloc(gfpmask
, (unsigned int) vcnt
);
948 return NULL
; /* ENOMEM */
950 bio
->bi_bdev
= bio_src
->bi_bdev
;
951 bio
->bi_sector
= bio_src
->bi_sector
+ (offset
>> SECTOR_SHIFT
);
952 bio
->bi_rw
= bio_src
->bi_rw
;
953 bio
->bi_flags
|= 1 << BIO_CLONED
;
956 * Copy over our part of the bio_vec, then update the first
957 * and last (or only) entries.
959 memcpy(&bio
->bi_io_vec
[0], &bio_src
->bi_io_vec
[idx
],
960 vcnt
* sizeof (struct bio_vec
));
961 bio
->bi_io_vec
[0].bv_offset
+= voff
;
963 bio
->bi_io_vec
[0].bv_len
-= voff
;
964 bio
->bi_io_vec
[vcnt
- 1].bv_len
= resid
;
966 bio
->bi_io_vec
[0].bv_len
= len
;
/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
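/*
 * Illustrative call pattern (added, not in the original): the image
 * request code walks a request's bio chain one object segment at a
 * time, along the lines of:
 *
 *	unsigned int bio_offset = 0;
 *	struct bio *bio_list = rq->bio;
 *
 *	while (resid) {
 *		length = rbd_segment_length(rbd_dev, image_offset, resid);
 *		obj_request->bio_list =
 *			bio_chain_clone_range(&bio_list, &bio_offset,
 *					      (unsigned int) length,
 *					      GFP_ATOMIC);
 *		...
 *		image_offset += length;
 *		resid -= length;
 *	}
 *
 * so each successive call resumes exactly where the previous clone
 * stopped, as described above.  This mirrors rbd_img_request_fill_bio()
 * later in this file.
 */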
990 static struct bio
*bio_chain_clone_range(struct bio
**bio_src
,
991 unsigned int *offset
,
995 struct bio
*bi
= *bio_src
;
996 unsigned int off
= *offset
;
997 struct bio
*chain
= NULL
;
1000 /* Build up a chain of clone bios up to the limit */
1002 if (!bi
|| off
>= bi
->bi_size
|| !len
)
1003 return NULL
; /* Nothing to clone */
1007 unsigned int bi_size
;
1011 rbd_warn(NULL
, "bio_chain exhausted with %u left", len
);
1012 goto out_err
; /* EINVAL; ran out of bio's */
1014 bi_size
= min_t(unsigned int, bi
->bi_size
- off
, len
);
1015 bio
= bio_clone_range(bi
, off
, bi_size
, gfpmask
);
1017 goto out_err
; /* ENOMEM */
1020 end
= &bio
->bi_next
;
1023 if (off
== bi
->bi_size
) {
1034 bio_chain_put(chain
);
1039 static void rbd_obj_request_get(struct rbd_obj_request
*obj_request
)
1041 kref_get(&obj_request
->kref
);
1044 static void rbd_obj_request_destroy(struct kref
*kref
);
1045 static void rbd_obj_request_put(struct rbd_obj_request
*obj_request
)
1047 rbd_assert(obj_request
!= NULL
);
1048 kref_put(&obj_request
->kref
, rbd_obj_request_destroy
);
1051 static void rbd_img_request_get(struct rbd_img_request
*img_request
)
1053 kref_get(&img_request
->kref
);
1056 static void rbd_img_request_destroy(struct kref
*kref
);
1057 static void rbd_img_request_put(struct rbd_img_request
*img_request
)
1059 rbd_assert(img_request
!= NULL
);
1060 kref_put(&img_request
->kref
, rbd_img_request_destroy
);
1063 static inline void rbd_img_obj_request_add(struct rbd_img_request
*img_request
,
1064 struct rbd_obj_request
*obj_request
)
1066 rbd_obj_request_get(obj_request
);
1067 obj_request
->img_request
= img_request
;
1068 list_add_tail(&obj_request
->links
, &img_request
->obj_requests
);
1069 obj_request
->which
= img_request
->obj_request_count
++;
1070 rbd_assert(obj_request
->which
!= BAD_WHICH
);
1073 static inline void rbd_img_obj_request_del(struct rbd_img_request
*img_request
,
1074 struct rbd_obj_request
*obj_request
)
1076 rbd_assert(obj_request
->which
!= BAD_WHICH
);
1077 obj_request
->which
= BAD_WHICH
;
1078 list_del(&obj_request
->links
);
1079 rbd_assert(obj_request
->img_request
== img_request
);
1080 obj_request
->callback
= NULL
;
1081 obj_request
->img_request
= NULL
;
1082 rbd_obj_request_put(obj_request
);
1085 static bool obj_request_type_valid(enum obj_request_type type
)
1088 case OBJ_REQUEST_NODATA
:
1089 case OBJ_REQUEST_BIO
:
1090 case OBJ_REQUEST_PAGES
:
1097 struct ceph_osd_req_op
*rbd_osd_req_op_create(u16 opcode
, ...)
1099 struct ceph_osd_req_op
*op
;
1103 op
= kzalloc(sizeof (*op
), GFP_NOIO
);
1107 va_start(args
, opcode
);
1109 case CEPH_OSD_OP_READ
:
1110 case CEPH_OSD_OP_WRITE
:
1111 /* rbd_osd_req_op_create(READ, offset, length) */
1112 /* rbd_osd_req_op_create(WRITE, offset, length) */
1113 op
->extent
.offset
= va_arg(args
, u64
);
1114 op
->extent
.length
= va_arg(args
, u64
);
1115 if (opcode
== CEPH_OSD_OP_WRITE
)
1116 op
->payload_len
= op
->extent
.length
;
1118 case CEPH_OSD_OP_CALL
:
1119 /* rbd_osd_req_op_create(CALL, class, method, data, datalen) */
1120 op
->cls
.class_name
= va_arg(args
, char *);
1121 size
= strlen(op
->cls
.class_name
);
1122 rbd_assert(size
<= (size_t) U8_MAX
);
1123 op
->cls
.class_len
= size
;
1124 op
->payload_len
= size
;
1126 op
->cls
.method_name
= va_arg(args
, char *);
1127 size
= strlen(op
->cls
.method_name
);
1128 rbd_assert(size
<= (size_t) U8_MAX
);
1129 op
->cls
.method_len
= size
;
1130 op
->payload_len
+= size
;
1133 op
->cls
.indata
= va_arg(args
, void *);
1134 size
= va_arg(args
, size_t);
1135 rbd_assert(size
<= (size_t) U32_MAX
);
1136 op
->cls
.indata_len
= (u32
) size
;
1137 op
->payload_len
+= size
;
1139 case CEPH_OSD_OP_NOTIFY_ACK
:
1140 case CEPH_OSD_OP_WATCH
:
1141 /* rbd_osd_req_op_create(NOTIFY_ACK, cookie, version) */
1142 /* rbd_osd_req_op_create(WATCH, cookie, version, flag) */
1143 op
->watch
.cookie
= va_arg(args
, u64
);
1144 op
->watch
.ver
= va_arg(args
, u64
);
1145 op
->watch
.ver
= cpu_to_le64(op
->watch
.ver
);
1146 if (opcode
== CEPH_OSD_OP_WATCH
&& va_arg(args
, int))
1147 op
->watch
.flag
= (u8
) 1;
1150 rbd_warn(NULL
, "unsupported opcode %hu\n", opcode
);
1160 static void rbd_osd_req_op_destroy(struct ceph_osd_req_op
*op
)
1166 * Send ceph osd request
1168 static int rbd_do_request(struct request
*rq
,
1169 struct rbd_device
*rbd_dev
,
1170 struct ceph_snap_context
*snapc
,
1172 const char *object_name
, u64 ofs
, u64 len
,
1174 struct page
**pages
,
1177 struct ceph_osd_req_op
*op
,
1178 void (*rbd_cb
)(struct ceph_osd_request
*,
1182 struct ceph_osd_client
*osdc
;
1183 struct ceph_osd_request
*osd_req
;
1184 struct timespec mtime
= CURRENT_TIME
;
1187 dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n",
1188 object_name
, (unsigned long long) ofs
,
1189 (unsigned long long) len
);
1191 osdc
= &rbd_dev
->rbd_client
->client
->osdc
;
1192 osd_req
= ceph_osdc_alloc_request(osdc
, snapc
, 1, false, GFP_NOIO
);
1196 osd_req
->r_flags
= flags
;
1197 osd_req
->r_pages
= pages
;
1199 osd_req
->r_bio
= bio
;
1200 bio_get(osd_req
->r_bio
);
1203 osd_req
->r_callback
= rbd_cb
;
1204 osd_req
->r_priv
= NULL
;
1206 strncpy(osd_req
->r_oid
, object_name
, sizeof(osd_req
->r_oid
));
1207 osd_req
->r_oid_len
= strlen(osd_req
->r_oid
);
1209 osd_req
->r_file_layout
= rbd_dev
->layout
; /* struct */
1210 osd_req
->r_num_pages
= calc_pages_for(ofs
, len
);
1211 osd_req
->r_page_alignment
= ofs
& ~PAGE_MASK
;
1213 ceph_osdc_build_request(osd_req
, ofs
, len
, 1, op
,
1214 snapc
, snapid
, &mtime
);
1216 if (op
->op
== CEPH_OSD_OP_WATCH
&& op
->watch
.flag
) {
1217 ceph_osdc_set_request_linger(osdc
, osd_req
);
1218 rbd_dev
->watch_request
= osd_req
;
1221 ret
= ceph_osdc_start_request(osdc
, osd_req
, false);
1228 ret
= ceph_osdc_wait_request(osdc
, osd_req
);
1229 version
= le64_to_cpu(osd_req
->r_reassert_version
.version
);
1232 dout("reassert_ver=%llu\n", (unsigned long long) version
);
1233 ceph_osdc_put_request(osd_req
);
1239 bio_chain_put(osd_req
->r_bio
);
1240 ceph_osdc_put_request(osd_req
);
1245 static void rbd_simple_req_cb(struct ceph_osd_request
*osd_req
,
1246 struct ceph_msg
*msg
)
1248 ceph_osdc_put_request(osd_req
);
1252 * Do a synchronous ceph osd operation
1254 static int rbd_req_sync_op(struct rbd_device
*rbd_dev
,
1256 struct ceph_osd_req_op
*op
,
1257 const char *object_name
,
1258 u64 ofs
, u64 inbound_size
,
1263 struct page
**pages
;
1266 rbd_assert(op
!= NULL
);
1268 num_pages
= calc_pages_for(ofs
, inbound_size
);
1269 pages
= ceph_alloc_page_vector(num_pages
, GFP_KERNEL
);
1271 return PTR_ERR(pages
);
1273 ret
= rbd_do_request(NULL
, rbd_dev
, NULL
, CEPH_NOSNAP
,
1274 object_name
, ofs
, inbound_size
, NULL
,
1283 if ((flags
& CEPH_OSD_FLAG_READ
) && inbound
)
1284 ret
= ceph_copy_from_page_vector(pages
, inbound
, ofs
, ret
);
1287 ceph_release_page_vector(pages
, num_pages
);
1291 static int rbd_obj_request_submit(struct ceph_osd_client
*osdc
,
1292 struct rbd_obj_request
*obj_request
)
1294 return ceph_osdc_start_request(osdc
, obj_request
->osd_req
, false);
1297 static void rbd_img_request_complete(struct rbd_img_request
*img_request
)
1299 if (img_request
->callback
)
1300 img_request
->callback(img_request
);
1302 rbd_img_request_put(img_request
);
1305 /* Caller is responsible for rbd_obj_request_destroy(obj_request) */
1307 static int rbd_obj_request_wait(struct rbd_obj_request
*obj_request
)
1309 return wait_for_completion_interruptible(&obj_request
->completion
);
1312 static void rbd_osd_trivial_callback(struct rbd_obj_request
*obj_request
,
1313 struct ceph_osd_op
*op
)
1315 atomic_set(&obj_request
->done
, 1);
1318 static void rbd_obj_request_complete(struct rbd_obj_request
*obj_request
)
1320 if (obj_request
->callback
)
1321 obj_request
->callback(obj_request
);
1323 complete_all(&obj_request
->completion
);
1327 * Request sync osd watch
1329 static int rbd_req_sync_notify_ack(struct rbd_device
*rbd_dev
,
1333 struct ceph_osd_req_op
*op
;
1336 op
= rbd_osd_req_op_create(CEPH_OSD_OP_NOTIFY_ACK
, notify_id
, ver
);
1340 ret
= rbd_do_request(NULL
, rbd_dev
, NULL
, CEPH_NOSNAP
,
1341 rbd_dev
->header_name
, 0, 0, NULL
,
1345 rbd_simple_req_cb
, NULL
);
1347 rbd_osd_req_op_destroy(op
);
1353 * Synchronous osd object method call
1355 static int rbd_req_sync_exec(struct rbd_device
*rbd_dev
,
1356 const char *object_name
,
1357 const char *class_name
,
1358 const char *method_name
,
1359 const char *outbound
,
1360 size_t outbound_size
,
1362 size_t inbound_size
,
1365 struct ceph_osd_req_op
*op
;
1369 * Any input parameters required by the method we're calling
1370 * will be sent along with the class and method names as
1371 * part of the message payload. That data and its size are
1372 * supplied via the indata and indata_len fields (named from
1373 * the perspective of the server side) in the OSD request
1376 op
= rbd_osd_req_op_create(CEPH_OSD_OP_CALL
, class_name
,
1377 method_name
, outbound
, outbound_size
);
1381 ret
= rbd_req_sync_op(rbd_dev
, CEPH_OSD_FLAG_READ
, op
,
1382 object_name
, 0, inbound_size
, inbound
,
1385 rbd_osd_req_op_destroy(op
);
1387 dout("cls_exec returned %d\n", ret
);
1391 static void rbd_osd_read_callback(struct rbd_obj_request
*obj_request
,
1392 struct ceph_osd_op
*op
)
1397 * We support a 64-bit length, but ultimately it has to be
1398 * passed to blk_end_request(), which takes an unsigned int.
1400 xferred
= le64_to_cpu(op
->extent
.length
);
1401 rbd_assert(xferred
< (u64
) UINT_MAX
);
1402 if (obj_request
->result
== (s32
) -ENOENT
) {
1403 zero_bio_chain(obj_request
->bio_list
, 0);
1404 obj_request
->result
= 0;
1405 } else if (xferred
< obj_request
->length
&& !obj_request
->result
) {
1406 zero_bio_chain(obj_request
->bio_list
, xferred
);
1407 xferred
= obj_request
->length
;
1409 obj_request
->xferred
= xferred
;
1410 atomic_set(&obj_request
->done
, 1);
1413 static void rbd_osd_write_callback(struct rbd_obj_request
*obj_request
,
1414 struct ceph_osd_op
*op
)
1416 obj_request
->xferred
= le64_to_cpu(op
->extent
.length
);
1417 atomic_set(&obj_request
->done
, 1);
1420 static void rbd_osd_req_callback(struct ceph_osd_request
*osd_req
,
1421 struct ceph_msg
*msg
)
1423 struct rbd_obj_request
*obj_request
= osd_req
->r_priv
;
1424 struct ceph_osd_reply_head
*reply_head
;
1425 struct ceph_osd_op
*op
;
1429 rbd_assert(osd_req
== obj_request
->osd_req
);
1430 rbd_assert(!!obj_request
->img_request
^
1431 (obj_request
->which
== BAD_WHICH
));
1433 obj_request
->xferred
= le32_to_cpu(msg
->hdr
.data_len
);
1434 reply_head
= msg
->front
.iov_base
;
1435 obj_request
->result
= (s32
) le32_to_cpu(reply_head
->result
);
1436 obj_request
->version
= le64_to_cpu(osd_req
->r_reassert_version
.version
);
1438 num_ops
= le32_to_cpu(reply_head
->num_ops
);
1439 WARN_ON(num_ops
!= 1); /* For now */
1441 op
= &reply_head
->ops
[0];
1442 opcode
= le16_to_cpu(op
->op
);
1444 case CEPH_OSD_OP_READ
:
1445 rbd_osd_read_callback(obj_request
, op
);
1447 case CEPH_OSD_OP_WRITE
:
1448 rbd_osd_write_callback(obj_request
, op
);
1450 case CEPH_OSD_OP_NOTIFY_ACK
:
1451 case CEPH_OSD_OP_WATCH
:
1452 rbd_osd_trivial_callback(obj_request
, op
);
1455 rbd_warn(NULL
, "%s: unsupported op %hu\n",
1456 obj_request
->object_name
, (unsigned short) opcode
);
1460 if (atomic_read(&obj_request
->done
))
1461 rbd_obj_request_complete(obj_request
);
1464 static struct ceph_osd_request
*rbd_osd_req_create(
1465 struct rbd_device
*rbd_dev
,
1467 struct rbd_obj_request
*obj_request
,
1468 struct ceph_osd_req_op
*op
)
1470 struct rbd_img_request
*img_request
= obj_request
->img_request
;
1471 struct ceph_snap_context
*snapc
= NULL
;
1472 struct ceph_osd_client
*osdc
;
1473 struct ceph_osd_request
*osd_req
;
1474 struct timespec now
;
1475 struct timespec
*mtime
;
1476 u64 snap_id
= CEPH_NOSNAP
;
1477 u64 offset
= obj_request
->offset
;
1478 u64 length
= obj_request
->length
;
1481 rbd_assert(img_request
->write_request
== write_request
);
1482 if (img_request
->write_request
)
1483 snapc
= img_request
->snapc
;
1485 snap_id
= img_request
->snap_id
;
1488 /* Allocate and initialize the request, for the single op */
1490 osdc
= &rbd_dev
->rbd_client
->client
->osdc
;
1491 osd_req
= ceph_osdc_alloc_request(osdc
, snapc
, 1, false, GFP_ATOMIC
);
1493 return NULL
; /* ENOMEM */
1495 rbd_assert(obj_request_type_valid(obj_request
->type
));
1496 switch (obj_request
->type
) {
1497 case OBJ_REQUEST_NODATA
:
1498 break; /* Nothing to do */
1499 case OBJ_REQUEST_BIO
:
1500 rbd_assert(obj_request
->bio_list
!= NULL
);
1501 osd_req
->r_bio
= obj_request
->bio_list
;
1502 bio_get(osd_req
->r_bio
);
1503 /* osd client requires "num pages" even for bio */
1504 osd_req
->r_num_pages
= calc_pages_for(offset
, length
);
1506 case OBJ_REQUEST_PAGES
:
1507 osd_req
->r_pages
= obj_request
->pages
;
1508 osd_req
->r_num_pages
= obj_request
->page_count
;
1509 osd_req
->r_page_alignment
= offset
& ~PAGE_MASK
;
1513 if (write_request
) {
1514 osd_req
->r_flags
= CEPH_OSD_FLAG_WRITE
| CEPH_OSD_FLAG_ONDISK
;
1518 osd_req
->r_flags
= CEPH_OSD_FLAG_READ
;
1519 mtime
= NULL
; /* not needed for reads */
1520 offset
= 0; /* These are not used... */
1521 length
= 0; /* ...for osd read requests */
1524 osd_req
->r_callback
= rbd_osd_req_callback
;
1525 osd_req
->r_priv
= obj_request
;
1527 osd_req
->r_oid_len
= strlen(obj_request
->object_name
);
1528 rbd_assert(osd_req
->r_oid_len
< sizeof (osd_req
->r_oid
));
1529 memcpy(osd_req
->r_oid
, obj_request
->object_name
, osd_req
->r_oid_len
);
1531 osd_req
->r_file_layout
= rbd_dev
->layout
; /* struct */
1533 /* osd_req will get its own reference to snapc (if non-null) */
1535 ceph_osdc_build_request(osd_req
, offset
, length
, 1, op
,
1536 snapc
, snap_id
, mtime
);
1541 static void rbd_osd_req_destroy(struct ceph_osd_request
*osd_req
)
1543 ceph_osdc_put_request(osd_req
);
1546 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1548 static struct rbd_obj_request
*rbd_obj_request_create(const char *object_name
,
1549 u64 offset
, u64 length
,
1550 enum obj_request_type type
)
1552 struct rbd_obj_request
*obj_request
;
1556 rbd_assert(obj_request_type_valid(type
));
1558 size
= strlen(object_name
) + 1;
1559 obj_request
= kzalloc(sizeof (*obj_request
) + size
, GFP_KERNEL
);
1563 name
= (char *)(obj_request
+ 1);
1564 obj_request
->object_name
= memcpy(name
, object_name
, size
);
1565 obj_request
->offset
= offset
;
1566 obj_request
->length
= length
;
1567 obj_request
->which
= BAD_WHICH
;
1568 obj_request
->type
= type
;
1569 INIT_LIST_HEAD(&obj_request
->links
);
1570 atomic_set(&obj_request
->done
, 0);
1571 init_completion(&obj_request
->completion
);
1572 kref_init(&obj_request
->kref
);
1577 static void rbd_obj_request_destroy(struct kref
*kref
)
1579 struct rbd_obj_request
*obj_request
;
1581 obj_request
= container_of(kref
, struct rbd_obj_request
, kref
);
1583 rbd_assert(obj_request
->img_request
== NULL
);
1584 rbd_assert(obj_request
->which
== BAD_WHICH
);
1586 if (obj_request
->osd_req
)
1587 rbd_osd_req_destroy(obj_request
->osd_req
);
1589 rbd_assert(obj_request_type_valid(obj_request
->type
));
1590 switch (obj_request
->type
) {
1591 case OBJ_REQUEST_NODATA
:
1592 break; /* Nothing to do */
1593 case OBJ_REQUEST_BIO
:
1594 if (obj_request
->bio_list
)
1595 bio_chain_put(obj_request
->bio_list
);
1597 case OBJ_REQUEST_PAGES
:
1598 if (obj_request
->pages
)
1599 ceph_release_page_vector(obj_request
->pages
,
1600 obj_request
->page_count
);
1608 * Caller is responsible for filling in the list of object requests
1609 * that comprises the image request, and the Linux request pointer
1610 * (if there is one).
1612 struct rbd_img_request
*rbd_img_request_create(struct rbd_device
*rbd_dev
,
1613 u64 offset
, u64 length
,
1616 struct rbd_img_request
*img_request
;
1617 struct ceph_snap_context
*snapc
= NULL
;
1619 img_request
= kmalloc(sizeof (*img_request
), GFP_ATOMIC
);
1623 if (write_request
) {
1624 down_read(&rbd_dev
->header_rwsem
);
1625 snapc
= ceph_get_snap_context(rbd_dev
->header
.snapc
);
1626 up_read(&rbd_dev
->header_rwsem
);
1627 if (WARN_ON(!snapc
)) {
1629 return NULL
; /* Shouldn't happen */
1633 img_request
->rq
= NULL
;
1634 img_request
->rbd_dev
= rbd_dev
;
1635 img_request
->offset
= offset
;
1636 img_request
->length
= length
;
1637 img_request
->write_request
= write_request
;
1639 img_request
->snapc
= snapc
;
1641 img_request
->snap_id
= rbd_dev
->spec
->snap_id
;
1642 spin_lock_init(&img_request
->completion_lock
);
1643 img_request
->next_completion
= 0;
1644 img_request
->callback
= NULL
;
1645 img_request
->obj_request_count
= 0;
1646 INIT_LIST_HEAD(&img_request
->obj_requests
);
1647 kref_init(&img_request
->kref
);
1649 rbd_img_request_get(img_request
); /* Avoid a warning */
1650 rbd_img_request_put(img_request
); /* TEMPORARY */
1655 static void rbd_img_request_destroy(struct kref
*kref
)
1657 struct rbd_img_request
*img_request
;
1658 struct rbd_obj_request
*obj_request
;
1659 struct rbd_obj_request
*next_obj_request
;
1661 img_request
= container_of(kref
, struct rbd_img_request
, kref
);
1663 for_each_obj_request_safe(img_request
, obj_request
, next_obj_request
)
1664 rbd_img_obj_request_del(img_request
, obj_request
);
1666 if (img_request
->write_request
)
1667 ceph_put_snap_context(img_request
->snapc
);
1672 static int rbd_img_request_fill_bio(struct rbd_img_request
*img_request
,
1673 struct bio
*bio_list
)
1675 struct rbd_device
*rbd_dev
= img_request
->rbd_dev
;
1676 struct rbd_obj_request
*obj_request
= NULL
;
1677 struct rbd_obj_request
*next_obj_request
;
1678 unsigned int bio_offset
;
1683 opcode
= img_request
->write_request
? CEPH_OSD_OP_WRITE
1686 image_offset
= img_request
->offset
;
1687 rbd_assert(image_offset
== bio_list
->bi_sector
<< SECTOR_SHIFT
);
1688 resid
= img_request
->length
;
1690 const char *object_name
;
1691 unsigned int clone_size
;
1692 struct ceph_osd_req_op
*op
;
1696 object_name
= rbd_segment_name(rbd_dev
, image_offset
);
1699 offset
= rbd_segment_offset(rbd_dev
, image_offset
);
1700 length
= rbd_segment_length(rbd_dev
, image_offset
, resid
);
1701 obj_request
= rbd_obj_request_create(object_name
,
1704 kfree(object_name
); /* object request has its own copy */
1708 rbd_assert(length
<= (u64
) UINT_MAX
);
1709 clone_size
= (unsigned int) length
;
1710 obj_request
->bio_list
= bio_chain_clone_range(&bio_list
,
1711 &bio_offset
, clone_size
,
1713 if (!obj_request
->bio_list
)
1717 * Build up the op to use in building the osd
1718 * request. Note that the contents of the op are
1719 * copied by rbd_osd_req_create().
1721 op
= rbd_osd_req_op_create(opcode
, offset
, length
);
1724 obj_request
->osd_req
= rbd_osd_req_create(rbd_dev
,
1725 img_request
->write_request
,
1727 rbd_osd_req_op_destroy(op
);
1728 if (!obj_request
->osd_req
)
1730 /* status and version are initially zero-filled */
1732 rbd_img_obj_request_add(img_request
, obj_request
);
1734 image_offset
+= length
;
1741 rbd_obj_request_put(obj_request
);
1743 for_each_obj_request_safe(img_request
, obj_request
, next_obj_request
)
1744 rbd_obj_request_put(obj_request
);
1749 static void rbd_img_obj_callback(struct rbd_obj_request
*obj_request
)
1751 struct rbd_img_request
*img_request
;
1752 u32 which
= obj_request
->which
;
1755 img_request
= obj_request
->img_request
;
1756 rbd_assert(img_request
!= NULL
);
1757 rbd_assert(img_request
->rq
!= NULL
);
1758 rbd_assert(which
!= BAD_WHICH
);
1759 rbd_assert(which
< img_request
->obj_request_count
);
1760 rbd_assert(which
>= img_request
->next_completion
);
1762 spin_lock_irq(&img_request
->completion_lock
);
1763 if (which
!= img_request
->next_completion
)
1766 for_each_obj_request_from(img_request
, obj_request
) {
1767 unsigned int xferred
;
1771 rbd_assert(which
< img_request
->obj_request_count
);
1773 if (!atomic_read(&obj_request
->done
))
1776 rbd_assert(obj_request
->xferred
<= (u64
) UINT_MAX
);
1777 xferred
= (unsigned int) obj_request
->xferred
;
1778 result
= (int) obj_request
->result
;
1780 rbd_warn(NULL
, "obj_request %s result %d xferred %u\n",
1781 img_request
->write_request
? "write" : "read",
1784 more
= blk_end_request(img_request
->rq
, result
, xferred
);
1787 rbd_assert(more
^ (which
== img_request
->obj_request_count
));
1788 img_request
->next_completion
= which
;
1790 spin_unlock_irq(&img_request
->completion_lock
);
1793 rbd_img_request_complete(img_request
);
1796 static int rbd_img_request_submit(struct rbd_img_request
*img_request
)
1798 struct rbd_device
*rbd_dev
= img_request
->rbd_dev
;
1799 struct ceph_osd_client
*osdc
= &rbd_dev
->rbd_client
->client
->osdc
;
1800 struct rbd_obj_request
*obj_request
;
1802 for_each_obj_request(img_request
, obj_request
) {
1805 obj_request
->callback
= rbd_img_obj_callback
;
1806 ret
= rbd_obj_request_submit(osdc
, obj_request
);
1810 * The image request has its own reference to each
1811 * of its object requests, so we can safely drop the
1814 rbd_obj_request_put(obj_request
);
1820 static int rbd_obj_notify_ack_sync(struct rbd_device
*rbd_dev
,
1821 u64 ver
, u64 notify_id
)
1823 struct rbd_obj_request
*obj_request
;
1824 struct ceph_osd_req_op
*op
;
1825 struct ceph_osd_client
*osdc
;
1828 obj_request
= rbd_obj_request_create(rbd_dev
->header_name
, 0, 0,
1829 OBJ_REQUEST_NODATA
);
1834 op
= rbd_osd_req_op_create(CEPH_OSD_OP_NOTIFY_ACK
, notify_id
, ver
);
1837 obj_request
->osd_req
= rbd_osd_req_create(rbd_dev
, false,
1839 rbd_osd_req_op_destroy(op
);
1840 if (!obj_request
->osd_req
)
1843 osdc
= &rbd_dev
->rbd_client
->client
->osdc
;
1844 ret
= rbd_obj_request_submit(osdc
, obj_request
);
1846 ret
= rbd_obj_request_wait(obj_request
);
1848 rbd_obj_request_put(obj_request
);
1853 static void rbd_watch_cb(u64 ver
, u64 notify_id
, u8 opcode
, void *data
)
1855 struct rbd_device
*rbd_dev
= (struct rbd_device
*)data
;
1862 dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1863 rbd_dev
->header_name
, (unsigned long long) notify_id
,
1864 (unsigned int) opcode
);
1865 rc
= rbd_dev_refresh(rbd_dev
, &hver
);
1867 rbd_warn(rbd_dev
, "got notification but failed to "
1868 " update snaps: %d\n", rc
);
1870 (void) rbd_req_sync_notify_ack
; /* avoid a warning */
1871 rbd_obj_notify_ack_sync(rbd_dev
, hver
, notify_id
);
1875 * Request sync osd watch/unwatch. The value of "start" determines
1876 * whether a watch request is being initiated or torn down.
1878 static int rbd_dev_header_watch_sync(struct rbd_device
*rbd_dev
, int start
)
1880 struct ceph_osd_client
*osdc
= &rbd_dev
->rbd_client
->client
->osdc
;
1881 struct rbd_obj_request
*obj_request
;
1882 struct ceph_osd_req_op
*op
;
1885 rbd_assert(start
^ !!rbd_dev
->watch_event
);
1886 rbd_assert(start
^ !!rbd_dev
->watch_request
);
1889 ret
= ceph_osdc_create_event(osdc
, rbd_watch_cb
, 0, rbd_dev
,
1890 &rbd_dev
->watch_event
);
1896 obj_request
= rbd_obj_request_create(rbd_dev
->header_name
, 0, 0,
1897 OBJ_REQUEST_NODATA
);
1901 op
= rbd_osd_req_op_create(CEPH_OSD_OP_WATCH
,
1902 rbd_dev
->watch_event
->cookie
,
1903 rbd_dev
->header
.obj_version
, start
);
1906 obj_request
->osd_req
= rbd_osd_req_create(rbd_dev
, true,
1908 rbd_osd_req_op_destroy(op
);
1909 if (!obj_request
->osd_req
)
1913 rbd_dev
->watch_request
= obj_request
->osd_req
;
1914 ceph_osdc_set_request_linger(osdc
, rbd_dev
->watch_request
);
1916 ret
= rbd_obj_request_submit(osdc
, obj_request
);
1919 ret
= rbd_obj_request_wait(obj_request
);
1923 ret
= obj_request
->result
;
1928 goto done
; /* Done if setting up the watch request */
1930 /* Cancel the event if we're tearing down, or on error */
1931 ceph_osdc_cancel_event(rbd_dev
->watch_event
);
1932 rbd_dev
->watch_event
= NULL
;
1935 rbd_obj_request_put(obj_request
);
1940 static void rbd_request_fn(struct request_queue
*q
)
1942 struct rbd_device
*rbd_dev
= q
->queuedata
;
1943 bool read_only
= rbd_dev
->mapping
.read_only
;
1947 while ((rq
= blk_fetch_request(q
))) {
1948 bool write_request
= rq_data_dir(rq
) == WRITE
;
1949 struct rbd_img_request
*img_request
;
1953 /* Ignore any non-FS requests that filter through. */
1955 if (rq
->cmd_type
!= REQ_TYPE_FS
) {
1956 __blk_end_request_all(rq
, 0);
1960 spin_unlock_irq(q
->queue_lock
);
1962 /* Disallow writes to a read-only device */
1964 if (write_request
) {
1968 rbd_assert(rbd_dev
->spec
->snap_id
== CEPH_NOSNAP
);
1971 /* Quit early if the snapshot has disappeared */
1973 if (!atomic_read(&rbd_dev
->exists
)) {
1974 dout("request for non-existent snapshot");
1975 rbd_assert(rbd_dev
->spec
->snap_id
!= CEPH_NOSNAP
);
1980 offset
= (u64
) blk_rq_pos(rq
) << SECTOR_SHIFT
;
1981 length
= (u64
) blk_rq_bytes(rq
);
1984 if (WARN_ON(offset
&& length
> U64_MAX
- offset
+ 1))
1985 goto end_request
; /* Shouldn't happen */
1988 img_request
= rbd_img_request_create(rbd_dev
, offset
, length
,
1993 img_request
->rq
= rq
;
1995 result
= rbd_img_request_fill_bio(img_request
, rq
->bio
);
1997 result
= rbd_img_request_submit(img_request
);
1999 rbd_img_request_put(img_request
);
2001 spin_lock_irq(q
->queue_lock
);
2003 rbd_warn(rbd_dev
, "obj_request %s result %d\n",
2004 write_request
? "write" : "read", result
);
2005 __blk_end_request_all(rq
, result
);
2011 * a queue callback. Makes sure that we don't create a bio that spans across
2012 * multiple osd objects. One exception would be with a single page bios,
2013 * which we handle later at bio_chain_clone_range()
2015 static int rbd_merge_bvec(struct request_queue
*q
, struct bvec_merge_data
*bmd
,
2016 struct bio_vec
*bvec
)
2018 struct rbd_device
*rbd_dev
= q
->queuedata
;
2019 sector_t sector_offset
;
2020 sector_t sectors_per_obj
;
2021 sector_t obj_sector_offset
;
2025 * Find how far into its rbd object the partition-relative
2026 * bio start sector is to offset relative to the enclosing
2029 sector_offset
= get_start_sect(bmd
->bi_bdev
) + bmd
->bi_sector
;
2030 sectors_per_obj
= 1 << (rbd_dev
->header
.obj_order
- SECTOR_SHIFT
);
2031 obj_sector_offset
= sector_offset
& (sectors_per_obj
- 1);
2034 * Compute the number of bytes from that offset to the end
2035 * of the object. Account for what's already used by the bio.
2037 ret
= (int) (sectors_per_obj
- obj_sector_offset
) << SECTOR_SHIFT
;
2038 if (ret
> bmd
->bi_size
)
2039 ret
-= bmd
->bi_size
;
2044 * Don't send back more than was asked for. And if the bio
2045 * was empty, let the whole thing through because: "Note
2046 * that a block device *must* allow a single page to be
2047 * added to an empty bio."
2049 rbd_assert(bvec
->bv_len
<= PAGE_SIZE
);
2050 if (ret
> (int) bvec
->bv_len
|| !bmd
->bi_size
)
2051 ret
= (int) bvec
->bv_len
;
2056 static void rbd_free_disk(struct rbd_device
*rbd_dev
)
2058 struct gendisk
*disk
= rbd_dev
->disk
;
2063 if (disk
->flags
& GENHD_FL_UP
)
2066 blk_cleanup_queue(disk
->queue
);
2070 static int rbd_obj_read_sync(struct rbd_device
*rbd_dev
,
2071 const char *object_name
,
2072 u64 offset
, u64 length
,
2073 char *buf
, u64
*version
)
2076 struct ceph_osd_req_op
*op
;
2077 struct rbd_obj_request
*obj_request
;
2078 struct ceph_osd_client
*osdc
;
2079 struct page
**pages
= NULL
;
2083 page_count
= (u32
) calc_pages_for(offset
, length
);
2084 pages
= ceph_alloc_page_vector(page_count
, GFP_KERNEL
);
2086 ret
= PTR_ERR(pages
);
2089 obj_request
= rbd_obj_request_create(object_name
, offset
, length
,
2094 obj_request
->pages
= pages
;
2095 obj_request
->page_count
= page_count
;
2097 op
= rbd_osd_req_op_create(CEPH_OSD_OP_READ
, offset
, length
);
2100 obj_request
->osd_req
= rbd_osd_req_create(rbd_dev
, false,
2102 rbd_osd_req_op_destroy(op
);
2103 if (!obj_request
->osd_req
)
2106 osdc
= &rbd_dev
->rbd_client
->client
->osdc
;
2107 ret
= rbd_obj_request_submit(osdc
, obj_request
);
2110 ret
= rbd_obj_request_wait(obj_request
);
2114 ret
= obj_request
->result
;
2117 ret
= ceph_copy_from_page_vector(pages
, buf
, 0, obj_request
->xferred
);
2119 *version
= obj_request
->version
;
2122 rbd_obj_request_put(obj_request
);
2124 ceph_release_page_vector(pages
, page_count
);
2130 * Read the complete header for the given rbd device.
2132 * Returns a pointer to a dynamically-allocated buffer containing
2133 * the complete and validated header. Caller can pass the address
2134 * of a variable that will be filled in with the version of the
2135 * header object at the time it was read.
2137 * Returns a pointer-coded errno if a failure occurs.
2139 static struct rbd_image_header_ondisk
*
2140 rbd_dev_v1_header_read(struct rbd_device
*rbd_dev
, u64
*version
)
2142 struct rbd_image_header_ondisk
*ondisk
= NULL
;
2149 * The complete header will include an array of its 64-bit
2150 * snapshot ids, followed by the names of those snapshots as
2151 * a contiguous block of NUL-terminated strings. Note that
2152 * the number of snapshots could change by the time we read
2153 * it in, in which case we re-read it.
2160 size
= sizeof (*ondisk
);
2161 size
+= snap_count
* sizeof (struct rbd_image_snap_ondisk
);
2163 ondisk
= kmalloc(size
, GFP_KERNEL
);
2165 return ERR_PTR(-ENOMEM
);
2167 ret
= rbd_obj_read_sync(rbd_dev
, rbd_dev
->header_name
,
2169 (char *) ondisk
, version
);
2173 if (WARN_ON((size_t) ret
< size
)) {
2175 rbd_warn(rbd_dev
, "short header read (want %zd got %d)",
2179 if (!rbd_dev_ondisk_valid(ondisk
)) {
2181 rbd_warn(rbd_dev
, "invalid header");
2185 names_size
= le64_to_cpu(ondisk
->snap_names_len
);
2186 want_count
= snap_count
;
2187 snap_count
= le32_to_cpu(ondisk
->snap_count
);
2188 } while (snap_count
!= want_count
);
2195 return ERR_PTR(ret
);
2199 * reload the ondisk the header
2201 static int rbd_read_header(struct rbd_device
*rbd_dev
,
2202 struct rbd_image_header
*header
)
2204 struct rbd_image_header_ondisk
*ondisk
;
2208 ondisk
= rbd_dev_v1_header_read(rbd_dev
, &ver
);
2210 return PTR_ERR(ondisk
);
2211 ret
= rbd_header_from_disk(header
, ondisk
);
2213 header
->obj_version
= ver
;
2219 static void rbd_remove_all_snaps(struct rbd_device
*rbd_dev
)
2221 struct rbd_snap
*snap
;
2222 struct rbd_snap
*next
;
2224 list_for_each_entry_safe(snap
, next
, &rbd_dev
->snaps
, node
)
2225 rbd_remove_snap_dev(snap
);
2228 static void rbd_update_mapping_size(struct rbd_device
*rbd_dev
)
2232 if (rbd_dev
->spec
->snap_id
!= CEPH_NOSNAP
)
2235 size
= (sector_t
) rbd_dev
->header
.image_size
/ SECTOR_SIZE
;
2236 dout("setting size to %llu sectors", (unsigned long long) size
);
2237 rbd_dev
->mapping
.size
= (u64
) size
;
2238 set_capacity(rbd_dev
->disk
, size
);
2242 * only read the first part of the ondisk header, without the snaps info
2244 static int rbd_dev_v1_refresh(struct rbd_device
*rbd_dev
, u64
*hver
)
2247 struct rbd_image_header h
;
2249 ret
= rbd_read_header(rbd_dev
, &h
);
2253 down_write(&rbd_dev
->header_rwsem
);
2255 /* Update image size, and check for resize of mapped image */
2256 rbd_dev
->header
.image_size
= h
.image_size
;
2257 rbd_update_mapping_size(rbd_dev
);
2259 /* rbd_dev->header.object_prefix shouldn't change */
2260 kfree(rbd_dev
->header
.snap_sizes
);
2261 kfree(rbd_dev
->header
.snap_names
);
2262 /* osd requests may still refer to snapc */
2263 ceph_put_snap_context(rbd_dev
->header
.snapc
);
2266 *hver
= h
.obj_version
;
2267 rbd_dev
->header
.obj_version
= h
.obj_version
;
2268 rbd_dev
->header
.image_size
= h
.image_size
;
2269 rbd_dev
->header
.snapc
= h
.snapc
;
2270 rbd_dev
->header
.snap_names
= h
.snap_names
;
2271 rbd_dev
->header
.snap_sizes
= h
.snap_sizes
;
2272 /* Free the extra copy of the object prefix */
2273 WARN_ON(strcmp(rbd_dev
->header
.object_prefix
, h
.object_prefix
));
2274 kfree(h
.object_prefix
);
2276 ret
= rbd_dev_snaps_update(rbd_dev
);
2278 ret
= rbd_dev_snaps_register(rbd_dev
);
2280 up_write(&rbd_dev
->header_rwsem
);
2285 static int rbd_dev_refresh(struct rbd_device
*rbd_dev
, u64
*hver
)
2289 rbd_assert(rbd_image_format_valid(rbd_dev
->image_format
));
2290 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
2291 if (rbd_dev
->image_format
== 1)
2292 ret
= rbd_dev_v1_refresh(rbd_dev
, hver
);
2294 ret
= rbd_dev_v2_refresh(rbd_dev
, hver
);
2295 mutex_unlock(&ctl_mutex
);
2300 static int rbd_init_disk(struct rbd_device
*rbd_dev
)
2302 struct gendisk
*disk
;
2303 struct request_queue
*q
;
2306 /* create gendisk info */
2307 disk
= alloc_disk(RBD_MINORS_PER_MAJOR
);
2311 snprintf(disk
->disk_name
, sizeof(disk
->disk_name
), RBD_DRV_NAME
"%d",
2313 disk
->major
= rbd_dev
->major
;
2314 disk
->first_minor
= 0;
2315 disk
->fops
= &rbd_bd_ops
;
2316 disk
->private_data
= rbd_dev
;
2318 q
= blk_init_queue(rbd_request_fn
, &rbd_dev
->lock
);
2322 /* We use the default size, but let's be explicit about it. */
2323 blk_queue_physical_block_size(q
, SECTOR_SIZE
);
2325 /* set io sizes to object size */
2326 segment_size
= rbd_obj_bytes(&rbd_dev
->header
);
2327 blk_queue_max_hw_sectors(q
, segment_size
/ SECTOR_SIZE
);
2328 blk_queue_max_segment_size(q
, segment_size
);
2329 blk_queue_io_min(q
, segment_size
);
2330 blk_queue_io_opt(q
, segment_size
);
2332 blk_queue_merge_bvec(q
, rbd_merge_bvec
);
2335 q
->queuedata
= rbd_dev
;
2337 rbd_dev
->disk
= disk
;
2339 set_capacity(rbd_dev
->disk
, rbd_dev
->mapping
.size
/ SECTOR_SIZE
);
2352 static struct rbd_device
*dev_to_rbd_dev(struct device
*dev
)
2354 return container_of(dev
, struct rbd_device
, dev
);
2357 static ssize_t
rbd_size_show(struct device
*dev
,
2358 struct device_attribute
*attr
, char *buf
)
2360 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
2363 down_read(&rbd_dev
->header_rwsem
);
2364 size
= get_capacity(rbd_dev
->disk
);
2365 up_read(&rbd_dev
->header_rwsem
);
2367 return sprintf(buf
, "%llu\n", (unsigned long long) size
* SECTOR_SIZE
);
/*
 * Note this shows the features for whatever's mapped, which is not
 * necessarily the base image.
 */
static ssize_t rbd_features_show(struct device *dev,
				 struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "0x%016llx\n",
			(unsigned long long) rbd_dev->mapping.features);
}

static ssize_t rbd_major_show(struct device *dev,
			      struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%d\n", rbd_dev->major);
}

static ssize_t rbd_client_id_show(struct device *dev,
				  struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "client%lld\n",
			ceph_client_id(rbd_dev->rbd_client->client));
}

static ssize_t rbd_pool_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
}

static ssize_t rbd_pool_id_show(struct device *dev,
				struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%llu\n",
			(unsigned long long) rbd_dev->spec->pool_id);
}

static ssize_t rbd_name_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	if (rbd_dev->spec->image_name)
		return sprintf(buf, "%s\n", rbd_dev->spec->image_name);

	return sprintf(buf, "(unknown)\n");
}

static ssize_t rbd_image_id_show(struct device *dev,
				 struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
}
/*
 * Shows the name of the currently-mapped snapshot (or
 * RBD_SNAP_HEAD_NAME for the base image).
 */
static ssize_t rbd_snap_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
}
/*
 * For an rbd v2 image, shows the pool id, image id, and snapshot id
 * for the parent image.  If there is no parent, simply shows
 * "(no parent image)".
 */
2454 static ssize_t
rbd_parent_show(struct device
*dev
,
2455 struct device_attribute
*attr
,
2458 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
2459 struct rbd_spec
*spec
= rbd_dev
->parent_spec
;
2464 return sprintf(buf
, "(no parent image)\n");
2466 count
= sprintf(bufp
, "pool_id %llu\npool_name %s\n",
2467 (unsigned long long) spec
->pool_id
, spec
->pool_name
);
2472 count
= sprintf(bufp
, "image_id %s\nimage_name %s\n", spec
->image_id
,
2473 spec
->image_name
? spec
->image_name
: "(unknown)");
2478 count
= sprintf(bufp
, "snap_id %llu\nsnap_name %s\n",
2479 (unsigned long long) spec
->snap_id
, spec
->snap_name
);
2484 count
= sprintf(bufp
, "overlap %llu\n", rbd_dev
->parent_overlap
);
2489 return (ssize_t
) (bufp
- buf
);
2492 static ssize_t
rbd_image_refresh(struct device
*dev
,
2493 struct device_attribute
*attr
,
2497 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
2500 ret
= rbd_dev_refresh(rbd_dev
, NULL
);
2502 return ret
< 0 ? ret
: size
;
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_features.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_pool_id.attr,
	&dev_attr_name.attr,
	&dev_attr_image_id.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_parent.attr,
	&dev_attr_refresh.attr,
	NULL
};

static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};

static void rbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_sysfs_dev_release,
};
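/*
 * Illustrative note (not from the original source): once a mapped
 * device is registered with rbd_device_type above, each DEVICE_ATTR
 * appears as a file under sysfs, e.g. for the device with id 0:
 *
 *	cat /sys/bus/rbd/devices/0/size
 *	cat /sys/bus/rbd/devices/0/current_snap
 *	echo 1 > /sys/bus/rbd/devices/0/refresh
 *
 * The exact paths assume the standard sysfs layout for bus devices;
 * see Documentation/ABI/testing/sysfs-bus-rbd for the reference list.
 */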
2556 static ssize_t
rbd_snap_size_show(struct device
*dev
,
2557 struct device_attribute
*attr
,
2560 struct rbd_snap
*snap
= container_of(dev
, struct rbd_snap
, dev
);
2562 return sprintf(buf
, "%llu\n", (unsigned long long)snap
->size
);
2565 static ssize_t
rbd_snap_id_show(struct device
*dev
,
2566 struct device_attribute
*attr
,
2569 struct rbd_snap
*snap
= container_of(dev
, struct rbd_snap
, dev
);
2571 return sprintf(buf
, "%llu\n", (unsigned long long)snap
->id
);
2574 static ssize_t
rbd_snap_features_show(struct device
*dev
,
2575 struct device_attribute
*attr
,
2578 struct rbd_snap
*snap
= container_of(dev
, struct rbd_snap
, dev
);
2580 return sprintf(buf
, "0x%016llx\n",
2581 (unsigned long long) snap
->features
);
2584 static DEVICE_ATTR(snap_size
, S_IRUGO
, rbd_snap_size_show
, NULL
);
2585 static DEVICE_ATTR(snap_id
, S_IRUGO
, rbd_snap_id_show
, NULL
);
2586 static DEVICE_ATTR(snap_features
, S_IRUGO
, rbd_snap_features_show
, NULL
);
2588 static struct attribute
*rbd_snap_attrs
[] = {
2589 &dev_attr_snap_size
.attr
,
2590 &dev_attr_snap_id
.attr
,
2591 &dev_attr_snap_features
.attr
,
2595 static struct attribute_group rbd_snap_attr_group
= {
2596 .attrs
= rbd_snap_attrs
,
2599 static void rbd_snap_dev_release(struct device
*dev
)
2601 struct rbd_snap
*snap
= container_of(dev
, struct rbd_snap
, dev
);
2606 static const struct attribute_group
*rbd_snap_attr_groups
[] = {
2607 &rbd_snap_attr_group
,
2611 static struct device_type rbd_snap_device_type
= {
2612 .groups
= rbd_snap_attr_groups
,
2613 .release
= rbd_snap_dev_release
,
static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
{
	kref_get(&spec->kref);

	return spec;
}

static void rbd_spec_free(struct kref *kref);
static void rbd_spec_put(struct rbd_spec *spec)
{
	if (spec)
		kref_put(&spec->kref, rbd_spec_free);
}

static struct rbd_spec *rbd_spec_alloc(void)
{
	struct rbd_spec *spec;

	spec = kzalloc(sizeof (*spec), GFP_KERNEL);
	if (!spec)
		return NULL;
	kref_init(&spec->kref);

	rbd_spec_put(rbd_spec_get(spec));	/* TEMPORARY */

	return spec;
}

static void rbd_spec_free(struct kref *kref)
{
	struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);

	kfree(spec->pool_name);
	kfree(spec->image_id);
	kfree(spec->image_name);
	kfree(spec->snap_name);
	kfree(spec);
}
struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
				  struct rbd_spec *spec)
{
	struct rbd_device *rbd_dev;

	rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
	if (!rbd_dev)
		return NULL;

	spin_lock_init(&rbd_dev->lock);
	atomic_set(&rbd_dev->exists, 0);
	INIT_LIST_HEAD(&rbd_dev->node);
	INIT_LIST_HEAD(&rbd_dev->snaps);
	init_rwsem(&rbd_dev->header_rwsem);

	rbd_dev->spec = spec;
	rbd_dev->rbd_client = rbdc;

	/* Initialize the layout used for all rbd requests */

	rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
	rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);

	return rbd_dev;
}

static void rbd_dev_destroy(struct rbd_device *rbd_dev)
{
	rbd_spec_put(rbd_dev->parent_spec);
	kfree(rbd_dev->header_name);
	rbd_put_client(rbd_dev->rbd_client);
	rbd_spec_put(rbd_dev->spec);
	kfree(rbd_dev);
}
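/*
 * Aside (not from the original source): with fl_stripe_count set to 1
 * and fl_stripe_unit equal to fl_object_size, striping is effectively
 * disabled.  Each RADOS object then holds one contiguous, object-size
 * chunk of the image, so image byte offset B lives in object number
 * B >> obj_order.
 */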
2692 static bool rbd_snap_registered(struct rbd_snap
*snap
)
2694 bool ret
= snap
->dev
.type
== &rbd_snap_device_type
;
2695 bool reg
= device_is_registered(&snap
->dev
);
2697 rbd_assert(!ret
^ reg
);
2702 static void rbd_remove_snap_dev(struct rbd_snap
*snap
)
2704 list_del(&snap
->node
);
2705 if (device_is_registered(&snap
->dev
))
2706 device_unregister(&snap
->dev
);
2709 static int rbd_register_snap_dev(struct rbd_snap
*snap
,
2710 struct device
*parent
)
2712 struct device
*dev
= &snap
->dev
;
2715 dev
->type
= &rbd_snap_device_type
;
2716 dev
->parent
= parent
;
2717 dev
->release
= rbd_snap_dev_release
;
2718 dev_set_name(dev
, "%s%s", RBD_SNAP_DEV_NAME_PREFIX
, snap
->name
);
2719 dout("%s: registering device for snapshot %s\n", __func__
, snap
->name
);
2721 ret
= device_register(dev
);
2726 static struct rbd_snap
*__rbd_add_snap_dev(struct rbd_device
*rbd_dev
,
2727 const char *snap_name
,
2728 u64 snap_id
, u64 snap_size
,
2731 struct rbd_snap
*snap
;
2734 snap
= kzalloc(sizeof (*snap
), GFP_KERNEL
);
2736 return ERR_PTR(-ENOMEM
);
2739 snap
->name
= kstrdup(snap_name
, GFP_KERNEL
);
2744 snap
->size
= snap_size
;
2745 snap
->features
= snap_features
;
2753 return ERR_PTR(ret
);
2756 static char *rbd_dev_v1_snap_info(struct rbd_device
*rbd_dev
, u32 which
,
2757 u64
*snap_size
, u64
*snap_features
)
2761 rbd_assert(which
< rbd_dev
->header
.snapc
->num_snaps
);
2763 *snap_size
= rbd_dev
->header
.snap_sizes
[which
];
2764 *snap_features
= 0; /* No features for v1 */
2766 /* Skip over names until we find the one we are looking for */
2768 snap_name
= rbd_dev
->header
.snap_names
;
2770 snap_name
+= strlen(snap_name
) + 1;
/*
 * Get the size and object order for an image snapshot, or if
 * snap_id is CEPH_NOSNAP, gets this information for the base
 * image.
 */
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u8 *order, u64 *snap_size)
{
	__le64 snapid = cpu_to_le64(snap_id);
	int ret;
	struct {
		u8 order;
		__le64 size;
	} __attribute__ ((packed)) size_buf = { 0 };

	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
				"rbd", "get_size",
				(char *) &snapid, sizeof (snapid),
				(char *) &size_buf, sizeof (size_buf), NULL);
	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);

	*order = size_buf.order;
	*snap_size = le64_to_cpu(size_buf.size);

	dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n",
		(unsigned long long) snap_id, (unsigned int) *order,
		(unsigned long long) *snap_size);

	return 0;
}

static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
{
	return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
					&rbd_dev->header.obj_order,
					&rbd_dev->header.image_size);
}
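/*
 * Sketch of the wire format assumed above (not from the original
 * source): the size query sends a little-endian snapshot id and the
 * reply is decoded as a packed { u8 order; __le64 size; }, i.e. nine
 * bytes, which is why size_buf is declared packed and the size field
 * is converted with le64_to_cpu().
 */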
2815 static int rbd_dev_v2_object_prefix(struct rbd_device
*rbd_dev
)
2821 reply_buf
= kzalloc(RBD_OBJ_PREFIX_LEN_MAX
, GFP_KERNEL
);
2825 ret
= rbd_req_sync_exec(rbd_dev
, rbd_dev
->header_name
,
2826 "rbd", "get_object_prefix",
2828 reply_buf
, RBD_OBJ_PREFIX_LEN_MAX
, NULL
);
2829 dout("%s: rbd_req_sync_exec returned %d\n", __func__
, ret
);
2832 ret
= 0; /* rbd_req_sync_exec() can return positive */
2835 rbd_dev
->header
.object_prefix
= ceph_extract_encoded_string(&p
,
2836 p
+ RBD_OBJ_PREFIX_LEN_MAX
,
2839 if (IS_ERR(rbd_dev
->header
.object_prefix
)) {
2840 ret
= PTR_ERR(rbd_dev
->header
.object_prefix
);
2841 rbd_dev
->header
.object_prefix
= NULL
;
2843 dout(" object_prefix = %s\n", rbd_dev
->header
.object_prefix
);
2852 static int _rbd_dev_v2_snap_features(struct rbd_device
*rbd_dev
, u64 snap_id
,
2855 __le64 snapid
= cpu_to_le64(snap_id
);
2859 } features_buf
= { 0 };
2863 ret
= rbd_req_sync_exec(rbd_dev
, rbd_dev
->header_name
,
2864 "rbd", "get_features",
2865 (char *) &snapid
, sizeof (snapid
),
2866 (char *) &features_buf
, sizeof (features_buf
),
2868 dout("%s: rbd_req_sync_exec returned %d\n", __func__
, ret
);
2872 incompat
= le64_to_cpu(features_buf
.incompat
);
2873 if (incompat
& ~RBD_FEATURES_ALL
)
2876 *snap_features
= le64_to_cpu(features_buf
.features
);
2878 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
2879 (unsigned long long) snap_id
,
2880 (unsigned long long) *snap_features
,
2881 (unsigned long long) le64_to_cpu(features_buf
.incompat
));
2886 static int rbd_dev_v2_features(struct rbd_device
*rbd_dev
)
2888 return _rbd_dev_v2_snap_features(rbd_dev
, CEPH_NOSNAP
,
2889 &rbd_dev
->header
.features
);
2892 static int rbd_dev_v2_parent_info(struct rbd_device
*rbd_dev
)
2894 struct rbd_spec
*parent_spec
;
2896 void *reply_buf
= NULL
;
2904 parent_spec
= rbd_spec_alloc();
2908 size
= sizeof (__le64
) + /* pool_id */
2909 sizeof (__le32
) + RBD_IMAGE_ID_LEN_MAX
+ /* image_id */
2910 sizeof (__le64
) + /* snap_id */
2911 sizeof (__le64
); /* overlap */
2912 reply_buf
= kmalloc(size
, GFP_KERNEL
);
2918 snapid
= cpu_to_le64(CEPH_NOSNAP
);
2919 ret
= rbd_req_sync_exec(rbd_dev
, rbd_dev
->header_name
,
2920 "rbd", "get_parent",
2921 (char *) &snapid
, sizeof (snapid
),
2922 (char *) reply_buf
, size
, NULL
);
2923 dout("%s: rbd_req_sync_exec returned %d\n", __func__
, ret
);
2929 end
= (char *) reply_buf
+ size
;
2930 ceph_decode_64_safe(&p
, end
, parent_spec
->pool_id
, out_err
);
2931 if (parent_spec
->pool_id
== CEPH_NOPOOL
)
2932 goto out
; /* No parent? No problem. */
2934 /* The ceph file layout needs to fit pool id in 32 bits */
2937 if (WARN_ON(parent_spec
->pool_id
> (u64
) U32_MAX
))
2940 image_id
= ceph_extract_encoded_string(&p
, end
, NULL
, GFP_KERNEL
);
2941 if (IS_ERR(image_id
)) {
2942 ret
= PTR_ERR(image_id
);
2945 parent_spec
->image_id
= image_id
;
2946 ceph_decode_64_safe(&p
, end
, parent_spec
->snap_id
, out_err
);
2947 ceph_decode_64_safe(&p
, end
, overlap
, out_err
);
2949 rbd_dev
->parent_overlap
= overlap
;
2950 rbd_dev
->parent_spec
= parent_spec
;
2951 parent_spec
= NULL
; /* rbd_dev now owns this */
2956 rbd_spec_put(parent_spec
);
2961 static char *rbd_dev_image_name(struct rbd_device
*rbd_dev
)
2963 size_t image_id_size
;
2968 void *reply_buf
= NULL
;
2970 char *image_name
= NULL
;
2973 rbd_assert(!rbd_dev
->spec
->image_name
);
2975 len
= strlen(rbd_dev
->spec
->image_id
);
2976 image_id_size
= sizeof (__le32
) + len
;
2977 image_id
= kmalloc(image_id_size
, GFP_KERNEL
);
2982 end
= (char *) image_id
+ image_id_size
;
2983 ceph_encode_string(&p
, end
, rbd_dev
->spec
->image_id
, (u32
) len
);
2985 size
= sizeof (__le32
) + RBD_IMAGE_NAME_LEN_MAX
;
2986 reply_buf
= kmalloc(size
, GFP_KERNEL
);
2990 ret
= rbd_req_sync_exec(rbd_dev
, RBD_DIRECTORY
,
2991 "rbd", "dir_get_name",
2992 image_id
, image_id_size
,
2993 (char *) reply_buf
, size
, NULL
);
2997 end
= (char *) reply_buf
+ size
;
2998 image_name
= ceph_extract_encoded_string(&p
, end
, &len
, GFP_KERNEL
);
2999 if (IS_ERR(image_name
))
3002 dout("%s: name is %s len is %zd\n", __func__
, image_name
, len
);
/*
 * When a parent image gets probed, we only have the pool, image,
 * and snapshot ids but not the names of any of them.  This call
 * is made later to fill in those names.  It has to be done after
 * rbd_dev_snaps_update() has completed because some of the
 * information (in particular, snapshot name) is not available
 * until then.
 */
3018 static int rbd_dev_probe_update_spec(struct rbd_device
*rbd_dev
)
3020 struct ceph_osd_client
*osdc
;
3022 void *reply_buf
= NULL
;
3025 if (rbd_dev
->spec
->pool_name
)
3026 return 0; /* Already have the names */
3028 /* Look up the pool name */
3030 osdc
= &rbd_dev
->rbd_client
->client
->osdc
;
3031 name
= ceph_pg_pool_name_by_id(osdc
->osdmap
, rbd_dev
->spec
->pool_id
);
3033 rbd_warn(rbd_dev
, "there is no pool with id %llu",
3034 rbd_dev
->spec
->pool_id
); /* Really a BUG() */
3038 rbd_dev
->spec
->pool_name
= kstrdup(name
, GFP_KERNEL
);
3039 if (!rbd_dev
->spec
->pool_name
)
3042 /* Fetch the image name; tolerate failure here */
3044 name
= rbd_dev_image_name(rbd_dev
);
3046 rbd_dev
->spec
->image_name
= (char *) name
;
3048 rbd_warn(rbd_dev
, "unable to get image name");
3050 /* Look up the snapshot name. */
3052 name
= rbd_snap_name(rbd_dev
, rbd_dev
->spec
->snap_id
);
3054 rbd_warn(rbd_dev
, "no snapshot with id %llu",
3055 rbd_dev
->spec
->snap_id
); /* Really a BUG() */
3059 rbd_dev
->spec
->snap_name
= kstrdup(name
, GFP_KERNEL
);
	if (!rbd_dev->spec->snap_name)
3066 kfree(rbd_dev
->spec
->pool_name
);
3067 rbd_dev
->spec
->pool_name
= NULL
;
3072 static int rbd_dev_v2_snap_context(struct rbd_device
*rbd_dev
, u64
*ver
)
3081 struct ceph_snap_context
*snapc
;
3085 * We'll need room for the seq value (maximum snapshot id),
3086 * snapshot count, and array of that many snapshot ids.
3087 * For now we have a fixed upper limit on the number we're
3088 * prepared to receive.
3090 size
= sizeof (__le64
) + sizeof (__le32
) +
3091 RBD_MAX_SNAP_COUNT
* sizeof (__le64
);
3092 reply_buf
= kzalloc(size
, GFP_KERNEL
);
3096 ret
= rbd_req_sync_exec(rbd_dev
, rbd_dev
->header_name
,
3097 "rbd", "get_snapcontext",
3099 reply_buf
, size
, ver
);
3100 dout("%s: rbd_req_sync_exec returned %d\n", __func__
, ret
);
3106 end
= (char *) reply_buf
+ size
;
3107 ceph_decode_64_safe(&p
, end
, seq
, out
);
3108 ceph_decode_32_safe(&p
, end
, snap_count
, out
);
3111 * Make sure the reported number of snapshot ids wouldn't go
3112 * beyond the end of our buffer. But before checking that,
3113 * make sure the computed size of the snapshot context we
3114 * allocate is representable in a size_t.
3116 if (snap_count
> (SIZE_MAX
- sizeof (struct ceph_snap_context
))
3121 if (!ceph_has_room(&p
, end
, snap_count
* sizeof (__le64
)))
3124 size
= sizeof (struct ceph_snap_context
) +
3125 snap_count
* sizeof (snapc
->snaps
[0]);
3126 snapc
= kmalloc(size
, GFP_KERNEL
);
3132 atomic_set(&snapc
->nref
, 1);
3134 snapc
->num_snaps
= snap_count
;
3135 for (i
= 0; i
< snap_count
; i
++)
3136 snapc
->snaps
[i
] = ceph_decode_64(&p
);
3138 rbd_dev
->header
.snapc
= snapc
;
3140 dout(" snap context seq = %llu, snap_count = %u\n",
3141 (unsigned long long) seq
, (unsigned int) snap_count
);
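/*
 * Reply layout handled above, for reference (not from the original
 * source): "get_snapcontext" returns a __le64 seq, a __le32 count,
 * then count __le64 snapshot ids.  The receive buffer is sized for
 * RBD_MAX_SNAP_COUNT ids: 8 + 4 + 510 * 8 = 4092 bytes, so the whole
 * reply always fits within a single 4 KiB page.
 */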
3149 static char *rbd_dev_v2_snap_name(struct rbd_device
*rbd_dev
, u32 which
)
3159 size
= sizeof (__le32
) + RBD_MAX_SNAP_NAME_LEN
;
3160 reply_buf
= kmalloc(size
, GFP_KERNEL
);
3162 return ERR_PTR(-ENOMEM
);
3164 snap_id
= cpu_to_le64(rbd_dev
->header
.snapc
->snaps
[which
]);
3165 ret
= rbd_req_sync_exec(rbd_dev
, rbd_dev
->header_name
,
3166 "rbd", "get_snapshot_name",
3167 (char *) &snap_id
, sizeof (snap_id
),
3168 reply_buf
, size
, NULL
);
3169 dout("%s: rbd_req_sync_exec returned %d\n", __func__
, ret
);
3174 end
= (char *) reply_buf
+ size
;
3175 snap_name
= ceph_extract_encoded_string(&p
, end
, NULL
, GFP_KERNEL
);
3176 if (IS_ERR(snap_name
)) {
3177 ret
= PTR_ERR(snap_name
);
3180 dout(" snap_id 0x%016llx snap_name = %s\n",
3181 (unsigned long long) le64_to_cpu(snap_id
), snap_name
);
3189 return ERR_PTR(ret
);
3192 static char *rbd_dev_v2_snap_info(struct rbd_device
*rbd_dev
, u32 which
,
3193 u64
*snap_size
, u64
*snap_features
)
3199 snap_id
= rbd_dev
->header
.snapc
->snaps
[which
];
3200 ret
= _rbd_dev_v2_snap_size(rbd_dev
, snap_id
, &order
, snap_size
);
3202 return ERR_PTR(ret
);
3203 ret
= _rbd_dev_v2_snap_features(rbd_dev
, snap_id
, snap_features
);
3205 return ERR_PTR(ret
);
3207 return rbd_dev_v2_snap_name(rbd_dev
, which
);
3210 static char *rbd_dev_snap_info(struct rbd_device
*rbd_dev
, u32 which
,
3211 u64
*snap_size
, u64
*snap_features
)
3213 if (rbd_dev
->image_format
== 1)
3214 return rbd_dev_v1_snap_info(rbd_dev
, which
,
3215 snap_size
, snap_features
);
3216 if (rbd_dev
->image_format
== 2)
3217 return rbd_dev_v2_snap_info(rbd_dev
, which
,
3218 snap_size
, snap_features
);
3219 return ERR_PTR(-EINVAL
);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
{
	int ret;
	__u8 obj_order;

	down_write(&rbd_dev->header_rwsem);

	/* Grab old order first, to see if it changes */

	obj_order = rbd_dev->header.obj_order;
	ret = rbd_dev_v2_image_size(rbd_dev);
	if (rbd_dev->header.obj_order != obj_order) {
		ret = -EIO;
		goto out;
	}
	rbd_update_mapping_size(rbd_dev);

	ret = rbd_dev_v2_snap_context(rbd_dev, hver);
	dout("rbd_dev_v2_snap_context returned %d\n", ret);

	ret = rbd_dev_snaps_update(rbd_dev);
	dout("rbd_dev_snaps_update returned %d\n", ret);

	ret = rbd_dev_snaps_register(rbd_dev);
	dout("rbd_dev_snaps_register returned %d\n", ret);
out:
	up_write(&rbd_dev->header_rwsem);

	return ret;
}
/*
 * Scan the rbd device's current snapshot list and compare it to the
 * newly-received snapshot context.  Remove any existing snapshots
 * not present in the new snapshot context.  Add a new snapshot for
 * any snapshots in the snapshot context not in the current list.
 * And verify there are no changes to snapshots we already know
 * about.
 *
 * Assumes the snapshots in the snapshot context are sorted by
 * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
 * are also maintained in that order.)
 */
3269 static int rbd_dev_snaps_update(struct rbd_device
*rbd_dev
)
3271 struct ceph_snap_context
*snapc
= rbd_dev
->header
.snapc
;
3272 const u32 snap_count
= snapc
->num_snaps
;
3273 struct list_head
*head
= &rbd_dev
->snaps
;
3274 struct list_head
*links
= head
->next
;
3277 dout("%s: snap count is %u\n", __func__
, (unsigned int) snap_count
);
3278 while (index
< snap_count
|| links
!= head
) {
3280 struct rbd_snap
*snap
;
3283 u64 snap_features
= 0;
3285 snap_id
= index
< snap_count
? snapc
->snaps
[index
]
3287 snap
= links
!= head
? list_entry(links
, struct rbd_snap
, node
)
3289 rbd_assert(!snap
|| snap
->id
!= CEPH_NOSNAP
);
3291 if (snap_id
== CEPH_NOSNAP
|| (snap
&& snap
->id
> snap_id
)) {
3292 struct list_head
*next
= links
->next
;
3294 /* Existing snapshot not in the new snap context */
3296 if (rbd_dev
->spec
->snap_id
== snap
->id
)
3297 atomic_set(&rbd_dev
->exists
, 0);
3298 rbd_remove_snap_dev(snap
);
3299 dout("%ssnap id %llu has been removed\n",
3300 rbd_dev
->spec
->snap_id
== snap
->id
?
3302 (unsigned long long) snap
->id
);
3304 /* Done with this list entry; advance */
3310 snap_name
= rbd_dev_snap_info(rbd_dev
, index
,
3311 &snap_size
, &snap_features
);
3312 if (IS_ERR(snap_name
))
3313 return PTR_ERR(snap_name
);
3315 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count
,
3316 (unsigned long long) snap_id
);
3317 if (!snap
|| (snap_id
!= CEPH_NOSNAP
&& snap
->id
< snap_id
)) {
3318 struct rbd_snap
*new_snap
;
3320 /* We haven't seen this snapshot before */
3322 new_snap
= __rbd_add_snap_dev(rbd_dev
, snap_name
,
3323 snap_id
, snap_size
, snap_features
);
3324 if (IS_ERR(new_snap
)) {
3325 int err
= PTR_ERR(new_snap
);
3327 dout(" failed to add dev, error %d\n", err
);
3332 /* New goes before existing, or at end of list */
			dout(" added dev%s\n", snap ? "" : " at end");
3336 list_add_tail(&new_snap
->node
, &snap
->node
);
3338 list_add_tail(&new_snap
->node
, head
);
3340 /* Already have this one */
3342 dout(" already present\n");
3344 rbd_assert(snap
->size
== snap_size
);
3345 rbd_assert(!strcmp(snap
->name
, snap_name
));
3346 rbd_assert(snap
->features
== snap_features
);
3348 /* Done with this list entry; advance */
3350 links
= links
->next
;
3353 /* Advance to the next entry in the snapshot context */
3357 dout("%s: done\n", __func__
);
3363 * Scan the list of snapshots and register the devices for any that
3364 * have not already been registered.
3366 static int rbd_dev_snaps_register(struct rbd_device
*rbd_dev
)
3368 struct rbd_snap
*snap
;
3371 dout("%s called\n", __func__
);
3372 if (WARN_ON(!device_is_registered(&rbd_dev
->dev
)))
3375 list_for_each_entry(snap
, &rbd_dev
->snaps
, node
) {
3376 if (!rbd_snap_registered(snap
)) {
3377 ret
= rbd_register_snap_dev(snap
, &rbd_dev
->dev
);
3382 dout("%s: returning %d\n", __func__
, ret
);
3387 static int rbd_bus_add_dev(struct rbd_device
*rbd_dev
)
3392 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
3394 dev
= &rbd_dev
->dev
;
3395 dev
->bus
= &rbd_bus_type
;
3396 dev
->type
= &rbd_device_type
;
3397 dev
->parent
= &rbd_root_dev
;
3398 dev
->release
= rbd_dev_release
;
3399 dev_set_name(dev
, "%d", rbd_dev
->dev_id
);
3400 ret
= device_register(dev
);
3402 mutex_unlock(&ctl_mutex
);
3407 static void rbd_bus_del_dev(struct rbd_device
*rbd_dev
)
3409 device_unregister(&rbd_dev
->dev
);
static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);

/*
 * Get a unique rbd identifier for the given new rbd_dev, and add
 * the rbd_dev to the global list.  The minimum rbd id is 1.
 */
static void rbd_dev_id_get(struct rbd_device *rbd_dev)
{
	rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);

	spin_lock(&rbd_dev_list_lock);
	list_add_tail(&rbd_dev->node, &rbd_dev_list);
	spin_unlock(&rbd_dev_list_lock);
	dout("rbd_dev %p given dev id %llu\n", rbd_dev,
		(unsigned long long) rbd_dev->dev_id);
}

/*
 * Remove an rbd_dev from the global list, and record that its
 * identifier is no longer in use.
 */
static void rbd_dev_id_put(struct rbd_device *rbd_dev)
{
	struct list_head *tmp;
	int rbd_id = rbd_dev->dev_id;
	int max_id;

	rbd_assert(rbd_id > 0);

	dout("rbd_dev %p released dev id %llu\n", rbd_dev,
		(unsigned long long) rbd_dev->dev_id);
	spin_lock(&rbd_dev_list_lock);
	list_del_init(&rbd_dev->node);

	/*
	 * If the id being "put" is not the current maximum, there
	 * is nothing special we need to do.
	 */
	if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
		spin_unlock(&rbd_dev_list_lock);
		return;
	}

	/*
	 * We need to update the current maximum id.  Search the
	 * list to find out what it is.  We're more likely to find
	 * the maximum at the end, so search the list backward.
	 */
	max_id = 0;
	list_for_each_prev(tmp, &rbd_dev_list) {
		struct rbd_device *rbd_dev;

		rbd_dev = list_entry(tmp, struct rbd_device, node);
		if (rbd_dev->dev_id > max_id)
			max_id = rbd_dev->dev_id;
	}
	spin_unlock(&rbd_dev_list_lock);

	/*
	 * The max id could have been updated by rbd_dev_id_get(), in
	 * which case it now accurately reflects the new maximum.
	 * Be careful not to overwrite the maximum value in that
	 * case.
	 */
	atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
	dout(" max dev id has been reset\n");
}
/*
 * Skips over white space at *buf, and updates *buf to point to the
 * first found non-space character (if any).  Returns the length of
 * the token (string of non-white space characters) found.  Note
 * that *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * These are the characters that produce nonzero for
	 * isspace() in the "C" and "POSIX" locales.
	 */
	const char *spaces = " \f\n\r\t\v";

	*buf += strspn(*buf, spaces);	/* Find start of token */

	return strcspn(*buf, spaces);	/* Return token length */
}

/*
 * Finds the next token in *buf, and if the provided token buffer is
 * big enough, copies the found token into it.  The result, if
 * copied, is guaranteed to be terminated with '\0'.  Note that *buf
 * must be terminated with '\0' on entry.
 *
 * Returns the length of the token found (not including the '\0').
 * Return value will be 0 if no token is found, and it will be >=
 * token_size if the token would not fit.
 *
 * The *buf pointer will be updated to point beyond the end of the
 * found token.  Note that this occurs even if the token buffer is
 * too small to hold it.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t len;

	len = next_token(buf);
	if (len < token_size) {
		memcpy(token, *buf, len);
		*(token + len) = '\0';
	}
	*buf += len;

	return len;
}

/*
 * Finds the next token in *buf, dynamically allocates a buffer big
 * enough to hold a copy of it, and copies the token into the new
 * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
 * that a duplicate buffer is created even for a zero-length token.
 *
 * Returns a pointer to the newly-allocated duplicate, or a null
 * pointer if memory for the duplicate was not available.  If
 * the lenp argument is a non-null pointer, the length of the token
 * (not including the '\0') is returned in *lenp.
 *
 * If successful, the *buf pointer will be updated to point beyond
 * the end of the found token.
 *
 * Note: uses GFP_KERNEL for allocation.
 */
static inline char *dup_token(const char **buf, size_t *lenp)
{
	char *dup;
	size_t len;

	len = next_token(buf);
	dup = kmemdup(*buf, len + 1, GFP_KERNEL);
	if (!dup)
		return NULL;
	*(dup + len) = '\0';
	*buf += len;

	if (lenp)
		*lenp = len;

	return dup;
}
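/*
 * A minimal usage sketch for the token helpers above.  It is not part
 * of the driver and is compiled out; the input string and the
 * dup_token_example() name are made up for illustration.  The real
 * consumer of these helpers is rbd_add_parse_args() below.
 */
#if 0
static void dup_token_example(void)
{
	const char *bufp = "1.2.3.4:6789 name=admin rbd myimage";
	char *mon_addrs;
	char *options;

	mon_addrs = dup_token(&bufp, NULL);	/* "1.2.3.4:6789" */
	options = dup_token(&bufp, NULL);	/* "name=admin" */

	kfree(options);
	kfree(mon_addrs);
}
#endif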
/*
 * Parse the options provided for an "rbd add" (i.e., rbd image
 * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
 * and the data written is passed here via a NUL-terminated buffer.
 * Returns 0 if successful or an error code otherwise.
 *
 * The information extracted from these options is recorded in
 * the other parameters which return dynamically-allocated
 * structures:
 *  ceph_opts
 *	The address of a pointer that will refer to a ceph options
 *	structure.  Caller must release the returned pointer using
 *	ceph_destroy_options() when it is no longer needed.
 *  rbd_opts
 *	Address of an rbd options pointer.  Fully initialized by
 *	this function; caller must release with kfree().
 *  spec
 *	Address of an rbd image specification pointer.  Fully
 *	initialized by this function based on parsed options.
 *	Caller must release with rbd_spec_put().
 *
 * The options passed take this form:
 *  <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
 * where:
 *  <mon_addrs>
 *	A comma-separated list of one or more monitor addresses.
 *	A monitor address is an ip address, optionally followed
 *	by a port number (separated by a colon).
 *	  I.e.:  ip1[:port1][,ip2[:port2]...]
 *  <options>
 *	A comma-separated list of ceph and/or rbd options.
 *  <pool_name>
 *	The name of the rados pool containing the rbd image.
 *  <image_name>
 *	The name of the image in that pool to map.
 *  <snap_id>
 *	An optional snapshot id.  If provided, the mapping will
 *	present data from the image at the time that snapshot was
 *	created.  The image head is used if no snapshot id is
 *	provided.  Snapshot mappings are always read-only.
 */
,
3605 struct ceph_options
**ceph_opts
,
3606 struct rbd_options
**opts
,
3607 struct rbd_spec
**rbd_spec
)
3611 const char *mon_addrs
;
3612 size_t mon_addrs_size
;
3613 struct rbd_spec
*spec
= NULL
;
3614 struct rbd_options
*rbd_opts
= NULL
;
3615 struct ceph_options
*copts
;
3618 /* The first four tokens are required */
3620 len
= next_token(&buf
);
3622 rbd_warn(NULL
, "no monitor address(es) provided");
3626 mon_addrs_size
= len
+ 1;
3630 options
= dup_token(&buf
, NULL
);
3634 rbd_warn(NULL
, "no options provided");
3638 spec
= rbd_spec_alloc();
3642 spec
->pool_name
= dup_token(&buf
, NULL
);
3643 if (!spec
->pool_name
)
3645 if (!*spec
->pool_name
) {
3646 rbd_warn(NULL
, "no pool name provided");
3650 spec
->image_name
= dup_token(&buf
, NULL
);
3651 if (!spec
->image_name
)
3653 if (!*spec
->image_name
) {
3654 rbd_warn(NULL
, "no image name provided");
3659 * Snapshot name is optional; default is to use "-"
3660 * (indicating the head/no snapshot).
3662 len
= next_token(&buf
);
3664 buf
= RBD_SNAP_HEAD_NAME
; /* No snapshot supplied */
3665 len
= sizeof (RBD_SNAP_HEAD_NAME
) - 1;
3666 } else if (len
> RBD_MAX_SNAP_NAME_LEN
) {
3667 ret
= -ENAMETOOLONG
;
3670 spec
->snap_name
= kmemdup(buf
, len
+ 1, GFP_KERNEL
);
3671 if (!spec
->snap_name
)
3673 *(spec
->snap_name
+ len
) = '\0';
3675 /* Initialize all rbd options to the defaults */
3677 rbd_opts
= kzalloc(sizeof (*rbd_opts
), GFP_KERNEL
);
3681 rbd_opts
->read_only
= RBD_READ_ONLY_DEFAULT
;
3683 copts
= ceph_parse_options(options
, mon_addrs
,
3684 mon_addrs
+ mon_addrs_size
- 1,
3685 parse_rbd_opts_token
, rbd_opts
);
3686 if (IS_ERR(copts
)) {
3687 ret
= PTR_ERR(copts
);
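/*
 * Example (illustrative only; monitor address, user name and image
 * name are made up): mapping image "myimage" from pool "rbd" at its
 * head is requested with a single write in exactly the form parsed
 * above:
 *
 *	echo "1.2.3.4:6789 name=admin rbd myimage -" > /sys/bus/rbd/add
 *
 * The trailing "-" is RBD_SNAP_HEAD_NAME and may be omitted.
 */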
/*
 * An rbd format 2 image has a unique identifier, distinct from the
 * name given to it by the user.  Internally, that identifier is
 * what's used to specify the names of objects related to the image.
 *
 * A special "rbd id" object is used to map an rbd image name to its
 * id.  If that object doesn't exist, then there is no v2 rbd image
 * with the supplied name.
 *
 * This function will record the given rbd_dev's image_id field if
 * it can be determined, and in that case will return 0.  If any
 * errors occur a negative errno will be returned and the rbd_dev's
 * image_id field will be unchanged (and should be NULL).
 */
3721 static int rbd_dev_image_id(struct rbd_device
*rbd_dev
)
3730 * When probing a parent image, the image id is already
3731 * known (and the image name likely is not). There's no
3732 * need to fetch the image id again in this case.
3734 if (rbd_dev
->spec
->image_id
)
3738 * First, see if the format 2 image id file exists, and if
3739 * so, get the image's persistent id from it.
3741 size
= sizeof (RBD_ID_PREFIX
) + strlen(rbd_dev
->spec
->image_name
);
3742 object_name
= kmalloc(size
, GFP_NOIO
);
3745 sprintf(object_name
, "%s%s", RBD_ID_PREFIX
, rbd_dev
->spec
->image_name
);
3746 dout("rbd id object name is %s\n", object_name
);
3748 /* Response will be an encoded string, which includes a length */
3750 size
= sizeof (__le32
) + RBD_IMAGE_ID_LEN_MAX
;
3751 response
= kzalloc(size
, GFP_NOIO
);
3757 ret
= rbd_req_sync_exec(rbd_dev
, object_name
,
3760 response
, RBD_IMAGE_ID_LEN_MAX
, NULL
);
3761 dout("%s: rbd_req_sync_exec returned %d\n", __func__
, ret
);
3764 ret
= 0; /* rbd_req_sync_exec() can return positive */
3767 rbd_dev
->spec
->image_id
= ceph_extract_encoded_string(&p
,
3768 p
+ RBD_IMAGE_ID_LEN_MAX
,
3770 if (IS_ERR(rbd_dev
->spec
->image_id
)) {
3771 ret
= PTR_ERR(rbd_dev
->spec
->image_id
);
3772 rbd_dev
->spec
->image_id
= NULL
;
3774 dout("image_id is %s\n", rbd_dev
->spec
->image_id
);
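/*
 * Illustrative example (names and id values made up): for a format 2
 * image the user calls "foo", the code above reads the "rbd_id.foo"
 * object (RBD_ID_PREFIX followed by the image name, assuming the
 * usual "rbd_id." prefix from rbd_types.h) and decodes from its reply
 * the persistent image id, e.g. "10052ae8944a", which is then used to
 * name every other object belonging to the image.
 */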
3783 static int rbd_dev_v1_probe(struct rbd_device
*rbd_dev
)
3788 /* Version 1 images have no id; empty string is used */
3790 rbd_dev
->spec
->image_id
= kstrdup("", GFP_KERNEL
);
3791 if (!rbd_dev
->spec
->image_id
)
3794 /* Record the header object name for this rbd image. */
3796 size
= strlen(rbd_dev
->spec
->image_name
) + sizeof (RBD_SUFFIX
);
3797 rbd_dev
->header_name
= kmalloc(size
, GFP_KERNEL
);
3798 if (!rbd_dev
->header_name
) {
3802 sprintf(rbd_dev
->header_name
, "%s%s",
3803 rbd_dev
->spec
->image_name
, RBD_SUFFIX
);
3805 /* Populate rbd image metadata */
3807 ret
= rbd_read_header(rbd_dev
, &rbd_dev
->header
);
3811 /* Version 1 images have no parent (no layering) */
3813 rbd_dev
->parent_spec
= NULL
;
3814 rbd_dev
->parent_overlap
= 0;
3816 rbd_dev
->image_format
= 1;
3818 dout("discovered version 1 image, header name is %s\n",
3819 rbd_dev
->header_name
);
3824 kfree(rbd_dev
->header_name
);
3825 rbd_dev
->header_name
= NULL
;
3826 kfree(rbd_dev
->spec
->image_id
);
3827 rbd_dev
->spec
->image_id
= NULL
;
3832 static int rbd_dev_v2_probe(struct rbd_device
*rbd_dev
)
3839 * Image id was filled in by the caller. Record the header
3840 * object name for this rbd image.
3842 size
= sizeof (RBD_HEADER_PREFIX
) + strlen(rbd_dev
->spec
->image_id
);
3843 rbd_dev
->header_name
= kmalloc(size
, GFP_KERNEL
);
3844 if (!rbd_dev
->header_name
)
3846 sprintf(rbd_dev
->header_name
, "%s%s",
3847 RBD_HEADER_PREFIX
, rbd_dev
->spec
->image_id
);
3849 /* Get the size and object order for the image */
3851 ret
= rbd_dev_v2_image_size(rbd_dev
);
3855 /* Get the object prefix (a.k.a. block_name) for the image */
3857 ret
= rbd_dev_v2_object_prefix(rbd_dev
);
	/* Get and check the features for the image */
3863 ret
= rbd_dev_v2_features(rbd_dev
);
3867 /* If the image supports layering, get the parent info */
3869 if (rbd_dev
->header
.features
& RBD_FEATURE_LAYERING
) {
3870 ret
= rbd_dev_v2_parent_info(rbd_dev
);
3875 /* crypto and compression type aren't (yet) supported for v2 images */
3877 rbd_dev
->header
.crypt_type
= 0;
3878 rbd_dev
->header
.comp_type
= 0;
3880 /* Get the snapshot context, plus the header version */
3882 ret
= rbd_dev_v2_snap_context(rbd_dev
, &ver
);
3885 rbd_dev
->header
.obj_version
= ver
;
3887 rbd_dev
->image_format
= 2;
3889 dout("discovered version 2 image, header name is %s\n",
3890 rbd_dev
->header_name
);
3894 rbd_dev
->parent_overlap
= 0;
3895 rbd_spec_put(rbd_dev
->parent_spec
);
3896 rbd_dev
->parent_spec
= NULL
;
3897 kfree(rbd_dev
->header_name
);
3898 rbd_dev
->header_name
= NULL
;
3899 kfree(rbd_dev
->header
.object_prefix
);
3900 rbd_dev
->header
.object_prefix
= NULL
;
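/*
 * Illustrative summary (image name and id made up): a format 1 image
 * named "foo" keeps its header in the "foo.rbd" object (image name
 * plus RBD_SUFFIX, built in rbd_dev_v1_probe()), while a format 2
 * image with id "10052ae8944a" keeps it in "rbd_header.10052ae8944a"
 * (RBD_HEADER_PREFIX plus the image id, as built above).  The quoted
 * prefix/suffix values are the usual ones from rbd_types.h.
 */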
3905 static int rbd_dev_probe_finish(struct rbd_device
*rbd_dev
)
3909 /* no need to lock here, as rbd_dev is not registered yet */
3910 ret
= rbd_dev_snaps_update(rbd_dev
);
3914 ret
= rbd_dev_probe_update_spec(rbd_dev
);
3918 ret
= rbd_dev_set_mapping(rbd_dev
);
3922 /* generate unique id: find highest unique id, add one */
3923 rbd_dev_id_get(rbd_dev
);
3925 /* Fill in the device name, now that we have its id. */
3926 BUILD_BUG_ON(DEV_NAME_LEN
3927 < sizeof (RBD_DRV_NAME
) + MAX_INT_FORMAT_WIDTH
);
3928 sprintf(rbd_dev
->name
, "%s%d", RBD_DRV_NAME
, rbd_dev
->dev_id
);
3930 /* Get our block major device number. */
3932 ret
= register_blkdev(0, rbd_dev
->name
);
3935 rbd_dev
->major
= ret
;
3937 /* Set up the blkdev mapping. */
3939 ret
= rbd_init_disk(rbd_dev
);
3941 goto err_out_blkdev
;
3943 ret
= rbd_bus_add_dev(rbd_dev
);
3948 * At this point cleanup in the event of an error is the job
3949 * of the sysfs code (initiated by rbd_bus_del_dev()).
3951 down_write(&rbd_dev
->header_rwsem
);
3952 ret
= rbd_dev_snaps_register(rbd_dev
);
3953 up_write(&rbd_dev
->header_rwsem
);
3957 ret
= rbd_dev_header_watch_sync(rbd_dev
, 1);
3961 /* Everything's ready. Announce the disk to the world. */
3963 add_disk(rbd_dev
->disk
);
3965 pr_info("%s: added with size 0x%llx\n", rbd_dev
->disk
->disk_name
,
3966 (unsigned long long) rbd_dev
->mapping
.size
);
3970 /* this will also clean up rest of rbd_dev stuff */
3972 rbd_bus_del_dev(rbd_dev
);
3976 rbd_free_disk(rbd_dev
);
3978 unregister_blkdev(rbd_dev
->major
, rbd_dev
->name
);
3980 rbd_dev_id_put(rbd_dev
);
3982 rbd_remove_all_snaps(rbd_dev
);
/*
 * Probe for the existence of the header object for the given rbd
 * device.  For format 2 images this includes determining the image
 * id.
 */
static int rbd_dev_probe(struct rbd_device *rbd_dev)
{
	int ret;

	/*
	 * Get the id from the image id object.  If it's not a
	 * format 2 image, we'll get ENOENT back, and we'll assume
	 * it's a format 1 image.
	 */
	ret = rbd_dev_image_id(rbd_dev);
	if (ret)
		ret = rbd_dev_v1_probe(rbd_dev);
	else
		ret = rbd_dev_v2_probe(rbd_dev);
	if (ret) {
		dout("probe failed, returning %d\n", ret);

		return ret;
	}

	ret = rbd_dev_probe_finish(rbd_dev);
	if (ret)
		rbd_header_free(&rbd_dev->header);

	return ret;
}
4019 static ssize_t
rbd_add(struct bus_type
*bus
,
4023 struct rbd_device
*rbd_dev
= NULL
;
4024 struct ceph_options
*ceph_opts
= NULL
;
4025 struct rbd_options
*rbd_opts
= NULL
;
4026 struct rbd_spec
*spec
= NULL
;
4027 struct rbd_client
*rbdc
;
4028 struct ceph_osd_client
*osdc
;
4031 if (!try_module_get(THIS_MODULE
))
4034 /* parse add command */
4035 rc
= rbd_add_parse_args(buf
, &ceph_opts
, &rbd_opts
, &spec
);
4037 goto err_out_module
;
4039 rbdc
= rbd_get_client(ceph_opts
);
4044 ceph_opts
= NULL
; /* rbd_dev client now owns this */
4047 osdc
= &rbdc
->client
->osdc
;
4048 rc
= ceph_pg_poolid_by_name(osdc
->osdmap
, spec
->pool_name
);
4050 goto err_out_client
;
4051 spec
->pool_id
= (u64
) rc
;
4053 /* The ceph file layout needs to fit pool id in 32 bits */
4055 if (WARN_ON(spec
->pool_id
> (u64
) U32_MAX
)) {
4057 goto err_out_client
;
4060 rbd_dev
= rbd_dev_create(rbdc
, spec
);
4062 goto err_out_client
;
4063 rbdc
= NULL
; /* rbd_dev now owns this */
4064 spec
= NULL
; /* rbd_dev now owns this */
4066 rbd_dev
->mapping
.read_only
= rbd_opts
->read_only
;
4068 rbd_opts
= NULL
; /* done with this */
4070 rc
= rbd_dev_probe(rbd_dev
);
4072 goto err_out_rbd_dev
;
4076 rbd_dev_destroy(rbd_dev
);
4078 rbd_put_client(rbdc
);
4081 ceph_destroy_options(ceph_opts
);
4085 module_put(THIS_MODULE
);
4087 dout("Error adding device %s\n", buf
);
4089 return (ssize_t
) rc
;
4092 static struct rbd_device
*__rbd_get_dev(unsigned long dev_id
)
4094 struct list_head
*tmp
;
4095 struct rbd_device
*rbd_dev
;
4097 spin_lock(&rbd_dev_list_lock
);
4098 list_for_each(tmp
, &rbd_dev_list
) {
4099 rbd_dev
= list_entry(tmp
, struct rbd_device
, node
);
4100 if (rbd_dev
->dev_id
== dev_id
) {
4101 spin_unlock(&rbd_dev_list_lock
);
4105 spin_unlock(&rbd_dev_list_lock
);
4109 static void rbd_dev_release(struct device
*dev
)
4111 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
4113 if (rbd_dev
->watch_request
) {
4114 struct ceph_client
*client
= rbd_dev
->rbd_client
->client
;
4116 ceph_osdc_unregister_linger_request(&client
->osdc
,
4117 rbd_dev
->watch_request
);
4119 if (rbd_dev
->watch_event
)
4120 rbd_dev_header_watch_sync(rbd_dev
, 0);
4122 /* clean up and free blkdev */
4123 rbd_free_disk(rbd_dev
);
4124 unregister_blkdev(rbd_dev
->major
, rbd_dev
->name
);
4126 /* release allocated disk header fields */
4127 rbd_header_free(&rbd_dev
->header
);
4129 /* done with the id, and with the rbd_dev */
4130 rbd_dev_id_put(rbd_dev
);
4131 rbd_assert(rbd_dev
->rbd_client
!= NULL
);
4132 rbd_dev_destroy(rbd_dev
);
4134 /* release module ref */
4135 module_put(THIS_MODULE
);
4138 static ssize_t
rbd_remove(struct bus_type
*bus
,
4142 struct rbd_device
*rbd_dev
= NULL
;
4147 rc
= strict_strtoul(buf
, 10, &ul
);
4151 /* convert to int; abort if we lost anything in the conversion */
4152 target_id
= (int) ul
;
4153 if (target_id
!= ul
)
4156 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
4158 rbd_dev
= __rbd_get_dev(target_id
);
4164 if (rbd_dev
->open_count
) {
4169 rbd_remove_all_snaps(rbd_dev
);
4170 rbd_bus_del_dev(rbd_dev
);
4173 mutex_unlock(&ctl_mutex
);
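/*
 * Illustrative example (device id made up): an existing mapping is
 * torn down by writing its id to the bus control file parsed above,
 * e.g.
 *
 *	echo 0 > /sys/bus/rbd/remove
 *
 * which is refused while the block device is still open (see the
 * open_count check above).
 */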
/*
 * create control files in sysfs
 */
static int rbd_sysfs_init(void)
{
	int ret;

	ret = device_register(&rbd_root_dev);
	if (ret < 0)
		return ret;

	ret = bus_register(&rbd_bus_type);
	if (ret < 0)
		device_unregister(&rbd_root_dev);

	return ret;
}

static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}

int __init rbd_init(void)
{
	int rc;

	rc = rbd_sysfs_init();
	if (rc)
		return rc;
	pr_info("loaded " RBD_DRV_NAME_LONG "\n");
	return 0;
}

void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
}

module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");