Merge branch 'for-linus/2635-updates' of git://git.fluff.org/bjdooks/linux
[deliverable/linux.git] / fs / ceph / messenger.c
index cd4fadb6491afe3a48ad5081b5f4ab2a41e4b0bf..60b74839ebec4dd233206d56d25a2bd7356a12ee 100644 (file)
@@ -39,18 +39,6 @@ static void queue_con(struct ceph_connection *con);
 static void con_work(struct work_struct *);
 static void ceph_fault(struct ceph_connection *con);
 
-const char *ceph_name_type_str(int t)
-{
-       switch (t) {
-       case CEPH_ENTITY_TYPE_MON: return "mon";
-       case CEPH_ENTITY_TYPE_MDS: return "mds";
-       case CEPH_ENTITY_TYPE_OSD: return "osd";
-       case CEPH_ENTITY_TYPE_CLIENT: return "client";
-       case CEPH_ENTITY_TYPE_ADMIN: return "admin";
-       default: return "???";
-       }
-}
-
 /*
  * nicely render a sockaddr as a string.
  */
@@ -340,6 +328,7 @@ static void reset_connection(struct ceph_connection *con)
                ceph_msg_put(con->out_msg);
                con->out_msg = NULL;
        }
+       con->out_keepalive_pending = false;
        con->in_seq = 0;
        con->in_seq_acked = 0;
 }
@@ -357,6 +346,7 @@ void ceph_con_close(struct ceph_connection *con)
        clear_bit(WRITE_PENDING, &con->state);
        mutex_lock(&con->mutex);
        reset_connection(con);
+       con->peer_global_seq = 0;
        cancel_delayed_work(&con->work);
        mutex_unlock(&con->mutex);
        queue_con(con);
@@ -661,7 +651,7 @@ static void prepare_write_connect(struct ceph_messenger *msgr,
        dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con,
             con->connect_seq, global_seq, proto);
 
-       con->out_connect.features = CEPH_FEATURE_SUPPORTED;
+       con->out_connect.features = CEPH_FEATURE_SUPPORTED_CLIENT;
        con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT);
        con->out_connect.connect_seq = cpu_to_le32(con->connect_seq);
        con->out_connect.global_seq = cpu_to_le32(global_seq);
@@ -1124,8 +1114,8 @@ static void fail_protocol(struct ceph_connection *con)
 
 static int process_connect(struct ceph_connection *con)
 {
-       u64 sup_feat = CEPH_FEATURE_SUPPORTED;
-       u64 req_feat = CEPH_FEATURE_REQUIRED;
+       u64 sup_feat = CEPH_FEATURE_SUPPORTED_CLIENT;
+       u64 req_feat = CEPH_FEATURE_REQUIRED_CLIENT;
        u64 server_feat = le64_to_cpu(con->in_reply.features);
 
        dout("process_connect on %p tag %d\n", con, (int)con->in_tag);
@@ -1233,6 +1223,7 @@ static int process_connect(struct ceph_connection *con)
                clear_bit(CONNECTING, &con->state);
                con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq);
                con->connect_seq++;
+               con->peer_features = server_feat;
                dout("process_connect got READY gseq %d cseq %d (%d)\n",
                     con->peer_global_seq,
                     le32_to_cpu(con->in_reply.connect_seq),
@@ -1402,19 +1393,17 @@ static int read_partial_message(struct ceph_connection *con)
                con->in_msg = ceph_alloc_msg(con, &con->in_hdr, &skip);
                if (skip) {
                        /* skip this message */
-                       dout("alloc_msg returned NULL, skipping message\n");
+                       dout("alloc_msg said skip message\n");
                        con->in_base_pos = -front_len - middle_len - data_len -
                                sizeof(m->footer);
                        con->in_tag = CEPH_MSGR_TAG_READY;
                        con->in_seq++;
                        return 0;
                }
-               if (IS_ERR(con->in_msg)) {
-                       ret = PTR_ERR(con->in_msg);
-                       con->in_msg = NULL;
+               if (!con->in_msg) {
                        con->error_msg =
                                "error allocating memory for incoming message";
-                       return ret;
+                       return -ENOMEM;
                }
                m = con->in_msg;
                m->front.iov_len = 0;    /* haven't read it yet */
@@ -1514,14 +1503,14 @@ static void process_message(struct ceph_connection *con)
 
        /* if first message, set peer_name */
        if (con->peer_name.type == 0)
-               con->peer_name = msg->hdr.src.name;
+               con->peer_name = msg->hdr.src;
 
        con->in_seq++;
        mutex_unlock(&con->mutex);
 
        dout("===== %p %llu from %s%lld %d=%s len %d+%d (%u %u %u) =====\n",
             msg, le64_to_cpu(msg->hdr.seq),
-            ENTITY_NAME(msg->hdr.src.name),
+            ENTITY_NAME(msg->hdr.src),
             le16_to_cpu(msg->hdr.type),
             ceph_msg_type_name(le16_to_cpu(msg->hdr.type)),
             le32_to_cpu(msg->hdr.front_len),
@@ -1546,7 +1535,6 @@ static int try_write(struct ceph_connection *con)
        dout("try_write start %p state %lu nref %d\n", con, con->state,
             atomic_read(&con->nref));
 
-       mutex_lock(&con->mutex);
 more:
        dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes);
 
@@ -1639,7 +1627,6 @@ do_next:
 done:
        ret = 0;
 out:
-       mutex_unlock(&con->mutex);
        dout("try_write done on %p\n", con);
        return ret;
 }
@@ -1651,7 +1638,6 @@ out:
  */
 static int try_read(struct ceph_connection *con)
 {
-       struct ceph_messenger *msgr;
        int ret = -1;
 
        if (!con->sock)
@@ -1661,9 +1647,6 @@ static int try_read(struct ceph_connection *con)
                return 0;
 
        dout("try_read start on %p\n", con);
-       msgr = con->msgr;
-
-       mutex_lock(&con->mutex);
 
 more:
        dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag,
@@ -1758,7 +1741,6 @@ more:
 done:
        ret = 0;
 out:
-       mutex_unlock(&con->mutex);
        dout("try_read done on %p\n", con);
        return ret;
 
@@ -1830,6 +1812,8 @@ more:
        dout("con_work %p start, clearing QUEUED\n", con);
        clear_bit(QUEUED, &con->state);
 
+       mutex_lock(&con->mutex);
+
        if (test_bit(CLOSED, &con->state)) { /* e.g. if we are replaced */
                dout("con_work CLOSED\n");
                con_close_socket(con);
@@ -1844,11 +1828,16 @@ more:
        if (test_and_clear_bit(SOCK_CLOSED, &con->state) ||
            try_read(con) < 0 ||
            try_write(con) < 0) {
+               mutex_unlock(&con->mutex);
                backoff = 1;
                ceph_fault(con);     /* error/fault path */
+               goto done_unlocked;
        }
 
 done:
+       mutex_unlock(&con->mutex);
+
+done_unlocked:
        clear_bit(BUSY, &con->state);
        dout("con->state=%lu\n", con->state);
        if (test_bit(QUEUED, &con->state)) {
@@ -1947,7 +1936,7 @@ struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr)
 
        /* the zero page is needed if a request is "canceled" while the message
         * is being written over the socket */
-       msgr->zero_page = alloc_page(GFP_KERNEL | __GFP_ZERO);
+       msgr->zero_page = __page_cache_alloc(GFP_KERNEL | __GFP_ZERO);
        if (!msgr->zero_page) {
                kfree(msgr);
                return ERR_PTR(-ENOMEM);
@@ -1987,9 +1976,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
        }
 
        /* set src+dst */
-       msg->hdr.src.name = con->msgr->inst.name;
-       msg->hdr.src.addr = con->msgr->my_enc_addr;
-       msg->hdr.orig_src = msg->hdr.src;
+       msg->hdr.src = con->msgr->inst.name;
 
        BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len));
 
@@ -2083,12 +2070,11 @@ void ceph_con_keepalive(struct ceph_connection *con)
  * construct a new message with given type, size
  * the new msg has a ref count of 1.
  */
-struct ceph_msg *ceph_msg_new(int type, int front_len,
-                             int page_len, int page_off, struct page **pages)
+struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags)
 {
        struct ceph_msg *m;
 
-       m = kmalloc(sizeof(*m), GFP_NOFS);
+       m = kmalloc(sizeof(*m), flags);
        if (m == NULL)
                goto out;
        kref_init(&m->kref);
@@ -2100,8 +2086,8 @@ struct ceph_msg *ceph_msg_new(int type, int front_len,
        m->hdr.version = 0;
        m->hdr.front_len = cpu_to_le32(front_len);
        m->hdr.middle_len = 0;
-       m->hdr.data_len = cpu_to_le32(page_len);
-       m->hdr.data_off = cpu_to_le16(page_off);
+       m->hdr.data_len = 0;
+       m->hdr.data_off = 0;
        m->hdr.reserved = 0;
        m->footer.front_crc = 0;
        m->footer.middle_crc = 0;
@@ -2115,11 +2101,11 @@ struct ceph_msg *ceph_msg_new(int type, int front_len,
        /* front */
        if (front_len) {
                if (front_len > PAGE_CACHE_SIZE) {
-                       m->front.iov_base = __vmalloc(front_len, GFP_NOFS,
+                       m->front.iov_base = __vmalloc(front_len, flags,
                                                      PAGE_KERNEL);
                        m->front_is_vmalloc = true;
                } else {
-                       m->front.iov_base = kmalloc(front_len, GFP_NOFS);
+                       m->front.iov_base = kmalloc(front_len, flags);
                }
                if (m->front.iov_base == NULL) {
                        pr_err("msg_new can't allocate %d bytes\n",
@@ -2135,19 +2121,18 @@ struct ceph_msg *ceph_msg_new(int type, int front_len,
        m->middle = NULL;
 
        /* data */
-       m->nr_pages = calc_pages_for(page_off, page_len);
-       m->pages = pages;
+       m->nr_pages = 0;
+       m->pages = NULL;
        m->pagelist = NULL;
 
-       dout("ceph_msg_new %p page %d~%d -> %d\n", m, page_off, page_len,
-            m->nr_pages);
+       dout("ceph_msg_new %p front %d\n", m, front_len);
        return m;
 
 out2:
        ceph_msg_put(m);
 out:
-       pr_err("msg_new can't create type %d len %d\n", type, front_len);
-       return ERR_PTR(-ENOMEM);
+       pr_err("msg_new can't create type %d front %d\n", type, front_len);
+       return NULL;
 }
 
 /*
@@ -2190,29 +2175,25 @@ static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
                mutex_unlock(&con->mutex);
                msg = con->ops->alloc_msg(con, hdr, skip);
                mutex_lock(&con->mutex);
-               if (IS_ERR(msg))
-                       return msg;
-
-               if (*skip)
+               if (!msg || *skip)
                        return NULL;
        }
        if (!msg) {
                *skip = 0;
-               msg = ceph_msg_new(type, front_len, 0, 0, NULL);
+               msg = ceph_msg_new(type, front_len, GFP_NOFS);
                if (!msg) {
                        pr_err("unable to allocate msg type %d len %d\n",
                               type, front_len);
-                       return ERR_PTR(-ENOMEM);
+                       return NULL;
                }
        }
        memcpy(&msg->hdr, &con->in_hdr, sizeof(con->in_hdr));
 
-       if (middle_len) {
+       if (middle_len && !msg->middle) {
                ret = ceph_alloc_middle(con, msg);
-
                if (ret < 0) {
                        ceph_msg_put(msg);
-                       return msg;
+                       return NULL;
                }
        }
 
This page took 0.030165 seconds and 5 git commands to generate.