summaryrefslogtreecommitdiffstats
path: root/fs/ceph/caps.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ceph/caps.c')
-rw-r--r--fs/ceph/caps.c439
1 files changed, 251 insertions, 188 deletions
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index b81be9a56487..98ab13e2b71d 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -1,4 +1,4 @@
-#include "ceph_debug.h"
+#include <linux/ceph/ceph_debug.h>
#include <linux/fs.h>
#include <linux/kernel.h>
@@ -9,8 +9,9 @@
#include <linux/writeback.h>
#include "super.h"
-#include "decode.h"
-#include "messenger.h"
+#include "mds_client.h"
+#include <linux/ceph/decode.h>
+#include <linux/ceph/messenger.h>
/*
* Capability management
@@ -113,58 +114,41 @@ const char *ceph_cap_string(int caps)
return cap_str[i];
}
-/*
- * Cap reservations
- *
- * Maintain a global pool of preallocated struct ceph_caps, referenced
- * by struct ceph_caps_reservations. This ensures that we preallocate
- * memory needed to successfully process an MDS response. (If an MDS
- * sends us cap information and we fail to process it, we will have
- * problems due to the client and MDS being out of sync.)
- *
- * Reservations are 'owned' by a ceph_cap_reservation context.
- */
-static spinlock_t caps_list_lock;
-static struct list_head caps_list; /* unused (reserved or unreserved) */
-static int caps_total_count; /* total caps allocated */
-static int caps_use_count; /* in use */
-static int caps_reserve_count; /* unused, reserved */
-static int caps_avail_count; /* unused, unreserved */
-static int caps_min_count; /* keep at least this many (unreserved) */
-
-void __init ceph_caps_init(void)
+void ceph_caps_init(struct ceph_mds_client *mdsc)
{
- INIT_LIST_HEAD(&caps_list);
- spin_lock_init(&caps_list_lock);
+ INIT_LIST_HEAD(&mdsc->caps_list);
+ spin_lock_init(&mdsc->caps_list_lock);
}
-void ceph_caps_finalize(void)
+void ceph_caps_finalize(struct ceph_mds_client *mdsc)
{
struct ceph_cap *cap;
- spin_lock(&caps_list_lock);
- while (!list_empty(&caps_list)) {
- cap = list_first_entry(&caps_list, struct ceph_cap, caps_item);
+ spin_lock(&mdsc->caps_list_lock);
+ while (!list_empty(&mdsc->caps_list)) {
+ cap = list_first_entry(&mdsc->caps_list,
+ struct ceph_cap, caps_item);
list_del(&cap->caps_item);
kmem_cache_free(ceph_cap_cachep, cap);
}
- caps_total_count = 0;
- caps_avail_count = 0;
- caps_use_count = 0;
- caps_reserve_count = 0;
- caps_min_count = 0;
- spin_unlock(&caps_list_lock);
+ mdsc->caps_total_count = 0;
+ mdsc->caps_avail_count = 0;
+ mdsc->caps_use_count = 0;
+ mdsc->caps_reserve_count = 0;
+ mdsc->caps_min_count = 0;
+ spin_unlock(&mdsc->caps_list_lock);
}
-void ceph_adjust_min_caps(int delta)
+void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta)
{
- spin_lock(&caps_list_lock);
- caps_min_count += delta;
- BUG_ON(caps_min_count < 0);
- spin_unlock(&caps_list_lock);
+ spin_lock(&mdsc->caps_list_lock);
+ mdsc->caps_min_count += delta;
+ BUG_ON(mdsc->caps_min_count < 0);
+ spin_unlock(&mdsc->caps_list_lock);
}
-int ceph_reserve_caps(struct ceph_cap_reservation *ctx, int need)
+int ceph_reserve_caps(struct ceph_mds_client *mdsc,
+ struct ceph_cap_reservation *ctx, int need)
{
int i;
struct ceph_cap *cap;
@@ -176,16 +160,17 @@ int ceph_reserve_caps(struct ceph_cap_reservation *ctx, int need)
dout("reserve caps ctx=%p need=%d\n", ctx, need);
/* first reserve any caps that are already allocated */
- spin_lock(&caps_list_lock);
- if (caps_avail_count >= need)
+ spin_lock(&mdsc->caps_list_lock);
+ if (mdsc->caps_avail_count >= need)
have = need;
else
- have = caps_avail_count;
- caps_avail_count -= have;
- caps_reserve_count += have;
- BUG_ON(caps_total_count != caps_use_count + caps_reserve_count +
- caps_avail_count);
- spin_unlock(&caps_list_lock);
+ have = mdsc->caps_avail_count;
+ mdsc->caps_avail_count -= have;
+ mdsc->caps_reserve_count += have;
+ BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
+ mdsc->caps_reserve_count +
+ mdsc->caps_avail_count);
+ spin_unlock(&mdsc->caps_list_lock);
for (i = have; i < need; i++) {
cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS);
@@ -198,19 +183,20 @@ int ceph_reserve_caps(struct ceph_cap_reservation *ctx, int need)
}
BUG_ON(have + alloc != need);
- spin_lock(&caps_list_lock);
- caps_total_count += alloc;
- caps_reserve_count += alloc;
- list_splice(&newcaps, &caps_list);
+ spin_lock(&mdsc->caps_list_lock);
+ mdsc->caps_total_count += alloc;
+ mdsc->caps_reserve_count += alloc;
+ list_splice(&newcaps, &mdsc->caps_list);
- BUG_ON(caps_total_count != caps_use_count + caps_reserve_count +
- caps_avail_count);
- spin_unlock(&caps_list_lock);
+ BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
+ mdsc->caps_reserve_count +
+ mdsc->caps_avail_count);
+ spin_unlock(&mdsc->caps_list_lock);
ctx->count = need;
dout("reserve caps ctx=%p %d = %d used + %d resv + %d avail\n",
- ctx, caps_total_count, caps_use_count, caps_reserve_count,
- caps_avail_count);
+ ctx, mdsc->caps_total_count, mdsc->caps_use_count,
+ mdsc->caps_reserve_count, mdsc->caps_avail_count);
return 0;
out_alloc_count:
@@ -220,26 +206,29 @@ out_alloc_count:
return ret;
}
-int ceph_unreserve_caps(struct ceph_cap_reservation *ctx)
+int ceph_unreserve_caps(struct ceph_mds_client *mdsc,
+ struct ceph_cap_reservation *ctx)
{
dout("unreserve caps ctx=%p count=%d\n", ctx, ctx->count);
if (ctx->count) {
- spin_lock(&caps_list_lock);
- BUG_ON(caps_reserve_count < ctx->count);
- caps_reserve_count -= ctx->count;
- caps_avail_count += ctx->count;
+ spin_lock(&mdsc->caps_list_lock);
+ BUG_ON(mdsc->caps_reserve_count < ctx->count);
+ mdsc->caps_reserve_count -= ctx->count;
+ mdsc->caps_avail_count += ctx->count;
ctx->count = 0;
dout("unreserve caps %d = %d used + %d resv + %d avail\n",
- caps_total_count, caps_use_count, caps_reserve_count,
- caps_avail_count);
- BUG_ON(caps_total_count != caps_use_count + caps_reserve_count +
- caps_avail_count);
- spin_unlock(&caps_list_lock);
+ mdsc->caps_total_count, mdsc->caps_use_count,
+ mdsc->caps_reserve_count, mdsc->caps_avail_count);
+ BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
+ mdsc->caps_reserve_count +
+ mdsc->caps_avail_count);
+ spin_unlock(&mdsc->caps_list_lock);
}
return 0;
}
-static struct ceph_cap *get_cap(struct ceph_cap_reservation *ctx)
+static struct ceph_cap *get_cap(struct ceph_mds_client *mdsc,
+ struct ceph_cap_reservation *ctx)
{
struct ceph_cap *cap = NULL;
@@ -247,71 +236,74 @@ static struct ceph_cap *get_cap(struct ceph_cap_reservation *ctx)
if (!ctx) {
cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS);
if (cap) {
- caps_use_count++;
- caps_total_count++;
+ mdsc->caps_use_count++;
+ mdsc->caps_total_count++;
}
return cap;
}
- spin_lock(&caps_list_lock);
+ spin_lock(&mdsc->caps_list_lock);
dout("get_cap ctx=%p (%d) %d = %d used + %d resv + %d avail\n",
- ctx, ctx->count, caps_total_count, caps_use_count,
- caps_reserve_count, caps_avail_count);
+ ctx, ctx->count, mdsc->caps_total_count, mdsc->caps_use_count,
+ mdsc->caps_reserve_count, mdsc->caps_avail_count);
BUG_ON(!ctx->count);
- BUG_ON(ctx->count > caps_reserve_count);
- BUG_ON(list_empty(&caps_list));
+ BUG_ON(ctx->count > mdsc->caps_reserve_count);
+ BUG_ON(list_empty(&mdsc->caps_list));
ctx->count--;
- caps_reserve_count--;
- caps_use_count++;
+ mdsc->caps_reserve_count--;
+ mdsc->caps_use_count++;
- cap = list_first_entry(&caps_list, struct ceph_cap, caps_item);
+ cap = list_first_entry(&mdsc->caps_list, struct ceph_cap, caps_item);
list_del(&cap->caps_item);
- BUG_ON(caps_total_count != caps_use_count + caps_reserve_count +
- caps_avail_count);
- spin_unlock(&caps_list_lock);
+ BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
+ mdsc->caps_reserve_count + mdsc->caps_avail_count);
+ spin_unlock(&mdsc->caps_list_lock);
return cap;
}
-void ceph_put_cap(struct ceph_cap *cap)
+void ceph_put_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap)
{
- spin_lock(&caps_list_lock);
+ spin_lock(&mdsc->caps_list_lock);
dout("put_cap %p %d = %d used + %d resv + %d avail\n",
- cap, caps_total_count, caps_use_count,
- caps_reserve_count, caps_avail_count);
- caps_use_count--;
+ cap, mdsc->caps_total_count, mdsc->caps_use_count,
+ mdsc->caps_reserve_count, mdsc->caps_avail_count);
+ mdsc->caps_use_count--;
/*
* Keep some preallocated caps around (ceph_min_count), to
* avoid lots of free/alloc churn.
*/
- if (caps_avail_count >= caps_reserve_count + caps_min_count) {
- caps_total_count--;
+ if (mdsc->caps_avail_count >= mdsc->caps_reserve_count +
+ mdsc->caps_min_count) {
+ mdsc->caps_total_count--;
kmem_cache_free(ceph_cap_cachep, cap);
} else {
- caps_avail_count++;
- list_add(&cap->caps_item, &caps_list);
+ mdsc->caps_avail_count++;
+ list_add(&cap->caps_item, &mdsc->caps_list);
}
- BUG_ON(caps_total_count != caps_use_count + caps_reserve_count +
- caps_avail_count);
- spin_unlock(&caps_list_lock);
+ BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
+ mdsc->caps_reserve_count + mdsc->caps_avail_count);
+ spin_unlock(&mdsc->caps_list_lock);
}
-void ceph_reservation_status(struct ceph_client *client,
+void ceph_reservation_status(struct ceph_fs_client *fsc,
int *total, int *avail, int *used, int *reserved,
int *min)
{
+ struct ceph_mds_client *mdsc = fsc->mdsc;
+
if (total)
- *total = caps_total_count;
+ *total = mdsc->caps_total_count;
if (avail)
- *avail = caps_avail_count;
+ *avail = mdsc->caps_avail_count;
if (used)
- *used = caps_use_count;
+ *used = mdsc->caps_use_count;
if (reserved)
- *reserved = caps_reserve_count;
+ *reserved = mdsc->caps_reserve_count;
if (min)
- *min = caps_min_count;
+ *min = mdsc->caps_min_count;
}
/*
@@ -336,22 +328,29 @@ static struct ceph_cap *__get_cap_for_mds(struct ceph_inode_info *ci, int mds)
return NULL;
}
+struct ceph_cap *ceph_get_cap_for_mds(struct ceph_inode_info *ci, int mds)
+{
+ struct ceph_cap *cap;
+
+ spin_lock(&ci->vfs_inode.i_lock);
+ cap = __get_cap_for_mds(ci, mds);
+ spin_unlock(&ci->vfs_inode.i_lock);
+ return cap;
+}
+
/*
- * Return id of any MDS with a cap, preferably FILE_WR|WRBUFFER|EXCL, else
- * -1.
+ * Return id of any MDS with a cap, preferably FILE_WR|BUFFER|EXCL, else -1.
*/
-static int __ceph_get_cap_mds(struct ceph_inode_info *ci, u32 *mseq)
+static int __ceph_get_cap_mds(struct ceph_inode_info *ci)
{
struct ceph_cap *cap;
int mds = -1;
struct rb_node *p;
- /* prefer mds with WR|WRBUFFER|EXCL caps */
+ /* prefer mds with WR|BUFFER|EXCL caps */
for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
cap = rb_entry(p, struct ceph_cap, ci_node);
mds = cap->mds;
- if (mseq)
- *mseq = cap->mseq;
if (cap->issued & (CEPH_CAP_FILE_WR |
CEPH_CAP_FILE_BUFFER |
CEPH_CAP_FILE_EXCL))
@@ -364,7 +363,7 @@ int ceph_get_cap_mds(struct inode *inode)
{
int mds;
spin_lock(&inode->i_lock);
- mds = __ceph_get_cap_mds(ceph_inode(inode), NULL);
+ mds = __ceph_get_cap_mds(ceph_inode(inode));
spin_unlock(&inode->i_lock);
return mds;
}
@@ -401,7 +400,7 @@ static void __insert_cap_node(struct ceph_inode_info *ci,
static void __cap_set_timeouts(struct ceph_mds_client *mdsc,
struct ceph_inode_info *ci)
{
- struct ceph_mount_args *ma = mdsc->client->mount_args;
+ struct ceph_mount_options *ma = mdsc->fsc->mount_options;
ci->i_hold_caps_min = round_jiffies(jiffies +
ma->caps_wanted_delay_min * HZ);
@@ -483,8 +482,8 @@ static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap,
* Each time we receive FILE_CACHE anew, we increment
* i_rdcache_gen.
*/
- if ((issued & CEPH_CAP_FILE_CACHE) &&
- (had & CEPH_CAP_FILE_CACHE) == 0)
+ if ((issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
+ (had & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0)
ci->i_rdcache_gen++;
/*
@@ -517,7 +516,7 @@ int ceph_add_cap(struct inode *inode,
unsigned seq, unsigned mseq, u64 realmino, int flags,
struct ceph_cap_reservation *caps_reservation)
{
- struct ceph_mds_client *mdsc = &ceph_inode_to_client(inode)->mdsc;
+ struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_cap *new_cap = NULL;
struct ceph_cap *cap;
@@ -543,7 +542,7 @@ retry:
new_cap = NULL;
} else {
spin_unlock(&inode->i_lock);
- new_cap = get_cap(caps_reservation);
+ new_cap = get_cap(mdsc, caps_reservation);
if (new_cap == NULL)
return -ENOMEM;
goto retry;
@@ -588,6 +587,7 @@ retry:
} else {
pr_err("ceph_add_cap: couldn't find snap realm %llx\n",
realmino);
+ WARN_ON(!realm);
}
}
@@ -815,7 +815,7 @@ int __ceph_caps_used(struct ceph_inode_info *ci)
used |= CEPH_CAP_PIN;
if (ci->i_rd_ref)
used |= CEPH_CAP_FILE_RD;
- if (ci->i_rdcache_ref || ci->i_rdcache_gen)
+ if (ci->i_rdcache_ref || ci->vfs_inode.i_data.nrpages)
used |= CEPH_CAP_FILE_CACHE;
if (ci->i_wr_ref)
used |= CEPH_CAP_FILE_WR;
@@ -831,7 +831,7 @@ int __ceph_caps_file_wanted(struct ceph_inode_info *ci)
{
int want = 0;
int mode;
- for (mode = 0; mode < 4; mode++)
+ for (mode = 0; mode < CEPH_FILE_MODE_NUM; mode++)
if (ci->i_nr_by_mode[mode])
want |= ceph_caps_for_mode(mode);
return want;
@@ -874,7 +874,7 @@ void __ceph_remove_cap(struct ceph_cap *cap)
struct ceph_mds_session *session = cap->session;
struct ceph_inode_info *ci = cap->ci;
struct ceph_mds_client *mdsc =
- &ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc;
+ ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc;
int removed = 0;
dout("__ceph_remove_cap %p from %p\n", cap, &ci->vfs_inode);
@@ -901,7 +901,7 @@ void __ceph_remove_cap(struct ceph_cap *cap)
ci->i_auth_cap = NULL;
if (removed)
- ceph_put_cap(cap);
+ ceph_put_cap(mdsc, cap);
if (!__ceph_is_any_caps(ci) && ci->i_snap_realm) {
struct ceph_snap_realm *realm = ci->i_snap_realm;
@@ -1083,6 +1083,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
gid_t gid;
struct ceph_mds_session *session;
u64 xattr_version = 0;
+ struct ceph_buffer *xattr_blob = NULL;
int delayed = 0;
u64 flush_tid = 0;
int i;
@@ -1143,6 +1144,10 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
for (i = 0; i < CEPH_CAP_BITS; i++)
if (flushing & (1 << i))
ci->i_cap_flush_tid[i] = flush_tid;
+
+ follows = ci->i_head_snapc->seq;
+ } else {
+ follows = 0;
}
keep = cap->implemented;
@@ -1156,14 +1161,14 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
mtime = inode->i_mtime;
atime = inode->i_atime;
time_warp_seq = ci->i_time_warp_seq;
- follows = ci->i_snap_realm->cached_context->seq;
uid = inode->i_uid;
gid = inode->i_gid;
mode = inode->i_mode;
- if (dropping & CEPH_CAP_XATTR_EXCL) {
+ if (flushing & CEPH_CAP_XATTR_EXCL) {
__ceph_build_xattrs_blob(ci);
- xattr_version = ci->i_xattrs.version + 1;
+ xattr_blob = ci->i_xattrs.blob;
+ xattr_version = ci->i_xattrs.version;
}
spin_unlock(&inode->i_lock);
@@ -1171,9 +1176,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
ret = send_cap_msg(session, ceph_vino(inode).ino, cap_id,
op, keep, want, flushing, seq, flush_tid, issue_seq, mseq,
size, max_size, &mtime, &atime, time_warp_seq,
- uid, gid, mode,
- xattr_version,
- (flushing & CEPH_CAP_XATTR_EXCL) ? ci->i_xattrs.blob : NULL,
+ uid, gid, mode, xattr_version, xattr_blob,
follows);
if (ret < 0) {
dout("error sending cap msg, must requeue %p\n", inode);
@@ -1193,16 +1196,22 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
* asynchronously back to the MDS once sync writes complete and dirty
* data is written out.
*
+ * Unless @again is true, skip cap_snaps that were already sent to
+ * the MDS (i.e., during this session).
+ *
* Called under i_lock. Takes s_mutex as needed.
*/
void __ceph_flush_snaps(struct ceph_inode_info *ci,
- struct ceph_mds_session **psession)
+ struct ceph_mds_session **psession,
+ int again)
+ __releases(ci->vfs_inode->i_lock)
+ __acquires(ci->vfs_inode->i_lock)
{
struct inode *inode = &ci->vfs_inode;
int mds;
struct ceph_cap_snap *capsnap;
u32 mseq;
- struct ceph_mds_client *mdsc = &ceph_inode_to_client(inode)->mdsc;
+ struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
struct ceph_mds_session *session = NULL; /* if session != NULL, we hold
session->s_mutex */
u64 next_follows = 0; /* keep track of how far we've gotten through the
@@ -1223,7 +1232,7 @@ retry:
* pages to be written out.
*/
if (capsnap->dirty_pages || capsnap->writing)
- continue;
+ break;
/*
* if cap writeback already occurred, we should have dropped
@@ -1232,7 +1241,20 @@ retry:
BUG_ON(capsnap->dirty == 0);
/* pick mds, take s_mutex */
- mds = __ceph_get_cap_mds(ci, &mseq);
+ if (ci->i_auth_cap == NULL) {
+ dout("no auth cap (migrating?), doing nothing\n");
+ goto out;
+ }
+
+ /* only flush each capsnap once */
+ if (!again && !list_empty(&capsnap->flushing_item)) {
+ dout("already flushed %p, skipping\n", capsnap);
+ continue;
+ }
+
+ mds = ci->i_auth_cap->session->s_mds;
+ mseq = ci->i_auth_cap->mseq;
+
if (session && session->s_mds != mds) {
dout("oops, wrong session %p mutex\n", session);
mutex_unlock(&session->s_mutex);
@@ -1251,8 +1273,8 @@ retry:
}
/*
* if session == NULL, we raced against a cap
- * deletion. retry, and we'll get a better
- * @mds value next time.
+ * deletion or migration. retry, and we'll
+ * get a better @mds value next time.
*/
spin_lock(&inode->i_lock);
goto retry;
@@ -1266,8 +1288,8 @@ retry:
&session->s_cap_snaps_flushing);
spin_unlock(&inode->i_lock);
- dout("flush_snaps %p cap_snap %p follows %lld size %llu\n",
- inode, capsnap, next_follows, capsnap->size);
+ dout("flush_snaps %p cap_snap %p follows %lld tid %llu\n",
+ inode, capsnap, capsnap->follows, capsnap->flush_tid);
send_cap_msg(session, ceph_vino(inode).ino, 0,
CEPH_CAP_OP_FLUSHSNAP, capsnap->issued, 0,
capsnap->dirty, 0, capsnap->flush_tid, 0, mseq,
@@ -1275,7 +1297,7 @@ retry:
&capsnap->mtime, &capsnap->atime,
capsnap->time_warp_seq,
capsnap->uid, capsnap->gid, capsnap->mode,
- 0, NULL,
+ capsnap->xattr_version, capsnap->xattr_blob,
capsnap->follows);
next_follows = capsnap->follows + 1;
@@ -1290,6 +1312,7 @@ retry:
list_del_init(&ci->i_snap_flush_item);
spin_unlock(&mdsc->snap_flush_lock);
+out:
if (psession)
*psession = session;
else if (session) {
@@ -1303,7 +1326,7 @@ static void ceph_flush_snaps(struct ceph_inode_info *ci)
struct inode *inode = &ci->vfs_inode;
spin_lock(&inode->i_lock);
- __ceph_flush_snaps(ci, NULL);
+ __ceph_flush_snaps(ci, NULL, 0);
spin_unlock(&inode->i_lock);
}
@@ -1314,7 +1337,7 @@ static void ceph_flush_snaps(struct ceph_inode_info *ci)
void __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask)
{
struct ceph_mds_client *mdsc =
- &ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc;
+ ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc;
struct inode *inode = &ci->vfs_inode;
int was = ci->i_dirty_caps;
int dirty = 0;
@@ -1324,7 +1347,11 @@ void __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask)
ceph_cap_string(was | mask));
ci->i_dirty_caps |= mask;
if (was == 0) {
- dout(" inode %p now dirty\n", &ci->vfs_inode);
+ if (!ci->i_head_snapc)
+ ci->i_head_snapc = ceph_get_snap_context(
+ ci->i_snap_realm->cached_context);
+ dout(" inode %p now dirty snapc %p\n", &ci->vfs_inode,
+ ci->i_head_snapc);
BUG_ON(!list_empty(&ci->i_dirty_item));
spin_lock(&mdsc->cap_dirty_lock);
list_add(&ci->i_dirty_item, &mdsc->cap_dirty);
@@ -1352,7 +1379,7 @@ void __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask)
static int __mark_caps_flushing(struct inode *inode,
struct ceph_mds_session *session)
{
- struct ceph_mds_client *mdsc = &ceph_sb_to_client(inode->i_sb)->mdsc;
+ struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
struct ceph_inode_info *ci = ceph_inode(inode);
int flushing;
@@ -1390,17 +1417,6 @@ static int __mark_caps_flushing(struct inode *inode,
/*
* try to invalidate mapping pages without blocking.
*/
-static int mapping_is_empty(struct address_space *mapping)
-{
- struct page *page = find_get_page(mapping, 0);
-
- if (!page)
- return 1;
-
- put_page(page);
- return 0;
-}
-
static int try_nonblocking_invalidate(struct inode *inode)
{
struct ceph_inode_info *ci = ceph_inode(inode);
@@ -1410,7 +1426,7 @@ static int try_nonblocking_invalidate(struct inode *inode)
invalidate_mapping_pages(&inode->i_data, 0, -1);
spin_lock(&inode->i_lock);
- if (mapping_is_empty(&inode->i_data) &&
+ if (inode->i_data.nrpages == 0 &&
invalidating_gen == ci->i_rdcache_gen) {
/* success. */
dout("try_nonblocking_invalidate %p success\n", inode);
@@ -1435,10 +1451,9 @@ static int try_nonblocking_invalidate(struct inode *inode)
*/
void ceph_check_caps(struct ceph_inode_info *ci, int flags,
struct ceph_mds_session *session)
- __releases(session->s_mutex)
{
- struct ceph_client *client = ceph_inode_to_client(&ci->vfs_inode);
- struct ceph_mds_client *mdsc = &client->mdsc;
+ struct ceph_fs_client *fsc = ceph_inode_to_client(&ci->vfs_inode);
+ struct ceph_mds_client *mdsc = fsc->mdsc;
struct inode *inode = &ci->vfs_inode;
struct ceph_cap *cap;
int file_wanted, used;
@@ -1463,7 +1478,7 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
/* flush snaps first time around only */
if (!list_empty(&ci->i_cap_snaps))
- __ceph_flush_snaps(ci, &session);
+ __ceph_flush_snaps(ci, &session, 0);
goto retry_locked;
retry:
spin_lock(&inode->i_lock);
@@ -1508,13 +1523,15 @@ retry_locked:
*/
if ((!is_delayed || mdsc->stopping) &&
ci->i_wrbuffer_ref == 0 && /* no dirty pages... */
- ci->i_rdcache_gen && /* may have cached pages */
+ inode->i_data.nrpages && /* have cached pages */
(file_wanted == 0 || /* no open files */
- (revoking & CEPH_CAP_FILE_CACHE)) && /* or revoking cache */
+ (revoking & (CEPH_CAP_FILE_CACHE|
+ CEPH_CAP_FILE_LAZYIO))) && /* or revoking cache */
!tried_invalidate) {
dout("check_caps trying to invalidate on %p\n", inode);
if (try_nonblocking_invalidate(inode) < 0) {
- if (revoking & CEPH_CAP_FILE_CACHE) {
+ if (revoking & (CEPH_CAP_FILE_CACHE|
+ CEPH_CAP_FILE_LAZYIO)) {
dout("check_caps queuing invalidate\n");
queue_invalidate = 1;
ci->i_rdcache_revoking = ci->i_rdcache_gen;
@@ -1679,7 +1696,7 @@ ack:
static int try_flush_caps(struct inode *inode, struct ceph_mds_session *session,
unsigned *flush_tid)
{
- struct ceph_mds_client *mdsc = &ceph_sb_to_client(inode->i_sb)->mdsc;
+ struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
struct ceph_inode_info *ci = ceph_inode(inode);
int unlock_session = session ? 0 : 1;
int flushing = 0;
@@ -1845,7 +1862,7 @@ int ceph_write_inode(struct inode *inode, struct writeback_control *wbc)
caps_are_flushed(inode, flush_tid));
} else {
struct ceph_mds_client *mdsc =
- &ceph_sb_to_client(inode->i_sb)->mdsc;
+ ceph_sb_to_client(inode->i_sb)->mdsc;
spin_lock(&inode->i_lock);
if (__ceph_caps_dirty(ci))
@@ -1878,7 +1895,7 @@ static void kick_flushing_capsnaps(struct ceph_mds_client *mdsc,
if (cap && cap->session == session) {
dout("kick_flushing_caps %p cap %p capsnap %p\n", inode,
cap, capsnap);
- __ceph_flush_snaps(ci, &session);
+ __ceph_flush_snaps(ci, &session, 1);
} else {
pr_err("%p auth cap %p not mds%d ???\n", inode,
cap, session->s_mds);
@@ -2181,7 +2198,9 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
if (ci->i_head_snapc == snapc) {
ci->i_wrbuffer_ref_head -= nr;
- if (!ci->i_wrbuffer_ref_head) {
+ if (ci->i_wrbuffer_ref_head == 0 &&
+ ci->i_dirty_caps == 0 && ci->i_flushing_caps == 0) {
+ BUG_ON(!ci->i_head_snapc);
ceph_put_snap_context(ci->i_head_snapc);
ci->i_head_snapc = NULL;
}
@@ -2250,12 +2269,12 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
struct ceph_mds_session *session,
struct ceph_cap *cap,
struct ceph_buffer *xattr_buf)
- __releases(inode->i_lock)
- __releases(session->s_mutex)
+ __releases(inode->i_lock)
{
struct ceph_inode_info *ci = ceph_inode(inode);
int mds = session->s_mds;
- int seq = le32_to_cpu(grant->seq);
+ unsigned seq = le32_to_cpu(grant->seq);
+ unsigned issue_seq = le32_to_cpu(grant->issue_seq);
int newcaps = le32_to_cpu(grant->caps);
int issued, implemented, used, wanted, dirty;
u64 size = le64_to_cpu(grant->size);
@@ -2267,8 +2286,8 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
int revoked_rdcache = 0;
int queue_invalidate = 0;
- dout("handle_cap_grant inode %p cap %p mds%d seq %d %s\n",
- inode, cap, mds, seq, ceph_cap_string(newcaps));
+ dout("handle_cap_grant inode %p cap %p mds%d seq %u/%u %s\n",
+ inode, cap, mds, seq, issue_seq, ceph_cap_string(newcaps));
dout(" size %llu max_size %llu, i_size %llu\n", size, max_size,
inode->i_size);
@@ -2278,6 +2297,7 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
* will invalidate _after_ writeback.)
*/
if (((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) &&
+ (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 &&
!ci->i_wrbuffer_ref) {
if (try_nonblocking_invalidate(inode) == 0) {
revoked_rdcache = 1;
@@ -2363,21 +2383,29 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
}
cap->seq = seq;
+ cap->issue_seq = issue_seq;
/* file layout may have changed */
ci->i_layout = grant->layout;
/* revocation, grant, or no-op? */
if (cap->issued & ~newcaps) {
- dout("revocation: %s -> %s\n", ceph_cap_string(cap->issued),
- ceph_cap_string(newcaps));
- if ((used & ~newcaps) & CEPH_CAP_FILE_BUFFER)
- writeback = 1; /* will delay ack */
- else if (dirty & ~newcaps)
- check_caps = 1; /* initiate writeback in check_caps */
- else if (((used & ~newcaps) & CEPH_CAP_FILE_CACHE) == 0 ||
- revoked_rdcache)
- check_caps = 2; /* send revoke ack in check_caps */
+ int revoking = cap->issued & ~newcaps;
+
+ dout("revocation: %s -> %s (revoking %s)\n",
+ ceph_cap_string(cap->issued),
+ ceph_cap_string(newcaps),
+ ceph_cap_string(revoking));
+ if (revoking & used & CEPH_CAP_FILE_BUFFER)
+ writeback = 1; /* initiate writeback; will delay ack */
+ else if (revoking == CEPH_CAP_FILE_CACHE &&
+ (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 &&
+ queue_invalidate)
+ ; /* do nothing yet, invalidation will be queued */
+ else if (cap == ci->i_auth_cap)
+ check_caps = 1; /* check auth cap only */
+ else
+ check_caps = 2; /* check all caps */
cap->issued = newcaps;
cap->implemented |= newcaps;
} else if (cap->issued == newcaps) {
@@ -2427,7 +2455,7 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid,
__releases(inode->i_lock)
{
struct ceph_inode_info *ci = ceph_inode(inode);
- struct ceph_mds_client *mdsc = &ceph_sb_to_client(inode->i_sb)->mdsc;
+ struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
unsigned seq = le32_to_cpu(m->seq);
int dirty = le32_to_cpu(m->dirty);
int cleaned = 0;
@@ -2467,6 +2495,11 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid,
dout(" inode %p now clean\n", inode);
BUG_ON(!list_empty(&ci->i_dirty_item));
drop = 1;
+ if (ci->i_wrbuffer_ref_head == 0) {
+ BUG_ON(!ci->i_head_snapc);
+ ceph_put_snap_context(ci->i_head_snapc);
+ ci->i_head_snapc = NULL;
+ }
} else {
BUG_ON(list_empty(&ci->i_dirty_item));
}
@@ -2568,7 +2601,8 @@ static void handle_cap_trunc(struct inode *inode,
* caller holds s_mutex
*/
static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
- struct ceph_mds_session *session)
+ struct ceph_mds_session *session,
+ int *open_target_sessions)
{
struct ceph_inode_info *ci = ceph_inode(inode);
int mds = session->s_mds;
@@ -2600,6 +2634,12 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
ci->i_cap_exporting_mds = mds;
ci->i_cap_exporting_mseq = mseq;
ci->i_cap_exporting_issued = cap->issued;
+
+ /*
+ * make sure we have open sessions with all possible
+ * export targets, so that we get the matching IMPORT
+ */
+ *open_target_sessions = 1;
}
__ceph_remove_cap(cap);
}
@@ -2663,7 +2703,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
struct ceph_msg *msg)
{
struct ceph_mds_client *mdsc = session->s_mdsc;
- struct super_block *sb = mdsc->client->sb;
+ struct super_block *sb = mdsc->fsc->sb;
struct inode *inode;
struct ceph_cap *cap;
struct ceph_mds_caps *h;
@@ -2675,6 +2715,10 @@ void ceph_handle_caps(struct ceph_mds_session *session,
u64 size, max_size;
u64 tid;
void *snaptrace;
+ size_t snaptrace_len;
+ void *flock;
+ u32 flock_len;
+ int open_target_sessions = 0;
dout("handle_caps from mds%d\n", mds);
@@ -2683,7 +2727,6 @@ void ceph_handle_caps(struct ceph_mds_session *session,
if (msg->front.iov_len < sizeof(*h))
goto bad;
h = msg->front.iov_base;
- snaptrace = h + 1;
op = le32_to_cpu(h->op);
vino.ino = le64_to_cpu(h->ino);
vino.snap = CEPH_NOSNAP;
@@ -2693,6 +2736,21 @@ void ceph_handle_caps(struct ceph_mds_session *session,
size = le64_to_cpu(h->size);
max_size = le64_to_cpu(h->max_size);
+ snaptrace = h + 1;
+ snaptrace_len = le32_to_cpu(h->snap_trace_len);
+
+ if (le16_to_cpu(msg->hdr.version) >= 2) {
+ void *p, *end;
+
+ p = snaptrace + snaptrace_len;
+ end = msg->front.iov_base + msg->front.iov_len;
+ ceph_decode_32_safe(&p, end, flock_len, bad);
+ flock = p;
+ } else {
+ flock = NULL;
+ flock_len = 0;
+ }
+
mutex_lock(&session->s_mutex);
session->s_seq++;
dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq,
@@ -2708,15 +2766,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
if (op == CEPH_CAP_OP_IMPORT)
__queue_cap_release(session, vino.ino, cap_id,
mseq, seq);
-
- /*
- * send any full release message to try to move things
- * along for the mds (who clearly thinks we still have this
- * cap).
- */
- ceph_add_cap_releases(mdsc, session, -1);
- ceph_send_cap_releases(mdsc, session);
- goto done;
+ goto flush_cap_releases;
}
/* these will work even if we don't have a cap yet */
@@ -2726,12 +2776,12 @@ void ceph_handle_caps(struct ceph_mds_session *session,
goto done;
case CEPH_CAP_OP_EXPORT:
- handle_cap_export(inode, h, session);
+ handle_cap_export(inode, h, session, &open_target_sessions);
goto done;
case CEPH_CAP_OP_IMPORT:
handle_cap_import(mdsc, inode, h, session,
- snaptrace, le32_to_cpu(h->snap_trace_len));
+ snaptrace, snaptrace_len);
ceph_check_caps(ceph_inode(inode), CHECK_CAPS_NODELAY,
session);
goto done_unlocked;
@@ -2744,7 +2794,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
dout(" no cap on %p ino %llx.%llx from mds%d\n",
inode, ceph_ino(inode), ceph_snap(inode), mds);
spin_unlock(&inode->i_lock);
- goto done;
+ goto flush_cap_releases;
}
/* note that each of these drops i_lock for us */
@@ -2768,11 +2818,24 @@ void ceph_handle_caps(struct ceph_mds_session *session,
ceph_cap_op_name(op));
}
+ goto done;
+
+flush_cap_releases:
+ /*
+ * send any full release message to try to move things
+ * along for the mds (who clearly thinks we still have this
+ * cap).
+ */
+ ceph_add_cap_releases(mdsc, session);
+ ceph_send_cap_releases(mdsc, session);
+
done:
mutex_unlock(&session->s_mutex);
done_unlocked:
if (inode)
iput(inode);
+ if (open_target_sessions)
+ ceph_mdsc_open_export_target_sessions(mdsc, session);
return;
bad: