From d1109aa56c71e19fc117e75bff11384fc7279a3b Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Mon, 13 Aug 2018 15:48:42 -0400 Subject: SUNRPC: Rename TCP receive-specific state variables Since we will want to introduce similar TCP state variables for the transmission of requests, let's rename the existing ones to label that they are for the receive side. Signed-off-by: Trond Myklebust --- net/sunrpc/xprtsock.c | 178 +++++++++++++++++++++++++------------------------- 1 file changed, 89 insertions(+), 89 deletions(-) (limited to 'net/sunrpc/xprtsock.c') diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index 6b7539c0466e..cd7d093721ae 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -1169,42 +1169,42 @@ static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, struct xdr_skb_rea size_t len, used; char *p; - p = ((char *) &transport->tcp_fraghdr) + transport->tcp_offset; - len = sizeof(transport->tcp_fraghdr) - transport->tcp_offset; + p = ((char *) &transport->recv.fraghdr) + transport->recv.offset; + len = sizeof(transport->recv.fraghdr) - transport->recv.offset; used = xdr_skb_read_bits(desc, p, len); - transport->tcp_offset += used; + transport->recv.offset += used; if (used != len) return; - transport->tcp_reclen = ntohl(transport->tcp_fraghdr); - if (transport->tcp_reclen & RPC_LAST_STREAM_FRAGMENT) - transport->tcp_flags |= TCP_RCV_LAST_FRAG; + transport->recv.len = ntohl(transport->recv.fraghdr); + if (transport->recv.len & RPC_LAST_STREAM_FRAGMENT) + transport->recv.flags |= TCP_RCV_LAST_FRAG; else - transport->tcp_flags &= ~TCP_RCV_LAST_FRAG; - transport->tcp_reclen &= RPC_FRAGMENT_SIZE_MASK; + transport->recv.flags &= ~TCP_RCV_LAST_FRAG; + transport->recv.len &= RPC_FRAGMENT_SIZE_MASK; - transport->tcp_flags &= ~TCP_RCV_COPY_FRAGHDR; - transport->tcp_offset = 0; + transport->recv.flags &= ~TCP_RCV_COPY_FRAGHDR; + transport->recv.offset = 0; /* Sanity check of the record length */ - if (unlikely(transport->tcp_reclen < 8)) { + if (unlikely(transport->recv.len < 8)) { dprintk("RPC: invalid TCP record fragment length\n"); xs_tcp_force_close(xprt); return; } dprintk("RPC: reading TCP record fragment of length %d\n", - transport->tcp_reclen); + transport->recv.len); } static void xs_tcp_check_fraghdr(struct sock_xprt *transport) { - if (transport->tcp_offset == transport->tcp_reclen) { - transport->tcp_flags |= TCP_RCV_COPY_FRAGHDR; - transport->tcp_offset = 0; - if (transport->tcp_flags & TCP_RCV_LAST_FRAG) { - transport->tcp_flags &= ~TCP_RCV_COPY_DATA; - transport->tcp_flags |= TCP_RCV_COPY_XID; - transport->tcp_copied = 0; + if (transport->recv.offset == transport->recv.len) { + transport->recv.flags |= TCP_RCV_COPY_FRAGHDR; + transport->recv.offset = 0; + if (transport->recv.flags & TCP_RCV_LAST_FRAG) { + transport->recv.flags &= ~TCP_RCV_COPY_DATA; + transport->recv.flags |= TCP_RCV_COPY_XID; + transport->recv.copied = 0; } } } @@ -1214,20 +1214,20 @@ static inline void xs_tcp_read_xid(struct sock_xprt *transport, struct xdr_skb_r size_t len, used; char *p; - len = sizeof(transport->tcp_xid) - transport->tcp_offset; + len = sizeof(transport->recv.xid) - transport->recv.offset; dprintk("RPC: reading XID (%zu bytes)\n", len); - p = ((char *) &transport->tcp_xid) + transport->tcp_offset; + p = ((char *) &transport->recv.xid) + transport->recv.offset; used = xdr_skb_read_bits(desc, p, len); - transport->tcp_offset += used; + transport->recv.offset += used; if (used != len) return; - transport->tcp_flags &= ~TCP_RCV_COPY_XID; - transport->tcp_flags |= TCP_RCV_READ_CALLDIR; - transport->tcp_copied = 4; + transport->recv.flags &= ~TCP_RCV_COPY_XID; + transport->recv.flags |= TCP_RCV_READ_CALLDIR; + transport->recv.copied = 4; dprintk("RPC: reading %s XID %08x\n", - (transport->tcp_flags & TCP_RPC_REPLY) ? "reply for" + (transport->recv.flags & TCP_RPC_REPLY) ? "reply for" : "request with", - ntohl(transport->tcp_xid)); + ntohl(transport->recv.xid)); xs_tcp_check_fraghdr(transport); } @@ -1239,34 +1239,34 @@ static inline void xs_tcp_read_calldir(struct sock_xprt *transport, char *p; /* - * We want transport->tcp_offset to be 8 at the end of this routine + * We want transport->recv.offset to be 8 at the end of this routine * (4 bytes for the xid and 4 bytes for the call/reply flag). * When this function is called for the first time, - * transport->tcp_offset is 4 (after having already read the xid). + * transport->recv.offset is 4 (after having already read the xid). */ - offset = transport->tcp_offset - sizeof(transport->tcp_xid); - len = sizeof(transport->tcp_calldir) - offset; + offset = transport->recv.offset - sizeof(transport->recv.xid); + len = sizeof(transport->recv.calldir) - offset; dprintk("RPC: reading CALL/REPLY flag (%zu bytes)\n", len); - p = ((char *) &transport->tcp_calldir) + offset; + p = ((char *) &transport->recv.calldir) + offset; used = xdr_skb_read_bits(desc, p, len); - transport->tcp_offset += used; + transport->recv.offset += used; if (used != len) return; - transport->tcp_flags &= ~TCP_RCV_READ_CALLDIR; + transport->recv.flags &= ~TCP_RCV_READ_CALLDIR; /* * We don't yet have the XDR buffer, so we will write the calldir * out after we get the buffer from the 'struct rpc_rqst' */ - switch (ntohl(transport->tcp_calldir)) { + switch (ntohl(transport->recv.calldir)) { case RPC_REPLY: - transport->tcp_flags |= TCP_RCV_COPY_CALLDIR; - transport->tcp_flags |= TCP_RCV_COPY_DATA; - transport->tcp_flags |= TCP_RPC_REPLY; + transport->recv.flags |= TCP_RCV_COPY_CALLDIR; + transport->recv.flags |= TCP_RCV_COPY_DATA; + transport->recv.flags |= TCP_RPC_REPLY; break; case RPC_CALL: - transport->tcp_flags |= TCP_RCV_COPY_CALLDIR; - transport->tcp_flags |= TCP_RCV_COPY_DATA; - transport->tcp_flags &= ~TCP_RPC_REPLY; + transport->recv.flags |= TCP_RCV_COPY_CALLDIR; + transport->recv.flags |= TCP_RCV_COPY_DATA; + transport->recv.flags &= ~TCP_RPC_REPLY; break; default: dprintk("RPC: invalid request message type\n"); @@ -1287,21 +1287,21 @@ static inline void xs_tcp_read_common(struct rpc_xprt *xprt, rcvbuf = &req->rq_private_buf; - if (transport->tcp_flags & TCP_RCV_COPY_CALLDIR) { + if (transport->recv.flags & TCP_RCV_COPY_CALLDIR) { /* * Save the RPC direction in the XDR buffer */ - memcpy(rcvbuf->head[0].iov_base + transport->tcp_copied, - &transport->tcp_calldir, - sizeof(transport->tcp_calldir)); - transport->tcp_copied += sizeof(transport->tcp_calldir); - transport->tcp_flags &= ~TCP_RCV_COPY_CALLDIR; + memcpy(rcvbuf->head[0].iov_base + transport->recv.copied, + &transport->recv.calldir, + sizeof(transport->recv.calldir)); + transport->recv.copied += sizeof(transport->recv.calldir); + transport->recv.flags &= ~TCP_RCV_COPY_CALLDIR; } len = desc->count; - if (len > transport->tcp_reclen - transport->tcp_offset) - desc->count = transport->tcp_reclen - transport->tcp_offset; - r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied, + if (len > transport->recv.len - transport->recv.offset) + desc->count = transport->recv.len - transport->recv.offset; + r = xdr_partial_copy_from_skb(rcvbuf, transport->recv.copied, desc, xdr_skb_read_bits); if (desc->count) { @@ -1314,31 +1314,31 @@ static inline void xs_tcp_read_common(struct rpc_xprt *xprt, * Any remaining data from this record will * be discarded. */ - transport->tcp_flags &= ~TCP_RCV_COPY_DATA; + transport->recv.flags &= ~TCP_RCV_COPY_DATA; dprintk("RPC: XID %08x truncated request\n", - ntohl(transport->tcp_xid)); - dprintk("RPC: xprt = %p, tcp_copied = %lu, " - "tcp_offset = %u, tcp_reclen = %u\n", - xprt, transport->tcp_copied, - transport->tcp_offset, transport->tcp_reclen); + ntohl(transport->recv.xid)); + dprintk("RPC: xprt = %p, recv.copied = %lu, " + "recv.offset = %u, recv.len = %u\n", + xprt, transport->recv.copied, + transport->recv.offset, transport->recv.len); return; } - transport->tcp_copied += r; - transport->tcp_offset += r; + transport->recv.copied += r; + transport->recv.offset += r; desc->count = len - r; dprintk("RPC: XID %08x read %zd bytes\n", - ntohl(transport->tcp_xid), r); - dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, " - "tcp_reclen = %u\n", xprt, transport->tcp_copied, - transport->tcp_offset, transport->tcp_reclen); - - if (transport->tcp_copied == req->rq_private_buf.buflen) - transport->tcp_flags &= ~TCP_RCV_COPY_DATA; - else if (transport->tcp_offset == transport->tcp_reclen) { - if (transport->tcp_flags & TCP_RCV_LAST_FRAG) - transport->tcp_flags &= ~TCP_RCV_COPY_DATA; + ntohl(transport->recv.xid), r); + dprintk("RPC: xprt = %p, recv.copied = %lu, recv.offset = %u, " + "recv.len = %u\n", xprt, transport->recv.copied, + transport->recv.offset, transport->recv.len); + + if (transport->recv.copied == req->rq_private_buf.buflen) + transport->recv.flags &= ~TCP_RCV_COPY_DATA; + else if (transport->recv.offset == transport->recv.len) { + if (transport->recv.flags & TCP_RCV_LAST_FRAG) + transport->recv.flags &= ~TCP_RCV_COPY_DATA; } } @@ -1353,14 +1353,14 @@ static inline int xs_tcp_read_reply(struct rpc_xprt *xprt, container_of(xprt, struct sock_xprt, xprt); struct rpc_rqst *req; - dprintk("RPC: read reply XID %08x\n", ntohl(transport->tcp_xid)); + dprintk("RPC: read reply XID %08x\n", ntohl(transport->recv.xid)); /* Find and lock the request corresponding to this xid */ spin_lock(&xprt->recv_lock); - req = xprt_lookup_rqst(xprt, transport->tcp_xid); + req = xprt_lookup_rqst(xprt, transport->recv.xid); if (!req) { dprintk("RPC: XID %08x request not found!\n", - ntohl(transport->tcp_xid)); + ntohl(transport->recv.xid)); spin_unlock(&xprt->recv_lock); return -1; } @@ -1370,8 +1370,8 @@ static inline int xs_tcp_read_reply(struct rpc_xprt *xprt, xs_tcp_read_common(xprt, desc, req); spin_lock(&xprt->recv_lock); - if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) - xprt_complete_rqst(req->rq_task, transport->tcp_copied); + if (!(transport->recv.flags & TCP_RCV_COPY_DATA)) + xprt_complete_rqst(req->rq_task, transport->recv.copied); xprt_unpin_rqst(req); spin_unlock(&xprt->recv_lock); return 0; @@ -1393,7 +1393,7 @@ static int xs_tcp_read_callback(struct rpc_xprt *xprt, struct rpc_rqst *req; /* Look up the request corresponding to the given XID */ - req = xprt_lookup_bc_request(xprt, transport->tcp_xid); + req = xprt_lookup_bc_request(xprt, transport->recv.xid); if (req == NULL) { printk(KERN_WARNING "Callback slot table overflowed\n"); xprt_force_disconnect(xprt); @@ -1403,8 +1403,8 @@ static int xs_tcp_read_callback(struct rpc_xprt *xprt, dprintk("RPC: read callback XID %08x\n", ntohl(req->rq_xid)); xs_tcp_read_common(xprt, desc, req); - if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) - xprt_complete_bc_request(req, transport->tcp_copied); + if (!(transport->recv.flags & TCP_RCV_COPY_DATA)) + xprt_complete_bc_request(req, transport->recv.copied); return 0; } @@ -1415,7 +1415,7 @@ static inline int _xs_tcp_read_data(struct rpc_xprt *xprt, struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); - return (transport->tcp_flags & TCP_RPC_REPLY) ? + return (transport->recv.flags & TCP_RPC_REPLY) ? xs_tcp_read_reply(xprt, desc) : xs_tcp_read_callback(xprt, desc); } @@ -1458,9 +1458,9 @@ static void xs_tcp_read_data(struct rpc_xprt *xprt, else { /* * The transport_lock protects the request handling. - * There's no need to hold it to update the tcp_flags. + * There's no need to hold it to update the recv.flags. */ - transport->tcp_flags &= ~TCP_RCV_COPY_DATA; + transport->recv.flags &= ~TCP_RCV_COPY_DATA; } } @@ -1468,12 +1468,12 @@ static inline void xs_tcp_read_discard(struct sock_xprt *transport, struct xdr_s { size_t len; - len = transport->tcp_reclen - transport->tcp_offset; + len = transport->recv.len - transport->recv.offset; if (len > desc->count) len = desc->count; desc->count -= len; desc->offset += len; - transport->tcp_offset += len; + transport->recv.offset += len; dprintk("RPC: discarded %zu bytes\n", len); xs_tcp_check_fraghdr(transport); } @@ -1494,22 +1494,22 @@ static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, uns trace_xs_tcp_data_recv(transport); /* Read in a new fragment marker if necessary */ /* Can we ever really expect to get completely empty fragments? */ - if (transport->tcp_flags & TCP_RCV_COPY_FRAGHDR) { + if (transport->recv.flags & TCP_RCV_COPY_FRAGHDR) { xs_tcp_read_fraghdr(xprt, &desc); continue; } /* Read in the xid if necessary */ - if (transport->tcp_flags & TCP_RCV_COPY_XID) { + if (transport->recv.flags & TCP_RCV_COPY_XID) { xs_tcp_read_xid(transport, &desc); continue; } /* Read in the call/reply flag */ - if (transport->tcp_flags & TCP_RCV_READ_CALLDIR) { + if (transport->recv.flags & TCP_RCV_READ_CALLDIR) { xs_tcp_read_calldir(transport, &desc); continue; } /* Read in the request data */ - if (transport->tcp_flags & TCP_RCV_COPY_DATA) { + if (transport->recv.flags & TCP_RCV_COPY_DATA) { xs_tcp_read_data(xprt, &desc); continue; } @@ -1602,10 +1602,10 @@ static void xs_tcp_state_change(struct sock *sk) if (!xprt_test_and_set_connected(xprt)) { /* Reset TCP record info */ - transport->tcp_offset = 0; - transport->tcp_reclen = 0; - transport->tcp_copied = 0; - transport->tcp_flags = + transport->recv.offset = 0; + transport->recv.len = 0; + transport->recv.copied = 0; + transport->recv.flags = TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID; xprt->connect_cookie++; clear_bit(XPRT_SOCK_CONNECTING, &transport->sock_state); -- cgit v1.2.3-55-g7522 From e1806c7bfb803408df4dc53dfe502ffab0f46a67 Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Mon, 13 Aug 2018 16:50:49 -0400 Subject: SUNRPC: Move reset of TCP state variables into the reconnect code Rather than resetting state variables in socket state_change() callback, do it in the sunrpc TCP connect function itself. Signed-off-by: Trond Myklebust --- net/sunrpc/xprtsock.c | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) (limited to 'net/sunrpc/xprtsock.c') diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index cd7d093721ae..ec1e3f93e707 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -1600,13 +1600,6 @@ static void xs_tcp_state_change(struct sock *sk) case TCP_ESTABLISHED: spin_lock(&xprt->transport_lock); if (!xprt_test_and_set_connected(xprt)) { - - /* Reset TCP record info */ - transport->recv.offset = 0; - transport->recv.len = 0; - transport->recv.copied = 0; - transport->recv.flags = - TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID; xprt->connect_cookie++; clear_bit(XPRT_SOCK_CONNECTING, &transport->sock_state); xprt_clear_connecting(xprt); @@ -2386,6 +2379,12 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) xs_set_memalloc(xprt); + /* Reset TCP record info */ + transport->recv.offset = 0; + transport->recv.len = 0; + transport->recv.copied = 0; + transport->recv.flags = TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID; + /* Tell the socket layer to start connecting... */ xprt->stat.connect_count++; xprt->stat.connect_start = jiffies; -- cgit v1.2.3-55-g7522 From 6c7a64e5a44dbc6d073b83a56a48d0a4099f1dd2 Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Mon, 13 Aug 2018 16:54:57 -0400 Subject: SUNRPC: Add socket transmit queue offset tracking Signed-off-by: Trond Myklebust --- include/linux/sunrpc/xprtsock.h | 7 +++++++ net/sunrpc/xprtsock.c | 40 ++++++++++++++++++++++------------------ 2 files changed, 29 insertions(+), 18 deletions(-) (limited to 'net/sunrpc/xprtsock.c') diff --git a/include/linux/sunrpc/xprtsock.h b/include/linux/sunrpc/xprtsock.h index 90d5ca8e65f4..005cfb6e7238 100644 --- a/include/linux/sunrpc/xprtsock.h +++ b/include/linux/sunrpc/xprtsock.h @@ -42,6 +42,13 @@ struct sock_xprt { flags; } recv; + /* + * State of TCP transmit queue + */ + struct { + u32 offset; + } xmit; + /* * Connection of transports */ diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index ec1e3f93e707..629cc45e1e6c 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -461,7 +461,7 @@ static int xs_nospace(struct rpc_task *task) int ret = -EAGAIN; dprintk("RPC: %5u xmit incomplete (%u left of %u)\n", - task->tk_pid, req->rq_slen - req->rq_bytes_sent, + task->tk_pid, req->rq_slen - transport->xmit.offset, req->rq_slen); /* Protect against races with write_space */ @@ -528,19 +528,22 @@ static int xs_local_send_request(struct rpc_task *task) req->rq_svec->iov_base, req->rq_svec->iov_len); req->rq_xtime = ktime_get(); - status = xs_sendpages(transport->sock, NULL, 0, xdr, req->rq_bytes_sent, + status = xs_sendpages(transport->sock, NULL, 0, xdr, + transport->xmit.offset, true, &sent); dprintk("RPC: %s(%u) = %d\n", - __func__, xdr->len - req->rq_bytes_sent, status); + __func__, xdr->len - transport->xmit.offset, status); if (status == -EAGAIN && sock_writeable(transport->inet)) status = -ENOBUFS; if (likely(sent > 0) || status == 0) { - req->rq_bytes_sent += sent; - req->rq_xmit_bytes_sent += sent; + transport->xmit.offset += sent; + req->rq_bytes_sent = transport->xmit.offset; if (likely(req->rq_bytes_sent >= req->rq_slen)) { + req->rq_xmit_bytes_sent += transport->xmit.offset; req->rq_bytes_sent = 0; + transport->xmit.offset = 0; return 0; } status = -EAGAIN; @@ -592,10 +595,10 @@ static int xs_udp_send_request(struct rpc_task *task) return -ENOTCONN; req->rq_xtime = ktime_get(); status = xs_sendpages(transport->sock, xs_addr(xprt), xprt->addrlen, - xdr, req->rq_bytes_sent, true, &sent); + xdr, 0, true, &sent); dprintk("RPC: xs_udp_send_request(%u) = %d\n", - xdr->len - req->rq_bytes_sent, status); + xdr->len, status); /* firewall is blocking us, don't return -EAGAIN or we end up looping */ if (status == -EPERM) @@ -684,17 +687,20 @@ static int xs_tcp_send_request(struct rpc_task *task) while (1) { sent = 0; status = xs_sendpages(transport->sock, NULL, 0, xdr, - req->rq_bytes_sent, zerocopy, &sent); + transport->xmit.offset, + zerocopy, &sent); dprintk("RPC: xs_tcp_send_request(%u) = %d\n", - xdr->len - req->rq_bytes_sent, status); + xdr->len - transport->xmit.offset, status); /* If we've sent the entire packet, immediately * reset the count of bytes sent. */ - req->rq_bytes_sent += sent; - req->rq_xmit_bytes_sent += sent; + transport->xmit.offset += sent; + req->rq_bytes_sent = transport->xmit.offset; if (likely(req->rq_bytes_sent >= req->rq_slen)) { + req->rq_xmit_bytes_sent += transport->xmit.offset; req->rq_bytes_sent = 0; + transport->xmit.offset = 0; return 0; } @@ -760,18 +766,13 @@ static int xs_tcp_send_request(struct rpc_task *task) */ static void xs_tcp_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task) { - struct rpc_rqst *req; + struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); if (task != xprt->snd_task) return; if (task == NULL) goto out_release; - req = task->tk_rqstp; - if (req == NULL) - goto out_release; - if (req->rq_bytes_sent == 0) - goto out_release; - if (req->rq_bytes_sent == req->rq_snd_buf.len) + if (transport->xmit.offset == 0 || !xprt_connected(xprt)) goto out_release; set_bit(XPRT_CLOSE_WAIT, &xprt->state); out_release: @@ -2021,6 +2022,8 @@ static int xs_local_finish_connecting(struct rpc_xprt *xprt, write_unlock_bh(&sk->sk_callback_lock); } + transport->xmit.offset = 0; + /* Tell the socket layer to start connecting... */ xprt->stat.connect_count++; xprt->stat.connect_start = jiffies; @@ -2384,6 +2387,7 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) transport->recv.len = 0; transport->recv.copied = 0; transport->recv.flags = TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID; + transport->xmit.offset = 0; /* Tell the socket layer to start connecting... */ xprt->stat.connect_count++; -- cgit v1.2.3-55-g7522 From 4cd34e7c2e412e3db2f6bf7371581ab60591174b Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Fri, 31 Aug 2018 10:00:02 -0400 Subject: SUNRPC: Simplify dealing with aborted partially transmitted messages If the previous message was only partially transmitted, we need to close the socket in order to avoid corruption of the message stream. To do so, we currently hijack the unlocking of the socket in order to schedule the close. Now that we track the message offset in the socket state, we can move that kind of checking out of the socket lock code, which is needed to allow messages to remain queued after dropping the socket lock. Signed-off-by: Trond Myklebust --- net/sunrpc/xprtsock.c | 51 +++++++++++++++++++++++++-------------------------- 1 file changed, 25 insertions(+), 26 deletions(-) (limited to 'net/sunrpc/xprtsock.c') diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index 629cc45e1e6c..3fbccebd0b10 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -491,6 +491,16 @@ static int xs_nospace(struct rpc_task *task) return ret; } +/* + * Determine if the previous message in the stream was aborted before it + * could complete transmission. + */ +static bool +xs_send_request_was_aborted(struct sock_xprt *transport, struct rpc_rqst *req) +{ + return transport->xmit.offset != 0 && req->rq_bytes_sent == 0; +} + /* * Construct a stream transport record marker in @buf. */ @@ -522,6 +532,12 @@ static int xs_local_send_request(struct rpc_task *task) int status; int sent = 0; + /* Close the stream if the previous transmission was incomplete */ + if (xs_send_request_was_aborted(transport, req)) { + xs_close(xprt); + return -ENOTCONN; + } + xs_encode_stream_record_marker(&req->rq_snd_buf); xs_pktdump("packet data:", @@ -665,6 +681,13 @@ static int xs_tcp_send_request(struct rpc_task *task) int status; int sent; + /* Close the stream if the previous transmission was incomplete */ + if (xs_send_request_was_aborted(transport, req)) { + if (transport->sock != NULL) + kernel_sock_shutdown(transport->sock, SHUT_RDWR); + return -ENOTCONN; + } + xs_encode_stream_record_marker(&req->rq_snd_buf); xs_pktdump("packet data:", @@ -755,30 +778,6 @@ static int xs_tcp_send_request(struct rpc_task *task) return status; } -/** - * xs_tcp_release_xprt - clean up after a tcp transmission - * @xprt: transport - * @task: rpc task - * - * This cleans up if an error causes us to abort the transmission of a request. - * In this case, the socket may need to be reset in order to avoid confusing - * the server. - */ -static void xs_tcp_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task) -{ - struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); - - if (task != xprt->snd_task) - return; - if (task == NULL) - goto out_release; - if (transport->xmit.offset == 0 || !xprt_connected(xprt)) - goto out_release; - set_bit(XPRT_CLOSE_WAIT, &xprt->state); -out_release: - xprt_release_xprt(xprt, task); -} - static void xs_save_old_callbacks(struct sock_xprt *transport, struct sock *sk) { transport->old_data_ready = sk->sk_data_ready; @@ -2764,7 +2763,7 @@ static void bc_destroy(struct rpc_xprt *xprt) static const struct rpc_xprt_ops xs_local_ops = { .reserve_xprt = xprt_reserve_xprt, - .release_xprt = xs_tcp_release_xprt, + .release_xprt = xprt_release_xprt, .alloc_slot = xprt_alloc_slot, .free_slot = xprt_free_slot, .rpcbind = xs_local_rpcbind, @@ -2806,7 +2805,7 @@ static const struct rpc_xprt_ops xs_udp_ops = { static const struct rpc_xprt_ops xs_tcp_ops = { .reserve_xprt = xprt_reserve_xprt, - .release_xprt = xs_tcp_release_xprt, + .release_xprt = xprt_release_xprt, .alloc_slot = xprt_lock_and_alloc_slot, .free_slot = xprt_free_slot, .rpcbind = rpcb_getport_async, -- cgit v1.2.3-55-g7522 From 75c84151a9dc7a755c607e6761d8f14a1690dbf0 Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Fri, 31 Aug 2018 10:21:00 -0400 Subject: SUNRPC: Rename xprt->recv_lock to xprt->queue_lock We will use the same lock to protect both the transmit and receive queues. Signed-off-by: Trond Myklebust --- include/linux/sunrpc/xprt.h | 2 +- net/sunrpc/svcsock.c | 6 +++--- net/sunrpc/xprt.c | 24 ++++++++++++------------ net/sunrpc/xprtrdma/rpc_rdma.c | 10 +++++----- net/sunrpc/xprtrdma/svc_rdma_backchannel.c | 4 ++-- net/sunrpc/xprtsock.c | 30 +++++++++++++++--------------- 6 files changed, 38 insertions(+), 38 deletions(-) (limited to 'net/sunrpc/xprtsock.c') diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h index bd743c51a865..c25d0a5fda69 100644 --- a/include/linux/sunrpc/xprt.h +++ b/include/linux/sunrpc/xprt.h @@ -235,7 +235,7 @@ struct rpc_xprt { */ spinlock_t transport_lock; /* lock transport info */ spinlock_t reserve_lock; /* lock slot table */ - spinlock_t recv_lock; /* lock receive list */ + spinlock_t queue_lock; /* send/receive queue lock */ u32 xid; /* Next XID value to use */ struct rpc_task * snd_task; /* Task blocked in send */ struct svc_xprt *bc_xprt; /* NFSv4.1 backchannel */ diff --git a/net/sunrpc/svcsock.c b/net/sunrpc/svcsock.c index 5445145e639c..db8bb6b3a2b0 100644 --- a/net/sunrpc/svcsock.c +++ b/net/sunrpc/svcsock.c @@ -1004,7 +1004,7 @@ static int receive_cb_reply(struct svc_sock *svsk, struct svc_rqst *rqstp) if (!bc_xprt) return -EAGAIN; - spin_lock(&bc_xprt->recv_lock); + spin_lock(&bc_xprt->queue_lock); req = xprt_lookup_rqst(bc_xprt, xid); if (!req) goto unlock_notfound; @@ -1022,7 +1022,7 @@ static int receive_cb_reply(struct svc_sock *svsk, struct svc_rqst *rqstp) memcpy(dst->iov_base, src->iov_base, src->iov_len); xprt_complete_rqst(req->rq_task, rqstp->rq_arg.len); rqstp->rq_arg.len = 0; - spin_unlock(&bc_xprt->recv_lock); + spin_unlock(&bc_xprt->queue_lock); return 0; unlock_notfound: printk(KERN_NOTICE @@ -1031,7 +1031,7 @@ unlock_notfound: __func__, ntohl(calldir), bc_xprt, ntohl(xid)); unlock_eagain: - spin_unlock(&bc_xprt->recv_lock); + spin_unlock(&bc_xprt->queue_lock); return -EAGAIN; } diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index 3a3b3445a7c0..6e3d4b4ee79e 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -826,7 +826,7 @@ static void xprt_connect_status(struct rpc_task *task) * @xprt: transport on which the original request was transmitted * @xid: RPC XID of incoming reply * - * Caller holds xprt->recv_lock. + * Caller holds xprt->queue_lock. */ struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid) { @@ -892,7 +892,7 @@ static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req) * xprt_update_rtt - Update RPC RTT statistics * @task: RPC request that recently completed * - * Caller holds xprt->recv_lock. + * Caller holds xprt->queue_lock. */ void xprt_update_rtt(struct rpc_task *task) { @@ -914,7 +914,7 @@ EXPORT_SYMBOL_GPL(xprt_update_rtt); * @task: RPC request that recently completed * @copied: actual number of bytes received from the transport * - * Caller holds xprt->recv_lock. + * Caller holds xprt->queue_lock. */ void xprt_complete_rqst(struct rpc_task *task, int copied) { @@ -1034,10 +1034,10 @@ void xprt_transmit(struct rpc_task *task) memcpy(&req->rq_private_buf, &req->rq_rcv_buf, sizeof(req->rq_private_buf)); /* Add request to the receive list */ - spin_lock(&xprt->recv_lock); + spin_lock(&xprt->queue_lock); list_add_tail(&req->rq_list, &xprt->recv); set_bit(RPC_TASK_NEED_RECV, &task->tk_runstate); - spin_unlock(&xprt->recv_lock); + spin_unlock(&xprt->queue_lock); xprt_reset_majortimeo(req); /* Turn off autodisconnect */ del_singleshot_timer_sync(&xprt->timer); @@ -1076,7 +1076,7 @@ void xprt_transmit(struct rpc_task *task) * The spinlock ensures atomicity between the test of * req->rq_reply_bytes_recvd, and the call to rpc_sleep_on(). */ - spin_lock(&xprt->recv_lock); + spin_lock(&xprt->queue_lock); if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) { rpc_sleep_on(&xprt->pending, task, xprt_timer); /* Wake up immediately if the connection was dropped */ @@ -1084,7 +1084,7 @@ void xprt_transmit(struct rpc_task *task) rpc_wake_up_queued_task_set_status(&xprt->pending, task, -ENOTCONN); } - spin_unlock(&xprt->recv_lock); + spin_unlock(&xprt->queue_lock); } } @@ -1379,18 +1379,18 @@ void xprt_release(struct rpc_task *task) task->tk_ops->rpc_count_stats(task, task->tk_calldata); else if (task->tk_client) rpc_count_iostats(task, task->tk_client->cl_metrics); - spin_lock(&xprt->recv_lock); + spin_lock(&xprt->queue_lock); if (!list_empty(&req->rq_list)) { list_del_init(&req->rq_list); if (xprt_is_pinned_rqst(req)) { set_bit(RPC_TASK_MSG_PIN_WAIT, &req->rq_task->tk_runstate); - spin_unlock(&xprt->recv_lock); + spin_unlock(&xprt->queue_lock); xprt_wait_on_pinned_rqst(req); - spin_lock(&xprt->recv_lock); + spin_lock(&xprt->queue_lock); clear_bit(RPC_TASK_MSG_PIN_WAIT, &req->rq_task->tk_runstate); } } - spin_unlock(&xprt->recv_lock); + spin_unlock(&xprt->queue_lock); spin_lock_bh(&xprt->transport_lock); xprt->ops->release_xprt(xprt, task); if (xprt->ops->release_request) @@ -1420,7 +1420,7 @@ static void xprt_init(struct rpc_xprt *xprt, struct net *net) spin_lock_init(&xprt->transport_lock); spin_lock_init(&xprt->reserve_lock); - spin_lock_init(&xprt->recv_lock); + spin_lock_init(&xprt->queue_lock); INIT_LIST_HEAD(&xprt->free); INIT_LIST_HEAD(&xprt->recv); diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index c8ae983c6cc0..0020dc401215 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -1238,7 +1238,7 @@ void rpcrdma_complete_rqst(struct rpcrdma_rep *rep) goto out_badheader; out: - spin_lock(&xprt->recv_lock); + spin_lock(&xprt->queue_lock); cwnd = xprt->cwnd; xprt->cwnd = r_xprt->rx_buf.rb_credits << RPC_CWNDSHIFT; if (xprt->cwnd > cwnd) @@ -1246,7 +1246,7 @@ out: xprt_complete_rqst(rqst->rq_task, status); xprt_unpin_rqst(rqst); - spin_unlock(&xprt->recv_lock); + spin_unlock(&xprt->queue_lock); return; /* If the incoming reply terminated a pending RPC, the next @@ -1345,7 +1345,7 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep) /* Match incoming rpcrdma_rep to an rpcrdma_req to * get context for handling any incoming chunks. */ - spin_lock(&xprt->recv_lock); + spin_lock(&xprt->queue_lock); rqst = xprt_lookup_rqst(xprt, rep->rr_xid); if (!rqst) goto out_norqst; @@ -1357,7 +1357,7 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep) credits = buf->rb_max_requests; buf->rb_credits = credits; - spin_unlock(&xprt->recv_lock); + spin_unlock(&xprt->queue_lock); req = rpcr_to_rdmar(rqst); req->rl_reply = rep; @@ -1378,7 +1378,7 @@ out_badversion: * is corrupt. */ out_norqst: - spin_unlock(&xprt->recv_lock); + spin_unlock(&xprt->queue_lock); trace_xprtrdma_reply_rqst(rep); goto repost; diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c index a68180090554..09b12b7568fe 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c +++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c @@ -56,7 +56,7 @@ int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, __be32 *rdma_resp, if (src->iov_len < 24) goto out_shortreply; - spin_lock(&xprt->recv_lock); + spin_lock(&xprt->queue_lock); req = xprt_lookup_rqst(xprt, xid); if (!req) goto out_notfound; @@ -86,7 +86,7 @@ int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, __be32 *rdma_resp, rcvbuf->len = 0; out_unlock: - spin_unlock(&xprt->recv_lock); + spin_unlock(&xprt->queue_lock); out: return ret; diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index 3fbccebd0b10..8d6404259ff9 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -966,12 +966,12 @@ static void xs_local_data_read_skb(struct rpc_xprt *xprt, return; /* Look up and lock the request corresponding to the given XID */ - spin_lock(&xprt->recv_lock); + spin_lock(&xprt->queue_lock); rovr = xprt_lookup_rqst(xprt, *xp); if (!rovr) goto out_unlock; xprt_pin_rqst(rovr); - spin_unlock(&xprt->recv_lock); + spin_unlock(&xprt->queue_lock); task = rovr->rq_task; copied = rovr->rq_private_buf.buflen; @@ -980,16 +980,16 @@ static void xs_local_data_read_skb(struct rpc_xprt *xprt, if (xs_local_copy_to_xdr(&rovr->rq_private_buf, skb)) { dprintk("RPC: sk_buff copy failed\n"); - spin_lock(&xprt->recv_lock); + spin_lock(&xprt->queue_lock); goto out_unpin; } - spin_lock(&xprt->recv_lock); + spin_lock(&xprt->queue_lock); xprt_complete_rqst(task, copied); out_unpin: xprt_unpin_rqst(rovr); out_unlock: - spin_unlock(&xprt->recv_lock); + spin_unlock(&xprt->queue_lock); } static void xs_local_data_receive(struct sock_xprt *transport) @@ -1058,13 +1058,13 @@ static void xs_udp_data_read_skb(struct rpc_xprt *xprt, return; /* Look up and lock the request corresponding to the given XID */ - spin_lock(&xprt->recv_lock); + spin_lock(&xprt->queue_lock); rovr = xprt_lookup_rqst(xprt, *xp); if (!rovr) goto out_unlock; xprt_pin_rqst(rovr); xprt_update_rtt(rovr->rq_task); - spin_unlock(&xprt->recv_lock); + spin_unlock(&xprt->queue_lock); task = rovr->rq_task; if ((copied = rovr->rq_private_buf.buflen) > repsize) @@ -1072,7 +1072,7 @@ static void xs_udp_data_read_skb(struct rpc_xprt *xprt, /* Suck it into the iovec, verify checksum if not done by hw. */ if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) { - spin_lock(&xprt->recv_lock); + spin_lock(&xprt->queue_lock); __UDPX_INC_STATS(sk, UDP_MIB_INERRORS); goto out_unpin; } @@ -1081,13 +1081,13 @@ static void xs_udp_data_read_skb(struct rpc_xprt *xprt, spin_lock_bh(&xprt->transport_lock); xprt_adjust_cwnd(xprt, task, copied); spin_unlock_bh(&xprt->transport_lock); - spin_lock(&xprt->recv_lock); + spin_lock(&xprt->queue_lock); xprt_complete_rqst(task, copied); __UDPX_INC_STATS(sk, UDP_MIB_INDATAGRAMS); out_unpin: xprt_unpin_rqst(rovr); out_unlock: - spin_unlock(&xprt->recv_lock); + spin_unlock(&xprt->queue_lock); } static void xs_udp_data_receive(struct sock_xprt *transport) @@ -1356,24 +1356,24 @@ static inline int xs_tcp_read_reply(struct rpc_xprt *xprt, dprintk("RPC: read reply XID %08x\n", ntohl(transport->recv.xid)); /* Find and lock the request corresponding to this xid */ - spin_lock(&xprt->recv_lock); + spin_lock(&xprt->queue_lock); req = xprt_lookup_rqst(xprt, transport->recv.xid); if (!req) { dprintk("RPC: XID %08x request not found!\n", ntohl(transport->recv.xid)); - spin_unlock(&xprt->recv_lock); + spin_unlock(&xprt->queue_lock); return -1; } xprt_pin_rqst(req); - spin_unlock(&xprt->recv_lock); + spin_unlock(&xprt->queue_lock); xs_tcp_read_common(xprt, desc, req); - spin_lock(&xprt->recv_lock); + spin_lock(&xprt->queue_lock); if (!(transport->recv.flags & TCP_RCV_COPY_DATA)) xprt_complete_rqst(req->rq_task, transport->recv.copied); xprt_unpin_rqst(req); - spin_unlock(&xprt->recv_lock); + spin_unlock(&xprt->queue_lock); return 0; } -- cgit v1.2.3-55-g7522 From 50f484e298218b7271fad8a23bd44c82fb3110e1 Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Thu, 30 Aug 2018 13:27:29 -0400 Subject: SUNRPC: Treat the task and request as separate in the xprt_ops->send_request() When we shift to using the transmit queue, then the task that holds the write lock will not necessarily be the same as the one being transmitted. Signed-off-by: Trond Myklebust --- include/linux/sunrpc/xprt.h | 2 +- net/sunrpc/xprt.c | 2 +- net/sunrpc/xprtrdma/svc_rdma_backchannel.c | 3 +-- net/sunrpc/xprtrdma/transport.c | 5 ++--- net/sunrpc/xprtsock.c | 27 +++++++++++++-------------- 5 files changed, 18 insertions(+), 21 deletions(-) (limited to 'net/sunrpc/xprtsock.c') diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h index b8a7de161f67..8c2bb078f00c 100644 --- a/include/linux/sunrpc/xprt.h +++ b/include/linux/sunrpc/xprt.h @@ -140,7 +140,7 @@ struct rpc_xprt_ops { void (*connect)(struct rpc_xprt *xprt, struct rpc_task *task); int (*buf_alloc)(struct rpc_task *task); void (*buf_free)(struct rpc_task *task); - int (*send_request)(struct rpc_task *task); + int (*send_request)(struct rpc_rqst *req, struct rpc_task *task); void (*set_retrans_timeout)(struct rpc_task *task); void (*timer)(struct rpc_xprt *xprt, struct rpc_task *task); void (*release_request)(struct rpc_task *task); diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index f5be739492d4..6e735dd1fde0 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -1191,7 +1191,7 @@ void xprt_transmit(struct rpc_task *task) } connect_cookie = xprt->connect_cookie; - status = xprt->ops->send_request(task); + status = xprt->ops->send_request(req, task); trace_xprt_transmit(xprt, req->rq_xid, status); if (status != 0) { task->tk_status = status; diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c index 09b12b7568fe..d1618c70edb4 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c +++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c @@ -215,9 +215,8 @@ drop_connection: * connection. */ static int -xprt_rdma_bc_send_request(struct rpc_task *task) +xprt_rdma_bc_send_request(struct rpc_rqst *rqst, struct rpc_task *task) { - struct rpc_rqst *rqst = task->tk_rqstp; struct svc_xprt *sxprt = rqst->rq_xprt->bc_xprt; struct svcxprt_rdma *rdma; int ret; diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c index 143ce2579ba9..fa684bf4d090 100644 --- a/net/sunrpc/xprtrdma/transport.c +++ b/net/sunrpc/xprtrdma/transport.c @@ -706,9 +706,8 @@ xprt_rdma_free(struct rpc_task *task) * sent. Do not try to send this message again. */ static int -xprt_rdma_send_request(struct rpc_task *task) +xprt_rdma_send_request(struct rpc_rqst *rqst, struct rpc_task *task) { - struct rpc_rqst *rqst = task->tk_rqstp; struct rpc_xprt *xprt = rqst->rq_xprt; struct rpcrdma_req *req = rpcr_to_rdmar(rqst); struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); @@ -741,7 +740,7 @@ xprt_rdma_send_request(struct rpc_task *task) /* An RPC with no reply will throw off credit accounting, * so drop the connection to reset the credit grant. */ - if (!rpc_reply_expected(task)) + if (!rpc_reply_expected(rqst->rq_task)) goto drop_connection; return 0; diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index 8d6404259ff9..b8143eded4af 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -449,12 +449,12 @@ static void xs_nospace_callback(struct rpc_task *task) /** * xs_nospace - place task on wait queue if transmit was incomplete + * @req: pointer to RPC request * @task: task to put to sleep * */ -static int xs_nospace(struct rpc_task *task) +static int xs_nospace(struct rpc_rqst *req, struct rpc_task *task) { - struct rpc_rqst *req = task->tk_rqstp; struct rpc_xprt *xprt = req->rq_xprt; struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); struct sock *sk = transport->inet; @@ -513,6 +513,7 @@ static inline void xs_encode_stream_record_marker(struct xdr_buf *buf) /** * xs_local_send_request - write an RPC request to an AF_LOCAL socket + * @req: pointer to RPC request * @task: RPC task that manages the state of an RPC request * * Return values: @@ -522,9 +523,8 @@ static inline void xs_encode_stream_record_marker(struct xdr_buf *buf) * ENOTCONN: Caller needs to invoke connect logic then call again * other: Some other error occured, the request was not sent */ -static int xs_local_send_request(struct rpc_task *task) +static int xs_local_send_request(struct rpc_rqst *req, struct rpc_task *task) { - struct rpc_rqst *req = task->tk_rqstp; struct rpc_xprt *xprt = req->rq_xprt; struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); @@ -569,7 +569,7 @@ static int xs_local_send_request(struct rpc_task *task) case -ENOBUFS: break; case -EAGAIN: - status = xs_nospace(task); + status = xs_nospace(req, task); break; default: dprintk("RPC: sendmsg returned unrecognized error %d\n", @@ -585,6 +585,7 @@ static int xs_local_send_request(struct rpc_task *task) /** * xs_udp_send_request - write an RPC request to a UDP socket + * @req: pointer to RPC request * @task: address of RPC task that manages the state of an RPC request * * Return values: @@ -594,9 +595,8 @@ static int xs_local_send_request(struct rpc_task *task) * ENOTCONN: Caller needs to invoke connect logic then call again * other: Some other error occurred, the request was not sent */ -static int xs_udp_send_request(struct rpc_task *task) +static int xs_udp_send_request(struct rpc_rqst *req, struct rpc_task *task) { - struct rpc_rqst *req = task->tk_rqstp; struct rpc_xprt *xprt = req->rq_xprt; struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); struct xdr_buf *xdr = &req->rq_snd_buf; @@ -638,7 +638,7 @@ process_status: /* Should we call xs_close() here? */ break; case -EAGAIN: - status = xs_nospace(task); + status = xs_nospace(req, task); break; case -ENETUNREACH: case -ENOBUFS: @@ -658,6 +658,7 @@ process_status: /** * xs_tcp_send_request - write an RPC request to a TCP socket + * @req: pointer to RPC request * @task: address of RPC task that manages the state of an RPC request * * Return values: @@ -670,9 +671,8 @@ process_status: * XXX: In the case of soft timeouts, should we eventually give up * if sendmsg is not able to make progress? */ -static int xs_tcp_send_request(struct rpc_task *task) +static int xs_tcp_send_request(struct rpc_rqst *req, struct rpc_task *task) { - struct rpc_rqst *req = task->tk_rqstp; struct rpc_xprt *xprt = req->rq_xprt; struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); struct xdr_buf *xdr = &req->rq_snd_buf; @@ -697,7 +697,7 @@ static int xs_tcp_send_request(struct rpc_task *task) * completes while the socket holds a reference to the pages, * then we may end up resending corrupted data. */ - if (task->tk_flags & RPC_TASK_SENT) + if (req->rq_task->tk_flags & RPC_TASK_SENT) zerocopy = false; if (test_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state)) @@ -761,7 +761,7 @@ static int xs_tcp_send_request(struct rpc_task *task) /* Should we call xs_close() here? */ break; case -EAGAIN: - status = xs_nospace(task); + status = xs_nospace(req, task); break; case -ECONNRESET: case -ECONNREFUSED: @@ -2706,9 +2706,8 @@ static int bc_sendto(struct rpc_rqst *req) /* * The send routine. Borrows from svc_send */ -static int bc_send_request(struct rpc_task *task) +static int bc_send_request(struct rpc_rqst *req, struct rpc_task *task) { - struct rpc_rqst *req = task->tk_rqstp; struct svc_xprt *xprt; int len; -- cgit v1.2.3-55-g7522 From 75891f502f5fc70f52a01af5b924384ed4866907 Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Mon, 3 Sep 2018 17:37:36 -0400 Subject: SUNRPC: Support for congestion control when queuing is enabled Both RDMA and UDP transports require the request to get a "congestion control" credit before they can be transmitted. Right now, this is done when the request locks the socket. We'd like it to happen when a request attempts to be transmitted for the first time. In order to support retransmission of requests that already hold such credits, we also want to ensure that they get queued first, so that we don't deadlock with requests that have yet to obtain a credit. Signed-off-by: Trond Myklebust --- include/linux/sunrpc/xprt.h | 2 + net/sunrpc/clnt.c | 5 ++ net/sunrpc/xprt.c | 128 +++++++++++++++++++++++++++----------- net/sunrpc/xprtrdma/backchannel.c | 3 + net/sunrpc/xprtrdma/transport.c | 3 + net/sunrpc/xprtsock.c | 4 ++ 6 files changed, 109 insertions(+), 36 deletions(-) (limited to 'net/sunrpc/xprtsock.c') diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h index e377620b9744..0d0cc127615e 100644 --- a/include/linux/sunrpc/xprt.h +++ b/include/linux/sunrpc/xprt.h @@ -397,6 +397,7 @@ void xprt_complete_rqst(struct rpc_task *task, int copied); void xprt_pin_rqst(struct rpc_rqst *req); void xprt_unpin_rqst(struct rpc_rqst *req); void xprt_release_rqst_cong(struct rpc_task *task); +bool xprt_request_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req); void xprt_disconnect_done(struct rpc_xprt *xprt); void xprt_force_disconnect(struct rpc_xprt *xprt); void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie); @@ -415,6 +416,7 @@ void xprt_unlock_connect(struct rpc_xprt *, void *); #define XPRT_BINDING (5) #define XPRT_CLOSING (6) #define XPRT_CONGESTED (9) +#define XPRT_CWND_WAIT (10) static inline void xprt_set_connected(struct rpc_xprt *xprt) { diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c index 8dc3d33827c4..f03911f84953 100644 --- a/net/sunrpc/clnt.c +++ b/net/sunrpc/clnt.c @@ -1996,6 +1996,11 @@ call_transmit_status(struct rpc_task *task) dprint_status(task); xprt_end_transmit(task); break; + case -EBADSLT: + xprt_end_transmit(task); + task->tk_action = call_transmit; + task->tk_status = 0; + break; case -EBADMSG: xprt_end_transmit(task); task->tk_status = 0; diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index 44d0eeaddaac..b03355ae7b16 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -68,8 +68,6 @@ static void xprt_init(struct rpc_xprt *xprt, struct net *net); static __be32 xprt_alloc_xid(struct rpc_xprt *xprt); static void xprt_connect_status(struct rpc_task *task); -static int __xprt_get_cong(struct rpc_xprt *, struct rpc_task *); -static void __xprt_put_cong(struct rpc_xprt *, struct rpc_rqst *); static void xprt_destroy(struct rpc_xprt *xprt); static DEFINE_SPINLOCK(xprt_list_lock); @@ -221,6 +219,31 @@ static void xprt_clear_locked(struct rpc_xprt *xprt) queue_work(xprtiod_workqueue, &xprt->task_cleanup); } +static bool +xprt_need_congestion_window_wait(struct rpc_xprt *xprt) +{ + return test_bit(XPRT_CWND_WAIT, &xprt->state); +} + +static void +xprt_set_congestion_window_wait(struct rpc_xprt *xprt) +{ + if (!list_empty(&xprt->xmit_queue)) { + /* Peek at head of queue to see if it can make progress */ + if (list_first_entry(&xprt->xmit_queue, struct rpc_rqst, + rq_xmit)->rq_cong) + return; + } + set_bit(XPRT_CWND_WAIT, &xprt->state); +} + +static void +xprt_test_and_clear_congestion_window_wait(struct rpc_xprt *xprt) +{ + if (!RPCXPRT_CONGESTED(xprt)) + clear_bit(XPRT_CWND_WAIT, &xprt->state); +} + /* * xprt_reserve_xprt_cong - serialize write access to transports * @task: task that is requesting access to the transport @@ -228,6 +251,7 @@ static void xprt_clear_locked(struct rpc_xprt *xprt) * Same as xprt_reserve_xprt, but Van Jacobson congestion control is * integrated into the decision of whether a request is allowed to be * woken up and given access to the transport. + * Note that the lock is only granted if we know there are free slots. */ int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task) { @@ -243,14 +267,12 @@ int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task) xprt->snd_task = task; return 1; } - if (__xprt_get_cong(xprt, task)) { + if (!xprt_need_congestion_window_wait(xprt)) { xprt->snd_task = task; return 1; } xprt_clear_locked(xprt); out_sleep: - if (req) - __xprt_put_cong(xprt, req); dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt); task->tk_timeout = 0; task->tk_status = -EAGAIN; @@ -294,32 +316,14 @@ static void __xprt_lock_write_next(struct rpc_xprt *xprt) xprt_clear_locked(xprt); } -static bool __xprt_lock_write_cong_func(struct rpc_task *task, void *data) -{ - struct rpc_xprt *xprt = data; - struct rpc_rqst *req; - - req = task->tk_rqstp; - if (req == NULL) { - xprt->snd_task = task; - return true; - } - if (__xprt_get_cong(xprt, task)) { - xprt->snd_task = task; - req->rq_ntrans++; - return true; - } - return false; -} - static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt) { if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) return; - if (RPCXPRT_CONGESTED(xprt)) + if (xprt_need_congestion_window_wait(xprt)) goto out_unlock; if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending, - __xprt_lock_write_cong_func, xprt)) + __xprt_lock_write_func, xprt)) return; out_unlock: xprt_clear_locked(xprt); @@ -370,16 +374,16 @@ static inline void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *ta * overflowed. Put the task to sleep if this is the case. */ static int -__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_task *task) +__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) { - struct rpc_rqst *req = task->tk_rqstp; - if (req->rq_cong) return 1; dprintk("RPC: %5u xprt_cwnd_limited cong = %lu cwnd = %lu\n", - task->tk_pid, xprt->cong, xprt->cwnd); - if (RPCXPRT_CONGESTED(xprt)) + req->rq_task->tk_pid, xprt->cong, xprt->cwnd); + if (RPCXPRT_CONGESTED(xprt)) { + xprt_set_congestion_window_wait(xprt); return 0; + } req->rq_cong = 1; xprt->cong += RPC_CWNDSCALE; return 1; @@ -396,9 +400,31 @@ __xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) return; req->rq_cong = 0; xprt->cong -= RPC_CWNDSCALE; + xprt_test_and_clear_congestion_window_wait(xprt); __xprt_lock_write_next_cong(xprt); } +/** + * xprt_request_get_cong - Request congestion control credits + * @xprt: pointer to transport + * @req: pointer to RPC request + * + * Useful for transports that require congestion control. + */ +bool +xprt_request_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) +{ + bool ret = false; + + if (req->rq_cong) + return true; + spin_lock_bh(&xprt->transport_lock); + ret = __xprt_get_cong(xprt, req) != 0; + spin_unlock_bh(&xprt->transport_lock); + return ret; +} +EXPORT_SYMBOL_GPL(xprt_request_get_cong); + /** * xprt_release_rqst_cong - housekeeping when request is complete * @task: RPC request that recently completed @@ -413,6 +439,20 @@ void xprt_release_rqst_cong(struct rpc_task *task) } EXPORT_SYMBOL_GPL(xprt_release_rqst_cong); +/* + * Clear the congestion window wait flag and wake up the next + * entry on xprt->sending + */ +static void +xprt_clear_congestion_window_wait(struct rpc_xprt *xprt) +{ + if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state)) { + spin_lock_bh(&xprt->transport_lock); + __xprt_lock_write_next_cong(xprt); + spin_unlock_bh(&xprt->transport_lock); + } +} + /** * xprt_adjust_cwnd - adjust transport congestion window * @xprt: pointer to xprt @@ -1058,12 +1098,28 @@ xprt_request_enqueue_transmit(struct rpc_task *task) if (xprt_request_need_enqueue_transmit(task, req)) { spin_lock(&xprt->queue_lock); - list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) { - if (pos->rq_task->tk_owner != task->tk_owner) - continue; - list_add_tail(&req->rq_xmit2, &pos->rq_xmit2); - INIT_LIST_HEAD(&req->rq_xmit); - goto out; + /* + * Requests that carry congestion control credits are added + * to the head of the list to avoid starvation issues. + */ + if (req->rq_cong) { + xprt_clear_congestion_window_wait(xprt); + list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) { + if (pos->rq_cong) + continue; + /* Note: req is added _before_ pos */ + list_add_tail(&req->rq_xmit, &pos->rq_xmit); + INIT_LIST_HEAD(&req->rq_xmit2); + goto out; + } + } else { + list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) { + if (pos->rq_task->tk_owner != task->tk_owner) + continue; + list_add_tail(&req->rq_xmit2, &pos->rq_xmit2); + INIT_LIST_HEAD(&req->rq_xmit); + goto out; + } } list_add_tail(&req->rq_xmit, &xprt->xmit_queue); INIT_LIST_HEAD(&req->rq_xmit2); diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c index ed58761e6b23..e7c445cee16f 100644 --- a/net/sunrpc/xprtrdma/backchannel.c +++ b/net/sunrpc/xprtrdma/backchannel.c @@ -200,6 +200,9 @@ int xprt_rdma_bc_send_reply(struct rpc_rqst *rqst) if (!xprt_connected(rqst->rq_xprt)) goto drop_connection; + if (!xprt_request_get_cong(rqst->rq_xprt, rqst)) + return -EBADSLT; + rc = rpcrdma_bc_marshal_reply(rqst); if (rc < 0) goto failed_marshal; diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c index fa684bf4d090..9ff322e53f37 100644 --- a/net/sunrpc/xprtrdma/transport.c +++ b/net/sunrpc/xprtrdma/transport.c @@ -721,6 +721,9 @@ xprt_rdma_send_request(struct rpc_rqst *rqst, struct rpc_task *task) if (!xprt_connected(xprt)) goto drop_connection; + if (!xprt_request_get_cong(xprt, rqst)) + return -EBADSLT; + rc = rpcrdma_marshal_req(r_xprt, rqst); if (rc < 0) goto failed_marshal; diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index b8143eded4af..8831e84a058a 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -609,6 +609,10 @@ static int xs_udp_send_request(struct rpc_rqst *req, struct rpc_task *task) if (!xprt_bound(xprt)) return -ENOTCONN; + + if (!xprt_request_get_cong(xprt, req)) + return -EBADSLT; + req->rq_xtime = ktime_get(); status = xs_sendpages(transport->sock, xs_addr(xprt), xprt->addrlen, xdr, 0, true, &sent); -- cgit v1.2.3-55-g7522 From 36bd7de949f41d586ef7794169af75462b67acbc Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Mon, 3 Sep 2018 18:41:32 -0400 Subject: SUNRPC: Turn off throttling of RPC slots for TCP sockets The theory was that we would need to grab the socket lock anyway, so we might as well use it to gate the allocation of RPC slots for a TCP socket. Signed-off-by: Trond Myklebust --- include/linux/sunrpc/xprt.h | 1 - net/sunrpc/xprt.c | 14 -------------- net/sunrpc/xprtsock.c | 2 +- 3 files changed, 1 insertion(+), 16 deletions(-) (limited to 'net/sunrpc/xprtsock.c') diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h index 0d0cc127615e..14c9b4d49fb4 100644 --- a/include/linux/sunrpc/xprt.h +++ b/include/linux/sunrpc/xprt.h @@ -343,7 +343,6 @@ int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task); void xprt_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task); void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req); -void xprt_lock_and_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task); bool xprt_prepare_transmit(struct rpc_task *task); void xprt_request_enqueue_transmit(struct rpc_task *task); void xprt_request_enqueue_receive(struct rpc_task *task); diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index 533df198a0e9..849e102e3c5a 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -1429,20 +1429,6 @@ out_init_req: } EXPORT_SYMBOL_GPL(xprt_alloc_slot); -void xprt_lock_and_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task) -{ - /* Note: grabbing the xprt_lock_write() ensures that we throttle - * new slot allocation if the transport is congested (i.e. when - * reconnecting a stream transport or when out of socket write - * buffer space). - */ - if (xprt_lock_write(xprt, task)) { - xprt_alloc_slot(xprt, task); - xprt_release_write(xprt, task); - } -} -EXPORT_SYMBOL_GPL(xprt_lock_and_alloc_slot); - void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req) { spin_lock(&xprt->reserve_lock); diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index 8831e84a058a..f54e8110f4c6 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -2809,7 +2809,7 @@ static const struct rpc_xprt_ops xs_udp_ops = { static const struct rpc_xprt_ops xs_tcp_ops = { .reserve_xprt = xprt_reserve_xprt, .release_xprt = xprt_release_xprt, - .alloc_slot = xprt_lock_and_alloc_slot, + .alloc_slot = xprt_alloc_slot, .free_slot = xprt_free_slot, .rpcbind = rpcb_getport_async, .set_port = xs_set_port, -- cgit v1.2.3-55-g7522 From c544577daddb618c7dd5fa7fb98d6a41782f020e Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Mon, 3 Sep 2018 23:39:27 -0400 Subject: SUNRPC: Clean up transport write space handling Treat socket write space handling in the same way we now treat transport congestion: by denying the XPRT_LOCK until the transport signals that it has free buffer space. Signed-off-by: Trond Myklebust --- include/linux/sunrpc/svc_xprt.h | 1 - include/linux/sunrpc/xprt.h | 5 +- net/sunrpc/clnt.c | 28 ++++------- net/sunrpc/svc_xprt.c | 2 - net/sunrpc/xprt.c | 77 ++++++++++++++++++------------ net/sunrpc/xprtrdma/rpc_rdma.c | 2 +- net/sunrpc/xprtrdma/svc_rdma_backchannel.c | 7 +-- net/sunrpc/xprtsock.c | 33 +++++-------- 8 files changed, 73 insertions(+), 82 deletions(-) (limited to 'net/sunrpc/xprtsock.c') diff --git a/include/linux/sunrpc/svc_xprt.h b/include/linux/sunrpc/svc_xprt.h index c3d72066d4b1..6b7a86c4d6e6 100644 --- a/include/linux/sunrpc/svc_xprt.h +++ b/include/linux/sunrpc/svc_xprt.h @@ -84,7 +84,6 @@ struct svc_xprt { struct sockaddr_storage xpt_remote; /* remote peer's address */ size_t xpt_remotelen; /* length of address */ char xpt_remotebuf[INET6_ADDRSTRLEN + 10]; - struct rpc_wait_queue xpt_bc_pending; /* backchannel wait queue */ struct list_head xpt_users; /* callbacks on free */ struct net *xpt_net; diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h index 14c9b4d49fb4..5600242ccbf9 100644 --- a/include/linux/sunrpc/xprt.h +++ b/include/linux/sunrpc/xprt.h @@ -387,8 +387,8 @@ int xprt_load_transport(const char *); void xprt_set_retrans_timeout_def(struct rpc_task *task); void xprt_set_retrans_timeout_rtt(struct rpc_task *task); void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status); -void xprt_wait_for_buffer_space(struct rpc_task *task, rpc_action action); -void xprt_write_space(struct rpc_xprt *xprt); +void xprt_wait_for_buffer_space(struct rpc_xprt *xprt); +bool xprt_write_space(struct rpc_xprt *xprt); void xprt_adjust_cwnd(struct rpc_xprt *xprt, struct rpc_task *task, int result); struct rpc_rqst * xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid); void xprt_update_rtt(struct rpc_task *task); @@ -416,6 +416,7 @@ void xprt_unlock_connect(struct rpc_xprt *, void *); #define XPRT_CLOSING (6) #define XPRT_CONGESTED (9) #define XPRT_CWND_WAIT (10) +#define XPRT_WRITE_SPACE (11) static inline void xprt_set_connected(struct rpc_xprt *xprt) { diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c index f03911f84953..0c4b2e7d791f 100644 --- a/net/sunrpc/clnt.c +++ b/net/sunrpc/clnt.c @@ -1964,13 +1964,14 @@ call_transmit(struct rpc_task *task) { dprint_status(task); + task->tk_status = 0; + if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) { + if (!xprt_prepare_transmit(task)) + return; + xprt_transmit(task); + } task->tk_action = call_transmit_status; - if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) - return; - - if (!xprt_prepare_transmit(task)) - return; - xprt_transmit(task); + xprt_end_transmit(task); } /* @@ -1986,7 +1987,6 @@ call_transmit_status(struct rpc_task *task) * test first. */ if (task->tk_status == 0) { - xprt_end_transmit(task); xprt_request_wait_receive(task); return; } @@ -1994,15 +1994,8 @@ call_transmit_status(struct rpc_task *task) switch (task->tk_status) { default: dprint_status(task); - xprt_end_transmit(task); - break; - case -EBADSLT: - xprt_end_transmit(task); - task->tk_action = call_transmit; - task->tk_status = 0; break; case -EBADMSG: - xprt_end_transmit(task); task->tk_status = 0; task->tk_action = call_encode; break; @@ -2015,6 +2008,7 @@ call_transmit_status(struct rpc_task *task) case -ENOBUFS: rpc_delay(task, HZ>>2); /* fall through */ + case -EBADSLT: case -EAGAIN: task->tk_action = call_transmit; task->tk_status = 0; @@ -2026,7 +2020,6 @@ call_transmit_status(struct rpc_task *task) case -ENETUNREACH: case -EPERM: if (RPC_IS_SOFTCONN(task)) { - xprt_end_transmit(task); if (!task->tk_msg.rpc_proc->p_proc) trace_xprt_ping(task->tk_xprt, task->tk_status); @@ -2069,9 +2062,6 @@ call_bc_transmit(struct rpc_task *task) xprt_transmit(task); - if (task->tk_status == -EAGAIN) - goto out_retry; - xprt_end_transmit(task); dprint_status(task); switch (task->tk_status) { @@ -2087,6 +2077,8 @@ call_bc_transmit(struct rpc_task *task) case -ENOTCONN: case -EPIPE: break; + case -EAGAIN: + goto out_retry; case -ETIMEDOUT: /* * Problem reaching the server. Disconnect and let the diff --git a/net/sunrpc/svc_xprt.c b/net/sunrpc/svc_xprt.c index 5185efb9027b..87533fbb96cf 100644 --- a/net/sunrpc/svc_xprt.c +++ b/net/sunrpc/svc_xprt.c @@ -171,7 +171,6 @@ void svc_xprt_init(struct net *net, struct svc_xprt_class *xcl, mutex_init(&xprt->xpt_mutex); spin_lock_init(&xprt->xpt_lock); set_bit(XPT_BUSY, &xprt->xpt_flags); - rpc_init_wait_queue(&xprt->xpt_bc_pending, "xpt_bc_pending"); xprt->xpt_net = get_net(net); strcpy(xprt->xpt_remotebuf, "uninitialized"); } @@ -895,7 +894,6 @@ int svc_send(struct svc_rqst *rqstp) else len = xprt->xpt_ops->xpo_sendto(rqstp); mutex_unlock(&xprt->xpt_mutex); - rpc_wake_up(&xprt->xpt_bc_pending); trace_svc_send(rqstp, len); svc_xprt_release(rqstp); diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index 849e102e3c5a..55dc5c7069b9 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -169,6 +169,17 @@ out: } EXPORT_SYMBOL_GPL(xprt_load_transport); +static void xprt_clear_locked(struct rpc_xprt *xprt) +{ + xprt->snd_task = NULL; + if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state)) { + smp_mb__before_atomic(); + clear_bit(XPRT_LOCKED, &xprt->state); + smp_mb__after_atomic(); + } else + queue_work(xprtiod_workqueue, &xprt->task_cleanup); +} + /** * xprt_reserve_xprt - serialize write access to transports * @task: task that is requesting access to the transport @@ -188,10 +199,14 @@ int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task) return 1; goto out_sleep; } + if (test_bit(XPRT_WRITE_SPACE, &xprt->state)) + goto out_unlock; xprt->snd_task = task; return 1; +out_unlock: + xprt_clear_locked(xprt); out_sleep: dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt); @@ -208,17 +223,6 @@ out_sleep: } EXPORT_SYMBOL_GPL(xprt_reserve_xprt); -static void xprt_clear_locked(struct rpc_xprt *xprt) -{ - xprt->snd_task = NULL; - if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state)) { - smp_mb__before_atomic(); - clear_bit(XPRT_LOCKED, &xprt->state); - smp_mb__after_atomic(); - } else - queue_work(xprtiod_workqueue, &xprt->task_cleanup); -} - static bool xprt_need_congestion_window_wait(struct rpc_xprt *xprt) { @@ -267,10 +271,13 @@ int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task) xprt->snd_task = task; return 1; } + if (test_bit(XPRT_WRITE_SPACE, &xprt->state)) + goto out_unlock; if (!xprt_need_congestion_window_wait(xprt)) { xprt->snd_task = task; return 1; } +out_unlock: xprt_clear_locked(xprt); out_sleep: dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt); @@ -309,10 +316,12 @@ static void __xprt_lock_write_next(struct rpc_xprt *xprt) { if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) return; - + if (test_bit(XPRT_WRITE_SPACE, &xprt->state)) + goto out_unlock; if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending, __xprt_lock_write_func, xprt)) return; +out_unlock: xprt_clear_locked(xprt); } @@ -320,6 +329,8 @@ static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt) { if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) return; + if (test_bit(XPRT_WRITE_SPACE, &xprt->state)) + goto out_unlock; if (xprt_need_congestion_window_wait(xprt)) goto out_unlock; if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending, @@ -510,39 +521,46 @@ EXPORT_SYMBOL_GPL(xprt_wake_pending_tasks); /** * xprt_wait_for_buffer_space - wait for transport output buffer to clear - * @task: task to be put to sleep - * @action: function pointer to be executed after wait + * @xprt: transport * * Note that we only set the timer for the case of RPC_IS_SOFT(), since * we don't in general want to force a socket disconnection due to * an incomplete RPC call transmission. */ -void xprt_wait_for_buffer_space(struct rpc_task *task, rpc_action action) +void xprt_wait_for_buffer_space(struct rpc_xprt *xprt) { - struct rpc_rqst *req = task->tk_rqstp; - struct rpc_xprt *xprt = req->rq_xprt; - - task->tk_timeout = RPC_IS_SOFT(task) ? req->rq_timeout : 0; - rpc_sleep_on(&xprt->pending, task, action); + set_bit(XPRT_WRITE_SPACE, &xprt->state); } EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space); +static bool +xprt_clear_write_space_locked(struct rpc_xprt *xprt) +{ + if (test_and_clear_bit(XPRT_WRITE_SPACE, &xprt->state)) { + __xprt_lock_write_next(xprt); + dprintk("RPC: write space: waking waiting task on " + "xprt %p\n", xprt); + return true; + } + return false; +} + /** * xprt_write_space - wake the task waiting for transport output buffer space * @xprt: transport with waiting tasks * * Can be called in a soft IRQ context, so xprt_write_space never sleeps. */ -void xprt_write_space(struct rpc_xprt *xprt) +bool xprt_write_space(struct rpc_xprt *xprt) { + bool ret; + + if (!test_bit(XPRT_WRITE_SPACE, &xprt->state)) + return false; spin_lock_bh(&xprt->transport_lock); - if (xprt->snd_task) { - dprintk("RPC: write space: waking waiting task on " - "xprt %p\n", xprt); - rpc_wake_up_queued_task_on_wq(xprtiod_workqueue, - &xprt->pending, xprt->snd_task); - } + ret = xprt_clear_write_space_locked(xprt); spin_unlock_bh(&xprt->transport_lock); + return ret; } EXPORT_SYMBOL_GPL(xprt_write_space); @@ -653,6 +671,7 @@ void xprt_disconnect_done(struct rpc_xprt *xprt) dprintk("RPC: disconnected transport %p\n", xprt); spin_lock_bh(&xprt->transport_lock); xprt_clear_connected(xprt); + xprt_clear_write_space_locked(xprt); xprt_wake_pending_tasks(xprt, -EAGAIN); spin_unlock_bh(&xprt->transport_lock); } @@ -1326,9 +1345,7 @@ xprt_transmit(struct rpc_task *task) if (!xprt_request_data_received(task) || test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) continue; - } else if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) - rpc_wake_up_queued_task(&xprt->pending, task); - else + } else if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) task->tk_status = status; break; } diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index 0020dc401215..53fa95d60015 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -866,7 +866,7 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst) out_err: switch (ret) { case -EAGAIN: - xprt_wait_for_buffer_space(rqst->rq_task, NULL); + xprt_wait_for_buffer_space(rqst->rq_xprt); break; case -ENOBUFS: break; diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c index d1618c70edb4..35a8c3aab302 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c +++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c @@ -224,12 +224,7 @@ xprt_rdma_bc_send_request(struct rpc_rqst *rqst, struct rpc_task *task) dprintk("svcrdma: sending bc call with xid: %08x\n", be32_to_cpu(rqst->rq_xid)); - if (!mutex_trylock(&sxprt->xpt_mutex)) { - rpc_sleep_on(&sxprt->xpt_bc_pending, task, NULL); - if (!mutex_trylock(&sxprt->xpt_mutex)) - return -EAGAIN; - rpc_wake_up_queued_task(&sxprt->xpt_bc_pending, task); - } + mutex_lock(&sxprt->xpt_mutex); ret = -ENOTCONN; rdma = container_of(sxprt, struct svcxprt_rdma, sc_xprt); diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index f54e8110f4c6..ef8d0e81cbda 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -440,20 +440,12 @@ out: return err; } -static void xs_nospace_callback(struct rpc_task *task) -{ - struct sock_xprt *transport = container_of(task->tk_rqstp->rq_xprt, struct sock_xprt, xprt); - - transport->inet->sk_write_pending--; -} - /** - * xs_nospace - place task on wait queue if transmit was incomplete + * xs_nospace - handle transmit was incomplete * @req: pointer to RPC request - * @task: task to put to sleep * */ -static int xs_nospace(struct rpc_rqst *req, struct rpc_task *task) +static int xs_nospace(struct rpc_rqst *req) { struct rpc_xprt *xprt = req->rq_xprt; struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); @@ -461,7 +453,8 @@ static int xs_nospace(struct rpc_rqst *req, struct rpc_task *task) int ret = -EAGAIN; dprintk("RPC: %5u xmit incomplete (%u left of %u)\n", - task->tk_pid, req->rq_slen - transport->xmit.offset, + req->rq_task->tk_pid, + req->rq_slen - transport->xmit.offset, req->rq_slen); /* Protect against races with write_space */ @@ -471,7 +464,7 @@ static int xs_nospace(struct rpc_rqst *req, struct rpc_task *task) if (xprt_connected(xprt)) { /* wait for more buffer space */ sk->sk_write_pending++; - xprt_wait_for_buffer_space(task, xs_nospace_callback); + xprt_wait_for_buffer_space(xprt); } else ret = -ENOTCONN; @@ -569,7 +562,7 @@ static int xs_local_send_request(struct rpc_rqst *req, struct rpc_task *task) case -ENOBUFS: break; case -EAGAIN: - status = xs_nospace(req, task); + status = xs_nospace(req); break; default: dprintk("RPC: sendmsg returned unrecognized error %d\n", @@ -642,7 +635,7 @@ process_status: /* Should we call xs_close() here? */ break; case -EAGAIN: - status = xs_nospace(req, task); + status = xs_nospace(req); break; case -ENETUNREACH: case -ENOBUFS: @@ -765,7 +758,7 @@ static int xs_tcp_send_request(struct rpc_rqst *req, struct rpc_task *task) /* Should we call xs_close() here? */ break; case -EAGAIN: - status = xs_nospace(req, task); + status = xs_nospace(req); break; case -ECONNRESET: case -ECONNREFUSED: @@ -1672,7 +1665,8 @@ static void xs_write_space(struct sock *sk) if (!wq || test_and_clear_bit(SOCKWQ_ASYNC_NOSPACE, &wq->flags) == 0) goto out; - xprt_write_space(xprt); + if (xprt_write_space(xprt)) + sk->sk_write_pending--; out: rcu_read_unlock(); } @@ -2725,12 +2719,7 @@ static int bc_send_request(struct rpc_rqst *req, struct rpc_task *task) * Grab the mutex to serialize data as the connection is shared * with the fore channel */ - if (!mutex_trylock(&xprt->xpt_mutex)) { - rpc_sleep_on(&xprt->xpt_bc_pending, task, NULL); - if (!mutex_trylock(&xprt->xpt_mutex)) - return -EAGAIN; - rpc_wake_up_queued_task(&xprt->xpt_bc_pending, task); - } + mutex_lock(&xprt->xpt_mutex); if (test_bit(XPT_DEAD, &xprt->xpt_flags)) len = -ENOTCONN; else -- cgit v1.2.3-55-g7522 From adfa71446dd0943ba376eff3e05c7c89582f8038 Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Mon, 3 Sep 2018 23:58:59 -0400 Subject: SUNRPC: Cleanup: remove the unused 'task' argument from the request_send() Signed-off-by: Trond Myklebust --- include/linux/sunrpc/xprt.h | 2 +- net/sunrpc/xprt.c | 2 +- net/sunrpc/xprtrdma/svc_rdma_backchannel.c | 2 +- net/sunrpc/xprtrdma/transport.c | 4 ++-- net/sunrpc/xprtsock.c | 11 ++++------- 5 files changed, 9 insertions(+), 12 deletions(-) (limited to 'net/sunrpc/xprtsock.c') diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h index 5600242ccbf9..823860cce0bc 100644 --- a/include/linux/sunrpc/xprt.h +++ b/include/linux/sunrpc/xprt.h @@ -141,7 +141,7 @@ struct rpc_xprt_ops { void (*connect)(struct rpc_xprt *xprt, struct rpc_task *task); int (*buf_alloc)(struct rpc_task *task); void (*buf_free)(struct rpc_task *task); - int (*send_request)(struct rpc_rqst *req, struct rpc_task *task); + int (*send_request)(struct rpc_rqst *req); void (*set_retrans_timeout)(struct rpc_task *task); void (*timer)(struct rpc_xprt *xprt, struct rpc_task *task); void (*release_request)(struct rpc_task *task); diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index 55dc5c7069b9..c86a5df6c338 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -1283,7 +1283,7 @@ xprt_request_transmit(struct rpc_rqst *req, struct rpc_task *snd_task) req->rq_ntrans++; connect_cookie = xprt->connect_cookie; - status = xprt->ops->send_request(req, snd_task); + status = xprt->ops->send_request(req); trace_xprt_transmit(xprt, req->rq_xid, status); if (status != 0) { req->rq_ntrans--; diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c index 35a8c3aab302..992312504cfd 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c +++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c @@ -215,7 +215,7 @@ drop_connection: * connection. */ static int -xprt_rdma_bc_send_request(struct rpc_rqst *rqst, struct rpc_task *task) +xprt_rdma_bc_send_request(struct rpc_rqst *rqst) { struct svc_xprt *sxprt = rqst->rq_xprt->bc_xprt; struct svcxprt_rdma *rdma; diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c index 9ff322e53f37..a5a6a4a353f2 100644 --- a/net/sunrpc/xprtrdma/transport.c +++ b/net/sunrpc/xprtrdma/transport.c @@ -693,7 +693,7 @@ xprt_rdma_free(struct rpc_task *task) /** * xprt_rdma_send_request - marshal and send an RPC request - * @task: RPC task with an RPC message in rq_snd_buf + * @rqst: RPC message in rq_snd_buf * * Caller holds the transport's write lock. * @@ -706,7 +706,7 @@ xprt_rdma_free(struct rpc_task *task) * sent. Do not try to send this message again. */ static int -xprt_rdma_send_request(struct rpc_rqst *rqst, struct rpc_task *task) +xprt_rdma_send_request(struct rpc_rqst *rqst) { struct rpc_xprt *xprt = rqst->rq_xprt; struct rpcrdma_req *req = rpcr_to_rdmar(rqst); diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index ef8d0e81cbda..f16406228ead 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -507,7 +507,6 @@ static inline void xs_encode_stream_record_marker(struct xdr_buf *buf) /** * xs_local_send_request - write an RPC request to an AF_LOCAL socket * @req: pointer to RPC request - * @task: RPC task that manages the state of an RPC request * * Return values: * 0: The request has been sent @@ -516,7 +515,7 @@ static inline void xs_encode_stream_record_marker(struct xdr_buf *buf) * ENOTCONN: Caller needs to invoke connect logic then call again * other: Some other error occured, the request was not sent */ -static int xs_local_send_request(struct rpc_rqst *req, struct rpc_task *task) +static int xs_local_send_request(struct rpc_rqst *req) { struct rpc_xprt *xprt = req->rq_xprt; struct sock_xprt *transport = @@ -579,7 +578,6 @@ static int xs_local_send_request(struct rpc_rqst *req, struct rpc_task *task) /** * xs_udp_send_request - write an RPC request to a UDP socket * @req: pointer to RPC request - * @task: address of RPC task that manages the state of an RPC request * * Return values: * 0: The request has been sent @@ -588,7 +586,7 @@ static int xs_local_send_request(struct rpc_rqst *req, struct rpc_task *task) * ENOTCONN: Caller needs to invoke connect logic then call again * other: Some other error occurred, the request was not sent */ -static int xs_udp_send_request(struct rpc_rqst *req, struct rpc_task *task) +static int xs_udp_send_request(struct rpc_rqst *req) { struct rpc_xprt *xprt = req->rq_xprt; struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); @@ -656,7 +654,6 @@ process_status: /** * xs_tcp_send_request - write an RPC request to a TCP socket * @req: pointer to RPC request - * @task: address of RPC task that manages the state of an RPC request * * Return values: * 0: The request has been sent @@ -668,7 +665,7 @@ process_status: * XXX: In the case of soft timeouts, should we eventually give up * if sendmsg is not able to make progress? */ -static int xs_tcp_send_request(struct rpc_rqst *req, struct rpc_task *task) +static int xs_tcp_send_request(struct rpc_rqst *req) { struct rpc_xprt *xprt = req->rq_xprt; struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); @@ -2704,7 +2701,7 @@ static int bc_sendto(struct rpc_rqst *req) /* * The send routine. Borrows from svc_send */ -static int bc_send_request(struct rpc_rqst *req, struct rpc_task *task) +static int bc_send_request(struct rpc_rqst *req) { struct svc_xprt *xprt; int len; -- cgit v1.2.3-55-g7522 From 277e4ab7d530bf287e02b65cfcd3ea8f489784f6 Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Fri, 14 Sep 2018 09:49:06 -0400 Subject: SUNRPC: Simplify TCP receive code by switching to using iterators Most of this code should also be reusable with other socket types. Signed-off-by: Trond Myklebust --- include/linux/sunrpc/xprtsock.h | 19 +- include/trace/events/sunrpc.h | 15 +- net/sunrpc/xprtsock.c | 697 +++++++++++++++++++--------------------- 3 files changed, 338 insertions(+), 393 deletions(-) (limited to 'net/sunrpc/xprtsock.c') diff --git a/include/linux/sunrpc/xprtsock.h b/include/linux/sunrpc/xprtsock.h index 005cfb6e7238..458bfe0137f5 100644 --- a/include/linux/sunrpc/xprtsock.h +++ b/include/linux/sunrpc/xprtsock.h @@ -31,15 +31,16 @@ struct sock_xprt { * State of TCP reply receive */ struct { - __be32 fraghdr, + struct { + __be32 fraghdr, xid, calldir; + } __attribute__((packed)); u32 offset, len; - unsigned long copied, - flags; + unsigned long copied; } recv; /* @@ -76,21 +77,9 @@ struct sock_xprt { void (*old_error_report)(struct sock *); }; -/* - * TCP receive state flags - */ -#define TCP_RCV_LAST_FRAG (1UL << 0) -#define TCP_RCV_COPY_FRAGHDR (1UL << 1) -#define TCP_RCV_COPY_XID (1UL << 2) -#define TCP_RCV_COPY_DATA (1UL << 3) -#define TCP_RCV_READ_CALLDIR (1UL << 4) -#define TCP_RCV_COPY_CALLDIR (1UL << 5) - /* * TCP RPC flags */ -#define TCP_RPC_REPLY (1UL << 6) - #define XPRT_SOCK_CONNECTING 1U #define XPRT_SOCK_DATA_READY (2) #define XPRT_SOCK_UPD_TIMEOUT (3) diff --git a/include/trace/events/sunrpc.h b/include/trace/events/sunrpc.h index 0aa347194e0f..19e08d12696c 100644 --- a/include/trace/events/sunrpc.h +++ b/include/trace/events/sunrpc.h @@ -497,16 +497,6 @@ TRACE_EVENT(xs_tcp_data_ready, __get_str(port), __entry->err, __entry->total) ); -#define rpc_show_sock_xprt_flags(flags) \ - __print_flags(flags, "|", \ - { TCP_RCV_LAST_FRAG, "TCP_RCV_LAST_FRAG" }, \ - { TCP_RCV_COPY_FRAGHDR, "TCP_RCV_COPY_FRAGHDR" }, \ - { TCP_RCV_COPY_XID, "TCP_RCV_COPY_XID" }, \ - { TCP_RCV_COPY_DATA, "TCP_RCV_COPY_DATA" }, \ - { TCP_RCV_READ_CALLDIR, "TCP_RCV_READ_CALLDIR" }, \ - { TCP_RCV_COPY_CALLDIR, "TCP_RCV_COPY_CALLDIR" }, \ - { TCP_RPC_REPLY, "TCP_RPC_REPLY" }) - TRACE_EVENT(xs_tcp_data_recv, TP_PROTO(struct sock_xprt *xs), @@ -516,7 +506,6 @@ TRACE_EVENT(xs_tcp_data_recv, __string(addr, xs->xprt.address_strings[RPC_DISPLAY_ADDR]) __string(port, xs->xprt.address_strings[RPC_DISPLAY_PORT]) __field(u32, xid) - __field(unsigned long, flags) __field(unsigned long, copied) __field(unsigned int, reclen) __field(unsigned long, offset) @@ -526,15 +515,13 @@ TRACE_EVENT(xs_tcp_data_recv, __assign_str(addr, xs->xprt.address_strings[RPC_DISPLAY_ADDR]); __assign_str(port, xs->xprt.address_strings[RPC_DISPLAY_PORT]); __entry->xid = be32_to_cpu(xs->recv.xid); - __entry->flags = xs->recv.flags; __entry->copied = xs->recv.copied; __entry->reclen = xs->recv.len; __entry->offset = xs->recv.offset; ), - TP_printk("peer=[%s]:%s xid=0x%08x flags=%s copied=%lu reclen=%u offset=%lu", + TP_printk("peer=[%s]:%s xid=0x%08x copied=%lu reclen=%u offset=%lu", __get_str(addr), __get_str(port), __entry->xid, - rpc_show_sock_xprt_flags(__entry->flags), __entry->copied, __entry->reclen, __entry->offset) ); diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index f16406228ead..06aa75008708 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -47,13 +47,13 @@ #include #include #include +#include +#include #include #include "sunrpc.h" -#define RPC_TCP_READ_CHUNK_SZ (3*512*1024) - static void xs_close(struct rpc_xprt *xprt); static void xs_tcp_set_socket_timeouts(struct rpc_xprt *xprt, struct socket *sock); @@ -325,6 +325,323 @@ static void xs_free_peer_addresses(struct rpc_xprt *xprt) } } +static size_t +xs_alloc_sparse_pages(struct xdr_buf *buf, size_t want, gfp_t gfp) +{ + size_t i,n; + + if (!(buf->flags & XDRBUF_SPARSE_PAGES)) + return want; + if (want > buf->page_len) + want = buf->page_len; + n = (buf->page_base + want + PAGE_SIZE - 1) >> PAGE_SHIFT; + for (i = 0; i < n; i++) { + if (buf->pages[i]) + continue; + buf->bvec[i].bv_page = buf->pages[i] = alloc_page(gfp); + if (!buf->pages[i]) { + buf->page_len = (i * PAGE_SIZE) - buf->page_base; + return buf->page_len; + } + } + return want; +} + +static ssize_t +xs_sock_recvmsg(struct socket *sock, struct msghdr *msg, int flags, size_t seek) +{ + ssize_t ret; + if (seek != 0) + iov_iter_advance(&msg->msg_iter, seek); + ret = sock_recvmsg(sock, msg, flags); + return ret > 0 ? ret + seek : ret; +} + +static ssize_t +xs_read_kvec(struct socket *sock, struct msghdr *msg, int flags, + struct kvec *kvec, size_t count, size_t seek) +{ + iov_iter_kvec(&msg->msg_iter, READ | ITER_KVEC, kvec, 1, count); + return xs_sock_recvmsg(sock, msg, flags, seek); +} + +static ssize_t +xs_read_bvec(struct socket *sock, struct msghdr *msg, int flags, + struct bio_vec *bvec, unsigned long nr, size_t count, + size_t seek) +{ + iov_iter_bvec(&msg->msg_iter, READ | ITER_BVEC, bvec, nr, count); + return xs_sock_recvmsg(sock, msg, flags, seek); +} + +static ssize_t +xs_read_discard(struct socket *sock, struct msghdr *msg, int flags, + size_t count) +{ + struct kvec kvec = { 0 }; + return xs_read_kvec(sock, msg, flags | MSG_TRUNC, &kvec, count, 0); +} + +static ssize_t +xs_read_xdr_buf(struct socket *sock, struct msghdr *msg, int flags, + struct xdr_buf *buf, size_t count, size_t seek, size_t *read) +{ + size_t want, seek_init = seek, offset = 0; + ssize_t ret; + + if (seek < buf->head[0].iov_len) { + want = min_t(size_t, count, buf->head[0].iov_len); + ret = xs_read_kvec(sock, msg, flags, &buf->head[0], want, seek); + if (ret <= 0) + goto sock_err; + offset += ret; + if (offset == count || msg->msg_flags & (MSG_EOR|MSG_TRUNC)) + goto out; + if (ret != want) + goto eagain; + seek = 0; + } else { + seek -= buf->head[0].iov_len; + offset += buf->head[0].iov_len; + } + if (seek < buf->page_len) { + want = xs_alloc_sparse_pages(buf, + min_t(size_t, count - offset, buf->page_len), + GFP_NOWAIT); + ret = xs_read_bvec(sock, msg, flags, buf->bvec, + xdr_buf_pagecount(buf), + want + buf->page_base, + seek + buf->page_base); + if (ret <= 0) + goto sock_err; + offset += ret - buf->page_base; + if (offset == count || msg->msg_flags & (MSG_EOR|MSG_TRUNC)) + goto out; + if (ret != want) + goto eagain; + seek = 0; + } else { + seek -= buf->page_len; + offset += buf->page_len; + } + if (seek < buf->tail[0].iov_len) { + want = min_t(size_t, count - offset, buf->tail[0].iov_len); + ret = xs_read_kvec(sock, msg, flags, &buf->tail[0], want, seek); + if (ret <= 0) + goto sock_err; + offset += ret; + if (offset == count || msg->msg_flags & (MSG_EOR|MSG_TRUNC)) + goto out; + if (ret != want) + goto eagain; + } else + offset += buf->tail[0].iov_len; + ret = -EMSGSIZE; + msg->msg_flags |= MSG_TRUNC; +out: + *read = offset - seek_init; + return ret; +eagain: + ret = -EAGAIN; + goto out; +sock_err: + offset += seek; + goto out; +} + +static void +xs_read_header(struct sock_xprt *transport, struct xdr_buf *buf) +{ + if (!transport->recv.copied) { + if (buf->head[0].iov_len >= transport->recv.offset) + memcpy(buf->head[0].iov_base, + &transport->recv.xid, + transport->recv.offset); + transport->recv.copied = transport->recv.offset; + } +} + +static bool +xs_read_stream_request_done(struct sock_xprt *transport) +{ + return transport->recv.fraghdr & cpu_to_be32(RPC_LAST_STREAM_FRAGMENT); +} + +static ssize_t +xs_read_stream_request(struct sock_xprt *transport, struct msghdr *msg, + int flags, struct rpc_rqst *req) +{ + struct xdr_buf *buf = &req->rq_private_buf; + size_t want, read; + ssize_t ret; + + xs_read_header(transport, buf); + + want = transport->recv.len - transport->recv.offset; + ret = xs_read_xdr_buf(transport->sock, msg, flags, buf, + transport->recv.copied + want, transport->recv.copied, + &read); + transport->recv.offset += read; + transport->recv.copied += read; + if (transport->recv.offset == transport->recv.len) { + if (xs_read_stream_request_done(transport)) + msg->msg_flags |= MSG_EOR; + return transport->recv.copied; + } + + switch (ret) { + case -EMSGSIZE: + return transport->recv.copied; + case 0: + return -ESHUTDOWN; + default: + if (ret < 0) + return ret; + } + return -EAGAIN; +} + +static size_t +xs_read_stream_headersize(bool isfrag) +{ + if (isfrag) + return sizeof(__be32); + return 3 * sizeof(__be32); +} + +static ssize_t +xs_read_stream_header(struct sock_xprt *transport, struct msghdr *msg, + int flags, size_t want, size_t seek) +{ + struct kvec kvec = { + .iov_base = &transport->recv.fraghdr, + .iov_len = want, + }; + return xs_read_kvec(transport->sock, msg, flags, &kvec, want, seek); +} + +#if defined(CONFIG_SUNRPC_BACKCHANNEL) +static ssize_t +xs_read_stream_call(struct sock_xprt *transport, struct msghdr *msg, int flags) +{ + struct rpc_xprt *xprt = &transport->xprt; + struct rpc_rqst *req; + ssize_t ret; + + /* Look up and lock the request corresponding to the given XID */ + req = xprt_lookup_bc_request(xprt, transport->recv.xid); + if (!req) { + printk(KERN_WARNING "Callback slot table overflowed\n"); + return -ESHUTDOWN; + } + + ret = xs_read_stream_request(transport, msg, flags, req); + if (msg->msg_flags & (MSG_EOR|MSG_TRUNC)) + xprt_complete_bc_request(req, ret); + + return ret; +} +#else /* CONFIG_SUNRPC_BACKCHANNEL */ +static ssize_t +xs_read_stream_call(struct sock_xprt *transport, struct msghdr *msg, int flags) +{ + return -ESHUTDOWN; +} +#endif /* CONFIG_SUNRPC_BACKCHANNEL */ + +static ssize_t +xs_read_stream_reply(struct sock_xprt *transport, struct msghdr *msg, int flags) +{ + struct rpc_xprt *xprt = &transport->xprt; + struct rpc_rqst *req; + ssize_t ret = 0; + + /* Look up and lock the request corresponding to the given XID */ + spin_lock(&xprt->queue_lock); + req = xprt_lookup_rqst(xprt, transport->recv.xid); + if (!req) { + msg->msg_flags |= MSG_TRUNC; + goto out; + } + xprt_pin_rqst(req); + spin_unlock(&xprt->queue_lock); + + ret = xs_read_stream_request(transport, msg, flags, req); + + spin_lock(&xprt->queue_lock); + if (msg->msg_flags & (MSG_EOR|MSG_TRUNC)) + xprt_complete_rqst(req->rq_task, ret); + xprt_unpin_rqst(req); +out: + spin_unlock(&xprt->queue_lock); + return ret; +} + +static ssize_t +xs_read_stream(struct sock_xprt *transport, int flags) +{ + struct msghdr msg = { 0 }; + size_t want, read = 0; + ssize_t ret = 0; + + if (transport->recv.len == 0) { + want = xs_read_stream_headersize(transport->recv.copied != 0); + ret = xs_read_stream_header(transport, &msg, flags, want, + transport->recv.offset); + if (ret <= 0) + goto out_err; + transport->recv.offset = ret; + if (ret != want) { + ret = -EAGAIN; + goto out_err; + } + transport->recv.len = be32_to_cpu(transport->recv.fraghdr) & + RPC_FRAGMENT_SIZE_MASK; + transport->recv.offset -= sizeof(transport->recv.fraghdr); + read = ret; + } + + switch (be32_to_cpu(transport->recv.calldir)) { + case RPC_CALL: + ret = xs_read_stream_call(transport, &msg, flags); + break; + case RPC_REPLY: + ret = xs_read_stream_reply(transport, &msg, flags); + } + if (msg.msg_flags & MSG_TRUNC) { + transport->recv.calldir = cpu_to_be32(-1); + transport->recv.copied = -1; + } + if (ret < 0) + goto out_err; + read += ret; + if (transport->recv.offset < transport->recv.len) { + ret = xs_read_discard(transport->sock, &msg, flags, + transport->recv.len - transport->recv.offset); + if (ret <= 0) + goto out_err; + transport->recv.offset += ret; + read += ret; + if (transport->recv.offset != transport->recv.len) + return -EAGAIN; + } + if (xs_read_stream_request_done(transport)) { + trace_xs_tcp_data_recv(transport); + transport->recv.copied = 0; + } + transport->recv.offset = 0; + transport->recv.len = 0; + return read; +out_err: + switch (ret) { + case 0: + case -ESHUTDOWN: + xprt_force_disconnect(&transport->xprt); + return -ESHUTDOWN; + } + return ret; +} + #define XS_SENDMSG_FLAGS (MSG_DONTWAIT | MSG_NOSIGNAL) static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, int addrlen, struct kvec *vec, unsigned int base, int more) @@ -484,6 +801,12 @@ static int xs_nospace(struct rpc_rqst *req) return ret; } +static void +xs_stream_prepare_request(struct rpc_rqst *req) +{ + req->rq_task->tk_status = xdr_alloc_bvec(&req->rq_rcv_buf, GFP_NOIO); +} + /* * Determine if the previous message in the stream was aborted before it * could complete transmission. @@ -1157,263 +1480,7 @@ static void xs_tcp_force_close(struct rpc_xprt *xprt) xprt_force_disconnect(xprt); } -static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, struct xdr_skb_reader *desc) -{ - struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); - size_t len, used; - char *p; - - p = ((char *) &transport->recv.fraghdr) + transport->recv.offset; - len = sizeof(transport->recv.fraghdr) - transport->recv.offset; - used = xdr_skb_read_bits(desc, p, len); - transport->recv.offset += used; - if (used != len) - return; - - transport->recv.len = ntohl(transport->recv.fraghdr); - if (transport->recv.len & RPC_LAST_STREAM_FRAGMENT) - transport->recv.flags |= TCP_RCV_LAST_FRAG; - else - transport->recv.flags &= ~TCP_RCV_LAST_FRAG; - transport->recv.len &= RPC_FRAGMENT_SIZE_MASK; - - transport->recv.flags &= ~TCP_RCV_COPY_FRAGHDR; - transport->recv.offset = 0; - - /* Sanity check of the record length */ - if (unlikely(transport->recv.len < 8)) { - dprintk("RPC: invalid TCP record fragment length\n"); - xs_tcp_force_close(xprt); - return; - } - dprintk("RPC: reading TCP record fragment of length %d\n", - transport->recv.len); -} - -static void xs_tcp_check_fraghdr(struct sock_xprt *transport) -{ - if (transport->recv.offset == transport->recv.len) { - transport->recv.flags |= TCP_RCV_COPY_FRAGHDR; - transport->recv.offset = 0; - if (transport->recv.flags & TCP_RCV_LAST_FRAG) { - transport->recv.flags &= ~TCP_RCV_COPY_DATA; - transport->recv.flags |= TCP_RCV_COPY_XID; - transport->recv.copied = 0; - } - } -} - -static inline void xs_tcp_read_xid(struct sock_xprt *transport, struct xdr_skb_reader *desc) -{ - size_t len, used; - char *p; - - len = sizeof(transport->recv.xid) - transport->recv.offset; - dprintk("RPC: reading XID (%zu bytes)\n", len); - p = ((char *) &transport->recv.xid) + transport->recv.offset; - used = xdr_skb_read_bits(desc, p, len); - transport->recv.offset += used; - if (used != len) - return; - transport->recv.flags &= ~TCP_RCV_COPY_XID; - transport->recv.flags |= TCP_RCV_READ_CALLDIR; - transport->recv.copied = 4; - dprintk("RPC: reading %s XID %08x\n", - (transport->recv.flags & TCP_RPC_REPLY) ? "reply for" - : "request with", - ntohl(transport->recv.xid)); - xs_tcp_check_fraghdr(transport); -} - -static inline void xs_tcp_read_calldir(struct sock_xprt *transport, - struct xdr_skb_reader *desc) -{ - size_t len, used; - u32 offset; - char *p; - - /* - * We want transport->recv.offset to be 8 at the end of this routine - * (4 bytes for the xid and 4 bytes for the call/reply flag). - * When this function is called for the first time, - * transport->recv.offset is 4 (after having already read the xid). - */ - offset = transport->recv.offset - sizeof(transport->recv.xid); - len = sizeof(transport->recv.calldir) - offset; - dprintk("RPC: reading CALL/REPLY flag (%zu bytes)\n", len); - p = ((char *) &transport->recv.calldir) + offset; - used = xdr_skb_read_bits(desc, p, len); - transport->recv.offset += used; - if (used != len) - return; - transport->recv.flags &= ~TCP_RCV_READ_CALLDIR; - /* - * We don't yet have the XDR buffer, so we will write the calldir - * out after we get the buffer from the 'struct rpc_rqst' - */ - switch (ntohl(transport->recv.calldir)) { - case RPC_REPLY: - transport->recv.flags |= TCP_RCV_COPY_CALLDIR; - transport->recv.flags |= TCP_RCV_COPY_DATA; - transport->recv.flags |= TCP_RPC_REPLY; - break; - case RPC_CALL: - transport->recv.flags |= TCP_RCV_COPY_CALLDIR; - transport->recv.flags |= TCP_RCV_COPY_DATA; - transport->recv.flags &= ~TCP_RPC_REPLY; - break; - default: - dprintk("RPC: invalid request message type\n"); - xs_tcp_force_close(&transport->xprt); - } - xs_tcp_check_fraghdr(transport); -} - -static inline void xs_tcp_read_common(struct rpc_xprt *xprt, - struct xdr_skb_reader *desc, - struct rpc_rqst *req) -{ - struct sock_xprt *transport = - container_of(xprt, struct sock_xprt, xprt); - struct xdr_buf *rcvbuf; - size_t len; - ssize_t r; - - rcvbuf = &req->rq_private_buf; - - if (transport->recv.flags & TCP_RCV_COPY_CALLDIR) { - /* - * Save the RPC direction in the XDR buffer - */ - memcpy(rcvbuf->head[0].iov_base + transport->recv.copied, - &transport->recv.calldir, - sizeof(transport->recv.calldir)); - transport->recv.copied += sizeof(transport->recv.calldir); - transport->recv.flags &= ~TCP_RCV_COPY_CALLDIR; - } - - len = desc->count; - if (len > transport->recv.len - transport->recv.offset) - desc->count = transport->recv.len - transport->recv.offset; - r = xdr_partial_copy_from_skb(rcvbuf, transport->recv.copied, - desc, xdr_skb_read_bits); - - if (desc->count) { - /* Error when copying to the receive buffer, - * usually because we weren't able to allocate - * additional buffer pages. All we can do now - * is turn off TCP_RCV_COPY_DATA, so the request - * will not receive any additional updates, - * and time out. - * Any remaining data from this record will - * be discarded. - */ - transport->recv.flags &= ~TCP_RCV_COPY_DATA; - dprintk("RPC: XID %08x truncated request\n", - ntohl(transport->recv.xid)); - dprintk("RPC: xprt = %p, recv.copied = %lu, " - "recv.offset = %u, recv.len = %u\n", - xprt, transport->recv.copied, - transport->recv.offset, transport->recv.len); - return; - } - - transport->recv.copied += r; - transport->recv.offset += r; - desc->count = len - r; - - dprintk("RPC: XID %08x read %zd bytes\n", - ntohl(transport->recv.xid), r); - dprintk("RPC: xprt = %p, recv.copied = %lu, recv.offset = %u, " - "recv.len = %u\n", xprt, transport->recv.copied, - transport->recv.offset, transport->recv.len); - - if (transport->recv.copied == req->rq_private_buf.buflen) - transport->recv.flags &= ~TCP_RCV_COPY_DATA; - else if (transport->recv.offset == transport->recv.len) { - if (transport->recv.flags & TCP_RCV_LAST_FRAG) - transport->recv.flags &= ~TCP_RCV_COPY_DATA; - } -} - -/* - * Finds the request corresponding to the RPC xid and invokes the common - * tcp read code to read the data. - */ -static inline int xs_tcp_read_reply(struct rpc_xprt *xprt, - struct xdr_skb_reader *desc) -{ - struct sock_xprt *transport = - container_of(xprt, struct sock_xprt, xprt); - struct rpc_rqst *req; - - dprintk("RPC: read reply XID %08x\n", ntohl(transport->recv.xid)); - - /* Find and lock the request corresponding to this xid */ - spin_lock(&xprt->queue_lock); - req = xprt_lookup_rqst(xprt, transport->recv.xid); - if (!req) { - dprintk("RPC: XID %08x request not found!\n", - ntohl(transport->recv.xid)); - spin_unlock(&xprt->queue_lock); - return -1; - } - xprt_pin_rqst(req); - spin_unlock(&xprt->queue_lock); - - xs_tcp_read_common(xprt, desc, req); - - spin_lock(&xprt->queue_lock); - if (!(transport->recv.flags & TCP_RCV_COPY_DATA)) - xprt_complete_rqst(req->rq_task, transport->recv.copied); - xprt_unpin_rqst(req); - spin_unlock(&xprt->queue_lock); - return 0; -} - #if defined(CONFIG_SUNRPC_BACKCHANNEL) -/* - * Obtains an rpc_rqst previously allocated and invokes the common - * tcp read code to read the data. The result is placed in the callback - * queue. - * If we're unable to obtain the rpc_rqst we schedule the closing of the - * connection and return -1. - */ -static int xs_tcp_read_callback(struct rpc_xprt *xprt, - struct xdr_skb_reader *desc) -{ - struct sock_xprt *transport = - container_of(xprt, struct sock_xprt, xprt); - struct rpc_rqst *req; - - /* Look up the request corresponding to the given XID */ - req = xprt_lookup_bc_request(xprt, transport->recv.xid); - if (req == NULL) { - printk(KERN_WARNING "Callback slot table overflowed\n"); - xprt_force_disconnect(xprt); - return -1; - } - - dprintk("RPC: read callback XID %08x\n", ntohl(req->rq_xid)); - xs_tcp_read_common(xprt, desc, req); - - if (!(transport->recv.flags & TCP_RCV_COPY_DATA)) - xprt_complete_bc_request(req, transport->recv.copied); - - return 0; -} - -static inline int _xs_tcp_read_data(struct rpc_xprt *xprt, - struct xdr_skb_reader *desc) -{ - struct sock_xprt *transport = - container_of(xprt, struct sock_xprt, xprt); - - return (transport->recv.flags & TCP_RPC_REPLY) ? - xs_tcp_read_reply(xprt, desc) : - xs_tcp_read_callback(xprt, desc); -} - static int xs_tcp_bc_up(struct svc_serv *serv, struct net *net) { int ret; @@ -1429,106 +1496,14 @@ static size_t xs_tcp_bc_maxpayload(struct rpc_xprt *xprt) { return PAGE_SIZE; } -#else -static inline int _xs_tcp_read_data(struct rpc_xprt *xprt, - struct xdr_skb_reader *desc) -{ - return xs_tcp_read_reply(xprt, desc); -} #endif /* CONFIG_SUNRPC_BACKCHANNEL */ -/* - * Read data off the transport. This can be either an RPC_CALL or an - * RPC_REPLY. Relay the processing to helper functions. - */ -static void xs_tcp_read_data(struct rpc_xprt *xprt, - struct xdr_skb_reader *desc) -{ - struct sock_xprt *transport = - container_of(xprt, struct sock_xprt, xprt); - - if (_xs_tcp_read_data(xprt, desc) == 0) - xs_tcp_check_fraghdr(transport); - else { - /* - * The transport_lock protects the request handling. - * There's no need to hold it to update the recv.flags. - */ - transport->recv.flags &= ~TCP_RCV_COPY_DATA; - } -} - -static inline void xs_tcp_read_discard(struct sock_xprt *transport, struct xdr_skb_reader *desc) -{ - size_t len; - - len = transport->recv.len - transport->recv.offset; - if (len > desc->count) - len = desc->count; - desc->count -= len; - desc->offset += len; - transport->recv.offset += len; - dprintk("RPC: discarded %zu bytes\n", len); - xs_tcp_check_fraghdr(transport); -} - -static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, unsigned int offset, size_t len) -{ - struct rpc_xprt *xprt = rd_desc->arg.data; - struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); - struct xdr_skb_reader desc = { - .skb = skb, - .offset = offset, - .count = len, - }; - size_t ret; - - dprintk("RPC: xs_tcp_data_recv started\n"); - do { - trace_xs_tcp_data_recv(transport); - /* Read in a new fragment marker if necessary */ - /* Can we ever really expect to get completely empty fragments? */ - if (transport->recv.flags & TCP_RCV_COPY_FRAGHDR) { - xs_tcp_read_fraghdr(xprt, &desc); - continue; - } - /* Read in the xid if necessary */ - if (transport->recv.flags & TCP_RCV_COPY_XID) { - xs_tcp_read_xid(transport, &desc); - continue; - } - /* Read in the call/reply flag */ - if (transport->recv.flags & TCP_RCV_READ_CALLDIR) { - xs_tcp_read_calldir(transport, &desc); - continue; - } - /* Read in the request data */ - if (transport->recv.flags & TCP_RCV_COPY_DATA) { - xs_tcp_read_data(xprt, &desc); - continue; - } - /* Skip over any trailing bytes on short reads */ - xs_tcp_read_discard(transport, &desc); - } while (desc.count); - ret = len - desc.count; - if (ret < rd_desc->count) - rd_desc->count -= ret; - else - rd_desc->count = 0; - trace_xs_tcp_data_recv(transport); - dprintk("RPC: xs_tcp_data_recv done\n"); - return ret; -} - static void xs_tcp_data_receive(struct sock_xprt *transport) { struct rpc_xprt *xprt = &transport->xprt; struct sock *sk; - read_descriptor_t rd_desc = { - .arg.data = xprt, - }; - unsigned long total = 0; - int read = 0; + size_t read = 0; + ssize_t ret = 0; restart: mutex_lock(&transport->recv_mutex); @@ -1536,18 +1511,12 @@ restart: if (sk == NULL) goto out; - /* We use rd_desc to pass struct xprt to xs_tcp_data_recv */ for (;;) { - rd_desc.count = RPC_TCP_READ_CHUNK_SZ; - lock_sock(sk); - read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv); - if (rd_desc.count != 0 || read < 0) { - clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state); - release_sock(sk); + clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state); + ret = xs_read_stream(transport, MSG_DONTWAIT | MSG_NOSIGNAL); + if (ret < 0) break; - } - release_sock(sk); - total += read; + read += ret; if (need_resched()) { mutex_unlock(&transport->recv_mutex); cond_resched(); @@ -1558,7 +1527,7 @@ restart: queue_work(xprtiod_workqueue, &transport->recv_worker); out: mutex_unlock(&transport->recv_mutex); - trace_xs_tcp_data_ready(xprt, read, total); + trace_xs_tcp_data_ready(xprt, ret, read); } static void xs_tcp_data_receive_workfn(struct work_struct *work) @@ -2380,7 +2349,6 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) transport->recv.offset = 0; transport->recv.len = 0; transport->recv.copied = 0; - transport->recv.flags = TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID; transport->xmit.offset = 0; /* Tell the socket layer to start connecting... */ @@ -2802,6 +2770,7 @@ static const struct rpc_xprt_ops xs_tcp_ops = { .connect = xs_connect, .buf_alloc = rpc_malloc, .buf_free = rpc_free, + .prepare_request = xs_stream_prepare_request, .send_request = xs_tcp_send_request, .set_retrans_timeout = xprt_set_retrans_timeout_def, .close = xs_tcp_shutdown, -- cgit v1.2.3-55-g7522 From c50b8ee02f1cb9506ac06d22e8414e9fef7d6890 Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Fri, 14 Sep 2018 14:26:28 -0400 Subject: SUNRPC: Clean up - rename xs_tcp_data_receive() to xs_stream_data_receive() In preparation for sharing with AF_LOCAL. Signed-off-by: Trond Myklebust --- include/trace/events/sunrpc.h | 16 +++++----- net/sunrpc/xprtsock.c | 71 ++++++++++++++++++------------------------- 2 files changed, 38 insertions(+), 49 deletions(-) (limited to 'net/sunrpc/xprtsock.c') diff --git a/include/trace/events/sunrpc.h b/include/trace/events/sunrpc.h index 19e08d12696c..28e384186c35 100644 --- a/include/trace/events/sunrpc.h +++ b/include/trace/events/sunrpc.h @@ -470,14 +470,14 @@ TRACE_EVENT(xprt_ping, __get_str(addr), __get_str(port), __entry->status) ); -TRACE_EVENT(xs_tcp_data_ready, - TP_PROTO(struct rpc_xprt *xprt, int err, unsigned int total), +TRACE_EVENT(xs_stream_read_data, + TP_PROTO(struct rpc_xprt *xprt, ssize_t err, size_t total), TP_ARGS(xprt, err, total), TP_STRUCT__entry( - __field(int, err) - __field(unsigned int, total) + __field(ssize_t, err) + __field(size_t, total) __string(addr, xprt ? xprt->address_strings[RPC_DISPLAY_ADDR] : "(null)") __string(port, xprt ? xprt->address_strings[RPC_DISPLAY_PORT] : @@ -493,11 +493,11 @@ TRACE_EVENT(xs_tcp_data_ready, xprt->address_strings[RPC_DISPLAY_PORT] : "(null)"); ), - TP_printk("peer=[%s]:%s err=%d total=%u", __get_str(addr), + TP_printk("peer=[%s]:%s err=%zd total=%zu", __get_str(addr), __get_str(port), __entry->err, __entry->total) ); -TRACE_EVENT(xs_tcp_data_recv, +TRACE_EVENT(xs_stream_read_request, TP_PROTO(struct sock_xprt *xs), TP_ARGS(xs), @@ -508,7 +508,7 @@ TRACE_EVENT(xs_tcp_data_recv, __field(u32, xid) __field(unsigned long, copied) __field(unsigned int, reclen) - __field(unsigned long, offset) + __field(unsigned int, offset) ), TP_fast_assign( @@ -520,7 +520,7 @@ TRACE_EVENT(xs_tcp_data_recv, __entry->offset = xs->recv.offset; ), - TP_printk("peer=[%s]:%s xid=0x%08x copied=%lu reclen=%u offset=%lu", + TP_printk("peer=[%s]:%s xid=0x%08x copied=%lu reclen=%u offset=%u", __get_str(addr), __get_str(port), __entry->xid, __entry->copied, __entry->reclen, __entry->offset) ); diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index 06aa75008708..55df1fadab27 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -626,7 +626,7 @@ xs_read_stream(struct sock_xprt *transport, int flags) return -EAGAIN; } if (xs_read_stream_request_done(transport)) { - trace_xs_tcp_data_recv(transport); + trace_xs_stream_read_request(transport); transport->recv.copied = 0; } transport->recv.offset = 0; @@ -642,6 +642,34 @@ out_err: return ret; } +static void xs_stream_data_receive(struct sock_xprt *transport) +{ + size_t read = 0; + ssize_t ret = 0; + + mutex_lock(&transport->recv_mutex); + if (transport->sock == NULL) + goto out; + clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state); + for (;;) { + ret = xs_read_stream(transport, MSG_DONTWAIT); + if (ret <= 0) + break; + read += ret; + cond_resched(); + } +out: + mutex_unlock(&transport->recv_mutex); + trace_xs_stream_read_data(&transport->xprt, ret, read); +} + +static void xs_stream_data_receive_workfn(struct work_struct *work) +{ + struct sock_xprt *transport = + container_of(work, struct sock_xprt, recv_worker); + xs_stream_data_receive(transport); +} + #define XS_SENDMSG_FLAGS (MSG_DONTWAIT | MSG_NOSIGNAL) static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, int addrlen, struct kvec *vec, unsigned int base, int more) @@ -1498,45 +1526,6 @@ static size_t xs_tcp_bc_maxpayload(struct rpc_xprt *xprt) } #endif /* CONFIG_SUNRPC_BACKCHANNEL */ -static void xs_tcp_data_receive(struct sock_xprt *transport) -{ - struct rpc_xprt *xprt = &transport->xprt; - struct sock *sk; - size_t read = 0; - ssize_t ret = 0; - -restart: - mutex_lock(&transport->recv_mutex); - sk = transport->inet; - if (sk == NULL) - goto out; - - for (;;) { - clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state); - ret = xs_read_stream(transport, MSG_DONTWAIT | MSG_NOSIGNAL); - if (ret < 0) - break; - read += ret; - if (need_resched()) { - mutex_unlock(&transport->recv_mutex); - cond_resched(); - goto restart; - } - } - if (test_bit(XPRT_SOCK_DATA_READY, &transport->sock_state)) - queue_work(xprtiod_workqueue, &transport->recv_worker); -out: - mutex_unlock(&transport->recv_mutex); - trace_xs_tcp_data_ready(xprt, ret, read); -} - -static void xs_tcp_data_receive_workfn(struct work_struct *work) -{ - struct sock_xprt *transport = - container_of(work, struct sock_xprt, recv_worker); - xs_tcp_data_receive(transport); -} - /** * xs_tcp_state_change - callback to handle TCP socket state changes * @sk: socket whose state has changed @@ -3066,7 +3055,7 @@ static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args) xprt->connect_timeout = xprt->timeout->to_initval * (xprt->timeout->to_retries + 1); - INIT_WORK(&transport->recv_worker, xs_tcp_data_receive_workfn); + INIT_WORK(&transport->recv_worker, xs_stream_data_receive_workfn); INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_setup_socket); switch (addr->sa_family) { -- cgit v1.2.3-55-g7522 From 550aebfe1c573518c35ae85d6ffbdc2d44c92703 Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Fri, 14 Sep 2018 14:32:45 -0400 Subject: SUNRPC: Allow AF_LOCAL sockets to use the generic stream receive Signed-off-by: Trond Myklebust --- include/linux/sunrpc/xdr.h | 1 - net/sunrpc/socklib.c | 4 +- net/sunrpc/xprtsock.c | 137 ++++++--------------------------------------- 3 files changed, 18 insertions(+), 124 deletions(-) (limited to 'net/sunrpc/xprtsock.c') diff --git a/include/linux/sunrpc/xdr.h b/include/linux/sunrpc/xdr.h index 745587132a87..8815be7cae72 100644 --- a/include/linux/sunrpc/xdr.h +++ b/include/linux/sunrpc/xdr.h @@ -185,7 +185,6 @@ struct xdr_skb_reader { typedef size_t (*xdr_skb_read_actor)(struct xdr_skb_reader *desc, void *to, size_t len); -size_t xdr_skb_read_bits(struct xdr_skb_reader *desc, void *to, size_t len); extern int csum_partial_copy_to_xdr(struct xdr_buf *, struct sk_buff *); extern ssize_t xdr_partial_copy_from_skb(struct xdr_buf *, unsigned int, struct xdr_skb_reader *, xdr_skb_read_actor); diff --git a/net/sunrpc/socklib.c b/net/sunrpc/socklib.c index 08f00a98151f..0e7c0dee7578 100644 --- a/net/sunrpc/socklib.c +++ b/net/sunrpc/socklib.c @@ -26,7 +26,8 @@ * Possibly called several times to iterate over an sk_buff and copy * data out of it. */ -size_t xdr_skb_read_bits(struct xdr_skb_reader *desc, void *to, size_t len) +static size_t +xdr_skb_read_bits(struct xdr_skb_reader *desc, void *to, size_t len) { if (len > desc->count) len = desc->count; @@ -36,7 +37,6 @@ size_t xdr_skb_read_bits(struct xdr_skb_reader *desc, void *to, size_t len) desc->offset += len; return len; } -EXPORT_SYMBOL_GPL(xdr_skb_read_bits); /** * xdr_skb_read_and_csum_bits - copy and checksum from skb to buffer diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index 55df1fadab27..90d4c92177b7 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -670,6 +670,17 @@ static void xs_stream_data_receive_workfn(struct work_struct *work) xs_stream_data_receive(transport); } +static void +xs_stream_reset_connect(struct sock_xprt *transport) +{ + transport->recv.offset = 0; + transport->recv.len = 0; + transport->recv.copied = 0; + transport->xmit.offset = 0; + transport->xprt.stat.connect_count++; + transport->xprt.stat.connect_start = jiffies; +} + #define XS_SENDMSG_FLAGS (MSG_DONTWAIT | MSG_NOSIGNAL) static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, int addrlen, struct kvec *vec, unsigned int base, int more) @@ -1266,114 +1277,6 @@ static void xs_destroy(struct rpc_xprt *xprt) module_put(THIS_MODULE); } -static int xs_local_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb) -{ - struct xdr_skb_reader desc = { - .skb = skb, - .offset = sizeof(rpc_fraghdr), - .count = skb->len - sizeof(rpc_fraghdr), - }; - - if (xdr_partial_copy_from_skb(xdr, 0, &desc, xdr_skb_read_bits) < 0) - return -1; - if (desc.count) - return -1; - return 0; -} - -/** - * xs_local_data_read_skb - * @xprt: transport - * @sk: socket - * @skb: skbuff - * - * Currently this assumes we can read the whole reply in a single gulp. - */ -static void xs_local_data_read_skb(struct rpc_xprt *xprt, - struct sock *sk, - struct sk_buff *skb) -{ - struct rpc_task *task; - struct rpc_rqst *rovr; - int repsize, copied; - u32 _xid; - __be32 *xp; - - repsize = skb->len - sizeof(rpc_fraghdr); - if (repsize < 4) { - dprintk("RPC: impossible RPC reply size %d\n", repsize); - return; - } - - /* Copy the XID from the skb... */ - xp = skb_header_pointer(skb, sizeof(rpc_fraghdr), sizeof(_xid), &_xid); - if (xp == NULL) - return; - - /* Look up and lock the request corresponding to the given XID */ - spin_lock(&xprt->queue_lock); - rovr = xprt_lookup_rqst(xprt, *xp); - if (!rovr) - goto out_unlock; - xprt_pin_rqst(rovr); - spin_unlock(&xprt->queue_lock); - task = rovr->rq_task; - - copied = rovr->rq_private_buf.buflen; - if (copied > repsize) - copied = repsize; - - if (xs_local_copy_to_xdr(&rovr->rq_private_buf, skb)) { - dprintk("RPC: sk_buff copy failed\n"); - spin_lock(&xprt->queue_lock); - goto out_unpin; - } - - spin_lock(&xprt->queue_lock); - xprt_complete_rqst(task, copied); -out_unpin: - xprt_unpin_rqst(rovr); - out_unlock: - spin_unlock(&xprt->queue_lock); -} - -static void xs_local_data_receive(struct sock_xprt *transport) -{ - struct sk_buff *skb; - struct sock *sk; - int err; - -restart: - mutex_lock(&transport->recv_mutex); - sk = transport->inet; - if (sk == NULL) - goto out; - for (;;) { - skb = skb_recv_datagram(sk, 0, 1, &err); - if (skb != NULL) { - xs_local_data_read_skb(&transport->xprt, sk, skb); - skb_free_datagram(sk, skb); - continue; - } - if (!test_and_clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state)) - break; - if (need_resched()) { - mutex_unlock(&transport->recv_mutex); - cond_resched(); - goto restart; - } - } -out: - mutex_unlock(&transport->recv_mutex); -} - -static void xs_local_data_receive_workfn(struct work_struct *work) -{ - struct sock_xprt *transport = - container_of(work, struct sock_xprt, recv_worker); - xs_local_data_receive(transport); -} - /** * xs_udp_data_read_skb - receive callback for UDP sockets * @xprt: transport @@ -1974,11 +1877,8 @@ static int xs_local_finish_connecting(struct rpc_xprt *xprt, write_unlock_bh(&sk->sk_callback_lock); } - transport->xmit.offset = 0; + xs_stream_reset_connect(transport); - /* Tell the socket layer to start connecting... */ - xprt->stat.connect_count++; - xprt->stat.connect_start = jiffies; return kernel_connect(sock, xs_addr(xprt), xprt->addrlen, 0); } @@ -2335,14 +2235,9 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) xs_set_memalloc(xprt); /* Reset TCP record info */ - transport->recv.offset = 0; - transport->recv.len = 0; - transport->recv.copied = 0; - transport->xmit.offset = 0; + xs_stream_reset_connect(transport); /* Tell the socket layer to start connecting... */ - xprt->stat.connect_count++; - xprt->stat.connect_start = jiffies; set_bit(XPRT_SOCK_CONNECTING, &transport->sock_state); ret = kernel_connect(sock, xs_addr(xprt), xprt->addrlen, O_NONBLOCK); switch (ret) { @@ -2717,6 +2612,7 @@ static const struct rpc_xprt_ops xs_local_ops = { .connect = xs_local_connect, .buf_alloc = rpc_malloc, .buf_free = rpc_free, + .prepare_request = xs_stream_prepare_request, .send_request = xs_local_send_request, .set_retrans_timeout = xprt_set_retrans_timeout_def, .close = xs_close, @@ -2901,9 +2797,8 @@ static struct rpc_xprt *xs_setup_local(struct xprt_create *args) xprt->ops = &xs_local_ops; xprt->timeout = &xs_local_default_timeout; - INIT_WORK(&transport->recv_worker, xs_local_data_receive_workfn); - INIT_DELAYED_WORK(&transport->connect_worker, - xs_dummy_setup_socket); + INIT_WORK(&transport->recv_worker, xs_stream_data_receive_workfn); + INIT_DELAYED_WORK(&transport->connect_worker, xs_dummy_setup_socket); switch (sun->sun_family) { case AF_LOCAL: -- cgit v1.2.3-55-g7522 From 4f546149755b4dec431bec236a9116a74384c7a7 Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Fri, 14 Sep 2018 17:45:23 -0400 Subject: SUNRPC: Clean up xs_udp_data_receive() Simplify the retry logic. Signed-off-by: Trond Myklebust --- net/sunrpc/xprtsock.c | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) (limited to 'net/sunrpc/xprtsock.c') diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index 90d4c92177b7..039444eb138f 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -1344,25 +1344,18 @@ static void xs_udp_data_receive(struct sock_xprt *transport) struct sock *sk; int err; -restart: mutex_lock(&transport->recv_mutex); sk = transport->inet; if (sk == NULL) goto out; + clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state); for (;;) { skb = skb_recv_udp(sk, 0, 1, &err); - if (skb != NULL) { - xs_udp_data_read_skb(&transport->xprt, sk, skb); - consume_skb(skb); - continue; - } - if (!test_and_clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state)) + if (skb == NULL) break; - if (need_resched()) { - mutex_unlock(&transport->recv_mutex); - cond_resched(); - goto restart; - } + xs_udp_data_read_skb(&transport->xprt, sk, skb); + consume_skb(skb); + cond_resched(); } out: mutex_unlock(&transport->recv_mutex); -- cgit v1.2.3-55-g7522 From 3968a8a5310404c2f0b9e4d9f28cab13a12bc4fd Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Mon, 1 Oct 2018 14:25:36 -0400 Subject: sunrpc: Fix connect metrics For TCP, the logic in xprt_connect_status is currently never invoked to record a successful connection. Commit 2a4919919a97 ("SUNRPC: Return EAGAIN instead of ENOTCONN when waking up xprt->pending") changed the way TCP xprt's are awoken after a connect succeeds. Instead, change connection-oriented transports to bump connect_count and compute connect_time the moment that XPRT_CONNECTED is set. Signed-off-by: Chuck Lever Signed-off-by: Anna Schumaker --- net/sunrpc/xprt.c | 14 ++++---------- net/sunrpc/xprtrdma/transport.c | 6 +++++- net/sunrpc/xprtsock.c | 10 ++++++---- 3 files changed, 15 insertions(+), 15 deletions(-) (limited to 'net/sunrpc/xprtsock.c') diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index a8db2e3f8904..93c7a2f4a266 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -789,17 +789,11 @@ void xprt_connect(struct rpc_task *task) static void xprt_connect_status(struct rpc_task *task) { - struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt; - - if (task->tk_status == 0) { - xprt->stat.connect_count++; - xprt->stat.connect_time += (long)jiffies - xprt->stat.connect_start; + switch (task->tk_status) { + case 0: dprintk("RPC: %5u xprt_connect_status: connection established\n", task->tk_pid); - return; - } - - switch (task->tk_status) { + break; case -ECONNREFUSED: case -ECONNRESET: case -ECONNABORTED: @@ -816,7 +810,7 @@ static void xprt_connect_status(struct rpc_task *task) default: dprintk("RPC: %5u xprt_connect_status: error %d connecting to " "server %s\n", task->tk_pid, -task->tk_status, - xprt->servername); + task->tk_rqstp->rq_xprt->servername); task->tk_status = -EIO; } } diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c index 3ae73e6a5c93..087acfce142a 100644 --- a/net/sunrpc/xprtrdma/transport.c +++ b/net/sunrpc/xprtrdma/transport.c @@ -242,8 +242,12 @@ rpcrdma_connect_worker(struct work_struct *work) spin_lock_bh(&xprt->transport_lock); if (ep->rep_connected > 0) { - if (!xprt_test_and_set_connected(xprt)) + if (!xprt_test_and_set_connected(xprt)) { + xprt->stat.connect_count++; + xprt->stat.connect_time += (long)jiffies - + xprt->stat.connect_start; xprt_wake_pending_tasks(xprt, 0); + } } else { if (xprt_test_and_clear_connected(xprt)) xprt_wake_pending_tasks(xprt, -ENOTCONN); diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index 6b7539c0466e..e146caacc494 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -1611,6 +1611,9 @@ static void xs_tcp_state_change(struct sock *sk) clear_bit(XPRT_SOCK_CONNECTING, &transport->sock_state); xprt_clear_connecting(xprt); + xprt->stat.connect_count++; + xprt->stat.connect_time += (long)jiffies - + xprt->stat.connect_start; xprt_wake_pending_tasks(xprt, -EAGAIN); } spin_unlock(&xprt->transport_lock); @@ -2029,8 +2032,6 @@ static int xs_local_finish_connecting(struct rpc_xprt *xprt, } /* Tell the socket layer to start connecting... */ - xprt->stat.connect_count++; - xprt->stat.connect_start = jiffies; return kernel_connect(sock, xs_addr(xprt), xprt->addrlen, 0); } @@ -2062,6 +2063,9 @@ static int xs_local_setup_socket(struct sock_xprt *transport) case 0: dprintk("RPC: xprt %p connected to %s\n", xprt, xprt->address_strings[RPC_DISPLAY_ADDR]); + xprt->stat.connect_count++; + xprt->stat.connect_time += (long)jiffies - + xprt->stat.connect_start; xprt_set_connected(xprt); case -ENOBUFS: break; @@ -2387,8 +2391,6 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) xs_set_memalloc(xprt); /* Tell the socket layer to start connecting... */ - xprt->stat.connect_count++; - xprt->stat.connect_start = jiffies; set_bit(XPRT_SOCK_CONNECTING, &transport->sock_state); ret = kernel_connect(sock, xs_addr(xprt), xprt->addrlen, O_NONBLOCK); switch (ret) { -- cgit v1.2.3-55-g7522 From 8440a886112b46a8b402679dca9d8b5662a0d73e Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Mon, 1 Oct 2018 14:25:41 -0400 Subject: sunrpc: Report connect_time in seconds The way connection-oriented transports report connect_time is wrong: it's supposed to be in seconds, not in jiffies. Signed-off-by: Chuck Lever Signed-off-by: Anna Schumaker --- net/sunrpc/xprtrdma/transport.c | 2 +- net/sunrpc/xprtsock.c | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) (limited to 'net/sunrpc/xprtsock.c') diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c index 087acfce142a..289d13cad638 100644 --- a/net/sunrpc/xprtrdma/transport.c +++ b/net/sunrpc/xprtrdma/transport.c @@ -776,7 +776,7 @@ void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq) 0, /* need a local port? */ xprt->stat.bind_count, xprt->stat.connect_count, - xprt->stat.connect_time, + xprt->stat.connect_time / HZ, idle_time, xprt->stat.sends, xprt->stat.recvs, diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index e146caacc494..9bbc395cfd55 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -2563,7 +2563,7 @@ static void xs_local_print_stats(struct rpc_xprt *xprt, struct seq_file *seq) "%llu %llu %lu %llu %llu\n", xprt->stat.bind_count, xprt->stat.connect_count, - xprt->stat.connect_time, + xprt->stat.connect_time / HZ, idle_time, xprt->stat.sends, xprt->stat.recvs, @@ -2618,7 +2618,7 @@ static void xs_tcp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq) transport->srcport, xprt->stat.bind_count, xprt->stat.connect_count, - xprt->stat.connect_time, + xprt->stat.connect_time / HZ, idle_time, xprt->stat.sends, xprt->stat.recvs, -- cgit v1.2.3-55-g7522 From 826799e66e8683e5698e140bb9ef69afc8c0014e Mon Sep 17 00:00:00 2001 From: J. Bruce Fields Date: Thu, 18 Oct 2018 15:27:02 -0400 Subject: sunrpc: safely reallow resvport min/max inversion Commits ffb6ca33b04b and e08ea3a96fc7 prevent setting xprt_min_resvport greater than xprt_max_resvport, but may also break simple code that sets one parameter then the other, if the new range does not overlap the old. Also it looks racy to me, unless there's some serialization I'm not seeing. Granted it would probably require malicious privileged processes (unless there's a chance these might eventually be settable in unprivileged containers), but still it seems better not to let userspace panic the kernel. Simpler seems to be to allow setting the parameters to whatever you want but interpret xprt_min_resvport > xprt_max_resvport as the empty range. Fixes: ffb6ca33b04b "sunrpc: Prevent resvport min/max inversion..." Fixes: e08ea3a96fc7 "sunrpc: Prevent rexvport min/max inversion..." Signed-off-by: J. Bruce Fields Signed-off-by: Trond Myklebust --- net/sunrpc/xprtsock.c | 34 ++++++++++++++++++---------------- 1 file changed, 18 insertions(+), 16 deletions(-) (limited to 'net/sunrpc/xprtsock.c') diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index 039444eb138f..9bb86cd3ee56 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -129,7 +129,7 @@ static struct ctl_table xs_tunables_table[] = { .mode = 0644, .proc_handler = proc_dointvec_minmax, .extra1 = &xprt_min_resvport_limit, - .extra2 = &xprt_max_resvport + .extra2 = &xprt_max_resvport_limit }, { .procname = "max_resvport", @@ -137,7 +137,7 @@ static struct ctl_table xs_tunables_table[] = { .maxlen = sizeof(unsigned int), .mode = 0644, .proc_handler = proc_dointvec_minmax, - .extra1 = &xprt_min_resvport, + .extra1 = &xprt_min_resvport_limit, .extra2 = &xprt_max_resvport_limit }, { @@ -1615,11 +1615,17 @@ static void xs_udp_timer(struct rpc_xprt *xprt, struct rpc_task *task) spin_unlock_bh(&xprt->transport_lock); } -static unsigned short xs_get_random_port(void) +static int xs_get_random_port(void) { - unsigned short range = xprt_max_resvport - xprt_min_resvport + 1; - unsigned short rand = (unsigned short) prandom_u32() % range; - return rand + xprt_min_resvport; + unsigned short min = xprt_min_resvport, max = xprt_max_resvport; + unsigned short range; + unsigned short rand; + + if (max < min) + return -EADDRINUSE; + range = max - min + 1; + rand = (unsigned short) prandom_u32() % range; + return rand + min; } /** @@ -1675,9 +1681,9 @@ static void xs_set_srcport(struct sock_xprt *transport, struct socket *sock) transport->srcport = xs_sock_getport(sock); } -static unsigned short xs_get_srcport(struct sock_xprt *transport) +static int xs_get_srcport(struct sock_xprt *transport) { - unsigned short port = transport->srcport; + int port = transport->srcport; if (port == 0 && transport->xprt.resvport) port = xs_get_random_port(); @@ -1698,7 +1704,7 @@ static int xs_bind(struct sock_xprt *transport, struct socket *sock) { struct sockaddr_storage myaddr; int err, nloop = 0; - unsigned short port = xs_get_srcport(transport); + int port = xs_get_srcport(transport); unsigned short last; /* @@ -1716,8 +1722,8 @@ static int xs_bind(struct sock_xprt *transport, struct socket *sock) * transport->xprt.resvport == 1) xs_get_srcport above will * ensure that port is non-zero and we will bind as needed. */ - if (port == 0) - return 0; + if (port <= 0) + return port; memcpy(&myaddr, &transport->srcaddr, transport->xprt.addrlen); do { @@ -3154,12 +3160,8 @@ static int param_set_uint_minmax(const char *val, static int param_set_portnr(const char *val, const struct kernel_param *kp) { - if (kp->arg == &xprt_min_resvport) - return param_set_uint_minmax(val, kp, - RPC_MIN_RESVPORT, - xprt_max_resvport); return param_set_uint_minmax(val, kp, - xprt_min_resvport, + RPC_MIN_RESVPORT, RPC_MAX_RESVPORT); } -- cgit v1.2.3-55-g7522