From 97a385e558292ba0851906783642239865670a5f Mon Sep 17 00:00:00 2001 From: Christoph Hellwig Date: Wed, 1 May 2019 16:40:32 -0400 Subject: libceph: remove ceph_get_direct_page_vector() This function is entirely unused. Signed-off-by: Christoph Hellwig Reviewed-by: Ilya Dryomov Signed-off-by: Ilya Dryomov --- net/ceph/pagevec.c | 33 --------------------------------- 1 file changed, 33 deletions(-) (limited to 'net/ceph') diff --git a/net/ceph/pagevec.c b/net/ceph/pagevec.c index 74cafc0142ea..64305e7056a1 100644 --- a/net/ceph/pagevec.c +++ b/net/ceph/pagevec.c @@ -10,39 +10,6 @@ #include -/* - * build a vector of user pages - */ -struct page **ceph_get_direct_page_vector(const void __user *data, - int num_pages, bool write_page) -{ - struct page **pages; - int got = 0; - int rc = 0; - - pages = kmalloc_array(num_pages, sizeof(*pages), GFP_NOFS); - if (!pages) - return ERR_PTR(-ENOMEM); - - while (got < num_pages) { - rc = get_user_pages_fast( - (unsigned long)data + ((unsigned long)got * PAGE_SIZE), - num_pages - got, write_page ? FOLL_WRITE : 0, pages + got); - if (rc < 0) - break; - BUG_ON(rc == 0); - got += rc; - } - if (rc < 0) - goto fail; - return pages; - -fail: - ceph_put_page_vector(pages, got, false); - return ERR_PTR(rc); -} -EXPORT_SYMBOL(ceph_get_direct_page_vector); - void ceph_put_page_vector(struct page **pages, int num_pages, bool dirty) { int i; -- cgit v1.2.3-55-g7522 From bc07532cc51f4c33cb6136405c9807c5961e468b Mon Sep 17 00:00:00 2001 From: Jeff Layton Date: Tue, 4 Jun 2019 13:13:48 -0400 Subject: libceph: fix sa_family just after reading address It doesn't make sense to leave it undecoded until later. Signed-off-by: Jeff Layton Reviewed-by: "Yan, Zheng" Signed-off-by: Ilya Dryomov --- net/ceph/messenger.c | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) (limited to 'net/ceph') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index cd0b094468b6..8d0c51dd4666 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -1732,12 +1732,14 @@ static int read_partial_banner(struct ceph_connection *con) ret = read_partial(con, end, size, &con->actual_peer_addr); if (ret <= 0) goto out; + ceph_decode_addr(&con->actual_peer_addr); size = sizeof (con->peer_addr_for_me); end += size; ret = read_partial(con, end, size, &con->peer_addr_for_me); if (ret <= 0) goto out; + ceph_decode_addr(&con->peer_addr_for_me); out: return ret; @@ -2010,9 +2012,6 @@ static int process_banner(struct ceph_connection *con) if (verify_hello(con) < 0) return -1; - ceph_decode_addr(&con->actual_peer_addr); - ceph_decode_addr(&con->peer_addr_for_me); - /* * Make sure the other end is who we wanted. note that the other * end may not yet know their ip address, so if it's 0.0.0.0, give -- cgit v1.2.3-55-g7522 From 6c37f0e64173571914a443f74d36e5a22dabfc05 Mon Sep 17 00:00:00 2001 From: Jeff Layton Date: Mon, 3 Jun 2019 14:45:16 -0400 Subject: libceph: add ceph_decode_entity_addr Add a function for decoding an entity_addr_t. Once CEPH_FEATURE_MSG_ADDR2 is enabled, the server daemons will start encoding entity_addr_t differently. Add a new helper function that can handle either format. Signed-off-by: Jeff Layton Reviewed-by: "Yan, Zheng" Signed-off-by: Ilya Dryomov --- include/linux/ceph/decode.h | 2 + net/ceph/Makefile | 2 +- net/ceph/decode.c | 90 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 93 insertions(+), 1 deletion(-) create mode 100644 net/ceph/decode.c (limited to 'net/ceph') diff --git a/include/linux/ceph/decode.h b/include/linux/ceph/decode.h index a6c2a48d42e0..1c0a665bfc03 100644 --- a/include/linux/ceph/decode.h +++ b/include/linux/ceph/decode.h @@ -230,6 +230,8 @@ static inline void ceph_decode_addr(struct ceph_entity_addr *a) WARN_ON(a->in_addr.ss_family == 512); } +extern int ceph_decode_entity_addr(void **p, void *end, + struct ceph_entity_addr *addr); /* * encoders */ diff --git a/net/ceph/Makefile b/net/ceph/Makefile index db09defe27d0..59d0ba2072de 100644 --- a/net/ceph/Makefile +++ b/net/ceph/Makefile @@ -5,7 +5,7 @@ obj-$(CONFIG_CEPH_LIB) += libceph.o libceph-y := ceph_common.o messenger.o msgpool.o buffer.o pagelist.o \ - mon_client.o \ + mon_client.o decode.o \ cls_lock_client.o \ osd_client.o osdmap.o crush/crush.o crush/mapper.o crush/hash.o \ striper.o \ diff --git a/net/ceph/decode.c b/net/ceph/decode.c new file mode 100644 index 000000000000..b82981199549 --- /dev/null +++ b/net/ceph/decode.c @@ -0,0 +1,90 @@ +// SPDX-License-Identifier: GPL-2.0 + +#include + +static int +ceph_decode_entity_addr_versioned(void **p, void *end, + struct ceph_entity_addr *addr) +{ + int ret; + u8 struct_v; + u32 struct_len, addr_len; + void *struct_end; + + ret = ceph_start_decoding(p, end, 1, "entity_addr_t", &struct_v, + &struct_len); + if (ret) + goto bad; + + ret = -EINVAL; + struct_end = *p + struct_len; + + ceph_decode_copy_safe(p, end, &addr->type, sizeof(addr->type), bad); + + /* + * TYPE_NONE == 0 + * TYPE_LEGACY == 1 + * + * Clients that don't support ADDR2 always send TYPE_NONE. + * For now, since all we support is msgr1, just set this to 0 + * when we get a TYPE_LEGACY type. + */ + if (addr->type == cpu_to_le32(1)) + addr->type = 0; + + ceph_decode_copy_safe(p, end, &addr->nonce, sizeof(addr->nonce), bad); + + ceph_decode_32_safe(p, end, addr_len, bad); + if (addr_len > sizeof(addr->in_addr)) + goto bad; + + memset(&addr->in_addr, 0, sizeof(addr->in_addr)); + if (addr_len) { + ceph_decode_copy_safe(p, end, &addr->in_addr, addr_len, bad); + + addr->in_addr.ss_family = + le16_to_cpu((__force __le16)addr->in_addr.ss_family); + } + + /* Advance past anything the client doesn't yet understand */ + *p = struct_end; + ret = 0; +bad: + return ret; +} + +static int +ceph_decode_entity_addr_legacy(void **p, void *end, + struct ceph_entity_addr *addr) +{ + int ret = -EINVAL; + + /* Skip rest of type field */ + ceph_decode_skip_n(p, end, 3, bad); + addr->type = 0; + ceph_decode_copy_safe(p, end, &addr->nonce, sizeof(addr->nonce), bad); + memset(&addr->in_addr, 0, sizeof(addr->in_addr)); + ceph_decode_copy_safe(p, end, &addr->in_addr, + sizeof(addr->in_addr), bad); + addr->in_addr.ss_family = + be16_to_cpu((__force __be16)addr->in_addr.ss_family); + ret = 0; +bad: + return ret; +} + +int +ceph_decode_entity_addr(void **p, void *end, struct ceph_entity_addr *addr) +{ + u8 marker; + + ceph_decode_8_safe(p, end, marker, bad); + if (marker == 1) + return ceph_decode_entity_addr_versioned(p, end, addr); + else if (marker == 0) + return ceph_decode_entity_addr_legacy(p, end, addr); +bad: + return -EINVAL; +} +EXPORT_SYMBOL(ceph_decode_entity_addr); + -- cgit v1.2.3-55-g7522 From 0bfb0f288992adbf8d1f0d5f22f0fd398b146316 Mon Sep 17 00:00:00 2001 From: Jeff Layton Date: Fri, 31 May 2019 15:32:28 -0400 Subject: libceph: ADDR2 support for monmap Switch the MonMap decoder to use the new decoding routine for entity_addr_t's. Signed-off-by: Jeff Layton Reviewed-by: "Yan, Zheng" Signed-off-by: Ilya Dryomov --- include/linux/ceph/mon_client.h | 1 - net/ceph/mon_client.c | 21 +++++++++++++-------- 2 files changed, 13 insertions(+), 9 deletions(-) (limited to 'net/ceph') diff --git a/include/linux/ceph/mon_client.h b/include/linux/ceph/mon_client.h index 3a4688af7455..b4d134d3312a 100644 --- a/include/linux/ceph/mon_client.h +++ b/include/linux/ceph/mon_client.h @@ -104,7 +104,6 @@ struct ceph_mon_client { #endif }; -extern struct ceph_monmap *ceph_monmap_decode(void *p, void *end); extern int ceph_monmap_contains(struct ceph_monmap *m, struct ceph_entity_addr *addr); diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c index 895679d3529b..0520bf9825aa 100644 --- a/net/ceph/mon_client.c +++ b/net/ceph/mon_client.c @@ -39,7 +39,7 @@ static int __validate_auth(struct ceph_mon_client *monc); /* * Decode a monmap blob (e.g., during mount). */ -struct ceph_monmap *ceph_monmap_decode(void *p, void *end) +static struct ceph_monmap *ceph_monmap_decode(void *p, void *end) { struct ceph_monmap *m = NULL; int i, err = -EINVAL; @@ -50,7 +50,7 @@ struct ceph_monmap *ceph_monmap_decode(void *p, void *end) ceph_decode_32_safe(&p, end, len, bad); ceph_decode_need(&p, end, len, bad); - dout("monmap_decode %p %p len %d\n", p, end, (int)(end-p)); + dout("monmap_decode %p %p len %d (%d)\n", p, end, len, (int)(end-p)); p += sizeof(u16); /* skip version */ ceph_decode_need(&p, end, sizeof(fsid) + 2*sizeof(u32), bad); @@ -58,7 +58,6 @@ struct ceph_monmap *ceph_monmap_decode(void *p, void *end) epoch = ceph_decode_32(&p); num_mon = ceph_decode_32(&p); - ceph_decode_need(&p, end, num_mon*sizeof(m->mon_inst[0]), bad); if (num_mon > CEPH_MAX_MON) goto bad; @@ -68,17 +67,22 @@ struct ceph_monmap *ceph_monmap_decode(void *p, void *end) m->fsid = fsid; m->epoch = epoch; m->num_mon = num_mon; - ceph_decode_copy(&p, m->mon_inst, num_mon*sizeof(m->mon_inst[0])); - for (i = 0; i < num_mon; i++) - ceph_decode_addr(&m->mon_inst[i].addr); - + for (i = 0; i < num_mon; ++i) { + struct ceph_entity_inst *inst = &m->mon_inst[i]; + + /* copy name portion */ + ceph_decode_copy_safe(&p, end, &inst->name, + sizeof(inst->name), bad); + err = ceph_decode_entity_addr(&p, end, &inst->addr); + if (err) + goto bad; + } dout("monmap_decode epoch %d, num_mon %d\n", m->epoch, m->num_mon); for (i = 0; i < m->num_mon; i++) dout("monmap_decode mon%d is %s\n", i, ceph_pr_addr(&m->mon_inst[i].addr)); return m; - bad: dout("monmap_decode failed with %d\n", err); kfree(m); @@ -469,6 +473,7 @@ static void ceph_monc_handle_map(struct ceph_mon_client *monc, if (IS_ERR(monmap)) { pr_err("problem decoding monmap, %d\n", (int)PTR_ERR(monmap)); + ceph_msg_dump(msg); goto out; } -- cgit v1.2.3-55-g7522 From dcbc919a5dc8c2629684a113a90c0b6fe10c3462 Mon Sep 17 00:00:00 2001 From: Jeff Layton Date: Mon, 3 Jun 2019 15:08:13 -0400 Subject: libceph: switch osdmap decoding to use ceph_decode_entity_addr Signed-off-by: Jeff Layton Reviewed-by: "Yan, Zheng" Signed-off-by: Ilya Dryomov --- net/ceph/osdmap.c | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) (limited to 'net/ceph') diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index 48a31dc9161c..95e98ae59a54 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -1489,11 +1489,9 @@ static int osdmap_decode(void **p, void *end, struct ceph_osdmap *map) /* osd_state, osd_weight, osd_addrs->client_addr */ ceph_decode_need(p, end, 3*sizeof(u32) + - map->max_osd*((struct_v >= 5 ? sizeof(u32) : - sizeof(u8)) + - sizeof(*map->osd_weight) + - sizeof(*map->osd_addr)), e_inval); - + map->max_osd*(struct_v >= 5 ? sizeof(u32) : + sizeof(u8)) + + sizeof(*map->osd_weight), e_inval); if (ceph_decode_32(p) != map->max_osd) goto e_inval; @@ -1514,9 +1512,11 @@ static int osdmap_decode(void **p, void *end, struct ceph_osdmap *map) if (ceph_decode_32(p) != map->max_osd) goto e_inval; - ceph_decode_copy(p, map->osd_addr, map->max_osd*sizeof(*map->osd_addr)); - for (i = 0; i < map->max_osd; i++) - ceph_decode_addr(&map->osd_addr[i]); + for (i = 0; i < map->max_osd; i++) { + err = ceph_decode_entity_addr(p, end, &map->osd_addr[i]); + if (err) + goto bad; + } /* pg_temp */ err = decode_pg_temp(p, end, map); -- cgit v1.2.3-55-g7522 From 51fc7ab44519adc6684ac6575826f8b1a16ee4b3 Mon Sep 17 00:00:00 2001 From: Jeff Layton Date: Tue, 4 Jun 2019 14:35:18 -0400 Subject: libceph: fix watch_item_t decoding to use ceph_decode_entity_addr While we're in there, let's also fix up the decoder to do proper bounds checking. Signed-off-by: Jeff Layton Reviewed-by: "Yan, Zheng" Signed-off-by: Ilya Dryomov --- net/ceph/osd_client.c | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) (limited to 'net/ceph') diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 9a8eca5eda65..54170a35ecec 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -4914,20 +4914,26 @@ static int decode_watcher(void **p, void *end, struct ceph_watch_item *item) ret = ceph_start_decoding(p, end, 2, "watch_item_t", &struct_v, &struct_len); if (ret) - return ret; + goto bad; + + ret = -EINVAL; + ceph_decode_copy_safe(p, end, &item->name, sizeof(item->name), bad); + ceph_decode_64_safe(p, end, item->cookie, bad); + ceph_decode_skip_32(p, end, bad); /* skip timeout seconds */ - ceph_decode_copy(p, &item->name, sizeof(item->name)); - item->cookie = ceph_decode_64(p); - *p += 4; /* skip timeout_seconds */ if (struct_v >= 2) { - ceph_decode_copy(p, &item->addr, sizeof(item->addr)); - ceph_decode_addr(&item->addr); + ret = ceph_decode_entity_addr(p, end, &item->addr); + if (ret) + goto bad; + } else { + ret = 0; } dout("%s %s%llu cookie %llu addr %s\n", __func__, ENTITY_NAME(item->name), item->cookie, ceph_pr_addr(&item->addr)); - return 0; +bad: + return ret; } static int decode_watchers(void **p, void *end, -- cgit v1.2.3-55-g7522 From 8cb5f2b4fcf4b8a4043e26c232b435e83e2abe87 Mon Sep 17 00:00:00 2001 From: Jeff Layton Date: Tue, 4 Jun 2019 15:10:44 -0400 Subject: libceph: correctly decode ADDR2 addresses in incremental OSD maps Given the new format, we have to decode the addresses twice. Once to skip past the new_up_client field, and a second time to collect the addresses. Signed-off-by: Jeff Layton Reviewed-by: "Yan, Zheng" Signed-off-by: Ilya Dryomov --- net/ceph/osdmap.c | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) (limited to 'net/ceph') diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index 95e98ae59a54..90437906b7bc 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -1618,12 +1618,17 @@ static int decode_new_up_state_weight(void **p, void *end, u8 struct_v, void *new_state; void *new_weight_end; u32 len; + int i; new_up_client = *p; ceph_decode_32_safe(p, end, len, e_inval); - len *= sizeof(u32) + sizeof(struct ceph_entity_addr); - ceph_decode_need(p, end, len, e_inval); - *p += len; + for (i = 0; i < len; ++i) { + struct ceph_entity_addr addr; + + ceph_decode_skip_32(p, end, e_inval); + if (ceph_decode_entity_addr(p, end, &addr)) + goto e_inval; + } new_state = *p; ceph_decode_32_safe(p, end, len, e_inval); @@ -1699,9 +1704,9 @@ static int decode_new_up_state_weight(void **p, void *end, u8 struct_v, struct ceph_entity_addr addr; osd = ceph_decode_32(p); - ceph_decode_copy(p, &addr, sizeof(addr)); - ceph_decode_addr(&addr); BUG_ON(osd >= map->max_osd); + if (ceph_decode_entity_addr(p, end, &addr)) + goto e_inval; pr_info("osd%d up\n", osd); map->osd_state[osd] |= CEPH_OSD_EXISTS | CEPH_OSD_UP; map->osd_addr[osd] = addr; -- cgit v1.2.3-55-g7522 From 2f9800c899dc1f4ad0ca32105069bfa83e80a05b Mon Sep 17 00:00:00 2001 From: Jeff Layton Date: Tue, 4 Jun 2019 15:17:32 -0400 Subject: ceph: fix decode_locker to use ceph_decode_entity_addr Signed-off-by: Jeff Layton Reviewed-by: "Yan, Zheng" Signed-off-by: Ilya Dryomov --- net/ceph/cls_lock_client.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) (limited to 'net/ceph') diff --git a/net/ceph/cls_lock_client.c b/net/ceph/cls_lock_client.c index 4cc28541281b..b1d12bf4b83e 100644 --- a/net/ceph/cls_lock_client.c +++ b/net/ceph/cls_lock_client.c @@ -264,8 +264,11 @@ static int decode_locker(void **p, void *end, struct ceph_locker *locker) return ret; *p += sizeof(struct ceph_timespec); /* skip expiration */ - ceph_decode_copy(p, &locker->info.addr, sizeof(locker->info.addr)); - ceph_decode_addr(&locker->info.addr); + + ret = ceph_decode_entity_addr(p, end, &locker->info.addr); + if (ret) + return ret; + len = ceph_decode_32(p); *p += len; /* skip description */ -- cgit v1.2.3-55-g7522 From d3c3c0a841d5dafc5395be363996d619255a732f Mon Sep 17 00:00:00 2001 From: Jeff Layton Date: Mon, 17 Jun 2019 06:57:25 -0400 Subject: libceph: use TYPE_LEGACY for entity addrs instead of TYPE_NONE Going forward, we'll have different address types so let's use the addr2 TYPE_LEGACY for internal tracking rather than TYPE_NONE. Also, make ceph_pr_addr print the address type value as well. Signed-off-by: Jeff Layton Reviewed-by: "Yan, Zheng" Signed-off-by: Ilya Dryomov --- include/linux/ceph/decode.h | 7 +++++++ net/ceph/decode.c | 18 ++++++------------ net/ceph/messenger.c | 7 +++++-- 3 files changed, 18 insertions(+), 14 deletions(-) (limited to 'net/ceph') diff --git a/include/linux/ceph/decode.h b/include/linux/ceph/decode.h index 1c0a665bfc03..ce488d95be89 100644 --- a/include/linux/ceph/decode.h +++ b/include/linux/ceph/decode.h @@ -218,16 +218,23 @@ static inline void ceph_encode_timespec64(struct ceph_timespec *tv, /* * sockaddr_storage <-> ceph_sockaddr */ +#define CEPH_ENTITY_ADDR_TYPE_NONE 0 +#define CEPH_ENTITY_ADDR_TYPE_LEGACY __cpu_to_le32(1) + static inline void ceph_encode_addr(struct ceph_entity_addr *a) { __be16 ss_family = htons(a->in_addr.ss_family); a->in_addr.ss_family = *(__u16 *)&ss_family; + + /* Banner addresses require TYPE_NONE */ + a->type = CEPH_ENTITY_ADDR_TYPE_NONE; } static inline void ceph_decode_addr(struct ceph_entity_addr *a) { __be16 ss_family = *(__be16 *)&a->in_addr.ss_family; a->in_addr.ss_family = ntohs(ss_family); WARN_ON(a->in_addr.ss_family == 512); + a->type = CEPH_ENTITY_ADDR_TYPE_LEGACY; } extern int ceph_decode_entity_addr(void **p, void *end, diff --git a/net/ceph/decode.c b/net/ceph/decode.c index b82981199549..eea529595a7a 100644 --- a/net/ceph/decode.c +++ b/net/ceph/decode.c @@ -21,17 +21,6 @@ ceph_decode_entity_addr_versioned(void **p, void *end, ceph_decode_copy_safe(p, end, &addr->type, sizeof(addr->type), bad); - /* - * TYPE_NONE == 0 - * TYPE_LEGACY == 1 - * - * Clients that don't support ADDR2 always send TYPE_NONE. - * For now, since all we support is msgr1, just set this to 0 - * when we get a TYPE_LEGACY type. - */ - if (addr->type == cpu_to_le32(1)) - addr->type = 0; - ceph_decode_copy_safe(p, end, &addr->nonce, sizeof(addr->nonce), bad); ceph_decode_32_safe(p, end, addr_len, bad); @@ -61,7 +50,12 @@ ceph_decode_entity_addr_legacy(void **p, void *end, /* Skip rest of type field */ ceph_decode_skip_n(p, end, 3, bad); - addr->type = 0; + + /* + * Clients that don't support ADDR2 always send TYPE_NONE, change it + * to TYPE_LEGACY for forward compatibility. + */ + addr->type = CEPH_ENTITY_ADDR_TYPE_LEGACY; ceph_decode_copy_safe(p, end, &addr->nonce, sizeof(addr->nonce), bad); memset(&addr->in_addr, 0, sizeof(addr->in_addr)); ceph_decode_copy_safe(p, end, &addr->in_addr, diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 8d0c51dd4666..0a3ef33cf7ac 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -199,12 +199,14 @@ const char *ceph_pr_addr(const struct ceph_entity_addr *addr) switch (ss.ss_family) { case AF_INET: - snprintf(s, MAX_ADDR_STR_LEN, "%pI4:%hu", &in4->sin_addr, + snprintf(s, MAX_ADDR_STR_LEN, "(%d)%pI4:%hu", + le32_to_cpu(addr->type), &in4->sin_addr, ntohs(in4->sin_port)); break; case AF_INET6: - snprintf(s, MAX_ADDR_STR_LEN, "[%pI6c]:%hu", &in6->sin6_addr, + snprintf(s, MAX_ADDR_STR_LEN, "(%d)[%pI6c]:%hu", + le32_to_cpu(addr->type), &in6->sin6_addr, ntohs(in6->sin6_port)); break; @@ -1982,6 +1984,7 @@ int ceph_parse_ips(const char *c, const char *end, } addr_set_port(&addr[i], port); + addr[i].type = CEPH_ENTITY_ADDR_TYPE_LEGACY; dout("parse_ips got %s\n", ceph_pr_addr(&addr[i])); -- cgit v1.2.3-55-g7522 From 2c66de560fa2dda0a600e908897116914db8f500 Mon Sep 17 00:00:00 2001 From: Jeff Layton Date: Mon, 17 Jun 2019 09:24:31 -0400 Subject: libceph: rename ceph_encode_addr to ceph_encode_banner_addr ...ditto for the decode function. We only use these functions to fix up banner addresses now, so let's name them more appropriately. Signed-off-by: Jeff Layton Reviewed-by: "Yan, Zheng" Signed-off-by: Ilya Dryomov --- include/linux/ceph/decode.h | 4 ++-- net/ceph/messenger.c | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) (limited to 'net/ceph') diff --git a/include/linux/ceph/decode.h b/include/linux/ceph/decode.h index ce488d95be89..450384fe487c 100644 --- a/include/linux/ceph/decode.h +++ b/include/linux/ceph/decode.h @@ -221,7 +221,7 @@ static inline void ceph_encode_timespec64(struct ceph_timespec *tv, #define CEPH_ENTITY_ADDR_TYPE_NONE 0 #define CEPH_ENTITY_ADDR_TYPE_LEGACY __cpu_to_le32(1) -static inline void ceph_encode_addr(struct ceph_entity_addr *a) +static inline void ceph_encode_banner_addr(struct ceph_entity_addr *a) { __be16 ss_family = htons(a->in_addr.ss_family); a->in_addr.ss_family = *(__u16 *)&ss_family; @@ -229,7 +229,7 @@ static inline void ceph_encode_addr(struct ceph_entity_addr *a) /* Banner addresses require TYPE_NONE */ a->type = CEPH_ENTITY_ADDR_TYPE_NONE; } -static inline void ceph_decode_addr(struct ceph_entity_addr *a) +static inline void ceph_decode_banner_addr(struct ceph_entity_addr *a) { __be16 ss_family = *(__be16 *)&a->in_addr.ss_family; a->in_addr.ss_family = ntohs(ss_family); diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 0a3ef33cf7ac..0473d9a7b1f4 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -222,7 +222,7 @@ EXPORT_SYMBOL(ceph_pr_addr); static void encode_my_addr(struct ceph_messenger *msgr) { memcpy(&msgr->my_enc_addr, &msgr->inst.addr, sizeof(msgr->my_enc_addr)); - ceph_encode_addr(&msgr->my_enc_addr); + ceph_encode_banner_addr(&msgr->my_enc_addr); } /* @@ -1734,14 +1734,14 @@ static int read_partial_banner(struct ceph_connection *con) ret = read_partial(con, end, size, &con->actual_peer_addr); if (ret <= 0) goto out; - ceph_decode_addr(&con->actual_peer_addr); + ceph_decode_banner_addr(&con->actual_peer_addr); size = sizeof (con->peer_addr_for_me); end += size; ret = read_partial(con, end, size, &con->peer_addr_for_me); if (ret <= 0) goto out; - ceph_decode_addr(&con->peer_addr_for_me); + ceph_decode_banner_addr(&con->peer_addr_for_me); out: return ret; -- cgit v1.2.3-55-g7522 From 94e85771881027e62afdddadd31e3eec73025990 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Mon, 8 Jul 2019 12:50:09 +0200 Subject: libceph: rename r_unsafe_item to r_private_item This list item remained from when we had safe and unsafe replies (commit vs ack). It has since become a private list item for use by clients. Signed-off-by: Ilya Dryomov --- fs/ceph/file.c | 6 +++--- include/linux/ceph/osd_client.h | 2 +- net/ceph/osd_client.c | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) (limited to 'net/ceph') diff --git a/fs/ceph/file.c b/fs/ceph/file.c index a06090c8281e..d5bee928603a 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -1026,7 +1026,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter, req->r_callback = ceph_aio_complete_req; req->r_inode = inode; req->r_priv = aio_req; - list_add_tail(&req->r_unsafe_item, &aio_req->osd_reqs); + list_add_tail(&req->r_private_item, &aio_req->osd_reqs); pos += len; continue; @@ -1086,8 +1086,8 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter, while (!list_empty(&osd_reqs)) { req = list_first_entry(&osd_reqs, struct ceph_osd_request, - r_unsafe_item); - list_del_init(&req->r_unsafe_item); + r_private_item); + list_del_init(&req->r_private_item); if (ret >= 0) ret = ceph_osdc_start_request(req->r_osdc, req, false); diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index 2294f963dab7..024f6fed0ac5 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -198,9 +198,9 @@ struct ceph_osd_request { bool r_mempool; struct completion r_completion; /* private to osd_client.c */ ceph_osdc_callback_t r_callback; - struct list_head r_unsafe_item; struct inode *r_inode; /* for use by callbacks */ + struct list_head r_private_item; /* ditto */ void *r_priv; /* ditto */ /* set by submitter */ diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 54170a35ecec..6495982c5c07 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -478,7 +478,7 @@ static void request_release_checks(struct ceph_osd_request *req) { WARN_ON(!RB_EMPTY_NODE(&req->r_node)); WARN_ON(!RB_EMPTY_NODE(&req->r_mc_node)); - WARN_ON(!list_empty(&req->r_unsafe_item)); + WARN_ON(!list_empty(&req->r_private_item)); WARN_ON(req->r_osd); } @@ -538,7 +538,7 @@ static void request_init(struct ceph_osd_request *req) init_completion(&req->r_completion); RB_CLEAR_NODE(&req->r_node); RB_CLEAR_NODE(&req->r_mc_node); - INIT_LIST_HEAD(&req->r_unsafe_item); + INIT_LIST_HEAD(&req->r_private_item); target_init(&req->r_t); } -- cgit v1.2.3-55-g7522 From 68ada915eea10f36760ffe414810390a104df093 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Fri, 14 Jun 2019 18:16:51 +0200 Subject: libceph: change ceph_osdc_call() to take page vector for response This will be used for loading object map. rbd_obj_read_sync() isn't suitable because object map must be accessed through class methods. Signed-off-by: Ilya Dryomov Reviewed-by: Dongsheng Yang Reviewed-by: Jeff Layton --- drivers/block/rbd.c | 8 ++++---- include/linux/ceph/osd_client.h | 2 +- net/ceph/cls_lock_client.c | 2 +- net/ceph/osd_client.c | 10 +++++----- 4 files changed, 11 insertions(+), 11 deletions(-) (limited to 'net/ceph') diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 6d1df82eb883..f0814c148b1c 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -4076,7 +4076,7 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev, ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name, CEPH_OSD_FLAG_READ, req_page, outbound_size, - reply_page, &inbound_size); + &reply_page, &inbound_size); if (!ret) { memcpy(inbound, page_address(reply_page), inbound_size); ret = inbound_size; @@ -5102,7 +5102,7 @@ static int __get_parent_info(struct rbd_device *rbd_dev, ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, "rbd", "parent_get", CEPH_OSD_FLAG_READ, - req_page, sizeof(u64), reply_page, &reply_len); + req_page, sizeof(u64), &reply_page, &reply_len); if (ret) return ret == -EOPNOTSUPP ? 1 : ret; @@ -5114,7 +5114,7 @@ static int __get_parent_info(struct rbd_device *rbd_dev, ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, "rbd", "parent_overlap_get", CEPH_OSD_FLAG_READ, - req_page, sizeof(u64), reply_page, &reply_len); + req_page, sizeof(u64), &reply_page, &reply_len); if (ret) return ret; @@ -5145,7 +5145,7 @@ static int __get_parent_info_legacy(struct rbd_device *rbd_dev, ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, "rbd", "get_parent", CEPH_OSD_FLAG_READ, - req_page, sizeof(u64), reply_page, &reply_len); + req_page, sizeof(u64), &reply_page, &reply_len); if (ret) return ret; diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index 024f6fed0ac5..c567cfa4f107 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -497,7 +497,7 @@ int ceph_osdc_call(struct ceph_osd_client *osdc, const char *class, const char *method, unsigned int flags, struct page *req_page, size_t req_len, - struct page *resp_page, size_t *resp_len); + struct page **resp_pages, size_t *resp_len); extern int ceph_osdc_readpages(struct ceph_osd_client *osdc, struct ceph_vino vino, diff --git a/net/ceph/cls_lock_client.c b/net/ceph/cls_lock_client.c index b1d12bf4b83e..fb59094caf13 100644 --- a/net/ceph/cls_lock_client.c +++ b/net/ceph/cls_lock_client.c @@ -363,7 +363,7 @@ int ceph_cls_lock_info(struct ceph_osd_client *osdc, dout("%s lock_name %s\n", __func__, lock_name); ret = ceph_osdc_call(osdc, oid, oloc, "lock", "get_info", CEPH_OSD_FLAG_READ, get_info_op_page, - get_info_op_buf_size, reply_page, &reply_len); + get_info_op_buf_size, &reply_page, &reply_len); dout("%s: status %d\n", __func__, ret); if (ret >= 0) { diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 6495982c5c07..a90fbfce7e93 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -5050,12 +5050,12 @@ int ceph_osdc_call(struct ceph_osd_client *osdc, const char *class, const char *method, unsigned int flags, struct page *req_page, size_t req_len, - struct page *resp_page, size_t *resp_len) + struct page **resp_pages, size_t *resp_len) { struct ceph_osd_request *req; int ret; - if (req_len > PAGE_SIZE || (resp_page && *resp_len > PAGE_SIZE)) + if (req_len > PAGE_SIZE) return -E2BIG; req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO); @@ -5073,8 +5073,8 @@ int ceph_osdc_call(struct ceph_osd_client *osdc, if (req_page) osd_req_op_cls_request_data_pages(req, 0, &req_page, req_len, 0, false, false); - if (resp_page) - osd_req_op_cls_response_data_pages(req, 0, &resp_page, + if (resp_pages) + osd_req_op_cls_response_data_pages(req, 0, resp_pages, *resp_len, 0, false, false); ret = ceph_osdc_alloc_messages(req, GFP_NOIO); @@ -5085,7 +5085,7 @@ int ceph_osdc_call(struct ceph_osd_client *osdc, ret = ceph_osdc_wait_request(osdc, req); if (ret >= 0) { ret = req->r_ops[0].rval; - if (resp_page) + if (resp_pages) *resp_len = req->r_ops[0].outdata_len; } -- cgit v1.2.3-55-g7522 From 4cf3e6dff7ea517544e1da7810a0b3ebba380d2c Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Fri, 14 Jun 2019 18:00:19 +0200 Subject: libceph: export osd_req_op_data() macro We already have one exported wrapper around it for extent.osd_data and rbd_object_map_update_finish() needs another one for cls.request_data. Signed-off-by: Ilya Dryomov Reviewed-by: Dongsheng Yang Reviewed-by: Jeff Layton --- include/linux/ceph/osd_client.h | 8 ++++++++ net/ceph/osd_client.c | 8 -------- 2 files changed, 8 insertions(+), 8 deletions(-) (limited to 'net/ceph') diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index c567cfa4f107..ad7fe5d10dcd 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -389,6 +389,14 @@ extern void ceph_osdc_handle_map(struct ceph_osd_client *osdc, void ceph_osdc_update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb); void ceph_osdc_abort_requests(struct ceph_osd_client *osdc, int err); +#define osd_req_op_data(oreq, whch, typ, fld) \ +({ \ + struct ceph_osd_request *__oreq = (oreq); \ + unsigned int __whch = (whch); \ + BUG_ON(__whch >= __oreq->r_num_ops); \ + &__oreq->r_ops[__whch].typ.fld; \ +}) + extern void osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which, u16 opcode, u32 flags); diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index a90fbfce7e93..0b2df09b2554 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -171,14 +171,6 @@ static void ceph_osd_data_bvecs_init(struct ceph_osd_data *osd_data, osd_data->num_bvecs = num_bvecs; } -#define osd_req_op_data(oreq, whch, typ, fld) \ -({ \ - struct ceph_osd_request *__oreq = (oreq); \ - unsigned int __whch = (whch); \ - BUG_ON(__whch >= __oreq->r_num_ops); \ - &__oreq->r_ops[__whch].typ.fld; \ -}) - static struct ceph_osd_data * osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which) { -- cgit v1.2.3-55-g7522 From 22e8bd51bb0469d1a524130a057f894ff632376a Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Wed, 5 Jun 2019 19:25:11 +0200 Subject: rbd: support for object-map and fast-diff Speed up reads, discards and zeroouts through RBD_OBJ_FLAG_MAY_EXIST and RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT based on object map. Invalid object maps are not trusted, but still updated. Note that we never iterate, resize or invalidate object maps. If object-map feature is enabled but object map fails to load, we just fail the requester (either "rbd map" or I/O, by way of post-acquire action). Signed-off-by: Ilya Dryomov --- drivers/block/rbd.c | 720 ++++++++++++++++++++++++++++++++++- drivers/block/rbd_types.h | 10 + include/linux/ceph/cls_lock_client.h | 3 + include/linux/ceph/striper.h | 2 + net/ceph/cls_lock_client.c | 45 +++ net/ceph/striper.c | 17 + 6 files changed, 794 insertions(+), 3 deletions(-) (limited to 'net/ceph') diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 3d861d3013f8..0df91665c4eb 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -115,6 +115,8 @@ static int atomic_dec_return_safe(atomic_t *v) #define RBD_FEATURE_LAYERING (1ULL<<0) #define RBD_FEATURE_STRIPINGV2 (1ULL<<1) #define RBD_FEATURE_EXCLUSIVE_LOCK (1ULL<<2) +#define RBD_FEATURE_OBJECT_MAP (1ULL<<3) +#define RBD_FEATURE_FAST_DIFF (1ULL<<4) #define RBD_FEATURE_DEEP_FLATTEN (1ULL<<5) #define RBD_FEATURE_DATA_POOL (1ULL<<7) #define RBD_FEATURE_OPERATIONS (1ULL<<8) @@ -122,6 +124,8 @@ static int atomic_dec_return_safe(atomic_t *v) #define RBD_FEATURES_ALL (RBD_FEATURE_LAYERING | \ RBD_FEATURE_STRIPINGV2 | \ RBD_FEATURE_EXCLUSIVE_LOCK | \ + RBD_FEATURE_OBJECT_MAP | \ + RBD_FEATURE_FAST_DIFF | \ RBD_FEATURE_DEEP_FLATTEN | \ RBD_FEATURE_DATA_POOL | \ RBD_FEATURE_OPERATIONS) @@ -227,6 +231,8 @@ enum obj_operation_type { #define RBD_OBJ_FLAG_DELETION (1U << 0) #define RBD_OBJ_FLAG_COPYUP_ENABLED (1U << 1) #define RBD_OBJ_FLAG_COPYUP_ZEROS (1U << 2) +#define RBD_OBJ_FLAG_MAY_EXIST (1U << 3) +#define RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT (1U << 4) enum rbd_obj_read_state { RBD_OBJ_READ_START = 1, @@ -261,14 +267,18 @@ enum rbd_obj_read_state { */ enum rbd_obj_write_state { RBD_OBJ_WRITE_START = 1, + RBD_OBJ_WRITE_PRE_OBJECT_MAP, RBD_OBJ_WRITE_OBJECT, __RBD_OBJ_WRITE_COPYUP, RBD_OBJ_WRITE_COPYUP, + RBD_OBJ_WRITE_POST_OBJECT_MAP, }; enum rbd_obj_copyup_state { RBD_OBJ_COPYUP_START = 1, RBD_OBJ_COPYUP_READ_PARENT, + __RBD_OBJ_COPYUP_OBJECT_MAPS, + RBD_OBJ_COPYUP_OBJECT_MAPS, __RBD_OBJ_COPYUP_WRITE_OBJECT, RBD_OBJ_COPYUP_WRITE_OBJECT, }; @@ -419,6 +429,11 @@ struct rbd_device { int acquire_err; struct completion releasing_wait; + spinlock_t object_map_lock; + u8 *object_map; + u64 object_map_size; /* in objects */ + u64 object_map_flags; + struct workqueue_struct *task_wq; struct rbd_spec *parent_spec; @@ -620,6 +635,7 @@ static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id, u8 *order, u64 *snap_size); static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id, u64 *snap_features); +static int rbd_dev_v2_get_flags(struct rbd_device *rbd_dev); static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result); static void rbd_img_handle_request(struct rbd_img_request *img_req, int result); @@ -1768,6 +1784,466 @@ static void rbd_img_request_destroy(struct kref *kref) kmem_cache_free(rbd_img_request_cache, img_request); } +#define BITS_PER_OBJ 2 +#define OBJS_PER_BYTE (BITS_PER_BYTE / BITS_PER_OBJ) +#define OBJ_MASK ((1 << BITS_PER_OBJ) - 1) + +static void __rbd_object_map_index(struct rbd_device *rbd_dev, u64 objno, + u64 *index, u8 *shift) +{ + u32 off; + + rbd_assert(objno < rbd_dev->object_map_size); + *index = div_u64_rem(objno, OBJS_PER_BYTE, &off); + *shift = (OBJS_PER_BYTE - off - 1) * BITS_PER_OBJ; +} + +static u8 __rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno) +{ + u64 index; + u8 shift; + + lockdep_assert_held(&rbd_dev->object_map_lock); + __rbd_object_map_index(rbd_dev, objno, &index, &shift); + return (rbd_dev->object_map[index] >> shift) & OBJ_MASK; +} + +static void __rbd_object_map_set(struct rbd_device *rbd_dev, u64 objno, u8 val) +{ + u64 index; + u8 shift; + u8 *p; + + lockdep_assert_held(&rbd_dev->object_map_lock); + rbd_assert(!(val & ~OBJ_MASK)); + + __rbd_object_map_index(rbd_dev, objno, &index, &shift); + p = &rbd_dev->object_map[index]; + *p = (*p & ~(OBJ_MASK << shift)) | (val << shift); +} + +static u8 rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno) +{ + u8 state; + + spin_lock(&rbd_dev->object_map_lock); + state = __rbd_object_map_get(rbd_dev, objno); + spin_unlock(&rbd_dev->object_map_lock); + return state; +} + +static bool use_object_map(struct rbd_device *rbd_dev) +{ + return ((rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) && + !(rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID)); +} + +static bool rbd_object_map_may_exist(struct rbd_device *rbd_dev, u64 objno) +{ + u8 state; + + /* fall back to default logic if object map is disabled or invalid */ + if (!use_object_map(rbd_dev)) + return true; + + state = rbd_object_map_get(rbd_dev, objno); + return state != OBJECT_NONEXISTENT; +} + +static void rbd_object_map_name(struct rbd_device *rbd_dev, u64 snap_id, + struct ceph_object_id *oid) +{ + if (snap_id == CEPH_NOSNAP) + ceph_oid_printf(oid, "%s%s", RBD_OBJECT_MAP_PREFIX, + rbd_dev->spec->image_id); + else + ceph_oid_printf(oid, "%s%s.%016llx", RBD_OBJECT_MAP_PREFIX, + rbd_dev->spec->image_id, snap_id); +} + +static int rbd_object_map_lock(struct rbd_device *rbd_dev) +{ + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + CEPH_DEFINE_OID_ONSTACK(oid); + u8 lock_type; + char *lock_tag; + struct ceph_locker *lockers; + u32 num_lockers; + bool broke_lock = false; + int ret; + + rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid); + +again: + ret = ceph_cls_lock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME, + CEPH_CLS_LOCK_EXCLUSIVE, "", "", "", 0); + if (ret != -EBUSY || broke_lock) { + if (ret == -EEXIST) + ret = 0; /* already locked by myself */ + if (ret) + rbd_warn(rbd_dev, "failed to lock object map: %d", ret); + return ret; + } + + ret = ceph_cls_lock_info(osdc, &oid, &rbd_dev->header_oloc, + RBD_LOCK_NAME, &lock_type, &lock_tag, + &lockers, &num_lockers); + if (ret) { + if (ret == -ENOENT) + goto again; + + rbd_warn(rbd_dev, "failed to get object map lockers: %d", ret); + return ret; + } + + kfree(lock_tag); + if (num_lockers == 0) + goto again; + + rbd_warn(rbd_dev, "breaking object map lock owned by %s%llu", + ENTITY_NAME(lockers[0].id.name)); + + ret = ceph_cls_break_lock(osdc, &oid, &rbd_dev->header_oloc, + RBD_LOCK_NAME, lockers[0].id.cookie, + &lockers[0].id.name); + ceph_free_lockers(lockers, num_lockers); + if (ret) { + if (ret == -ENOENT) + goto again; + + rbd_warn(rbd_dev, "failed to break object map lock: %d", ret); + return ret; + } + + broke_lock = true; + goto again; +} + +static void rbd_object_map_unlock(struct rbd_device *rbd_dev) +{ + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + CEPH_DEFINE_OID_ONSTACK(oid); + int ret; + + rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid); + + ret = ceph_cls_unlock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME, + ""); + if (ret && ret != -ENOENT) + rbd_warn(rbd_dev, "failed to unlock object map: %d", ret); +} + +static int decode_object_map_header(void **p, void *end, u64 *object_map_size) +{ + u8 struct_v; + u32 struct_len; + u32 header_len; + void *header_end; + int ret; + + ceph_decode_32_safe(p, end, header_len, e_inval); + header_end = *p + header_len; + + ret = ceph_start_decoding(p, end, 1, "BitVector header", &struct_v, + &struct_len); + if (ret) + return ret; + + ceph_decode_64_safe(p, end, *object_map_size, e_inval); + + *p = header_end; + return 0; + +e_inval: + return -EINVAL; +} + +static int __rbd_object_map_load(struct rbd_device *rbd_dev) +{ + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + CEPH_DEFINE_OID_ONSTACK(oid); + struct page **pages; + void *p, *end; + size_t reply_len; + u64 num_objects; + u64 object_map_bytes; + u64 object_map_size; + int num_pages; + int ret; + + rbd_assert(!rbd_dev->object_map && !rbd_dev->object_map_size); + + num_objects = ceph_get_num_objects(&rbd_dev->layout, + rbd_dev->mapping.size); + object_map_bytes = DIV_ROUND_UP_ULL(num_objects * BITS_PER_OBJ, + BITS_PER_BYTE); + num_pages = calc_pages_for(0, object_map_bytes) + 1; + pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); + if (IS_ERR(pages)) + return PTR_ERR(pages); + + reply_len = num_pages * PAGE_SIZE; + rbd_object_map_name(rbd_dev, rbd_dev->spec->snap_id, &oid); + ret = ceph_osdc_call(osdc, &oid, &rbd_dev->header_oloc, + "rbd", "object_map_load", CEPH_OSD_FLAG_READ, + NULL, 0, pages, &reply_len); + if (ret) + goto out; + + p = page_address(pages[0]); + end = p + min(reply_len, (size_t)PAGE_SIZE); + ret = decode_object_map_header(&p, end, &object_map_size); + if (ret) + goto out; + + if (object_map_size != num_objects) { + rbd_warn(rbd_dev, "object map size mismatch: %llu vs %llu", + object_map_size, num_objects); + ret = -EINVAL; + goto out; + } + + if (offset_in_page(p) + object_map_bytes > reply_len) { + ret = -EINVAL; + goto out; + } + + rbd_dev->object_map = kvmalloc(object_map_bytes, GFP_KERNEL); + if (!rbd_dev->object_map) { + ret = -ENOMEM; + goto out; + } + + rbd_dev->object_map_size = object_map_size; + ceph_copy_from_page_vector(pages, rbd_dev->object_map, + offset_in_page(p), object_map_bytes); + +out: + ceph_release_page_vector(pages, num_pages); + return ret; +} + +static void rbd_object_map_free(struct rbd_device *rbd_dev) +{ + kvfree(rbd_dev->object_map); + rbd_dev->object_map = NULL; + rbd_dev->object_map_size = 0; +} + +static int rbd_object_map_load(struct rbd_device *rbd_dev) +{ + int ret; + + ret = __rbd_object_map_load(rbd_dev); + if (ret) + return ret; + + ret = rbd_dev_v2_get_flags(rbd_dev); + if (ret) { + rbd_object_map_free(rbd_dev); + return ret; + } + + if (rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID) + rbd_warn(rbd_dev, "object map is invalid"); + + return 0; +} + +static int rbd_object_map_open(struct rbd_device *rbd_dev) +{ + int ret; + + ret = rbd_object_map_lock(rbd_dev); + if (ret) + return ret; + + ret = rbd_object_map_load(rbd_dev); + if (ret) { + rbd_object_map_unlock(rbd_dev); + return ret; + } + + return 0; +} + +static void rbd_object_map_close(struct rbd_device *rbd_dev) +{ + rbd_object_map_free(rbd_dev); + rbd_object_map_unlock(rbd_dev); +} + +/* + * This function needs snap_id (or more precisely just something to + * distinguish between HEAD and snapshot object maps), new_state and + * current_state that were passed to rbd_object_map_update(). + * + * To avoid allocating and stashing a context we piggyback on the OSD + * request. A HEAD update has two ops (assert_locked). For new_state + * and current_state we decode our own object_map_update op, encoded in + * rbd_cls_object_map_update(). + */ +static int rbd_object_map_update_finish(struct rbd_obj_request *obj_req, + struct ceph_osd_request *osd_req) +{ + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + struct ceph_osd_data *osd_data; + u64 objno; + u8 state, new_state, current_state; + bool has_current_state; + void *p; + + if (osd_req->r_result) + return osd_req->r_result; + + /* + * Nothing to do for a snapshot object map. + */ + if (osd_req->r_num_ops == 1) + return 0; + + /* + * Update in-memory HEAD object map. + */ + rbd_assert(osd_req->r_num_ops == 2); + osd_data = osd_req_op_data(osd_req, 1, cls, request_data); + rbd_assert(osd_data->type == CEPH_OSD_DATA_TYPE_PAGES); + + p = page_address(osd_data->pages[0]); + objno = ceph_decode_64(&p); + rbd_assert(objno == obj_req->ex.oe_objno); + rbd_assert(ceph_decode_64(&p) == objno + 1); + new_state = ceph_decode_8(&p); + has_current_state = ceph_decode_8(&p); + if (has_current_state) + current_state = ceph_decode_8(&p); + + spin_lock(&rbd_dev->object_map_lock); + state = __rbd_object_map_get(rbd_dev, objno); + if (!has_current_state || current_state == state || + (current_state == OBJECT_EXISTS && state == OBJECT_EXISTS_CLEAN)) + __rbd_object_map_set(rbd_dev, objno, new_state); + spin_unlock(&rbd_dev->object_map_lock); + + return 0; +} + +static void rbd_object_map_callback(struct ceph_osd_request *osd_req) +{ + struct rbd_obj_request *obj_req = osd_req->r_priv; + int result; + + dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req, + osd_req->r_result, obj_req); + + result = rbd_object_map_update_finish(obj_req, osd_req); + rbd_obj_handle_request(obj_req, result); +} + +static bool update_needed(struct rbd_device *rbd_dev, u64 objno, u8 new_state) +{ + u8 state = rbd_object_map_get(rbd_dev, objno); + + if (state == new_state || + (new_state == OBJECT_PENDING && state == OBJECT_NONEXISTENT) || + (new_state == OBJECT_NONEXISTENT && state != OBJECT_PENDING)) + return false; + + return true; +} + +static int rbd_cls_object_map_update(struct ceph_osd_request *req, + int which, u64 objno, u8 new_state, + const u8 *current_state) +{ + struct page **pages; + void *p, *start; + int ret; + + ret = osd_req_op_cls_init(req, which, "rbd", "object_map_update"); + if (ret) + return ret; + + pages = ceph_alloc_page_vector(1, GFP_NOIO); + if (IS_ERR(pages)) + return PTR_ERR(pages); + + p = start = page_address(pages[0]); + ceph_encode_64(&p, objno); + ceph_encode_64(&p, objno + 1); + ceph_encode_8(&p, new_state); + if (current_state) { + ceph_encode_8(&p, 1); + ceph_encode_8(&p, *current_state); + } else { + ceph_encode_8(&p, 0); + } + + osd_req_op_cls_request_data_pages(req, which, pages, p - start, 0, + false, true); + return 0; +} + +/* + * Return: + * 0 - object map update sent + * 1 - object map update isn't needed + * <0 - error + */ +static int rbd_object_map_update(struct rbd_obj_request *obj_req, u64 snap_id, + u8 new_state, const u8 *current_state) +{ + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + struct ceph_osd_request *req; + int num_ops = 1; + int which = 0; + int ret; + + if (snap_id == CEPH_NOSNAP) { + if (!update_needed(rbd_dev, obj_req->ex.oe_objno, new_state)) + return 1; + + num_ops++; /* assert_locked */ + } + + req = ceph_osdc_alloc_request(osdc, NULL, num_ops, false, GFP_NOIO); + if (!req) + return -ENOMEM; + + list_add_tail(&req->r_private_item, &obj_req->osd_reqs); + req->r_callback = rbd_object_map_callback; + req->r_priv = obj_req; + + rbd_object_map_name(rbd_dev, snap_id, &req->r_base_oid); + ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc); + req->r_flags = CEPH_OSD_FLAG_WRITE; + ktime_get_real_ts64(&req->r_mtime); + + if (snap_id == CEPH_NOSNAP) { + /* + * Protect against possible race conditions during lock + * ownership transitions. + */ + ret = ceph_cls_assert_locked(req, which++, RBD_LOCK_NAME, + CEPH_CLS_LOCK_EXCLUSIVE, "", ""); + if (ret) + return ret; + } + + ret = rbd_cls_object_map_update(req, which, obj_req->ex.oe_objno, + new_state, current_state); + if (ret) + return ret; + + ret = ceph_osdc_alloc_messages(req, GFP_NOIO); + if (ret) + return ret; + + ceph_osdc_start_request(osdc, req, false); + return 0; +} + static void prune_extents(struct ceph_file_extent *img_extents, u32 *num_img_extents, u64 overlap) { @@ -1975,6 +2451,7 @@ static int rbd_obj_init_discard(struct rbd_obj_request *obj_req) if (ret) return ret; + obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT; if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents) obj_req->flags |= RBD_OBJ_FLAG_DELETION; @@ -2022,6 +2499,7 @@ static int rbd_obj_init_zeroout(struct rbd_obj_request *obj_req) if (rbd_obj_copyup_enabled(obj_req)) obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ENABLED; if (!obj_req->num_img_extents) { + obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT; if (rbd_obj_is_entire(obj_req)) obj_req->flags |= RBD_OBJ_FLAG_DELETION; } @@ -2407,6 +2885,20 @@ static void rbd_img_schedule(struct rbd_img_request *img_req, int result) queue_work(rbd_wq, &img_req->work); } +static bool rbd_obj_may_exist(struct rbd_obj_request *obj_req) +{ + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + + if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno)) { + obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST; + return true; + } + + dout("%s %p objno %llu assuming dne\n", __func__, obj_req, + obj_req->ex.oe_objno); + return false; +} + static int rbd_obj_read_object(struct rbd_obj_request *obj_req) { struct ceph_osd_request *osd_req; @@ -2482,10 +2974,17 @@ static bool rbd_obj_advance_read(struct rbd_obj_request *obj_req, int *result) struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; int ret; +again: switch (obj_req->read_state) { case RBD_OBJ_READ_START: rbd_assert(!*result); + if (!rbd_obj_may_exist(obj_req)) { + *result = -ENOENT; + obj_req->read_state = RBD_OBJ_READ_OBJECT; + goto again; + } + ret = rbd_obj_read_object(obj_req); if (ret) { *result = ret; @@ -2536,6 +3035,44 @@ static bool rbd_obj_advance_read(struct rbd_obj_request *obj_req, int *result) } } +static bool rbd_obj_write_is_noop(struct rbd_obj_request *obj_req) +{ + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + + if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno)) + obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST; + + if (!(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST) && + (obj_req->flags & RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT)) { + dout("%s %p noop for nonexistent\n", __func__, obj_req); + return true; + } + + return false; +} + +/* + * Return: + * 0 - object map update sent + * 1 - object map update isn't needed + * <0 - error + */ +static int rbd_obj_write_pre_object_map(struct rbd_obj_request *obj_req) +{ + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + u8 new_state; + + if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)) + return 1; + + if (obj_req->flags & RBD_OBJ_FLAG_DELETION) + new_state = OBJECT_PENDING; + else + new_state = OBJECT_EXISTS; + + return rbd_object_map_update(obj_req, CEPH_NOSNAP, new_state, NULL); +} + static int rbd_obj_write_object(struct rbd_obj_request *obj_req) { struct ceph_osd_request *osd_req; @@ -2706,6 +3243,41 @@ static int rbd_obj_copyup_read_parent(struct rbd_obj_request *obj_req) return rbd_obj_read_from_parent(obj_req); } +static void rbd_obj_copyup_object_maps(struct rbd_obj_request *obj_req) +{ + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + struct ceph_snap_context *snapc = obj_req->img_request->snapc; + u8 new_state; + u32 i; + int ret; + + rbd_assert(!obj_req->pending.result && !obj_req->pending.num_pending); + + if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)) + return; + + if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ZEROS) + return; + + for (i = 0; i < snapc->num_snaps; i++) { + if ((rbd_dev->header.features & RBD_FEATURE_FAST_DIFF) && + i + 1 < snapc->num_snaps) + new_state = OBJECT_EXISTS_CLEAN; + else + new_state = OBJECT_EXISTS; + + ret = rbd_object_map_update(obj_req, snapc->snaps[i], + new_state, NULL); + if (ret < 0) { + obj_req->pending.result = ret; + return; + } + + rbd_assert(!ret); + obj_req->pending.num_pending++; + } +} + static void rbd_obj_copyup_write_object(struct rbd_obj_request *obj_req) { u32 bytes = rbd_obj_img_extents_bytes(obj_req); @@ -2749,6 +3321,7 @@ static void rbd_obj_copyup_write_object(struct rbd_obj_request *obj_req) static bool rbd_obj_advance_copyup(struct rbd_obj_request *obj_req, int *result) { + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; int ret; again: @@ -2776,6 +3349,25 @@ again: obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ZEROS; } + rbd_obj_copyup_object_maps(obj_req); + if (!obj_req->pending.num_pending) { + *result = obj_req->pending.result; + obj_req->copyup_state = RBD_OBJ_COPYUP_OBJECT_MAPS; + goto again; + } + obj_req->copyup_state = __RBD_OBJ_COPYUP_OBJECT_MAPS; + return false; + case __RBD_OBJ_COPYUP_OBJECT_MAPS: + if (!pending_result_dec(&obj_req->pending, result)) + return false; + /* fall through */ + case RBD_OBJ_COPYUP_OBJECT_MAPS: + if (*result) { + rbd_warn(rbd_dev, "snap object map update failed: %d", + *result); + return true; + } + rbd_obj_copyup_write_object(obj_req); if (!obj_req->pending.num_pending) { *result = obj_req->pending.result; @@ -2795,6 +3387,27 @@ again: } } +/* + * Return: + * 0 - object map update sent + * 1 - object map update isn't needed + * <0 - error + */ +static int rbd_obj_write_post_object_map(struct rbd_obj_request *obj_req) +{ + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + u8 current_state = OBJECT_PENDING; + + if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)) + return 1; + + if (!(obj_req->flags & RBD_OBJ_FLAG_DELETION)) + return 1; + + return rbd_object_map_update(obj_req, CEPH_NOSNAP, OBJECT_NONEXISTENT, + ¤t_state); +} + static bool rbd_obj_advance_write(struct rbd_obj_request *obj_req, int *result) { struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; @@ -2805,6 +3418,24 @@ again: case RBD_OBJ_WRITE_START: rbd_assert(!*result); + if (rbd_obj_write_is_noop(obj_req)) + return true; + + ret = rbd_obj_write_pre_object_map(obj_req); + if (ret < 0) { + *result = ret; + return true; + } + obj_req->write_state = RBD_OBJ_WRITE_PRE_OBJECT_MAP; + if (ret > 0) + goto again; + return false; + case RBD_OBJ_WRITE_PRE_OBJECT_MAP: + if (*result) { + rbd_warn(rbd_dev, "pre object map update failed: %d", + *result); + return true; + } ret = rbd_obj_write_object(obj_req); if (ret) { *result = ret; @@ -2837,8 +3468,23 @@ again: return false; /* fall through */ case RBD_OBJ_WRITE_COPYUP: - if (*result) + if (*result) { rbd_warn(rbd_dev, "copyup failed: %d", *result); + return true; + } + ret = rbd_obj_write_post_object_map(obj_req); + if (ret < 0) { + *result = ret; + return true; + } + obj_req->write_state = RBD_OBJ_WRITE_POST_OBJECT_MAP; + if (ret > 0) + goto again; + return false; + case RBD_OBJ_WRITE_POST_OBJECT_MAP: + if (*result) + rbd_warn(rbd_dev, "post object map update failed: %d", + *result); return true; default: BUG(); @@ -2892,7 +3538,8 @@ static bool need_exclusive_lock(struct rbd_img_request *img_req) return false; rbd_assert(!test_bit(IMG_REQ_CHILD, &img_req->flags)); - if (rbd_dev->opts->lock_on_read) + if (rbd_dev->opts->lock_on_read || + (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)) return true; return rbd_img_is_write(img_req); @@ -3431,7 +4078,7 @@ static int rbd_try_lock(struct rbd_device *rbd_dev) if (ret) goto out; /* request lock or error */ - rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock", + rbd_warn(rbd_dev, "breaking header lock owned by %s%llu", ENTITY_NAME(lockers[0].id.name)); ret = ceph_monc_blacklist_add(&client->monc, @@ -3458,6 +4105,19 @@ out: return ret; } +static int rbd_post_acquire_action(struct rbd_device *rbd_dev) +{ + int ret; + + if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) { + ret = rbd_object_map_open(rbd_dev); + if (ret) + return ret; + } + + return 0; +} + /* * Return: * 0 - lock acquired @@ -3501,6 +4161,17 @@ static int rbd_try_acquire_lock(struct rbd_device *rbd_dev) rbd_assert(rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED); rbd_assert(list_empty(&rbd_dev->running_list)); + ret = rbd_post_acquire_action(rbd_dev); + if (ret) { + rbd_warn(rbd_dev, "post-acquire action failed: %d", ret); + /* + * Can't stay in RBD_LOCK_STATE_LOCKED because + * rbd_lock_add_request() would let the request through, + * assuming that e.g. object map is locked and loaded. + */ + rbd_unlock(rbd_dev); + } + out: wake_lock_waiters(rbd_dev, ret); up_write(&rbd_dev->lock_rwsem); @@ -3574,10 +4245,17 @@ static bool rbd_quiesce_lock(struct rbd_device *rbd_dev) return true; } +static void rbd_pre_release_action(struct rbd_device *rbd_dev) +{ + if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) + rbd_object_map_close(rbd_dev); +} + static void __rbd_release_lock(struct rbd_device *rbd_dev) { rbd_assert(list_empty(&rbd_dev->running_list)); + rbd_pre_release_action(rbd_dev); rbd_unlock(rbd_dev); } @@ -4864,6 +5542,8 @@ static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc, init_completion(&rbd_dev->acquire_wait); init_completion(&rbd_dev->releasing_wait); + spin_lock_init(&rbd_dev->object_map_lock); + rbd_dev->dev.bus = &rbd_bus_type; rbd_dev->dev.type = &rbd_device_type; rbd_dev->dev.parent = &rbd_root_dev; @@ -5045,6 +5725,32 @@ static int rbd_dev_v2_features(struct rbd_device *rbd_dev) &rbd_dev->header.features); } +/* + * These are generic image flags, but since they are used only for + * object map, store them in rbd_dev->object_map_flags. + * + * For the same reason, this function is called only on object map + * (re)load and not on header refresh. + */ +static int rbd_dev_v2_get_flags(struct rbd_device *rbd_dev) +{ + __le64 snapid = cpu_to_le64(rbd_dev->spec->snap_id); + __le64 flags; + int ret; + + ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, + &rbd_dev->header_oloc, "get_flags", + &snapid, sizeof(snapid), + &flags, sizeof(flags)); + if (ret < 0) + return ret; + if (ret < sizeof(flags)) + return -EBADMSG; + + rbd_dev->object_map_flags = le64_to_cpu(flags); + return 0; +} + struct parent_image_info { u64 pool_id; const char *pool_ns; @@ -6018,6 +6724,7 @@ static void rbd_dev_unprobe(struct rbd_device *rbd_dev) struct rbd_image_header *header; rbd_dev_parent_put(rbd_dev); + rbd_object_map_free(rbd_dev); rbd_dev_mapping_clear(rbd_dev); /* Free dynamic fields from the header, then zero it out */ @@ -6267,6 +6974,13 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth) if (ret) goto err_out_probe; + if (rbd_dev->spec->snap_id != CEPH_NOSNAP && + (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)) { + ret = rbd_object_map_load(rbd_dev); + if (ret) + goto err_out_probe; + } + if (rbd_dev->header.features & RBD_FEATURE_LAYERING) { ret = rbd_dev_v2_parent_info(rbd_dev); if (ret) diff --git a/drivers/block/rbd_types.h b/drivers/block/rbd_types.h index 62ff50d3e7a6..ac98ab6ccd3b 100644 --- a/drivers/block/rbd_types.h +++ b/drivers/block/rbd_types.h @@ -18,6 +18,7 @@ /* For format version 2, rbd image 'foo' consists of objects * rbd_id.foo - id of image * rbd_header. - image metadata + * rbd_object_map. - optional image object map * rbd_data..0000000000000000 * rbd_data..0000000000000001 * ... - data @@ -25,6 +26,7 @@ */ #define RBD_HEADER_PREFIX "rbd_header." +#define RBD_OBJECT_MAP_PREFIX "rbd_object_map." #define RBD_ID_PREFIX "rbd_id." #define RBD_V2_DATA_FORMAT "%s.%016llx" @@ -39,6 +41,14 @@ enum rbd_notify_op { RBD_NOTIFY_OP_HEADER_UPDATE = 3, }; +#define OBJECT_NONEXISTENT 0 +#define OBJECT_EXISTS 1 +#define OBJECT_PENDING 2 +#define OBJECT_EXISTS_CLEAN 3 + +#define RBD_FLAG_OBJECT_MAP_INVALID (1ULL << 0) +#define RBD_FLAG_FAST_DIFF_INVALID (1ULL << 1) + /* * For format version 1, rbd image 'foo' consists of objects * foo.rbd - image metadata diff --git a/include/linux/ceph/cls_lock_client.h b/include/linux/ceph/cls_lock_client.h index bea6c77d2093..17bc7584d1fe 100644 --- a/include/linux/ceph/cls_lock_client.h +++ b/include/linux/ceph/cls_lock_client.h @@ -52,4 +52,7 @@ int ceph_cls_lock_info(struct ceph_osd_client *osdc, char *lock_name, u8 *type, char **tag, struct ceph_locker **lockers, u32 *num_lockers); +int ceph_cls_assert_locked(struct ceph_osd_request *req, int which, + char *lock_name, u8 type, char *cookie, char *tag); + #endif diff --git a/include/linux/ceph/striper.h b/include/linux/ceph/striper.h index cbd0d24b7148..3486636c0e6e 100644 --- a/include/linux/ceph/striper.h +++ b/include/linux/ceph/striper.h @@ -66,4 +66,6 @@ int ceph_extent_to_file(struct ceph_file_layout *l, struct ceph_file_extent **file_extents, u32 *num_file_extents); +u64 ceph_get_num_objects(struct ceph_file_layout *l, u64 size); + #endif diff --git a/net/ceph/cls_lock_client.c b/net/ceph/cls_lock_client.c index fb59094caf13..17447c19d937 100644 --- a/net/ceph/cls_lock_client.c +++ b/net/ceph/cls_lock_client.c @@ -6,6 +6,7 @@ #include #include +#include /** * ceph_cls_lock - grab rados lock for object @@ -378,3 +379,47 @@ int ceph_cls_lock_info(struct ceph_osd_client *osdc, return ret; } EXPORT_SYMBOL(ceph_cls_lock_info); + +int ceph_cls_assert_locked(struct ceph_osd_request *req, int which, + char *lock_name, u8 type, char *cookie, char *tag) +{ + int assert_op_buf_size; + int name_len = strlen(lock_name); + int cookie_len = strlen(cookie); + int tag_len = strlen(tag); + struct page **pages; + void *p, *end; + int ret; + + assert_op_buf_size = name_len + sizeof(__le32) + + cookie_len + sizeof(__le32) + + tag_len + sizeof(__le32) + + sizeof(u8) + CEPH_ENCODING_START_BLK_LEN; + if (assert_op_buf_size > PAGE_SIZE) + return -E2BIG; + + ret = osd_req_op_cls_init(req, which, "lock", "assert_locked"); + if (ret) + return ret; + + pages = ceph_alloc_page_vector(1, GFP_NOIO); + if (IS_ERR(pages)) + return PTR_ERR(pages); + + p = page_address(pages[0]); + end = p + assert_op_buf_size; + + /* encode cls_lock_assert_op struct */ + ceph_start_encoding(&p, 1, 1, + assert_op_buf_size - CEPH_ENCODING_START_BLK_LEN); + ceph_encode_string(&p, end, lock_name, name_len); + ceph_encode_8(&p, type); + ceph_encode_string(&p, end, cookie, cookie_len); + ceph_encode_string(&p, end, tag, tag_len); + WARN_ON(p != end); + + osd_req_op_cls_request_data_pages(req, which, pages, assert_op_buf_size, + 0, false, true); + return 0; +} +EXPORT_SYMBOL(ceph_cls_assert_locked); diff --git a/net/ceph/striper.c b/net/ceph/striper.c index c36462dc86b7..3b3fa75d1189 100644 --- a/net/ceph/striper.c +++ b/net/ceph/striper.c @@ -259,3 +259,20 @@ int ceph_extent_to_file(struct ceph_file_layout *l, return 0; } EXPORT_SYMBOL(ceph_extent_to_file); + +u64 ceph_get_num_objects(struct ceph_file_layout *l, u64 size) +{ + u64 period = (u64)l->stripe_count * l->object_size; + u64 num_periods = DIV64_U64_ROUND_UP(size, period); + u64 remainder_bytes; + u64 remainder_objs = 0; + + div64_u64_rem(size, period, &remainder_bytes); + if (remainder_bytes > 0 && + remainder_bytes < (u64)l->stripe_count * l->stripe_unit) + remainder_objs = l->stripe_count - + DIV_ROUND_UP_ULL(remainder_bytes, l->stripe_unit); + + return num_periods * l->stripe_count - remainder_objs; +} +EXPORT_SYMBOL(ceph_get_num_objects); -- cgit v1.2.3-55-g7522