rbd: expand rbd_dev_ondisk_valid() checks
[deliverable/linux.git] / drivers / block / rbd.c
CommitLineData
602adf40
YS
1/*
2 rbd.c -- Export ceph rados objects as a Linux block device
3
4
5 based on drivers/block/osdblk.c:
6
7 Copyright 2009 Red Hat, Inc.
8
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
12
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
17
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
dfc5606d 24 For usage instructions, please refer to:
602adf40 25
dfc5606d 26 Documentation/ABI/testing/sysfs-bus-rbd
602adf40
YS
27
28 */
29
30#include <linux/ceph/libceph.h>
31#include <linux/ceph/osd_client.h>
32#include <linux/ceph/mon_client.h>
33#include <linux/ceph/decode.h>
59c2be1e 34#include <linux/parser.h>
602adf40
YS
35
36#include <linux/kernel.h>
37#include <linux/device.h>
38#include <linux/module.h>
39#include <linux/fs.h>
40#include <linux/blkdev.h>
41
42#include "rbd_types.h"
43
593a9e7b
AE
44/*
45 * The basic unit of block I/O is a sector. It is interpreted in a
46 * number of contexts in Linux (blk, bio, genhd), but the default is
47 * universally 512 bytes. These symbols are just slightly more
48 * meaningful than the bare numbers they represent.
49 */
50#define SECTOR_SHIFT 9
51#define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
52
f0f8cef5
AE
53#define RBD_DRV_NAME "rbd"
54#define RBD_DRV_NAME_LONG "rbd (rados block device)"
602adf40
YS
55
56#define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
57
602adf40
YS
58#define RBD_MAX_SNAP_NAME_LEN 32
59#define RBD_MAX_OPT_LEN 1024
60
61#define RBD_SNAP_HEAD_NAME "-"
62
81a89793
AE
63/*
64 * An RBD device name will be "rbd#", where the "rbd" comes from
65 * RBD_DRV_NAME above, and # is a unique integer identifier.
66 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
67 * enough to hold all possible device names.
68 */
602adf40 69#define DEV_NAME_LEN 32
81a89793 70#define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
602adf40 71
59c2be1e
YS
72#define RBD_NOTIFY_TIMEOUT_DEFAULT 10
73
602adf40
YS
/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	u64 image_size;		/* image size in bytes */
	char *object_prefix;	/* prefix used to name data objects */
	__u8 obj_order;		/* log2 of the per-object size */
	__u8 crypt_type;	/* copied from on-disk header options */
	__u8 comp_type;		/* copied from on-disk header options */
	struct ceph_snap_context *snapc;	/* snapshot ids, refcounted */
	u64 snap_names_len;	/* total bytes in snap_names buffer */
	u32 total_snaps;	/* number of snapshots in this header */

	char *snap_names;	/* NUL-separated snapshot names */
	u64 *snap_sizes;	/* image size at each snapshot */

	u64 obj_version;	/* version of the header object on the osd */
};
92
/* rbd-specific mount options parsed by parse_rbd_opts_token() */
struct rbd_options {
	int notify_timeout;	/* defaults to RBD_NOTIFY_TIMEOUT_DEFAULT */
};
96
/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client *client;	/* underlying ceph client */
	struct rbd_options *rbd_opts;	/* rbd-specific options (owned) */
	struct kref kref;		/* released via rbd_client_release() */
	struct list_head node;		/* entry on rbd_client_list */
};
106
/*
 * a request completion status
 */
struct rbd_req_status {
	int done;	/* nonzero once this sub-request has completed */
	int rc;		/* completion result code */
	u64 bytes;	/* bytes transferred for this sub-request */
};
115
/*
 * a collection of requests
 */
struct rbd_req_coll {
	int total;		/* number of sub-requests in the collection */
	int num_done;		/* completed-and-reported count so far */
	struct kref kref;	/* freed via rbd_coll_release() */
	struct rbd_req_status status[0];	/* one slot per sub-request */
};
125
f0f8cef5
AE
/*
 * a single io request
 */
struct rbd_request {
	struct request *rq;		/* blk layer request */
	struct bio *bio;		/* cloned bio */
	struct page **pages;		/* list of used pages */
	u64 len;			/* request length in bytes */
	int coll_index;			/* slot in coll->status[] */
	struct rbd_req_coll *coll;	/* parent collection (may be NULL) */
};
137
dfc5606d
YS
/* in-memory representation of one snapshot of an image */
struct rbd_snap {
	struct device dev;	/* sysfs device for this snapshot */
	const char *name;	/* snapshot name */
	u64 size;		/* image size when the snapshot was taken */
	struct list_head node;	/* entry on rbd_dev->snaps */
	u64 id;			/* snapshot id */
};
145
602adf40
YS
/*
 * a single device
 */
struct rbd_device {
	int dev_id;		/* blkdev unique id */

	int major;		/* blkdev assigned major */
	struct gendisk *disk;		/* blkdev's gendisk and rq */
	struct request_queue *q;

	struct rbd_client *rbd_client;	/* shared ceph client handle */

	char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t lock;		/* queue lock */

	struct rbd_image_header header;	/* parsed image header */
	char *image_name;	/* rbd image name */
	size_t image_name_len;
	char *header_name;	/* name of the image's header object */
	char *pool_name;	/* pool this image lives in */
	int pool_id;		/* numeric id of that pool */

	struct ceph_osd_event *watch_event;	/* header watch registration */
	struct ceph_osd_request *watch_request;

	/* protects updating the header */
	struct rw_semaphore header_rwsem;
	/* name of the snapshot this device reads from */
	char *snap_name;
	/* id of the snapshot this device reads from */
	u64 snap_id;		/* current snapshot id */
	/* whether the snap_id this device reads from still exists */
	bool snap_exists;
	int read_only;

	struct list_head node;	/* entry on rbd_dev_list */

	/* list of snapshots */
	struct list_head snaps;

	/* sysfs related */
	struct device dev;
};
190
602adf40 191static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
e124a82f 192
602adf40 193static LIST_HEAD(rbd_dev_list); /* devices */
e124a82f
AE
194static DEFINE_SPINLOCK(rbd_dev_list_lock);
195
432b8587
AE
196static LIST_HEAD(rbd_client_list); /* clients */
197static DEFINE_SPINLOCK(rbd_client_list_lock);
602adf40 198
dfc5606d
YS
199static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
200static void rbd_dev_release(struct device *dev);
dfc5606d
YS
201static ssize_t rbd_snap_add(struct device *dev,
202 struct device_attribute *attr,
203 const char *buf,
204 size_t count);
14e7085d 205static void __rbd_remove_snap_dev(struct rbd_snap *snap);
dfc5606d 206
f0f8cef5
AE
207static ssize_t rbd_add(struct bus_type *bus, const char *buf,
208 size_t count);
209static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
210 size_t count);
211
/* /sys/bus/rbd/{add,remove} — write-only control files */
static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};

static struct bus_type rbd_bus_type = {
	.name = "rbd",
	.bus_attrs = rbd_bus_attrs,
};

/* rbd_root_dev is static, so there is nothing to free on release */
static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name = "rbd",
	.release = rbd_root_dev_release,
};
231
dfc5606d 232
dfc5606d
YS
233static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
234{
235 return get_device(&rbd_dev->dev);
236}
237
238static void rbd_put_dev(struct rbd_device *rbd_dev)
239{
240 put_device(&rbd_dev->dev);
241}
602adf40 242
1fe5e993 243static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver);
59c2be1e 244
602adf40
YS
245static int rbd_open(struct block_device *bdev, fmode_t mode)
246{
f0f8cef5 247 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
602adf40 248
602adf40
YS
249 if ((mode & FMODE_WRITE) && rbd_dev->read_only)
250 return -EROFS;
251
340c7a2b
AE
252 rbd_get_dev(rbd_dev);
253 set_device_ro(bdev, rbd_dev->read_only);
254
602adf40
YS
255 return 0;
256}
257
dfc5606d
YS
258static int rbd_release(struct gendisk *disk, fmode_t mode)
259{
260 struct rbd_device *rbd_dev = disk->private_data;
261
262 rbd_put_dev(rbd_dev);
263
264 return 0;
265}
266
602adf40
YS
/* Block device operations: rbd devices only implement open/release. */
static const struct block_device_operations rbd_bd_ops = {
	.owner = THIS_MODULE,
	.open = rbd_open,
	.release = rbd_release,
};
272
273/*
274 * Initialize an rbd client instance.
43ae4701 275 * We own *ceph_opts.
602adf40 276 */
43ae4701 277static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts,
59c2be1e 278 struct rbd_options *rbd_opts)
602adf40
YS
279{
280 struct rbd_client *rbdc;
281 int ret = -ENOMEM;
282
283 dout("rbd_client_create\n");
284 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
285 if (!rbdc)
286 goto out_opt;
287
288 kref_init(&rbdc->kref);
289 INIT_LIST_HEAD(&rbdc->node);
290
bc534d86
AE
291 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
292
43ae4701 293 rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
602adf40 294 if (IS_ERR(rbdc->client))
bc534d86 295 goto out_mutex;
43ae4701 296 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
602adf40
YS
297
298 ret = ceph_open_session(rbdc->client);
299 if (ret < 0)
300 goto out_err;
301
59c2be1e
YS
302 rbdc->rbd_opts = rbd_opts;
303
432b8587 304 spin_lock(&rbd_client_list_lock);
602adf40 305 list_add_tail(&rbdc->node, &rbd_client_list);
432b8587 306 spin_unlock(&rbd_client_list_lock);
602adf40 307
bc534d86
AE
308 mutex_unlock(&ctl_mutex);
309
602adf40
YS
310 dout("rbd_client_create created %p\n", rbdc);
311 return rbdc;
312
313out_err:
314 ceph_destroy_client(rbdc->client);
bc534d86
AE
315out_mutex:
316 mutex_unlock(&ctl_mutex);
602adf40
YS
317 kfree(rbdc);
318out_opt:
43ae4701
AE
319 if (ceph_opts)
320 ceph_destroy_options(ceph_opts);
28f259b7 321 return ERR_PTR(ret);
602adf40
YS
322}
323
324/*
325 * Find a ceph client with specific addr and configuration.
326 */
43ae4701 327static struct rbd_client *__rbd_client_find(struct ceph_options *ceph_opts)
602adf40
YS
328{
329 struct rbd_client *client_node;
330
43ae4701 331 if (ceph_opts->flags & CEPH_OPT_NOSHARE)
602adf40
YS
332 return NULL;
333
334 list_for_each_entry(client_node, &rbd_client_list, node)
43ae4701 335 if (!ceph_compare_options(ceph_opts, client_node->client))
602adf40
YS
336 return client_node;
337 return NULL;
338}
339
59c2be1e
YS
/*
 * mount options
 */
enum {
	Opt_notify_timeout,	/* notify_timeout=<int> */
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
};

/* token table consumed by match_token() in parse_rbd_opts_token() */
static match_table_t rbd_opts_tokens = {
	{Opt_notify_timeout, "notify_timeout=%d"},
	/* int args above */
	/* string args above */
	{-1, NULL}
};
357
/*
 * Parse one mount-option token into the rbd_options passed as 'private'.
 * Returns 0 on success or a negative errno for unknown/malformed options.
 */
static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	/* tokens before Opt_last_int carry an integer argument */
	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_notify_timeout:
		rbd_opts->notify_timeout = intval;
		break;
	default:
		/* every token in rbd_opts_tokens must be handled above */
		BUG_ON(token);
	}
	return 0;
}
392
602adf40
YS
/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.  On success a reference is held on the returned
 * client; ownership of 'options' parsing intermediates stays here.
 */
static struct rbd_client *rbd_get_client(const char *mon_addr,
					 size_t mon_addr_len,
					 char *options)
{
	struct rbd_client *rbdc;
	struct ceph_options *ceph_opts;
	struct rbd_options *rbd_opts;

	rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
	if (!rbd_opts)
		return ERR_PTR(-ENOMEM);

	rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;

	ceph_opts = ceph_parse_options(options, mon_addr,
				       mon_addr + mon_addr_len,
				       parse_rbd_opts_token, rbd_opts);
	if (IS_ERR(ceph_opts)) {
		kfree(rbd_opts);
		return ERR_CAST(ceph_opts);
	}

	spin_lock(&rbd_client_list_lock);
	rbdc = __rbd_client_find(ceph_opts);
	if (rbdc) {
		/* using an existing client; take the ref under the lock */
		kref_get(&rbdc->kref);
		spin_unlock(&rbd_client_list_lock);

		/* the found client owns its own opts; drop ours */
		ceph_destroy_options(ceph_opts);
		kfree(rbd_opts);

		return rbdc;
	}
	spin_unlock(&rbd_client_list_lock);

	/* rbd_client_create() consumes ceph_opts on success and failure */
	rbdc = rbd_client_create(ceph_opts, rbd_opts);

	if (IS_ERR(rbdc))
		kfree(rbd_opts);

	return rbdc;
}
440
/*
 * Destroy ceph client.  Invoked via kref_put() when the last reference
 * is dropped.  Takes rbd_client_list_lock itself to unlink the client,
 * so the caller must NOT already hold it.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("rbd_release_client %p\n", rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc->rbd_opts);
	kfree(rbdc);
}
459
460/*
461 * Drop reference to ceph client node. If it's not referenced anymore, release
462 * it.
463 */
464static void rbd_put_client(struct rbd_device *rbd_dev)
465{
466 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
467 rbd_dev->rbd_client = NULL;
602adf40
YS
468}
469
1fec7093
YS
470/*
471 * Destroy requests collection
472 */
473static void rbd_coll_release(struct kref *kref)
474{
475 struct rbd_req_coll *coll =
476 container_of(kref, struct rbd_req_coll, kref);
477
478 dout("rbd_coll_release %p\n", coll);
479 kfree(coll);
480}
602adf40 481
8e94af8e
AE
/*
 * Sanity-check an on-disk image header before translating it.
 * Returns false if the header cannot possibly be valid.
 */
static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire the snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}
510
602adf40
YS
/*
 * Create a new header structure, translate header format from the on-disk
 * header.
 *
 * Returns 0 on success, -ENXIO for an invalid on-disk header, -ENOMEM on
 * allocation failure.  If allocated_snaps does not match the on-disk
 * snapshot count we return 0 early without building the snap context;
 * the caller is expected to retry with an updated count (partially
 * filled fields are presumably released via rbd_header_free() — confirm).
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
				struct rbd_image_header_ondisk *ondisk,
				u32 allocated_snaps)
{
	u32 snap_count;
	size_t size;

	if (!rbd_dev_ondisk_valid(ondisk))
		return -ENXIO;

	memset(header, 0, sizeof (*header));

	snap_count = le32_to_cpu(ondisk->snap_count);

	/* object prefix: copied from block_name, always NUL-terminated */
	size = sizeof (ondisk->block_name) + 1;
	header->object_prefix = kmalloc(size, GFP_KERNEL);
	if (!header->object_prefix)
		return -ENOMEM;
	memcpy(header->object_prefix, ondisk->block_name, size - 1);
	header->object_prefix[size - 1] = '\0';

	if (snap_count) {
		header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
		/* rbd_dev_ondisk_valid() bounded this, so BUG is a hard error */
		BUG_ON(header->snap_names_len > (u64) SIZE_MAX);
		header->snap_names = kmalloc(header->snap_names_len,
					     GFP_KERNEL);
		if (!header->snap_names)
			goto out_err;

		size = snap_count * sizeof (*header->snap_sizes);
		header->snap_sizes = kmalloc(size, GFP_KERNEL);
		if (!header->snap_sizes)
			goto out_err;
	} else {
		/* no snapshots implies no snapshot-name bytes */
		WARN_ON(ondisk->snap_names_len);
		header->snap_names_len = 0;
		header->snap_names = NULL;
		header->snap_sizes = NULL;
	}

	header->image_size = le64_to_cpu(ondisk->image_size);
	header->obj_order = ondisk->options.order;
	header->crypt_type = ondisk->options.crypt_type;
	header->comp_type = ondisk->options.comp_type;
	header->total_snaps = snap_count;

	/*
	 * If the number of snapshot ids provided by the caller
	 * doesn't match the number in the entire context there's
	 * no point in going further.  Caller will try again after
	 * getting an updated snapshot context from the server.
	 */
	if (allocated_snaps != snap_count)
		return 0;

	size = sizeof (struct ceph_snap_context);
	size += snap_count * sizeof (header->snapc->snaps[0]);
	header->snapc = kzalloc(size, GFP_KERNEL);
	if (!header->snapc)
		goto out_err;

	atomic_set(&header->snapc->nref, 1);
	header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
	header->snapc->num_snaps = snap_count;

	/* Fill in the snapshot information */

	if (snap_count) {
		u32 i;

		for (i = 0; i < snap_count; i++) {
			header->snapc->snaps[i] =
				le64_to_cpu(ondisk->snaps[i].id);
			header->snap_sizes[i] =
				le64_to_cpu(ondisk->snaps[i].image_size);
		}

		/* copy snapshot names */
		memcpy(header->snap_names, &ondisk->snaps[snap_count],
			header->snap_names_len);
	}

	return 0;

out_err:
	/* release everything allocated so far, in reverse order */
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	header->snap_names_len = 0;
	kfree(header->object_prefix);
	header->object_prefix = NULL;

	return -ENOMEM;
}
610
602adf40
YS
611static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
612 u64 *seq, u64 *size)
613{
614 int i;
615 char *p = header->snap_names;
616
00f1f36f
AE
617 for (i = 0; i < header->total_snaps; i++) {
618 if (!strcmp(snap_name, p)) {
602adf40 619
00f1f36f 620 /* Found it. Pass back its id and/or size */
602adf40 621
00f1f36f
AE
622 if (seq)
623 *seq = header->snapc->snaps[i];
624 if (size)
625 *size = header->snap_sizes[i];
626 return i;
627 }
628 p += strlen(p) + 1; /* Skip ahead to the next name */
629 }
630 return -ENOENT;
602adf40
YS
631}
632
/*
 * Point the device at the snapshot named in rbd_dev->snap_name.
 * The magic name RBD_SNAP_HEAD_NAME selects the live (writable) image;
 * any other name selects a read-only snapshot.  Updates snap_id,
 * snap_exists and read_only under header_rwsem, and optionally returns
 * the selected image/snapshot size via *size.
 */
static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size)
{
	int ret;

	down_write(&rbd_dev->header_rwsem);

	if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
		    sizeof (RBD_SNAP_HEAD_NAME))) {
		/* mapping the head: writable, no snapshot id */
		rbd_dev->snap_id = CEPH_NOSNAP;
		rbd_dev->snap_exists = false;
		rbd_dev->read_only = 0;
		if (size)
			*size = rbd_dev->header.image_size;
	} else {
		u64 snap_id = 0;

		ret = snap_by_name(&rbd_dev->header, rbd_dev->snap_name,
				   &snap_id, size);
		if (ret < 0)
			goto done;
		rbd_dev->snap_id = snap_id;
		rbd_dev->snap_exists = true;
		rbd_dev->read_only = 1;	/* snapshots are always read-only */
	}

	ret = 0;
done:
	up_write(&rbd_dev->header_rwsem);
	return ret;
}
663
664static void rbd_header_free(struct rbd_image_header *header)
665{
849b4260 666 kfree(header->object_prefix);
d78fd7ae 667 header->object_prefix = NULL;
602adf40 668 kfree(header->snap_sizes);
d78fd7ae 669 header->snap_sizes = NULL;
849b4260 670 kfree(header->snap_names);
d78fd7ae
AE
671 header->snap_names = NULL;
672 header->snap_names_len = 0;
d1d25646 673 ceph_put_snap_context(header->snapc);
d78fd7ae 674 header->snapc = NULL;
602adf40
YS
675}
676
677/*
678 * get the actual striped segment name, offset and length
679 */
680static u64 rbd_get_segment(struct rbd_image_header *header,
ca1e49a6 681 const char *object_prefix,
602adf40
YS
682 u64 ofs, u64 len,
683 char *seg_name, u64 *segofs)
684{
685 u64 seg = ofs >> header->obj_order;
686
687 if (seg_name)
688 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
ca1e49a6 689 "%s.%012llx", object_prefix, seg);
602adf40
YS
690
691 ofs = ofs & ((1 << header->obj_order) - 1);
692 len = min_t(u64, len, (1 << header->obj_order) - ofs);
693
694 if (segofs)
695 *segofs = ofs;
696
697 return len;
698}
699
1fec7093
YS
700static int rbd_get_num_segments(struct rbd_image_header *header,
701 u64 ofs, u64 len)
702{
703 u64 start_seg = ofs >> header->obj_order;
704 u64 end_seg = (ofs + len - 1) >> header->obj_order;
705 return end_seg - start_seg + 1;
706}
707
029bcbd8
JD
708/*
709 * returns the size of an object in the image
710 */
711static u64 rbd_obj_bytes(struct rbd_image_header *header)
712{
713 return 1 << header->obj_order;
714}
715
602adf40
YS
716/*
717 * bio helpers
718 */
719
720static void bio_chain_put(struct bio *chain)
721{
722 struct bio *tmp;
723
724 while (chain) {
725 tmp = chain;
726 chain = chain->bi_next;
727 bio_put(tmp);
728 }
729}
730
/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;	/* running byte offset within the whole chain */

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				/* zero this segment from start_ofs onward */
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}
757
/*
 * bio_chain_clone - clone a chain of bios up to a certain length.
 * might return a bio_pair that will need to be released.
 *
 * On return *old points at the first unconsumed bio, *next at where the
 * caller should continue (either the remainder bio of a split or the
 * next bio in the original chain).  Returns NULL on allocation/split
 * failure, releasing any partially built chain.
 */
static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
				   struct bio_pair **bp,
				   int len, gfp_t gfpmask)
{
	struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
	int total = 0;

	/* drop any split left over from a previous call */
	if (*bp) {
		bio_pair_release(*bp);
		*bp = NULL;
	}

	while (old_chain && (total < len)) {
		tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
		if (!tmp)
			goto err_out;

		if (total + old_chain->bi_size > len) {
			struct bio_pair *bp;

			/*
			 * this split can only happen with a single paged bio,
			 * split_bio will BUG_ON if this is not the case
			 */
			dout("bio_chain_clone split! total=%d remaining=%d"
			     "bi_size=%u\n",
			     total, len - total, old_chain->bi_size);

			/* split the bio. We'll release it either in the next
			   call, or it will have to be released outside */
			bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
			if (!bp)
				goto err_out;

			__bio_clone(tmp, &bp->bio1);

			*next = &bp->bio2;
		} else {
			__bio_clone(tmp, old_chain);
			*next = old_chain->bi_next;
		}

		tmp->bi_bdev = NULL;
		/* only the first allocation is allowed to block */
		gfpmask &= ~__GFP_WAIT;
		tmp->bi_next = NULL;

		if (!new_chain) {
			new_chain = tail = tmp;
		} else {
			tail->bi_next = tmp;
			tail = tmp;
		}
		old_chain = old_chain->bi_next;

		total += tmp->bi_size;
	}

	BUG_ON(total < len);

	if (tail)
		tail->bi_next = NULL;

	*old = old_chain;

	return new_chain;

err_out:
	dout("bio_chain_clone with err\n");
	bio_chain_put(new_chain);
	return NULL;
}
833
834/*
835 * helpers for osd request op vectors.
836 */
57cfc106
AE
837static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
838 int opcode, u32 payload_len)
602adf40 839{
57cfc106
AE
840 struct ceph_osd_req_op *ops;
841
842 ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
843 if (!ops)
844 return NULL;
845
846 ops[0].op = opcode;
847
602adf40
YS
848 /*
849 * op extent offset and length will be set later on
850 * in calc_raw_layout()
851 */
57cfc106
AE
852 ops[0].payload_len = payload_len;
853
854 return ops;
602adf40
YS
855}
856
/* Free an op vector allocated by rbd_create_rw_ops(). */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}
861
1fec7093
YS
/*
 * Record completion of sub-request 'index' of a collection, then
 * complete (in order) every contiguous run of finished sub-requests
 * against the block-layer request.
 */
static void rbd_coll_end_req_index(struct request *rq,
				   struct rbd_req_coll *coll,
				   int index,
				   int ret, u64 len)
{
	struct request_queue *q;
	int min, max, i;

	dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
	     coll, index, ret, (unsigned long long) len);

	if (!rq)
		return;

	if (!coll) {
		/* not part of a collection: complete the request directly */
		blk_end_request(rq, ret, len);
		return;
	}

	q = rq->q;

	/* queue lock serializes status updates and request completion */
	spin_lock_irq(q->queue_lock);
	coll->status[index].done = 1;
	coll->status[index].rc = ret;
	coll->status[index].bytes = len;
	max = min = coll->num_done;
	/* extend past the contiguous run of completed sub-requests */
	while (max < coll->total && coll->status[max].done)
		max++;

	for (i = min; i<max; i++) {
		__blk_end_request(rq, coll->status[i].rc,
				  coll->status[i].bytes);
		coll->num_done++;
		/* each completed sub-request drops its collection ref */
		kref_put(&coll->kref, rbd_coll_release);
	}
	spin_unlock_irq(q->queue_lock);
}
899
/* Convenience wrapper: complete the sub-request described by 'req'. */
static void rbd_coll_end_req(struct rbd_request *req,
			     int ret, u64 len)
{
	rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
}
905
602adf40
YS
/*
 * Send ceph osd request
 *
 * Builds and submits one osd request for [ofs, ofs+len) of object_name.
 * If rbd_cb is NULL the call is synchronous: we wait for completion and
 * drop the request before returning.  Otherwise rbd_cb runs on
 * completion and owns the cleanup.  If linger_req is non-NULL the
 * request is registered to linger (resent across osdmap changes) and
 * returned to the caller through *linger_req.
 */
static int rbd_do_request(struct request *rq,
			  struct rbd_device *rbd_dev,
			  struct ceph_snap_context *snapc,
			  u64 snapid,
			  const char *object_name, u64 ofs, u64 len,
			  struct bio *bio,
			  struct page **pages,
			  int num_pages,
			  int flags,
			  struct ceph_osd_req_op *ops,
			  struct rbd_req_coll *coll,
			  int coll_index,
			  void (*rbd_cb)(struct ceph_osd_request *req,
					 struct ceph_msg *msg),
			  struct ceph_osd_request **linger_req,
			  u64 *ver)
{
	struct ceph_osd_request *req;
	struct ceph_file_layout *layout;
	int ret;
	u64 bno;
	struct timespec mtime = CURRENT_TIME;
	struct rbd_request *req_data;
	struct ceph_osd_request_head *reqhead;
	struct ceph_osd_client *osdc;

	req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
	if (!req_data) {
		/* still report completion so the collection can make progress */
		if (coll)
			rbd_coll_end_req_index(rq, coll, coll_index,
					       -ENOMEM, len);
		return -ENOMEM;
	}

	if (coll) {
		req_data->coll = coll;
		req_data->coll_index = coll_index;
	}

	dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
	     (unsigned long long) ofs, (unsigned long long) len);

	osdc = &rbd_dev->rbd_client->client->osdc;
	req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
				      false, GFP_NOIO, pages, bio);
	if (!req) {
		ret = -ENOMEM;
		goto done_pages;
	}

	req->r_callback = rbd_cb;

	req_data->rq = rq;
	req_data->bio = bio;
	req_data->pages = pages;
	req_data->len = len;

	req->r_priv = req_data;

	reqhead = req->r_request->front.front.iov_base;
	reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);

	strncpy(req->r_oid, object_name, sizeof(req->r_oid));
	req->r_oid_len = strlen(req->r_oid);

	/* trivial layout: one object, one stripe */
	layout = &req->r_file_layout;
	memset(layout, 0, sizeof(*layout));
	layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	layout->fl_stripe_count = cpu_to_le32(1);
	layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
	ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
			     req, ops);

	ceph_osdc_build_request(req, ofs, &len,
				ops,
				snapc,
				&mtime,
				req->r_oid, req->r_oid_len);

	if (linger_req) {
		ceph_osdc_set_request_linger(osdc, req);
		*linger_req = req;
	}

	ret = ceph_osdc_start_request(osdc, req, false);
	if (ret < 0)
		goto done_err;

	if (!rbd_cb) {
		/* synchronous: wait for the reply right here */
		ret = ceph_osdc_wait_request(osdc, req);
		if (ver)
			*ver = le64_to_cpu(req->r_reassert_version.version);
		dout("reassert_ver=%llu\n",
		     (unsigned long long)
		     le64_to_cpu(req->r_reassert_version.version));
		ceph_osdc_put_request(req);
	}
	return ret;

done_err:
	bio_chain_put(req_data->bio);
	ceph_osdc_put_request(req);
done_pages:
	rbd_coll_end_req(req_data, ret, len);
	kfree(req_data);
	return ret;
}
1017
/*
 * Ceph osd op callback
 *
 * Parses the reply, zero-fills read results for missing objects or
 * short reads, completes the collection entry, and frees the request.
 */
static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	struct rbd_request *req_data = req->r_priv;
	struct ceph_osd_reply_head *replyhead;
	struct ceph_osd_op *op;
	__s32 rc;
	u64 bytes;
	int read_op;

	/* parse reply */
	replyhead = msg->front.iov_base;
	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
	op = (void *)(replyhead + 1);
	rc = le32_to_cpu(replyhead->result);
	bytes = le64_to_cpu(op->extent.length);
	read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);

	dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
	     (unsigned long long) bytes, read_op, (int) rc);

	if (rc == -ENOENT && read_op) {
		/* a nonexistent object reads back as all zeroes */
		zero_bio_chain(req_data->bio, 0);
		rc = 0;
	} else if (rc == 0 && read_op && bytes < req_data->len) {
		/* short read: zero-fill the remainder */
		zero_bio_chain(req_data->bio, bytes);
		bytes = req_data->len;
	}

	rbd_coll_end_req(req_data, rc, bytes);

	if (req_data->bio)
		bio_chain_put(req_data->bio);

	ceph_osdc_put_request(req);
	kfree(req_data);
}
1057
59c2be1e
YS
/* Minimal completion callback: just drop the request reference. */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
1062
602adf40
YS
/*
 * Do a synchronous ceph osd operation
 *
 * Allocates a page vector for the transfer, runs the request to
 * completion via rbd_do_request() (no callback => synchronous), and for
 * reads copies the result into 'buf' when provided.
 */
static int rbd_req_sync_op(struct rbd_device *rbd_dev,
			   struct ceph_snap_context *snapc,
			   u64 snapid,
			   int flags,
			   struct ceph_osd_req_op *ops,
			   const char *object_name,
			   u64 ofs, u64 len,
			   char *buf,
			   struct ceph_osd_request **linger_req,
			   u64 *ver)
{
	int ret;
	struct page **pages;
	int num_pages;

	BUG_ON(ops == NULL);

	num_pages = calc_pages_for(ofs, len);
	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
			     object_name, ofs, len, NULL,
			     pages, num_pages,
			     flags,
			     ops,
			     NULL, 0,
			     NULL,
			     linger_req, ver);
	if (ret < 0)
		goto done;

	/* for reads, ret is the number of bytes returned by the osd */
	if ((flags & CEPH_OSD_FLAG_READ) && buf)
		ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);

done:
	ceph_release_page_vector(pages, num_pages);
	return ret;
}
1106
/*
 * Do an asynchronous ceph osd operation
 *
 * Maps [ofs, ofs+len) of the image onto its containing segment object
 * and submits a single-op request for it; completion is reported via
 * rbd_req_cb().
 */
static int rbd_do_op(struct request *rq,
		     struct rbd_device *rbd_dev,
		     struct ceph_snap_context *snapc,
		     u64 snapid,
		     int opcode, int flags,
		     u64 ofs, u64 len,
		     struct bio *bio,
		     struct rbd_req_coll *coll,
		     int coll_index)
{
	char *seg_name;
	u64 seg_ofs;
	u64 seg_len;
	int ret;
	struct ceph_osd_req_op *ops;
	u32 payload_len;

	seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
	if (!seg_name)
		return -ENOMEM;

	seg_len = rbd_get_segment(&rbd_dev->header,
				  rbd_dev->header.object_prefix,
				  ofs, len,
				  seg_name, &seg_ofs);

	/* only writes carry a data payload */
	payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);

	ret = -ENOMEM;
	ops = rbd_create_rw_ops(1, opcode, payload_len);
	if (!ops)
		goto done;

	/* we've taken care of segment sizes earlier when we
	   cloned the bios. We should never have a segment
	   truncated at this point */
	BUG_ON(seg_len < len);

	ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
			     seg_name, seg_ofs, seg_len,
			     bio,
			     NULL, 0,
			     flags,
			     ops,
			     coll, coll_index,
			     rbd_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
done:
	kfree(seg_name);
	return ret;
}
1162
/*
 * Request async osd write.
 *
 * Thin wrapper around rbd_do_op(): writes always go to the head
 * (CEPH_NOSNAP) with ONDISK acknowledgement semantics.
 */
static int rbd_req_write(struct request *rq,
			 struct rbd_device *rbd_dev,
			 struct ceph_snap_context *snapc,
			 u64 ofs, u64 len,
			 struct bio *bio,
			 struct rbd_req_coll *coll,
			 int coll_index)
{
	return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
			 CEPH_OSD_OP_WRITE,
			 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			 ofs, len, bio, coll, coll_index);
}
1179
/*
 * Request async osd read.
 *
 * Thin wrapper around rbd_do_op(): reads carry no snap context but may
 * target a specific @snapid (the currently mapped snapshot).
 */
static int rbd_req_read(struct request *rq,
			struct rbd_device *rbd_dev,
			u64 snapid,
			u64 ofs, u64 len,
			struct bio *bio,
			struct rbd_req_coll *coll,
			int coll_index)
{
	return rbd_do_op(rq, rbd_dev, NULL,
			 snapid,
			 CEPH_OSD_OP_READ,
			 CEPH_OSD_FLAG_READ,
			 ofs, len, bio, coll, coll_index);
}
1197
/*
 * Request sync osd read.
 *
 * Synchronously reads @len bytes at @ofs from @object_name into @buf.
 * @ver, if non-NULL, receives the object version.  Returns bytes read
 * or a negative errno.
 */
static int rbd_req_sync_read(struct rbd_device *rbd_dev,
			     u64 snapid,
			     const char *object_name,
			     u64 ofs, u64 len,
			     char *buf,
			     u64 *ver)
{
	struct ceph_osd_req_op *ops;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
	if (!ops)
		return -ENOMEM;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			      snapid,
			      CEPH_OSD_FLAG_READ,
			      ops, object_name, ofs, len, buf, NULL, ver);
	rbd_destroy_ops(ops);

	return ret;
}
1223
/*
 * Acknowledge a notification on the header object.
 *
 * Sent in response to a watch callback; @notify_id identifies the
 * notification being acked and @ver the header version we refreshed to.
 * (The original header comment said "watch", which was a copy-paste
 * slip — this is the notify-ack path.)
 */
static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
				   u64 ver,
				   u64 notify_id)
{
	struct ceph_osd_req_op *ops;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
	if (!ops)
		return -ENOMEM;

	ops[0].watch.ver = cpu_to_le64(ver);
	/* NOTE(review): cookie is not byte-swapped here, unlike ver —
	 * presumably already in wire order from the notify; confirm. */
	ops[0].watch.cookie = notify_id;
	ops[0].watch.flag = 0;

	ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
			     rbd_dev->header_name, 0, 0, NULL,
			     NULL, 0,
			     CEPH_OSD_FLAG_READ,
			     ops,
			     NULL, 0,
			     rbd_simple_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
	return ret;
}
1253
/*
 * Watch event callback: the header object changed on the OSD.
 *
 * Refreshes the cached header (picking up resizes / snapshot changes)
 * and acks the notification so the OSD stops re-sending it.  Note the
 * ack is sent even if the refresh failed, using whatever @hver holds.
 */
static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
	struct rbd_device *rbd_dev = (struct rbd_device *)data;
	u64 hver;
	int rc;

	if (!rbd_dev)
		return;

	dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
	     rbd_dev->header_name, (unsigned long long) notify_id,
	     (unsigned int) opcode);
	rc = rbd_refresh_header(rbd_dev, &hver);
	if (rc)
		pr_warning(RBD_DRV_NAME "%d got notification but failed to "
			   " update snaps: %d\n", rbd_dev->major, rc);

	rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
}
1273
/*
 * Register a watch on the header object.
 *
 * Creates an osd event (stored in rbd_dev->watch_event), then sends a
 * WATCH op pinned to the current header version.  The request handle is
 * kept in rbd_dev->watch_request so it lingers.  On failure the event is
 * torn down again.  Returns 0 or a negative errno (-ERANGE if the header
 * version was stale — caller retries after a refresh).
 */
static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
{
	struct ceph_osd_req_op *ops;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
	if (!ops)
		return -ENOMEM;

	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
				     (void *)rbd_dev, &rbd_dev->watch_event);
	if (ret < 0)
		goto fail;

	ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
	ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
	ops[0].watch.flag = 1;	/* 1 == register the watch */

	ret = rbd_req_sync_op(rbd_dev, NULL,
			      CEPH_NOSNAP,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      rbd_dev->header_name,
			      0, 0, NULL,
			      &rbd_dev->watch_request, NULL);

	if (ret < 0)
		goto fail_event;

	rbd_destroy_ops(ops);
	return 0;

fail_event:
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
fail:
	rbd_destroy_ops(ops);
	return ret;
}
1317
/*
 * Unregister the watch on the header object.
 *
 * Sends a WATCH op with flag == 0 (unregister) using the cookie of the
 * event registered by rbd_req_sync_watch(), then cancels and clears the
 * event unconditionally — even if the unwatch request itself failed.
 */
static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
{
	struct ceph_osd_req_op *ops;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
	if (!ops)
		return -ENOMEM;

	ops[0].watch.ver = 0;
	ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
	ops[0].watch.flag = 0;	/* 0 == unregister */

	ret = rbd_req_sync_op(rbd_dev, NULL,
			      CEPH_NOSNAP,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      rbd_dev->header_name,
			      0, 0, NULL, NULL, NULL);


	rbd_destroy_ops(ops);
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
	return ret;
}
1347
/* Context handed to rbd_notify_cb() while a sync notify is in flight. */
struct rbd_notify_info {
	struct rbd_device *rbd_dev;
};
1351
/*
 * Callback for our own notify (rbd_req_sync_notify()): nothing to do
 * beyond logging — the notify exists only to poke other watchers.
 */
static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
	struct rbd_device *rbd_dev = (struct rbd_device *)data;
	if (!rbd_dev)
		return;

	dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n",
	     rbd_dev->header_name, (unsigned long long) notify_id,
	     (unsigned int) opcode);
}
1362
1363/*
1364 * Request sync osd notify
1365 */
4cb16250 1366static int rbd_req_sync_notify(struct rbd_device *rbd_dev)
59c2be1e
YS
1367{
1368 struct ceph_osd_req_op *ops;
0ce1a794 1369 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
59c2be1e
YS
1370 struct ceph_osd_event *event;
1371 struct rbd_notify_info info;
1372 int payload_len = sizeof(u32) + sizeof(u32);
1373 int ret;
1374
57cfc106
AE
1375 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY, payload_len);
1376 if (!ops)
1377 return -ENOMEM;
59c2be1e 1378
0ce1a794 1379 info.rbd_dev = rbd_dev;
59c2be1e
YS
1380
1381 ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1382 (void *)&info, &event);
1383 if (ret < 0)
1384 goto fail;
1385
1386 ops[0].watch.ver = 1;
1387 ops[0].watch.flag = 1;
1388 ops[0].watch.cookie = event->cookie;
1389 ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1390 ops[0].watch.timeout = 12;
1391
0ce1a794 1392 ret = rbd_req_sync_op(rbd_dev, NULL,
59c2be1e 1393 CEPH_NOSNAP,
59c2be1e
YS
1394 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1395 ops,
4cb16250
AE
1396 rbd_dev->header_name,
1397 0, 0, NULL, NULL, NULL);
59c2be1e
YS
1398 if (ret < 0)
1399 goto fail_event;
1400
1401 ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1402 dout("ceph_osdc_wait_event returned %d\n", ret);
1403 rbd_destroy_ops(ops);
1404 return 0;
1405
1406fail_event:
1407 ceph_osdc_cancel_event(event);
1408fail:
1409 rbd_destroy_ops(ops);
1410 return ret;
1411}
1412
/*
 * Synchronously invoke an OSD class method ("cls" call) on an object.
 * (The previous header comment said "Request sync osd read" — a
 * copy-paste error; this is the class-method execution path, used
 * e.g. for rbd.snap_add.)
 *
 * @data/@len is the encoded input payload passed to the method.
 * @ver, if non-NULL, receives the object version.
 */
static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
			     const char *object_name,
			     const char *class_name,
			     const char *method_name,
			     const char *data,
			     int len,
			     u64 *ver)
{
	struct ceph_osd_req_op *ops;
	int class_name_len = strlen(class_name);
	int method_name_len = strlen(method_name);
	int ret;

	/* payload = class name + method name + input data */
	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL,
				class_name_len + method_name_len + len);
	if (!ops)
		return -ENOMEM;

	ops[0].cls.class_name = class_name;
	ops[0].cls.class_len = (__u8) class_name_len;
	ops[0].cls.method_name = method_name;
	ops[0].cls.method_len = (__u8) method_name_len;
	ops[0].cls.argc = 0;
	ops[0].cls.indata = data;
	ops[0].cls.indata_len = len;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			      CEPH_NOSNAP,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      object_name, 0, 0, NULL, NULL, ver);

	rbd_destroy_ops(ops);

	dout("cls_exec returned %d\n", ret);
	return ret;
}
1453
1fec7093
YS
1454static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1455{
1456 struct rbd_req_coll *coll =
1457 kzalloc(sizeof(struct rbd_req_coll) +
1458 sizeof(struct rbd_req_status) * num_reqs,
1459 GFP_ATOMIC);
1460
1461 if (!coll)
1462 return NULL;
1463 coll->total = num_reqs;
1464 kref_init(&coll->kref);
1465 return coll;
1466}
1467
602adf40
YS
1468/*
1469 * block device queue callback
1470 */
1471static void rbd_rq_fn(struct request_queue *q)
1472{
1473 struct rbd_device *rbd_dev = q->queuedata;
1474 struct request *rq;
1475 struct bio_pair *bp = NULL;
1476
00f1f36f 1477 while ((rq = blk_fetch_request(q))) {
602adf40
YS
1478 struct bio *bio;
1479 struct bio *rq_bio, *next_bio = NULL;
1480 bool do_write;
bd919d45
AE
1481 unsigned int size;
1482 u64 op_size = 0;
602adf40 1483 u64 ofs;
1fec7093
YS
1484 int num_segs, cur_seg = 0;
1485 struct rbd_req_coll *coll;
d1d25646 1486 struct ceph_snap_context *snapc;
602adf40
YS
1487
1488 /* peek at request from block layer */
1489 if (!rq)
1490 break;
1491
1492 dout("fetched request\n");
1493
1494 /* filter out block requests we don't understand */
1495 if ((rq->cmd_type != REQ_TYPE_FS)) {
1496 __blk_end_request_all(rq, 0);
00f1f36f 1497 continue;
602adf40
YS
1498 }
1499
1500 /* deduce our operation (read, write) */
1501 do_write = (rq_data_dir(rq) == WRITE);
1502
1503 size = blk_rq_bytes(rq);
593a9e7b 1504 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
602adf40
YS
1505 rq_bio = rq->bio;
1506 if (do_write && rbd_dev->read_only) {
1507 __blk_end_request_all(rq, -EROFS);
00f1f36f 1508 continue;
602adf40
YS
1509 }
1510
1511 spin_unlock_irq(q->queue_lock);
1512
d1d25646 1513 down_read(&rbd_dev->header_rwsem);
e88a36ec 1514
d1d25646 1515 if (rbd_dev->snap_id != CEPH_NOSNAP && !rbd_dev->snap_exists) {
e88a36ec 1516 up_read(&rbd_dev->header_rwsem);
d1d25646
JD
1517 dout("request for non-existent snapshot");
1518 spin_lock_irq(q->queue_lock);
1519 __blk_end_request_all(rq, -ENXIO);
1520 continue;
e88a36ec
JD
1521 }
1522
d1d25646
JD
1523 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1524
1525 up_read(&rbd_dev->header_rwsem);
1526
602adf40
YS
1527 dout("%s 0x%x bytes at 0x%llx\n",
1528 do_write ? "write" : "read",
bd919d45 1529 size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
602adf40 1530
1fec7093
YS
1531 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1532 coll = rbd_alloc_coll(num_segs);
1533 if (!coll) {
1534 spin_lock_irq(q->queue_lock);
1535 __blk_end_request_all(rq, -ENOMEM);
d1d25646 1536 ceph_put_snap_context(snapc);
00f1f36f 1537 continue;
1fec7093
YS
1538 }
1539
602adf40
YS
1540 do {
1541 /* a bio clone to be passed down to OSD req */
bd919d45 1542 dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
602adf40 1543 op_size = rbd_get_segment(&rbd_dev->header,
ca1e49a6 1544 rbd_dev->header.object_prefix,
602adf40
YS
1545 ofs, size,
1546 NULL, NULL);
1fec7093 1547 kref_get(&coll->kref);
602adf40
YS
1548 bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1549 op_size, GFP_ATOMIC);
1550 if (!bio) {
1fec7093
YS
1551 rbd_coll_end_req_index(rq, coll, cur_seg,
1552 -ENOMEM, op_size);
1553 goto next_seg;
602adf40
YS
1554 }
1555
1fec7093 1556
602adf40
YS
1557 /* init OSD command: write or read */
1558 if (do_write)
1559 rbd_req_write(rq, rbd_dev,
d1d25646 1560 snapc,
602adf40 1561 ofs,
1fec7093
YS
1562 op_size, bio,
1563 coll, cur_seg);
602adf40
YS
1564 else
1565 rbd_req_read(rq, rbd_dev,
77dfe99f 1566 rbd_dev->snap_id,
602adf40 1567 ofs,
1fec7093
YS
1568 op_size, bio,
1569 coll, cur_seg);
602adf40 1570
1fec7093 1571next_seg:
602adf40
YS
1572 size -= op_size;
1573 ofs += op_size;
1574
1fec7093 1575 cur_seg++;
602adf40
YS
1576 rq_bio = next_bio;
1577 } while (size > 0);
1fec7093 1578 kref_put(&coll->kref, rbd_coll_release);
602adf40
YS
1579
1580 if (bp)
1581 bio_pair_release(bp);
602adf40 1582 spin_lock_irq(q->queue_lock);
d1d25646
JD
1583
1584 ceph_put_snap_context(snapc);
602adf40
YS
1585 }
1586}
1587
/*
 * a queue callback. Makes sure that we don't create a bio that spans across
 * multiple osd objects. One exception would be with a single page bios,
 * which we handle later at bio_chain_clone
 *
 * Returns the number of bytes of @bvec that may be merged into the bio
 * described by @bmd without crossing an object boundary.
 */
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
			  struct bio_vec *bvec)
{
	struct rbd_device *rbd_dev = q->queuedata;
	unsigned int chunk_sectors;
	sector_t sector;
	unsigned int bio_sectors;
	int max;

	/* object size in sectors; obj_order is log2 of the object size */
	chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
	sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
	bio_sectors = bmd->bi_size >> SECTOR_SHIFT;

	/* bytes remaining until the end of the current object */
	max = (chunk_sectors - ((sector & (chunk_sectors - 1))
				+ bio_sectors)) << SECTOR_SHIFT;
	if (max < 0)
		max = 0; /* bio_add cannot handle a negative return */
	/* always allow at least one bvec in an otherwise-empty bio */
	if (max <= bvec->bv_len && bio_sectors == 0)
		return bvec->bv_len;
	return max;
}
1614
/*
 * Tear down the gendisk/request queue for @rbd_dev and free the cached
 * image header.  Safe to call when the disk was never created.
 */
static void rbd_free_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk = rbd_dev->disk;

	if (!disk)
		return;

	rbd_header_free(&rbd_dev->header);

	if (disk->flags & GENHD_FL_UP)
		del_gendisk(disk);
	if (disk->queue)
		blk_cleanup_queue(disk->queue);
	put_disk(disk);
}
1630
/*
 * Read the on-disk image header from the OSD and decode it into
 * @header.
 *
 * The header's size depends on the number of snapshots, which we only
 * learn by reading it — so loop: read with the current size estimate,
 * decode, and retry with a larger buffer whenever the snapshot count
 * changed under us.  On success header->obj_version is set from the
 * object version returned by the read.
 */
static int rbd_read_header(struct rbd_device *rbd_dev,
			   struct rbd_image_header *header)
{
	ssize_t rc;
	struct rbd_image_header_ondisk *dh;
	u32 snap_count = 0;
	u64 ver;
	size_t len;

	/*
	 * First reads the fixed-size header to determine the number
	 * of snapshots, then re-reads it, along with all snapshot
	 * records as well as their stored names.
	 */
	len = sizeof (*dh);
	while (1) {
		dh = kmalloc(len, GFP_KERNEL);
		if (!dh)
			return -ENOMEM;

		rc = rbd_req_sync_read(rbd_dev,
				       CEPH_NOSNAP,
				       rbd_dev->header_name,
				       0, len,
				       (char *)dh, &ver);
		if (rc < 0)
			goto out_dh;

		rc = rbd_header_from_disk(header, dh, snap_count);
		if (rc < 0) {
			if (rc == -ENXIO)
				pr_warning("unrecognized header format"
					   " for image %s\n",
					   rbd_dev->image_name);
			goto out_dh;
		}

		/* snapshot count stable across the re-read: done */
		if (snap_count == header->total_snaps)
			break;

		snap_count = header->total_snaps;
		len = sizeof (*dh) +
			snap_count * sizeof(struct rbd_image_snap_ondisk) +
			header->snap_names_len;

		/* decoded header is stale; free and try again */
		rbd_header_free(header);
		kfree(dh);
	}
	header->obj_version = ver;

out_dh:
	kfree(dh);
	return rc;
}
1688
1689/*
1690 * create a snapshot
1691 */
0ce1a794 1692static int rbd_header_add_snap(struct rbd_device *rbd_dev,
602adf40
YS
1693 const char *snap_name,
1694 gfp_t gfp_flags)
1695{
1696 int name_len = strlen(snap_name);
1697 u64 new_snapid;
1698 int ret;
916d4d67 1699 void *data, *p, *e;
1dbb4399 1700 struct ceph_mon_client *monc;
602adf40
YS
1701
1702 /* we should create a snapshot only if we're pointing at the head */
0ce1a794 1703 if (rbd_dev->snap_id != CEPH_NOSNAP)
602adf40
YS
1704 return -EINVAL;
1705
0ce1a794
AE
1706 monc = &rbd_dev->rbd_client->client->monc;
1707 ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
bd919d45 1708 dout("created snapid=%llu\n", (unsigned long long) new_snapid);
602adf40
YS
1709 if (ret < 0)
1710 return ret;
1711
1712 data = kmalloc(name_len + 16, gfp_flags);
1713 if (!data)
1714 return -ENOMEM;
1715
916d4d67
SW
1716 p = data;
1717 e = data + name_len + 16;
602adf40 1718
916d4d67
SW
1719 ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1720 ceph_encode_64_safe(&p, e, new_snapid, bad);
602adf40 1721
0bed54dc 1722 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
0ce1a794 1723 "rbd", "snap_add",
d67d4be5 1724 data, p - data, NULL);
602adf40 1725
916d4d67 1726 kfree(data);
602adf40 1727
505cbb9b 1728 return ret < 0 ? ret : 0;
602adf40
YS
1729bad:
1730 return -ERANGE;
1731}
1732
dfc5606d
YS
1733static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1734{
1735 struct rbd_snap *snap;
a0593290 1736 struct rbd_snap *next;
dfc5606d 1737
a0593290 1738 list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
14e7085d 1739 __rbd_remove_snap_dev(snap);
dfc5606d
YS
1740}
1741
/*
 * only read the first part of the ondisk header, without the snaps info
 *
 * Re-reads the image header and splices the fresh fields into
 * rbd_dev->header under header_rwsem, freeing the old snapshot arrays
 * first.  Also resyncs the capacity (when mapped at the head) and the
 * in-kernel snapshot device list.  Caller must hold ctl_mutex (see
 * rbd_refresh_header()).
 */
static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
{
	int ret;
	struct rbd_image_header h;

	ret = rbd_read_header(rbd_dev, &h);
	if (ret < 0)
		return ret;

	down_write(&rbd_dev->header_rwsem);

	/* resized? */
	if (rbd_dev->snap_id == CEPH_NOSNAP) {
		sector_t size = (sector_t) h.image_size / SECTOR_SIZE;

		dout("setting size to %llu sectors", (unsigned long long) size);
		set_capacity(rbd_dev->disk, size);
	}

	/* rbd_dev->header.object_prefix shouldn't change */
	kfree(rbd_dev->header.snap_sizes);
	kfree(rbd_dev->header.snap_names);
	/* osd requests may still refer to snapc */
	ceph_put_snap_context(rbd_dev->header.snapc);

	if (hver)
		*hver = h.obj_version;
	rbd_dev->header.obj_version = h.obj_version;
	rbd_dev->header.image_size = h.image_size;
	rbd_dev->header.total_snaps = h.total_snaps;
	rbd_dev->header.snapc = h.snapc;
	rbd_dev->header.snap_names = h.snap_names;
	rbd_dev->header.snap_names_len = h.snap_names_len;
	rbd_dev->header.snap_sizes = h.snap_sizes;
	/* Free the extra copy of the object prefix */
	WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
	kfree(h.object_prefix);

	/* reconcile the snapshot device list with the new snap context */
	ret = __rbd_init_snaps_header(rbd_dev);

	up_write(&rbd_dev->header_rwsem);

	return ret;
}
1789
/*
 * Locked wrapper for __rbd_refresh_header(): serializes header
 * refreshes against device add/remove via ctl_mutex.
 */
static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
{
	int ret;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	ret = __rbd_refresh_header(rbd_dev, hver);
	mutex_unlock(&ctl_mutex);

	return ret;
}
1800
/*
 * Fetch the image header, set up the snapshot list and mapped snap,
 * then create and register the gendisk/request queue for @rbd_dev.
 * Returns 0 on success or a negative errno; on failure partially
 * created disk resources are released.
 */
static int rbd_init_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk;
	struct request_queue *q;
	int rc;
	u64 segment_size;
	u64 total_size = 0;

	/* contact OSD, request size info about the object being mapped */
	rc = rbd_read_header(rbd_dev, &rbd_dev->header);
	if (rc)
		return rc;

	/* no need to lock here, as rbd_dev is not registered yet */
	rc = __rbd_init_snaps_header(rbd_dev);
	if (rc)
		return rc;

	/* resolve the mapped snapshot and its size */
	rc = rbd_header_set_snap(rbd_dev, &total_size);
	if (rc)
		return rc;

	/* create gendisk info */
	rc = -ENOMEM;
	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
	if (!disk)
		goto out;

	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
		 rbd_dev->dev_id);
	disk->major = rbd_dev->major;
	disk->first_minor = 0;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;

	/* init rq */
	rc = -ENOMEM;
	q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
	if (!q)
		goto out_disk;

	/* We use the default size, but let's be explicit about it. */
	blk_queue_physical_block_size(q, SECTOR_SIZE);

	/* set io sizes to object size */
	segment_size = rbd_obj_bytes(&rbd_dev->header);
	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
	blk_queue_max_segment_size(q, segment_size);
	blk_queue_io_min(q, segment_size);
	blk_queue_io_opt(q, segment_size);

	blk_queue_merge_bvec(q, rbd_merge_bvec);
	disk->queue = q;

	q->queuedata = rbd_dev;

	rbd_dev->disk = disk;
	rbd_dev->q = q;

	/* finally, announce the disk to the world */
	set_capacity(disk, total_size / SECTOR_SIZE);
	add_disk(disk);

	pr_info("%s: added with size 0x%llx\n",
		disk->disk_name, (unsigned long long)total_size);
	return 0;

out_disk:
	put_disk(disk);
out:
	return rc;
}
1873
dfc5606d
YS
1874/*
1875 sysfs
1876*/
1877
/* Map a sysfs struct device back to its containing rbd_device. */
static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
	return container_of(dev, struct rbd_device, dev);
}
1882
dfc5606d
YS
1883static ssize_t rbd_size_show(struct device *dev,
1884 struct device_attribute *attr, char *buf)
1885{
593a9e7b 1886 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
a51aa0c0
JD
1887 sector_t size;
1888
1889 down_read(&rbd_dev->header_rwsem);
1890 size = get_capacity(rbd_dev->disk);
1891 up_read(&rbd_dev->header_rwsem);
dfc5606d 1892
a51aa0c0 1893 return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
dfc5606d
YS
1894}
1895
1896static ssize_t rbd_major_show(struct device *dev,
1897 struct device_attribute *attr, char *buf)
1898{
593a9e7b 1899 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 1900
dfc5606d
YS
1901 return sprintf(buf, "%d\n", rbd_dev->major);
1902}
1903
1904static ssize_t rbd_client_id_show(struct device *dev,
1905 struct device_attribute *attr, char *buf)
602adf40 1906{
593a9e7b 1907 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 1908
1dbb4399
AE
1909 return sprintf(buf, "client%lld\n",
1910 ceph_client_id(rbd_dev->rbd_client->client));
602adf40
YS
1911}
1912
dfc5606d
YS
1913static ssize_t rbd_pool_show(struct device *dev,
1914 struct device_attribute *attr, char *buf)
602adf40 1915{
593a9e7b 1916 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d
YS
1917
1918 return sprintf(buf, "%s\n", rbd_dev->pool_name);
1919}
1920
9bb2f334
AE
1921static ssize_t rbd_pool_id_show(struct device *dev,
1922 struct device_attribute *attr, char *buf)
1923{
1924 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1925
1926 return sprintf(buf, "%d\n", rbd_dev->pool_id);
1927}
1928
dfc5606d
YS
1929static ssize_t rbd_name_show(struct device *dev,
1930 struct device_attribute *attr, char *buf)
1931{
593a9e7b 1932 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 1933
0bed54dc 1934 return sprintf(buf, "%s\n", rbd_dev->image_name);
dfc5606d
YS
1935}
1936
1937static ssize_t rbd_snap_show(struct device *dev,
1938 struct device_attribute *attr,
1939 char *buf)
1940{
593a9e7b 1941 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d
YS
1942
1943 return sprintf(buf, "%s\n", rbd_dev->snap_name);
1944}
1945
/*
 * sysfs "refresh" (write-only): force a re-read of the image header.
 * Returns the written size on success, or the refresh error.
 */
static ssize_t rbd_image_refresh(struct device *dev,
				 struct device_attribute *attr,
				 const char *buf,
				 size_t size)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	int ret;

	ret = rbd_refresh_header(rbd_dev, NULL);

	return ret < 0 ? ret : size;
}
602adf40 1958
/* Per-device sysfs attributes (under /sys/bus/rbd/devices/<id>/). */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);

static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_pool_id.attr,
	&dev_attr_name.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_refresh.attr,
	&dev_attr_create_snap.attr,
	NULL
};

static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};

/* No per-device state to free here; teardown happens in rbd_dev_release. */
static void rbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_sysfs_dev_release,
};
2000
2001
2002/*
2003 sysfs - snapshots
2004*/
2005
2006static ssize_t rbd_snap_size_show(struct device *dev,
2007 struct device_attribute *attr,
2008 char *buf)
2009{
2010 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2011
3591538f 2012 return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
dfc5606d
YS
2013}
2014
2015static ssize_t rbd_snap_id_show(struct device *dev,
2016 struct device_attribute *attr,
2017 char *buf)
2018{
2019 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2020
3591538f 2021 return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
dfc5606d
YS
2022}
2023
/* Per-snapshot sysfs attributes (under the parent rbd device). */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);

static struct attribute *rbd_snap_attrs[] = {
	&dev_attr_snap_size.attr,
	&dev_attr_snap_id.attr,
	NULL,
};

static struct attribute_group rbd_snap_attr_group = {
	.attrs = rbd_snap_attrs,
};

/* Device release: frees the rbd_snap itself, so it owns snap->name too. */
static void rbd_snap_dev_release(struct device *dev)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
	kfree(snap->name);
	kfree(snap);
}

static const struct attribute_group *rbd_snap_attr_groups[] = {
	&rbd_snap_attr_group,
	NULL
};

static struct device_type rbd_snap_device_type = {
	.groups		= rbd_snap_attr_groups,
	.release	= rbd_snap_dev_release,
};
2053
14e7085d 2054static void __rbd_remove_snap_dev(struct rbd_snap *snap)
dfc5606d
YS
2055{
2056 list_del(&snap->node);
2057 device_unregister(&snap->dev);
2058}
2059
14e7085d 2060static int rbd_register_snap_dev(struct rbd_snap *snap,
dfc5606d
YS
2061 struct device *parent)
2062{
2063 struct device *dev = &snap->dev;
2064 int ret;
2065
2066 dev->type = &rbd_snap_device_type;
2067 dev->parent = parent;
2068 dev->release = rbd_snap_dev_release;
2069 dev_set_name(dev, "snap_%s", snap->name);
2070 ret = device_register(dev);
2071
2072 return ret;
2073}
2074
/*
 * Allocate an rbd_snap for snapshot index @i of the header's snap
 * context, named @name, and register its sysfs device if the parent
 * rbd device is already registered.  Returns the new snap or an
 * ERR_PTR.  The caller links it into rbd_dev->snaps.
 */
static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
					   int i, const char *name)
{
	struct rbd_snap *snap;
	int ret;

	snap = kzalloc(sizeof (*snap), GFP_KERNEL);
	if (!snap)
		return ERR_PTR(-ENOMEM);

	ret = -ENOMEM;
	snap->name = kstrdup(name, GFP_KERNEL);
	if (!snap->name)
		goto err;

	snap->size = rbd_dev->header.snap_sizes[i];
	snap->id = rbd_dev->header.snapc->snaps[i];
	/* during initial probe the parent isn't registered yet; the
	 * snaps are registered later by rbd_bus_add_dev() */
	if (device_is_registered(&rbd_dev->dev)) {
		ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
		if (ret < 0)
			goto err;
	}

	return snap;

err:
	kfree(snap->name);
	kfree(snap);

	return ERR_PTR(ret);
}
2106
2107/*
35938150
AE
2108 * Scan the rbd device's current snapshot list and compare it to the
2109 * newly-received snapshot context. Remove any existing snapshots
2110 * not present in the new snapshot context. Add a new snapshot for
2111 * any snaphots in the snapshot context not in the current list.
2112 * And verify there are no changes to snapshots we already know
2113 * about.
2114 *
2115 * Assumes the snapshots in the snapshot context are sorted by
2116 * snapshot id, highest id first. (Snapshots in the rbd_dev's list
2117 * are also maintained in that order.)
dfc5606d
YS
2118 */
2119static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2120{
35938150
AE
2121 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
2122 const u32 snap_count = snapc->num_snaps;
2123 char *snap_name = rbd_dev->header.snap_names;
2124 struct list_head *head = &rbd_dev->snaps;
2125 struct list_head *links = head->next;
2126 u32 index = 0;
dfc5606d 2127
35938150
AE
2128 while (index < snap_count || links != head) {
2129 u64 snap_id;
2130 struct rbd_snap *snap;
dfc5606d 2131
35938150
AE
2132 snap_id = index < snap_count ? snapc->snaps[index]
2133 : CEPH_NOSNAP;
2134 snap = links != head ? list_entry(links, struct rbd_snap, node)
2135 : NULL;
2136 BUG_ON(snap && snap->id == CEPH_NOSNAP);
dfc5606d 2137
35938150
AE
2138 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
2139 struct list_head *next = links->next;
dfc5606d 2140
35938150 2141 /* Existing snapshot not in the new snap context */
dfc5606d 2142
35938150 2143 if (rbd_dev->snap_id == snap->id)
e88a36ec 2144 rbd_dev->snap_exists = false;
35938150
AE
2145 __rbd_remove_snap_dev(snap);
2146
2147 /* Done with this list entry; advance */
2148
2149 links = next;
dfc5606d
YS
2150 continue;
2151 }
35938150
AE
2152
2153 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
2154 struct rbd_snap *new_snap;
2155
2156 /* We haven't seen this snapshot before */
2157
2158 new_snap = __rbd_add_snap_dev(rbd_dev, index,
2159 snap_name);
2160 if (IS_ERR(new_snap))
2161 return PTR_ERR(new_snap);
2162
2163 /* New goes before existing, or at end of list */
2164
2165 if (snap)
2166 list_add_tail(&new_snap->node, &snap->node);
2167 else
2168 list_add(&new_snap->node, head);
2169 } else {
2170 /* Already have this one */
2171
2172 BUG_ON(snap->size != rbd_dev->header.snap_sizes[index]);
2173 BUG_ON(strcmp(snap->name, snap_name));
2174
2175 /* Done with this list entry; advance */
2176
2177 links = links->next;
dfc5606d 2178 }
35938150
AE
2179
2180 /* Advance to the next entry in the snapshot context */
2181
2182 index++;
2183 snap_name += strlen(snap_name) + 1;
dfc5606d
YS
2184 }
2185
2186 return 0;
2187}
2188
/*
 * Register @rbd_dev on the rbd bus under ctl_mutex, then register all
 * already-known snapshot devices beneath it.  Returns 0 on success or
 * the first registration error.
 */
static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
{
	int ret;
	struct device *dev;
	struct rbd_snap *snap;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	dev = &rbd_dev->dev;

	dev->bus = &rbd_bus_type;
	dev->type = &rbd_device_type;
	dev->parent = &rbd_root_dev;
	dev->release = rbd_dev_release;
	dev_set_name(dev, "%d", rbd_dev->dev_id);
	ret = device_register(dev);
	if (ret < 0)
		goto out;

	list_for_each_entry(snap, &rbd_dev->snaps, node) {
		ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
		if (ret < 0)
			break;
	}
out:
	mutex_unlock(&ctl_mutex);
	return ret;
}
2216
dfc5606d
YS
/*
 * Remove an rbd device from sysfs (undoes rbd_bus_add_dev()).
 * Dropping the final device reference triggers rbd_dev_release().
 */
static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
	device_unregister(&rbd_dev->dev);
}
2221
59c2be1e
YS
/*
 * Establish a watch on the image's header object so the device is
 * notified of changes made by other clients (snapshots, resize, ...).
 *
 * rbd_req_sync_watch() fails with -ERANGE when our cached header
 * version is stale; in that case refresh the header and retry until
 * some other result (success or a different error) is returned.
 */
static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
{
	int ret, rc;

	do {
		ret = rbd_req_sync_watch(rbd_dev);
		if (ret == -ERANGE) {
			rc = rbd_refresh_header(rbd_dev, NULL);
			if (rc < 0)
				return rc;
		}
	} while (ret == -ERANGE);

	return ret;
}
2237
1ddbe94e
AE
2238static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
2239
2240/*
499afd5b
AE
2241 * Get a unique rbd identifier for the given new rbd_dev, and add
2242 * the rbd_dev to the global list. The minimum rbd id is 1.
1ddbe94e 2243 */
499afd5b 2244static void rbd_id_get(struct rbd_device *rbd_dev)
b7f23c36 2245{
de71a297 2246 rbd_dev->dev_id = atomic64_inc_return(&rbd_id_max);
499afd5b
AE
2247
2248 spin_lock(&rbd_dev_list_lock);
2249 list_add_tail(&rbd_dev->node, &rbd_dev_list);
2250 spin_unlock(&rbd_dev_list_lock);
1ddbe94e 2251}
b7f23c36 2252
1ddbe94e 2253/*
499afd5b
AE
2254 * Remove an rbd_dev from the global list, and record that its
2255 * identifier is no longer in use.
1ddbe94e 2256 */
499afd5b 2257static void rbd_id_put(struct rbd_device *rbd_dev)
1ddbe94e 2258{
d184f6bf 2259 struct list_head *tmp;
de71a297 2260 int rbd_id = rbd_dev->dev_id;
d184f6bf
AE
2261 int max_id;
2262
2263 BUG_ON(rbd_id < 1);
499afd5b
AE
2264
2265 spin_lock(&rbd_dev_list_lock);
2266 list_del_init(&rbd_dev->node);
d184f6bf
AE
2267
2268 /*
2269 * If the id being "put" is not the current maximum, there
2270 * is nothing special we need to do.
2271 */
2272 if (rbd_id != atomic64_read(&rbd_id_max)) {
2273 spin_unlock(&rbd_dev_list_lock);
2274 return;
2275 }
2276
2277 /*
2278 * We need to update the current maximum id. Search the
2279 * list to find out what it is. We're more likely to find
2280 * the maximum at the end, so search the list backward.
2281 */
2282 max_id = 0;
2283 list_for_each_prev(tmp, &rbd_dev_list) {
2284 struct rbd_device *rbd_dev;
2285
2286 rbd_dev = list_entry(tmp, struct rbd_device, node);
2287 if (rbd_id > max_id)
2288 max_id = rbd_id;
2289 }
499afd5b 2290 spin_unlock(&rbd_dev_list_lock);
b7f23c36 2291
1ddbe94e 2292 /*
d184f6bf
AE
2293 * The max id could have been updated by rbd_id_get(), in
2294 * which case it now accurately reflects the new maximum.
2295 * Be careful not to overwrite the maximum value in that
2296 * case.
1ddbe94e 2297 */
d184f6bf 2298 atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
b7f23c36
AE
2299}
2300
e28fff26
AE
2301/*
2302 * Skips over white space at *buf, and updates *buf to point to the
2303 * first found non-space character (if any). Returns the length of
593a9e7b
AE
2304 * the token (string of non-white space characters) found. Note
2305 * that *buf must be terminated with '\0'.
e28fff26
AE
2306 */
/*
 * Skips over white space at *buf, and updates *buf to point to the
 * first found non-space character (if any).  Returns the length of
 * the token (string of non-white space characters) found.  Note
 * that *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * These are the characters that produce nonzero for
	 * isspace() in the "C" and "POSIX" locales.
	 */
	const char *whitespace = " \f\n\r\t\v";
	const char *start = *buf;

	start += strspn(start, whitespace);	/* Find start of token */
	*buf = start;

	return strcspn(start, whitespace);	/* Return token length */
}
2319
2320/*
2321 * Finds the next token in *buf, and if the provided token buffer is
2322 * big enough, copies the found token into it. The result, if
593a9e7b
AE
2323 * copied, is guaranteed to be terminated with '\0'. Note that *buf
2324 * must be terminated with '\0' on entry.
e28fff26
AE
2325 *
2326 * Returns the length of the token found (not including the '\0').
2327 * Return value will be 0 if no token is found, and it will be >=
2328 * token_size if the token would not fit.
2329 *
593a9e7b 2330 * The *buf pointer will be updated to point beyond the end of the
e28fff26
AE
2331 * found token. Note that this occurs even if the token buffer is
2332 * too small to hold it.
2333 */
/*
 * Finds the next token in *buf, and if the provided token buffer is
 * big enough, copies the found token into it.  The result, if
 * copied, is guaranteed to be terminated with '\0'.  Note that *buf
 * must be terminated with '\0' on entry.
 *
 * Returns the length of the token found (not including the '\0').
 * Return value will be 0 if no token is found, and it will be >=
 * token_size if the token would not fit.
 *
 * The *buf pointer will be updated to point beyond the end of the
 * found token.  Note that this occurs even if the token buffer is
 * too small to hold it.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t len = next_token(buf);

	if (len < token_size) {
		memcpy(token, *buf, len);
		token[len] = '\0';
	}
	*buf += len;	/* advance past the token regardless */

	return len;
}
2349
ea3352f4
AE
2350/*
2351 * Finds the next token in *buf, dynamically allocates a buffer big
2352 * enough to hold a copy of it, and copies the token into the new
2353 * buffer. The copy is guaranteed to be terminated with '\0'. Note
2354 * that a duplicate buffer is created even for a zero-length token.
2355 *
2356 * Returns a pointer to the newly-allocated duplicate, or a null
2357 * pointer if memory for the duplicate was not available. If
2358 * the lenp argument is a non-null pointer, the length of the token
2359 * (not including the '\0') is returned in *lenp.
2360 *
2361 * If successful, the *buf pointer will be updated to point beyond
2362 * the end of the found token.
2363 *
2364 * Note: uses GFP_KERNEL for allocation.
2365 */
2366static inline char *dup_token(const char **buf, size_t *lenp)
2367{
2368 char *dup;
2369 size_t len;
2370
2371 len = next_token(buf);
2372 dup = kmalloc(len + 1, GFP_KERNEL);
2373 if (!dup)
2374 return NULL;
2375
2376 memcpy(dup, *buf, len);
2377 *(dup + len) = '\0';
2378 *buf += len;
2379
2380 if (lenp)
2381 *lenp = len;
2382
2383 return dup;
2384}
2385
a725f65e 2386/*
0bed54dc 2387 * This fills in the pool_name, image_name, image_name_len, snap_name,
a725f65e
AE
2388 * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
2389 * on the list of monitor addresses and other options provided via
2390 * /sys/bus/rbd/add.
d22f76e7
AE
2391 *
2392 * Note: rbd_dev is assumed to have been initially zero-filled.
a725f65e
AE
2393 */
/*
 * This fills in the pool_name, image_name, image_name_len, snap_name,
 * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
 * on the list of monitor addresses and other options provided via
 * /sys/bus/rbd/add.
 *
 * Note: rbd_dev is assumed to have been initially zero-filled.
 *
 * Returns 0 on success, -EINVAL on malformed input, or -ENOMEM when
 * an allocation fails (in which case everything allocated here is
 * freed and the corresponding rbd_dev fields are reset to NULL).
 */
static int rbd_add_parse_args(struct rbd_device *rbd_dev,
			      const char *buf,
			      const char **mon_addrs,
			      size_t *mon_addrs_size,
			      char *options,
			      size_t options_size)
{
	size_t len;
	int ret;

	/* The first four tokens are required */

	len = next_token(&buf);
	if (!len)
		return -EINVAL;
	*mon_addrs_size = len + 1;
	*mon_addrs = buf;	/* points into caller's buffer; not copied */

	buf += len;

	len = copy_token(&buf, options, options_size);
	if (!len || len >= options_size)
		return -EINVAL;

	ret = -ENOMEM;
	rbd_dev->pool_name = dup_token(&buf, NULL);
	if (!rbd_dev->pool_name)
		goto out_err;

	rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
	if (!rbd_dev->image_name)
		goto out_err;

	/* Create the name of the header object: "<image>" RBD_SUFFIX */

	rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
						+ sizeof (RBD_SUFFIX),
					GFP_KERNEL);
	if (!rbd_dev->header_name)
		goto out_err;
	sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);

	/*
	 * The snapshot name is optional.  If none is supplied,
	 * we use the default value.
	 */
	rbd_dev->snap_name = dup_token(&buf, &len);
	if (!rbd_dev->snap_name)
		goto out_err;
	if (!len) {
		/* Replace the empty name with the default */
		kfree(rbd_dev->snap_name);
		rbd_dev->snap_name
			= kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
		if (!rbd_dev->snap_name)
			goto out_err;

		memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
			sizeof (RBD_SNAP_HEAD_NAME));
	}

	return 0;

out_err:
	/*
	 * Free in reverse allocation order and NULL the pointers so
	 * later cleanup (e.g. rbd_dev_release()) doesn't double-free.
	 */
	kfree(rbd_dev->header_name);
	rbd_dev->header_name = NULL;
	kfree(rbd_dev->image_name);
	rbd_dev->image_name = NULL;
	rbd_dev->image_name_len = 0;
	kfree(rbd_dev->pool_name);
	rbd_dev->pool_name = NULL;

	return ret;
}
2468
59c2be1e
YS
/*
 * sysfs entry point for mapping an image:
 *   echo "<mon_addrs> <options> <pool> <image> [<snap>]" > /sys/bus/rbd/add
 *
 * Parses the arguments, connects to the monitor(s), resolves the
 * pool, registers a block device, and announces it via sysfs.  Once
 * rbd_bus_add_dev() succeeds, error cleanup is delegated to the
 * device-release path (rbd_dev_release()) via rbd_bus_del_dev().
 */
static ssize_t rbd_add(struct bus_type *bus,
		       const char *buf,
		       size_t count)
{
	char *options;
	struct rbd_device *rbd_dev = NULL;
	const char *mon_addrs = NULL;
	size_t mon_addrs_size = 0;
	struct ceph_osd_client *osdc;
	int rc = -ENOMEM;

	/* Hold a module reference for the lifetime of the mapping;
	 * released in rbd_dev_release() or on the error paths below. */
	if (!try_module_get(THIS_MODULE))
		return -ENODEV;

	/* "count" bounds the options token; buf is NUL-terminated by sysfs */
	options = kmalloc(count, GFP_KERNEL);
	if (!options)
		goto err_nomem;
	rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
	if (!rbd_dev)
		goto err_nomem;

	/* static rbd_device initialization */
	spin_lock_init(&rbd_dev->lock);
	INIT_LIST_HEAD(&rbd_dev->node);
	INIT_LIST_HEAD(&rbd_dev->snaps);
	init_rwsem(&rbd_dev->header_rwsem);

	/* generate unique id: find highest unique id, add one */
	rbd_id_get(rbd_dev);

	/* Fill in the device name, now that we have its id. */
	BUILD_BUG_ON(DEV_NAME_LEN
			< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);

	/* parse add command */
	rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
				options, count);
	if (rc)
		goto err_put_id;

	/* mon_addrs_size includes the terminator; pass the bare length */
	rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
						options);
	if (IS_ERR(rbd_dev->rbd_client)) {
		rc = PTR_ERR(rbd_dev->rbd_client);
		rbd_dev->rbd_client = NULL;
		goto err_put_id;
	}

	/* pick the pool */
	osdc = &rbd_dev->rbd_client->client->osdc;
	rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
	if (rc < 0)
		goto err_out_client;
	rbd_dev->pool_id = rc;

	/* register our block device (major 0 requests a dynamic major) */
	rc = register_blkdev(0, rbd_dev->name);
	if (rc < 0)
		goto err_out_client;
	rbd_dev->major = rc;

	rc = rbd_bus_add_dev(rbd_dev);
	if (rc)
		goto err_out_blkdev;

	/*
	 * At this point cleanup in the event of an error is the job
	 * of the sysfs code (initiated by rbd_bus_del_dev()).
	 *
	 * Set up and announce blkdev mapping.
	 */
	rc = rbd_init_disk(rbd_dev);
	if (rc)
		goto err_out_bus;

	rc = rbd_init_watch_dev(rbd_dev);
	if (rc)
		goto err_out_bus;

	return count;

err_out_bus:
	/* this will also clean up rest of rbd_dev stuff */

	rbd_bus_del_dev(rbd_dev);
	kfree(options);
	return rc;

err_out_blkdev:
	unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_client:
	rbd_put_client(rbd_dev);
err_put_id:
	/* pool_name is set only if argument parsing got far enough */
	if (rbd_dev->pool_name) {
		kfree(rbd_dev->snap_name);
		kfree(rbd_dev->header_name);
		kfree(rbd_dev->image_name);
		kfree(rbd_dev->pool_name);
	}
	rbd_id_put(rbd_dev);
err_nomem:
	kfree(rbd_dev);
	kfree(options);

	dout("Error adding device %s\n", buf);
	module_put(THIS_MODULE);

	return (ssize_t) rc;
}
2579
de71a297 2580static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
602adf40
YS
2581{
2582 struct list_head *tmp;
2583 struct rbd_device *rbd_dev;
2584
e124a82f 2585 spin_lock(&rbd_dev_list_lock);
602adf40
YS
2586 list_for_each(tmp, &rbd_dev_list) {
2587 rbd_dev = list_entry(tmp, struct rbd_device, node);
de71a297 2588 if (rbd_dev->dev_id == dev_id) {
e124a82f 2589 spin_unlock(&rbd_dev_list_lock);
602adf40 2590 return rbd_dev;
e124a82f 2591 }
602adf40 2592 }
e124a82f 2593 spin_unlock(&rbd_dev_list_lock);
602adf40
YS
2594 return NULL;
2595}
2596
dfc5606d 2597static void rbd_dev_release(struct device *dev)
602adf40 2598{
593a9e7b 2599 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 2600
1dbb4399
AE
2601 if (rbd_dev->watch_request) {
2602 struct ceph_client *client = rbd_dev->rbd_client->client;
2603
2604 ceph_osdc_unregister_linger_request(&client->osdc,
59c2be1e 2605 rbd_dev->watch_request);
1dbb4399 2606 }
59c2be1e 2607 if (rbd_dev->watch_event)
070c633f 2608 rbd_req_sync_unwatch(rbd_dev);
59c2be1e 2609
602adf40
YS
2610 rbd_put_client(rbd_dev);
2611
2612 /* clean up and free blkdev */
2613 rbd_free_disk(rbd_dev);
2614 unregister_blkdev(rbd_dev->major, rbd_dev->name);
32eec68d
AE
2615
2616 /* done with the id, and with the rbd_dev */
820a5f3e 2617 kfree(rbd_dev->snap_name);
0bed54dc 2618 kfree(rbd_dev->header_name);
d22f76e7 2619 kfree(rbd_dev->pool_name);
0bed54dc 2620 kfree(rbd_dev->image_name);
32eec68d 2621 rbd_id_put(rbd_dev);
602adf40
YS
2622 kfree(rbd_dev);
2623
2624 /* release module ref */
2625 module_put(THIS_MODULE);
602adf40
YS
2626}
2627
dfc5606d
YS
/*
 * sysfs entry point for unmapping a device:
 *   echo <id> > /sys/bus/rbd/remove
 *
 * Removing the device from sysfs drops its final reference; the real
 * cleanup then runs in rbd_dev_release().
 */
static ssize_t rbd_remove(struct bus_type *bus,
			  const char *buf,
			  size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	int target_id, rc;
	unsigned long ul;
	int ret = count;

	rc = strict_strtoul(buf, 10, &ul);
	if (rc)
		return rc;

	/* convert to int; abort if we lost anything in the conversion */
	target_id = (int) ul;
	if (target_id != ul)
		return -EINVAL;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbd_dev = __rbd_get_dev(target_id);
	if (!rbd_dev) {
		ret = -ENOENT;
		goto done;
	}

	__rbd_remove_all_snaps(rbd_dev);
	rbd_bus_del_dev(rbd_dev);

done:
	mutex_unlock(&ctl_mutex);
	return ret;
}
2661
dfc5606d
YS
/*
 * sysfs entry point for creating a snapshot of the mapped image:
 *   echo <name> > /sys/bus/rbd/devices/<id>/create_snap
 */
static ssize_t rbd_snap_add(struct device *dev,
			    struct device_attribute *attr,
			    const char *buf,
			    size_t count)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	int ret;
	/*
	 * NOTE(review): snprintf() below is sized "count", so it keeps
	 * at most count - 1 characters -- this strips the trailing
	 * newline sysfs normally delivers, but would also drop the last
	 * character of a name supplied without one.  Confirm all
	 * callers include the newline.
	 */
	char *name = kmalloc(count + 1, GFP_KERNEL);
	if (!name)
		return -ENOMEM;

	snprintf(name, count, "%s", buf);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	ret = rbd_header_add_snap(rbd_dev,
				  name, GFP_KERNEL);
	if (ret < 0)
		goto err_unlock;

	/* Re-read the header so the new snapshot shows up locally */
	ret = __rbd_refresh_header(rbd_dev, NULL);
	if (ret < 0)
		goto err_unlock;

	/* shouldn't hold ctl_mutex when notifying.. notify might
	   trigger a watch callback that would need to get that mutex */
	mutex_unlock(&ctl_mutex);

	/* make a best effort, don't error if failed */
	rbd_req_sync_notify(rbd_dev);

	ret = count;
	kfree(name);
	return ret;

err_unlock:
	mutex_unlock(&ctl_mutex);
	kfree(name);
	return ret;
}
2702
602adf40
YS
2703/*
2704 * create control files in sysfs
dfc5606d 2705 * /sys/bus/rbd/...
602adf40
YS
2706 */
2707static int rbd_sysfs_init(void)
2708{
dfc5606d 2709 int ret;
602adf40 2710
fed4c143 2711 ret = device_register(&rbd_root_dev);
21079786 2712 if (ret < 0)
dfc5606d 2713 return ret;
602adf40 2714
fed4c143
AE
2715 ret = bus_register(&rbd_bus_type);
2716 if (ret < 0)
2717 device_unregister(&rbd_root_dev);
602adf40 2718
602adf40
YS
2719 return ret;
2720}
2721
/* Undo rbd_sysfs_init(): unregister the bus, then the root device. */
static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}
2727
2728int __init rbd_init(void)
2729{
2730 int rc;
2731
2732 rc = rbd_sysfs_init();
2733 if (rc)
2734 return rc;
f0f8cef5 2735 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
602adf40
YS
2736 return 0;
2737}
2738
/* Module exit: tear down the sysfs control hierarchy. */
void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
}
2743
2744module_init(rbd_init);
2745module_exit(rbd_exit);
2746
2747MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2748MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2749MODULE_DESCRIPTION("rados block device");
2750
2751/* following authorship retained from original osdblk.c */
2752MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2753
2754MODULE_LICENSE("GPL");
This page took 0.278847 seconds and 5 git commands to generate.