summaryrefslogtreecommitdiffstats
path: root/fs/ceph/mds_client.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ceph/mds_client.c')
-rw-r--r--fs/ceph/mds_client.c195
1 files changed, 101 insertions, 94 deletions
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 21c33ed048ed..959b1bf7c327 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -550,15 +550,9 @@ void ceph_put_mds_session(struct ceph_mds_session *s)
struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
int mds)
{
- struct ceph_mds_session *session;
-
if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
return NULL;
- session = mdsc->sessions[mds];
- dout("lookup_mds_session %p %d\n", session,
- refcount_read(&session->s_ref));
- get_session(session);
- return session;
+ return get_session(mdsc->sessions[mds]);
}
static bool __have_session(struct ceph_mds_client *mdsc, int mds)
@@ -1284,9 +1278,9 @@ static void cleanup_session_requests(struct ceph_mds_client *mdsc,
*
* Caller must hold session s_mutex.
*/
-static int iterate_session_caps(struct ceph_mds_session *session,
- int (*cb)(struct inode *, struct ceph_cap *,
- void *), void *arg)
+int ceph_iterate_session_caps(struct ceph_mds_session *session,
+ int (*cb)(struct inode *, struct ceph_cap *,
+ void *), void *arg)
{
struct list_head *p;
struct ceph_cap *cap;
@@ -1414,6 +1408,15 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
list_add(&ci->i_prealloc_cap_flush->i_list, &to_remove);
ci->i_prealloc_cap_flush = NULL;
}
+
+ if (drop &&
+ ci->i_wrbuffer_ref_head == 0 &&
+ ci->i_wr_ref == 0 &&
+ ci->i_dirty_caps == 0 &&
+ ci->i_flushing_caps == 0) {
+ ceph_put_snap_context(ci->i_head_snapc);
+ ci->i_head_snapc = NULL;
+ }
}
spin_unlock(&ci->i_ceph_lock);
while (!list_empty(&to_remove)) {
@@ -1442,7 +1445,7 @@ static void remove_session_caps(struct ceph_mds_session *session)
LIST_HEAD(dispose);
dout("remove_session_caps on %p\n", session);
- iterate_session_caps(session, remove_session_caps_cb, fsc);
+ ceph_iterate_session_caps(session, remove_session_caps_cb, fsc);
wake_up_all(&fsc->mdsc->cap_flushing_wq);
@@ -1525,8 +1528,8 @@ static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
static void wake_up_session_caps(struct ceph_mds_session *session, int ev)
{
dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
- iterate_session_caps(session, wake_up_session_cb,
- (void *)(unsigned long)ev);
+ ceph_iterate_session_caps(session, wake_up_session_cb,
+ (void *)(unsigned long)ev);
}
/*
@@ -1759,7 +1762,7 @@ int ceph_trim_caps(struct ceph_mds_client *mdsc,
session->s_mds, session->s_nr_caps, max_caps, trim_caps);
if (trim_caps > 0) {
session->s_trim_caps = trim_caps;
- iterate_session_caps(session, trim_caps_cb, session);
+ ceph_iterate_session_caps(session, trim_caps_cb, session);
dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
session->s_mds, session->s_nr_caps, max_caps,
trim_caps - session->s_trim_caps);
@@ -1852,7 +1855,8 @@ again:
num_cap_releases--;
head = msg->front.iov_base;
- le32_add_cpu(&head->num, 1);
+ put_unaligned_le32(get_unaligned_le32(&head->num) + 1,
+ &head->num);
item = msg->front.iov_base + msg->front.iov_len;
item->ino = cpu_to_le64(cap->cap_ino);
item->cap_id = cpu_to_le64(cap->cap_id);
@@ -2080,43 +2084,29 @@ static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
* Encode hidden .snap dirs as a double /, i.e.
* foo/.snap/bar -> foo//bar
*/
-char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
+char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *pbase,
int stop_on_nosnap)
{
struct dentry *temp;
char *path;
- int len, pos;
+ int pos;
unsigned seq;
+ u64 base;
if (!dentry)
return ERR_PTR(-EINVAL);
-retry:
- len = 0;
- seq = read_seqbegin(&rename_lock);
- rcu_read_lock();
- for (temp = dentry; !IS_ROOT(temp);) {
- struct inode *inode = d_inode(temp);
- if (inode && ceph_snap(inode) == CEPH_SNAPDIR)
- len++; /* slash only */
- else if (stop_on_nosnap && inode &&
- ceph_snap(inode) == CEPH_NOSNAP)
- break;
- else
- len += 1 + temp->d_name.len;
- temp = temp->d_parent;
- }
- rcu_read_unlock();
- if (len)
- len--; /* no leading '/' */
-
- path = kmalloc(len+1, GFP_NOFS);
+ path = __getname();
if (!path)
return ERR_PTR(-ENOMEM);
- pos = len;
- path[pos] = 0; /* trailing null */
+retry:
+ pos = PATH_MAX - 1;
+ path[pos] = '\0';
+
+ seq = read_seqbegin(&rename_lock);
rcu_read_lock();
- for (temp = dentry; !IS_ROOT(temp) && pos != 0; ) {
+ temp = dentry;
+ for (;;) {
struct inode *inode;
spin_lock(&temp->d_lock);
@@ -2134,43 +2124,50 @@ retry:
spin_unlock(&temp->d_lock);
break;
}
- strncpy(path + pos, temp->d_name.name,
- temp->d_name.len);
+ memcpy(path + pos, temp->d_name.name, temp->d_name.len);
}
spin_unlock(&temp->d_lock);
- if (pos)
- path[--pos] = '/';
temp = temp->d_parent;
+
+ /* Are we at the root? */
+ if (IS_ROOT(temp))
+ break;
+
+ /* Are we out of buffer? */
+ if (--pos < 0)
+ break;
+
+ path[pos] = '/';
}
+ base = ceph_ino(d_inode(temp));
rcu_read_unlock();
- if (pos != 0 || read_seqretry(&rename_lock, seq)) {
+ if (pos < 0 || read_seqretry(&rename_lock, seq)) {
pr_err("build_path did not end path lookup where "
- "expected, namelen is %d, pos is %d\n", len, pos);
+ "expected, pos is %d\n", pos);
/* presumably this is only possible if racing with a
rename of one of the parent directories (we can not
lock the dentries above us to prevent this, but
retrying should be harmless) */
- kfree(path);
goto retry;
}
- *base = ceph_ino(d_inode(temp));
- *plen = len;
+ *pbase = base;
+ *plen = PATH_MAX - 1 - pos;
dout("build_path on %p %d built %llx '%.*s'\n",
- dentry, d_count(dentry), *base, len, path);
- return path;
+ dentry, d_count(dentry), base, *plen, path + pos);
+ return path + pos;
}
static int build_dentry_path(struct dentry *dentry, struct inode *dir,
const char **ppath, int *ppathlen, u64 *pino,
- int *pfreepath)
+ bool *pfreepath, bool parent_locked)
{
char *path;
rcu_read_lock();
if (!dir)
dir = d_inode_rcu(dentry->d_parent);
- if (dir && ceph_snap(dir) == CEPH_NOSNAP) {
+ if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP) {
*pino = ceph_ino(dir);
rcu_read_unlock();
*ppath = dentry->d_name.name;
@@ -2182,13 +2179,13 @@ static int build_dentry_path(struct dentry *dentry, struct inode *dir,
if (IS_ERR(path))
return PTR_ERR(path);
*ppath = path;
- *pfreepath = 1;
+ *pfreepath = true;
return 0;
}
static int build_inode_path(struct inode *inode,
const char **ppath, int *ppathlen, u64 *pino,
- int *pfreepath)
+ bool *pfreepath)
{
struct dentry *dentry;
char *path;
@@ -2204,7 +2201,7 @@ static int build_inode_path(struct inode *inode,
if (IS_ERR(path))
return PTR_ERR(path);
*ppath = path;
- *pfreepath = 1;
+ *pfreepath = true;
return 0;
}
@@ -2215,7 +2212,7 @@ static int build_inode_path(struct inode *inode,
static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
struct inode *rdiri, const char *rpath,
u64 rino, const char **ppath, int *pathlen,
- u64 *ino, int *freepath)
+ u64 *ino, bool *freepath, bool parent_locked)
{
int r = 0;
@@ -2225,7 +2222,7 @@ static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
ceph_snap(rinode));
} else if (rdentry) {
r = build_dentry_path(rdentry, rdiri, ppath, pathlen, ino,
- freepath);
+ freepath, parent_locked);
dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
*ppath);
} else if (rpath || rino) {
@@ -2251,7 +2248,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
const char *path2 = NULL;
u64 ino1 = 0, ino2 = 0;
int pathlen1 = 0, pathlen2 = 0;
- int freepath1 = 0, freepath2 = 0;
+ bool freepath1 = false, freepath2 = false;
int len;
u16 releases;
void *p, *end;
@@ -2259,16 +2256,19 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
ret = set_request_path_attr(req->r_inode, req->r_dentry,
req->r_parent, req->r_path1, req->r_ino1.ino,
- &path1, &pathlen1, &ino1, &freepath1);
+ &path1, &pathlen1, &ino1, &freepath1,
+ test_bit(CEPH_MDS_R_PARENT_LOCKED,
+ &req->r_req_flags));
if (ret < 0) {
msg = ERR_PTR(ret);
goto out;
}
+ /* If r_old_dentry is set, then assume that its parent is locked */
ret = set_request_path_attr(NULL, req->r_old_dentry,
req->r_old_dentry_dir,
req->r_path2, req->r_ino2.ino,
- &path2, &pathlen2, &ino2, &freepath2);
+ &path2, &pathlen2, &ino2, &freepath2, true);
if (ret < 0) {
msg = ERR_PTR(ret);
goto out_free1;
@@ -2283,9 +2283,9 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
(!!req->r_inode_drop + !!req->r_dentry_drop +
!!req->r_old_inode_drop + !!req->r_old_dentry_drop);
if (req->r_dentry_drop)
- len += req->r_dentry->d_name.len;
+ len += pathlen1;
if (req->r_old_dentry_drop)
- len += req->r_old_dentry->d_name.len;
+ len += pathlen2;
msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false);
if (!msg) {
@@ -2362,10 +2362,10 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
out_free2:
if (freepath2)
- kfree((char *)path2);
+ ceph_mdsc_free_path((char *)path2, pathlen2);
out_free1:
if (freepath1)
- kfree((char *)path1);
+ ceph_mdsc_free_path((char *)path1, pathlen1);
out:
return msg;
}
@@ -2379,8 +2379,7 @@ static void complete_request(struct ceph_mds_client *mdsc,
{
if (req->r_callback)
req->r_callback(mdsc, req);
- else
- complete_all(&req->r_completion);
+ complete_all(&req->r_completion);
}
/*
@@ -2622,28 +2621,11 @@ static void kick_requests(struct ceph_mds_client *mdsc, int mds)
}
}
-void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc,
+int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir,
struct ceph_mds_request *req)
{
- dout("submit_request on %p\n", req);
- mutex_lock(&mdsc->mutex);
- __register_request(mdsc, req, NULL);
- __do_request(mdsc, req);
- mutex_unlock(&mdsc->mutex);
-}
-
-/*
- * Synchrously perform an mds request. Take care of all of the
- * session setup, forwarding, retry details.
- */
-int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
- struct inode *dir,
- struct ceph_mds_request *req)
-{
int err;
- dout("do_request on %p\n", req);
-
/* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */
if (req->r_inode)
ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
@@ -2653,18 +2635,21 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
CEPH_CAP_PIN);
- /* issue */
+ dout("submit_request on %p for inode %p\n", req, dir);
mutex_lock(&mdsc->mutex);
__register_request(mdsc, req, dir);
__do_request(mdsc, req);
+ err = req->r_err;
+ mutex_unlock(&mdsc->mutex);
+ return err;
+}
- if (req->r_err) {
- err = req->r_err;
- goto out;
- }
+static int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc,
+ struct ceph_mds_request *req)
+{
+ int err;
/* wait */
- mutex_unlock(&mdsc->mutex);
dout("do_request waiting\n");
if (!req->r_timeout && req->r_wait_for_completion) {
err = req->r_wait_for_completion(mdsc, req);
@@ -2705,8 +2690,26 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
err = req->r_err;
}
-out:
mutex_unlock(&mdsc->mutex);
+ return err;
+}
+
+/*
+ * Synchrously perform an mds request. Take care of all of the
+ * session setup, forwarding, retry details.
+ */
+int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
+ struct inode *dir,
+ struct ceph_mds_request *req)
+{
+ int err;
+
+ dout("do_request on %p\n", req);
+
+ /* issue */
+ err = ceph_mdsc_submit_request(mdsc, dir, req);
+ if (!err)
+ err = ceph_mdsc_wait_request(mdsc, req);
dout("do_request %p done, result %d\n", req, err);
return err;
}
@@ -3437,7 +3440,7 @@ out_freeflocks:
ceph_pagelist_encode_string(pagelist, path, pathlen);
ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
out_freepath:
- kfree(path);
+ ceph_mdsc_free_path(path, pathlen);
}
out_err:
@@ -3594,7 +3597,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
recon_state.msg_version = 2;
}
/* trsaverse this session's caps */
- err = iterate_session_caps(session, encode_caps_cb, &recon_state);
+ err = ceph_iterate_session_caps(session, encode_caps_cb, &recon_state);
spin_lock(&session->s_cap_lock);
session->s_cap_reconnect = 0;
@@ -4077,6 +4080,8 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
mdsc->max_sessions = 0;
mdsc->stopping = 0;
atomic64_set(&mdsc->quotarealms_count, 0);
+ mdsc->quotarealms_inodes = RB_ROOT;
+ mutex_init(&mdsc->quotarealms_inodes_mutex);
mdsc->last_snap_seq = 0;
init_rwsem(&mdsc->snap_rwsem);
mdsc->snap_realms = RB_ROOT;
@@ -4168,6 +4173,8 @@ void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
* their inode/dcache refs
*/
ceph_msgr_flush();
+
+ ceph_cleanup_quotarealms_inodes(mdsc);
}
/*