ceph: remove unnecessary ceph_con_shutdown
fs/ceph/mds_client.c

#include "ceph_debug.h"

#include <linux/wait.h>
#include <linux/sched.h>

#include "mds_client.h"
#include "mon_client.h"
#include "super.h"
#include "messenger.h"
#include "decode.h"

/*
 * A cluster of MDS (metadata server) daemons is responsible for
 * managing the file system namespace (the directory hierarchy and
 * inodes) and for coordinating shared access to storage.  Metadata is
 * partitioned hierarchically across a number of servers, and that
 * partition varies over time as the cluster adjusts the distribution
 * in order to balance load.
 *
 * The MDS client is primarily responsible for managing synchronous
 * metadata requests for operations like open, unlink, and so forth.
 * If there is an MDS failure, we find out about it when we (possibly
 * request and) receive a new MDS map, and can resubmit affected
 * requests.
 *
 * For the most part, though, we take advantage of a lossless
 * communications channel to the MDS, and do not need to worry about
 * timing out or resubmitting requests.
 *
 * We maintain a stateful "session" with each MDS we interact with.
 * Within each session, we send periodic heartbeat messages to ensure
 * any capabilities or leases we have been issued remain valid.  If
 * the session times out and goes stale, our leases and capabilities
 * are no longer valid.
 */

static void __wake_requests(struct ceph_mds_client *mdsc,
                            struct list_head *head);

static const struct ceph_connection_operations mds_con_ops;


/*
 * mds reply parsing
 */

/*
 * parse individual inode info
 */
static int parse_reply_info_in(void **p, void *end,
                               struct ceph_mds_reply_info_in *info)
{
        int err = -EIO;

        info->in = *p;
        *p += sizeof(struct ceph_mds_reply_inode) +
                sizeof(*info->in->fragtree.splits) *
                le32_to_cpu(info->in->fragtree.nsplits);

        ceph_decode_32_safe(p, end, info->symlink_len, bad);
        ceph_decode_need(p, end, info->symlink_len, bad);
        info->symlink = *p;
        *p += info->symlink_len;

        ceph_decode_32_safe(p, end, info->xattr_len, bad);
        ceph_decode_need(p, end, info->xattr_len, bad);
        info->xattr_data = *p;
        *p += info->xattr_len;
        return 0;
bad:
        return err;
}

/*
 * parse a normal reply, which may contain a (dir+)dentry and/or a
 * target inode.
 */
static int parse_reply_info_trace(void **p, void *end,
                                  struct ceph_mds_reply_info_parsed *info)
{
        int err;

        if (info->head->is_dentry) {
                err = parse_reply_info_in(p, end, &info->diri);
                if (err < 0)
                        goto out_bad;

                if (unlikely(*p + sizeof(*info->dirfrag) > end))
                        goto bad;
                info->dirfrag = *p;
                *p += sizeof(*info->dirfrag) +
                        sizeof(u32)*le32_to_cpu(info->dirfrag->ndist);
                if (unlikely(*p > end))
                        goto bad;

                ceph_decode_32_safe(p, end, info->dname_len, bad);
                ceph_decode_need(p, end, info->dname_len, bad);
                info->dname = *p;
                *p += info->dname_len;
                info->dlease = *p;
                *p += sizeof(*info->dlease);
        }

        if (info->head->is_target) {
                err = parse_reply_info_in(p, end, &info->targeti);
                if (err < 0)
                        goto out_bad;
        }

        if (unlikely(*p != end))
                goto bad;
        return 0;

bad:
        err = -EIO;
out_bad:
        pr_err("problem parsing mds trace %d\n", err);
        return err;
}

/*
 * parse readdir results
 */
static int parse_reply_info_dir(void **p, void *end,
                                struct ceph_mds_reply_info_parsed *info)
{
        u32 num, i = 0;
        int err;

        info->dir_dir = *p;
        if (*p + sizeof(*info->dir_dir) > end)
                goto bad;
        *p += sizeof(*info->dir_dir) +
                sizeof(u32)*le32_to_cpu(info->dir_dir->ndist);
        if (*p > end)
                goto bad;

        ceph_decode_need(p, end, sizeof(num) + 2, bad);
        num = ceph_decode_32(p);
        info->dir_end = ceph_decode_8(p);
        info->dir_complete = ceph_decode_8(p);
        if (num == 0)
                goto done;

        /* alloc large array */
        info->dir_nr = num;
        info->dir_in = kcalloc(num, sizeof(*info->dir_in) +
                               sizeof(*info->dir_dname) +
                               sizeof(*info->dir_dname_len) +
                               sizeof(*info->dir_dlease),
                               GFP_NOFS);
        if (info->dir_in == NULL) {
                err = -ENOMEM;
                goto out_bad;
        }
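        /*
         * The single kcalloc() above backs four parallel arrays: num
         * inode-info slots followed by num dname pointers, num dname
         * lengths, and num lease pointers.  The assignments below
         * carve those regions out of the one allocation, so a single
         * kfree(info->dir_in) releases everything.
         */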
        info->dir_dname = (void *)(info->dir_in + num);
        info->dir_dname_len = (void *)(info->dir_dname + num);
        info->dir_dlease = (void *)(info->dir_dname_len + num);

        while (num) {
                /* dentry */
                ceph_decode_need(p, end, sizeof(u32)*2, bad);
                info->dir_dname_len[i] = ceph_decode_32(p);
                ceph_decode_need(p, end, info->dir_dname_len[i], bad);
                info->dir_dname[i] = *p;
                *p += info->dir_dname_len[i];
                dout("parsed dir dname '%.*s'\n", info->dir_dname_len[i],
                     info->dir_dname[i]);
                info->dir_dlease[i] = *p;
                *p += sizeof(struct ceph_mds_reply_lease);

                /* inode */
                err = parse_reply_info_in(p, end, &info->dir_in[i]);
                if (err < 0)
                        goto out_bad;
                i++;
                num--;
        }

done:
        if (*p != end)
                goto bad;
        return 0;

bad:
        err = -EIO;
out_bad:
        pr_err("problem parsing dir contents %d\n", err);
        return err;
}

/*
 * parse entire mds reply
 */
static int parse_reply_info(struct ceph_msg *msg,
                            struct ceph_mds_reply_info_parsed *info)
{
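        /*
         * Overall layout, after the fixed ceph_mds_reply_head: a
         * length-prefixed trace blob, a length-prefixed readdir blob,
         * and a length-prefixed snap blob, in that order.
         */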
        void *p, *end;
        u32 len;
        int err;

        info->head = msg->front.iov_base;
        p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
        end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);

        /* trace */
        ceph_decode_32_safe(&p, end, len, bad);
        if (len > 0) {
                err = parse_reply_info_trace(&p, p+len, info);
                if (err < 0)
                        goto out_bad;
        }

        /* dir content */
        ceph_decode_32_safe(&p, end, len, bad);
        if (len > 0) {
                err = parse_reply_info_dir(&p, p+len, info);
                if (err < 0)
                        goto out_bad;
        }

        /* snap blob */
        ceph_decode_32_safe(&p, end, len, bad);
        info->snapblob_len = len;
        info->snapblob = p;
        p += len;

        if (p != end)
                goto bad;
        return 0;

bad:
        err = -EIO;
out_bad:
        pr_err("mds parse_reply err %d\n", err);
        return err;
}

static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
{
        kfree(info->dir_in);
}


/*
 * sessions
 */
static const char *session_state_name(int s)
{
        switch (s) {
        case CEPH_MDS_SESSION_NEW: return "new";
        case CEPH_MDS_SESSION_OPENING: return "opening";
        case CEPH_MDS_SESSION_OPEN: return "open";
        case CEPH_MDS_SESSION_HUNG: return "hung";
        case CEPH_MDS_SESSION_CLOSING: return "closing";
        case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
        default: return "???";
        }
}

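/*
 * Take a reference on a session, unless its refcount has already
 * reached zero (in which case it is being torn down and must not be
 * resurrected).  atomic_inc_not_zero() makes the check and the
 * increment atomic, so a racing ceph_put_mds_session() cannot free
 * the session out from under us.
 */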
static struct ceph_mds_session *get_session(struct ceph_mds_session *s)
{
        if (atomic_inc_not_zero(&s->s_ref)) {
                dout("mdsc get_session %p %d -> %d\n", s,
                     atomic_read(&s->s_ref)-1, atomic_read(&s->s_ref));
                return s;
        } else {
                dout("mdsc get_session %p 0 -- FAIL\n", s);
                return NULL;
        }
}

void ceph_put_mds_session(struct ceph_mds_session *s)
{
        dout("mdsc put_session %p %d -> %d\n", s,
             atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1);
        if (atomic_dec_and_test(&s->s_ref))
                kfree(s);
}

/*
 * called under mdsc->mutex
 */
struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
                                                   int mds)
{
        struct ceph_mds_session *session;

        if (mds >= mdsc->max_sessions || mdsc->sessions[mds] == NULL)
                return NULL;
        session = mdsc->sessions[mds];
        dout("lookup_mds_session %p %d\n", session,
             atomic_read(&session->s_ref));
        get_session(session);
        return session;
}

static bool __have_session(struct ceph_mds_client *mdsc, int mds)
{
        if (mds >= mdsc->max_sessions)
                return false;
        return mdsc->sessions[mds];
}

/*
 * create+register a new session for given mds.
 * called under mdsc->mutex.
 */
static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
                                                 int mds)
{
        struct ceph_mds_session *s;

        s = kzalloc(sizeof(*s), GFP_NOFS);
        if (!s)
                return ERR_PTR(-ENOMEM);
        s->s_mdsc = mdsc;
        s->s_mds = mds;
        s->s_state = CEPH_MDS_SESSION_NEW;
        s->s_ttl = 0;
        s->s_seq = 0;
        mutex_init(&s->s_mutex);

        ceph_con_init(mdsc->client->msgr, &s->s_con);
        s->s_con.private = s;
        s->s_con.ops = &mds_con_ops;
        s->s_con.peer_name.type = CEPH_ENTITY_TYPE_MDS;
        s->s_con.peer_name.num = cpu_to_le64(mds);

        spin_lock_init(&s->s_cap_lock);
        s->s_cap_gen = 0;
        s->s_cap_ttl = 0;
        s->s_renew_requested = 0;
        s->s_renew_seq = 0;
        INIT_LIST_HEAD(&s->s_caps);
        s->s_nr_caps = 0;
        atomic_set(&s->s_ref, 1);
        INIT_LIST_HEAD(&s->s_waiting);
        INIT_LIST_HEAD(&s->s_unsafe);
        s->s_num_cap_releases = 0;
        INIT_LIST_HEAD(&s->s_cap_releases);
        INIT_LIST_HEAD(&s->s_cap_releases_done);
        INIT_LIST_HEAD(&s->s_cap_flushing);
        INIT_LIST_HEAD(&s->s_cap_snaps_flushing);

        dout("register_session mds%d\n", mds);
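        /*
         * Grow the sessions array in power-of-two steps, e.g. mds 5
         * grows it to 8 slots and mds 9 to 16, so repeated
         * registrations trigger only O(log n) reallocations.
         */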
        if (mds >= mdsc->max_sessions) {
                int newmax = 1 << get_count_order(mds+1);
                struct ceph_mds_session **sa;

                dout("register_session realloc to %d\n", newmax);
                sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
                if (sa == NULL)
                        goto fail_realloc;
                if (mdsc->sessions) {
                        memcpy(sa, mdsc->sessions,
                               mdsc->max_sessions * sizeof(void *));
                        kfree(mdsc->sessions);
                }
                mdsc->sessions = sa;
                mdsc->max_sessions = newmax;
        }
        mdsc->sessions[mds] = s;
        atomic_inc(&s->s_ref);  /* one ref to sessions[], one to caller */

        ceph_con_open(&s->s_con, ceph_mdsmap_get_addr(mdsc->mdsmap, mds));

        return s;

fail_realloc:
        kfree(s);
        return ERR_PTR(-ENOMEM);
}

/*
 * called under mdsc->mutex
 */
static void unregister_session(struct ceph_mds_client *mdsc,
                               struct ceph_mds_session *s)
{
        dout("unregister_session mds%d %p\n", s->s_mds, s);
        mdsc->sessions[s->s_mds] = NULL;
        ceph_con_close(&s->s_con);
        ceph_put_mds_session(s);
}

/*
 * drop session refs in request.
 *
 * should be last request ref, or hold mdsc->mutex
 */
static void put_request_session(struct ceph_mds_request *req)
{
        if (req->r_session) {
                ceph_put_mds_session(req->r_session);
                req->r_session = NULL;
        }
}

void ceph_mdsc_put_request(struct ceph_mds_request *req)
{
        dout("mdsc put_request %p %d -> %d\n", req,
             atomic_read(&req->r_ref), atomic_read(&req->r_ref)-1);
        if (atomic_dec_and_test(&req->r_ref)) {
                if (req->r_request)
                        ceph_msg_put(req->r_request);
                if (req->r_reply) {
                        ceph_msg_put(req->r_reply);
                        destroy_reply_info(&req->r_reply_info);
                }
                if (req->r_inode) {
                        ceph_put_cap_refs(ceph_inode(req->r_inode),
                                          CEPH_CAP_PIN);
                        iput(req->r_inode);
                }
                if (req->r_locked_dir)
                        ceph_put_cap_refs(ceph_inode(req->r_locked_dir),
                                          CEPH_CAP_PIN);
                if (req->r_target_inode)
                        iput(req->r_target_inode);
                if (req->r_dentry)
                        dput(req->r_dentry);
                if (req->r_old_dentry) {
                        ceph_put_cap_refs(
                                ceph_inode(req->r_old_dentry->d_parent->d_inode),
                                CEPH_CAP_PIN);
                        dput(req->r_old_dentry);
                }
                kfree(req->r_path1);
                kfree(req->r_path2);
                put_request_session(req);
                ceph_unreserve_caps(&req->r_caps_reservation);
                kfree(req);
        }
}

/*
 * lookup request, bump ref if found.
 *
 * called under mdsc->mutex.
 */
static struct ceph_mds_request *__lookup_request(struct ceph_mds_client *mdsc,
                                                 u64 tid)
{
        struct ceph_mds_request *req;
        req = radix_tree_lookup(&mdsc->request_tree, tid);
        if (req)
                ceph_mdsc_get_request(req);
        return req;
}

/*
 * Register an in-flight request, and assign a tid.  Link to the
 * directory we are modifying (if any).
 *
 * Called under mdsc->mutex.
 */
static void __register_request(struct ceph_mds_client *mdsc,
                               struct ceph_mds_request *req,
                               struct inode *dir)
{
        req->r_tid = ++mdsc->last_tid;
        if (req->r_num_caps)
                ceph_reserve_caps(&req->r_caps_reservation, req->r_num_caps);
        dout("__register_request %p tid %lld\n", req, req->r_tid);
        ceph_mdsc_get_request(req);
        radix_tree_insert(&mdsc->request_tree, req->r_tid, (void *)req);

        if (dir) {
                struct ceph_inode_info *ci = ceph_inode(dir);

                spin_lock(&ci->i_unsafe_lock);
                req->r_unsafe_dir = dir;
                list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
                spin_unlock(&ci->i_unsafe_lock);
        }
}

static void __unregister_request(struct ceph_mds_client *mdsc,
                                 struct ceph_mds_request *req)
{
        dout("__unregister_request %p tid %lld\n", req, req->r_tid);
        radix_tree_delete(&mdsc->request_tree, req->r_tid);
        ceph_mdsc_put_request(req);

        if (req->r_unsafe_dir) {
                struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);

                spin_lock(&ci->i_unsafe_lock);
                list_del_init(&req->r_unsafe_dir_item);
                spin_unlock(&ci->i_unsafe_lock);
        }
}

/*
 * Choose mds to send request to next.  If there is a hint set in the
 * request (e.g., due to a prior forward hint from the mds), use that.
 * Otherwise, consult frag tree and/or caps to identify the
 * appropriate mds.  If all else fails, choose randomly.
 *
 * Called under mdsc->mutex.
 */
static int __choose_mds(struct ceph_mds_client *mdsc,
                        struct ceph_mds_request *req)
{
        struct inode *inode;
        struct ceph_inode_info *ci;
        struct ceph_cap *cap;
        int mode = req->r_direct_mode;
        int mds = -1;
        u32 hash = req->r_direct_hash;
        bool is_hash = req->r_direct_is_hash;

        /*
         * is there a specific mds we should try?  ignore hint if we have
         * no session and the mds is not up (active or recovering).
         */
        if (req->r_resend_mds >= 0 &&
            (__have_session(mdsc, req->r_resend_mds) ||
             ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
                dout("choose_mds using resend_mds mds%d\n",
                     req->r_resend_mds);
                return req->r_resend_mds;
        }

        if (mode == USE_RANDOM_MDS)
                goto random;

        inode = NULL;
        if (req->r_inode) {
                inode = req->r_inode;
        } else if (req->r_dentry) {
                if (req->r_dentry->d_inode) {
                        inode = req->r_dentry->d_inode;
                } else {
                        inode = req->r_dentry->d_parent->d_inode;
                        hash = req->r_dentry->d_name.hash;
                        is_hash = true;
                }
        }
        dout("__choose_mds %p is_hash=%d (%d) mode %d\n", inode, (int)is_hash,
             (int)hash, mode);
        if (!inode)
                goto random;
        ci = ceph_inode(inode);

        if (is_hash && S_ISDIR(inode->i_mode)) {
                struct ceph_inode_frag frag;
                int found;

                ceph_choose_frag(ci, hash, &frag, &found);
                if (found) {
                        if (mode == USE_ANY_MDS && frag.ndist > 0) {
                                u8 r;

                                /* choose a random replica */
                                get_random_bytes(&r, 1);
                                r %= frag.ndist;
                                mds = frag.dist[r];
                                dout("choose_mds %p %llx.%llx "
                                     "frag %u mds%d (%d/%d)\n",
                                     inode, ceph_vinop(inode),
                                     frag.frag, frag.mds,
                                     (int)r, frag.ndist);
                                return mds;
                        }

                        /* since this file/dir wasn't known to be
                         * replicated, then we want to look for the
                         * authoritative mds. */
                        mode = USE_AUTH_MDS;
                        if (frag.mds >= 0) {
                                /* choose auth mds */
                                mds = frag.mds;
                                dout("choose_mds %p %llx.%llx "
                                     "frag %u mds%d (auth)\n",
                                     inode, ceph_vinop(inode), frag.frag, mds);
                                return mds;
                        }
                }
        }

        spin_lock(&inode->i_lock);
        cap = NULL;
        if (mode == USE_AUTH_MDS)
                cap = ci->i_auth_cap;
        if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
                cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
        if (!cap) {
                spin_unlock(&inode->i_lock);
                goto random;
        }
        mds = cap->session->s_mds;
        dout("choose_mds %p %llx.%llx mds%d (%scap %p)\n",
             inode, ceph_vinop(inode), mds,
             cap == ci->i_auth_cap ? "auth " : "", cap);
        spin_unlock(&inode->i_lock);
        return mds;

random:
        mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
        dout("choose_mds chose random mds%d\n", mds);
        return mds;
}
604
605/*
606 * session messages
607 */
608static struct ceph_msg *create_session_msg(u32 op, u64 seq)
609{
610 struct ceph_msg *msg;
611 struct ceph_mds_session_head *h;
612
613 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), 0, 0, NULL);
614 if (IS_ERR(msg)) {
615 pr_err("create_session_msg ENOMEM creating msg\n");
616 return ERR_PTR(PTR_ERR(msg));
617 }
618 h = msg->front.iov_base;
619 h->op = cpu_to_le32(op);
620 h->seq = cpu_to_le64(seq);
621 return msg;
622}

/*
 * send session open request.
 *
 * called under mdsc->mutex
 */
static int __open_session(struct ceph_mds_client *mdsc,
                          struct ceph_mds_session *session)
{
        struct ceph_msg *msg;
        int mstate;
        int mds = session->s_mds;
        int err = 0;

        /* wait for mds to go active? */
        mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
        dout("open_session to mds%d (%s)\n", mds,
             ceph_mds_state_name(mstate));
        session->s_state = CEPH_MDS_SESSION_OPENING;
        session->s_renew_requested = jiffies;

        /* send connect message */
        msg = create_session_msg(CEPH_SESSION_REQUEST_OPEN, session->s_seq);
        if (IS_ERR(msg)) {
                err = PTR_ERR(msg);
                goto out;
        }
        ceph_con_send(&session->s_con, msg);

out:
        return err;
}

/*
 * session caps
 */

/*
 * Free preallocated cap messages assigned to this session
 */
static void cleanup_cap_releases(struct ceph_mds_session *session)
{
        struct ceph_msg *msg;

        spin_lock(&session->s_cap_lock);
        while (!list_empty(&session->s_cap_releases)) {
                msg = list_first_entry(&session->s_cap_releases,
                                       struct ceph_msg, list_head);
                list_del_init(&msg->list_head);
                ceph_msg_put(msg);
        }
        while (!list_empty(&session->s_cap_releases_done)) {
                msg = list_first_entry(&session->s_cap_releases_done,
                                       struct ceph_msg, list_head);
                list_del_init(&msg->list_head);
                ceph_msg_put(msg);
        }
        spin_unlock(&session->s_cap_lock);
}

/*
 * Helper to safely iterate over all caps associated with a session.
 *
 * caller must hold session s_mutex
 */
static int iterate_session_caps(struct ceph_mds_session *session,
                                int (*cb)(struct inode *, struct ceph_cap *,
                                          void *), void *arg)
{
        struct ceph_cap *cap, *ncap;
        struct inode *inode;
        int ret;

        dout("iterate_session_caps %p mds%d\n", session, session->s_mds);
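        /*
         * The callback may sleep, so s_cap_lock is dropped around each
         * invocation; igrab() pins the inode so it cannot be evicted
         * while the lock is not held.
         */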
        spin_lock(&session->s_cap_lock);
        list_for_each_entry_safe(cap, ncap, &session->s_caps, session_caps) {
                inode = igrab(&cap->ci->vfs_inode);
                if (!inode)
                        continue;
                spin_unlock(&session->s_cap_lock);
                ret = cb(inode, cap, arg);
                iput(inode);
                if (ret < 0)
                        return ret;
                spin_lock(&session->s_cap_lock);
        }
        spin_unlock(&session->s_cap_lock);

        return 0;
}

static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
                                  void *arg)
{
        struct ceph_inode_info *ci = ceph_inode(inode);
        dout("removing cap %p, ci is %p, inode is %p\n",
             cap, ci, &ci->vfs_inode);
        ceph_remove_cap(cap);
        return 0;
}

/*
 * caller must hold session s_mutex
 */
static void remove_session_caps(struct ceph_mds_session *session)
{
        dout("remove_session_caps on %p\n", session);
        iterate_session_caps(session, remove_session_caps_cb, NULL);
        BUG_ON(session->s_nr_caps > 0);
        cleanup_cap_releases(session);
}

/*
 * wake up any threads waiting on this session's caps.  if the cap is
 * old (didn't get renewed on the client reconnect), remove it now.
 *
 * caller must hold s_mutex.
 */
static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
                              void *arg)
{
        wake_up(&ceph_inode(inode)->i_cap_wq);
        return 0;
}

static void wake_up_session_caps(struct ceph_mds_session *session)
{
        dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
        iterate_session_caps(session, wake_up_session_cb, NULL);
}

/*
 * Send periodic message to MDS renewing all currently held caps.  The
 * ack will reset the expiration for all caps from this session.
 *
 * caller holds s_mutex
 */
static int send_renew_caps(struct ceph_mds_client *mdsc,
                           struct ceph_mds_session *session)
{
        struct ceph_msg *msg;
        int state;

        if (time_after_eq(jiffies, session->s_cap_ttl) &&
            time_after_eq(session->s_cap_ttl, session->s_renew_requested))
                pr_info("mds%d caps stale\n", session->s_mds);

        /* do not try to renew caps until a recovering mds has reconnected
         * with its clients. */
        state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
        if (state < CEPH_MDS_STATE_RECONNECT) {
                dout("send_renew_caps ignoring mds%d (%s)\n",
                     session->s_mds, ceph_mds_state_name(state));
                return 0;
        }

        dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
             ceph_mds_state_name(state));
        session->s_renew_requested = jiffies;
        msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
                                 ++session->s_renew_seq);
        if (IS_ERR(msg))
                return PTR_ERR(msg);
        ceph_con_send(&session->s_con, msg);
        return 0;
}

/*
 * Note new cap ttl, and any transition from stale -> not stale (fresh?).
 */
static void renewed_caps(struct ceph_mds_client *mdsc,
                         struct ceph_mds_session *session, int is_renew)
{
        int was_stale;
        int wake = 0;

        spin_lock(&session->s_cap_lock);
        was_stale = is_renew && (session->s_cap_ttl == 0 ||
                                 time_after_eq(jiffies, session->s_cap_ttl));

        session->s_cap_ttl = session->s_renew_requested +
                mdsc->mdsmap->m_session_timeout*HZ;

        if (was_stale) {
                if (time_before(jiffies, session->s_cap_ttl)) {
                        pr_info("mds%d caps renewed\n", session->s_mds);
                        wake = 1;
                } else {
                        pr_info("mds%d caps still stale\n", session->s_mds);
                }
        }
        dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n",
             session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh",
             time_before(jiffies, session->s_cap_ttl) ? "fresh" : "stale");
        spin_unlock(&session->s_cap_lock);

        if (wake)
                wake_up_session_caps(session);
}

/*
 * send a session close request
 */
static int request_close_session(struct ceph_mds_client *mdsc,
                                 struct ceph_mds_session *session)
{
        struct ceph_msg *msg;
        int err = 0;

        dout("request_close_session mds%d state %s seq %lld\n",
             session->s_mds, session_state_name(session->s_state),
             session->s_seq);
        msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq);
        if (IS_ERR(msg))
                err = PTR_ERR(msg);
        else
                ceph_con_send(&session->s_con, msg);
        return err;
}

/*
 * Called with s_mutex held.
 */
static int __close_session(struct ceph_mds_client *mdsc,
                           struct ceph_mds_session *session)
{
        if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
                return 0;
        session->s_state = CEPH_MDS_SESSION_CLOSING;
        return request_close_session(mdsc, session);
}

/*
 * Trim old(er) caps.
 *
 * Because we can't cache an inode without one or more caps, we do
 * this indirectly: if a cap is unused, we prune its aliases, at which
 * point the inode will hopefully get dropped too.
 *
 * Yes, this is a bit sloppy.  Our only real goal here is to respond to
 * memory pressure from the MDS, though, so it needn't be perfect.
 */
static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
{
        struct ceph_mds_session *session = arg;
        struct ceph_inode_info *ci = ceph_inode(inode);
        int used, oissued, mine;

        if (session->s_trim_caps <= 0)
                return -1;

        spin_lock(&inode->i_lock);
        mine = cap->issued | cap->implemented;
        used = __ceph_caps_used(ci);
        oissued = __ceph_caps_issued_other(ci, cap);

        dout("trim_caps_cb %p cap %p mine %s oissued %s used %s\n",
             inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
             ceph_cap_string(used));
        if (ci->i_dirty_caps)
                goto out;       /* dirty caps */
        if ((used & ~oissued) & mine)
                goto out;       /* we need these caps */

        session->s_trim_caps--;
        if (oissued) {
                /* we aren't the only cap.. just remove us */
                __ceph_remove_cap(cap, NULL);
        } else {
                /* try to drop referring dentries */
                spin_unlock(&inode->i_lock);
                d_prune_aliases(inode);
                dout("trim_caps_cb %p cap %p pruned, count now %d\n",
                     inode, cap, atomic_read(&inode->i_count));
                return 0;
        }

out:
        spin_unlock(&inode->i_lock);
        return 0;
}

/*
 * Trim session cap count down to some max number.
 */
static int trim_caps(struct ceph_mds_client *mdsc,
                     struct ceph_mds_session *session,
                     int max_caps)
{
        int trim_caps = session->s_nr_caps - max_caps;

        dout("trim_caps mds%d start: %d / %d, trim %d\n",
             session->s_mds, session->s_nr_caps, max_caps, trim_caps);
        if (trim_caps > 0) {
                session->s_trim_caps = trim_caps;
                iterate_session_caps(session, trim_caps_cb, session);
                dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
                     session->s_mds, session->s_nr_caps, max_caps,
                     trim_caps - session->s_trim_caps);
        }
        return 0;
}

/*
 * Allocate cap_release messages.  If there is a partially full message
 * in the queue, try to allocate enough to cover its remainder, so that
 * we can send it immediately.
 *
 * Called under s_mutex.
 */
static int add_cap_releases(struct ceph_mds_client *mdsc,
                            struct ceph_mds_session *session,
                            int extra)
{
        struct ceph_msg *msg;
        struct ceph_mds_cap_release *head;
        int err = -ENOMEM;

        if (extra < 0)
                extra = mdsc->client->mount_args->cap_release_safety;

        spin_lock(&session->s_cap_lock);

        if (!list_empty(&session->s_cap_releases)) {
                msg = list_first_entry(&session->s_cap_releases,
                                       struct ceph_msg,
                                       list_head);
                head = msg->front.iov_base;
                extra += CEPH_CAPS_PER_RELEASE - le32_to_cpu(head->num);
        }
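
        /*
         * Each message holds CEPH_CAPS_PER_RELEASE slots;
         * s_num_cap_releases counts the free slots still queued.
         * Keep allocating messages until every currently held cap
         * (plus the requested slack) has a slot waiting.
         */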
        while (session->s_num_cap_releases < session->s_nr_caps + extra) {
                spin_unlock(&session->s_cap_lock);
                msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE,
                                   0, 0, NULL);
                if (!msg)
                        goto out_unlocked;
                dout("add_cap_releases %p msg %p now %d\n", session, msg,
                     (int)msg->front.iov_len);
                head = msg->front.iov_base;
                head->num = cpu_to_le32(0);
                msg->front.iov_len = sizeof(*head);
                spin_lock(&session->s_cap_lock);
                list_add(&msg->list_head, &session->s_cap_releases);
                session->s_num_cap_releases += CEPH_CAPS_PER_RELEASE;
        }

        if (!list_empty(&session->s_cap_releases)) {
                msg = list_first_entry(&session->s_cap_releases,
                                       struct ceph_msg,
                                       list_head);
                head = msg->front.iov_base;
                if (head->num) {
                        dout(" queueing non-full %p (%d)\n", msg,
                             le32_to_cpu(head->num));
                        list_move_tail(&msg->list_head,
                                       &session->s_cap_releases_done);
                        session->s_num_cap_releases -=
                                CEPH_CAPS_PER_RELEASE - le32_to_cpu(head->num);
                }
        }
        err = 0;
        spin_unlock(&session->s_cap_lock);
out_unlocked:
        return err;
}

/*
 * flush all dirty inode data to disk.
 *
 * returns true if we've flushed through want_flush_seq
 */
static int check_cap_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq)
{
        int mds, ret = 1;

        dout("check_cap_flush want %lld\n", want_flush_seq);
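        /*
         * mdsc->mutex is dropped before each session's s_mutex is
         * taken, so the two are never held together; the session
         * reference taken here keeps the session alive across the
         * unlocked window.
         */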
        mutex_lock(&mdsc->mutex);
        for (mds = 0; ret && mds < mdsc->max_sessions; mds++) {
                struct ceph_mds_session *session = mdsc->sessions[mds];

                if (!session)
                        continue;
                get_session(session);
                mutex_unlock(&mdsc->mutex);

                mutex_lock(&session->s_mutex);
                if (!list_empty(&session->s_cap_flushing)) {
                        struct ceph_inode_info *ci =
                                list_entry(session->s_cap_flushing.next,
                                           struct ceph_inode_info,
                                           i_flushing_item);
                        struct inode *inode = &ci->vfs_inode;

                        spin_lock(&inode->i_lock);
                        if (ci->i_cap_flush_seq <= want_flush_seq) {
                                dout("check_cap_flush still flushing %p "
                                     "seq %lld <= %lld to mds%d\n", inode,
                                     ci->i_cap_flush_seq, want_flush_seq,
                                     session->s_mds);
                                ret = 0;
                        }
                        spin_unlock(&inode->i_lock);
                }
                mutex_unlock(&session->s_mutex);
                ceph_put_mds_session(session);

                if (!ret)
                        return ret;
                mutex_lock(&mdsc->mutex);
        }

        mutex_unlock(&mdsc->mutex);
        dout("check_cap_flush ok, flushed thru %lld\n", want_flush_seq);
        return ret;
}

/*
 * called under s_mutex
 */
static void send_cap_releases(struct ceph_mds_client *mdsc,
                              struct ceph_mds_session *session)
{
        struct ceph_msg *msg;

        dout("send_cap_releases mds%d\n", session->s_mds);
        while (1) {
                spin_lock(&session->s_cap_lock);
                if (list_empty(&session->s_cap_releases_done))
                        break;
                msg = list_first_entry(&session->s_cap_releases_done,
                                       struct ceph_msg, list_head);
                list_del_init(&msg->list_head);
                spin_unlock(&session->s_cap_lock);
                msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
                dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
                ceph_con_send(&session->s_con, msg);
        }
        spin_unlock(&session->s_cap_lock);
}

/*
 * requests
 */

/*
 * Create an mds request.
 */
struct ceph_mds_request *
ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
{
        struct ceph_mds_request *req = kzalloc(sizeof(*req), GFP_NOFS);

        if (!req)
                return ERR_PTR(-ENOMEM);

        req->r_started = jiffies;
        req->r_resend_mds = -1;
        INIT_LIST_HEAD(&req->r_unsafe_dir_item);
        req->r_fmode = -1;
        atomic_set(&req->r_ref, 1);  /* one for request_tree, one for caller */
        INIT_LIST_HEAD(&req->r_wait);
        init_completion(&req->r_completion);
        init_completion(&req->r_safe_completion);
        INIT_LIST_HEAD(&req->r_unsafe_item);

        req->r_op = op;
        req->r_direct_mode = mode;
        return req;
}

/*
 * return oldest (lowest) tid in request tree, 0 if none.
 *
 * called under mdsc->mutex.
 */
static u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
{
        struct ceph_mds_request *first;
        if (radix_tree_gang_lookup(&mdsc->request_tree,
                                   (void **)&first, 0, 1) <= 0)
                return 0;
        return first->r_tid;
}

/*
 * Build a dentry's path.  Allocate on heap; caller must kfree.  Based
 * on build_path_from_dentry in fs/cifs/dir.c.
 *
 * If @stop_on_nosnap, generate path relative to the first non-snapped
 * inode.
 *
 * Encode hidden .snap dirs as a double /, i.e.
 *   foo/.snap/bar -> foo//bar
 */
char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
                           int stop_on_nosnap)
{
        struct dentry *temp;
        char *path;
        int len, pos;

        if (dentry == NULL)
                return ERR_PTR(-EINVAL);

retry:
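        /*
         * Two passes: the first walk toward the root only measures
         * the path length; the second fills the buffer backwards from
         * its tail.  If a concurrent rename changes the tree between
         * the passes, pos won't land on 0 and we retry from scratch.
         */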
        len = 0;
        for (temp = dentry; !IS_ROOT(temp);) {
                struct inode *inode = temp->d_inode;
                if (inode && ceph_snap(inode) == CEPH_SNAPDIR)
                        len++;  /* slash only */
                else if (stop_on_nosnap && inode &&
                         ceph_snap(inode) == CEPH_NOSNAP)
                        break;
                else
                        len += 1 + temp->d_name.len;
                temp = temp->d_parent;
                if (temp == NULL) {
                        pr_err("build_path_dentry corrupt dentry %p\n", dentry);
                        return ERR_PTR(-EINVAL);
                }
        }
        if (len)
                len--;  /* no leading '/' */

        path = kmalloc(len+1, GFP_NOFS);
        if (path == NULL)
                return ERR_PTR(-ENOMEM);
        pos = len;
        path[pos] = 0;  /* trailing null */
        for (temp = dentry; !IS_ROOT(temp) && pos != 0; ) {
                struct inode *inode = temp->d_inode;

                if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
                        dout("build_path_dentry path+%d: %p SNAPDIR\n",
                             pos, temp);
                } else if (stop_on_nosnap && inode &&
                           ceph_snap(inode) == CEPH_NOSNAP) {
                        break;
                } else {
                        pos -= temp->d_name.len;
                        if (pos < 0)
                                break;
                        strncpy(path + pos, temp->d_name.name,
                                temp->d_name.len);
                        dout("build_path_dentry path+%d: %p '%.*s'\n",
                             pos, temp, temp->d_name.len, path + pos);
                }
                if (pos)
                        path[--pos] = '/';
                temp = temp->d_parent;
                if (temp == NULL) {
                        pr_err("build_path_dentry corrupt dentry\n");
                        kfree(path);
                        return ERR_PTR(-EINVAL);
                }
        }
        if (pos != 0) {
                pr_err("build_path_dentry did not end path lookup where "
                       "expected, namelen is %d, pos is %d\n", len, pos);
                /* presumably this is only possible if racing with a
                   rename of one of the parent directories (we can not
                   lock the dentries above us to prevent this, but
                   retrying should be harmless) */
                kfree(path);
                goto retry;
        }

        *base = ceph_ino(temp->d_inode);
        *plen = len;
        dout("build_path_dentry on %p %d built %llx '%.*s'\n",
             dentry, atomic_read(&dentry->d_count), *base, len, path);
        return path;
}

static int build_dentry_path(struct dentry *dentry,
                             const char **ppath, int *ppathlen, u64 *pino,
                             int *pfreepath)
{
        char *path;

        if (ceph_snap(dentry->d_parent->d_inode) == CEPH_NOSNAP) {
                *pino = ceph_ino(dentry->d_parent->d_inode);
                *ppath = dentry->d_name.name;
                *ppathlen = dentry->d_name.len;
                return 0;
        }
        path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
        if (IS_ERR(path))
                return PTR_ERR(path);
        *ppath = path;
        *pfreepath = 1;
        return 0;
}

static int build_inode_path(struct inode *inode,
                            const char **ppath, int *ppathlen, u64 *pino,
                            int *pfreepath)
{
        struct dentry *dentry;
        char *path;

        if (ceph_snap(inode) == CEPH_NOSNAP) {
                *pino = ceph_ino(inode);
                *ppathlen = 0;
                return 0;
        }
        dentry = d_find_alias(inode);
        path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
        dput(dentry);
        if (IS_ERR(path))
                return PTR_ERR(path);
        *ppath = path;
        *pfreepath = 1;
        return 0;
}

/*
 * request arguments may be specified via an inode *, a dentry *, or
 * an explicit ino+path.
 */
static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
                                 const char *rpath, u64 rino,
                                 const char **ppath, int *pathlen,
                                 u64 *ino, int *freepath)
{
        int r = 0;

        if (rinode) {
                r = build_inode_path(rinode, ppath, pathlen, ino, freepath);
                dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
                     ceph_snap(rinode));
        } else if (rdentry) {
                r = build_dentry_path(rdentry, ppath, pathlen, ino, freepath);
                dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
                     *ppath);
        } else if (rpath) {
                *ino = rino;
                *ppath = rpath;
                *pathlen = strlen(rpath);
                dout(" path %.*s\n", *pathlen, rpath);
        }

        return r;
}

/*
 * called under mdsc->mutex
 */
static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
                                               struct ceph_mds_request *req,
                                               int mds)
{
        struct ceph_msg *msg;
        struct ceph_mds_request_head *head;
        const char *path1 = NULL;
        const char *path2 = NULL;
        u64 ino1 = 0, ino2 = 0;
        int pathlen1 = 0, pathlen2 = 0;
        int freepath1 = 0, freepath2 = 0;
        int len;
        u16 releases;
        void *p, *end;
        int ret;

        ret = set_request_path_attr(req->r_inode, req->r_dentry,
                                    req->r_path1, req->r_ino1.ino,
                                    &path1, &pathlen1, &ino1, &freepath1);
        if (ret < 0) {
                msg = ERR_PTR(ret);
                goto out;
        }

        ret = set_request_path_attr(NULL, req->r_old_dentry,
                                    req->r_path2, req->r_ino2.ino,
                                    &path2, &pathlen2, &ino2, &freepath2);
        if (ret < 0) {
                msg = ERR_PTR(ret);
                goto out_free1;
        }

        len = sizeof(*head) +
                pathlen1 + pathlen2 + 2*(sizeof(u32) + sizeof(u64));

        /* calculate (max) length for cap releases */
        len += sizeof(struct ceph_mds_request_release) *
                (!!req->r_inode_drop + !!req->r_dentry_drop +
                 !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
        if (req->r_dentry_drop)
                len += req->r_dentry->d_name.len;
        if (req->r_old_dentry_drop)
                len += req->r_old_dentry->d_name.len;

        msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, 0, 0, NULL);
        if (IS_ERR(msg))
                goto out_free2;

        head = msg->front.iov_base;
        p = msg->front.iov_base + sizeof(*head);
        end = msg->front.iov_base + msg->front.iov_len;

        head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
        head->op = cpu_to_le32(req->r_op);
        head->caller_uid = cpu_to_le32(current_fsuid());
        head->caller_gid = cpu_to_le32(current_fsgid());
        head->args = req->r_args;

        ceph_encode_filepath(&p, end, ino1, path1);
        ceph_encode_filepath(&p, end, ino2, path2);

        /* cap releases */
        releases = 0;
        if (req->r_inode_drop)
                releases += ceph_encode_inode_release(&p,
                        req->r_inode ? req->r_inode : req->r_dentry->d_inode,
                        mds, req->r_inode_drop, req->r_inode_unless, 0);
        if (req->r_dentry_drop)
                releases += ceph_encode_dentry_release(&p, req->r_dentry,
                        mds, req->r_dentry_drop, req->r_dentry_unless);
        if (req->r_old_dentry_drop)
                releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
                        mds, req->r_old_dentry_drop, req->r_old_dentry_unless);
        if (req->r_old_inode_drop)
                releases += ceph_encode_inode_release(&p,
                        req->r_old_dentry->d_inode,
                        mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
        head->num_releases = cpu_to_le16(releases);

        BUG_ON(p > end);
        msg->front.iov_len = p - msg->front.iov_base;
        msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);

        msg->pages = req->r_pages;
        msg->nr_pages = req->r_num_pages;
        msg->hdr.data_len = cpu_to_le32(req->r_data_len);
        msg->hdr.data_off = cpu_to_le16(0);

out_free2:
        if (freepath2)
                kfree((char *)path2);
out_free1:
        if (freepath1)
                kfree((char *)path1);
out:
        return msg;
}

/*
 * called under mdsc->mutex if error, under no mutex if
 * success.
 */
static void complete_request(struct ceph_mds_client *mdsc,
                             struct ceph_mds_request *req)
{
        if (req->r_callback)
                req->r_callback(mdsc, req);
        else
                complete(&req->r_completion);
}

/*
 * called under mdsc->mutex
 */
static int __prepare_send_request(struct ceph_mds_client *mdsc,
                                  struct ceph_mds_request *req,
                                  int mds)
{
        struct ceph_mds_request_head *rhead;
        struct ceph_msg *msg;
        int flags = 0;

        req->r_mds = mds;
        req->r_attempts++;
        dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
             req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);

        if (req->r_request) {
                ceph_msg_put(req->r_request);
                req->r_request = NULL;
        }
        msg = create_request_message(mdsc, req, mds);
        if (IS_ERR(msg)) {
                req->r_reply = ERR_PTR(PTR_ERR(msg));
                complete_request(mdsc, req);
                return -PTR_ERR(msg);
        }
        req->r_request = msg;

        rhead = msg->front.iov_base;
        rhead->tid = cpu_to_le64(req->r_tid);
        rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
        if (req->r_got_unsafe)
                flags |= CEPH_MDS_FLAG_REPLAY;
        if (req->r_locked_dir)
                flags |= CEPH_MDS_FLAG_WANT_DENTRY;
        rhead->flags = cpu_to_le32(flags);
        rhead->num_fwd = req->r_num_fwd;
        rhead->num_retry = req->r_attempts - 1;

        dout(" r_locked_dir = %p\n", req->r_locked_dir);

        if (req->r_target_inode && req->r_got_unsafe)
                rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));
        else
                rhead->ino = 0;
        return 0;
}

/*
 * send request, or put it on the appropriate wait list.
 */
static int __do_request(struct ceph_mds_client *mdsc,
                        struct ceph_mds_request *req)
{
        struct ceph_mds_session *session = NULL;
        int mds = -1;
        int err = -EAGAIN;

        if (req->r_reply)
                goto out;

        if (req->r_timeout &&
            time_after_eq(jiffies, req->r_started + req->r_timeout)) {
                dout("do_request timed out\n");
                err = -EIO;
                goto finish;
        }

        mds = __choose_mds(mdsc, req);
        if (mds < 0 ||
            ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
                dout("do_request no mds or not active, waiting for map\n");
                list_add(&req->r_wait, &mdsc->waiting_for_map);
                goto out;
        }

        /* get, open session */
        session = __ceph_lookup_mds_session(mdsc, mds);
        if (!session) {
                session = register_session(mdsc, mds);
                if (IS_ERR(session)) {
                        err = PTR_ERR(session);
                        goto finish;
                }
        }
        dout("do_request mds%d session %p state %s\n", mds, session,
             session_state_name(session->s_state));
        if (session->s_state != CEPH_MDS_SESSION_OPEN &&
            session->s_state != CEPH_MDS_SESSION_HUNG) {
                if (session->s_state == CEPH_MDS_SESSION_NEW ||
                    session->s_state == CEPH_MDS_SESSION_CLOSING)
                        __open_session(mdsc, session);
                list_add(&req->r_wait, &session->s_waiting);
                goto out_session;
        }

        /* send request */
        req->r_session = get_session(session);
        req->r_resend_mds = -1;  /* forget any previous mds hint */

        if (req->r_request_started == 0)  /* note request start time */
                req->r_request_started = jiffies;

        err = __prepare_send_request(mdsc, req, mds);
        if (!err) {
                ceph_msg_get(req->r_request);
                ceph_con_send(&session->s_con, req->r_request);
        }

out_session:
        ceph_put_mds_session(session);
out:
        return err;

finish:
        req->r_reply = ERR_PTR(err);
        complete_request(mdsc, req);
        goto out;
}

/*
 * called under mdsc->mutex
 */
static void __wake_requests(struct ceph_mds_client *mdsc,
                            struct list_head *head)
{
        struct ceph_mds_request *req, *nreq;

        list_for_each_entry_safe(req, nreq, head, r_wait) {
                list_del_init(&req->r_wait);
                __do_request(mdsc, req);
        }
}

/*
 * Wake up threads with requests pending for @mds, so that they can
 * resubmit their requests to a possibly different mds.  If @all is set,
 * wake up if their requests have been forwarded to @mds, too.
 */
static void kick_requests(struct ceph_mds_client *mdsc, int mds, int all)
{
        struct ceph_mds_request *reqs[10];
        u64 nexttid = 0;
        int i, got;

        dout("kick_requests mds%d\n", mds);
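        /*
         * Scan the request tree in gang-lookup batches of up to 10
         * requests; nexttid advances past the highest tid seen so each
         * pass resumes where the previous one stopped.
         */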
        while (nexttid <= mdsc->last_tid) {
                got = radix_tree_gang_lookup(&mdsc->request_tree,
                                             (void **)&reqs, nexttid, 10);
                if (got == 0)
                        break;
                nexttid = reqs[got-1]->r_tid + 1;
                for (i = 0; i < got; i++) {
                        if (reqs[i]->r_got_unsafe)
                                continue;
                        if (reqs[i]->r_session &&
                            reqs[i]->r_session->s_mds == mds) {
                                dout(" kicking tid %llu\n", reqs[i]->r_tid);
                                put_request_session(reqs[i]);
                                __do_request(mdsc, reqs[i]);
                        }
                }
        }
}

void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc,
                              struct ceph_mds_request *req)
{
        dout("submit_request on %p\n", req);
        mutex_lock(&mdsc->mutex);
        __register_request(mdsc, req, NULL);
        __do_request(mdsc, req);
        mutex_unlock(&mdsc->mutex);
}

/*
 * Synchronously perform an mds request.  Take care of all of the
 * session setup, forwarding, retry details.
 */
int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
                         struct inode *dir,
                         struct ceph_mds_request *req)
{
        int err;

        dout("do_request on %p\n", req);

        /* take CAP_PIN refs for r_inode, r_locked_dir, r_old_dentry */
        if (req->r_inode)
                ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
        if (req->r_locked_dir)
                ceph_get_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
        if (req->r_old_dentry)
                ceph_get_cap_refs(
                        ceph_inode(req->r_old_dentry->d_parent->d_inode),
                        CEPH_CAP_PIN);

        /* issue */
        mutex_lock(&mdsc->mutex);
        __register_request(mdsc, req, dir);
        __do_request(mdsc, req);

        /* wait */
        if (!req->r_reply) {
                mutex_unlock(&mdsc->mutex);
                if (req->r_timeout) {
                        err = wait_for_completion_timeout(&req->r_completion,
                                                          req->r_timeout);
                        if (err > 0)
                                err = 0;
                        else if (err == 0)
                                req->r_reply = ERR_PTR(-EIO);
                } else {
                        wait_for_completion(&req->r_completion);
                }
                mutex_lock(&mdsc->mutex);
        }

        if (IS_ERR(req->r_reply)) {
                err = PTR_ERR(req->r_reply);
                req->r_reply = NULL;

                /* clean up */
                __unregister_request(mdsc, req);
                if (!list_empty(&req->r_unsafe_item))
                        list_del_init(&req->r_unsafe_item);
                complete(&req->r_safe_completion);
        } else if (req->r_err) {
                err = req->r_err;
        } else {
                err = le32_to_cpu(req->r_reply_info.head->result);
        }
        mutex_unlock(&mdsc->mutex);

        dout("do_request %p done, result %d\n", req, err);
        return err;
}

/*
 * Handle mds reply.
 *
 * We take the session mutex and parse and process the reply immediately.
 * This preserves the logical ordering of replies, capabilities, etc., sent
 * by the MDS as they are applied to our local cache.
 */
static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
{
        struct ceph_mds_client *mdsc = session->s_mdsc;
        struct ceph_mds_request *req;
        struct ceph_mds_reply_head *head = msg->front.iov_base;
        struct ceph_mds_reply_info_parsed *rinfo;  /* parsed reply info */
        u64 tid;
        int err, result;
        int mds;

        if (msg->hdr.src.name.type != CEPH_ENTITY_TYPE_MDS)
                return;
        if (msg->front.iov_len < sizeof(*head)) {
                pr_err("mdsc_handle_reply got corrupt (short) reply\n");
                return;
        }

        /* get request, session */
        tid = le64_to_cpu(head->tid);
        mutex_lock(&mdsc->mutex);
        req = __lookup_request(mdsc, tid);
        if (!req) {
                dout("handle_reply on unknown tid %llu\n", tid);
                mutex_unlock(&mdsc->mutex);
                return;
        }
        dout("handle_reply %p\n", req);
        mds = le64_to_cpu(msg->hdr.src.name.num);

        /* correct session? */
        if (req->r_session != session) {
                pr_err("mdsc_handle_reply got %llu on session mds%d"
                       " not mds%d\n", tid, session->s_mds,
                       req->r_session ? req->r_session->s_mds : -1);
                mutex_unlock(&mdsc->mutex);
                goto out;
        }

        /* dup? */
        if ((req->r_got_unsafe && !head->safe) ||
            (req->r_got_safe && head->safe)) {
                pr_warning("got a dup %s reply on %llu from mds%d\n",
                           head->safe ? "safe" : "unsafe", tid, mds);
                mutex_unlock(&mdsc->mutex);
                goto out;
        }

        result = le32_to_cpu(head->result);

        /*
         * Tolerate 2 consecutive ESTALEs from the same mds.
         * FIXME: we should be looking at the cap migrate_seq.
         */
        if (result == -ESTALE) {
                req->r_direct_mode = USE_AUTH_MDS;
                req->r_num_stale++;
                if (req->r_num_stale <= 2) {
                        __do_request(mdsc, req);
                        mutex_unlock(&mdsc->mutex);
                        goto out;
                }
        } else {
                req->r_num_stale = 0;
        }

        if (head->safe) {
                req->r_got_safe = true;
                __unregister_request(mdsc, req);
                complete(&req->r_safe_completion);

                if (req->r_got_unsafe) {
                        /*
                         * We already handled the unsafe response, now do the
                         * cleanup.  No need to examine the response; the MDS
                         * doesn't include any result info in the safe
                         * response.  And even if it did, there is nothing
                         * useful we could do with a revised return value.
                         */
                        dout("got safe reply %llu, mds%d\n", tid, mds);
                        list_del_init(&req->r_unsafe_item);

                        /* last unsafe request during umount? */
                        if (mdsc->stopping && !__get_oldest_tid(mdsc))
                                complete(&mdsc->safe_umount_waiters);
                        mutex_unlock(&mdsc->mutex);
                        goto out;
                }
        }

        BUG_ON(req->r_reply);

        if (!head->safe) {
                req->r_got_unsafe = true;
                list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
        }

        dout("handle_reply tid %lld result %d\n", tid, result);
        rinfo = &req->r_reply_info;
        err = parse_reply_info(msg, rinfo);
        mutex_unlock(&mdsc->mutex);

        mutex_lock(&session->s_mutex);
        if (err < 0) {
                pr_err("mdsc_handle_reply got corrupt reply mds%d\n", mds);
                goto out_err;
        }

        /* snap trace */
        if (rinfo->snapblob_len) {
                down_write(&mdsc->snap_rwsem);
                ceph_update_snap_trace(mdsc, rinfo->snapblob,
                                       rinfo->snapblob + rinfo->snapblob_len,
                                       le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP);
                downgrade_write(&mdsc->snap_rwsem);
        } else {
                down_read(&mdsc->snap_rwsem);
        }

        /* insert trace into our cache */
        err = ceph_fill_trace(mdsc->client->sb, req, req->r_session);
        if (err == 0) {
                if (result == 0 && rinfo->dir_nr)
                        ceph_readdir_prepopulate(req, req->r_session);
                ceph_unreserve_caps(&req->r_caps_reservation);
        }

        up_read(&mdsc->snap_rwsem);
out_err:
        if (err) {
                req->r_err = err;
        } else {
                req->r_reply = msg;
                ceph_msg_get(msg);
        }

        add_cap_releases(mdsc, req->r_session, -1);
        mutex_unlock(&session->s_mutex);

        /* kick calling process */
        complete_request(mdsc, req);
out:
        ceph_mdsc_put_request(req);
        return;
}



/*
 * handle mds notification that our request has been forwarded.
 */
static void handle_forward(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
{
        struct ceph_mds_request *req;
        u64 tid;
        u32 next_mds;
        u32 fwd_seq;
        u8 must_resend;
        int err = -EINVAL;
        void *p = msg->front.iov_base;
        void *end = p + msg->front.iov_len;
        int from_mds;

        if (msg->hdr.src.name.type != CEPH_ENTITY_TYPE_MDS)
                goto bad;
        from_mds = le64_to_cpu(msg->hdr.src.name.num);

        ceph_decode_need(&p, end, sizeof(u64)+2*sizeof(u32), bad);
        tid = ceph_decode_64(&p);
        next_mds = ceph_decode_32(&p);
        fwd_seq = ceph_decode_32(&p);
        must_resend = ceph_decode_8(&p);

        WARN_ON(must_resend);  /* shouldn't happen. */

        mutex_lock(&mdsc->mutex);
        req = __lookup_request(mdsc, tid);
        if (!req) {
                dout("forward %llu dne\n", tid);
                goto out;  /* dup reply? */
        }

        if (fwd_seq <= req->r_num_fwd) {
                dout("forward %llu to mds%d - old seq %d <= %d\n",
                     tid, next_mds, req->r_num_fwd, fwd_seq);
        } else {
                /* resend. forward race not possible; mds would drop */
                dout("forward %llu to mds%d (we resend)\n", tid, next_mds);
                req->r_num_fwd = fwd_seq;
                req->r_resend_mds = next_mds;
                put_request_session(req);
                __do_request(mdsc, req);
        }
        ceph_mdsc_put_request(req);
out:
        mutex_unlock(&mdsc->mutex);
        return;

bad:
        pr_err("mdsc_handle_forward decode error err=%d\n", err);
}

/*
 * handle a mds session control message
 */
static void handle_session(struct ceph_mds_session *session,
                           struct ceph_msg *msg)
{
        struct ceph_mds_client *mdsc = session->s_mdsc;
        u32 op;
        u64 seq;
        int mds;
        struct ceph_mds_session_head *h = msg->front.iov_base;
        int wake = 0;

        if (msg->hdr.src.name.type != CEPH_ENTITY_TYPE_MDS)
                return;
        mds = le64_to_cpu(msg->hdr.src.name.num);

        /* decode */
        if (msg->front.iov_len != sizeof(*h))
                goto bad;
        op = le32_to_cpu(h->op);
        seq = le64_to_cpu(h->seq);

        mutex_lock(&mdsc->mutex);
        /* FIXME: this ttl calculation is generous */
        session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
        mutex_unlock(&mdsc->mutex);

        mutex_lock(&session->s_mutex);

        dout("handle_session mds%d %s %p state %s seq %llu\n",
             mds, ceph_session_op_name(op), session,
             session_state_name(session->s_state), seq);

        if (session->s_state == CEPH_MDS_SESSION_HUNG) {
                session->s_state = CEPH_MDS_SESSION_OPEN;
                pr_info("mds%d came back\n", session->s_mds);
        }

        switch (op) {
        case CEPH_SESSION_OPEN:
                session->s_state = CEPH_MDS_SESSION_OPEN;
                renewed_caps(mdsc, session, 0);
                wake = 1;
                if (mdsc->stopping)
                        __close_session(mdsc, session);
                break;

        case CEPH_SESSION_RENEWCAPS:
                if (session->s_renew_seq == seq)
                        renewed_caps(mdsc, session, 1);
                break;

        case CEPH_SESSION_CLOSE:
                unregister_session(mdsc, session);
                remove_session_caps(session);
                wake = 1;  /* for good measure */
                complete(&mdsc->session_close_waiters);
                kick_requests(mdsc, mds, 0);  /* cur only */
                break;

        case CEPH_SESSION_STALE:
                pr_info("mds%d caps went stale, renewing\n",
                        session->s_mds);
                spin_lock(&session->s_cap_lock);
                session->s_cap_gen++;
                session->s_cap_ttl = 0;
                spin_unlock(&session->s_cap_lock);
                send_renew_caps(mdsc, session);
                break;

        case CEPH_SESSION_RECALL_STATE:
                trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
                break;

        default:
                pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
                WARN_ON(1);
        }

        mutex_unlock(&session->s_mutex);
        if (wake) {
                mutex_lock(&mdsc->mutex);
                __wake_requests(mdsc, &session->s_waiting);
                mutex_unlock(&mdsc->mutex);
        }
        return;

bad:
        pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds,
               (int)msg->front.iov_len);
        return;
}


/*
 * called under session->mutex.
 */
static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
                                   struct ceph_mds_session *session)
{
        struct ceph_mds_request *req, *nreq;
        int err;

        dout("replay_unsafe_requests mds%d\n", session->s_mds);

        mutex_lock(&mdsc->mutex);
        list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) {
                err = __prepare_send_request(mdsc, req, session->s_mds);
                if (!err) {
                        ceph_msg_get(req->r_request);
                        ceph_con_send(&session->s_con, req->r_request);
                }
        }
        mutex_unlock(&mdsc->mutex);
}

/*
 * Encode information about a cap for a reconnect with the MDS.
 */
struct encode_caps_data {
        void **pp;
        void *end;
        int *num_caps;
};

static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
                          void *arg)
{
        struct ceph_mds_cap_reconnect *rec;
        struct ceph_inode_info *ci;
        struct encode_caps_data *data = (struct encode_caps_data *)arg;
        void *p = *(data->pp);
        void *end = data->end;
        char *path;
        int pathlen, err;
        u64 pathbase;
        struct dentry *dentry;

        ci = cap->ci;

        dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
             inode, ceph_vinop(inode), cap, cap->cap_id,
             ceph_cap_string(cap->issued));
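        /*
         * ceph_decode_need() is used here as a bounds check on the
         * *encode* buffer: if fewer bytes than requested remain before
         * 'end', we bail out to the needmore label and return -ENOSPC
         * so the caller can retry with a larger message.
         */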
1969 ceph_decode_need(&p, end, sizeof(u64), needmore);
1970 ceph_encode_64(&p, ceph_ino(inode));
1971
1972 dentry = d_find_alias(inode);
1973 if (dentry) {
1974 path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, 0);
1975 if (IS_ERR(path)) {
1976 err = PTR_ERR(path);
1977 BUG_ON(err);
1978 }
1979 } else {
1980 path = NULL;
1981 pathlen = 0;
1982 }
1983 ceph_decode_need(&p, end, pathlen+4, needmore);
1984 ceph_encode_string(&p, end, path, pathlen);
1985
1986 ceph_decode_need(&p, end, sizeof(*rec), needmore);
1987 rec = p;
1988 p += sizeof(*rec);
1989 BUG_ON(p > end);
1990 spin_lock(&inode->i_lock);
1991 cap->seq = 0; /* reset cap seq */
1992 cap->issue_seq = 0; /* and issue_seq */
1993 rec->cap_id = cpu_to_le64(cap->cap_id);
1994 rec->pathbase = cpu_to_le64(pathbase);
1995 rec->wanted = cpu_to_le32(__ceph_caps_wanted(ci));
1996 rec->issued = cpu_to_le32(cap->issued);
1997 rec->size = cpu_to_le64(inode->i_size);
1998 ceph_encode_timespec(&rec->mtime, &inode->i_mtime);
1999 ceph_encode_timespec(&rec->atime, &inode->i_atime);
2000 rec->snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
2001 spin_unlock(&inode->i_lock);
2002
2003 kfree(path);
2004 dput(dentry);
2005 (*data->num_caps)++;
2006 *(data->pp) = p;
2007 return 0;
2008needmore:
2009 return -ENOSPC;
2010}
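/*
 * Illustrative sketch (editorial, not from the original file):
 * encode_caps_cb() is driven as a callback by iterate_session_caps(),
 * defined earlier in this file.  A minimal model of that contract --
 * ignoring the s_cap_lock and inode refcounting the real iterator
 * needs -- might look like the hypothetical helper below:
 */
static int sketch_iterate_session_caps(struct ceph_mds_session *session,
				       int (*cb)(struct inode *,
						 struct ceph_cap *, void *),
				       void *arg)
{
	struct ceph_cap *cap;
	int ret = 0;

	list_for_each_entry(cap, &session->s_caps, session_caps) {
		ret = cb(&cap->ci->vfs_inode, cap, arg);
		if (ret)	/* e.g. -ENOSPC from encode_caps_cb */
			break;	/* caller regrows its buffer and retries */
	}
	return ret;
}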
2011
2012
2013/*
2014 * If an MDS fails and recovers, clients need to reconnect in order to
2015 * reestablish shared state. This includes all caps issued through
2016 * this session _and_ the snap_realm hierarchy. Because it's not
2017 * clear which snap realms the mds cares about, we send everything we
2018 * know about; that ensures we'll then get any new info the
2019 * recovering MDS might have.
2020 *
2021 * This is a relatively heavyweight operation, but it's rare.
2022 *
2023 * called with mdsc->mutex held.
2024 */
2025static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds)
2026{
2027 struct ceph_mds_session *session;
2028 struct ceph_msg *reply;
2029 int newlen, len = 4 + 1; /* close-flag byte + u32 cap count */
2030 void *p, *end;
2031 int err;
2032 int num_caps, num_realms = 0;
2033 int got;
2034 u64 next_snap_ino = 0;
2035 __le32 *pnum_caps, *pnum_realms;
2036 struct encode_caps_data iter_args;
2037
2038 pr_info("reconnect to recovering mds%d\n", mds);
2039
2040 /* find session */
2041 session = __ceph_lookup_mds_session(mdsc, mds);
2042 mutex_unlock(&mdsc->mutex); /* drop lock for duration */
2043
2044 if (session) {
2045 mutex_lock(&session->s_mutex);
2046
2047 session->s_state = CEPH_MDS_SESSION_RECONNECTING;
2048 session->s_seq = 0;
2049
2050 ceph_con_open(&session->s_con,
2051 ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
2052
2053 /* replay unsafe requests */
2054 replay_unsafe_requests(mdsc, session);
2055
2056 /* estimate needed space */
2057 len += session->s_nr_caps *
2058 (100+sizeof(struct ceph_mds_cap_reconnect));
2059 pr_info("estimating i need %d bytes for %d caps\n",
2060 len, session->s_nr_caps);
2061 } else {
2062 dout("no session for mds%d, will send short reconnect\n",
2063 mds);
2064 }
2065
2066 down_read(&mdsc->snap_rwsem);
2067
2068retry:
2069 /* build reply */
2070 reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, len, 0, 0, NULL);
2071 if (IS_ERR(reply)) {
2072 err = PTR_ERR(reply);
2073 pr_err("send_mds_reconnect ENOMEM on %d for mds%d\n",
2074 len, mds);
2075 goto out;
2076 }
2077 p = reply->front.iov_base;
2078 end = p + len;
2079
2080 if (!session) {
2081 ceph_encode_8(&p, 1); /* session was closed */
2082 ceph_encode_32(&p, 0);
2083 goto send;
2084 }
2085 dout("session %p state %s\n", session,
2086 session_state_name(session->s_state));
2087
2088 /* traverse this session's caps */
2089 ceph_encode_8(&p, 0);
2090 pnum_caps = p;
2091 ceph_encode_32(&p, session->s_nr_caps);
2092 num_caps = 0;
2093
2094 iter_args.pp = &p;
2095 iter_args.end = end;
2096 iter_args.num_caps = &num_caps;
2097 err = iterate_session_caps(session, encode_caps_cb, &iter_args);
2098 if (err == -ENOSPC)
2099 goto needmore;
2100 if (err < 0) {
     ceph_msg_put(reply); /* don't leak the reply on error */
2101 goto out;
     }
2102 *pnum_caps = cpu_to_le32(num_caps);
2103
2104 /*
2105 * snaprealms. we provide mds with the ino, seq (version), and
2106 * parent for all of our realms. If the mds has any newer info,
2107 * it will tell us.
2108 */
2109 next_snap_ino = 0;
2110 /* save some space for the snaprealm count */
2111 pnum_realms = p;
2112 ceph_decode_need(&p, end, sizeof(*pnum_realms), needmore);
2113 p += sizeof(*pnum_realms);
2114 num_realms = 0;
2115 while (1) {
2116 struct ceph_snap_realm *realm;
2117 struct ceph_mds_snaprealm_reconnect *sr_rec;
2118 got = radix_tree_gang_lookup(&mdsc->snap_realms,
2119 (void **)&realm, next_snap_ino, 1);
2120 if (!got)
2121 break;
2122
2123 dout(" adding snap realm %llx seq %lld parent %llx\n",
2124 realm->ino, realm->seq, realm->parent_ino);
2125 ceph_decode_need(&p, end, sizeof(*sr_rec), needmore);
2126 sr_rec = p;
2127 sr_rec->ino = cpu_to_le64(realm->ino);
2128 sr_rec->seq = cpu_to_le64(realm->seq);
2129 sr_rec->parent = cpu_to_le64(realm->parent_ino);
2130 p += sizeof(*sr_rec);
2131 num_realms++;
2132 next_snap_ino = realm->ino + 1;
2133 }
2134 *pnum_realms = cpu_to_le32(num_realms);
2135
2136send:
2137 reply->front.iov_len = p - reply->front.iov_base;
2138 reply->hdr.front_len = cpu_to_le32(reply->front.iov_len);
2139 dout("final len was %u (guessed %d)\n",
2140 (unsigned)reply->front.iov_len, len);
2141 if (session) {
2142 ceph_con_send(&session->s_con, reply);
2143 session->s_state = CEPH_MDS_SESSION_OPEN;
2144 __wake_requests(mdsc, &session->s_waiting);
2145 } else
2146 ceph_msg_put(reply); /* no session, so no connection to send on */
2147
2148out:
2149 up_read(&mdsc->snap_rwsem);
2150 if (session) {
2151 mutex_unlock(&session->s_mutex);
2152 ceph_put_mds_session(session);
2153 }
2154 mutex_lock(&mdsc->mutex);
2155 return;
2156
2157needmore:
2158 /*
2159 * we need a larger buffer. This estimate doesn't factor in snap
2160 * realms very accurately, but it errs on the large side, so it's safe.
2161 */
2162 num_caps += num_realms;
2163 newlen = len * ((100 * (session->s_nr_caps+3)) / (num_caps + 1)) / 100;
2164 pr_info("i guessed %d, and did %d of %d caps, retrying with %d\n",
2165 len, num_caps, session->s_nr_caps, newlen);
2166 len = newlen;
2167 ceph_msg_put(reply);
2168 goto retry;
2169}
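/*
 * Editorial worked example (hypothetical numbers) for the regrow math in
 * the needmore: path above: suppose the first guess was len = 10000 and
 * we ran out of space after encoding num_caps + num_realms = 50 items of
 * session->s_nr_caps = 100.  Then
 *
 *	newlen = 10000 * ((100 * (100 + 3)) / (50 + 1)) / 100
 *	       = 10000 * (10300 / 51) / 100
 *	       = 10000 * 201 / 100
 *	       = 20100
 *
 * i.e. the buffer roughly doubles; the +1 avoids dividing by zero when
 * nothing was encoded, and the +3 pads the integer estimate upward.
 */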
2170
2171
2172/*
2173 * compare old and new mdsmaps, kicking requests
2174 * and closing out old connections as necessary
2175 *
2176 * called under mdsc->mutex.
2177 */
2178static void check_new_map(struct ceph_mds_client *mdsc,
2179 struct ceph_mdsmap *newmap,
2180 struct ceph_mdsmap *oldmap)
2181{
2182 int i;
2183 int oldstate, newstate;
2184 struct ceph_mds_session *s;
2185
2186 dout("check_new_map new %u old %u\n",
2187 newmap->m_epoch, oldmap->m_epoch);
2188
2189 for (i = 0; i < oldmap->m_max_mds && i < mdsc->max_sessions; i++) {
2190 if (mdsc->sessions[i] == NULL)
2191 continue;
2192 s = mdsc->sessions[i];
2193 oldstate = ceph_mdsmap_get_state(oldmap, i);
2194 newstate = ceph_mdsmap_get_state(newmap, i);
2195
2196 dout("check_new_map mds%d state %s -> %s (session %s)\n",
2197 i, ceph_mds_state_name(oldstate),
2198 ceph_mds_state_name(newstate),
2199 session_state_name(s->s_state));
2200
2201 if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
2202 ceph_mdsmap_get_addr(newmap, i),
2203 sizeof(struct ceph_entity_addr))) {
2204 if (s->s_state == CEPH_MDS_SESSION_OPENING) {
2205 /* the session never opened, just close it
2206 * out now */
2207 __wake_requests(mdsc, &s->s_waiting);
2208 unregister_session(mdsc, s);
2209 } else {
2210 /* just close it */
2211 mutex_unlock(&mdsc->mutex);
2212 mutex_lock(&s->s_mutex);
2213 mutex_lock(&mdsc->mutex);
2214 ceph_con_close(&s->s_con);
2215 mutex_unlock(&s->s_mutex);
2216 s->s_state = CEPH_MDS_SESSION_RESTARTING;
2217 }
2218
2219 /* kick any requests waiting on the recovering mds */
2220 kick_requests(mdsc, i, 1);
2221 } else if (oldstate == newstate) {
2222 continue; /* nothing new with this mds */
2223 }
2224
2225 /*
2226 * send reconnect?
2227 */
2228 if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
2229 newstate >= CEPH_MDS_STATE_RECONNECT)
2230 send_mds_reconnect(mdsc, i);
2231
2232 /*
2233 * kick requests on any mds that has gone active.
2234 *
2235 * kick requests on cur or forwarder: we may have sent
2236 * the request to mds1, mds1 told us it forwarded it
2237 * to mds2, but then we learn mds1 failed and can't be
2238 * sure it successfully forwarded our request before
2239 * it died.
2240 */
2241 if (oldstate < CEPH_MDS_STATE_ACTIVE &&
2242 newstate >= CEPH_MDS_STATE_ACTIVE) {
2243 pr_info("mds%d reconnect completed\n", s->s_mds);
2244 kick_requests(mdsc, i, 1);
2245 ceph_kick_flushing_caps(mdsc, s);
2246 }
2247 }
2248}
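/*
 * Editorial summary of the per-mds cases handled above:
 *
 *	addr changed, session OPENING    -> wake waiters, unregister session
 *	addr changed, session otherwise  -> close con, mark RESTARTING,
 *	                                    kick cur + forwarder requests
 *	same addr, state unchanged       -> nothing to do
 *	RESTARTING -> (>= RECONNECT)     -> send_mds_reconnect()
 *	(< ACTIVE) -> (>= ACTIVE)        -> kick requests, kick flushing caps
 */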
2249
2250
2251
2252/*
2253 * leases
2254 */
2255
2256/*
2257 * caller must hold session s_mutex, dentry->d_lock
2258 */
2259void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
2260{
2261 struct ceph_dentry_info *di = ceph_dentry(dentry);
2262
2263 ceph_put_mds_session(di->lease_session);
2264 di->lease_session = NULL;
2265}
2266
2267static void handle_lease(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
2268{
2269 struct super_block *sb = mdsc->client->sb;
2270 struct inode *inode;
2271 struct ceph_mds_session *session;
2272 struct ceph_inode_info *ci;
2273 struct dentry *parent, *dentry;
2274 struct ceph_dentry_info *di;
2275 int mds;
2276 struct ceph_mds_lease *h = msg->front.iov_base;
2277 struct ceph_vino vino;
2278 int mask;
2279 struct qstr dname;
2280 int release = 0;
2281
2282 if (msg->hdr.src.name.type != CEPH_ENTITY_TYPE_MDS)
2283 return;
2284 mds = le64_to_cpu(msg->hdr.src.name.num);
2285 dout("handle_lease from mds%d\n", mds);
2286
2287 /* decode */
2288 if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
2289 goto bad;
2290 vino.ino = le64_to_cpu(h->ino);
2291 vino.snap = CEPH_NOSNAP;
2292 mask = le16_to_cpu(h->mask);
2293 dname.name = (void *)h + sizeof(*h) + sizeof(u32);
2294 dname.len = msg->front.iov_len - sizeof(*h) - sizeof(u32);
2295 if (dname.len != get_unaligned_le32(h+1))
2296 goto bad;
2297
2298 /* find session */
2299 mutex_lock(&mdsc->mutex);
2300 session = __ceph_lookup_mds_session(mdsc, mds);
2301 mutex_unlock(&mdsc->mutex);
2302 if (!session) {
2303 pr_err("handle_lease got lease but no session mds%d\n", mds);
2304 return;
2305 }
2306
2307 mutex_lock(&session->s_mutex);
2308 session->s_seq++;
2309
2310 /* lookup inode */
2311 inode = ceph_find_inode(sb, vino);
2312 dout("handle_lease '%s', mask %d, ino %llx %p\n",
2313 ceph_lease_op_name(h->action), mask, vino.ino, inode);
2314 if (inode == NULL) {
2315 dout("handle_lease no inode %llx\n", vino.ino);
2316 goto release;
2317 }
2318 ci = ceph_inode(inode);
2319
2320 /* dentry */
2321 parent = d_find_alias(inode);
2322 if (!parent) {
2323 dout("no parent dentry on inode %p\n", inode);
2324 WARN_ON(1);
2325 goto release; /* hrm... */
2326 }
2327 dname.hash = full_name_hash(dname.name, dname.len);
2328 dentry = d_lookup(parent, &dname);
2329 dput(parent);
2330 if (!dentry)
2331 goto release;
2332
2333 spin_lock(&dentry->d_lock);
2334 di = ceph_dentry(dentry);
2335 switch (h->action) {
2336 case CEPH_MDS_LEASE_REVOKE:
2337 if (di && di->lease_session == session) {
2338 h->seq = cpu_to_le32(di->lease_seq);
2339 __ceph_mdsc_drop_dentry_lease(dentry);
2340 }
2341 release = 1;
2342 break;
2343
2344 case CEPH_MDS_LEASE_RENEW:
2345 if (di && di->lease_session == session &&
2346 di->lease_gen == session->s_cap_gen &&
2347 di->lease_renew_from &&
2348 di->lease_renew_after == 0) {
2349 unsigned long duration =
2350 le32_to_cpu(h->duration_ms) * HZ / 1000;
2351
2352 di->lease_seq = le32_to_cpu(h->seq);
2353 dentry->d_time = di->lease_renew_from + duration;
2354 di->lease_renew_after = di->lease_renew_from +
2355 (duration >> 1);
2356 di->lease_renew_from = 0;
2357 }
2358 break;
2359 }
2360 spin_unlock(&dentry->d_lock);
2361 dput(dentry);
2362
2363 if (!release)
2364 goto out;
2365
2366release:
2367 /* let's just reuse the same message */
2368 h->action = CEPH_MDS_LEASE_REVOKE_ACK;
2369 ceph_msg_get(msg);
2370 ceph_con_send(&session->s_con, msg);
2371
2372out:
2373 iput(inode);
2374 mutex_unlock(&session->s_mutex);
2375 ceph_put_mds_session(session);
2376 return;
2377
2378bad:
2379 pr_err("corrupt lease message\n");
2380}
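/*
 * Editorial worked example for the RENEW math above (hypothetical
 * numbers): with h->duration_ms = 30000 and HZ = 1000,
 *
 *	duration          = 30000 * 1000 / 1000 = 30000 jiffies (30s)
 *	dentry->d_time    = lease_renew_from + 30s   (lease expiry)
 *	lease_renew_after = lease_renew_from + 15s   (duration >> 1)
 *
 * so a renewal may be requested once the lease is halfway to expiring.
 */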
2381
2382void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
2383 struct inode *inode,
2384 struct dentry *dentry, char action,
2385 u32 seq)
2386{
2387 struct ceph_msg *msg;
2388 struct ceph_mds_lease *lease;
2389 int len = sizeof(*lease) + sizeof(u32);
2390 int dnamelen = 0;
2391
2392 dout("lease_send_msg inode %p dentry %p %s to mds%d\n",
2393 inode, dentry, ceph_lease_op_name(action), session->s_mds);
2394 dnamelen = dentry->d_name.len;
2395 len += dnamelen;
2396
2397 msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, 0, 0, NULL);
2398 if (IS_ERR(msg))
2399 return;
2400 lease = msg->front.iov_base;
2401 lease->action = action;
2402 lease->mask = cpu_to_le16(CEPH_LOCK_DN);
2403 lease->ino = cpu_to_le64(ceph_vino(inode).ino);
2404 lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap);
2405 lease->seq = cpu_to_le32(seq);
2406 put_unaligned_le32(dnamelen, lease + 1);
2407 memcpy((void *)(lease + 1) + 4, dentry->d_name.name, dnamelen);
2408
2409 /*
2410 * if this is a preemptive lease RELEASE, no need to
2411 * flush request stream, since the actual request will
2412 * soon follow.
2413 */
2414 msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE);
2415
2416 ceph_con_send(&session->s_con, msg);
2417}
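/*
 * Editorial sketch of the message front built above (byte offsets):
 *
 *	0                    struct ceph_mds_lease (action, mask, ino,
 *	                                            first/last snap, seq)
 *	sizeof(*lease)       __le32 dname length
 *	sizeof(*lease) + 4   dname bytes (not NUL-terminated)
 *
 * which is why len = sizeof(*lease) + sizeof(u32) + dnamelen.
 */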
2418
2419/*
2420 * Preemptively release a lease we expect to invalidate anyway.
2421 * Both @inode and @dentry are required; only CEPH_LOCK_DN leases are handled.
2422 */
2423void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode,
2424 struct dentry *dentry, int mask)
2425{
2426 struct ceph_dentry_info *di;
2427 struct ceph_mds_session *session;
2428 u32 seq;
2429
2430 BUG_ON(inode == NULL);
2431 BUG_ON(dentry == NULL);
2432 BUG_ON(mask != CEPH_LOCK_DN);
2433
2434 /* is dentry lease valid? */
2435 spin_lock(&dentry->d_lock);
2436 di = ceph_dentry(dentry);
2437 if (!di || !di->lease_session ||
2438 di->lease_session->s_mds < 0 ||
2439 di->lease_gen != di->lease_session->s_cap_gen ||
2440 !time_before(jiffies, dentry->d_time)) {
2441 dout("lease_release inode %p dentry %p -- "
2442 "no lease on %d\n",
2443 inode, dentry, mask);
2444 spin_unlock(&dentry->d_lock);
2445 return;
2446 }
2447
2448 /* we do have a lease on this dentry; note mds and seq */
2449 session = ceph_get_mds_session(di->lease_session);
2450 seq = di->lease_seq;
2451 __ceph_mdsc_drop_dentry_lease(dentry);
2452 spin_unlock(&dentry->d_lock);
2453
2454 dout("lease_release inode %p dentry %p mask %d to mds%d\n",
2455 inode, dentry, mask, session->s_mds);
2456 ceph_mdsc_lease_send_msg(session, inode, dentry,
2457 CEPH_MDS_LEASE_RELEASE, seq);
2458 ceph_put_mds_session(session);
2459}
2460
2461/*
2462 * drop all leases (and dentry refs) in preparation for umount
2463 */
2464static void drop_leases(struct ceph_mds_client *mdsc)
2465{
2466 int i;
2467
2468 dout("drop_leases\n");
2469 mutex_lock(&mdsc->mutex);
2470 for (i = 0; i < mdsc->max_sessions; i++) {
2471 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
2472 if (!s)
2473 continue;
2474 mutex_unlock(&mdsc->mutex);
2475 mutex_lock(&s->s_mutex);
2476 mutex_unlock(&s->s_mutex);
2477 ceph_put_mds_session(s);
2478 mutex_lock(&mdsc->mutex);
2479 }
2480 mutex_unlock(&mdsc->mutex);
2481}
2482
2483
2484
2485/*
2486 * delayed work -- periodically trim expired leases, renew caps with mds
2487 */
2488static void schedule_delayed(struct ceph_mds_client *mdsc)
2489{
2490 int delay = 5;
2491 unsigned hz = round_jiffies_relative(HZ * delay);
2492 schedule_delayed_work(&mdsc->delayed_work, hz);
2493}
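/*
 * Editorial note: round_jiffies_relative() rounds the 5*HZ delay so the
 * absolute expiry lands on a whole-second boundary, letting the kernel
 * batch this wakeup with other once-a-second timers.  E.g. with HZ=250
 * the nominal 1250-jiffy delay may be nudged slightly so the timer
 * fires on a second boundary.
 */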
2494
2495static void delayed_work(struct work_struct *work)
2496{
2497 int i;
2498 struct ceph_mds_client *mdsc =
2499 container_of(work, struct ceph_mds_client, delayed_work.work);
2500 int renew_interval;
2501 int renew_caps;
2502
2503 dout("mdsc delayed_work\n");
2504 ceph_check_delayed_caps(mdsc);
2505
2506 mutex_lock(&mdsc->mutex);
2507 renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
2508 renew_caps = time_after_eq(jiffies, HZ*renew_interval +
2509 mdsc->last_renew_caps);
2510 if (renew_caps)
2511 mdsc->last_renew_caps = jiffies;
2512
2513 for (i = 0; i < mdsc->max_sessions; i++) {
2514 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
2515 if (s == NULL)
2516 continue;
2517 if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
2518 dout("resending session close request for mds%d\n",
2519 s->s_mds);
2520 request_close_session(mdsc, s);
2521 ceph_put_mds_session(s);
2522 continue;
2523 }
2524 if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
2525 if (s->s_state == CEPH_MDS_SESSION_OPEN) {
2526 s->s_state = CEPH_MDS_SESSION_HUNG;
2527 pr_info("mds%d hung\n", s->s_mds);
2528 }
2529 }
2530 if (s->s_state < CEPH_MDS_SESSION_OPEN) {
2531 /* this mds is failed or recovering, just wait */
2532 ceph_put_mds_session(s);
2533 continue;
2534 }
2535 mutex_unlock(&mdsc->mutex);
2536
2537 mutex_lock(&s->s_mutex);
2538 if (renew_caps)
2539 send_renew_caps(mdsc, s);
2540 else
2541 ceph_con_keepalive(&s->s_con);
2542 add_cap_releases(mdsc, s, -1);
2543 send_cap_releases(mdsc, s);
2544 mutex_unlock(&s->s_mutex);
2545 ceph_put_mds_session(s);
2546
2547 mutex_lock(&mdsc->mutex);
2548 }
2549 mutex_unlock(&mdsc->mutex);
2550
2551 schedule_delayed(mdsc);
2552}
2553
2554
2555void ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client)
2556{
2557 mdsc->client = client;
2558 mutex_init(&mdsc->mutex);
2559 mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
2560 init_completion(&mdsc->safe_umount_waiters);
2561 init_completion(&mdsc->session_close_waiters);
2562 INIT_LIST_HEAD(&mdsc->waiting_for_map);
2563 mdsc->sessions = NULL;
2564 mdsc->max_sessions = 0;
2565 mdsc->stopping = 0;
2566 init_rwsem(&mdsc->snap_rwsem);
2567 INIT_RADIX_TREE(&mdsc->snap_realms, GFP_NOFS);
2568 INIT_LIST_HEAD(&mdsc->snap_empty);
2569 spin_lock_init(&mdsc->snap_empty_lock);
2570 mdsc->last_tid = 0;
2571 INIT_RADIX_TREE(&mdsc->request_tree, GFP_NOFS);
2572 INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
2573 mdsc->last_renew_caps = jiffies;
2574 INIT_LIST_HEAD(&mdsc->cap_delay_list);
2575 spin_lock_init(&mdsc->cap_delay_lock);
2576 INIT_LIST_HEAD(&mdsc->snap_flush_list);
2577 spin_lock_init(&mdsc->snap_flush_lock);
2578 mdsc->cap_flush_seq = 0;
2579 INIT_LIST_HEAD(&mdsc->cap_dirty);
2580 mdsc->num_cap_flushing = 0;
2581 spin_lock_init(&mdsc->cap_dirty_lock);
2582 init_waitqueue_head(&mdsc->cap_flushing_wq);
2583 spin_lock_init(&mdsc->dentry_lru_lock);
2584 INIT_LIST_HEAD(&mdsc->dentry_lru);
2585}
2586
2587/*
2588 * Wait for safe replies on open mds requests. If we time out, drop
2589 * all requests from the tree to avoid dangling dentry refs.
2590 */
2591static void wait_requests(struct ceph_mds_client *mdsc)
2592{
2593 struct ceph_mds_request *req;
2594 struct ceph_client *client = mdsc->client;
2595
2596 mutex_lock(&mdsc->mutex);
2597 if (__get_oldest_tid(mdsc)) {
2598 mutex_unlock(&mdsc->mutex);
2599 dout("wait_requests waiting for requests\n");
2600 wait_for_completion_timeout(&mdsc->safe_umount_waiters,
2601 client->mount_args->mount_timeout * HZ);
2602 mutex_lock(&mdsc->mutex);
2603
2604 /* tear down remaining requests */
2605 while (radix_tree_gang_lookup(&mdsc->request_tree,
2606 (void **)&req, 0, 1)) {
2607 dout("wait_requests timed out on tid %llu\n",
2608 req->r_tid);
2609 radix_tree_delete(&mdsc->request_tree, req->r_tid);
2610 ceph_mdsc_put_request(req);
2611 }
2612 }
2613 mutex_unlock(&mdsc->mutex);
2614 dout("wait_requests done\n");
2615}
2616
2617/*
2618 * called before mount is ro, and before dentries are torn down.
2619 * (hmm, does this still race with new lookups?)
2620 */
2621void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
2622{
2623 dout("pre_umount\n");
2624 mdsc->stopping = 1;
2625
2626 drop_leases(mdsc);
2627 ceph_flush_dirty_caps(mdsc);
2628 wait_requests(mdsc);
2629}
2630
2631/*
2632 * wait for all write mds requests to flush.
2633 */
2634static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid)
2635{
2636 struct ceph_mds_request *req;
2637 u64 next_tid = 0;
2638 int got;
2639
2640 mutex_lock(&mdsc->mutex);
2641 dout("wait_unsafe_requests want %lld\n", want_tid);
2642 while (1) {
2643 got = radix_tree_gang_lookup(&mdsc->request_tree, (void **)&req,
2644 next_tid, 1);
2645 if (!got)
2646 break;
2647 if (req->r_tid > want_tid)
2648 break;
2649
2650 next_tid = req->r_tid + 1;
2651 if ((req->r_op & CEPH_MDS_OP_WRITE) == 0)
2652 continue; /* not a write op */
2653
2654 ceph_mdsc_get_request(req);
2655 mutex_unlock(&mdsc->mutex);
2656 dout("wait_unsafe_requests wait on %llu (want %llu)\n",
2657 req->r_tid, want_tid);
2658 wait_for_completion(&req->r_safe_completion);
2659 mutex_lock(&mdsc->mutex);
2660 ceph_mdsc_put_request(req);
2661 }
2662 mutex_unlock(&mdsc->mutex);
2663 dout("wait_unsafe_requests done\n");
2664}
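/*
 * Illustrative sketch (editorial): the radix_tree_gang_lookup(..., n, 1)
 * loops above and in wait_requests() implement "visit every item with
 * key >= n", one element per lookup.  The hypothetical helper below
 * shows the bare pattern:
 */
static void sketch_walk_request_tree(struct ceph_mds_client *mdsc)
{
	struct ceph_mds_request *req;
	u64 next_tid = 0;

	/* assumes mdsc->mutex is held, as in the real callers */
	while (radix_tree_gang_lookup(&mdsc->request_tree, (void **)&req,
				      next_tid, 1) == 1) {
		next_tid = req->r_tid + 1;	/* resume past this entry */
		/* ... examine req here ... */
	}
}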
2665
2666void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
2667{
2668 u64 want_tid, want_flush;
2669
2670 dout("sync\n");
2671 mutex_lock(&mdsc->mutex);
2672 want_tid = mdsc->last_tid;
2673 want_flush = mdsc->cap_flush_seq;
2674 mutex_unlock(&mdsc->mutex);
2675 dout("sync want tid %lld flush_seq %lld\n", want_tid, want_flush);
2676
2677 ceph_flush_dirty_caps(mdsc);
2678
2679 wait_unsafe_requests(mdsc, want_tid);
2680 wait_event(mdsc->cap_flushing_wq, check_cap_flush(mdsc, want_flush));
2681}
2682
2683
2684/*
2685 * called after sb is ro.
2686 */
2687void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
2688{
2689 struct ceph_mds_session *session;
2690 int i;
2691 int n;
2692 struct ceph_client *client = mdsc->client;
2693 unsigned long started, timeout = client->mount_args->mount_timeout * HZ;
2694
2695 dout("close_sessions\n");
2696
2697 mutex_lock(&mdsc->mutex);
2698
2699 /* close sessions */
2700 started = jiffies;
2701 while (time_before(jiffies, started + timeout)) {
2702 dout("closing sessions\n");
2703 n = 0;
2704 for (i = 0; i < mdsc->max_sessions; i++) {
2705 session = __ceph_lookup_mds_session(mdsc, i);
2706 if (!session)
2707 continue;
2708 mutex_unlock(&mdsc->mutex);
2709 mutex_lock(&session->s_mutex);
2710 __close_session(mdsc, session);
2711 mutex_unlock(&session->s_mutex);
2712 ceph_put_mds_session(session);
2713 mutex_lock(&mdsc->mutex);
2714 n++;
2715 }
2716 if (n == 0)
2717 break;
2718
2719 if (client->mount_state == CEPH_MOUNT_SHUTDOWN)
2720 break;
2721
2722 dout("waiting for sessions to close\n");
2723 mutex_unlock(&mdsc->mutex);
2724 wait_for_completion_timeout(&mdsc->session_close_waiters,
2725 timeout);
2726 mutex_lock(&mdsc->mutex);
2727 }
2728
2729 /* tear down remaining sessions */
2730 for (i = 0; i < mdsc->max_sessions; i++) {
2731 if (mdsc->sessions[i]) {
2732 session = get_session(mdsc->sessions[i]);
2733 unregister_session(mdsc, session);
2734 mutex_unlock(&mdsc->mutex);
2735 mutex_lock(&session->s_mutex);
2736 remove_session_caps(session);
2737 mutex_unlock(&session->s_mutex);
2738 ceph_put_mds_session(session);
2739 mutex_lock(&mdsc->mutex);
2740 }
2741 }
2742
2743 WARN_ON(!list_empty(&mdsc->cap_delay_list));
2744
2745 mutex_unlock(&mdsc->mutex);
2746
2747 ceph_cleanup_empty_realms(mdsc);
2748
2749 cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
2750
2751 dout("stopped\n");
2752}
2753
2754void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
2755{
2756 dout("stop\n");
2757 cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
2758 if (mdsc->mdsmap)
2759 ceph_mdsmap_destroy(mdsc->mdsmap);
2760 kfree(mdsc->sessions);
2761}
2762
2763
2764/*
2765 * handle mds map update.
2766 */
2767void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
2768{
2769 u32 epoch;
2770 u32 maplen;
2771 void *p = msg->front.iov_base;
2772 void *end = p + msg->front.iov_len;
2773 struct ceph_mdsmap *newmap, *oldmap;
2774 struct ceph_fsid fsid;
2775 int err = -EINVAL;
2776
2777 ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
2778 ceph_decode_copy(&p, &fsid, sizeof(fsid));
2779 if (ceph_fsid_compare(&fsid, &mdsc->client->monc.monmap->fsid)) {
2780 pr_err("got mdsmap with wrong fsid\n");
2781 return;
2782 }
2783 epoch = ceph_decode_32(&p);
2784 maplen = ceph_decode_32(&p);
2785 dout("handle_map epoch %u len %d\n", epoch, (int)maplen);
2786
2787 /* do we need it? */
2788 ceph_monc_got_mdsmap(&mdsc->client->monc, epoch);
2789 mutex_lock(&mdsc->mutex);
2790 if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
2791 dout("handle_map epoch %u <= our %u\n",
2792 epoch, mdsc->mdsmap->m_epoch);
2793 mutex_unlock(&mdsc->mutex);
2794 return;
2795 }
2796
2797 newmap = ceph_mdsmap_decode(&p, end);
2798 if (IS_ERR(newmap)) {
2799 err = PTR_ERR(newmap);
2800 goto bad_unlock;
2801 }
2802
2803 /* swap into place */
2804 if (mdsc->mdsmap) {
2805 oldmap = mdsc->mdsmap;
2806 mdsc->mdsmap = newmap;
2807 check_new_map(mdsc, newmap, oldmap);
2808 ceph_mdsmap_destroy(oldmap);
2809 } else {
2810 mdsc->mdsmap = newmap; /* first mds map */
2811 }
2812 mdsc->client->sb->s_maxbytes = mdsc->mdsmap->m_max_file_size;
2813
2814 __wake_requests(mdsc, &mdsc->waiting_for_map);
2815
2816 mutex_unlock(&mdsc->mutex);
2817 schedule_delayed(mdsc);
2818 return;
2819
2820bad_unlock:
2821 mutex_unlock(&mdsc->mutex);
2822bad:
2823 pr_err("error decoding mdsmap %d\n", err);
2824 return;
2825}
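/*
 * Editorial note: the ceph_decode_need()/ceph_decode_*_safe() helpers
 * used above are bounds-checked cursor macros from decode.h; in spirit
 * (a simplified description, not the real definitions):
 *
 *	if the cursor p plus the requested size would pass end,
 *		goto the supplied bad label;
 *	otherwise copy and/or advance p.
 *
 * That is what routes a truncated or corrupt mdsmap front to the bad:
 * label above instead of reading past the message buffer.
 */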
2826
2827static struct ceph_connection *con_get(struct ceph_connection *con)
2828{
2829 struct ceph_mds_session *s = con->private;
2830
2831 if (get_session(s)) {
2832 dout("mdsc con_get %p %d -> %d\n", s,
2833 atomic_read(&s->s_ref) - 1, atomic_read(&s->s_ref));
2834 return con;
2835 }
2836 dout("mdsc con_get %p FAIL\n", s);
2837 return NULL;
2838}
2839
2840static void con_put(struct ceph_connection *con)
2841{
2842 struct ceph_mds_session *s = con->private;
2843
2844 dout("mdsc con_put %p %d -> %d\n", s, atomic_read(&s->s_ref),
2845 atomic_read(&s->s_ref) - 1);
2846 ceph_put_mds_session(s);
2847}
2848
2849/*
2850 * if the client is unresponsive for long enough, the mds will kill
2851 * the session entirely.
2852 */
2853static void peer_reset(struct ceph_connection *con)
2854{
2855 struct ceph_mds_session *s = con->private;
2856
2857 pr_err("mds%d gave us the boot. IMPLEMENT RECONNECT.\n",
2858 s->s_mds);
2859}
2860
2861static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
2862{
2863 struct ceph_mds_session *s = con->private;
2864 struct ceph_mds_client *mdsc = s->s_mdsc;
2865 int type = le16_to_cpu(msg->hdr.type);
2866
2867 switch (type) {
2868 case CEPH_MSG_MDS_MAP:
2869 ceph_mdsc_handle_map(mdsc, msg);
2870 break;
2871 case CEPH_MSG_CLIENT_SESSION:
2872 handle_session(s, msg);
2873 break;
2874 case CEPH_MSG_CLIENT_REPLY:
2875 handle_reply(s, msg);
2876 break;
2877 case CEPH_MSG_CLIENT_REQUEST_FORWARD:
2878 handle_forward(mdsc, msg);
2879 break;
2880 case CEPH_MSG_CLIENT_CAPS:
2881 ceph_handle_caps(s, msg);
2882 break;
2883 case CEPH_MSG_CLIENT_SNAP:
2884 ceph_handle_snap(mdsc, msg);
2885 break;
2886 case CEPH_MSG_CLIENT_LEASE:
2887 handle_lease(mdsc, msg);
2888 break;
2889
2890 default:
2891 pr_err("received unknown message type %d %s\n", type,
2892 ceph_msg_type_name(type));
2893 }
2894 ceph_msg_put(msg);
2895}
2896
2897const static struct ceph_connection_operations mds_con_ops = {
2898 .get = con_get,
2899 .put = con_put,
2900 .dispatch = dispatch,
2901 .peer_reset = peer_reset,
2902 .alloc_msg = ceph_alloc_msg,
2903 .alloc_middle = ceph_alloc_middle,
2904};
2905
2906
2907
2908
2909/* eof */