rbd: use new code for notify ack
drivers/block/rbd.c
/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

		Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>

#include "rbd_types.h"

#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT	9
#define SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

/* It might be useful to have these defined elsewhere */

#define U8_MAX	((u8) (~0U))
#define U16_MAX	((u16) (~0U))
#define U32_MAX	((u32) (~0U))
#define U64_MAX	((u64) (~0ULL))

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256	/* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN \
		(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME	"-"

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

/* Feature bits */

#define RBD_FEATURE_LAYERING	1

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_ALL	(0)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)
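/*
 * 5/2 = 2.5 decimal digits per byte slightly overestimates
 * log10(256) = 2.41, so (5 * sizeof (int)) / 2 digits, plus one
 * for a sign, is enough to format any int value (11 characters
 * for a 32-bit int, whose minimum is the 11-char -2147483648).
 */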

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These fields never change for a given rbd image */
	char *object_prefix;
	u64 features;
	__u8 obj_order;
	__u8 crypt_type;
	__u8 comp_type;

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;
	u64 *snap_sizes;

	u64 obj_version;
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
	u64 pool_id;
	char *pool_name;

	char *image_id;
	char *image_name;

	u64 snap_id;
	char *snap_name;

	struct kref kref;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client *client;
	struct kref kref;
	struct list_head node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH	U32_MAX		/* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

struct rbd_obj_request {
	const char *object_name;
	u64 offset;		/* object start byte */
	u64 length;		/* bytes from offset */

	struct rbd_img_request *img_request;
	struct list_head links;	/* img_request->obj_requests */
	u32 which;		/* posn image request list */

	enum obj_request_type type;
	union {
		struct bio *bio_list;
		struct {
			struct page **pages;
			u32 page_count;
		};
	};

	struct ceph_osd_request *osd_req;

	u64 xferred;		/* bytes transferred */
	u64 version;
	s32 result;
	atomic_t done;

	rbd_obj_callback_t callback;
	struct completion completion;

	struct kref kref;
};

struct rbd_img_request {
	struct request *rq;
	struct rbd_device *rbd_dev;
	u64 offset;		/* starting image byte offset */
	u64 length;		/* byte count from offset */
	bool write_request;	/* false for read */
	union {
		struct ceph_snap_context *snapc;	/* for writes */
		u64 snap_id;				/* for reads */
	};
	spinlock_t completion_lock;	/* protects next_completion */
	u32 next_completion;
	rbd_img_callback_t callback;

	u32 obj_request_count;
	struct list_head obj_requests;	/* rbd_obj_request structs */

	struct kref kref;
};

#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &ireq->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
	list_for_each_entry_from(oreq, &ireq->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe_reverse(oreq, n, &ireq->obj_requests, links)

struct rbd_snap {
	struct device dev;
	const char *name;
	u64 size;
	struct list_head node;
	u64 id;
	u64 features;
};

struct rbd_mapping {
	u64 size;
	u64 features;
	bool read_only;
};

/*
 * a single device
 */
struct rbd_device {
	int dev_id;		/* blkdev unique id */

	int major;		/* blkdev assigned major */
	struct gendisk *disk;	/* blkdev's gendisk and rq */

	u32 image_format;	/* Either 1 or 2 */
	struct rbd_client *rbd_client;

	char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t lock;	/* queue lock */

	struct rbd_image_header header;
	atomic_t exists;
	struct rbd_spec *spec;

	char *header_name;

	struct ceph_file_layout layout;

	struct ceph_osd_event *watch_event;
	struct ceph_osd_request *watch_request;

	struct rbd_spec *parent_spec;
	u64 parent_overlap;

	/* protects updating the header */
	struct rw_semaphore header_rwsem;

	struct rbd_mapping mapping;

	struct list_head node;

	/* list of snapshots */
	struct list_head snaps;

	/* sysfs related */
	struct device dev;
	unsigned long open_count;
};

static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);	/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);

static void rbd_dev_release(struct device *dev);
static void rbd_remove_snap_dev(struct rbd_snap *snap);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);

static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_attrs	= rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =	"rbd",
	.release =	rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else	/* punt */
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */

static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	(void) get_device(&rbd_dev->dev);
	set_device_ro(bdev, rbd_dev->mapping.read_only);
	rbd_dev->open_count++;
	mutex_unlock(&ctl_mutex);

	return 0;
}

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	rbd_assert(rbd_dev->open_count > 0);
	rbd_dev->open_count--;
	put_device(&rbd_dev->dev);
	mutex_unlock(&ctl_mutex);

	return 0;
}

static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("rbd_client_create\n");
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
	if (IS_ERR(rbdc->client))
		goto out_mutex;
	ceph_opts = NULL;	/* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_err;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	mutex_unlock(&ctl_mutex);

	dout("rbd_client_create created %p\n", rbdc);
	return rbdc;

out_err:
	ceph_destroy_client(rbdc->client);
out_mutex:
	mutex_unlock(&ctl_mutex);
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	return ERR_PTR(ret);
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			kref_get(&client_node->kref);
			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	/* Boolean args above */
	Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	/* Boolean args above */
	{-1, NULL}
};

struct rbd_options {
	bool	read_only;
};

#define RBD_READ_ONLY_DEFAULT	false

static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else if (token > Opt_last_string && token < Opt_last_bool) {
		dout("got Boolean token %d\n", token);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	default:
		rbd_assert(false);
		break;
	}
	return 0;
}

/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;

	rbdc = rbd_client_find(ceph_opts);
	if (rbdc)	/* using an existing client */
		ceph_destroy_options(ceph_opts);
	else
		rbdc = rbd_client_create(ceph_opts);

	return rbdc;
}

/*
 * Destroy a ceph client.  Called when its last reference is dropped;
 * takes rbd_client_list_lock itself to remove the client from the list.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("rbd_release_client %p\n", rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
	return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}

/*
 * Create a new header structure, translate header format from the on-disk
 * header.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
				struct rbd_image_header_ondisk *ondisk)
{
	u32 snap_count;
	size_t len;
	size_t size;
	u32 i;
	int ret = -ENOMEM;

	memset(header, 0, sizeof (*header));

	snap_count = le32_to_cpu(ondisk->snap_count);

	len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
	header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
	if (!header->object_prefix)
		return -ENOMEM;
	memcpy(header->object_prefix, ondisk->object_prefix, len);
	header->object_prefix[len] = '\0';

	if (snap_count) {
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* Save a copy of the snapshot names */

		if (snap_names_len > (u64) SIZE_MAX) {
			ret = -EIO;
			goto out_err;
		}
		header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!header->snap_names)
			goto out_err;
		/*
		 * Note that rbd_dev_v1_header_read() guarantees
		 * the ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, so this memcpy() is safe.
		 */
		memcpy(header->snap_names, &ondisk->snaps[snap_count],
			snap_names_len);

		/* Record each snapshot's size */

		size = snap_count * sizeof (*header->snap_sizes);
		header->snap_sizes = kmalloc(size, GFP_KERNEL);
		if (!header->snap_sizes)
			goto out_err;
		for (i = 0; i < snap_count; i++)
			header->snap_sizes[i] =
				le64_to_cpu(ondisk->snaps[i].image_size);
	} else {
		WARN_ON(ondisk->snap_names_len);
		header->snap_names = NULL;
		header->snap_sizes = NULL;
	}

	header->features = 0;	/* No features support in v1 images */
	header->obj_order = ondisk->options.order;
	header->crypt_type = ondisk->options.crypt_type;
	header->comp_type = ondisk->options.comp_type;

	/* Allocate and fill in the snapshot context */

	header->image_size = le64_to_cpu(ondisk->image_size);
	size = sizeof (struct ceph_snap_context);
	size += snap_count * sizeof (header->snapc->snaps[0]);
	header->snapc = kzalloc(size, GFP_KERNEL);
	if (!header->snapc)
		goto out_err;

	atomic_set(&header->snapc->nref, 1);
	header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
	header->snapc->num_snaps = snap_count;
	for (i = 0; i < snap_count; i++)
		header->snapc->snaps[i] =
			le64_to_cpu(ondisk->snaps[i].id);

	return 0;

out_err:
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	kfree(header->object_prefix);
	header->object_prefix = NULL;

	return ret;
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct rbd_snap *snap;

	if (snap_id == CEPH_NOSNAP)
		return RBD_SNAP_HEAD_NAME;

	list_for_each_entry(snap, &rbd_dev->snaps, node)
		if (snap_id == snap->id)
			return snap->name;

	return NULL;
}

static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
{
	struct rbd_snap *snap;

	list_for_each_entry(snap, &rbd_dev->snaps, node) {
		if (!strcmp(snap_name, snap->name)) {
			rbd_dev->spec->snap_id = snap->id;
			rbd_dev->mapping.size = snap->size;
			rbd_dev->mapping.features = snap->features;

			return 0;
		}
	}

	return -ENOENT;
}

static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
{
	int ret;

	if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
		    sizeof (RBD_SNAP_HEAD_NAME))) {
		rbd_dev->spec->snap_id = CEPH_NOSNAP;
		rbd_dev->mapping.size = rbd_dev->header.image_size;
		rbd_dev->mapping.features = rbd_dev->header.features;
		ret = 0;
	} else {
		ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
		if (ret < 0)
			goto done;
		rbd_dev->mapping.read_only = true;
	}
	atomic_set(&rbd_dev->exists, 1);
done:
	return ret;
}

static void rbd_header_free(struct rbd_image_header *header)
{
	kfree(header->object_prefix);
	header->object_prefix = NULL;
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	ceph_put_snap_context(header->snapc);
	header->snapc = NULL;
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
	char *name;
	u64 segment;
	int ret;

	name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
	if (!name)
		return NULL;
	segment = offset >> rbd_dev->header.obj_order;
	ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
			rbd_dev->header.object_prefix, segment);
	if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
		pr_err("error formatting segment name for #%llu (%d)\n",
			segment, ret);
		kfree(name);
		name = NULL;
	}

	return name;
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
				u64 offset, u64 length)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	offset &= segment_size - 1;

	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}
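
/*
 * Example of the segment math above, assuming the default object
 * order of 22 (4 MB objects) and an object prefix of, say, "rb.0.1234":
 * image offset 0x00d00000 falls in segment 3 (object
 * "rb.0.1234.000000000003") at offset 0x00100000 within that object,
 * and an I/O of 0x00400000 bytes starting there would be clipped by
 * rbd_segment_length() to 0x00300000 bytes so that it does not cross
 * into segment 4.
 */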

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1 << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
	struct bio *tmp;

	while (chain) {
		tmp = chain;
		chain = chain->bi_next;
		bio_put(tmp);
	}
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
					unsigned int offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio_vec *bv;
	unsigned int resid;
	unsigned short idx;
	unsigned int voff;
	unsigned short end_idx;
	unsigned short vcnt;
	struct bio *bio;

	/* Handle the easy case for the caller */

	if (!offset && len == bio_src->bi_size)
		return bio_clone(bio_src, gfpmask);

	if (WARN_ON_ONCE(!len))
		return NULL;
	if (WARN_ON_ONCE(len > bio_src->bi_size))
		return NULL;
	if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
		return NULL;

	/* Find first affected segment... */

	resid = offset;
	__bio_for_each_segment(bv, bio_src, idx, 0) {
		if (resid < bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	voff = resid;

	/* ...and the last affected segment */

	resid += len;
	__bio_for_each_segment(bv, bio_src, end_idx, idx) {
		if (resid <= bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	vcnt = end_idx - idx + 1;

	/* Build the clone */

	bio = bio_alloc(gfpmask, (unsigned int) vcnt);
	if (!bio)
		return NULL;	/* ENOMEM */

	bio->bi_bdev = bio_src->bi_bdev;
	bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
	bio->bi_rw = bio_src->bi_rw;
	bio->bi_flags |= 1 << BIO_CLONED;

	/*
	 * Copy over our part of the bio_vec, then update the first
	 * and last (or only) entries.
	 */
	memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
			vcnt * sizeof (struct bio_vec));
	bio->bi_io_vec[0].bv_offset += voff;
	if (vcnt > 1) {
		bio->bi_io_vec[0].bv_len -= voff;
		bio->bi_io_vec[vcnt - 1].bv_len = resid;
	} else {
		bio->bi_io_vec[0].bv_len = len;
	}

	bio->bi_vcnt = vcnt;
	bio->bi_size = len;
	bio->bi_idx = 0;

	return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
					unsigned int *offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bi = *bio_src;
	unsigned int off = *offset;
	struct bio *chain = NULL;
	struct bio **end;

	/* Build up a chain of clone bios up to the limit */

	if (!bi || off >= bi->bi_size || !len)
		return NULL;		/* Nothing to clone */

	end = &chain;
	while (len) {
		unsigned int bi_size;
		struct bio *bio;

		if (!bi) {
			rbd_warn(NULL, "bio_chain exhausted with %u left", len);
			goto out_err;	/* EINVAL; ran out of bio's */
		}
		bi_size = min_t(unsigned int, bi->bi_size - off, len);
		bio = bio_clone_range(bi, off, bi_size, gfpmask);
		if (!bio)
			goto out_err;	/* ENOMEM */

		*end = bio;
		end = &bio->bi_next;

		off += bi_size;
		if (off == bi->bi_size) {
			bi = bi->bi_next;
			off = 0;
		}
		len -= bi_size;
	}
	*bio_src = bi;
	*offset = off;

	return chain;
out_err:
	bio_chain_put(chain);

	return NULL;
}
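
/*
 * Illustrative use of bio_chain_clone_range() (values hypothetical):
 * carving a request's bio chain into per-object pieces might look like
 *
 *	struct bio *bio = rq->bio;
 *	unsigned int offset = 0;
 *	struct bio *clone;
 *
 *	clone = bio_chain_clone_range(&bio, &offset, obj_len, GFP_NOIO);
 *
 * after which bio and offset identify the first byte not yet cloned,
 * ready to be passed back in to clone the next object's portion.
 */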

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
	kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request != NULL);
	kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
	kref_get(&img_request->kref);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
	rbd_assert(img_request != NULL);
	kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_obj_request_get(obj_request);
	obj_request->img_request = img_request;
	list_add_tail(&obj_request->links, &img_request->obj_requests);
	obj_request->which = img_request->obj_request_count++;
	rbd_assert(obj_request->which != BAD_WHICH);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->which != BAD_WHICH);
	obj_request->which = BAD_WHICH;
	list_del(&obj_request->links);
	rbd_assert(obj_request->img_request == img_request);
	obj_request->callback = NULL;
	obj_request->img_request = NULL;
	rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
	switch (type) {
	case OBJ_REQUEST_NODATA:
	case OBJ_REQUEST_BIO:
	case OBJ_REQUEST_PAGES:
		return true;
	default:
		return false;
	}
}

struct ceph_osd_req_op *rbd_osd_req_op_create(u16 opcode, ...)
{
	struct ceph_osd_req_op *op;
	va_list args;
	size_t size;

	op = kzalloc(sizeof (*op), GFP_NOIO);
	if (!op)
		return NULL;
	op->op = opcode;
	va_start(args, opcode);
	switch (opcode) {
	case CEPH_OSD_OP_READ:
	case CEPH_OSD_OP_WRITE:
		/* rbd_osd_req_op_create(READ, offset, length) */
		/* rbd_osd_req_op_create(WRITE, offset, length) */
		op->extent.offset = va_arg(args, u64);
		op->extent.length = va_arg(args, u64);
		if (opcode == CEPH_OSD_OP_WRITE)
			op->payload_len = op->extent.length;
		break;
	case CEPH_OSD_OP_CALL:
		/* rbd_osd_req_op_create(CALL, class, method, data, datalen) */
		op->cls.class_name = va_arg(args, char *);
		size = strlen(op->cls.class_name);
		rbd_assert(size <= (size_t) U8_MAX);
		op->cls.class_len = size;
		op->payload_len = size;

		op->cls.method_name = va_arg(args, char *);
		size = strlen(op->cls.method_name);
		rbd_assert(size <= (size_t) U8_MAX);
		op->cls.method_len = size;
		op->payload_len += size;

		op->cls.argc = 0;
		op->cls.indata = va_arg(args, void *);
		size = va_arg(args, size_t);
		rbd_assert(size <= (size_t) U32_MAX);
		op->cls.indata_len = (u32) size;
		op->payload_len += size;
		break;
	case CEPH_OSD_OP_NOTIFY_ACK:
	case CEPH_OSD_OP_WATCH:
		/* rbd_osd_req_op_create(NOTIFY_ACK, cookie, version) */
		/* rbd_osd_req_op_create(WATCH, cookie, version, flag) */
		op->watch.cookie = va_arg(args, u64);
		op->watch.ver = va_arg(args, u64);
		op->watch.ver = cpu_to_le64(op->watch.ver);
		if (opcode == CEPH_OSD_OP_WATCH && va_arg(args, int))
			op->watch.flag = (u8) 1;
		break;
	default:
		rbd_warn(NULL, "unsupported opcode %hu\n", opcode);
		kfree(op);
		op = NULL;
		break;
	}
	va_end(args);

	return op;
}

static void rbd_osd_req_op_destroy(struct ceph_osd_req_op *op)
{
	kfree(op);
}
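
/*
 * For example (values hypothetical), a 1 MB read op at the start of
 * an object would be built and released with:
 *
 *	op = rbd_osd_req_op_create(CEPH_OSD_OP_READ, 0ULL, 0x100000ULL);
 *	...
 *	rbd_osd_req_op_destroy(op);
 *
 * The variable arguments must match the per-opcode comments above,
 * and the u64 arguments must really be 64-bit values, since that is
 * the type va_arg() will read.
 */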

/*
 * Send ceph osd request
 */
static int rbd_do_request(struct request *rq,
			  struct rbd_device *rbd_dev,
			  struct ceph_snap_context *snapc,
			  u64 snapid,
			  const char *object_name, u64 ofs, u64 len,
			  struct bio *bio,
			  struct page **pages,
			  int num_pages,
			  int flags,
			  struct ceph_osd_req_op *op,
			  void (*rbd_cb)(struct ceph_osd_request *,
					 struct ceph_msg *),
			  u64 *ver)
{
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;
	struct timespec mtime = CURRENT_TIME;
	int ret;

	dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n",
		object_name, (unsigned long long) ofs,
		(unsigned long long) len);

	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_NOIO);
	if (!osd_req)
		return -ENOMEM;

	osd_req->r_flags = flags;
	osd_req->r_pages = pages;
	if (bio) {
		osd_req->r_bio = bio;
		bio_get(osd_req->r_bio);
	}

	osd_req->r_callback = rbd_cb;
	osd_req->r_priv = NULL;

	strncpy(osd_req->r_oid, object_name, sizeof(osd_req->r_oid));
	osd_req->r_oid_len = strlen(osd_req->r_oid);

	osd_req->r_file_layout = rbd_dev->layout;	/* struct */
	osd_req->r_num_pages = calc_pages_for(ofs, len);
	osd_req->r_page_alignment = ofs & ~PAGE_MASK;

	ceph_osdc_build_request(osd_req, ofs, len, 1, op,
				snapc, snapid, &mtime);

	if (op->op == CEPH_OSD_OP_WATCH && op->watch.flag) {
		ceph_osdc_set_request_linger(osdc, osd_req);
		rbd_dev->watch_request = osd_req;
	}

	ret = ceph_osdc_start_request(osdc, osd_req, false);
	if (ret < 0)
		goto done_err;

	if (!rbd_cb) {
		u64 version;

		ret = ceph_osdc_wait_request(osdc, osd_req);
		version = le64_to_cpu(osd_req->r_reassert_version.version);
		if (ver)
			*ver = version;
		dout("reassert_ver=%llu\n", (unsigned long long) version);
		ceph_osdc_put_request(osd_req);
	}
	return ret;

done_err:
	if (bio)
		bio_chain_put(osd_req->r_bio);
	ceph_osdc_put_request(osd_req);

	return ret;
}

static void rbd_simple_req_cb(struct ceph_osd_request *osd_req,
			      struct ceph_msg *msg)
{
	ceph_osdc_put_request(osd_req);
}

/*
 * Do a synchronous ceph osd operation
 */
static int rbd_req_sync_op(struct rbd_device *rbd_dev,
			   int flags,
			   struct ceph_osd_req_op *op,
			   const char *object_name,
			   u64 ofs, u64 inbound_size,
			   char *inbound,
			   u64 *ver)
{
	int ret;
	struct page **pages;
	int num_pages;

	rbd_assert(op != NULL);

	num_pages = calc_pages_for(ofs, inbound_size);
	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
			     object_name, ofs, inbound_size, NULL,
			     pages, num_pages,
			     flags,
			     op,
			     NULL,
			     ver);
	if (ret < 0)
		goto done;

	if ((flags & CEPH_OSD_FLAG_READ) && inbound)
		ret = ceph_copy_from_page_vector(pages, inbound, ofs, ret);

done:
	ceph_release_page_vector(pages, num_pages);
	return ret;
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
				struct rbd_obj_request *obj_request)
{
	return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{
	if (img_request->callback)
		img_request->callback(img_request);
	else
		rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
	return wait_for_completion_interruptible(&obj_request->completion);
}

static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request,
				struct ceph_osd_op *op)
{
	atomic_set(&obj_request->done, 1);
}

static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
	if (obj_request->callback)
		obj_request->callback(obj_request);
	else
		complete_all(&obj_request->completion);
}

/*
 * Send a notify acknowledgement, synchronously (old request path)
 */
static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
				   u64 ver,
				   u64 notify_id)
{
	struct ceph_osd_req_op *op;
	int ret;

	op = rbd_osd_req_op_create(CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver);
	if (!op)
		return -ENOMEM;

	ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
			  rbd_dev->header_name, 0, 0, NULL,
			  NULL, 0,
			  CEPH_OSD_FLAG_READ,
			  op,
			  rbd_simple_req_cb, NULL);

	rbd_osd_req_op_destroy(op);

	return ret;
}

/*
 * Synchronous osd object method call
 */
static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
			     const char *object_name,
			     const char *class_name,
			     const char *method_name,
			     const char *outbound,
			     size_t outbound_size,
			     char *inbound,
			     size_t inbound_size,
			     u64 *ver)
{
	struct ceph_osd_req_op *op;
	int ret;

	/*
	 * Any input parameters required by the method we're calling
	 * will be sent along with the class and method names as
	 * part of the message payload.  That data and its size are
	 * supplied via the indata and indata_len fields (named from
	 * the perspective of the server side) in the OSD request
	 * operation.
	 */
	op = rbd_osd_req_op_create(CEPH_OSD_OP_CALL, class_name,
					method_name, outbound, outbound_size);
	if (!op)
		return -ENOMEM;

	ret = rbd_req_sync_op(rbd_dev, CEPH_OSD_FLAG_READ, op,
			       object_name, 0, inbound_size, inbound,
			       ver);

	rbd_osd_req_op_destroy(op);

	dout("cls_exec returned %d\n", ret);
	return ret;
}

static void rbd_osd_read_callback(struct rbd_obj_request *obj_request,
				struct ceph_osd_op *op)
{
	u64 xferred;

	/*
	 * We support a 64-bit length, but ultimately it has to be
	 * passed to blk_end_request(), which takes an unsigned int.
	 */
	xferred = le64_to_cpu(op->extent.length);
	rbd_assert(xferred < (u64) UINT_MAX);
	if (obj_request->result == (s32) -ENOENT) {
		zero_bio_chain(obj_request->bio_list, 0);
		obj_request->result = 0;
	} else if (xferred < obj_request->length && !obj_request->result) {
		zero_bio_chain(obj_request->bio_list, xferred);
		xferred = obj_request->length;
	}
	obj_request->xferred = xferred;
	atomic_set(&obj_request->done, 1);
}

static void rbd_osd_write_callback(struct rbd_obj_request *obj_request,
				struct ceph_osd_op *op)
{
	obj_request->xferred = le64_to_cpu(op->extent.length);
	atomic_set(&obj_request->done, 1);
}

static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
				struct ceph_msg *msg)
{
	struct rbd_obj_request *obj_request = osd_req->r_priv;
	struct ceph_osd_reply_head *reply_head;
	struct ceph_osd_op *op;
	u32 num_ops;
	u16 opcode;

	rbd_assert(osd_req == obj_request->osd_req);
	rbd_assert(!!obj_request->img_request ^
				(obj_request->which == BAD_WHICH));

	obj_request->xferred = le32_to_cpu(msg->hdr.data_len);
	reply_head = msg->front.iov_base;
	obj_request->result = (s32) le32_to_cpu(reply_head->result);
	obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);

	num_ops = le32_to_cpu(reply_head->num_ops);
	WARN_ON(num_ops != 1);	/* For now */

	op = &reply_head->ops[0];
	opcode = le16_to_cpu(op->op);
	switch (opcode) {
	case CEPH_OSD_OP_READ:
		rbd_osd_read_callback(obj_request, op);
		break;
	case CEPH_OSD_OP_WRITE:
		rbd_osd_write_callback(obj_request, op);
		break;
	case CEPH_OSD_OP_NOTIFY_ACK:
	case CEPH_OSD_OP_WATCH:
		rbd_osd_trivial_callback(obj_request, op);
		break;
	default:
		rbd_warn(NULL, "%s: unsupported op %hu\n",
			obj_request->object_name, (unsigned short) opcode);
		break;
	}

	if (atomic_read(&obj_request->done))
		rbd_obj_request_complete(obj_request);
}

static struct ceph_osd_request *rbd_osd_req_create(
					struct rbd_device *rbd_dev,
					bool write_request,
					struct rbd_obj_request *obj_request,
					struct ceph_osd_req_op *op)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct ceph_snap_context *snapc = NULL;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;
	struct timespec now;
	struct timespec *mtime;
	u64 snap_id = CEPH_NOSNAP;
	u64 offset = obj_request->offset;
	u64 length = obj_request->length;

	if (img_request) {
		rbd_assert(img_request->write_request == write_request);
		if (img_request->write_request)
			snapc = img_request->snapc;
		else
			snap_id = img_request->snap_id;
	}

	/* Allocate and initialize the request, for the single op */

	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
	if (!osd_req)
		return NULL;	/* ENOMEM */

	rbd_assert(obj_request_type_valid(obj_request->type));
	switch (obj_request->type) {
	case OBJ_REQUEST_NODATA:
		break;		/* Nothing to do */
	case OBJ_REQUEST_BIO:
		rbd_assert(obj_request->bio_list != NULL);
		osd_req->r_bio = obj_request->bio_list;
		bio_get(osd_req->r_bio);
		/* osd client requires "num pages" even for bio */
		osd_req->r_num_pages = calc_pages_for(offset, length);
		break;
	case OBJ_REQUEST_PAGES:
		osd_req->r_pages = obj_request->pages;
		osd_req->r_num_pages = obj_request->page_count;
		osd_req->r_page_alignment = offset & ~PAGE_MASK;
		break;
	}

	if (write_request) {
		osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
		now = CURRENT_TIME;
		mtime = &now;
	} else {
		osd_req->r_flags = CEPH_OSD_FLAG_READ;
		mtime = NULL;	/* not needed for reads */
		offset = 0;	/* These are not used... */
		length = 0;	/* ...for osd read requests */
	}

	osd_req->r_callback = rbd_osd_req_callback;
	osd_req->r_priv = obj_request;

	osd_req->r_oid_len = strlen(obj_request->object_name);
	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

	osd_req->r_file_layout = rbd_dev->layout;	/* struct */

	/* osd_req will get its own reference to snapc (if non-null) */

	ceph_osdc_build_request(osd_req, offset, length, 1, op,
				snapc, snap_id, mtime);

	return osd_req;
}

static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
}

/* object_name is assumed to be a non-null pointer and NUL-terminated */

static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
						u64 offset, u64 length,
						enum obj_request_type type)
{
	struct rbd_obj_request *obj_request;
	size_t size;
	char *name;

	rbd_assert(obj_request_type_valid(type));

	size = strlen(object_name) + 1;
	obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
	if (!obj_request)
		return NULL;

	name = (char *)(obj_request + 1);
	obj_request->object_name = memcpy(name, object_name, size);
	obj_request->offset = offset;
	obj_request->length = length;
	obj_request->which = BAD_WHICH;
	obj_request->type = type;
	INIT_LIST_HEAD(&obj_request->links);
	atomic_set(&obj_request->done, 0);
	init_completion(&obj_request->completion);
	kref_init(&obj_request->kref);

	return obj_request;
}

static void rbd_obj_request_destroy(struct kref *kref)
{
	struct rbd_obj_request *obj_request;

	obj_request = container_of(kref, struct rbd_obj_request, kref);

	rbd_assert(obj_request->img_request == NULL);
	rbd_assert(obj_request->which == BAD_WHICH);

	if (obj_request->osd_req)
		rbd_osd_req_destroy(obj_request->osd_req);

	rbd_assert(obj_request_type_valid(obj_request->type));
	switch (obj_request->type) {
	case OBJ_REQUEST_NODATA:
		break;		/* Nothing to do */
	case OBJ_REQUEST_BIO:
		if (obj_request->bio_list)
			bio_chain_put(obj_request->bio_list);
		break;
	case OBJ_REQUEST_PAGES:
		if (obj_request->pages)
			ceph_release_page_vector(obj_request->pages,
						obj_request->page_count);
		break;
	}

	kfree(obj_request);
}

/*
 * Caller is responsible for filling in the list of object requests
 * that comprises the image request, and the Linux request pointer
 * (if there is one).
 */
struct rbd_img_request *rbd_img_request_create(struct rbd_device *rbd_dev,
					u64 offset, u64 length,
					bool write_request)
{
	struct rbd_img_request *img_request;
	struct ceph_snap_context *snapc = NULL;

	img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
	if (!img_request)
		return NULL;

	if (write_request) {
		down_read(&rbd_dev->header_rwsem);
		snapc = ceph_get_snap_context(rbd_dev->header.snapc);
		up_read(&rbd_dev->header_rwsem);
		if (WARN_ON(!snapc)) {
			kfree(img_request);
			return NULL;	/* Shouldn't happen */
		}
	}

	img_request->rq = NULL;
	img_request->rbd_dev = rbd_dev;
	img_request->offset = offset;
	img_request->length = length;
	img_request->write_request = write_request;
	if (write_request)
		img_request->snapc = snapc;
	else
		img_request->snap_id = rbd_dev->spec->snap_id;
	spin_lock_init(&img_request->completion_lock);
	img_request->next_completion = 0;
	img_request->callback = NULL;
	img_request->obj_request_count = 0;
	INIT_LIST_HEAD(&img_request->obj_requests);
	kref_init(&img_request->kref);

	rbd_img_request_get(img_request);	/* Avoid a warning */
	rbd_img_request_put(img_request);	/* TEMPORARY */

	return img_request;
}

static void rbd_img_request_destroy(struct kref *kref)
{
	struct rbd_img_request *img_request;
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;

	img_request = container_of(kref, struct rbd_img_request, kref);

	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_img_obj_request_del(img_request, obj_request);

	if (img_request->write_request)
		ceph_put_snap_context(img_request->snapc);

	kfree(img_request);
}

static int rbd_img_request_fill_bio(struct rbd_img_request *img_request,
					struct bio *bio_list)
{
	struct rbd_device *rbd_dev = img_request->rbd_dev;
	struct rbd_obj_request *obj_request = NULL;
	struct rbd_obj_request *next_obj_request;
	unsigned int bio_offset;
	u64 image_offset;
	u64 resid;
	u16 opcode;

	opcode = img_request->write_request ? CEPH_OSD_OP_WRITE
					      : CEPH_OSD_OP_READ;
	bio_offset = 0;
	image_offset = img_request->offset;
	rbd_assert(image_offset == bio_list->bi_sector << SECTOR_SHIFT);
	resid = img_request->length;
	while (resid) {
		const char *object_name;
		unsigned int clone_size;
		struct ceph_osd_req_op *op;
		u64 offset;
		u64 length;

		object_name = rbd_segment_name(rbd_dev, image_offset);
		if (!object_name)
			goto out_unwind;
		offset = rbd_segment_offset(rbd_dev, image_offset);
		length = rbd_segment_length(rbd_dev, image_offset, resid);
		obj_request = rbd_obj_request_create(object_name,
						offset, length,
						OBJ_REQUEST_BIO);
		kfree(object_name);	/* object request has its own copy */
		if (!obj_request)
			goto out_unwind;

		rbd_assert(length <= (u64) UINT_MAX);
		clone_size = (unsigned int) length;
		obj_request->bio_list = bio_chain_clone_range(&bio_list,
						&bio_offset, clone_size,
						GFP_ATOMIC);
		if (!obj_request->bio_list)
			goto out_partial;

		/*
		 * Build up the op to use in building the osd
		 * request.  Note that the contents of the op are
		 * copied by rbd_osd_req_create().
		 */
		op = rbd_osd_req_op_create(opcode, offset, length);
		if (!op)
			goto out_partial;
		obj_request->osd_req = rbd_osd_req_create(rbd_dev,
						img_request->write_request,
						obj_request, op);
		rbd_osd_req_op_destroy(op);
		if (!obj_request->osd_req)
			goto out_partial;
		/* status and version are initially zero-filled */

		rbd_img_obj_request_add(img_request, obj_request);

		image_offset += length;
		resid -= length;
	}

	return 0;

out_partial:
	rbd_obj_request_put(obj_request);
out_unwind:
	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_obj_request_put(obj_request);

	return -ENOMEM;
}

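/*
 * Completion callback for the object requests making up an image
 * request.  Object requests can complete in any order, but the block
 * layer must see their byte counts in order, so only the run of
 * requests starting at next_completion that have all finished is
 * reported via blk_end_request() here; a request that completes out
 * of order simply records itself as done and waits for a predecessor
 * to report it.
 */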
static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	u32 which = obj_request->which;
	bool more = true;

	img_request = obj_request->img_request;
	rbd_assert(img_request != NULL);
	rbd_assert(img_request->rq != NULL);
	rbd_assert(which != BAD_WHICH);
	rbd_assert(which < img_request->obj_request_count);
	rbd_assert(which >= img_request->next_completion);

	spin_lock_irq(&img_request->completion_lock);
	if (which != img_request->next_completion)
		goto out;

	for_each_obj_request_from(img_request, obj_request) {
		unsigned int xferred;
		int result;

		rbd_assert(more);
		rbd_assert(which < img_request->obj_request_count);

		if (!atomic_read(&obj_request->done))
			break;

		rbd_assert(obj_request->xferred <= (u64) UINT_MAX);
		xferred = (unsigned int) obj_request->xferred;
		result = (int) obj_request->result;
		if (result)
			rbd_warn(NULL, "obj_request %s result %d xferred %u\n",
				img_request->write_request ? "write" : "read",
				result, xferred);

		more = blk_end_request(img_request->rq, result, xferred);
		which++;
	}
	rbd_assert(more ^ (which == img_request->obj_request_count));
	img_request->next_completion = which;
out:
	spin_unlock_irq(&img_request->completion_lock);

	if (!more)
		rbd_img_request_complete(img_request);
}

static int rbd_img_request_submit(struct rbd_img_request *img_request)
{
	struct rbd_device *rbd_dev = img_request->rbd_dev;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_obj_request *obj_request;

	for_each_obj_request(img_request, obj_request) {
		int ret;

		obj_request->callback = rbd_img_obj_callback;
		ret = rbd_obj_request_submit(osdc, obj_request);
		if (ret)
			return ret;
		/*
		 * The image request has its own reference to each
		 * of its object requests, so we can safely drop the
		 * initial one here.
		 */
		rbd_obj_request_put(obj_request);
	}

	return 0;
}

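/*
 * Acknowledge a notification on the header object, using the new
 * object request machinery: build a NODATA object request carrying
 * a single NOTIFY_ACK op, submit it, and wait for it to complete.
 */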
static int rbd_obj_notify_ack_sync(struct rbd_device *rbd_dev,
				   u64 ver, u64 notify_id)
{
	struct rbd_obj_request *obj_request;
	struct ceph_osd_req_op *op;
	struct ceph_osd_client *osdc;
	int ret;

	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
							OBJ_REQUEST_NODATA);
	if (!obj_request)
		return -ENOMEM;

	ret = -ENOMEM;
	op = rbd_osd_req_op_create(CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver);
	if (!op)
		goto out;
	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
						obj_request, op);
	rbd_osd_req_op_destroy(op);
	if (!obj_request->osd_req)
		goto out;

	osdc = &rbd_dev->rbd_client->client->osdc;
	ret = rbd_obj_request_submit(osdc, obj_request);
	if (!ret)
		ret = rbd_obj_request_wait(obj_request);
out:
	rbd_obj_request_put(obj_request);

	return ret;
}

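/*
 * Callback invoked by the osd client when a watch notification
 * arrives on the header object: refresh the in-memory header, then
 * acknowledge the notification so the notifier can proceed.
 */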
static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
	struct rbd_device *rbd_dev = (struct rbd_device *)data;
	u64 hver;
	int rc;

	if (!rbd_dev)
		return;

	dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
		rbd_dev->header_name, (unsigned long long) notify_id,
		(unsigned int) opcode);
	rc = rbd_dev_refresh(rbd_dev, &hver);
	if (rc)
		rbd_warn(rbd_dev, "got notification but failed to "
			   "update snaps: %d\n", rc);

	(void) rbd_req_sync_notify_ack;	/* avoid a warning */
	rbd_obj_notify_ack_sync(rbd_dev, hver, notify_id);
}

/*
 * Request sync osd watch/unwatch.  The value of "start" determines
 * whether a watch request is being initiated or torn down.
 */
static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_obj_request *obj_request;
	struct ceph_osd_req_op *op;
	int ret;

	rbd_assert(start ^ !!rbd_dev->watch_event);
	rbd_assert(start ^ !!rbd_dev->watch_request);

	if (start) {
		ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0, rbd_dev,
						&rbd_dev->watch_event);
		if (ret < 0)
			return ret;
	}

	ret = -ENOMEM;
	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
							OBJ_REQUEST_NODATA);
	if (!obj_request)
		goto out_cancel;

	op = rbd_osd_req_op_create(CEPH_OSD_OP_WATCH,
				rbd_dev->watch_event->cookie,
				rbd_dev->header.obj_version, start);
	if (!op)
		goto out_cancel;
	obj_request->osd_req = rbd_osd_req_create(rbd_dev, true,
							obj_request, op);
	rbd_osd_req_op_destroy(op);
	if (!obj_request->osd_req)
		goto out_cancel;

	if (start) {
		rbd_dev->watch_request = obj_request->osd_req;
		ceph_osdc_set_request_linger(osdc, rbd_dev->watch_request);
	}
	ret = rbd_obj_request_submit(osdc, obj_request);
	if (ret)
		goto out_cancel;
	ret = rbd_obj_request_wait(obj_request);
	if (ret)
		goto out_cancel;

	ret = obj_request->result;
	if (ret)
		goto out_cancel;

	if (start)
		goto done;	/* Done if setting up the watch request */
out_cancel:
	/* Cancel the event if we're tearing down, or on error */
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
done:
	if (obj_request)
		rbd_obj_request_put(obj_request);

	return ret;
}
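
/*
 * In this scheme the watch on the header object is expected to be
 * established with rbd_dev_header_watch_sync(rbd_dev, 1) when an
 * image is mapped, and torn down with a 0 argument when the device
 * goes away; the assertions at the top enforce that pairing.
 */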
1939
1940 static void rbd_request_fn(struct request_queue *q)
1941 {
1942 struct rbd_device *rbd_dev = q->queuedata;
1943 bool read_only = rbd_dev->mapping.read_only;
1944 struct request *rq;
1945 int result;
1946
1947 while ((rq = blk_fetch_request(q))) {
1948 bool write_request = rq_data_dir(rq) == WRITE;
1949 struct rbd_img_request *img_request;
1950 u64 offset;
1951 u64 length;
1952
1953 /* Ignore any non-FS requests that filter through. */
1954
1955 if (rq->cmd_type != REQ_TYPE_FS) {
1956 __blk_end_request_all(rq, 0);
1957 continue;
1958 }
1959
1960 spin_unlock_irq(q->queue_lock);
1961
1962 /* Disallow writes to a read-only device */
1963
1964 if (write_request) {
1965 result = -EROFS;
1966 if (read_only)
1967 goto end_request;
1968 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
1969 }
1970
1971 /* Quit early if the snapshot has disappeared */
1972
1973 if (!atomic_read(&rbd_dev->exists)) {
1974 dout("request for non-existent snapshot");
1975 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
1976 result = -ENXIO;
1977 goto end_request;
1978 }
1979
1980 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
1981 length = (u64) blk_rq_bytes(rq);
1982
1983 result = -EINVAL;
1984 if (WARN_ON(offset && length > U64_MAX - offset + 1))
1985 goto end_request; /* Shouldn't happen */
1986
1987 result = -ENOMEM;
1988 img_request = rbd_img_request_create(rbd_dev, offset, length,
1989 write_request);
1990 if (!img_request)
1991 goto end_request;
1992
1993 img_request->rq = rq;
1994
1995 result = rbd_img_request_fill_bio(img_request, rq->bio);
1996 if (!result)
1997 result = rbd_img_request_submit(img_request);
1998 if (result)
1999 rbd_img_request_put(img_request);
2000 end_request:
2001 spin_lock_irq(q->queue_lock);
2002 if (result < 0) {
2003 rbd_warn(rbd_dev, "obj_request %s result %d\n",
2004 write_request ? "write" : "read", result);
2005 __blk_end_request_all(rq, result);
2006 }
2007 }
2008 }
2009
2010 /*
2011 * A queue callback. Makes sure that we don't create a bio that spans
2012 * multiple osd objects. One exception would be single-page bios,
2013 * which we handle later in bio_chain_clone_range().
2014 */
2015 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2016 struct bio_vec *bvec)
2017 {
2018 struct rbd_device *rbd_dev = q->queuedata;
2019 sector_t sector_offset;
2020 sector_t sectors_per_obj;
2021 sector_t obj_sector_offset;
2022 int ret;
2023
2024 /*
2025 * Convert the partition-relative bio start sector to an offset
2026 * relative to the enclosing device, then find how far into its
2027 * rbd object that offset falls.
2028 */
2029 sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2030 sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2031 obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2032
2033 /*
2034 * Compute the number of bytes from that offset to the end
2035 * of the object. Account for what's already used by the bio.
2036 */
2037 ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2038 if (ret > bmd->bi_size)
2039 ret -= bmd->bi_size;
2040 else
2041 ret = 0;
2042
2043 /*
2044 * Don't send back more than was asked for. And if the bio
2045 * was empty, let the whole thing through because: "Note
2046 * that a block device *must* allow a single page to be
2047 * added to an empty bio."
2048 */
2049 rbd_assert(bvec->bv_len <= PAGE_SIZE);
2050 if (ret > (int) bvec->bv_len || !bmd->bi_size)
2051 ret = (int) bvec->bv_len;
2052
2053 return ret;
2054 }
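
/*
 * Editor's sketch (illustrative, not part of the driver): the object-
 * boundary arithmetic rbd_merge_bvec() performs above, as standalone
 * userspace C under #if 0. The object order of 22 (4 MB objects) is
 * just an example value.
 */
#if 0
#include <stdio.h>

#define SECTOR_SHIFT	9

/* Bytes left in the current object, less what the bio already holds */
static int bytes_to_object_end(unsigned int obj_order,
			       unsigned long long dev_sector,
			       unsigned int bio_bytes)
{
	unsigned long long sectors_per_obj = 1ULL << (obj_order - SECTOR_SHIFT);
	unsigned long long obj_sector_off = dev_sector & (sectors_per_obj - 1);
	long long ret;

	ret = (long long) ((sectors_per_obj - obj_sector_off) << SECTOR_SHIFT);
	return ret > bio_bytes ? (int) (ret - bio_bytes) : 0;
}

int main(void)
{
	/* order 22 => 4 MB objects => 8192 sectors per object */
	printf("%d\n", bytes_to_object_end(22, 8191, 0));    /* 512: last sector */
	printf("%d\n", bytes_to_object_end(22, 0, 4096));    /* 4 MB - 4 KB */
	return 0;
}
#endif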
2055
2056 static void rbd_free_disk(struct rbd_device *rbd_dev)
2057 {
2058 struct gendisk *disk = rbd_dev->disk;
2059
2060 if (!disk)
2061 return;
2062
2063 if (disk->flags & GENHD_FL_UP)
2064 del_gendisk(disk);
2065 if (disk->queue)
2066 blk_cleanup_queue(disk->queue);
2067 put_disk(disk);
2068 }
2069
2070 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2071 const char *object_name,
2072 u64 offset, u64 length,
2073 char *buf, u64 *version)
2075 {
2076 struct ceph_osd_req_op *op;
2077 struct rbd_obj_request *obj_request;
2078 struct ceph_osd_client *osdc;
2079 struct page **pages = NULL;
2080 u32 page_count;
2081 int ret;
2082
2083 page_count = (u32) calc_pages_for(offset, length);
2084 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2085 if (IS_ERR(pages))
2086 return PTR_ERR(pages);
2087
2088 ret = -ENOMEM;
2089 obj_request = rbd_obj_request_create(object_name, offset, length,
2090 OBJ_REQUEST_PAGES);
2091 if (!obj_request)
2092 goto out;
2093
2094 obj_request->pages = pages;
2095 obj_request->page_count = page_count;
2096
2097 op = rbd_osd_req_op_create(CEPH_OSD_OP_READ, offset, length);
2098 if (!op)
2099 goto out;
2100 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2101 obj_request, op);
2102 rbd_osd_req_op_destroy(op);
2103 if (!obj_request->osd_req)
2104 goto out;
2105
2106 osdc = &rbd_dev->rbd_client->client->osdc;
2107 ret = rbd_obj_request_submit(osdc, obj_request);
2108 if (ret)
2109 goto out;
2110 ret = rbd_obj_request_wait(obj_request);
2111 if (ret)
2112 goto out;
2113
2114 ret = obj_request->result;
2115 if (ret < 0)
2116 goto out;
2117 ret = ceph_copy_from_page_vector(pages, buf, 0, obj_request->xferred);
2118 if (version)
2119 *version = obj_request->version;
2120 out:
2121 if (obj_request)
2122 rbd_obj_request_put(obj_request);
2123 else
2124 ceph_release_page_vector(pages, page_count);
2125
2126 return ret;
2127 }
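
/*
 * Editor's sketch (illustrative, not part of the driver): a userspace
 * rendering of the page-count arithmetic behind calc_pages_for(), used
 * above to size the page vector: round the end of the byte range up to
 * a page boundary, round the start down, and count the pages between.
 * The 4096-byte page size is fixed for the example.
 */
#if 0
#include <stdio.h>
#include <stdint.h>

#define PAGE_SHIFT	12
#define PAGE_SIZE	(1ULL << PAGE_SHIFT)

static uint64_t pages_for(uint64_t off, uint64_t len)
{
	return ((off + len + PAGE_SIZE - 1) >> PAGE_SHIFT) - (off >> PAGE_SHIFT);
}

int main(void)
{
	printf("%llu\n", (unsigned long long) pages_for(0, 4096));  /* 1 */
	printf("%llu\n", (unsigned long long) pages_for(4095, 2));  /* 2: straddles */
	return 0;
}
#endif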
2128
2129 /*
2130 * Read the complete header for the given rbd device.
2131 *
2132 * Returns a pointer to a dynamically-allocated buffer containing
2133 * the complete and validated header. Caller can pass the address
2134 * of a variable that will be filled in with the version of the
2135 * header object at the time it was read.
2136 *
2137 * Returns a pointer-coded errno if a failure occurs.
2138 */
2139 static struct rbd_image_header_ondisk *
2140 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
2141 {
2142 struct rbd_image_header_ondisk *ondisk = NULL;
2143 u32 snap_count = 0;
2144 u64 names_size = 0;
2145 u32 want_count;
2146 int ret;
2147
2148 /*
2149 * The complete header will include an array of its 64-bit
2150 * snapshot ids, followed by the names of those snapshots as
2151 * a contiguous block of NUL-terminated strings. Note that
2152 * the number of snapshots could change by the time we read
2153 * it in, in which case we re-read it.
2154 */
2155 do {
2156 size_t size;
2157
2158 kfree(ondisk);
2159
2160 size = sizeof (*ondisk);
2161 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
2162 size += names_size;
2163 ondisk = kmalloc(size, GFP_KERNEL);
2164 if (!ondisk)
2165 return ERR_PTR(-ENOMEM);
2166
2167 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
2168 0, size,
2169 (char *) ondisk, version);
2170
2171 if (ret < 0)
2172 goto out_err;
2173 if (WARN_ON((size_t) ret < size)) {
2174 rbd_warn(rbd_dev, "short header read (want %zu got %d)",
2175 size, ret);
2176 ret = -ENXIO;
2177 goto out_err;
2178 }
2179 if (!rbd_dev_ondisk_valid(ondisk)) {
2180 ret = -ENXIO;
2181 rbd_warn(rbd_dev, "invalid header");
2182 goto out_err;
2183 }
2184
2185 names_size = le64_to_cpu(ondisk->snap_names_len);
2186 want_count = snap_count;
2187 snap_count = le32_to_cpu(ondisk->snap_count);
2188 } while (snap_count != want_count);
2189
2190 return ondisk;
2191
2192 out_err:
2193 kfree(ondisk);
2194
2195 return ERR_PTR(ret);
2196 }
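
/*
 * Editor's sketch (illustrative, not part of the driver): the
 * read/check/retry pattern used by rbd_dev_v1_header_read(), reduced to
 * standalone userspace C. A first read sized for zero items discovers
 * the real count; the buffer is regrown and the read repeated until the
 * count stops moving. fetch_record() is a hypothetical stand-in for
 * rbd_obj_read_sync().
 */
#if 0
#include <stdio.h>
#include <stdlib.h>

static unsigned int live_count = 3;	/* grows once, mid-probe */

/* Fill buf[0] with the current count and buf[1..] with what fits */
static void fetch_record(unsigned int *buf, unsigned int room)
{
	unsigned int i;

	buf[0] = live_count;
	for (i = 0; i < live_count && i < room; i++)
		buf[i + 1] = i;
	live_count = 5;		/* a snapshot appears between reads */
}

int main(void)
{
	unsigned int *rec = NULL;
	unsigned int count = 0;
	unsigned int want;

	do {
		free(rec);
		rec = malloc((count + 1) * sizeof(*rec));
		if (!rec)
			return 1;
		fetch_record(rec, count);
		want = count;
		count = rec[0];	/* re-read if the count moved */
	} while (count != want);

	printf("stable at %u items\n", count);
	free(rec);
	return 0;
}
#endif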
2197
2198 /*
2199 * Reload the on-disk header
2200 */
2201 static int rbd_read_header(struct rbd_device *rbd_dev,
2202 struct rbd_image_header *header)
2203 {
2204 struct rbd_image_header_ondisk *ondisk;
2205 u64 ver = 0;
2206 int ret;
2207
2208 ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
2209 if (IS_ERR(ondisk))
2210 return PTR_ERR(ondisk);
2211 ret = rbd_header_from_disk(header, ondisk);
2212 if (ret >= 0)
2213 header->obj_version = ver;
2214 kfree(ondisk);
2215
2216 return ret;
2217 }
2218
2219 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
2220 {
2221 struct rbd_snap *snap;
2222 struct rbd_snap *next;
2223
2224 list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
2225 rbd_remove_snap_dev(snap);
2226 }
2227
2228 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
2229 {
2230 sector_t size;
2231
2232 if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
2233 return;
2234
2235 size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
2236 dout("setting size to %llu sectors", (unsigned long long) size);
2237 rbd_dev->mapping.size = (u64) size;
2238 set_capacity(rbd_dev->disk, size);
2239 }
2240
2241 /*
2242 * only read the first part of the ondisk header, without the snaps info
2243 */
2244 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
2245 {
2246 int ret;
2247 struct rbd_image_header h;
2248
2249 ret = rbd_read_header(rbd_dev, &h);
2250 if (ret < 0)
2251 return ret;
2252
2253 down_write(&rbd_dev->header_rwsem);
2254
2255 /* Update image size, and check for resize of mapped image */
2256 rbd_dev->header.image_size = h.image_size;
2257 rbd_update_mapping_size(rbd_dev);
2258
2259 /* rbd_dev->header.object_prefix shouldn't change */
2260 kfree(rbd_dev->header.snap_sizes);
2261 kfree(rbd_dev->header.snap_names);
2262 /* osd requests may still refer to snapc */
2263 ceph_put_snap_context(rbd_dev->header.snapc);
2264
2265 if (hver)
2266 *hver = h.obj_version;
2267 rbd_dev->header.obj_version = h.obj_version;
2269 rbd_dev->header.snapc = h.snapc;
2270 rbd_dev->header.snap_names = h.snap_names;
2271 rbd_dev->header.snap_sizes = h.snap_sizes;
2272 /* Free the extra copy of the object prefix */
2273 WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
2274 kfree(h.object_prefix);
2275
2276 ret = rbd_dev_snaps_update(rbd_dev);
2277 if (!ret)
2278 ret = rbd_dev_snaps_register(rbd_dev);
2279
2280 up_write(&rbd_dev->header_rwsem);
2281
2282 return ret;
2283 }
2284
2285 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
2286 {
2287 int ret;
2288
2289 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
2290 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2291 if (rbd_dev->image_format == 1)
2292 ret = rbd_dev_v1_refresh(rbd_dev, hver);
2293 else
2294 ret = rbd_dev_v2_refresh(rbd_dev, hver);
2295 mutex_unlock(&ctl_mutex);
2296
2297 return ret;
2298 }
2299
2300 static int rbd_init_disk(struct rbd_device *rbd_dev)
2301 {
2302 struct gendisk *disk;
2303 struct request_queue *q;
2304 u64 segment_size;
2305
2306 /* create gendisk info */
2307 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
2308 if (!disk)
2309 return -ENOMEM;
2310
2311 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
2312 rbd_dev->dev_id);
2313 disk->major = rbd_dev->major;
2314 disk->first_minor = 0;
2315 disk->fops = &rbd_bd_ops;
2316 disk->private_data = rbd_dev;
2317
2318 q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
2319 if (!q)
2320 goto out_disk;
2321
2322 /* We use the default size, but let's be explicit about it. */
2323 blk_queue_physical_block_size(q, SECTOR_SIZE);
2324
2325 /* set io sizes to object size */
2326 segment_size = rbd_obj_bytes(&rbd_dev->header);
2327 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
2328 blk_queue_max_segment_size(q, segment_size);
2329 blk_queue_io_min(q, segment_size);
2330 blk_queue_io_opt(q, segment_size);
2331
2332 blk_queue_merge_bvec(q, rbd_merge_bvec);
2333 disk->queue = q;
2334
2335 q->queuedata = rbd_dev;
2336
2337 rbd_dev->disk = disk;
2338
2339 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
2340
2341 return 0;
2342 out_disk:
2343 put_disk(disk);
2344
2345 return -ENOMEM;
2346 }
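
/*
 * Editor's sketch (illustrative, not part of the driver): the queue
 * limits rbd_init_disk() derives from the image's object order, printed
 * for the common default order of 22 (4 MB objects). Standalone
 * userspace C under #if 0.
 */
#if 0
#include <stdio.h>

#define SECTOR_SHIFT	9
#define SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

int main(void)
{
	unsigned int obj_order = 22;			/* example value */
	unsigned long long segment_size = 1ULL << obj_order;

	printf("max_hw_sectors:  %llu\n", segment_size / SECTOR_SIZE);
	printf("segment/io size: %llu bytes\n", segment_size);
	return 0;
}
#endif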
2347
2348 /*
2349 sysfs
2350 */
2351
2352 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
2353 {
2354 return container_of(dev, struct rbd_device, dev);
2355 }
2356
2357 static ssize_t rbd_size_show(struct device *dev,
2358 struct device_attribute *attr, char *buf)
2359 {
2360 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2361 sector_t size;
2362
2363 down_read(&rbd_dev->header_rwsem);
2364 size = get_capacity(rbd_dev->disk);
2365 up_read(&rbd_dev->header_rwsem);
2366
2367 return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
2368 }
2369
2370 /*
2371 * Note this shows the features for whatever's mapped, which is not
2372 * necessarily the base image.
2373 */
2374 static ssize_t rbd_features_show(struct device *dev,
2375 struct device_attribute *attr, char *buf)
2376 {
2377 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2378
2379 return sprintf(buf, "0x%016llx\n",
2380 (unsigned long long) rbd_dev->mapping.features);
2381 }
2382
2383 static ssize_t rbd_major_show(struct device *dev,
2384 struct device_attribute *attr, char *buf)
2385 {
2386 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2387
2388 return sprintf(buf, "%d\n", rbd_dev->major);
2389 }
2390
2391 static ssize_t rbd_client_id_show(struct device *dev,
2392 struct device_attribute *attr, char *buf)
2393 {
2394 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2395
2396 return sprintf(buf, "client%lld\n",
2397 ceph_client_id(rbd_dev->rbd_client->client));
2398 }
2399
2400 static ssize_t rbd_pool_show(struct device *dev,
2401 struct device_attribute *attr, char *buf)
2402 {
2403 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2404
2405 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
2406 }
2407
2408 static ssize_t rbd_pool_id_show(struct device *dev,
2409 struct device_attribute *attr, char *buf)
2410 {
2411 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2412
2413 return sprintf(buf, "%llu\n",
2414 (unsigned long long) rbd_dev->spec->pool_id);
2415 }
2416
2417 static ssize_t rbd_name_show(struct device *dev,
2418 struct device_attribute *attr, char *buf)
2419 {
2420 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2421
2422 if (rbd_dev->spec->image_name)
2423 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
2424
2425 return sprintf(buf, "(unknown)\n");
2426 }
2427
2428 static ssize_t rbd_image_id_show(struct device *dev,
2429 struct device_attribute *attr, char *buf)
2430 {
2431 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2432
2433 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
2434 }
2435
2436 /*
2437 * Shows the name of the currently-mapped snapshot (or
2438 * RBD_SNAP_HEAD_NAME for the base image).
2439 */
2440 static ssize_t rbd_snap_show(struct device *dev,
2441 struct device_attribute *attr,
2442 char *buf)
2443 {
2444 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2445
2446 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
2447 }
2448
2449 /*
2450 * For an rbd v2 image, shows the id and name of the pool, image,
2451 * and snapshot for the parent image, plus the parent overlap. If
2452 * there is no parent, simply shows "(no parent image)".
2453 */
2454 static ssize_t rbd_parent_show(struct device *dev,
2455 struct device_attribute *attr,
2456 char *buf)
2457 {
2458 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2459 struct rbd_spec *spec = rbd_dev->parent_spec;
2460 int count;
2461 char *bufp = buf;
2462
2463 if (!spec)
2464 return sprintf(buf, "(no parent image)\n");
2465
2466 count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
2467 (unsigned long long) spec->pool_id, spec->pool_name);
2468 if (count < 0)
2469 return count;
2470 bufp += count;
2471
2472 count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
2473 spec->image_name ? spec->image_name : "(unknown)");
2474 if (count < 0)
2475 return count;
2476 bufp += count;
2477
2478 count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
2479 (unsigned long long) spec->snap_id, spec->snap_name);
2480 if (count < 0)
2481 return count;
2482 bufp += count;
2483
2484 count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
2485 if (count < 0)
2486 return count;
2487 bufp += count;
2488
2489 return (ssize_t) (bufp - buf);
2490 }
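
/*
 * Editor's sketch (illustrative, not part of the driver): the
 * cursor-advancing sprintf pattern rbd_parent_show() uses to build a
 * multi-line attribute in a single buffer. All values are made up; the
 * driver additionally checks each return for errors.
 */
#if 0
#include <stdio.h>

int main(void)
{
	char buf[128];
	char *bufp = buf;

	/* Each sprintf returns the count written; advance the cursor */
	bufp += sprintf(bufp, "pool_id %llu\n", 2ULL);
	bufp += sprintf(bufp, "image_id %s\n", "abc123");
	bufp += sprintf(bufp, "overlap %llu\n", 4194304ULL);
	printf("%ld bytes:\n%s", (long) (bufp - buf), buf);
	return 0;
}
#endif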
2491
2492 static ssize_t rbd_image_refresh(struct device *dev,
2493 struct device_attribute *attr,
2494 const char *buf,
2495 size_t size)
2496 {
2497 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2498 int ret;
2499
2500 ret = rbd_dev_refresh(rbd_dev, NULL);
2501
2502 return ret < 0 ? ret : size;
2503 }
2504
2505 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
2506 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
2507 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
2508 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
2509 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
2510 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
2511 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
2512 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
2513 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
2514 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
2515 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
2516
2517 static struct attribute *rbd_attrs[] = {
2518 &dev_attr_size.attr,
2519 &dev_attr_features.attr,
2520 &dev_attr_major.attr,
2521 &dev_attr_client_id.attr,
2522 &dev_attr_pool.attr,
2523 &dev_attr_pool_id.attr,
2524 &dev_attr_name.attr,
2525 &dev_attr_image_id.attr,
2526 &dev_attr_current_snap.attr,
2527 &dev_attr_parent.attr,
2528 &dev_attr_refresh.attr,
2529 NULL
2530 };
2531
2532 static struct attribute_group rbd_attr_group = {
2533 .attrs = rbd_attrs,
2534 };
2535
2536 static const struct attribute_group *rbd_attr_groups[] = {
2537 &rbd_attr_group,
2538 NULL
2539 };
2540
2541 static void rbd_sysfs_dev_release(struct device *dev)
2542 {
2543 }
2544
2545 static struct device_type rbd_device_type = {
2546 .name = "rbd",
2547 .groups = rbd_attr_groups,
2548 .release = rbd_sysfs_dev_release,
2549 };
2550
2551
2552 /*
2553 sysfs - snapshots
2554 */
2555
2556 static ssize_t rbd_snap_size_show(struct device *dev,
2557 struct device_attribute *attr,
2558 char *buf)
2559 {
2560 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2561
2562 return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
2563 }
2564
2565 static ssize_t rbd_snap_id_show(struct device *dev,
2566 struct device_attribute *attr,
2567 char *buf)
2568 {
2569 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2570
2571 return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2572 }
2573
2574 static ssize_t rbd_snap_features_show(struct device *dev,
2575 struct device_attribute *attr,
2576 char *buf)
2577 {
2578 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2579
2580 return sprintf(buf, "0x%016llx\n",
2581 (unsigned long long) snap->features);
2582 }
2583
2584 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
2585 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
2586 static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);
2587
2588 static struct attribute *rbd_snap_attrs[] = {
2589 &dev_attr_snap_size.attr,
2590 &dev_attr_snap_id.attr,
2591 &dev_attr_snap_features.attr,
2592 NULL,
2593 };
2594
2595 static struct attribute_group rbd_snap_attr_group = {
2596 .attrs = rbd_snap_attrs,
2597 };
2598
2599 static void rbd_snap_dev_release(struct device *dev)
2600 {
2601 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2602 kfree(snap->name);
2603 kfree(snap);
2604 }
2605
2606 static const struct attribute_group *rbd_snap_attr_groups[] = {
2607 &rbd_snap_attr_group,
2608 NULL
2609 };
2610
2611 static struct device_type rbd_snap_device_type = {
2612 .groups = rbd_snap_attr_groups,
2613 .release = rbd_snap_dev_release,
2614 };
2615
2616 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
2617 {
2618 kref_get(&spec->kref);
2619
2620 return spec;
2621 }
2622
2623 static void rbd_spec_free(struct kref *kref);
2624 static void rbd_spec_put(struct rbd_spec *spec)
2625 {
2626 if (spec)
2627 kref_put(&spec->kref, rbd_spec_free);
2628 }
2629
2630 static struct rbd_spec *rbd_spec_alloc(void)
2631 {
2632 struct rbd_spec *spec;
2633
2634 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
2635 if (!spec)
2636 return NULL;
2637 kref_init(&spec->kref);
2638
2639 rbd_spec_put(rbd_spec_get(spec)); /* TEMPORARY */
2640
2641 return spec;
2642 }
2643
2644 static void rbd_spec_free(struct kref *kref)
2645 {
2646 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
2647
2648 kfree(spec->pool_name);
2649 kfree(spec->image_id);
2650 kfree(spec->image_name);
2651 kfree(spec->snap_name);
2652 kfree(spec);
2653 }
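
/*
 * Editor's sketch (illustrative, not part of the driver): the kref
 * get/put lifetime pattern rbd_spec follows, reduced to userspace C11
 * atomics. The holder that drops the last reference runs the release
 * work, just as the final rbd_spec_put() ends up in rbd_spec_free().
 */
#if 0
#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>

struct spec {
	atomic_int refs;
	char *name;
};

static struct spec *spec_get(struct spec *s)
{
	atomic_fetch_add(&s->refs, 1);
	return s;
}

static void spec_put(struct spec *s)
{
	if (s && atomic_fetch_sub(&s->refs, 1) == 1) {	/* last reference */
		free(s->name);
		free(s);
		puts("released");
	}
}

int main(void)
{
	struct spec *s = calloc(1, sizeof(*s));

	if (!s)
		return 1;
	atomic_init(&s->refs, 1);
	spec_get(s);	/* a second holder appears */
	spec_put(s);	/* first holder drops; spec survives */
	spec_put(s);	/* last put frees */
	return 0;
}
#endif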
2654
2655 struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
2656 struct rbd_spec *spec)
2657 {
2658 struct rbd_device *rbd_dev;
2659
2660 rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
2661 if (!rbd_dev)
2662 return NULL;
2663
2664 spin_lock_init(&rbd_dev->lock);
2665 atomic_set(&rbd_dev->exists, 0);
2666 INIT_LIST_HEAD(&rbd_dev->node);
2667 INIT_LIST_HEAD(&rbd_dev->snaps);
2668 init_rwsem(&rbd_dev->header_rwsem);
2669
2670 rbd_dev->spec = spec;
2671 rbd_dev->rbd_client = rbdc;
2672
2673 /* Initialize the layout used for all rbd requests */
2674
2675 rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
2676 rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
2677 rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
2678 rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
2679
2680 return rbd_dev;
2681 }
2682
2683 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
2684 {
2685 rbd_spec_put(rbd_dev->parent_spec);
2686 kfree(rbd_dev->header_name);
2687 rbd_put_client(rbd_dev->rbd_client);
2688 rbd_spec_put(rbd_dev->spec);
2689 kfree(rbd_dev);
2690 }
2691
2692 static bool rbd_snap_registered(struct rbd_snap *snap)
2693 {
2694 bool ret = snap->dev.type == &rbd_snap_device_type;
2695 bool reg = device_is_registered(&snap->dev);
2696
2697 rbd_assert(!ret ^ reg);
2698
2699 return ret;
2700 }
2701
2702 static void rbd_remove_snap_dev(struct rbd_snap *snap)
2703 {
2704 list_del(&snap->node);
2705 if (device_is_registered(&snap->dev))
2706 device_unregister(&snap->dev);
2707 }
2708
2709 static int rbd_register_snap_dev(struct rbd_snap *snap,
2710 struct device *parent)
2711 {
2712 struct device *dev = &snap->dev;
2713 int ret;
2714
2715 dev->type = &rbd_snap_device_type;
2716 dev->parent = parent;
2717 dev->release = rbd_snap_dev_release;
2718 dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
2719 dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2720
2721 ret = device_register(dev);
2722
2723 return ret;
2724 }
2725
2726 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2727 const char *snap_name,
2728 u64 snap_id, u64 snap_size,
2729 u64 snap_features)
2730 {
2731 struct rbd_snap *snap;
2732 int ret;
2733
2734 snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2735 if (!snap)
2736 return ERR_PTR(-ENOMEM);
2737
2738 ret = -ENOMEM;
2739 snap->name = kstrdup(snap_name, GFP_KERNEL);
2740 if (!snap->name)
2741 goto err;
2742
2743 snap->id = snap_id;
2744 snap->size = snap_size;
2745 snap->features = snap_features;
2746
2747 return snap;
2748
2749 err:
2750 kfree(snap->name);
2751 kfree(snap);
2752
2753 return ERR_PTR(ret);
2754 }
2755
2756 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
2757 u64 *snap_size, u64 *snap_features)
2758 {
2759 char *snap_name;
2760
2761 rbd_assert(which < rbd_dev->header.snapc->num_snaps);
2762
2763 *snap_size = rbd_dev->header.snap_sizes[which];
2764 *snap_features = 0; /* No features for v1 */
2765
2766 /* Skip over names until we find the one we are looking for */
2767
2768 snap_name = rbd_dev->header.snap_names;
2769 while (which--)
2770 snap_name += strlen(snap_name) + 1;
2771
2772 return snap_name;
2773 }
2774
2775 /*
2776 * Get the size and object order for an image snapshot, or if
2777 * snap_id is CEPH_NOSNAP, gets this information for the base
2778 * image.
2779 */
2780 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
2781 u8 *order, u64 *snap_size)
2782 {
2783 __le64 snapid = cpu_to_le64(snap_id);
2784 int ret;
2785 struct {
2786 u8 order;
2787 __le64 size;
2788 } __attribute__ ((packed)) size_buf = { 0 };
2789
2790 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2791 "rbd", "get_size",
2792 (char *) &snapid, sizeof (snapid),
2793 (char *) &size_buf, sizeof (size_buf), NULL);
2794 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2795 if (ret < 0)
2796 return ret;
2797
2798 *order = size_buf.order;
2799 *snap_size = le64_to_cpu(size_buf.size);
2800
2801 dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n",
2802 (unsigned long long) snap_id, (unsigned int) *order,
2803 (unsigned long long) *snap_size);
2804
2805 return 0;
2806 }
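
/*
 * Editor's sketch (illustrative, not part of the driver): decoding the
 * packed little-endian get_size reply shown above (a u8 order followed
 * by a __le64 size) from a raw byte buffer, in portable userspace C.
 * The sample bytes encode order 22 and a 1 MB size.
 */
#if 0
#include <stdio.h>
#include <stdint.h>

static uint64_t get_le64(const unsigned char *p)
{
	uint64_t v = 0;
	int i;

	for (i = 7; i >= 0; i--)
		v = (v << 8) | p[i];
	return v;
}

int main(void)
{
	unsigned char reply[9] = { 22, 0x00, 0x00, 0x10, 0, 0, 0, 0, 0 };

	printf("order %u size %llu\n", reply[0],
	       (unsigned long long) get_le64(reply + 1));
	return 0;
}
#endif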
2807
2808 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
2809 {
2810 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
2811 &rbd_dev->header.obj_order,
2812 &rbd_dev->header.image_size);
2813 }
2814
2815 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
2816 {
2817 void *reply_buf;
2818 int ret;
2819 void *p;
2820
2821 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
2822 if (!reply_buf)
2823 return -ENOMEM;
2824
2825 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2826 "rbd", "get_object_prefix",
2827 NULL, 0,
2828 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
2829 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2830 if (ret < 0)
2831 goto out;
2832 ret = 0; /* rbd_req_sync_exec() can return positive */
2833
2834 p = reply_buf;
2835 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
2836 p + RBD_OBJ_PREFIX_LEN_MAX,
2837 NULL, GFP_NOIO);
2838
2839 if (IS_ERR(rbd_dev->header.object_prefix)) {
2840 ret = PTR_ERR(rbd_dev->header.object_prefix);
2841 rbd_dev->header.object_prefix = NULL;
2842 } else {
2843 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
2844 }
2845
2846 out:
2847 kfree(reply_buf);
2848
2849 return ret;
2850 }
2851
2852 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
2853 u64 *snap_features)
2854 {
2855 __le64 snapid = cpu_to_le64(snap_id);
2856 struct {
2857 __le64 features;
2858 __le64 incompat;
2859 } features_buf = { 0 };
2860 u64 incompat;
2861 int ret;
2862
2863 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2864 "rbd", "get_features",
2865 (char *) &snapid, sizeof (snapid),
2866 (char *) &features_buf, sizeof (features_buf),
2867 NULL);
2868 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2869 if (ret < 0)
2870 return ret;
2871
2872 incompat = le64_to_cpu(features_buf.incompat);
2873 if (incompat & ~RBD_FEATURES_ALL)
2874 return -ENXIO;
2875
2876 *snap_features = le64_to_cpu(features_buf.features);
2877
2878 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
2879 (unsigned long long) snap_id,
2880 (unsigned long long) *snap_features,
2881 (unsigned long long) le64_to_cpu(features_buf.incompat));
2882
2883 return 0;
2884 }
2885
2886 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
2887 {
2888 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
2889 &rbd_dev->header.features);
2890 }
2891
2892 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
2893 {
2894 struct rbd_spec *parent_spec;
2895 size_t size;
2896 void *reply_buf = NULL;
2897 __le64 snapid;
2898 void *p;
2899 void *end;
2900 char *image_id;
2901 u64 overlap;
2902 int ret;
2903
2904 parent_spec = rbd_spec_alloc();
2905 if (!parent_spec)
2906 return -ENOMEM;
2907
2908 size = sizeof (__le64) + /* pool_id */
2909 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
2910 sizeof (__le64) + /* snap_id */
2911 sizeof (__le64); /* overlap */
2912 reply_buf = kmalloc(size, GFP_KERNEL);
2913 if (!reply_buf) {
2914 ret = -ENOMEM;
2915 goto out_err;
2916 }
2917
2918 snapid = cpu_to_le64(CEPH_NOSNAP);
2919 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2920 "rbd", "get_parent",
2921 (char *) &snapid, sizeof (snapid),
2922 (char *) reply_buf, size, NULL);
2923 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2924 if (ret < 0)
2925 goto out_err;
2926
2927 ret = -ERANGE;
2928 p = reply_buf;
2929 end = (char *) reply_buf + size;
2930 ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
2931 if (parent_spec->pool_id == CEPH_NOPOOL)
2932 goto out; /* No parent? No problem. */
2933
2934 /* The ceph file layout needs to fit pool id in 32 bits */
2935
2936 ret = -EIO;
2937 if (WARN_ON(parent_spec->pool_id > (u64) U32_MAX))
2938 goto out_err;
2939
2940 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
2941 if (IS_ERR(image_id)) {
2942 ret = PTR_ERR(image_id);
2943 goto out_err;
2944 }
2945 parent_spec->image_id = image_id;
2946 ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
2947 ceph_decode_64_safe(&p, end, overlap, out_err);
2948
2949 rbd_dev->parent_overlap = overlap;
2950 rbd_dev->parent_spec = parent_spec;
2951 parent_spec = NULL; /* rbd_dev now owns this */
2952 out:
2953 ret = 0;
2954 out_err:
2955 kfree(reply_buf);
2956 rbd_spec_put(parent_spec);
2957
2958 return ret;
2959 }
2960
2961 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
2962 {
2963 size_t image_id_size;
2964 char *image_id;
2965 void *p;
2966 void *end;
2967 size_t size;
2968 void *reply_buf = NULL;
2969 size_t len = 0;
2970 char *image_name = NULL;
2971 int ret;
2972
2973 rbd_assert(!rbd_dev->spec->image_name);
2974
2975 len = strlen(rbd_dev->spec->image_id);
2976 image_id_size = sizeof (__le32) + len;
2977 image_id = kmalloc(image_id_size, GFP_KERNEL);
2978 if (!image_id)
2979 return NULL;
2980
2981 p = image_id;
2982 end = (char *) image_id + image_id_size;
2983 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32) len);
2984
2985 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
2986 reply_buf = kmalloc(size, GFP_KERNEL);
2987 if (!reply_buf)
2988 goto out;
2989
2990 ret = rbd_req_sync_exec(rbd_dev, RBD_DIRECTORY,
2991 "rbd", "dir_get_name",
2992 image_id, image_id_size,
2993 (char *) reply_buf, size, NULL);
2994 if (ret < 0)
2995 goto out;
2996 p = reply_buf;
2997 end = (char *) reply_buf + size;
2998 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
2999 if (IS_ERR(image_name))
3000 image_name = NULL;
3001 else
3002 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3003 out:
3004 kfree(reply_buf);
3005 kfree(image_id);
3006
3007 return image_name;
3008 }
3009
3010 /*
3011 * When a parent image gets probed, we only have the pool, image,
3012 * and snapshot ids but not the names of any of them. This call
3013 * is made later to fill in those names. It has to be done after
3014 * rbd_dev_snaps_update() has completed because some of the
3015 * information (in particular, snapshot name) is not available
3016 * until then.
3017 */
3018 static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
3019 {
3020 struct ceph_osd_client *osdc;
3021 const char *name;
3023 int ret;
3024
3025 if (rbd_dev->spec->pool_name)
3026 return 0; /* Already have the names */
3027
3028 /* Look up the pool name */
3029
3030 osdc = &rbd_dev->rbd_client->client->osdc;
3031 name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
3032 if (!name) {
3033 rbd_warn(rbd_dev, "there is no pool with id %llu",
3034 rbd_dev->spec->pool_id); /* Really a BUG() */
3035 return -EIO;
3036 }
3037
3038 rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
3039 if (!rbd_dev->spec->pool_name)
3040 return -ENOMEM;
3041
3042 /* Fetch the image name; tolerate failure here */
3043
3044 name = rbd_dev_image_name(rbd_dev);
3045 if (name)
3046 rbd_dev->spec->image_name = (char *) name;
3047 else
3048 rbd_warn(rbd_dev, "unable to get image name");
3049
3050 /* Look up the snapshot name. */
3051
3052 name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
3053 if (!name) {
3054 rbd_warn(rbd_dev, "no snapshot with id %llu",
3055 rbd_dev->spec->snap_id); /* Really a BUG() */
3056 ret = -EIO;
3057 goto out_err;
3058 }
ret = -ENOMEM;
3059 rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
3060 if (!rbd_dev->spec->snap_name)
3061 goto out_err;
3062
3063 return 0;
3064 out_err:
3066 kfree(rbd_dev->spec->pool_name);
3067 rbd_dev->spec->pool_name = NULL;
3068
3069 return ret;
3070 }
3071
3072 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
3073 {
3074 size_t size;
3075 int ret;
3076 void *reply_buf;
3077 void *p;
3078 void *end;
3079 u64 seq;
3080 u32 snap_count;
3081 struct ceph_snap_context *snapc;
3082 u32 i;
3083
3084 /*
3085 * We'll need room for the seq value (maximum snapshot id),
3086 * snapshot count, and array of that many snapshot ids.
3087 * For now we have a fixed upper limit on the number we're
3088 * prepared to receive.
3089 */
3090 size = sizeof (__le64) + sizeof (__le32) +
3091 RBD_MAX_SNAP_COUNT * sizeof (__le64);
3092 reply_buf = kzalloc(size, GFP_KERNEL);
3093 if (!reply_buf)
3094 return -ENOMEM;
3095
3096 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
3097 "rbd", "get_snapcontext",
3098 NULL, 0,
3099 reply_buf, size, ver);
3100 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
3101 if (ret < 0)
3102 goto out;
3103
3104 ret = -ERANGE;
3105 p = reply_buf;
3106 end = (char *) reply_buf + size;
3107 ceph_decode_64_safe(&p, end, seq, out);
3108 ceph_decode_32_safe(&p, end, snap_count, out);
3109
3110 /*
3111 * Make sure the reported number of snapshot ids wouldn't go
3112 * beyond the end of our buffer. But before checking that,
3113 * make sure the computed size of the snapshot context we
3114 * allocate is representable in a size_t.
3115 */
3116 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3117 / sizeof (u64)) {
3118 ret = -EINVAL;
3119 goto out;
3120 }
3121 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3122 goto out;
3123
3124 size = sizeof (struct ceph_snap_context) +
3125 snap_count * sizeof (snapc->snaps[0]);
3126 snapc = kmalloc(size, GFP_KERNEL);
3127 if (!snapc) {
3128 ret = -ENOMEM;
3129 goto out;
3130 }
3131
3132 atomic_set(&snapc->nref, 1);
3133 snapc->seq = seq;
3134 snapc->num_snaps = snap_count;
3135 for (i = 0; i < snap_count; i++)
3136 snapc->snaps[i] = ceph_decode_64(&p);
3137
3138 rbd_dev->header.snapc = snapc;
ret = 0;
3139
3140 dout(" snap context seq = %llu, snap_count = %u\n",
3141 (unsigned long long) seq, (unsigned int) snap_count);
3142
3143 out:
3144 kfree(reply_buf);
3145
3146 return ret;
3147 }
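
/*
 * Editor's sketch (illustrative, not part of the driver): the
 * bounds-checked decode discipline the ceph_decode_*_safe() macros give
 * the function above, written out by hand for a userspace buffer holding
 * a u32 count followed by that many u64 ids (host-endian for brevity).
 */
#if 0
#include <stdio.h>
#include <stdint.h>
#include <stddef.h>
#include <string.h>

static int decode_ids(const uint8_t *p, const uint8_t *end)
{
	uint32_t count;
	uint32_t i;

	if (end - p < (ptrdiff_t) sizeof(count))
		return -1;			/* no room for the count */
	memcpy(&count, p, sizeof(count));
	p += sizeof(count);

	if ((uint64_t) (end - p) < (uint64_t) count * sizeof(uint64_t))
		return -1;			/* count overruns the buffer */

	for (i = 0; i < count; i++) {
		uint64_t id;

		memcpy(&id, p, sizeof(id));
		p += sizeof(id);
		printf("id %llu\n", (unsigned long long) id);
	}
	return 0;
}

int main(void)
{
	uint8_t buf[4 + 2 * 8] = { 0 };
	uint32_t count = 2;

	memcpy(buf, &count, sizeof(count));	/* two ids, both zero */
	return decode_ids(buf, buf + sizeof(buf));
}
#endif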
3148
3149 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
3150 {
3151 size_t size;
3152 void *reply_buf;
3153 __le64 snap_id;
3154 int ret;
3155 void *p;
3156 void *end;
3157 char *snap_name;
3158
3159 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3160 reply_buf = kmalloc(size, GFP_KERNEL);
3161 if (!reply_buf)
3162 return ERR_PTR(-ENOMEM);
3163
3164 snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
3165 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
3166 "rbd", "get_snapshot_name",
3167 (char *) &snap_id, sizeof (snap_id),
3168 reply_buf, size, NULL);
3169 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
3170 if (ret < 0)
3171 goto out;
3172
3173 p = reply_buf;
3174 end = (char *) reply_buf + size;
3175 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3176 if (IS_ERR(snap_name)) {
3177 ret = PTR_ERR(snap_name);
3178 goto out;
3179 } else {
3180 dout(" snap_id 0x%016llx snap_name = %s\n",
3181 (unsigned long long) le64_to_cpu(snap_id), snap_name);
3182 }
3183 kfree(reply_buf);
3184
3185 return snap_name;
3186 out:
3187 kfree(reply_buf);
3188
3189 return ERR_PTR(ret);
3190 }
3191
3192 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
3193 u64 *snap_size, u64 *snap_features)
3194 {
3195 u64 snap_id;
3196 u8 order;
3197 int ret;
3198
3199 snap_id = rbd_dev->header.snapc->snaps[which];
3200 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
3201 if (ret)
3202 return ERR_PTR(ret);
3203 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
3204 if (ret)
3205 return ERR_PTR(ret);
3206
3207 return rbd_dev_v2_snap_name(rbd_dev, which);
3208 }
3209
3210 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
3211 u64 *snap_size, u64 *snap_features)
3212 {
3213 if (rbd_dev->image_format == 1)
3214 return rbd_dev_v1_snap_info(rbd_dev, which,
3215 snap_size, snap_features);
3216 if (rbd_dev->image_format == 2)
3217 return rbd_dev_v2_snap_info(rbd_dev, which,
3218 snap_size, snap_features);
3219 return ERR_PTR(-EINVAL);
3220 }
3221
3222 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
3223 {
3224 int ret;
3225 __u8 obj_order;
3226
3227 down_write(&rbd_dev->header_rwsem);
3228
3229 /* Grab old order first, to see if it changes */
3230
3231 obj_order = rbd_dev->header.obj_order;
3232 ret = rbd_dev_v2_image_size(rbd_dev);
3233 if (ret)
3234 goto out;
3235 if (rbd_dev->header.obj_order != obj_order) {
3236 ret = -EIO;
3237 goto out;
3238 }
3239 rbd_update_mapping_size(rbd_dev);
3240
3241 ret = rbd_dev_v2_snap_context(rbd_dev, hver);
3242 dout("rbd_dev_v2_snap_context returned %d\n", ret);
3243 if (ret)
3244 goto out;
3245 ret = rbd_dev_snaps_update(rbd_dev);
3246 dout("rbd_dev_snaps_update returned %d\n", ret);
3247 if (ret)
3248 goto out;
3249 ret = rbd_dev_snaps_register(rbd_dev);
3250 dout("rbd_dev_snaps_register returned %d\n", ret);
3251 out:
3252 up_write(&rbd_dev->header_rwsem);
3253
3254 return ret;
3255 }
3256
3257 /*
3258 * Scan the rbd device's current snapshot list and compare it to the
3259 * newly-received snapshot context. Remove any existing snapshots
3260 * not present in the new snapshot context. Add a new snapshot for
3261 * any snapshots in the snapshot context not in the current list.
3262 * And verify there are no changes to snapshots we already know
3263 * about.
3264 *
3265 * Assumes the snapshots in the snapshot context are sorted by
3266 * snapshot id, highest id first. (Snapshots in the rbd_dev's list
3267 * are also maintained in that order.)
3268 */
3269 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
3270 {
3271 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3272 const u32 snap_count = snapc->num_snaps;
3273 struct list_head *head = &rbd_dev->snaps;
3274 struct list_head *links = head->next;
3275 u32 index = 0;
3276
3277 dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
3278 while (index < snap_count || links != head) {
3279 u64 snap_id;
3280 struct rbd_snap *snap;
3281 char *snap_name;
3282 u64 snap_size = 0;
3283 u64 snap_features = 0;
3284
3285 snap_id = index < snap_count ? snapc->snaps[index]
3286 : CEPH_NOSNAP;
3287 snap = links != head ? list_entry(links, struct rbd_snap, node)
3288 : NULL;
3289 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
3290
3291 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
3292 struct list_head *next = links->next;
3293
3294 /* Existing snapshot not in the new snap context */
3295
3296 if (rbd_dev->spec->snap_id == snap->id)
3297 atomic_set(&rbd_dev->exists, 0);
3298 rbd_remove_snap_dev(snap);
3299 dout("%ssnap id %llu has been removed\n",
3300 rbd_dev->spec->snap_id == snap->id ?
3301 "mapped " : "",
3302 (unsigned long long) snap->id);
3303
3304 /* Done with this list entry; advance */
3305
3306 links = next;
3307 continue;
3308 }
3309
3310 snap_name = rbd_dev_snap_info(rbd_dev, index,
3311 &snap_size, &snap_features);
3312 if (IS_ERR(snap_name))
3313 return PTR_ERR(snap_name);
3314
3315 dout("entry %u: snap_id = %llu\n", (unsigned int) index,
3316 (unsigned long long) snap_id);
3317 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
3318 struct rbd_snap *new_snap;
3319
3320 /* We haven't seen this snapshot before */
3321
3322 new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
3323 snap_id, snap_size, snap_features);
3324 if (IS_ERR(new_snap)) {
3325 int err = PTR_ERR(new_snap);
3326
3327 dout(" failed to add dev, error %d\n", err);
3328
3329 return err;
3330 }
3331
3332 /* New goes before existing, or at end of list */
3333
3334 dout(" added dev%s\n", snap ? "" : " at end");
3335 if (snap)
3336 list_add_tail(&new_snap->node, &snap->node);
3337 else
3338 list_add_tail(&new_snap->node, head);
3339 } else {
3340 /* Already have this one */
3341
3342 dout(" already present\n");
3343
3344 rbd_assert(snap->size == snap_size);
3345 rbd_assert(!strcmp(snap->name, snap_name));
3346 rbd_assert(snap->features == snap_features);
3347
3348 /* Done with this list entry; advance */
3349
3350 links = links->next;
3351 }
3352
3353 /* Advance to the next entry in the snapshot context */
3354
3355 index++;
3356 }
3357 dout("%s: done\n", __func__);
3358
3359 return 0;
3360 }
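
/*
 * Editor's sketch (illustrative, not part of the driver): the
 * reconciliation loop above is a single-pass merge of two sequences
 * sorted the same way. Two sorted int arrays stand in for the new
 * snapshot context and the device's current snapshot list.
 */
#if 0
#include <stdio.h>

int main(void)
{
	int ctx[] = { 9, 7, 4 };	/* newly received, highest id first */
	int cur[] = { 9, 5, 4 };	/* currently known, highest id first */
	unsigned int i = 0;
	unsigned int j = 0;

	while (i < 3 || j < 3) {
		if (j < 3 && (i >= 3 || cur[j] > ctx[i])) {
			printf("remove %d\n", cur[j++]);  /* gone from context */
		} else if (i < 3 && (j >= 3 || ctx[i] > cur[j])) {
			printf("add %d\n", ctx[i++]);     /* not seen before */
		} else {
			printf("keep %d\n", ctx[i]);      /* present in both */
			i++;
			j++;
		}
	}
	return 0;
}
#endif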
3361
3362 /*
3363 * Scan the list of snapshots and register the devices for any that
3364 * have not already been registered.
3365 */
3366 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
3367 {
3368 struct rbd_snap *snap;
3369 int ret = 0;
3370
3371 dout("%s called\n", __func__);
3372 if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
3373 return -EIO;
3374
3375 list_for_each_entry(snap, &rbd_dev->snaps, node) {
3376 if (!rbd_snap_registered(snap)) {
3377 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
3378 if (ret < 0)
3379 break;
3380 }
3381 }
3382 dout("%s: returning %d\n", __func__, ret);
3383
3384 return ret;
3385 }
3386
3387 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
3388 {
3389 struct device *dev;
3390 int ret;
3391
3392 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3393
3394 dev = &rbd_dev->dev;
3395 dev->bus = &rbd_bus_type;
3396 dev->type = &rbd_device_type;
3397 dev->parent = &rbd_root_dev;
3398 dev->release = rbd_dev_release;
3399 dev_set_name(dev, "%d", rbd_dev->dev_id);
3400 ret = device_register(dev);
3401
3402 mutex_unlock(&ctl_mutex);
3403
3404 return ret;
3405 }
3406
3407 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
3408 {
3409 device_unregister(&rbd_dev->dev);
3410 }
3411
3412 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
3413
3414 /*
3415 * Get a unique rbd identifier for the given new rbd_dev, and add
3416 * the rbd_dev to the global list. The minimum rbd id is 1.
3417 */
3418 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
3419 {
3420 rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
3421
3422 spin_lock(&rbd_dev_list_lock);
3423 list_add_tail(&rbd_dev->node, &rbd_dev_list);
3424 spin_unlock(&rbd_dev_list_lock);
3425 dout("rbd_dev %p given dev id %llu\n", rbd_dev,
3426 (unsigned long long) rbd_dev->dev_id);
3427 }
3428
3429 /*
3430 * Remove an rbd_dev from the global list, and record that its
3431 * identifier is no longer in use.
3432 */
3433 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
3434 {
3435 struct list_head *tmp;
3436 int rbd_id = rbd_dev->dev_id;
3437 int max_id;
3438
3439 rbd_assert(rbd_id > 0);
3440
3441 dout("rbd_dev %p released dev id %llu\n", rbd_dev,
3442 (unsigned long long) rbd_dev->dev_id);
3443 spin_lock(&rbd_dev_list_lock);
3444 list_del_init(&rbd_dev->node);
3445
3446 /*
3447 * If the id being "put" is not the current maximum, there
3448 * is nothing special we need to do.
3449 */
3450 if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
3451 spin_unlock(&rbd_dev_list_lock);
3452 return;
3453 }
3454
3455 /*
3456 * We need to update the current maximum id. Search the
3457 * list to find out what it is. We're more likely to find
3458 * the maximum at the end, so search the list backward.
3459 */
3460 max_id = 0;
3461 list_for_each_prev(tmp, &rbd_dev_list) {
3462 struct rbd_device *rbd_dev;
3463
3464 rbd_dev = list_entry(tmp, struct rbd_device, node);
3465 if (rbd_dev->dev_id > max_id)
3466 max_id = rbd_dev->dev_id;
3467 }
3468 spin_unlock(&rbd_dev_list_lock);
3469
3470 /*
3471 * The max id could have been updated by rbd_dev_id_get(), in
3472 * which case it now accurately reflects the new maximum.
3473 * Be careful not to overwrite the maximum value in that
3474 * case.
3475 */
3476 atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
3477 dout(" max dev id has been reset\n");
3478 }
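
/*
 * Editor's sketch (illustrative, not part of the driver): the guarded
 * compare-and-swap at the end of rbd_dev_id_put(), in userspace C11
 * atomics. The exchange installs the recomputed maximum only if no
 * concurrent rbd_dev_id_get() raised the counter in the meantime.
 */
#if 0
#include <stdatomic.h>
#include <stdio.h>

int main(void)
{
	atomic_llong id_max = 7;	/* id 7 is being released */
	long long released = 7;
	long long recomputed = 5;	/* highest id left on the list */

	/* Succeeds only while id_max still holds the id being put */
	if (atomic_compare_exchange_strong(&id_max, &released, recomputed))
		printf("max reset to %lld\n", (long long) atomic_load(&id_max));
	else
		printf("raced; max is now %lld\n", (long long) atomic_load(&id_max));
	return 0;
}
#endif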
3479
3480 /*
3481 * Skips over white space at *buf, and updates *buf to point to the
3482 * first found non-space character (if any). Returns the length of
3483 * the token (string of non-white space characters) found. Note
3484 * that *buf must be terminated with '\0'.
3485 */
3486 static inline size_t next_token(const char **buf)
3487 {
3488 /*
3489 * These are the characters that produce nonzero for
3490 * isspace() in the "C" and "POSIX" locales.
3491 */
3492 const char *spaces = " \f\n\r\t\v";
3493
3494 *buf += strspn(*buf, spaces); /* Find start of token */
3495
3496 return strcspn(*buf, spaces); /* Return token length */
3497 }
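
/*
 * Editor's sketch (illustrative, not part of the driver): how
 * next_token()'s strspn()/strcspn() pair walks a buffer, shown on a
 * made-up "rbd add"-style string in standalone userspace C.
 */
#if 0
#include <stdio.h>
#include <string.h>

int main(void)
{
	const char *buf = " 1.2.3.4:6789  name=admin  rbd";
	const char *spaces = " \f\n\r\t\v";
	size_t len;

	while (*(buf += strspn(buf, spaces))) {	/* skip leading spaces */
		len = strcspn(buf, spaces);	/* token length */
		printf("token: %.*s\n", (int) len, buf);
		buf += len;
	}
	return 0;
}
#endif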
3498
3499 /*
3500 * Finds the next token in *buf, and if the provided token buffer is
3501 * big enough, copies the found token into it. The result, if
3502 * copied, is guaranteed to be terminated with '\0'. Note that *buf
3503 * must be terminated with '\0' on entry.
3504 *
3505 * Returns the length of the token found (not including the '\0').
3506 * Return value will be 0 if no token is found, and it will be >=
3507 * token_size if the token would not fit.
3508 *
3509 * The *buf pointer will be updated to point beyond the end of the
3510 * found token. Note that this occurs even if the token buffer is
3511 * too small to hold it.
3512 */
3513 static inline size_t copy_token(const char **buf,
3514 char *token,
3515 size_t token_size)
3516 {
3517 size_t len;
3518
3519 len = next_token(buf);
3520 if (len < token_size) {
3521 memcpy(token, *buf, len);
3522 *(token + len) = '\0';
3523 }
3524 *buf += len;
3525
3526 return len;
3527 }
3528
3529 /*
3530 * Finds the next token in *buf, dynamically allocates a buffer big
3531 * enough to hold a copy of it, and copies the token into the new
3532 * buffer. The copy is guaranteed to be terminated with '\0'. Note
3533 * that a duplicate buffer is created even for a zero-length token.
3534 *
3535 * Returns a pointer to the newly-allocated duplicate, or a null
3536 * pointer if memory for the duplicate was not available. If
3537 * the lenp argument is a non-null pointer, the length of the token
3538 * (not including the '\0') is returned in *lenp.
3539 *
3540 * If successful, the *buf pointer will be updated to point beyond
3541 * the end of the found token.
3542 *
3543 * Note: uses GFP_KERNEL for allocation.
3544 */
3545 static inline char *dup_token(const char **buf, size_t *lenp)
3546 {
3547 char *dup;
3548 size_t len;
3549
3550 len = next_token(buf);
3551 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
3552 if (!dup)
3553 return NULL;
3554 *(dup + len) = '\0';
3555 *buf += len;
3556
3557 if (lenp)
3558 *lenp = len;
3559
3560 return dup;
3561 }
3562
3563 /*
3564 * Parse the options provided for an "rbd add" (i.e., rbd image
3565 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
3566 * and the data written is passed here via a NUL-terminated buffer.
3567 * Returns 0 if successful or an error code otherwise.
3568 *
3569 * The information extracted from these options is recorded in
3570 * the other parameters which return dynamically-allocated
3571 * structures:
3572 * ceph_opts
3573 * The address of a pointer that will refer to a ceph options
3574 * structure. Caller must release the returned pointer using
3575 * ceph_destroy_options() when it is no longer needed.
3576 * rbd_opts
3577 * Address of an rbd options pointer. Fully initialized by
3578 * this function; caller must release with kfree().
3579 * spec
3580 * Address of an rbd image specification pointer. Fully
3581 * initialized by this function based on parsed options.
3582 * Caller must release with rbd_spec_put().
3583 *
3584 * The options passed take this form:
3585 * <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
3586 * where:
3587 * <mon_addrs>
3588 * A comma-separated list of one or more monitor addresses.
3589 * A monitor address is an ip address, optionally followed
3590 * by a port number (separated by a colon).
3591 * I.e.: ip1[:port1][,ip2[:port2]...]
3592 * <options>
3593 * A comma-separated list of ceph and/or rbd options.
3594 * <pool_name>
3595 * The name of the rados pool containing the rbd image.
3596 * <image_name>
3597 * The name of the image in that pool to map.
3598 * <snap_name>
3599 * An optional snapshot name. If provided, the mapping will
3600 * present data from the image at the time that snapshot was
3601 * created. The image head is used if no snapshot name is
3602 * provided. Snapshot mappings are always read-only.
3603 */
3604 static int rbd_add_parse_args(const char *buf,
3605 struct ceph_options **ceph_opts,
3606 struct rbd_options **opts,
3607 struct rbd_spec **rbd_spec)
3608 {
3609 size_t len;
3610 char *options;
3611 const char *mon_addrs;
3612 size_t mon_addrs_size;
3613 struct rbd_spec *spec = NULL;
3614 struct rbd_options *rbd_opts = NULL;
3615 struct ceph_options *copts;
3616 int ret;
3617
3618 /* The first four tokens are required */
3619
3620 len = next_token(&buf);
3621 if (!len) {
3622 rbd_warn(NULL, "no monitor address(es) provided");
3623 return -EINVAL;
3624 }
3625 mon_addrs = buf;
3626 mon_addrs_size = len + 1;
3627 buf += len;
3628
3629 ret = -EINVAL;
3630 options = dup_token(&buf, NULL);
3631 if (!options)
3632 return -ENOMEM;
3633 if (!*options) {
3634 rbd_warn(NULL, "no options provided");
3635 goto out_err;
3636 }
3637
3638 spec = rbd_spec_alloc();
3639 if (!spec)
3640 goto out_mem;
3641
3642 spec->pool_name = dup_token(&buf, NULL);
3643 if (!spec->pool_name)
3644 goto out_mem;
3645 if (!*spec->pool_name) {
3646 rbd_warn(NULL, "no pool name provided");
3647 goto out_err;
3648 }
3649
3650 spec->image_name = dup_token(&buf, NULL);
3651 if (!spec->image_name)
3652 goto out_mem;
3653 if (!*spec->image_name) {
3654 rbd_warn(NULL, "no image name provided");
3655 goto out_err;
3656 }
3657
3658 /*
3659 * Snapshot name is optional; default is to use "-"
3660 * (indicating the head/no snapshot).
3661 */
3662 len = next_token(&buf);
3663 if (!len) {
3664 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
3665 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
3666 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
3667 ret = -ENAMETOOLONG;
3668 goto out_err;
3669 }
3670 spec->snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
3671 if (!spec->snap_name)
3672 goto out_mem;
3673 *(spec->snap_name + len) = '\0';
3674
3675 /* Initialize all rbd options to the defaults */
3676
3677 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
3678 if (!rbd_opts)
3679 goto out_mem;
3680
3681 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
3682
3683 copts = ceph_parse_options(options, mon_addrs,
3684 mon_addrs + mon_addrs_size - 1,
3685 parse_rbd_opts_token, rbd_opts);
3686 if (IS_ERR(copts)) {
3687 ret = PTR_ERR(copts);
3688 goto out_err;
3689 }
3690 kfree(options);
3691
3692 *ceph_opts = copts;
3693 *opts = rbd_opts;
3694 *rbd_spec = spec;
3695
3696 return 0;
3697 out_mem:
3698 ret = -ENOMEM;
3699 out_err:
3700 kfree(rbd_opts);
3701 rbd_spec_put(spec);
3702 kfree(options);
3703
3704 return ret;
3705 }
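
/*
 * Editor's note (illustrative): a map request as it might be written to
 * /sys/bus/rbd/add, with tokens in the order parsed above. The monitor
 * address, user name, pool, image, and snapshot below are made-up
 * example values:
 *
 *	echo "1.2.3.4:6789 name=admin rbd myimage mysnap" \
 *		> /sys/bus/rbd/add
 *
 * Omitting the trailing snapshot name maps the image head (read/write
 * unless the read_only option is set); naming a snapshot always yields
 * a read-only mapping.
 */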
3706
3707 /*
3708 * An rbd format 2 image has a unique identifier, distinct from the
3709 * name given to it by the user. Internally, that identifier is
3710 * what's used to specify the names of objects related to the image.
3711 *
3712 * A special "rbd id" object is used to map an rbd image name to its
3713 * id. If that object doesn't exist, then there is no v2 rbd image
3714 * with the supplied name.
3715 *
3716 * This function will record the given rbd_dev's image_id field if
3717 * it can be determined, and in that case will return 0. If any
3718 * errors occur a negative errno will be returned and the rbd_dev's
3719 * image_id field will be unchanged (and should be NULL).
3720 */
3721 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
3722 {
3723 int ret;
3724 size_t size;
3725 char *object_name;
3726 void *response;
3727 void *p;
3728
3729 /*
3730 * When probing a parent image, the image id is already
3731 * known (and the image name likely is not). There's no
3732 * need to fetch the image id again in this case.
3733 */
3734 if (rbd_dev->spec->image_id)
3735 return 0;
3736
3737 /*
3738 * First, see if the format 2 image id file exists, and if
3739 * so, get the image's persistent id from it.
3740 */
3741 size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
3742 object_name = kmalloc(size, GFP_NOIO);
3743 if (!object_name)
3744 return -ENOMEM;
3745 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
3746 dout("rbd id object name is %s\n", object_name);
3747
3748 /* Response will be an encoded string, which includes a length */
3749
3750 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
3751 response = kzalloc(size, GFP_NOIO);
3752 if (!response) {
3753 ret = -ENOMEM;
3754 goto out;
3755 }
3756
3757 ret = rbd_req_sync_exec(rbd_dev, object_name,
3758 "rbd", "get_id",
3759 NULL, 0,
3760 response, RBD_IMAGE_ID_LEN_MAX, NULL);
3761 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
3762 if (ret < 0)
3763 goto out;
3764 ret = 0; /* rbd_req_sync_exec() can return positive */
3765
3766 p = response;
3767 rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
3768 p + RBD_IMAGE_ID_LEN_MAX,
3769 NULL, GFP_NOIO);
3770 if (IS_ERR(rbd_dev->spec->image_id)) {
3771 ret = PTR_ERR(rbd_dev->spec->image_id);
3772 rbd_dev->spec->image_id = NULL;
3773 } else {
3774 dout("image_id is %s\n", rbd_dev->spec->image_id);
3775 }
3776 out:
3777 kfree(response);
3778 kfree(object_name);
3779
3780 return ret;
3781 }
3782
3783 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
3784 {
3785 int ret;
3786 size_t size;
3787
3788 /* Version 1 images have no id; empty string is used */
3789
3790 rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
3791 if (!rbd_dev->spec->image_id)
3792 return -ENOMEM;
3793
3794 /* Record the header object name for this rbd image. */
3795
3796 size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
3797 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3798 if (!rbd_dev->header_name) {
3799 ret = -ENOMEM;
3800 goto out_err;
3801 }
3802 sprintf(rbd_dev->header_name, "%s%s",
3803 rbd_dev->spec->image_name, RBD_SUFFIX);
3804
3805 /* Populate rbd image metadata */
3806
3807 ret = rbd_read_header(rbd_dev, &rbd_dev->header);
3808 if (ret < 0)
3809 goto out_err;
3810
3811 /* Version 1 images have no parent (no layering) */
3812
3813 rbd_dev->parent_spec = NULL;
3814 rbd_dev->parent_overlap = 0;
3815
3816 rbd_dev->image_format = 1;
3817
3818 dout("discovered version 1 image, header name is %s\n",
3819 rbd_dev->header_name);
3820
3821 return 0;
3822
3823 out_err:
3824 kfree(rbd_dev->header_name);
3825 rbd_dev->header_name = NULL;
3826 kfree(rbd_dev->spec->image_id);
3827 rbd_dev->spec->image_id = NULL;
3828
3829 return ret;
3830 }
3831
3832 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
3833 {
3834 size_t size;
3835 int ret;
3836 u64 ver = 0;
3837
3838 /*
3839 * Image id was filled in by the caller. Record the header
3840 * object name for this rbd image.
3841 */
3842 size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
3843 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3844 if (!rbd_dev->header_name)
3845 return -ENOMEM;
3846 sprintf(rbd_dev->header_name, "%s%s",
3847 RBD_HEADER_PREFIX, rbd_dev->spec->image_id);
3848
3849 /* Get the size and object order for the image */
3850
3851 ret = rbd_dev_v2_image_size(rbd_dev);
3852 if (ret < 0)
3853 goto out_err;
3854
3855 /* Get the object prefix (a.k.a. block_name) for the image */
3856
3857 ret = rbd_dev_v2_object_prefix(rbd_dev);
3858 if (ret < 0)
3859 goto out_err;
3860
3861 /* Get and check the features for the image */
3862
3863 ret = rbd_dev_v2_features(rbd_dev);
3864 if (ret < 0)
3865 goto out_err;
3866
3867 /* If the image supports layering, get the parent info */
3868
3869 if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
3870 ret = rbd_dev_v2_parent_info(rbd_dev);
3871 if (ret < 0)
3872 goto out_err;
3873 }
3874
3875 /* crypto and compression type aren't (yet) supported for v2 images */
3876
3877 rbd_dev->header.crypt_type = 0;
3878 rbd_dev->header.comp_type = 0;
3879
3880 /* Get the snapshot context, plus the header version */
3881
3882 ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
3883 if (ret)
3884 goto out_err;
3885 rbd_dev->header.obj_version = ver;
3886
3887 rbd_dev->image_format = 2;
3888
3889 dout("discovered version 2 image, header name is %s\n",
3890 rbd_dev->header_name);
3891
3892 return 0;
3893 out_err:
3894 rbd_dev->parent_overlap = 0;
3895 rbd_spec_put(rbd_dev->parent_spec);
3896 rbd_dev->parent_spec = NULL;
3897 kfree(rbd_dev->header_name);
3898 rbd_dev->header_name = NULL;
3899 kfree(rbd_dev->header.object_prefix);
3900 rbd_dev->header.object_prefix = NULL;
3901
3902 return ret;
3903 }
3904
3905 static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
3906 {
3907 int ret;
3908
3909 /* no need to lock here, as rbd_dev is not registered yet */
3910 ret = rbd_dev_snaps_update(rbd_dev);
3911 if (ret)
3912 return ret;
3913
3914 ret = rbd_dev_probe_update_spec(rbd_dev);
3915 if (ret)
3916 goto err_out_snaps;
3917
3918 ret = rbd_dev_set_mapping(rbd_dev);
3919 if (ret)
3920 goto err_out_snaps;
3921
3922 /* generate unique id: find highest unique id, add one */
3923 rbd_dev_id_get(rbd_dev);
3924
3925 /* Fill in the device name, now that we have its id. */
3926 BUILD_BUG_ON(DEV_NAME_LEN
3927 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
3928 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
3929
3930 /* Get our block major device number. */
3931
3932 ret = register_blkdev(0, rbd_dev->name);
3933 if (ret < 0)
3934 goto err_out_id;
3935 rbd_dev->major = ret;
3936
3937 /* Set up the blkdev mapping. */
3938
3939 ret = rbd_init_disk(rbd_dev);
3940 if (ret)
3941 goto err_out_blkdev;
3942
3943 ret = rbd_bus_add_dev(rbd_dev);
3944 if (ret)
3945 goto err_out_disk;
3946
3947 /*
3948 * At this point cleanup in the event of an error is the job
3949 * of the sysfs code (initiated by rbd_bus_del_dev()).
3950 */
3951 down_write(&rbd_dev->header_rwsem);
3952 ret = rbd_dev_snaps_register(rbd_dev);
3953 up_write(&rbd_dev->header_rwsem);
3954 if (ret)
3955 goto err_out_bus;
3956
3957 ret = rbd_dev_header_watch_sync(rbd_dev, 1);
3958 if (ret)
3959 goto err_out_bus;
3960
3961 /* Everything's ready. Announce the disk to the world. */
3962
3963 add_disk(rbd_dev->disk);
3964
3965 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
3966 (unsigned long long) rbd_dev->mapping.size);
3967
3968 return ret;
3969 err_out_bus:
3970 /* this will also clean up rest of rbd_dev stuff */
3971
3972 rbd_bus_del_dev(rbd_dev);
3973
3974 return ret;
3975 err_out_disk:
3976 rbd_free_disk(rbd_dev);
3977 err_out_blkdev:
3978 unregister_blkdev(rbd_dev->major, rbd_dev->name);
3979 err_out_id:
3980 rbd_dev_id_put(rbd_dev);
3981 err_out_snaps:
3982 rbd_remove_all_snaps(rbd_dev);
3983
3984 return ret;
3985 }
3986
/*
 * Probe for the existence of the header object for the given rbd
 * device.  For format 2 images this includes determining the image
 * id.
 */
static int rbd_dev_probe(struct rbd_device *rbd_dev)
{
	int ret;

	/*
	 * Get the id from the image id object.  If it's not a
	 * format 2 image, we'll get ENOENT back, and we'll assume
	 * it's a format 1 image.
	 */
	ret = rbd_dev_image_id(rbd_dev);
	if (ret)
		ret = rbd_dev_v1_probe(rbd_dev);
	else
		ret = rbd_dev_v2_probe(rbd_dev);
	if (ret) {
		dout("probe failed, returning %d\n", ret);

		return ret;
	}

	ret = rbd_dev_probe_finish(rbd_dev);
	if (ret)
		rbd_header_free(&rbd_dev->header);

	return ret;
}

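/*
 * Handle a write to /sys/bus/rbd/add.  The buffer supplies monitor
 * addresses, options, a pool name, an image name, and optionally a
 * snapshot name, as described in Documentation/ABI/testing/sysfs-bus-rbd.
 * For example (addresses, credentials, and names here are
 * illustrative only):
 *
 *   $ echo "1.2.3.4:6789 name=admin,secret=<key> rbd myimage" \
 *         > /sys/bus/rbd/add
 *
 * On success the full byte count is returned and a new /dev/rbd<id>
 * block device exists; on error everything acquired so far is
 * released before the error is returned.
 */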
static ssize_t rbd_add(struct bus_type *bus,
		       const char *buf,
		       size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	struct ceph_options *ceph_opts = NULL;
	struct rbd_options *rbd_opts = NULL;
	struct rbd_spec *spec = NULL;
	struct rbd_client *rbdc;
	struct ceph_osd_client *osdc;
	int rc = -ENOMEM;

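	/*
	 * A module reference is held for the life of the device; it
	 * is dropped on the error paths below, or by rbd_dev_release()
	 * when the device is finally torn down.
	 */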
	if (!try_module_get(THIS_MODULE))
		return -ENODEV;

	/* parse add command */
	rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
	if (rc < 0)
		goto err_out_module;

	rbdc = rbd_get_client(ceph_opts);
	if (IS_ERR(rbdc)) {
		rc = PTR_ERR(rbdc);
		goto err_out_args;
	}
	ceph_opts = NULL;	/* rbd_get_client() consumed this */

	/* pick the pool */
	osdc = &rbdc->client->osdc;
	rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
	if (rc < 0)
		goto err_out_client;
	spec->pool_id = (u64) rc;

	/* The ceph file layout needs to fit pool id in 32 bits */

	if (WARN_ON(spec->pool_id > (u64) U32_MAX)) {
		rc = -EIO;
		goto err_out_client;
	}

	rbd_dev = rbd_dev_create(rbdc, spec);
	if (!rbd_dev)
		goto err_out_client;
	rbdc = NULL;		/* rbd_dev now owns this */
	spec = NULL;		/* rbd_dev now owns this */

	rbd_dev->mapping.read_only = rbd_opts->read_only;
	kfree(rbd_opts);
	rbd_opts = NULL;	/* done with this */

	rc = rbd_dev_probe(rbd_dev);
	if (rc < 0)
		goto err_out_rbd_dev;

	return count;
err_out_rbd_dev:
	rbd_dev_destroy(rbd_dev);
err_out_client:
	rbd_put_client(rbdc);
err_out_args:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	kfree(rbd_opts);
	rbd_spec_put(spec);
err_out_module:
	module_put(THIS_MODULE);

	dout("Error adding device %s\n", buf);

	return (ssize_t) rc;
}

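/*
 * Find the rbd_device with the given id on the global device list,
 * or return NULL if there is none.  rbd_dev_list_lock is held only
 * for the walk itself; the caller (rbd_remove()) serializes against
 * concurrent removal by holding ctl_mutex.
 */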
static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
{
	struct list_head *tmp;
	struct rbd_device *rbd_dev;

	spin_lock(&rbd_dev_list_lock);
	list_for_each(tmp, &rbd_dev_list) {
		rbd_dev = list_entry(tmp, struct rbd_device, node);
		if (rbd_dev->dev_id == dev_id) {
			spin_unlock(&rbd_dev_list_lock);
			return rbd_dev;
		}
	}
	spin_unlock(&rbd_dev_list_lock);
	return NULL;
}

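/*
 * Release callback for the rbd device's embedded struct device,
 * invoked once the last reference is dropped (removal is initiated
 * by rbd_bus_del_dev()).  Tear down the header watch, the disk and
 * blkdev registration, and the header fields, then drop the module
 * reference taken in rbd_add().
 */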
static void rbd_dev_release(struct device *dev)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	if (rbd_dev->watch_request) {
		struct ceph_client *client = rbd_dev->rbd_client->client;

		ceph_osdc_unregister_linger_request(&client->osdc,
						    rbd_dev->watch_request);
	}
	if (rbd_dev->watch_event)
		rbd_dev_header_watch_sync(rbd_dev, 0);

	/* clean up and free blkdev */
	rbd_free_disk(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);

	/* release allocated disk header fields */
	rbd_header_free(&rbd_dev->header);

	/* done with the id, and with the rbd_dev */
	rbd_dev_id_put(rbd_dev);
	rbd_assert(rbd_dev->rbd_client != NULL);
	rbd_dev_destroy(rbd_dev);

	/* release module ref */
	module_put(THIS_MODULE);
}

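/*
 * Handle a write to /sys/bus/rbd/remove.  The buffer holds the id of
 * the device to tear down, for example (the id is illustrative only):
 *
 *   $ echo 0 > /sys/bus/rbd/remove
 *
 * Fails with -ENOENT if no mapped device has that id, and with
 * -EBUSY if the device is still open.
 */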
static ssize_t rbd_remove(struct bus_type *bus,
			  const char *buf,
			  size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	int target_id, rc;
	unsigned long ul;
	int ret = count;

	rc = strict_strtoul(buf, 10, &ul);
	if (rc)
		return rc;

	/* convert to int; abort if we lost anything in the conversion */
	target_id = (int) ul;
	if (target_id != ul)
		return -EINVAL;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbd_dev = __rbd_get_dev(target_id);
	if (!rbd_dev) {
		ret = -ENOENT;
		goto done;
	}

	if (rbd_dev->open_count) {
		ret = -EBUSY;
		goto done;
	}

	rbd_remove_all_snaps(rbd_dev);
	rbd_bus_del_dev(rbd_dev);

done:
	mutex_unlock(&ctl_mutex);

	return ret;
}

/*
 * create control files in sysfs
 * /sys/bus/rbd/...
 */
static int rbd_sysfs_init(void)
{
	int ret;

	ret = device_register(&rbd_root_dev);
	if (ret < 0)
		return ret;

	ret = bus_register(&rbd_bus_type);
	if (ret < 0)
		device_unregister(&rbd_root_dev);

	return ret;
}

static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}

int __init rbd_init(void)
{
	int rc;

	rc = rbd_sysfs_init();
	if (rc)
		return rc;
	pr_info("loaded " RBD_DRV_NAME_LONG "\n");
	return 0;
}

void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
}

module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");