author		Chuck Lever		2015-10-24 23:27:10 +0200
committer	Anna Schumaker		2015-11-02 19:45:15 +0100
commit		fe97b47cd623ebbaa55a163c336abc47153526d1 (patch)
tree		d936169799d99d924f92b385d1af31d62d083b90 /net/sunrpc/xprtrdma/verbs.c
parent		xprtrdma: Replace send and receive arrays (diff)
xprtrdma: Use workqueue to process RPC/RDMA replies
The reply tasklet is fast, but it's single threaded. After reply
traffic saturates a single CPU, there's no more reply processing
capacity.

Replace the tasklet with a workqueue to spread reply handling across
all CPUs. This also moves RPC/RDMA reply handling out of the soft IRQ
context and into a context that allows sleeps.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Reviewed-by: Sagi Grimberg <sagig@mellanox.com>
Tested-By: Devesh Sharma <devesh.sharma@avagotech.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
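For context, the conversion follows the kernel's standard embedded-work_struct pattern: each reply structure carries its own struct work_struct, and the completion handler queues that item directly instead of batching replies on a list for a single tasklet. The sketch below illustrates the pattern only; the demo_* names are hypothetical and not part of this patch (the patch itself uses rpcrdma_rep, rr_work, and rpcrdma_receive_worker).

	#include <linux/workqueue.h>
	#include <linux/slab.h>
	#include <linux/errno.h>

	/* Hypothetical stand-in for rpcrdma_rep: the work_struct is
	 * embedded so the worker can recover the containing reply
	 * with container_of(). */
	struct demo_reply {
		struct work_struct	rr_work;
		/* ... decoded reply state ... */
	};

	static struct workqueue_struct *demo_wq;

	/* Runs in process context, so it may sleep -- something the
	 * old tasklet, running in softirq context, could never do. */
	static void demo_worker(struct work_struct *work)
	{
		struct demo_reply *rep =
			container_of(work, struct demo_reply, rr_work);

		/* handle the reply here ... */
		kfree(rep);
	}

	static int demo_wq_alloc(void)
	{
		/* WQ_UNBOUND: items may run on any CPU, not just the
		 * queueing CPU. WQ_MEM_RECLAIM: a rescuer thread
		 * guarantees forward progress under memory pressure.
		 * WQ_HIGHPRI: lower dispatch latency. */
		demo_wq = alloc_workqueue("demo_receive",
				WQ_MEM_RECLAIM | WQ_UNBOUND | WQ_HIGHPRI,
				0);
		return demo_wq ? 0 : -ENOMEM;
	}

	static struct demo_reply *demo_alloc_reply(void)
	{
		struct demo_reply *rep = kzalloc(sizeof(*rep), GFP_KERNEL);

		if (rep)
			INIT_WORK(&rep->rr_work, demo_worker);	/* once */
		return rep;
	}

	/* Completion path: instead of list_add_tail() plus one
	 * tasklet_schedule(), each reply becomes its own work item. */
	static void demo_complete(struct demo_reply *rep)
	{
		queue_work(demo_wq, &rep->rr_work);
	}

Because an unbound workqueue dispatches items to worker threads on any idle CPU, reply processing can scale past the single-CPU ceiling of the tasklet, and the handler gains a sleepable process context.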
Diffstat (limited to 'net/sunrpc/xprtrdma/verbs.c')
-rw-r--r--	net/sunrpc/xprtrdma/verbs.c | 54 ++++++++++++++++++++++++++++++++++++++++++++++----------
1 file changed, 44 insertions(+), 10 deletions(-)
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index c09f1b6c3f0a..5c20629544bb 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -100,6 +100,35 @@ rpcrdma_run_tasklet(unsigned long data)
static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
+static struct workqueue_struct *rpcrdma_receive_wq;
+
+int
+rpcrdma_alloc_wq(void)
+{
+ struct workqueue_struct *recv_wq;
+
+ recv_wq = alloc_workqueue("xprtrdma_receive",
+ WQ_MEM_RECLAIM | WQ_UNBOUND | WQ_HIGHPRI,
+ 0);
+ if (!recv_wq)
+ return -ENOMEM;
+
+ rpcrdma_receive_wq = recv_wq;
+ return 0;
+}
+
+void
+rpcrdma_destroy_wq(void)
+{
+ struct workqueue_struct *wq;
+
+ if (rpcrdma_receive_wq) {
+ wq = rpcrdma_receive_wq;
+ rpcrdma_receive_wq = NULL;
+ destroy_workqueue(wq);
+ }
+}
+
static void
rpcrdma_schedule_tasklet(struct list_head *sched_list)
{
@@ -196,7 +225,16 @@ rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
}
static void
-rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
+rpcrdma_receive_worker(struct work_struct *work)
+{
+ struct rpcrdma_rep *rep =
+ container_of(work, struct rpcrdma_rep, rr_work);
+
+ rpcrdma_reply_handler(rep);
+}
+
+static void
+rpcrdma_recvcq_process_wc(struct ib_wc *wc)
{
struct rpcrdma_rep *rep =
(struct rpcrdma_rep *)(unsigned long)wc->wr_id;
@@ -219,8 +257,9 @@ rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
prefetch(rdmab_to_msg(rep->rr_rdmabuf));
out_schedule:
- list_add_tail(&rep->rr_list, sched_list);
+ queue_work(rpcrdma_receive_wq, &rep->rr_work);
return;
+
out_fail:
if (wc->status != IB_WC_WR_FLUSH_ERR)
pr_err("RPC: %s: rep %p: %s\n",
@@ -239,7 +278,6 @@ static void
rpcrdma_recvcq_poll(struct ib_cq *cq)
{
struct ib_wc *pos, wcs[4];
- LIST_HEAD(sched_list);
int count, rc;
do {
@@ -251,10 +289,8 @@ rpcrdma_recvcq_poll(struct ib_cq *cq)
count = rc;
while (count-- > 0)
- rpcrdma_recvcq_process_wc(pos++, &sched_list);
+ rpcrdma_recvcq_process_wc(pos++);
} while (rc == ARRAY_SIZE(wcs));
-
- rpcrdma_schedule_tasklet(&sched_list);
}
/* Handle provider receive completion upcalls.
@@ -272,12 +308,9 @@ static void
rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
{
struct ib_wc wc;
- LIST_HEAD(sched_list);
while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0)
- rpcrdma_recvcq_process_wc(&wc, &sched_list);
- if (!list_empty(&sched_list))
- rpcrdma_schedule_tasklet(&sched_list);
+ rpcrdma_recvcq_process_wc(&wc);
while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0)
rpcrdma_sendcq_process_wc(&wc);
}
@@ -913,6 +946,7 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
rep->rr_device = ia->ri_device;
rep->rr_rxprt = r_xprt;
+ INIT_WORK(&rep->rr_work, rpcrdma_receive_worker);
return rep;
out_free: