summaryrefslogtreecommitdiffstats
path: root/fs/io_uring.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/io_uring.c')
-rw-r--r--fs/io_uring.c249
1 files changed, 242 insertions, 7 deletions
diff --git a/fs/io_uring.c b/fs/io_uring.c
index 5eaa1c4a3f7c..0a4caedf82c1 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -44,6 +44,7 @@
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/workqueue.h>
+#include <linux/kthread.h>
#include <linux/blkdev.h>
#include <linux/bvec.h>
#include <linux/net.h>
@@ -108,12 +109,16 @@ struct io_ring_ctx {
unsigned cached_sq_head;
unsigned sq_entries;
unsigned sq_mask;
+ unsigned sq_thread_idle;
struct io_uring_sqe *sq_sqes;
} ____cacheline_aligned_in_smp;
/* IO offload */
struct workqueue_struct *sqo_wq;
+ struct task_struct *sqo_thread; /* if using sq thread polling */
struct mm_struct *sqo_mm;
+ wait_queue_head_t sqo_wait;
+ unsigned sqo_stop;
struct {
/* CQ ring */
@@ -168,6 +173,7 @@ struct sqe_submit {
unsigned short index;
bool has_user;
bool needs_lock;
+ bool needs_fixed_file;
};
struct io_kiocb {
@@ -327,6 +333,8 @@ static void io_cqring_add_event(struct io_ring_ctx *ctx, u64 ki_user_data,
if (waitqueue_active(&ctx->wait))
wake_up(&ctx->wait);
+ if (waitqueue_active(&ctx->sqo_wait))
+ wake_up(&ctx->sqo_wait);
}
static void io_ring_drop_ctx_refs(struct io_ring_ctx *ctx, unsigned refs)
@@ -680,9 +688,10 @@ static bool io_file_supports_async(struct file *file)
return false;
}
-static int io_prep_rw(struct io_kiocb *req, const struct io_uring_sqe *sqe,
+static int io_prep_rw(struct io_kiocb *req, const struct sqe_submit *s,
bool force_nonblock, struct io_submit_state *state)
{
+ const struct io_uring_sqe *sqe = s->sqe;
struct io_ring_ctx *ctx = req->ctx;
struct kiocb *kiocb = &req->rw;
unsigned ioprio, flags;
@@ -702,6 +711,8 @@ static int io_prep_rw(struct io_kiocb *req, const struct io_uring_sqe *sqe,
kiocb->ki_filp = ctx->user_files[fd];
req->flags |= REQ_F_FIXED_FILE;
} else {
+ if (s->needs_fixed_file)
+ return -EBADF;
kiocb->ki_filp = io_file_get(state, fd);
if (unlikely(!kiocb->ki_filp))
return -EBADF;
@@ -865,7 +876,7 @@ static ssize_t io_read(struct io_kiocb *req, const struct sqe_submit *s,
struct file *file;
ssize_t ret;
- ret = io_prep_rw(req, s->sqe, force_nonblock, state);
+ ret = io_prep_rw(req, s, force_nonblock, state);
if (ret)
return ret;
file = kiocb->ki_filp;
@@ -909,7 +920,7 @@ static ssize_t io_write(struct io_kiocb *req, const struct sqe_submit *s,
struct file *file;
ssize_t ret;
- ret = io_prep_rw(req, s->sqe, force_nonblock, state);
+ ret = io_prep_rw(req, s, force_nonblock, state);
if (ret)
return ret;
/* Hold on to the file for -EAGAIN */
@@ -1301,6 +1312,169 @@ static bool io_get_sqring(struct io_ring_ctx *ctx, struct sqe_submit *s)
return false;
}
+static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes,
+ unsigned int nr, bool has_user, bool mm_fault)
+{
+ struct io_submit_state state, *statep = NULL;
+ int ret, i, submitted = 0;
+
+ if (nr > IO_PLUG_THRESHOLD) {
+ io_submit_state_start(&state, ctx, nr);
+ statep = &state;
+ }
+
+ for (i = 0; i < nr; i++) {
+ if (unlikely(mm_fault)) {
+ ret = -EFAULT;
+ } else {
+ sqes[i].has_user = has_user;
+ sqes[i].needs_lock = true;
+ sqes[i].needs_fixed_file = true;
+ ret = io_submit_sqe(ctx, &sqes[i], statep);
+ }
+ if (!ret) {
+ submitted++;
+ continue;
+ }
+
+ io_cqring_add_event(ctx, sqes[i].sqe->user_data, ret, 0);
+ }
+
+ if (statep)
+ io_submit_state_end(&state);
+
+ return submitted;
+}
+
+static int io_sq_thread(void *data)
+{
+ struct sqe_submit sqes[IO_IOPOLL_BATCH];
+ struct io_ring_ctx *ctx = data;
+ struct mm_struct *cur_mm = NULL;
+ mm_segment_t old_fs;
+ DEFINE_WAIT(wait);
+ unsigned inflight;
+ unsigned long timeout;
+
+ old_fs = get_fs();
+ set_fs(USER_DS);
+
+ timeout = inflight = 0;
+ while (!kthread_should_stop() && !ctx->sqo_stop) {
+ bool all_fixed, mm_fault = false;
+ int i;
+
+ if (inflight) {
+ unsigned nr_events = 0;
+
+ if (ctx->flags & IORING_SETUP_IOPOLL) {
+ /*
+ * We disallow the app entering submit/complete
+ * with polling, but we still need to lock the
+ * ring to prevent racing with polled issue
+ * that got punted to a workqueue.
+ */
+ mutex_lock(&ctx->uring_lock);
+ io_iopoll_check(ctx, &nr_events, 0);
+ mutex_unlock(&ctx->uring_lock);
+ } else {
+ /*
+ * Normal IO, just pretend everything completed.
+ * We don't have to poll completions for that.
+ */
+ nr_events = inflight;
+ }
+
+ inflight -= nr_events;
+ if (!inflight)
+ timeout = jiffies + ctx->sq_thread_idle;
+ }
+
+ if (!io_get_sqring(ctx, &sqes[0])) {
+ /*
+ * We're polling. If we're within the defined idle
+ * period, then let us spin without work before going
+ * to sleep.
+ */
+ if (inflight || !time_after(jiffies, timeout)) {
+ cpu_relax();
+ continue;
+ }
+
+ /*
+ * Drop cur_mm before scheduling, we can't hold it for
+ * long periods (or over schedule()). Do this before
+ * adding ourselves to the waitqueue, as the unuse/drop
+ * may sleep.
+ */
+ if (cur_mm) {
+ unuse_mm(cur_mm);
+ mmput(cur_mm);
+ cur_mm = NULL;
+ }
+
+ prepare_to_wait(&ctx->sqo_wait, &wait,
+ TASK_INTERRUPTIBLE);
+
+ /* Tell userspace we may need a wakeup call */
+ ctx->sq_ring->flags |= IORING_SQ_NEED_WAKEUP;
+ smp_wmb();
+
+ if (!io_get_sqring(ctx, &sqes[0])) {
+ if (kthread_should_stop()) {
+ finish_wait(&ctx->sqo_wait, &wait);
+ break;
+ }
+ if (signal_pending(current))
+ flush_signals(current);
+ schedule();
+ finish_wait(&ctx->sqo_wait, &wait);
+
+ ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP;
+ smp_wmb();
+ continue;
+ }
+ finish_wait(&ctx->sqo_wait, &wait);
+
+ ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP;
+ smp_wmb();
+ }
+
+ i = 0;
+ all_fixed = true;
+ do {
+ if (all_fixed && io_sqe_needs_user(sqes[i].sqe))
+ all_fixed = false;
+
+ i++;
+ if (i == ARRAY_SIZE(sqes))
+ break;
+ } while (io_get_sqring(ctx, &sqes[i]));
+
+ /* Unless all new commands are FIXED regions, grab mm */
+ if (!all_fixed && !cur_mm) {
+ mm_fault = !mmget_not_zero(ctx->sqo_mm);
+ if (!mm_fault) {
+ use_mm(ctx->sqo_mm);
+ cur_mm = ctx->sqo_mm;
+ }
+ }
+
+ inflight += io_submit_sqes(ctx, sqes, i, cur_mm != NULL,
+ mm_fault);
+
+ /* Commit SQ ring head once we've consumed all SQEs */
+ io_commit_sqring(ctx);
+ }
+
+ set_fs(old_fs);
+ if (cur_mm) {
+ unuse_mm(cur_mm);
+ mmput(cur_mm);
+ }
+ return 0;
+}
+
static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
{
struct io_submit_state state, *statep = NULL;
@@ -1319,6 +1493,7 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
s.has_user = true;
s.needs_lock = false;
+ s.needs_fixed_file = false;
ret = io_submit_sqe(ctx, &s, statep);
if (ret) {
@@ -1418,8 +1593,20 @@ static int io_sqe_files_unregister(struct io_ring_ctx *ctx)
return 0;
}
+static void io_sq_thread_stop(struct io_ring_ctx *ctx)
+{
+ if (ctx->sqo_thread) {
+ ctx->sqo_stop = 1;
+ mb();
+ kthread_stop(ctx->sqo_thread);
+ ctx->sqo_thread = NULL;
+ }
+}
+
static void io_finish_async(struct io_ring_ctx *ctx)
{
+ io_sq_thread_stop(ctx);
+
if (ctx->sqo_wq) {
destroy_workqueue(ctx->sqo_wq);
ctx->sqo_wq = NULL;
@@ -1583,13 +1770,47 @@ static int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg,
return ret;
}
-static int io_sq_offload_start(struct io_ring_ctx *ctx)
+static int io_sq_offload_start(struct io_ring_ctx *ctx,
+ struct io_uring_params *p)
{
int ret;
+ init_waitqueue_head(&ctx->sqo_wait);
mmgrab(current->mm);
ctx->sqo_mm = current->mm;
+ ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle);
+ if (!ctx->sq_thread_idle)
+ ctx->sq_thread_idle = HZ;
+
+ ret = -EINVAL;
+ if (!cpu_possible(p->sq_thread_cpu))
+ goto err;
+
+ if (ctx->flags & IORING_SETUP_SQPOLL) {
+ if (p->flags & IORING_SETUP_SQ_AFF) {
+ int cpu;
+
+ cpu = array_index_nospec(p->sq_thread_cpu, NR_CPUS);
+ ctx->sqo_thread = kthread_create_on_cpu(io_sq_thread,
+ ctx, cpu,
+ "io_uring-sq");
+ } else {
+ ctx->sqo_thread = kthread_create(io_sq_thread, ctx,
+ "io_uring-sq");
+ }
+ if (IS_ERR(ctx->sqo_thread)) {
+ ret = PTR_ERR(ctx->sqo_thread);
+ ctx->sqo_thread = NULL;
+ goto err;
+ }
+ wake_up_process(ctx->sqo_thread);
+ } else if (p->flags & IORING_SETUP_SQ_AFF) {
+ /* Can't have SQ_AFF without SQPOLL */
+ ret = -EINVAL;
+ goto err;
+ }
+
/* Do QD, or 2 * CPUS, whatever is smallest */
ctx->sqo_wq = alloc_workqueue("io_ring-wq", WQ_UNBOUND | WQ_FREEZABLE,
min(ctx->sq_entries - 1, 2 * num_online_cpus()));
@@ -1600,6 +1821,7 @@ static int io_sq_offload_start(struct io_ring_ctx *ctx)
return 0;
err:
+ io_sq_thread_stop(ctx);
mmdrop(ctx->sqo_mm);
ctx->sqo_mm = NULL;
return ret;
@@ -1959,7 +2181,7 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
int submitted = 0;
struct fd f;
- if (flags & ~IORING_ENTER_GETEVENTS)
+ if (flags & ~(IORING_ENTER_GETEVENTS | IORING_ENTER_SQ_WAKEUP))
return -EINVAL;
f = fdget(fd);
@@ -1975,6 +2197,18 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
if (!percpu_ref_tryget(&ctx->refs))
goto out_fput;
+ /*
+ * For SQ polling, the thread will do all submissions and completions.
+ * Just return the requested submit count, and wake the thread if
+ * we were asked to.
+ */
+ if (ctx->flags & IORING_SETUP_SQPOLL) {
+ if (flags & IORING_ENTER_SQ_WAKEUP)
+ wake_up(&ctx->sqo_wait);
+ submitted = to_submit;
+ goto out_ctx;
+ }
+
ret = 0;
if (to_submit) {
to_submit = min(to_submit, ctx->sq_entries);
@@ -2156,7 +2390,7 @@ static int io_uring_create(unsigned entries, struct io_uring_params *p)
if (ret)
goto err;
- ret = io_sq_offload_start(ctx);
+ ret = io_sq_offload_start(ctx, p);
if (ret)
goto err;
@@ -2204,7 +2438,8 @@ static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
return -EINVAL;
}
- if (p.flags & ~IORING_SETUP_IOPOLL)
+ if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL |
+ IORING_SETUP_SQ_AFF))
return -EINVAL;
ret = io_uring_create(entries, &p);