rbd: define rbd_assert()
[deliverable/linux.git] / drivers / block / rbd.c
CommitLineData
602adf40
YS
1/*
2 rbd.c -- Export ceph rados objects as a Linux block device
3
4
5 based on drivers/block/osdblk.c:
6
7 Copyright 2009 Red Hat, Inc.
8
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
12
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
17
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
dfc5606d 24 For usage instructions, please refer to:
602adf40 25
dfc5606d 26 Documentation/ABI/testing/sysfs-bus-rbd
602adf40
YS
27
28 */
29
30#include <linux/ceph/libceph.h>
31#include <linux/ceph/osd_client.h>
32#include <linux/ceph/mon_client.h>
33#include <linux/ceph/decode.h>
59c2be1e 34#include <linux/parser.h>
602adf40
YS
35
36#include <linux/kernel.h>
37#include <linux/device.h>
38#include <linux/module.h>
39#include <linux/fs.h>
40#include <linux/blkdev.h>
41
42#include "rbd_types.h"
43
aafb230e
AE
44#define RBD_DEBUG /* Activate rbd_assert() calls */
45
593a9e7b
AE
46/*
47 * The basic unit of block I/O is a sector. It is interpreted in a
48 * number of contexts in Linux (blk, bio, genhd), but the default is
49 * universally 512 bytes. These symbols are just slightly more
50 * meaningful than the bare numbers they represent.
51 */
52#define SECTOR_SHIFT 9
53#define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
54
df111be6
AE
55/* It might be useful to have this defined elsewhere too */
56
57#define U64_MAX ((u64) (~0ULL))
58
f0f8cef5
AE
59#define RBD_DRV_NAME "rbd"
60#define RBD_DRV_NAME_LONG "rbd (rados block device)"
602adf40
YS
61
62#define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
63
602adf40
YS
64#define RBD_MAX_SNAP_NAME_LEN 32
65#define RBD_MAX_OPT_LEN 1024
66
67#define RBD_SNAP_HEAD_NAME "-"
68
81a89793
AE
69/*
70 * An RBD device name will be "rbd#", where the "rbd" comes from
71 * RBD_DRV_NAME above, and # is a unique integer identifier.
72 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
73 * enough to hold all possible device names.
74 */
602adf40 75#define DEV_NAME_LEN 32
81a89793 76#define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
602adf40 77
cc0538b6 78#define RBD_READ_ONLY_DEFAULT false
59c2be1e 79
602adf40
YS
80/*
81 * block device image metadata (in-memory version)
82 */
83struct rbd_image_header {
84 u64 image_size;
849b4260 85 char *object_prefix;
602adf40
YS
86 __u8 obj_order;
87 __u8 crypt_type;
88 __u8 comp_type;
602adf40 89 struct ceph_snap_context *snapc;
602adf40
YS
90 u32 total_snaps;
91
92 char *snap_names;
93 u64 *snap_sizes;
59c2be1e
YS
94
95 u64 obj_version;
96};
97
98struct rbd_options {
cc0538b6 99 bool read_only;
602adf40
YS
100};
101
102/*
f0f8cef5 103 * an instance of the client. multiple devices may share an rbd client.
602adf40
YS
104 */
105struct rbd_client {
106 struct ceph_client *client;
107 struct kref kref;
108 struct list_head node;
109};
110
111/*
f0f8cef5 112 * a request completion status
602adf40 113 */
1fec7093
YS
114struct rbd_req_status {
115 int done;
116 int rc;
117 u64 bytes;
118};
119
120/*
121 * a collection of requests
122 */
123struct rbd_req_coll {
124 int total;
125 int num_done;
126 struct kref kref;
127 struct rbd_req_status status[0];
602adf40
YS
128};
129
f0f8cef5
AE
130/*
131 * a single io request
132 */
133struct rbd_request {
134 struct request *rq; /* blk layer request */
135 struct bio *bio; /* cloned bio */
136 struct page **pages; /* list of used pages */
137 u64 len;
138 int coll_index;
139 struct rbd_req_coll *coll;
140};
141
dfc5606d
YS
142struct rbd_snap {
143 struct device dev;
144 const char *name;
3591538f 145 u64 size;
dfc5606d
YS
146 struct list_head node;
147 u64 id;
148};
149
602adf40
YS
150/*
151 * a single device
152 */
153struct rbd_device {
de71a297 154 int dev_id; /* blkdev unique id */
602adf40
YS
155
156 int major; /* blkdev assigned major */
157 struct gendisk *disk; /* blkdev's gendisk and rq */
158 struct request_queue *q;
159
f8c38929 160 struct rbd_options rbd_opts;
602adf40
YS
161 struct rbd_client *rbd_client;
162
163 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
164
165 spinlock_t lock; /* queue lock */
166
167 struct rbd_image_header header;
0bed54dc
AE
168 char *image_name;
169 size_t image_name_len;
170 char *header_name;
d22f76e7 171 char *pool_name;
9bb2f334 172 int pool_id;
602adf40 173
59c2be1e
YS
174 struct ceph_osd_event *watch_event;
175 struct ceph_osd_request *watch_request;
176
c666601a
JD
177 /* protects updating the header */
178 struct rw_semaphore header_rwsem;
e88a36ec 179 /* name of the snapshot this device reads from */
820a5f3e 180 char *snap_name;
e88a36ec 181 /* id of the snapshot this device reads from */
77dfe99f 182 u64 snap_id; /* current snapshot id */
e88a36ec
JD
183 /* whether the snap_id this device reads from still exists */
184 bool snap_exists;
cc0538b6 185 bool read_only;
602adf40
YS
186
187 struct list_head node;
dfc5606d
YS
188
189 /* list of snapshots */
190 struct list_head snaps;
191
192 /* sysfs related */
193 struct device dev;
194};
195
602adf40 196static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
e124a82f 197
602adf40 198static LIST_HEAD(rbd_dev_list); /* devices */
e124a82f
AE
199static DEFINE_SPINLOCK(rbd_dev_list_lock);
200
432b8587
AE
201static LIST_HEAD(rbd_client_list); /* clients */
202static DEFINE_SPINLOCK(rbd_client_list_lock);
602adf40 203
dfc5606d
YS
204static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
205static void rbd_dev_release(struct device *dev);
dfc5606d
YS
206static ssize_t rbd_snap_add(struct device *dev,
207 struct device_attribute *attr,
208 const char *buf,
209 size_t count);
14e7085d 210static void __rbd_remove_snap_dev(struct rbd_snap *snap);
dfc5606d 211
f0f8cef5
AE
212static ssize_t rbd_add(struct bus_type *bus, const char *buf,
213 size_t count);
214static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
215 size_t count);
216
217static struct bus_attribute rbd_bus_attrs[] = {
218 __ATTR(add, S_IWUSR, NULL, rbd_add),
219 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
220 __ATTR_NULL
221};
222
223static struct bus_type rbd_bus_type = {
224 .name = "rbd",
225 .bus_attrs = rbd_bus_attrs,
226};
227
228static void rbd_root_dev_release(struct device *dev)
229{
230}
231
232static struct device rbd_root_dev = {
233 .init_name = "rbd",
234 .release = rbd_root_dev_release,
235};
236
aafb230e
AE
#ifdef RBD_DEBUG
/*
 * Verify an invariant; report and BUG() if it does not hold.
 *
 * Wrapped in do { } while (0) so the macro expands to a single
 * statement: the original bare "if (...) { ... }" form misbinds an
 * "else" following an unbraced "if (x) rbd_assert(y); else ..."
 * (classic dangling-else hazard for multi-statement macros).
 */
#define rbd_assert(expr)						\
	do {								\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
					"at line %d:\n\n"		\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}							\
	} while (0)
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */
dfc5606d 249
dfc5606d
YS
/* Take a reference on the rbd device's embedded struct device;
 * pairs with rbd_put_dev(). */
250static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
251{
252 return get_device(&rbd_dev->dev);
253}
254
/* Drop the device reference taken by rbd_get_dev(). */
255static void rbd_put_dev(struct rbd_device *rbd_dev)
256{
257 put_device(&rbd_dev->dev);
258}
602adf40 259
1fe5e993 260static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver);
59c2be1e 261
602adf40
YS
/*
 * Block device open method.  Refuses writable opens of a read-only
 * mapping, then pins the rbd_device for the lifetime of the open
 * (released in rbd_release()).
 */
262static int rbd_open(struct block_device *bdev, fmode_t mode)
263{
f0f8cef5 264 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
602adf40 265
602adf40
YS
266 if ((mode & FMODE_WRITE) && rbd_dev->read_only)
267 return -EROFS;
268
340c7a2b
AE
269 rbd_get_dev(rbd_dev);
270 set_device_ro(bdev, rbd_dev->read_only);
271
602adf40
YS
272 return 0;
273}
274
dfc5606d
YS
/* Block device release method: drop the reference taken in rbd_open(). */
275static int rbd_release(struct gendisk *disk, fmode_t mode)
276{
277 struct rbd_device *rbd_dev = disk->private_data;
278
279 rbd_put_dev(rbd_dev);
280
281 return 0;
282}
283
602adf40
YS
284static const struct block_device_operations rbd_bd_ops = {
285 .owner = THIS_MODULE,
286 .open = rbd_open,
dfc5606d 287 .release = rbd_release,
602adf40
YS
288};
289
290/*
291 * Initialize an rbd client instance.
43ae4701 292 * We own *ceph_opts.
602adf40 293 */
/*
 * Allocate a new rbd_client, create and open its ceph session, and
 * add it to the global client list (under rbd_client_list_lock).
 * On success ownership of ceph_opts passes to the ceph client; on
 * every failure path ceph_opts is destroyed here.  Returns the new
 * client or an ERR_PTR().
 *
 * NOTE(review): the IS_ERR(rbdc->client) path returns ERR_PTR(-ENOMEM)
 * rather than the underlying PTR_ERR(rbdc->client) — looks like the
 * real error code is lost; confirm intended.
 */
f8c38929 294static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
602adf40
YS
295{
296 struct rbd_client *rbdc;
297 int ret = -ENOMEM;
298
299 dout("rbd_client_create\n");
300 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
301 if (!rbdc)
302 goto out_opt;
303
304 kref_init(&rbdc->kref);
305 INIT_LIST_HEAD(&rbdc->node);
306
bc534d86
AE
307 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
308
43ae4701 309 rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
602adf40 310 if (IS_ERR(rbdc->client))
bc534d86 311 goto out_mutex;
43ae4701 312 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
602adf40
YS
313
314 ret = ceph_open_session(rbdc->client);
315 if (ret < 0)
316 goto out_err;
317
432b8587 318 spin_lock(&rbd_client_list_lock);
602adf40 319 list_add_tail(&rbdc->node, &rbd_client_list);
432b8587 320 spin_unlock(&rbd_client_list_lock);
602adf40 321
bc534d86
AE
322 mutex_unlock(&ctl_mutex);
323
602adf40
YS
324 dout("rbd_client_create created %p\n", rbdc);
325 return rbdc;
326
327out_err:
328 ceph_destroy_client(rbdc->client);
bc534d86
AE
329out_mutex:
330 mutex_unlock(&ctl_mutex);
602adf40
YS
331 kfree(rbdc);
332out_opt:
43ae4701
AE
333 if (ceph_opts)
334 ceph_destroy_options(ceph_opts);
28f259b7 335 return ERR_PTR(ret);
602adf40
YS
336}
337
338/*
1f7ba331
AE
339 * Find a ceph client with specific addr and configuration. If
340 * found, bump its reference count.
602adf40 341 */
/*
 * Look up an existing shared client matching ceph_opts.  On a hit the
 * client's kref is elevated before the list lock is dropped, so the
 * returned pointer stays valid for the caller.  Returns NULL when no
 * match exists or when sharing is disabled (CEPH_OPT_NOSHARE).
 */
1f7ba331 342static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
602adf40
YS
343{
344 struct rbd_client *client_node;
1f7ba331 345 bool found = false;
602adf40 346
43ae4701 347 if (ceph_opts->flags & CEPH_OPT_NOSHARE)
602adf40
YS
348 return NULL;
349
1f7ba331
AE
350 spin_lock(&rbd_client_list_lock);
351 list_for_each_entry(client_node, &rbd_client_list, node) {
352 if (!ceph_compare_options(ceph_opts, client_node->client)) {
353 kref_get(&client_node->kref);
354 found = true;
355 break;
356 }
357 }
358 spin_unlock(&rbd_client_list_lock);
359
360 return found ? client_node : NULL;
602adf40
YS
361}
362
59c2be1e
YS
363/*
364 * mount options
365 */
366enum {
59c2be1e
YS
367 Opt_last_int,
368 /* int args above */
369 Opt_last_string,
370 /* string args above */
cc0538b6
AE
371 Opt_read_only,
372 Opt_read_write,
373 /* Boolean args above */
374 Opt_last_bool,
59c2be1e
YS
375};
376
43ae4701 377static match_table_t rbd_opts_tokens = {
59c2be1e
YS
378 /* int args above */
379 /* string args above */
cc0538b6
AE
380 {Opt_read_only, "read_only"},
381 {Opt_read_only, "ro"}, /* Alternate spelling */
382 {Opt_read_write, "read_write"},
383 {Opt_read_write, "rw"}, /* Alternate spelling */
384 /* Boolean args above */
59c2be1e
YS
385 {-1, NULL}
386};
387
/*
 * Callback for ceph_parse_options(): parse one rbd-specific mount
 * option token into the rbd_options passed via @private.  Returns 0
 * on success, -EINVAL for an unrecognized token, or the match_int()
 * error for a malformed integer argument.
 */
388static int parse_rbd_opts_token(char *c, void *private)
389{
43ae4701 390 struct rbd_options *rbd_opts = private;
59c2be1e
YS
391 substring_t argstr[MAX_OPT_ARGS];
392 int token, intval, ret;
393
43ae4701 394 token = match_token(c, rbd_opts_tokens, argstr);
59c2be1e
YS
395 if (token < 0)
396 return -EINVAL;
397
398 if (token < Opt_last_int) {
399 ret = match_int(&argstr[0], &intval);
400 if (ret < 0) {
401 pr_err("bad mount option arg (not int) "
402 "at '%s'\n", c);
403 return ret;
404 }
405 dout("got int token %d val %d\n", token, intval);
406 } else if (token > Opt_last_int && token < Opt_last_string) {
407 dout("got string token %d val %s\n", token,
408 argstr[0].from);
cc0538b6
AE
409 } else if (token > Opt_last_string && token < Opt_last_bool) {
410 dout("got Boolean token %d\n", token);
59c2be1e
YS
411 } else {
412 dout("got token %d\n", token);
413 }
414
415 switch (token) {
cc0538b6
AE
416 case Opt_read_only:
417 rbd_opts->read_only = true;
418 break;
419 case Opt_read_write:
420 rbd_opts->read_only = false;
421 break;
59c2be1e 422 default:
aafb230e
AE
/* Every token in rbd_opts_tokens must be handled above */
423 rbd_assert(false);
424 break;
59c2be1e
YS
425 }
426 return 0;
427}
428
602adf40
YS
429/*
430 * Get a ceph client with specific addr and configuration, if one does
431 * not exist create it.
432 */
f8c38929
AE
/*
 * Attach a ceph client to rbd_dev: parse the monitor address and
 * option strings, then reuse a matching shared client (dropping our
 * parsed options) or create a new one.  Returns 0 or a negative errno;
 * on success rbd_dev->rbd_client holds a reference.
 */
433static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
434 size_t mon_addr_len, char *options)
602adf40 435{
f8c38929 436 struct rbd_options *rbd_opts = &rbd_dev->rbd_opts;
43ae4701 437 struct ceph_options *ceph_opts;
f8c38929 438 struct rbd_client *rbdc;
59c2be1e 439
cc0538b6 440 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
602adf40 441
43ae4701
AE
442 ceph_opts = ceph_parse_options(options, mon_addr,
443 mon_addr + mon_addr_len,
444 parse_rbd_opts_token, rbd_opts);
f8c38929
AE
445 if (IS_ERR(ceph_opts))
446 return PTR_ERR(ceph_opts);
602adf40 447
1f7ba331 448 rbdc = rbd_client_find(ceph_opts);
602adf40 449 if (rbdc) {
602adf40 450 /* using an existing client */
43ae4701 451 ceph_destroy_options(ceph_opts);
f8c38929
AE
452 } else {
453 rbdc = rbd_client_create(ceph_opts);
454 if (IS_ERR(rbdc))
455 return PTR_ERR(rbdc);
602adf40 456 }
f8c38929 457 rbd_dev->rbd_client = rbdc;
602adf40 458
f8c38929 459 return 0;
602adf40
YS
460}
461
462/*
463 * Destroy ceph client
d23a4b3f 464 *
432b8587 465 * Takes rbd_client_list_lock itself to unlink the client, so the caller must NOT hold it.
602adf40
YS
466 */
/*
 * kref release callback: unlink the client from the global list
 * (taking rbd_client_list_lock), then tear down the ceph client and
 * free the wrapper.
 */
467static void rbd_client_release(struct kref *kref)
468{
469 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
470
471 dout("rbd_release_client %p\n", rbdc);
cd9d9f5d 472 spin_lock(&rbd_client_list_lock);
602adf40 473 list_del(&rbdc->node);
cd9d9f5d 474 spin_unlock(&rbd_client_list_lock);
602adf40
YS
475
476 ceph_destroy_client(rbdc->client);
477 kfree(rbdc);
478}
479
480/*
481 * Drop reference to ceph client node. If it's not referenced anymore, release
482 * it.
483 */
/* Drop rbd_dev's reference on its ceph client and clear the pointer;
 * the last reference triggers rbd_client_release(). */
484static void rbd_put_client(struct rbd_device *rbd_dev)
485{
486 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
487 rbd_dev->rbd_client = NULL;
602adf40
YS
488}
489
1fec7093
YS
490/*
491 * Destroy requests collection
492 */
/* kref release callback for a request collection: frees the
 * variable-length rbd_req_coll allocation. */
493static void rbd_coll_release(struct kref *kref)
494{
495 struct rbd_req_coll *coll =
496 container_of(kref, struct rbd_req_coll, kref);
497
498 dout("rbd_coll_release %p\n", coll);
499 kfree(coll);
500}
602adf40 501
8e94af8e
AE
/*
 * Sanity-check an on-disk image header: magic text must match, and
 * the snapshot count/name sizes must be small enough that the whole
 * in-memory snapshot header fits in a size_t.
 */
502static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
503{
103a150f
AE
504 size_t size;
505 u32 snap_count;
506
507 /* The header has to start with the magic rbd header text */
508 if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
509 return false;
510
511 /*
512 * The size of a snapshot header has to fit in a size_t, and
513 * that limits the number of snapshots.
514 */
515 snap_count = le32_to_cpu(ondisk->snap_count);
516 size = SIZE_MAX - sizeof (struct ceph_snap_context);
517 if (snap_count > size / sizeof (__le64))
518 return false;
519
520 /*
521 * Not only that, but the size of the entire snapshot
522 * header must also be representable in a size_t.
523 */
524 size -= snap_count * sizeof (__le64);
525 if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
526 return false;
527
528 return true;
8e94af8e
AE
529}
530
602adf40
YS
531/*
532 * Create a new header structure, translate header format from the on-disk
533 * header.
534 */
535static int rbd_header_from_disk(struct rbd_image_header *header,
4156d998 536 struct rbd_image_header_ondisk *ondisk)
602adf40 537{
ccece235 538 u32 snap_count;
58c17b0e 539 size_t len;
d2bb24e5 540 size_t size;
621901d6 541 u32 i;
602adf40 542
6a52325f
AE
543 memset(header, 0, sizeof (*header));
544
103a150f
AE
545 snap_count = le32_to_cpu(ondisk->snap_count);
546
58c17b0e
AE
547 len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
548 header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
6a52325f 549 if (!header->object_prefix)
602adf40 550 return -ENOMEM;
58c17b0e
AE
551 memcpy(header->object_prefix, ondisk->object_prefix, len);
552 header->object_prefix[len] = '\0';
00f1f36f 553
602adf40 554 if (snap_count) {
f785cc1d
AE
555 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
556
621901d6
AE
557 /* Save a copy of the snapshot names */
558
f785cc1d
AE
559 if (snap_names_len > (u64) SIZE_MAX)
560 return -EIO;
561 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
602adf40 562 if (!header->snap_names)
6a52325f 563 goto out_err;
f785cc1d
AE
564 /*
565 * Note that rbd_dev_v1_header_read() guarantees
566 * the ondisk buffer we're working with has
567 * snap_names_len bytes beyond the end of the
568 * snapshot id array, this memcpy() is safe.
569 */
570 memcpy(header->snap_names, &ondisk->snaps[snap_count],
571 snap_names_len);
6a52325f 572
621901d6
AE
573 /* Record each snapshot's size */
574
d2bb24e5
AE
575 size = snap_count * sizeof (*header->snap_sizes);
576 header->snap_sizes = kmalloc(size, GFP_KERNEL);
602adf40 577 if (!header->snap_sizes)
6a52325f 578 goto out_err;
621901d6
AE
579 for (i = 0; i < snap_count; i++)
580 header->snap_sizes[i] =
581 le64_to_cpu(ondisk->snaps[i].image_size);
602adf40 582 } else {
ccece235 583 WARN_ON(ondisk->snap_names_len);
602adf40
YS
584 header->snap_names = NULL;
585 header->snap_sizes = NULL;
586 }
849b4260 587
602adf40
YS
588 header->image_size = le64_to_cpu(ondisk->image_size);
589 header->obj_order = ondisk->options.order;
590 header->crypt_type = ondisk->options.crypt_type;
591 header->comp_type = ondisk->options.comp_type;
6a52325f
AE
592 header->total_snaps = snap_count;
593
621901d6
AE
594 /* Allocate and fill in the snapshot context */
595
6a52325f
AE
596 size = sizeof (struct ceph_snap_context);
597 size += snap_count * sizeof (header->snapc->snaps[0]);
598 header->snapc = kzalloc(size, GFP_KERNEL);
599 if (!header->snapc)
600 goto out_err;
602adf40
YS
601
602 atomic_set(&header->snapc->nref, 1);
505cbb9b 603 header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
602adf40 604 header->snapc->num_snaps = snap_count;
621901d6
AE
605 for (i = 0; i < snap_count; i++)
606 header->snapc->snaps[i] =
607 le64_to_cpu(ondisk->snaps[i].id);
602adf40
YS
608
609 return 0;
610
6a52325f 611out_err:
849b4260 612 kfree(header->snap_sizes);
ccece235 613 header->snap_sizes = NULL;
602adf40 614 kfree(header->snap_names);
ccece235 615 header->snap_names = NULL;
6a52325f
AE
616 kfree(header->object_prefix);
617 header->object_prefix = NULL;
ccece235 618
00f1f36f 619 return -ENOMEM;
602adf40
YS
620}
621
602adf40
YS
/*
 * Look up a snapshot by name in the packed NUL-separated name list.
 * On success returns the snapshot's index and optionally passes back
 * its id (*seq) and size (*size); returns -ENOENT if not found.
 */
622static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
623 u64 *seq, u64 *size)
624{
625 int i;
626 char *p = header->snap_names;
627
00f1f36f
AE
628 for (i = 0; i < header->total_snaps; i++) {
629 if (!strcmp(snap_name, p)) {
602adf40 630
00f1f36f 631 /* Found it. Pass back its id and/or size */
602adf40 632
00f1f36f
AE
633 if (seq)
634 *seq = header->snapc->snaps[i];
635 if (size)
636 *size = header->snap_sizes[i];
637 return i;
638 }
639 p += strlen(p) + 1; /* Skip ahead to the next name */
640 }
641 return -ENOENT;
602adf40
YS
642}
643
/*
 * Point rbd_dev at the snapshot named in rbd_dev->snap_name (or at the
 * image head for RBD_SNAP_HEAD_NAME), setting snap_id/snap_exists/
 * read_only accordingly and optionally returning the mapped size.
 * Takes header_rwsem for writing.  Returns 0 or snap_by_name()'s error.
 */
0ce1a794 644static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size)
602adf40 645{
78dc447d 646 int ret;
602adf40 647
0ce1a794 648 down_write(&rbd_dev->header_rwsem);
602adf40 649
0ce1a794 650 if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
cc9d734c 651 sizeof (RBD_SNAP_HEAD_NAME))) {
0ce1a794 652 rbd_dev->snap_id = CEPH_NOSNAP;
e88a36ec 653 rbd_dev->snap_exists = false;
cc0538b6 654 rbd_dev->read_only = rbd_dev->rbd_opts.read_only;
602adf40 655 if (size)
78dc447d 656 *size = rbd_dev->header.image_size;
602adf40 657 } else {
78dc447d
AE
658 u64 snap_id = 0;
659
660 ret = snap_by_name(&rbd_dev->header, rbd_dev->snap_name,
661 &snap_id, size);
602adf40
YS
662 if (ret < 0)
663 goto done;
78dc447d 664 rbd_dev->snap_id = snap_id;
e88a36ec 665 rbd_dev->snap_exists = true;
cc0538b6 666 rbd_dev->read_only = true; /* No choice for snapshots */
602adf40
YS
667 }
668
669 ret = 0;
670done:
0ce1a794 671 up_write(&rbd_dev->header_rwsem);
602adf40
YS
672 return ret;
673}
674
/*
 * Release everything rbd_header_from_disk() allocated.  Pointers are
 * NULLed so a double call (or a call on a partially-built header) is
 * harmless.
 */
675static void rbd_header_free(struct rbd_image_header *header)
676{
849b4260 677 kfree(header->object_prefix);
d78fd7ae 678 header->object_prefix = NULL;
602adf40 679 kfree(header->snap_sizes);
d78fd7ae 680 header->snap_sizes = NULL;
849b4260 681 kfree(header->snap_names);
d78fd7ae 682 header->snap_names = NULL;
d1d25646 683 ceph_put_snap_context(header->snapc);
d78fd7ae 684 header->snapc = NULL;
602adf40
YS
685}
686
/*
 * Build the object name ("<prefix>.<segment#>") for the segment
 * containing byte @offset.  Returns a kmalloc'd string the caller
 * must kfree, or NULL on allocation or formatting failure.
 */
65ccfe21 687static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
602adf40 688{
65ccfe21
AE
689 char *name;
690 u64 segment;
691 int ret;
602adf40 692
65ccfe21
AE
693 name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
694 if (!name)
695 return NULL;
696 segment = offset >> rbd_dev->header.obj_order;
697 ret = snprintf(name, RBD_MAX_SEG_NAME_LEN, "%s.%012llx",
698 rbd_dev->header.object_prefix, segment);
699 if (ret < 0 || ret >= RBD_MAX_SEG_NAME_LEN) {
700 pr_err("error formatting segment name for #%llu (%d)\n",
701 segment, ret);
702 kfree(name);
703 name = NULL;
704 }
602adf40 705
65ccfe21
AE
706 return name;
707}
602adf40 708
65ccfe21
AE
709static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
710{
711 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
602adf40 712
65ccfe21
AE
713 return offset & (segment_size - 1);
714}
715
/*
 * Clamp an I/O of @length bytes starting at image offset @offset so
 * it does not cross the end of its containing segment; returns the
 * usable length within that segment.
 */
716static u64 rbd_segment_length(struct rbd_device *rbd_dev,
717 u64 offset, u64 length)
718{
719 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
720
721 offset &= segment_size - 1;
722
aafb230e 723 rbd_assert(length <= U64_MAX - offset);
65ccfe21
AE
724 if (offset + length > segment_size)
725 length = segment_size - offset;
726
727 return length;
602adf40
YS
728}
729
1fec7093
YS
/*
 * Return the number of segments spanned by the byte range [ofs, ofs+len),
 * 0 for an empty range, or -ERANGE if ofs + len would overflow u64.
 * NOTE(review): the count is returned as int; a range spanning more than
 * INT_MAX segments would be misreported — confirm callers' limits.
 */
730static int rbd_get_num_segments(struct rbd_image_header *header,
731 u64 ofs, u64 len)
732{
df111be6
AE
733 u64 start_seg;
734 u64 end_seg;
735
736 if (!len)
737 return 0;
738 if (len - 1 > U64_MAX - ofs)
739 return -ERANGE;
740
741 start_seg = ofs >> header->obj_order;
742 end_seg = (ofs + len - 1) >> header->obj_order;
743
1fec7093
YS
744 return end_seg - start_seg + 1;
745}
746
029bcbd8
JD
747/*
748 * returns the size of an object in the image
749 */
750static u64 rbd_obj_bytes(struct rbd_image_header *header)
751{
752 return 1 << header->obj_order;
753}
754
602adf40
YS
755/*
756 * bio helpers
757 */
758
759static void bio_chain_put(struct bio *chain)
760{
761 struct bio *tmp;
762
763 while (chain) {
764 tmp = chain;
765 chain = chain->bi_next;
766 bio_put(tmp);
767 }
768}
769
770/*
771 * zeros a bio chain, starting at specific offset
772 */
/*
 * Zero the data in a bio chain from byte offset start_ofs to the end.
 * Segments wholly before start_ofs are untouched; the segment that
 * straddles it is zeroed from the remainder onward.  Pages are mapped
 * with bvec_kmap_irq(), so this is safe in interrupt context.
 */
773static void zero_bio_chain(struct bio *chain, int start_ofs)
774{
775 struct bio_vec *bv;
776 unsigned long flags;
777 void *buf;
778 int i;
779 int pos = 0;
780
781 while (chain) {
782 bio_for_each_segment(bv, chain, i) {
783 if (pos + bv->bv_len > start_ofs) {
784 int remainder = max(start_ofs - pos, 0);
785 buf = bvec_kmap_irq(bv, &flags);
786 memset(buf + remainder, 0,
787 bv->bv_len - remainder);
85b5aaa6 788 bvec_kunmap_irq(buf, &flags);
602adf40
YS
789 }
790 pos += bv->bv_len;
791 }
792
793 chain = chain->bi_next;
794 }
795}
796
797/*
798 * bio_chain_clone - clone a chain of bios up to a certain length.
799 * might return a bio_pair that will need to be released.
800 */
/*
 * Clone bios from *old until @len bytes are covered.  *old is advanced
 * past the consumed bios and *next is set to the continuation point;
 * when the boundary falls inside a bio, it is split and *bp holds the
 * resulting bio_pair (released on the next call, or by the caller).
 * Returns the head of the cloned chain, or NULL on allocation failure
 * (any partial chain is released).
 */
801static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
802 struct bio_pair **bp,
803 int len, gfp_t gfpmask)
804{
542582fc
AE
805 struct bio *old_chain = *old;
806 struct bio *new_chain = NULL;
807 struct bio *tail;
602adf40
YS
808 int total = 0;
809
810 if (*bp) {
811 bio_pair_release(*bp);
812 *bp = NULL;
813 }
814
815 while (old_chain && (total < len)) {
542582fc
AE
816 struct bio *tmp;
817
602adf40
YS
818 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
819 if (!tmp)
820 goto err_out;
542582fc 821 gfpmask &= ~__GFP_WAIT; /* can't wait after the first */
602adf40
YS
822
823 if (total + old_chain->bi_size > len) {
824 struct bio_pair *bp;
825
826 /*
827 * this split can only happen with a single paged bio,
828 * split_bio will BUG_ON if this is not the case
829 */
830 dout("bio_chain_clone split! total=%d remaining=%d"
bd919d45
AE
831 "bi_size=%u\n",
832 total, len - total, old_chain->bi_size);
602adf40
YS
833
834 /* split the bio. We'll release it either in the next
835 call, or it will have to be released outside */
593a9e7b 836 bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
602adf40
YS
837 if (!bp)
838 goto err_out;
839
840 __bio_clone(tmp, &bp->bio1);
841
842 *next = &bp->bio2;
843 } else {
844 __bio_clone(tmp, old_chain);
845 *next = old_chain->bi_next;
846 }
847
848 tmp->bi_bdev = NULL;
602adf40 849 tmp->bi_next = NULL;
542582fc 850 if (new_chain)
602adf40 851 tail->bi_next = tmp;
542582fc
AE
852 else
853 new_chain = tmp;
854 tail = tmp;
602adf40
YS
855 old_chain = old_chain->bi_next;
856
857 total += tmp->bi_size;
858 }
859
aafb230e 860 rbd_assert(total == len);
602adf40 861
602adf40
YS
862 *old = old_chain;
863
864 return new_chain;
865
866err_out:
867 dout("bio_chain_clone with err\n");
868 bio_chain_put(new_chain);
869 return NULL;
870}
871
872/*
873 * helpers for osd request op vectors.
874 */
57cfc106
AE
/*
 * Allocate a zeroed array of num_ops + 1 osd request ops (the extra
 * zeroed entry terminates the array) and initialize the first op's
 * opcode and payload length.  Returns NULL on allocation failure;
 * free with rbd_destroy_ops().
 */
875static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
876 int opcode, u32 payload_len)
877{
57cfc106
AE
878 struct ceph_osd_req_op *ops;
879
880 ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
881 if (!ops)
882 return NULL;
883
884 ops[0].op = opcode;
885
602adf40
YS
886 /*
887 * op extent offset and length will be set later on
888 * in calc_raw_layout()
889 */
57cfc106
AE
890 ops[0].payload_len = payload_len;
891
892 return ops;
602adf40
YS
893}
894
/* Free an op vector allocated by rbd_create_rw_ops(). */
895static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
896{
897 kfree(ops);
898}
899
1fec7093
YS
/*
 * Record completion of one object request in a collection and complete
 * the block-layer request for every in-order finished slot.  With no
 * collection, the whole request is completed directly.  Each completed
 * slot drops one collection kref (taken when the slot was issued).
 */
900static void rbd_coll_end_req_index(struct request *rq,
901 struct rbd_req_coll *coll,
902 int index,
903 int ret, u64 len)
904{
905 struct request_queue *q;
906 int min, max, i;
907
bd919d45
AE
908 dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
909 coll, index, ret, (unsigned long long) len);
1fec7093
YS
910
911 if (!rq)
912 return;
913
914 if (!coll) {
915 blk_end_request(rq, ret, len);
916 return;
917 }
918
919 q = rq->q;
920
921 spin_lock_irq(q->queue_lock);
922 coll->status[index].done = 1;
923 coll->status[index].rc = ret;
924 coll->status[index].bytes = len;
925 max = min = coll->num_done;
926 while (max < coll->total && coll->status[max].done)
927 max++;
928
929 for (i = min; i<max; i++) {
930 __blk_end_request(rq, coll->status[i].rc,
931 coll->status[i].bytes);
932 coll->num_done++;
933 kref_put(&coll->kref, rbd_coll_release);
934 }
935 spin_unlock_irq(q->queue_lock);
936}
937
/* Convenience wrapper: complete the collection slot recorded in @req. */
938static void rbd_coll_end_req(struct rbd_request *req,
939 int ret, u64 len)
940{
941 rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
942}
943
602adf40
YS
944/*
945 * Send ceph osd request
946 */
/*
 * Build and submit one ceph osd request for @object_name covering
 * [ofs, ofs+len).  Data travels via @bio or @pages.  With a callback
 * the request completes asynchronously (rbd_cb owns req/req_data);
 * without one this waits for completion, optionally passing back the
 * reassert version in *ver.  A linger request is registered with the
 * osd client and returned via *linger_req.  On failure the collection
 * slot is completed with the error.
 */
947static int rbd_do_request(struct request *rq,
0ce1a794 948 struct rbd_device *rbd_dev,
602adf40
YS
949 struct ceph_snap_context *snapc,
950 u64 snapid,
aded07ea 951 const char *object_name, u64 ofs, u64 len,
602adf40
YS
952 struct bio *bio,
953 struct page **pages,
954 int num_pages,
955 int flags,
956 struct ceph_osd_req_op *ops,
1fec7093
YS
957 struct rbd_req_coll *coll,
958 int coll_index,
602adf40 959 void (*rbd_cb)(struct ceph_osd_request *req,
59c2be1e
YS
960 struct ceph_msg *msg),
961 struct ceph_osd_request **linger_req,
962 u64 *ver)
602adf40
YS
963{
964 struct ceph_osd_request *req;
965 struct ceph_file_layout *layout;
966 int ret;
967 u64 bno;
968 struct timespec mtime = CURRENT_TIME;
969 struct rbd_request *req_data;
970 struct ceph_osd_request_head *reqhead;
1dbb4399 971 struct ceph_osd_client *osdc;
602adf40 972
602adf40 973 req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
1fec7093
YS
974 if (!req_data) {
975 if (coll)
976 rbd_coll_end_req_index(rq, coll, coll_index,
977 -ENOMEM, len);
978 return -ENOMEM;
979 }
980
981 if (coll) {
982 req_data->coll = coll;
983 req_data->coll_index = coll_index;
984 }
602adf40 985
bd919d45
AE
986 dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
987 (unsigned long long) ofs, (unsigned long long) len);
602adf40 988
0ce1a794 989 osdc = &rbd_dev->rbd_client->client->osdc;
1dbb4399
AE
990 req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
991 false, GFP_NOIO, pages, bio);
4ad12621 992 if (!req) {
4ad12621 993 ret = -ENOMEM;
602adf40
YS
994 goto done_pages;
995 }
996
997 req->r_callback = rbd_cb;
998
999 req_data->rq = rq;
1000 req_data->bio = bio;
1001 req_data->pages = pages;
1002 req_data->len = len;
1003
1004 req->r_priv = req_data;
1005
1006 reqhead = req->r_request->front.iov_base;
1007 reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
1008
aded07ea 1009 strncpy(req->r_oid, object_name, sizeof(req->r_oid));
602adf40
YS
1010 req->r_oid_len = strlen(req->r_oid);
1011
1012 layout = &req->r_file_layout;
1013 memset(layout, 0, sizeof(*layout));
1014 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1015 layout->fl_stripe_count = cpu_to_le32(1);
1016 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
0ce1a794 1017 layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
1dbb4399
AE
1018 ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
1019 req, ops);
602adf40
YS
1020
1021 ceph_osdc_build_request(req, ofs, &len,
1022 ops,
1023 snapc,
1024 &mtime,
1025 req->r_oid, req->r_oid_len);
602adf40 1026
59c2be1e 1027 if (linger_req) {
1dbb4399 1028 ceph_osdc_set_request_linger(osdc, req);
59c2be1e
YS
1029 *linger_req = req;
1030 }
1031
1dbb4399 1032 ret = ceph_osdc_start_request(osdc, req, false);
602adf40
YS
1033 if (ret < 0)
1034 goto done_err;
1035
1036 if (!rbd_cb) {
1dbb4399 1037 ret = ceph_osdc_wait_request(osdc, req);
59c2be1e
YS
1038 if (ver)
1039 *ver = le64_to_cpu(req->r_reassert_version.version);
bd919d45
AE
1040 dout("reassert_ver=%llu\n",
1041 (unsigned long long)
1042 le64_to_cpu(req->r_reassert_version.version));
602adf40
YS
1043 ceph_osdc_put_request(req);
1044 }
1045 return ret;
1046
1047done_err:
1048 bio_chain_put(req_data->bio);
1049 ceph_osdc_put_request(req);
1050done_pages:
1fec7093 1051 rbd_coll_end_req(req_data, ret, len);
602adf40 1052 kfree(req_data);
602adf40
YS
1053 return ret;
1054}
1055
1056/*
1057 * Ceph osd op callback
1058 */
/*
 * Async osd completion callback.  Decodes the reply; for reads, an
 * -ENOENT (object does not exist) is treated as success with zeroed
 * data, and a short read is zero-filled to the requested length.
 * Completes the collection slot and releases the request and its
 * tracking structure.
 */
1059static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1060{
1061 struct rbd_request *req_data = req->r_priv;
1062 struct ceph_osd_reply_head *replyhead;
1063 struct ceph_osd_op *op;
1064 __s32 rc;
1065 u64 bytes;
1066 int read_op;
1067
1068 /* parse reply */
1069 replyhead = msg->front.iov_base;
1070 WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
1071 op = (void *)(replyhead + 1);
1072 rc = le32_to_cpu(replyhead->result);
1073 bytes = le64_to_cpu(op->extent.length);
895cfcc8 1074 read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
602adf40 1075
bd919d45
AE
1076 dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
1077 (unsigned long long) bytes, read_op, (int) rc);
602adf40
YS
1078
1079 if (rc == -ENOENT && read_op) {
1080 zero_bio_chain(req_data->bio, 0);
1081 rc = 0;
1082 } else if (rc == 0 && read_op && bytes < req_data->len) {
1083 zero_bio_chain(req_data->bio, bytes);
1084 bytes = req_data->len;
1085 }
1086
1fec7093 1087 rbd_coll_end_req(req_data, rc, bytes);
602adf40
YS
1088
1089 if (req_data->bio)
1090 bio_chain_put(req_data->bio);
1091
1092 ceph_osdc_put_request(req);
1093 kfree(req_data);
1094}
1095
59c2be1e
YS
/* Minimal completion callback: just drop the osd request reference. */
1096static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1097{
1098 ceph_osdc_put_request(req);
1099}
1100
602adf40
YS
1101/*
1102 * Do a synchronous ceph osd operation
1103 */
/*
 * Issue one osd request synchronously via rbd_do_request() (no
 * callback, so it waits).  Data is staged through a temporary page
 * vector; for reads the result is copied into @buf.  Returns the
 * number of bytes handled or a negative errno.
 */
0ce1a794 1104static int rbd_req_sync_op(struct rbd_device *rbd_dev,
602adf40
YS
1105 struct ceph_snap_context *snapc,
1106 u64 snapid,
602adf40 1107 int flags,
913d2fdc 1108 struct ceph_osd_req_op *ops,
aded07ea 1109 const char *object_name,
602adf40 1110 u64 ofs, u64 len,
59c2be1e
YS
1111 char *buf,
1112 struct ceph_osd_request **linger_req,
1113 u64 *ver)
602adf40
YS
1114{
1115 int ret;
1116 struct page **pages;
1117 int num_pages;
913d2fdc 1118
aafb230e 1119 rbd_assert(ops != NULL);
602adf40
YS
1120
1121 num_pages = calc_pages_for(ofs , len);
1122 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
b8d0638a
DC
1123 if (IS_ERR(pages))
1124 return PTR_ERR(pages);
602adf40 1125
0ce1a794 1126 ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
aded07ea 1127 object_name, ofs, len, NULL,
602adf40
YS
1128 pages, num_pages,
1129 flags,
1130 ops,
1fec7093 1131 NULL, 0,
59c2be1e
YS
1132 NULL,
1133 linger_req, ver);
602adf40 1134 if (ret < 0)
913d2fdc 1135 goto done;
602adf40
YS
1136
1137 if ((flags & CEPH_OSD_FLAG_READ) && buf)
1138 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1139
602adf40
YS
1140done:
1141 ceph_release_page_vector(pages, num_pages);
1142 return ret;
1143}
1144
1145/*
1146 * Do an asynchronous ceph osd operation
1147 */
1148static int rbd_do_op(struct request *rq,
0ce1a794 1149 struct rbd_device *rbd_dev,
602adf40
YS
1150 struct ceph_snap_context *snapc,
1151 u64 snapid,
d1f57ea6 1152 int opcode, int flags,
602adf40 1153 u64 ofs, u64 len,
1fec7093
YS
1154 struct bio *bio,
1155 struct rbd_req_coll *coll,
1156 int coll_index)
602adf40
YS
1157{
1158 char *seg_name;
1159 u64 seg_ofs;
1160 u64 seg_len;
1161 int ret;
1162 struct ceph_osd_req_op *ops;
1163 u32 payload_len;
1164
65ccfe21 1165 seg_name = rbd_segment_name(rbd_dev, ofs);
602adf40
YS
1166 if (!seg_name)
1167 return -ENOMEM;
65ccfe21
AE
1168 seg_len = rbd_segment_length(rbd_dev, ofs, len);
1169 seg_ofs = rbd_segment_offset(rbd_dev, ofs);
602adf40
YS
1170
1171 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1172
57cfc106
AE
1173 ret = -ENOMEM;
1174 ops = rbd_create_rw_ops(1, opcode, payload_len);
1175 if (!ops)
602adf40
YS
1176 goto done;
1177
1178 /* we've taken care of segment sizes earlier when we
1179 cloned the bios. We should never have a segment
1180 truncated at this point */
aafb230e 1181 rbd_assert(seg_len == len);
602adf40
YS
1182
1183 ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1184 seg_name, seg_ofs, seg_len,
1185 bio,
1186 NULL, 0,
1187 flags,
1188 ops,
1fec7093 1189 coll, coll_index,
59c2be1e 1190 rbd_req_cb, 0, NULL);
11f77002
SW
1191
1192 rbd_destroy_ops(ops);
602adf40
YS
1193done:
1194 kfree(seg_name);
1195 return ret;
1196}
1197
1198/*
1199 * Request async osd write
1200 */
1201static int rbd_req_write(struct request *rq,
1202 struct rbd_device *rbd_dev,
1203 struct ceph_snap_context *snapc,
1204 u64 ofs, u64 len,
1fec7093
YS
1205 struct bio *bio,
1206 struct rbd_req_coll *coll,
1207 int coll_index)
602adf40
YS
1208{
1209 return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1210 CEPH_OSD_OP_WRITE,
1211 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1fec7093 1212 ofs, len, bio, coll, coll_index);
602adf40
YS
1213}
1214
1215/*
1216 * Request async osd read
1217 */
1218static int rbd_req_read(struct request *rq,
1219 struct rbd_device *rbd_dev,
1220 u64 snapid,
1221 u64 ofs, u64 len,
1fec7093
YS
1222 struct bio *bio,
1223 struct rbd_req_coll *coll,
1224 int coll_index)
602adf40
YS
1225{
1226 return rbd_do_op(rq, rbd_dev, NULL,
b06e6a6b 1227 snapid,
602adf40
YS
1228 CEPH_OSD_OP_READ,
1229 CEPH_OSD_FLAG_READ,
1fec7093 1230 ofs, len, bio, coll, coll_index);
602adf40
YS
1231}
1232
1233/*
1234 * Request sync osd read
1235 */
0ce1a794 1236static int rbd_req_sync_read(struct rbd_device *rbd_dev,
602adf40 1237 u64 snapid,
aded07ea 1238 const char *object_name,
602adf40 1239 u64 ofs, u64 len,
59c2be1e
YS
1240 char *buf,
1241 u64 *ver)
602adf40 1242{
913d2fdc
AE
1243 struct ceph_osd_req_op *ops;
1244 int ret;
1245
1246 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
1247 if (!ops)
1248 return -ENOMEM;
1249
1250 ret = rbd_req_sync_op(rbd_dev, NULL,
b06e6a6b 1251 snapid,
602adf40 1252 CEPH_OSD_FLAG_READ,
913d2fdc
AE
1253 ops, object_name, ofs, len, buf, NULL, ver);
1254 rbd_destroy_ops(ops);
1255
1256 return ret;
602adf40
YS
1257}
1258
1259/*
59c2be1e
YS
 1260  * Request sync osd notify-ack (acknowledge a watch notification)
1261 */
0ce1a794 1262static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
59c2be1e 1263 u64 ver,
7f0a24d8 1264 u64 notify_id)
59c2be1e
YS
1265{
1266 struct ceph_osd_req_op *ops;
11f77002
SW
1267 int ret;
1268
57cfc106
AE
1269 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1270 if (!ops)
1271 return -ENOMEM;
59c2be1e 1272
a71b891b 1273 ops[0].watch.ver = cpu_to_le64(ver);
59c2be1e
YS
1274 ops[0].watch.cookie = notify_id;
1275 ops[0].watch.flag = 0;
1276
0ce1a794 1277 ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
7f0a24d8 1278 rbd_dev->header_name, 0, 0, NULL,
ad4f232f 1279 NULL, 0,
59c2be1e
YS
1280 CEPH_OSD_FLAG_READ,
1281 ops,
1fec7093 1282 NULL, 0,
59c2be1e
YS
1283 rbd_simple_req_cb, 0, NULL);
1284
1285 rbd_destroy_ops(ops);
1286 return ret;
1287}
1288
1289static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1290{
0ce1a794 1291 struct rbd_device *rbd_dev = (struct rbd_device *)data;
a71b891b 1292 u64 hver;
13143d2d
SW
1293 int rc;
1294
0ce1a794 1295 if (!rbd_dev)
59c2be1e
YS
1296 return;
1297
bd919d45
AE
1298 dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1299 rbd_dev->header_name, (unsigned long long) notify_id,
1300 (unsigned int) opcode);
1fe5e993 1301 rc = rbd_refresh_header(rbd_dev, &hver);
13143d2d 1302 if (rc)
f0f8cef5 1303 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
0ce1a794 1304 " update snaps: %d\n", rbd_dev->major, rc);
59c2be1e 1305
7f0a24d8 1306 rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
59c2be1e
YS
1307}
1308
1309/*
1310 * Request sync osd watch
1311 */
0e6f322d 1312static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
59c2be1e
YS
1313{
1314 struct ceph_osd_req_op *ops;
0ce1a794 1315 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
57cfc106 1316 int ret;
59c2be1e 1317
57cfc106
AE
1318 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1319 if (!ops)
1320 return -ENOMEM;
59c2be1e
YS
1321
1322 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
0ce1a794 1323 (void *)rbd_dev, &rbd_dev->watch_event);
59c2be1e
YS
1324 if (ret < 0)
1325 goto fail;
1326
0e6f322d 1327 ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
0ce1a794 1328 ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
59c2be1e
YS
1329 ops[0].watch.flag = 1;
1330
0ce1a794 1331 ret = rbd_req_sync_op(rbd_dev, NULL,
59c2be1e 1332 CEPH_NOSNAP,
59c2be1e
YS
1333 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1334 ops,
0e6f322d
AE
1335 rbd_dev->header_name,
1336 0, 0, NULL,
0ce1a794 1337 &rbd_dev->watch_request, NULL);
59c2be1e
YS
1338
1339 if (ret < 0)
1340 goto fail_event;
1341
1342 rbd_destroy_ops(ops);
1343 return 0;
1344
1345fail_event:
0ce1a794
AE
1346 ceph_osdc_cancel_event(rbd_dev->watch_event);
1347 rbd_dev->watch_event = NULL;
59c2be1e
YS
1348fail:
1349 rbd_destroy_ops(ops);
1350 return ret;
1351}
1352
79e3057c
YS
1353/*
1354 * Request sync osd unwatch
1355 */
070c633f 1356static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
79e3057c
YS
1357{
1358 struct ceph_osd_req_op *ops;
57cfc106 1359 int ret;
79e3057c 1360
57cfc106
AE
1361 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1362 if (!ops)
1363 return -ENOMEM;
79e3057c
YS
1364
1365 ops[0].watch.ver = 0;
0ce1a794 1366 ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
79e3057c
YS
1367 ops[0].watch.flag = 0;
1368
0ce1a794 1369 ret = rbd_req_sync_op(rbd_dev, NULL,
79e3057c 1370 CEPH_NOSNAP,
79e3057c
YS
1371 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1372 ops,
070c633f
AE
1373 rbd_dev->header_name,
1374 0, 0, NULL, NULL, NULL);
1375
79e3057c
YS
1376
1377 rbd_destroy_ops(ops);
0ce1a794
AE
1378 ceph_osdc_cancel_event(rbd_dev->watch_event);
1379 rbd_dev->watch_event = NULL;
79e3057c
YS
1380 return ret;
1381}
1382
59c2be1e 1383struct rbd_notify_info {
0ce1a794 1384 struct rbd_device *rbd_dev;
59c2be1e
YS
1385};
1386
1387static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1388{
0ce1a794
AE
1389 struct rbd_device *rbd_dev = (struct rbd_device *)data;
1390 if (!rbd_dev)
59c2be1e
YS
1391 return;
1392
bd919d45
AE
1393 dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n",
1394 rbd_dev->header_name, (unsigned long long) notify_id,
1395 (unsigned int) opcode);
59c2be1e
YS
1396}
1397
1398/*
1399 * Request sync osd notify
1400 */
4cb16250 1401static int rbd_req_sync_notify(struct rbd_device *rbd_dev)
59c2be1e
YS
1402{
1403 struct ceph_osd_req_op *ops;
0ce1a794 1404 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
59c2be1e
YS
1405 struct ceph_osd_event *event;
1406 struct rbd_notify_info info;
1407 int payload_len = sizeof(u32) + sizeof(u32);
1408 int ret;
1409
57cfc106
AE
1410 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY, payload_len);
1411 if (!ops)
1412 return -ENOMEM;
59c2be1e 1413
0ce1a794 1414 info.rbd_dev = rbd_dev;
59c2be1e
YS
1415
1416 ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1417 (void *)&info, &event);
1418 if (ret < 0)
1419 goto fail;
1420
1421 ops[0].watch.ver = 1;
1422 ops[0].watch.flag = 1;
1423 ops[0].watch.cookie = event->cookie;
1424 ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1425 ops[0].watch.timeout = 12;
1426
0ce1a794 1427 ret = rbd_req_sync_op(rbd_dev, NULL,
59c2be1e 1428 CEPH_NOSNAP,
59c2be1e
YS
1429 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1430 ops,
4cb16250
AE
1431 rbd_dev->header_name,
1432 0, 0, NULL, NULL, NULL);
59c2be1e
YS
1433 if (ret < 0)
1434 goto fail_event;
1435
1436 ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1437 dout("ceph_osdc_wait_event returned %d\n", ret);
1438 rbd_destroy_ops(ops);
1439 return 0;
1440
1441fail_event:
1442 ceph_osdc_cancel_event(event);
1443fail:
1444 rbd_destroy_ops(ops);
1445 return ret;
1446}
1447
602adf40
YS
1448/*
 1449  * Request sync osd class method execution (exec)
1450 */
0ce1a794 1451static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
aded07ea
AE
1452 const char *object_name,
1453 const char *class_name,
1454 const char *method_name,
602adf40 1455 const char *data,
59c2be1e
YS
1456 int len,
1457 u64 *ver)
602adf40
YS
1458{
1459 struct ceph_osd_req_op *ops;
aded07ea
AE
1460 int class_name_len = strlen(class_name);
1461 int method_name_len = strlen(method_name);
57cfc106
AE
1462 int ret;
1463
1464 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL,
aded07ea 1465 class_name_len + method_name_len + len);
57cfc106
AE
1466 if (!ops)
1467 return -ENOMEM;
602adf40 1468
aded07ea
AE
1469 ops[0].cls.class_name = class_name;
1470 ops[0].cls.class_len = (__u8) class_name_len;
1471 ops[0].cls.method_name = method_name;
1472 ops[0].cls.method_len = (__u8) method_name_len;
602adf40
YS
1473 ops[0].cls.argc = 0;
1474 ops[0].cls.indata = data;
1475 ops[0].cls.indata_len = len;
1476
0ce1a794 1477 ret = rbd_req_sync_op(rbd_dev, NULL,
602adf40 1478 CEPH_NOSNAP,
602adf40
YS
1479 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1480 ops,
d1f57ea6 1481 object_name, 0, 0, NULL, NULL, ver);
602adf40
YS
1482
1483 rbd_destroy_ops(ops);
1484
1485 dout("cls_exec returned %d\n", ret);
1486 return ret;
1487}
1488
1fec7093
YS
1489static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1490{
1491 struct rbd_req_coll *coll =
1492 kzalloc(sizeof(struct rbd_req_coll) +
1493 sizeof(struct rbd_req_status) * num_reqs,
1494 GFP_ATOMIC);
1495
1496 if (!coll)
1497 return NULL;
1498 coll->total = num_reqs;
1499 kref_init(&coll->kref);
1500 return coll;
1501}
1502
602adf40
YS
1503/*
1504 * block device queue callback
1505 */
1506static void rbd_rq_fn(struct request_queue *q)
1507{
1508 struct rbd_device *rbd_dev = q->queuedata;
1509 struct request *rq;
1510 struct bio_pair *bp = NULL;
1511
00f1f36f 1512 while ((rq = blk_fetch_request(q))) {
602adf40
YS
1513 struct bio *bio;
1514 struct bio *rq_bio, *next_bio = NULL;
1515 bool do_write;
bd919d45
AE
1516 unsigned int size;
1517 u64 op_size = 0;
602adf40 1518 u64 ofs;
1fec7093
YS
1519 int num_segs, cur_seg = 0;
1520 struct rbd_req_coll *coll;
d1d25646 1521 struct ceph_snap_context *snapc;
602adf40 1522
602adf40
YS
1523 dout("fetched request\n");
1524
1525 /* filter out block requests we don't understand */
1526 if ((rq->cmd_type != REQ_TYPE_FS)) {
1527 __blk_end_request_all(rq, 0);
00f1f36f 1528 continue;
602adf40
YS
1529 }
1530
1531 /* deduce our operation (read, write) */
1532 do_write = (rq_data_dir(rq) == WRITE);
1533
1534 size = blk_rq_bytes(rq);
593a9e7b 1535 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
602adf40
YS
1536 rq_bio = rq->bio;
1537 if (do_write && rbd_dev->read_only) {
1538 __blk_end_request_all(rq, -EROFS);
00f1f36f 1539 continue;
602adf40
YS
1540 }
1541
1542 spin_unlock_irq(q->queue_lock);
1543
d1d25646 1544 down_read(&rbd_dev->header_rwsem);
e88a36ec 1545
d1d25646 1546 if (rbd_dev->snap_id != CEPH_NOSNAP && !rbd_dev->snap_exists) {
e88a36ec 1547 up_read(&rbd_dev->header_rwsem);
d1d25646
JD
1548 dout("request for non-existent snapshot");
1549 spin_lock_irq(q->queue_lock);
1550 __blk_end_request_all(rq, -ENXIO);
1551 continue;
e88a36ec
JD
1552 }
1553
d1d25646
JD
1554 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1555
1556 up_read(&rbd_dev->header_rwsem);
1557
602adf40
YS
1558 dout("%s 0x%x bytes at 0x%llx\n",
1559 do_write ? "write" : "read",
bd919d45 1560 size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
602adf40 1561
1fec7093 1562 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
df111be6
AE
1563 if (num_segs <= 0) {
1564 spin_lock_irq(q->queue_lock);
1565 __blk_end_request_all(rq, num_segs);
1566 ceph_put_snap_context(snapc);
1567 continue;
1568 }
1fec7093
YS
1569 coll = rbd_alloc_coll(num_segs);
1570 if (!coll) {
1571 spin_lock_irq(q->queue_lock);
1572 __blk_end_request_all(rq, -ENOMEM);
d1d25646 1573 ceph_put_snap_context(snapc);
00f1f36f 1574 continue;
1fec7093
YS
1575 }
1576
602adf40
YS
1577 do {
1578 /* a bio clone to be passed down to OSD req */
bd919d45 1579 dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
65ccfe21 1580 op_size = rbd_segment_length(rbd_dev, ofs, size);
1fec7093 1581 kref_get(&coll->kref);
602adf40
YS
1582 bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1583 op_size, GFP_ATOMIC);
1584 if (!bio) {
1fec7093
YS
1585 rbd_coll_end_req_index(rq, coll, cur_seg,
1586 -ENOMEM, op_size);
1587 goto next_seg;
602adf40
YS
1588 }
1589
1fec7093 1590
602adf40
YS
1591 /* init OSD command: write or read */
1592 if (do_write)
1593 rbd_req_write(rq, rbd_dev,
d1d25646 1594 snapc,
602adf40 1595 ofs,
1fec7093
YS
1596 op_size, bio,
1597 coll, cur_seg);
602adf40
YS
1598 else
1599 rbd_req_read(rq, rbd_dev,
77dfe99f 1600 rbd_dev->snap_id,
602adf40 1601 ofs,
1fec7093
YS
1602 op_size, bio,
1603 coll, cur_seg);
602adf40 1604
1fec7093 1605next_seg:
602adf40
YS
1606 size -= op_size;
1607 ofs += op_size;
1608
1fec7093 1609 cur_seg++;
602adf40
YS
1610 rq_bio = next_bio;
1611 } while (size > 0);
1fec7093 1612 kref_put(&coll->kref, rbd_coll_release);
602adf40
YS
1613
1614 if (bp)
1615 bio_pair_release(bp);
602adf40 1616 spin_lock_irq(q->queue_lock);
d1d25646
JD
1617
1618 ceph_put_snap_context(snapc);
602adf40
YS
1619 }
1620}
1621
1622/*
1623 * a queue callback. Makes sure that we don't create a bio that spans across
1624 * multiple osd objects. One exception would be with a single page bios,
1625 * which we handle later at bio_chain_clone
1626 */
1627static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1628 struct bio_vec *bvec)
1629{
1630 struct rbd_device *rbd_dev = q->queuedata;
593a9e7b
AE
1631 unsigned int chunk_sectors;
1632 sector_t sector;
1633 unsigned int bio_sectors;
602adf40
YS
1634 int max;
1635
593a9e7b
AE
1636 chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1637 sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1638 bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1639
602adf40 1640 max = (chunk_sectors - ((sector & (chunk_sectors - 1))
593a9e7b 1641 + bio_sectors)) << SECTOR_SHIFT;
602adf40
YS
1642 if (max < 0)
1643 max = 0; /* bio_add cannot handle a negative return */
1644 if (max <= bvec->bv_len && bio_sectors == 0)
1645 return bvec->bv_len;
1646 return max;
1647}
1648
1649static void rbd_free_disk(struct rbd_device *rbd_dev)
1650{
1651 struct gendisk *disk = rbd_dev->disk;
1652
1653 if (!disk)
1654 return;
1655
1656 rbd_header_free(&rbd_dev->header);
1657
1658 if (disk->flags & GENHD_FL_UP)
1659 del_gendisk(disk);
1660 if (disk->queue)
1661 blk_cleanup_queue(disk->queue);
1662 put_disk(disk);
1663}
1664
1665/*
4156d998
AE
1666 * Read the complete header for the given rbd device.
1667 *
1668 * Returns a pointer to a dynamically-allocated buffer containing
1669 * the complete and validated header. Caller can pass the address
1670 * of a variable that will be filled in with the version of the
1671 * header object at the time it was read.
1672 *
1673 * Returns a pointer-coded errno if a failure occurs.
602adf40 1674 */
4156d998
AE
1675static struct rbd_image_header_ondisk *
1676rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
602adf40 1677{
4156d998 1678 struct rbd_image_header_ondisk *ondisk = NULL;
50f7c4c9 1679 u32 snap_count = 0;
4156d998
AE
1680 u64 names_size = 0;
1681 u32 want_count;
1682 int ret;
602adf40 1683
00f1f36f 1684 /*
4156d998
AE
1685 * The complete header will include an array of its 64-bit
1686 * snapshot ids, followed by the names of those snapshots as
1687 * a contiguous block of NUL-terminated strings. Note that
1688 * the number of snapshots could change by the time we read
1689 * it in, in which case we re-read it.
00f1f36f 1690 */
4156d998
AE
1691 do {
1692 size_t size;
1693
1694 kfree(ondisk);
1695
1696 size = sizeof (*ondisk);
1697 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
1698 size += names_size;
1699 ondisk = kmalloc(size, GFP_KERNEL);
1700 if (!ondisk)
1701 return ERR_PTR(-ENOMEM);
1702
1703 ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
0bed54dc 1704 rbd_dev->header_name,
4156d998
AE
1705 0, size,
1706 (char *) ondisk, version);
1707
1708 if (ret < 0)
1709 goto out_err;
1710 if (WARN_ON((size_t) ret < size)) {
1711 ret = -ENXIO;
1712 pr_warning("short header read for image %s"
1713 " (want %zd got %d)\n",
1714 rbd_dev->image_name, size, ret);
1715 goto out_err;
1716 }
1717 if (!rbd_dev_ondisk_valid(ondisk)) {
1718 ret = -ENXIO;
1719 pr_warning("invalid header for image %s\n",
1720 rbd_dev->image_name);
1721 goto out_err;
81e759fb 1722 }
602adf40 1723
4156d998
AE
1724 names_size = le64_to_cpu(ondisk->snap_names_len);
1725 want_count = snap_count;
1726 snap_count = le32_to_cpu(ondisk->snap_count);
1727 } while (snap_count != want_count);
00f1f36f 1728
4156d998 1729 return ondisk;
00f1f36f 1730
4156d998
AE
1731out_err:
1732 kfree(ondisk);
1733
1734 return ERR_PTR(ret);
1735}
1736
1737/*
 1738  * reload the ondisk header
1739 */
1740static int rbd_read_header(struct rbd_device *rbd_dev,
1741 struct rbd_image_header *header)
1742{
1743 struct rbd_image_header_ondisk *ondisk;
1744 u64 ver = 0;
1745 int ret;
602adf40 1746
4156d998
AE
1747 ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
1748 if (IS_ERR(ondisk))
1749 return PTR_ERR(ondisk);
1750 ret = rbd_header_from_disk(header, ondisk);
1751 if (ret >= 0)
1752 header->obj_version = ver;
1753 kfree(ondisk);
1754
1755 return ret;
602adf40
YS
1756}
1757
1758/*
1759 * create a snapshot
1760 */
0ce1a794 1761static int rbd_header_add_snap(struct rbd_device *rbd_dev,
602adf40
YS
1762 const char *snap_name,
1763 gfp_t gfp_flags)
1764{
1765 int name_len = strlen(snap_name);
1766 u64 new_snapid;
1767 int ret;
916d4d67 1768 void *data, *p, *e;
1dbb4399 1769 struct ceph_mon_client *monc;
602adf40
YS
1770
1771 /* we should create a snapshot only if we're pointing at the head */
0ce1a794 1772 if (rbd_dev->snap_id != CEPH_NOSNAP)
602adf40
YS
1773 return -EINVAL;
1774
0ce1a794
AE
1775 monc = &rbd_dev->rbd_client->client->monc;
1776 ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
bd919d45 1777 dout("created snapid=%llu\n", (unsigned long long) new_snapid);
602adf40
YS
1778 if (ret < 0)
1779 return ret;
1780
1781 data = kmalloc(name_len + 16, gfp_flags);
1782 if (!data)
1783 return -ENOMEM;
1784
916d4d67
SW
1785 p = data;
1786 e = data + name_len + 16;
602adf40 1787
916d4d67
SW
1788 ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1789 ceph_encode_64_safe(&p, e, new_snapid, bad);
602adf40 1790
0bed54dc 1791 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
0ce1a794 1792 "rbd", "snap_add",
d67d4be5 1793 data, p - data, NULL);
602adf40 1794
916d4d67 1795 kfree(data);
602adf40 1796
505cbb9b 1797 return ret < 0 ? ret : 0;
602adf40
YS
1798bad:
1799 return -ERANGE;
1800}
1801
dfc5606d
YS
1802static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1803{
1804 struct rbd_snap *snap;
a0593290 1805 struct rbd_snap *next;
dfc5606d 1806
a0593290 1807 list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
14e7085d 1808 __rbd_remove_snap_dev(snap);
dfc5606d
YS
1809}
1810
602adf40
YS
1811/*
1812 * only read the first part of the ondisk header, without the snaps info
1813 */
b813623a 1814static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
602adf40
YS
1815{
1816 int ret;
1817 struct rbd_image_header h;
602adf40
YS
1818
1819 ret = rbd_read_header(rbd_dev, &h);
1820 if (ret < 0)
1821 return ret;
1822
a51aa0c0
JD
1823 down_write(&rbd_dev->header_rwsem);
1824
9db4b3e3 1825 /* resized? */
474ef7ce
JD
1826 if (rbd_dev->snap_id == CEPH_NOSNAP) {
1827 sector_t size = (sector_t) h.image_size / SECTOR_SIZE;
1828
1829 dout("setting size to %llu sectors", (unsigned long long) size);
1830 set_capacity(rbd_dev->disk, size);
1831 }
9db4b3e3 1832
849b4260 1833 /* rbd_dev->header.object_prefix shouldn't change */
602adf40 1834 kfree(rbd_dev->header.snap_sizes);
849b4260 1835 kfree(rbd_dev->header.snap_names);
d1d25646
JD
1836 /* osd requests may still refer to snapc */
1837 ceph_put_snap_context(rbd_dev->header.snapc);
602adf40 1838
b813623a
AE
1839 if (hver)
1840 *hver = h.obj_version;
a71b891b 1841 rbd_dev->header.obj_version = h.obj_version;
93a24e08 1842 rbd_dev->header.image_size = h.image_size;
602adf40
YS
1843 rbd_dev->header.total_snaps = h.total_snaps;
1844 rbd_dev->header.snapc = h.snapc;
1845 rbd_dev->header.snap_names = h.snap_names;
1846 rbd_dev->header.snap_sizes = h.snap_sizes;
849b4260
AE
1847 /* Free the extra copy of the object prefix */
1848 WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1849 kfree(h.object_prefix);
1850
dfc5606d
YS
1851 ret = __rbd_init_snaps_header(rbd_dev);
1852
c666601a 1853 up_write(&rbd_dev->header_rwsem);
602adf40 1854
dfc5606d 1855 return ret;
602adf40
YS
1856}
1857
1fe5e993
AE
1858static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1859{
1860 int ret;
1861
1862 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1863 ret = __rbd_refresh_header(rbd_dev, hver);
1864 mutex_unlock(&ctl_mutex);
1865
1866 return ret;
1867}
1868
602adf40
YS
1869static int rbd_init_disk(struct rbd_device *rbd_dev)
1870{
1871 struct gendisk *disk;
1872 struct request_queue *q;
1873 int rc;
593a9e7b 1874 u64 segment_size;
602adf40
YS
1875 u64 total_size = 0;
1876
1877 /* contact OSD, request size info about the object being mapped */
1878 rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1879 if (rc)
1880 return rc;
1881
dfc5606d
YS
1882 /* no need to lock here, as rbd_dev is not registered yet */
1883 rc = __rbd_init_snaps_header(rbd_dev);
1884 if (rc)
1885 return rc;
1886
cc9d734c 1887 rc = rbd_header_set_snap(rbd_dev, &total_size);
602adf40
YS
1888 if (rc)
1889 return rc;
1890
1891 /* create gendisk info */
1892 rc = -ENOMEM;
1893 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1894 if (!disk)
1895 goto out;
1896
f0f8cef5 1897 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
de71a297 1898 rbd_dev->dev_id);
602adf40
YS
1899 disk->major = rbd_dev->major;
1900 disk->first_minor = 0;
1901 disk->fops = &rbd_bd_ops;
1902 disk->private_data = rbd_dev;
1903
1904 /* init rq */
1905 rc = -ENOMEM;
1906 q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1907 if (!q)
1908 goto out_disk;
029bcbd8 1909
593a9e7b
AE
1910 /* We use the default size, but let's be explicit about it. */
1911 blk_queue_physical_block_size(q, SECTOR_SIZE);
1912
029bcbd8 1913 /* set io sizes to object size */
593a9e7b
AE
1914 segment_size = rbd_obj_bytes(&rbd_dev->header);
1915 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1916 blk_queue_max_segment_size(q, segment_size);
1917 blk_queue_io_min(q, segment_size);
1918 blk_queue_io_opt(q, segment_size);
029bcbd8 1919
602adf40
YS
1920 blk_queue_merge_bvec(q, rbd_merge_bvec);
1921 disk->queue = q;
1922
1923 q->queuedata = rbd_dev;
1924
1925 rbd_dev->disk = disk;
1926 rbd_dev->q = q;
1927
1928 /* finally, announce the disk to the world */
593a9e7b 1929 set_capacity(disk, total_size / SECTOR_SIZE);
602adf40
YS
1930 add_disk(disk);
1931
1932 pr_info("%s: added with size 0x%llx\n",
1933 disk->disk_name, (unsigned long long)total_size);
1934 return 0;
1935
1936out_disk:
1937 put_disk(disk);
1938out:
1939 return rc;
1940}
1941
dfc5606d
YS
1942/*
1943 sysfs
1944*/
1945
593a9e7b
AE
1946static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1947{
1948 return container_of(dev, struct rbd_device, dev);
1949}
1950
dfc5606d
YS
1951static ssize_t rbd_size_show(struct device *dev,
1952 struct device_attribute *attr, char *buf)
1953{
593a9e7b 1954 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
a51aa0c0
JD
1955 sector_t size;
1956
1957 down_read(&rbd_dev->header_rwsem);
1958 size = get_capacity(rbd_dev->disk);
1959 up_read(&rbd_dev->header_rwsem);
dfc5606d 1960
a51aa0c0 1961 return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
dfc5606d
YS
1962}
1963
1964static ssize_t rbd_major_show(struct device *dev,
1965 struct device_attribute *attr, char *buf)
1966{
593a9e7b 1967 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 1968
dfc5606d
YS
1969 return sprintf(buf, "%d\n", rbd_dev->major);
1970}
1971
1972static ssize_t rbd_client_id_show(struct device *dev,
1973 struct device_attribute *attr, char *buf)
602adf40 1974{
593a9e7b 1975 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 1976
1dbb4399
AE
1977 return sprintf(buf, "client%lld\n",
1978 ceph_client_id(rbd_dev->rbd_client->client));
602adf40
YS
1979}
1980
dfc5606d
YS
1981static ssize_t rbd_pool_show(struct device *dev,
1982 struct device_attribute *attr, char *buf)
602adf40 1983{
593a9e7b 1984 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d
YS
1985
1986 return sprintf(buf, "%s\n", rbd_dev->pool_name);
1987}
1988
9bb2f334
AE
1989static ssize_t rbd_pool_id_show(struct device *dev,
1990 struct device_attribute *attr, char *buf)
1991{
1992 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1993
1994 return sprintf(buf, "%d\n", rbd_dev->pool_id);
1995}
1996
dfc5606d
YS
1997static ssize_t rbd_name_show(struct device *dev,
1998 struct device_attribute *attr, char *buf)
1999{
593a9e7b 2000 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 2001
0bed54dc 2002 return sprintf(buf, "%s\n", rbd_dev->image_name);
dfc5606d
YS
2003}
2004
2005static ssize_t rbd_snap_show(struct device *dev,
2006 struct device_attribute *attr,
2007 char *buf)
2008{
593a9e7b 2009 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d
YS
2010
2011 return sprintf(buf, "%s\n", rbd_dev->snap_name);
2012}
2013
2014static ssize_t rbd_image_refresh(struct device *dev,
2015 struct device_attribute *attr,
2016 const char *buf,
2017 size_t size)
2018{
593a9e7b 2019 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
b813623a 2020 int ret;
602adf40 2021
1fe5e993 2022 ret = rbd_refresh_header(rbd_dev, NULL);
b813623a
AE
2023
2024 return ret < 0 ? ret : size;
dfc5606d 2025}
602adf40 2026
dfc5606d
YS
2027static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
2028static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
2029static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
2030static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
9bb2f334 2031static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
dfc5606d
YS
2032static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
2033static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
2034static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
2035static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
dfc5606d
YS
2036
2037static struct attribute *rbd_attrs[] = {
2038 &dev_attr_size.attr,
2039 &dev_attr_major.attr,
2040 &dev_attr_client_id.attr,
2041 &dev_attr_pool.attr,
9bb2f334 2042 &dev_attr_pool_id.attr,
dfc5606d
YS
2043 &dev_attr_name.attr,
2044 &dev_attr_current_snap.attr,
2045 &dev_attr_refresh.attr,
2046 &dev_attr_create_snap.attr,
dfc5606d
YS
2047 NULL
2048};
2049
2050static struct attribute_group rbd_attr_group = {
2051 .attrs = rbd_attrs,
2052};
2053
2054static const struct attribute_group *rbd_attr_groups[] = {
2055 &rbd_attr_group,
2056 NULL
2057};
2058
2059static void rbd_sysfs_dev_release(struct device *dev)
2060{
2061}
2062
2063static struct device_type rbd_device_type = {
2064 .name = "rbd",
2065 .groups = rbd_attr_groups,
2066 .release = rbd_sysfs_dev_release,
2067};
2068
2069
2070/*
2071 sysfs - snapshots
2072*/
2073
2074static ssize_t rbd_snap_size_show(struct device *dev,
2075 struct device_attribute *attr,
2076 char *buf)
2077{
2078 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2079
3591538f 2080 return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
dfc5606d
YS
2081}
2082
2083static ssize_t rbd_snap_id_show(struct device *dev,
2084 struct device_attribute *attr,
2085 char *buf)
2086{
2087 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2088
3591538f 2089 return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
dfc5606d
YS
2090}
2091
2092static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
2093static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
2094
2095static struct attribute *rbd_snap_attrs[] = {
2096 &dev_attr_snap_size.attr,
2097 &dev_attr_snap_id.attr,
2098 NULL,
2099};
2100
2101static struct attribute_group rbd_snap_attr_group = {
2102 .attrs = rbd_snap_attrs,
2103};
2104
2105static void rbd_snap_dev_release(struct device *dev)
2106{
2107 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2108 kfree(snap->name);
2109 kfree(snap);
2110}
2111
2112static const struct attribute_group *rbd_snap_attr_groups[] = {
2113 &rbd_snap_attr_group,
2114 NULL
2115};
2116
2117static struct device_type rbd_snap_device_type = {
2118 .groups = rbd_snap_attr_groups,
2119 .release = rbd_snap_dev_release,
2120};
2121
14e7085d 2122static void __rbd_remove_snap_dev(struct rbd_snap *snap)
dfc5606d
YS
2123{
2124 list_del(&snap->node);
2125 device_unregister(&snap->dev);
2126}
2127
14e7085d 2128static int rbd_register_snap_dev(struct rbd_snap *snap,
dfc5606d
YS
2129 struct device *parent)
2130{
2131 struct device *dev = &snap->dev;
2132 int ret;
2133
2134 dev->type = &rbd_snap_device_type;
2135 dev->parent = parent;
2136 dev->release = rbd_snap_dev_release;
2137 dev_set_name(dev, "snap_%s", snap->name);
2138 ret = device_register(dev);
2139
2140 return ret;
2141}
2142
4e891e0a
AE
2143static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2144 int i, const char *name)
dfc5606d 2145{
4e891e0a 2146 struct rbd_snap *snap;
dfc5606d 2147 int ret;
4e891e0a
AE
2148
2149 snap = kzalloc(sizeof (*snap), GFP_KERNEL);
dfc5606d 2150 if (!snap)
4e891e0a
AE
2151 return ERR_PTR(-ENOMEM);
2152
2153 ret = -ENOMEM;
dfc5606d 2154 snap->name = kstrdup(name, GFP_KERNEL);
4e891e0a
AE
2155 if (!snap->name)
2156 goto err;
2157
dfc5606d
YS
2158 snap->size = rbd_dev->header.snap_sizes[i];
2159 snap->id = rbd_dev->header.snapc->snaps[i];
2160 if (device_is_registered(&rbd_dev->dev)) {
14e7085d 2161 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
dfc5606d
YS
2162 if (ret < 0)
2163 goto err;
2164 }
4e891e0a
AE
2165
2166 return snap;
2167
dfc5606d
YS
2168err:
2169 kfree(snap->name);
2170 kfree(snap);
4e891e0a
AE
2171
2172 return ERR_PTR(ret);
dfc5606d
YS
2173}
2174
/*
 * Scan the rbd device's current snapshot list and compare it to the
 * newly-received snapshot context.  Remove any existing snapshots
 * not present in the new snapshot context.  Add a new snapshot for
 * any snaphots in the snapshot context not in the current list.
 * And verify there are no changes to snapshots we already know
 * about.
 *
 * Assumes the snapshots in the snapshot context are sorted by
 * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
 * are also maintained in that order.)
 */
static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	const u32 snap_count = snapc->num_snaps;
	/* snap_names is a sequence of NUL-terminated strings, one per snap */
	char *snap_name = rbd_dev->header.snap_names;
	struct list_head *head = &rbd_dev->snaps;
	struct list_head *links = head->next;
	u32 index = 0;

	/*
	 * Classic sorted-merge: walk the new context (index) and the
	 * existing list (links) in parallel until both are exhausted.
	 */
	while (index < snap_count || links != head) {
		u64 snap_id;
		struct rbd_snap *snap;

		snap_id = index < snap_count ? snapc->snaps[index]
					     : CEPH_NOSNAP;
		snap = links != head ? list_entry(links, struct rbd_snap, node)
				     : NULL;
		rbd_assert(!snap || snap->id != CEPH_NOSNAP);

		if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
			struct list_head *next = links->next;

			/* Existing snapshot not in the new snap context */

			/* If it was the mapped snapshot, it no longer exists */
			if (rbd_dev->snap_id == snap->id)
				rbd_dev->snap_exists = false;
			__rbd_remove_snap_dev(snap);

			/* Done with this list entry; advance */

			links = next;
			continue;
		}

		if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
			struct rbd_snap *new_snap;

			/* We haven't seen this snapshot before */

			new_snap = __rbd_add_snap_dev(rbd_dev, index,
							snap_name);
			if (IS_ERR(new_snap))
				return PTR_ERR(new_snap);

			/* New goes before existing, or at end of list */

			if (snap)
				list_add_tail(&new_snap->node, &snap->node);
			else
				list_add_tail(&new_snap->node, head);
		} else {
			/* Already have this one */

			/* A known snapshot must never change size or name */
			rbd_assert(snap->size ==
					rbd_dev->header.snap_sizes[index]);
			rbd_assert(!strcmp(snap->name, snap_name));

			/* Done with this list entry; advance */

			links = links->next;
		}

		/* Advance to the next entry in the snapshot context */

		index++;
		snap_name += strlen(snap_name) + 1;
	}

	return 0;
}
2257
dfc5606d
YS
2258static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2259{
f0f8cef5 2260 int ret;
dfc5606d
YS
2261 struct device *dev;
2262 struct rbd_snap *snap;
2263
2264 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2265 dev = &rbd_dev->dev;
2266
2267 dev->bus = &rbd_bus_type;
2268 dev->type = &rbd_device_type;
2269 dev->parent = &rbd_root_dev;
2270 dev->release = rbd_dev_release;
de71a297 2271 dev_set_name(dev, "%d", rbd_dev->dev_id);
dfc5606d
YS
2272 ret = device_register(dev);
2273 if (ret < 0)
f0f8cef5 2274 goto out;
dfc5606d
YS
2275
2276 list_for_each_entry(snap, &rbd_dev->snaps, node) {
14e7085d 2277 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
dfc5606d 2278 if (ret < 0)
602adf40
YS
2279 break;
2280 }
f0f8cef5 2281out:
dfc5606d
YS
2282 mutex_unlock(&ctl_mutex);
2283 return ret;
602adf40
YS
2284}
2285
dfc5606d
YS
2286static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2287{
2288 device_unregister(&rbd_dev->dev);
2289}
2290
59c2be1e
YS
2291static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2292{
2293 int ret, rc;
2294
2295 do {
0e6f322d 2296 ret = rbd_req_sync_watch(rbd_dev);
59c2be1e 2297 if (ret == -ERANGE) {
1fe5e993 2298 rc = rbd_refresh_header(rbd_dev, NULL);
59c2be1e
YS
2299 if (rc < 0)
2300 return rc;
2301 }
2302 } while (ret == -ERANGE);
2303
2304 return ret;
2305}
2306
1ddbe94e
AE
2307static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
2308
2309/*
499afd5b
AE
2310 * Get a unique rbd identifier for the given new rbd_dev, and add
2311 * the rbd_dev to the global list. The minimum rbd id is 1.
1ddbe94e 2312 */
499afd5b 2313static void rbd_id_get(struct rbd_device *rbd_dev)
b7f23c36 2314{
de71a297 2315 rbd_dev->dev_id = atomic64_inc_return(&rbd_id_max);
499afd5b
AE
2316
2317 spin_lock(&rbd_dev_list_lock);
2318 list_add_tail(&rbd_dev->node, &rbd_dev_list);
2319 spin_unlock(&rbd_dev_list_lock);
1ddbe94e 2320}
b7f23c36 2321
1ddbe94e 2322/*
499afd5b
AE
2323 * Remove an rbd_dev from the global list, and record that its
2324 * identifier is no longer in use.
1ddbe94e 2325 */
499afd5b 2326static void rbd_id_put(struct rbd_device *rbd_dev)
1ddbe94e 2327{
d184f6bf 2328 struct list_head *tmp;
de71a297 2329 int rbd_id = rbd_dev->dev_id;
d184f6bf
AE
2330 int max_id;
2331
aafb230e 2332 rbd_assert(rbd_id > 0);
499afd5b
AE
2333
2334 spin_lock(&rbd_dev_list_lock);
2335 list_del_init(&rbd_dev->node);
d184f6bf
AE
2336
2337 /*
2338 * If the id being "put" is not the current maximum, there
2339 * is nothing special we need to do.
2340 */
2341 if (rbd_id != atomic64_read(&rbd_id_max)) {
2342 spin_unlock(&rbd_dev_list_lock);
2343 return;
2344 }
2345
2346 /*
2347 * We need to update the current maximum id. Search the
2348 * list to find out what it is. We're more likely to find
2349 * the maximum at the end, so search the list backward.
2350 */
2351 max_id = 0;
2352 list_for_each_prev(tmp, &rbd_dev_list) {
2353 struct rbd_device *rbd_dev;
2354
2355 rbd_dev = list_entry(tmp, struct rbd_device, node);
2356 if (rbd_id > max_id)
2357 max_id = rbd_id;
2358 }
499afd5b 2359 spin_unlock(&rbd_dev_list_lock);
b7f23c36 2360
1ddbe94e 2361 /*
d184f6bf
AE
2362 * The max id could have been updated by rbd_id_get(), in
2363 * which case it now accurately reflects the new maximum.
2364 * Be careful not to overwrite the maximum value in that
2365 * case.
1ddbe94e 2366 */
d184f6bf 2367 atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
b7f23c36
AE
2368}
2369
e28fff26
AE
2370/*
2371 * Skips over white space at *buf, and updates *buf to point to the
2372 * first found non-space character (if any). Returns the length of
593a9e7b
AE
2373 * the token (string of non-white space characters) found. Note
2374 * that *buf must be terminated with '\0'.
e28fff26
AE
2375 */
2376static inline size_t next_token(const char **buf)
2377{
2378 /*
2379 * These are the characters that produce nonzero for
2380 * isspace() in the "C" and "POSIX" locales.
2381 */
2382 const char *spaces = " \f\n\r\t\v";
2383
2384 *buf += strspn(*buf, spaces); /* Find start of token */
2385
2386 return strcspn(*buf, spaces); /* Return token length */
2387}
2388
/*
 * Finds the next token in *buf, and if the provided token buffer is
 * big enough, copies the found token into it.  The result, if
 * copied, is guaranteed to be terminated with '\0'.  Note that *buf
 * must be terminated with '\0' on entry.
 *
 * Returns the length of the token found (not including the '\0').
 * Return value will be 0 if no token is found, and it will be >=
 * token_size if the token would not fit.
 *
 * The *buf pointer will be updated to point beyond the end of the
 * found token.  Note that this occurs even if the token buffer is
 * too small to hold it.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t len = next_token(buf);

	/* Copy only when the token (plus its terminator) fits */
	if (len < token_size) {
		memcpy(token, *buf, len);
		token[len] = '\0';
	}

	/* Consume the token from the input regardless */
	*buf += len;

	return len;
}
2418
ea3352f4
AE
2419/*
2420 * Finds the next token in *buf, dynamically allocates a buffer big
2421 * enough to hold a copy of it, and copies the token into the new
2422 * buffer. The copy is guaranteed to be terminated with '\0'. Note
2423 * that a duplicate buffer is created even for a zero-length token.
2424 *
2425 * Returns a pointer to the newly-allocated duplicate, or a null
2426 * pointer if memory for the duplicate was not available. If
2427 * the lenp argument is a non-null pointer, the length of the token
2428 * (not including the '\0') is returned in *lenp.
2429 *
2430 * If successful, the *buf pointer will be updated to point beyond
2431 * the end of the found token.
2432 *
2433 * Note: uses GFP_KERNEL for allocation.
2434 */
2435static inline char *dup_token(const char **buf, size_t *lenp)
2436{
2437 char *dup;
2438 size_t len;
2439
2440 len = next_token(buf);
2441 dup = kmalloc(len + 1, GFP_KERNEL);
2442 if (!dup)
2443 return NULL;
2444
2445 memcpy(dup, *buf, len);
2446 *(dup + len) = '\0';
2447 *buf += len;
2448
2449 if (lenp)
2450 *lenp = len;
2451
2452 return dup;
2453}
2454
a725f65e 2455/*
0bed54dc 2456 * This fills in the pool_name, image_name, image_name_len, snap_name,
a725f65e
AE
2457 * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
2458 * on the list of monitor addresses and other options provided via
2459 * /sys/bus/rbd/add.
d22f76e7
AE
2460 *
2461 * Note: rbd_dev is assumed to have been initially zero-filled.
a725f65e
AE
2462 */
2463static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2464 const char *buf,
7ef3214a 2465 const char **mon_addrs,
5214ecc4 2466 size_t *mon_addrs_size,
e28fff26 2467 char *options,
0bed54dc 2468 size_t options_size)
e28fff26 2469{
d22f76e7
AE
2470 size_t len;
2471 int ret;
e28fff26
AE
2472
2473 /* The first four tokens are required */
2474
7ef3214a
AE
2475 len = next_token(&buf);
2476 if (!len)
a725f65e 2477 return -EINVAL;
5214ecc4 2478 *mon_addrs_size = len + 1;
7ef3214a
AE
2479 *mon_addrs = buf;
2480
2481 buf += len;
a725f65e 2482
e28fff26
AE
2483 len = copy_token(&buf, options, options_size);
2484 if (!len || len >= options_size)
2485 return -EINVAL;
2486
bf3e5ae1 2487 ret = -ENOMEM;
d22f76e7
AE
2488 rbd_dev->pool_name = dup_token(&buf, NULL);
2489 if (!rbd_dev->pool_name)
d22f76e7 2490 goto out_err;
e28fff26 2491
0bed54dc
AE
2492 rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
2493 if (!rbd_dev->image_name)
bf3e5ae1 2494 goto out_err;
a725f65e 2495
cb8627c7
AE
2496 /* Create the name of the header object */
2497
0bed54dc 2498 rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
bf3e5ae1
AE
2499 + sizeof (RBD_SUFFIX),
2500 GFP_KERNEL);
0bed54dc 2501 if (!rbd_dev->header_name)
cb8627c7 2502 goto out_err;
0bed54dc 2503 sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
a725f65e 2504
e28fff26 2505 /*
820a5f3e
AE
2506 * The snapshot name is optional. If none is is supplied,
2507 * we use the default value.
e28fff26 2508 */
820a5f3e
AE
2509 rbd_dev->snap_name = dup_token(&buf, &len);
2510 if (!rbd_dev->snap_name)
2511 goto out_err;
2512 if (!len) {
2513 /* Replace the empty name with the default */
2514 kfree(rbd_dev->snap_name);
2515 rbd_dev->snap_name
2516 = kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
2517 if (!rbd_dev->snap_name)
2518 goto out_err;
2519
e28fff26
AE
2520 memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2521 sizeof (RBD_SNAP_HEAD_NAME));
849b4260 2522 }
e28fff26 2523
a725f65e 2524 return 0;
d22f76e7
AE
2525
2526out_err:
0bed54dc 2527 kfree(rbd_dev->header_name);
d78fd7ae 2528 rbd_dev->header_name = NULL;
0bed54dc 2529 kfree(rbd_dev->image_name);
d78fd7ae
AE
2530 rbd_dev->image_name = NULL;
2531 rbd_dev->image_name_len = 0;
d22f76e7
AE
2532 kfree(rbd_dev->pool_name);
2533 rbd_dev->pool_name = NULL;
2534
2535 return ret;
a725f65e
AE
2536}
2537
59c2be1e
YS
2538static ssize_t rbd_add(struct bus_type *bus,
2539 const char *buf,
2540 size_t count)
602adf40 2541{
cb8627c7
AE
2542 char *options;
2543 struct rbd_device *rbd_dev = NULL;
7ef3214a
AE
2544 const char *mon_addrs = NULL;
2545 size_t mon_addrs_size = 0;
27cc2594
AE
2546 struct ceph_osd_client *osdc;
2547 int rc = -ENOMEM;
602adf40
YS
2548
2549 if (!try_module_get(THIS_MODULE))
2550 return -ENODEV;
2551
60571c7d 2552 options = kmalloc(count, GFP_KERNEL);
602adf40 2553 if (!options)
27cc2594 2554 goto err_nomem;
cb8627c7
AE
2555 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2556 if (!rbd_dev)
2557 goto err_nomem;
602adf40
YS
2558
2559 /* static rbd_device initialization */
2560 spin_lock_init(&rbd_dev->lock);
2561 INIT_LIST_HEAD(&rbd_dev->node);
dfc5606d 2562 INIT_LIST_HEAD(&rbd_dev->snaps);
c666601a 2563 init_rwsem(&rbd_dev->header_rwsem);
602adf40 2564
d184f6bf 2565 /* generate unique id: find highest unique id, add one */
499afd5b 2566 rbd_id_get(rbd_dev);
602adf40 2567
a725f65e 2568 /* Fill in the device name, now that we have its id. */
81a89793
AE
2569 BUILD_BUG_ON(DEV_NAME_LEN
2570 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
de71a297 2571 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
a725f65e 2572
602adf40 2573 /* parse add command */
7ef3214a 2574 rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
e28fff26 2575 options, count);
a725f65e 2576 if (rc)
f0f8cef5 2577 goto err_put_id;
e124a82f 2578
f8c38929
AE
2579 rc = rbd_get_client(rbd_dev, mon_addrs, mon_addrs_size - 1, options);
2580 if (rc < 0)
f0f8cef5 2581 goto err_put_id;
602adf40 2582
602adf40 2583 /* pick the pool */
1dbb4399 2584 osdc = &rbd_dev->rbd_client->client->osdc;
602adf40
YS
2585 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2586 if (rc < 0)
2587 goto err_out_client;
9bb2f334 2588 rbd_dev->pool_id = rc;
602adf40
YS
2589
2590 /* register our block device */
27cc2594
AE
2591 rc = register_blkdev(0, rbd_dev->name);
2592 if (rc < 0)
602adf40 2593 goto err_out_client;
27cc2594 2594 rbd_dev->major = rc;
602adf40 2595
dfc5606d
YS
2596 rc = rbd_bus_add_dev(rbd_dev);
2597 if (rc)
766fc439
YS
2598 goto err_out_blkdev;
2599
32eec68d
AE
2600 /*
2601 * At this point cleanup in the event of an error is the job
2602 * of the sysfs code (initiated by rbd_bus_del_dev()).
2603 *
2604 * Set up and announce blkdev mapping.
2605 */
602adf40
YS
2606 rc = rbd_init_disk(rbd_dev);
2607 if (rc)
766fc439 2608 goto err_out_bus;
602adf40 2609
59c2be1e
YS
2610 rc = rbd_init_watch_dev(rbd_dev);
2611 if (rc)
2612 goto err_out_bus;
2613
602adf40
YS
2614 return count;
2615
766fc439 2616err_out_bus:
766fc439
YS
2617 /* this will also clean up rest of rbd_dev stuff */
2618
2619 rbd_bus_del_dev(rbd_dev);
2620 kfree(options);
766fc439
YS
2621 return rc;
2622
602adf40
YS
2623err_out_blkdev:
2624 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2625err_out_client:
2626 rbd_put_client(rbd_dev);
f0f8cef5 2627err_put_id:
cb8627c7 2628 if (rbd_dev->pool_name) {
820a5f3e 2629 kfree(rbd_dev->snap_name);
0bed54dc
AE
2630 kfree(rbd_dev->header_name);
2631 kfree(rbd_dev->image_name);
cb8627c7
AE
2632 kfree(rbd_dev->pool_name);
2633 }
499afd5b 2634 rbd_id_put(rbd_dev);
27cc2594 2635err_nomem:
27cc2594 2636 kfree(rbd_dev);
cb8627c7 2637 kfree(options);
27cc2594 2638
602adf40
YS
2639 dout("Error adding device %s\n", buf);
2640 module_put(THIS_MODULE);
27cc2594
AE
2641
2642 return (ssize_t) rc;
602adf40
YS
2643}
2644
de71a297 2645static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
602adf40
YS
2646{
2647 struct list_head *tmp;
2648 struct rbd_device *rbd_dev;
2649
e124a82f 2650 spin_lock(&rbd_dev_list_lock);
602adf40
YS
2651 list_for_each(tmp, &rbd_dev_list) {
2652 rbd_dev = list_entry(tmp, struct rbd_device, node);
de71a297 2653 if (rbd_dev->dev_id == dev_id) {
e124a82f 2654 spin_unlock(&rbd_dev_list_lock);
602adf40 2655 return rbd_dev;
e124a82f 2656 }
602adf40 2657 }
e124a82f 2658 spin_unlock(&rbd_dev_list_lock);
602adf40
YS
2659 return NULL;
2660}
2661
dfc5606d 2662static void rbd_dev_release(struct device *dev)
602adf40 2663{
593a9e7b 2664 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 2665
1dbb4399
AE
2666 if (rbd_dev->watch_request) {
2667 struct ceph_client *client = rbd_dev->rbd_client->client;
2668
2669 ceph_osdc_unregister_linger_request(&client->osdc,
59c2be1e 2670 rbd_dev->watch_request);
1dbb4399 2671 }
59c2be1e 2672 if (rbd_dev->watch_event)
070c633f 2673 rbd_req_sync_unwatch(rbd_dev);
59c2be1e 2674
602adf40
YS
2675 rbd_put_client(rbd_dev);
2676
2677 /* clean up and free blkdev */
2678 rbd_free_disk(rbd_dev);
2679 unregister_blkdev(rbd_dev->major, rbd_dev->name);
32eec68d
AE
2680
2681 /* done with the id, and with the rbd_dev */
820a5f3e 2682 kfree(rbd_dev->snap_name);
0bed54dc 2683 kfree(rbd_dev->header_name);
d22f76e7 2684 kfree(rbd_dev->pool_name);
0bed54dc 2685 kfree(rbd_dev->image_name);
32eec68d 2686 rbd_id_put(rbd_dev);
602adf40
YS
2687 kfree(rbd_dev);
2688
2689 /* release module ref */
2690 module_put(THIS_MODULE);
602adf40
YS
2691}
2692
dfc5606d
YS
2693static ssize_t rbd_remove(struct bus_type *bus,
2694 const char *buf,
2695 size_t count)
602adf40
YS
2696{
2697 struct rbd_device *rbd_dev = NULL;
2698 int target_id, rc;
2699 unsigned long ul;
2700 int ret = count;
2701
2702 rc = strict_strtoul(buf, 10, &ul);
2703 if (rc)
2704 return rc;
2705
2706 /* convert to int; abort if we lost anything in the conversion */
2707 target_id = (int) ul;
2708 if (target_id != ul)
2709 return -EINVAL;
2710
2711 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2712
2713 rbd_dev = __rbd_get_dev(target_id);
2714 if (!rbd_dev) {
2715 ret = -ENOENT;
2716 goto done;
2717 }
2718
dfc5606d
YS
2719 __rbd_remove_all_snaps(rbd_dev);
2720 rbd_bus_del_dev(rbd_dev);
602adf40
YS
2721
2722done:
2723 mutex_unlock(&ctl_mutex);
aafb230e 2724
602adf40
YS
2725 return ret;
2726}
2727
dfc5606d
YS
2728static ssize_t rbd_snap_add(struct device *dev,
2729 struct device_attribute *attr,
2730 const char *buf,
2731 size_t count)
602adf40 2732{
593a9e7b 2733 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d
YS
2734 int ret;
2735 char *name = kmalloc(count + 1, GFP_KERNEL);
602adf40
YS
2736 if (!name)
2737 return -ENOMEM;
2738
dfc5606d 2739 snprintf(name, count, "%s", buf);
602adf40
YS
2740
2741 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2742
602adf40
YS
2743 ret = rbd_header_add_snap(rbd_dev,
2744 name, GFP_KERNEL);
2745 if (ret < 0)
59c2be1e 2746 goto err_unlock;
602adf40 2747
b813623a 2748 ret = __rbd_refresh_header(rbd_dev, NULL);
602adf40 2749 if (ret < 0)
59c2be1e
YS
2750 goto err_unlock;
2751
2752 /* shouldn't hold ctl_mutex when notifying.. notify might
2753 trigger a watch callback that would need to get that mutex */
2754 mutex_unlock(&ctl_mutex);
2755
2756 /* make a best effort, don't error if failed */
4cb16250 2757 rbd_req_sync_notify(rbd_dev);
602adf40
YS
2758
2759 ret = count;
59c2be1e
YS
2760 kfree(name);
2761 return ret;
2762
2763err_unlock:
602adf40 2764 mutex_unlock(&ctl_mutex);
602adf40
YS
2765 kfree(name);
2766 return ret;
2767}
2768
602adf40
YS
2769/*
2770 * create control files in sysfs
dfc5606d 2771 * /sys/bus/rbd/...
602adf40
YS
2772 */
2773static int rbd_sysfs_init(void)
2774{
dfc5606d 2775 int ret;
602adf40 2776
fed4c143 2777 ret = device_register(&rbd_root_dev);
21079786 2778 if (ret < 0)
dfc5606d 2779 return ret;
602adf40 2780
fed4c143
AE
2781 ret = bus_register(&rbd_bus_type);
2782 if (ret < 0)
2783 device_unregister(&rbd_root_dev);
602adf40 2784
602adf40
YS
2785 return ret;
2786}
2787
/* Tear down sysfs state in the reverse order of rbd_sysfs_init() */
static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}
2793
/* Module init: set up sysfs; devices are created via /sys/bus/rbd/add */
int __init rbd_init(void)
{
	int rc;

	rc = rbd_sysfs_init();
	if (rc)
		return rc;
	pr_info("loaded " RBD_DRV_NAME_LONG "\n");
	return 0;
}
2804
/* Module exit: remove the sysfs bus and root device */
void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
}
2809
module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");
This page took 0.656548 seconds and 5 git commands to generate.