summaryrefslogtreecommitdiffstats
path: root/util/thread-pool.c
diff options
context:
space:
mode:
authorRichard Henderson2022-05-25 22:46:29 +0200
committerRichard Henderson2022-05-25 22:46:29 +0200
commit58b53669e87fed0d70903e05cd42079fbbdbc195 (patch)
tree1bf867fe078f882241f12180c30a8530e1e9163a /util/thread-pool.c
parentMerge tag 'pull-aspeed-20220525' of https://github.com/legoater/qemu into sta... (diff)
parenti386: docs: Convert hyperv.txt to rST (diff)
downloadqemu-58b53669e87fed0d70903e05cd42079fbbdbc195.tar.gz
qemu-58b53669e87fed0d70903e05cd42079fbbdbc195.tar.xz
qemu-58b53669e87fed0d70903e05cd42079fbbdbc195.zip
Merge tag 'for-upstream' of https://gitlab.com/bonzini/qemu into staging
* ac97 cleanups (Zoltan) * default the amount of prealloc-threads to smp-cpus (Jaroslav) * fix disabling MPX on "-cpu host" with MPX-capable host (Maciej) * thread-pool performance optimizations (myself) * Hyper-V enlightenment enabling and docs (Vitaly) * check ELF header in elf2dmp (Viktor) * tweak LBREn migration (Weijiang) # -----BEGIN PGP SIGNATURE----- # # iQFIBAABCAAyFiEE8TM4V0tmI4mGbHaCv/vSX3jHroMFAmKOgwgUHHBib256aW5p # QHJlZGhhdC5jb20ACgkQv/vSX3jHroOO3Qf7Btcvr2ex9qZ1yThlmZ6hl20WvQZe # GlKBq5xJnx2FUpvrH/AiNl2qfiBN5emhzJp1oBieQusDDsWVblmRpWgzUkUZvh0H # s5rKsNuOPdhqaxLH4sRCXS2FCVOy81d+lc9yYe5bzy3EHDO/qzMjye+JoBhXtQve # 3gOcOb1srIB/xSGNur2iCJkcauhBOipOo77kryfWekfReA3glHGnwhuEO+F+gXT3 # hiEO6TuRHjVrVCExbsDJb2pV2sSH6FxOP09BZ84IT0puv/FfgnUGCiNVfVNmMgNq # KYysG7vPlRSaDX17bt3UlS4Y6yKb1vZpnvymRRkWxWLIfuAVVNm0vgHBpg== # =gX2j # -----END PGP SIGNATURE----- # gpg: Signature made Wed 25 May 2022 12:27:04 PM PDT # gpg: using RSA key F13338574B662389866C7682BFFBD25F78C7AE83 # gpg: issuer "pbonzini@redhat.com" # gpg: Good signature from "Paolo Bonzini <bonzini@gnu.org>" [undefined] # gpg: aka "Paolo Bonzini <pbonzini@redhat.com>" [undefined] # gpg: WARNING: This key is not certified with a trusted signature! # gpg: There is no indication that the signature belongs to the owner. # Primary key fingerprint: 46F5 9FBD 57D6 12E7 BFD4 E2F7 7E15 100C CD36 69B1 # Subkey fingerprint: F133 3857 4B66 2389 866C 7682 BFFB D25F 78C7 AE83 * tag 'for-upstream' of https://gitlab.com/bonzini/qemu: i386: docs: Convert hyperv.txt to rST i386: Hyper-V Direct TLB flush hypercall i386: Hyper-V Support extended GVA ranges for TLB flush hypercalls i386: Hyper-V XMM fast hypercall input feature i386: Hyper-V Enlightened MSR bitmap feature i386: Use hv_build_cpuid_leaf() for HV_CPUID_NESTED_FEATURES ide_ioport_read: Return lower octet of data register instead of 0xFF target/i386/kvm: Fix disabling MPX on "-cpu host" with MPX-capable host hw/audio/ac97: Remove unneeded local variables hw/audio/ac97: Remove unimplemented reset functions hw/audio/ac97: Coding style fixes to avoid checkpatch errors contrib/elf2dmp: add ELF dump header checking thread-pool: remove stopping variable thread-pool: replace semaphore with condition variable thread-pool: optimize scheduling of completion bottom half hostmem: default the amount of prealloc-threads to smp-cpus target/i386: Remove LBREn bit check when access Arch LBR MSRs Signed-off-by: Richard Henderson <richard.henderson@linaro.org>
Diffstat (limited to 'util/thread-pool.c')
-rw-r--r--util/thread-pool.c74
1 files changed, 30 insertions, 44 deletions
diff --git a/util/thread-pool.c b/util/thread-pool.c
index 196835b4d3..31113b5860 100644
--- a/util/thread-pool.c
+++ b/util/thread-pool.c
@@ -57,7 +57,7 @@ struct ThreadPool {
QEMUBH *completion_bh;
QemuMutex lock;
QemuCond worker_stopped;
- QemuSemaphore sem;
+ QemuCond request_cond;
QEMUBH *new_thread_bh;
/* The following variables are only accessed from one AioContext. */
@@ -69,28 +69,10 @@ struct ThreadPool {
int idle_threads;
int new_threads; /* backlog of threads we need to create */
int pending_threads; /* threads created but not running yet */
- bool stopping;
int min_threads;
int max_threads;
};
-static inline bool back_to_sleep(ThreadPool *pool, int ret)
-{
- /*
- * The semaphore timed out, we should exit the loop except when:
- * - There is work to do, we raced with the signal.
- * - The max threads threshold just changed, we raced with the signal.
- * - The thread pool forces a minimum number of readily available threads.
- */
- if (ret == -1 && (!QTAILQ_EMPTY(&pool->request_list) ||
- pool->cur_threads > pool->max_threads ||
- pool->cur_threads <= pool->min_threads)) {
- return true;
- }
-
- return false;
-}
-
static void *worker_thread(void *opaque)
{
ThreadPool *pool = opaque;
@@ -99,20 +81,25 @@ static void *worker_thread(void *opaque)
pool->pending_threads--;
do_spawn_thread(pool);
- while (!pool->stopping) {
+ while (pool->cur_threads <= pool->max_threads) {
ThreadPoolElement *req;
int ret;
- do {
+ if (QTAILQ_EMPTY(&pool->request_list)) {
pool->idle_threads++;
- qemu_mutex_unlock(&pool->lock);
- ret = qemu_sem_timedwait(&pool->sem, 10000);
- qemu_mutex_lock(&pool->lock);
+ ret = qemu_cond_timedwait(&pool->request_cond, &pool->lock, 10000);
pool->idle_threads--;
- } while (back_to_sleep(pool, ret));
- if (ret == -1 || pool->stopping ||
- pool->cur_threads > pool->max_threads) {
- break;
+ if (ret == 0 &&
+ QTAILQ_EMPTY(&pool->request_list) &&
+ pool->cur_threads > pool->min_threads) {
+ /* Timed out + no work to do + no need for warm threads = exit. */
+ break;
+ }
+ /*
+ * Even if there was some work to do, check if there aren't
+ * too many worker threads before picking it up.
+ */
+ continue;
}
req = QTAILQ_FIRST(&pool->request_list);
@@ -127,14 +114,19 @@ static void *worker_thread(void *opaque)
smp_wmb();
req->state = THREAD_DONE;
- qemu_mutex_lock(&pool->lock);
-
qemu_bh_schedule(pool->completion_bh);
+ qemu_mutex_lock(&pool->lock);
}
pool->cur_threads--;
qemu_cond_signal(&pool->worker_stopped);
qemu_mutex_unlock(&pool->lock);
+
+ /*
+ * Wake up another thread, in case we got a wakeup but decided
+ * to exit due to pool->cur_threads > pool->max_threads.
+ */
+ qemu_cond_signal(&pool->request_cond);
return NULL;
}
@@ -230,13 +222,7 @@ static void thread_pool_cancel(BlockAIOCB *acb)
trace_thread_pool_cancel(elem, elem->common.opaque);
QEMU_LOCK_GUARD(&pool->lock);
- if (elem->state == THREAD_QUEUED &&
- /* No thread has yet started working on elem. we can try to "steal"
- * the item from the worker if we can get a signal from the
- * semaphore. Because this is non-blocking, we can do it with
- * the lock taken and ensure that elem will remain THREAD_QUEUED.
- */
- qemu_sem_timedwait(&pool->sem, 0) == 0) {
+ if (elem->state == THREAD_QUEUED) {
QTAILQ_REMOVE(&pool->request_list, elem, reqs);
qemu_bh_schedule(pool->completion_bh);
@@ -281,7 +267,7 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPool *pool,
}
QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs);
qemu_mutex_unlock(&pool->lock);
- qemu_sem_post(&pool->sem);
+ qemu_cond_signal(&pool->request_cond);
return &req->common;
}
@@ -324,7 +310,7 @@ void thread_pool_update_params(ThreadPool *pool, AioContext *ctx)
* We either have to:
* - Increase the number available of threads until over the min_threads
* threshold.
- * - Decrease the number of available threads until under the max_threads
+ * - Bump the worker threads so that they exit, until under the max_threads
* threshold.
* - Do nothing. The current number of threads fall in between the min and
* max thresholds. We'll let the pool manage itself.
@@ -334,7 +320,7 @@ void thread_pool_update_params(ThreadPool *pool, AioContext *ctx)
}
for (int i = pool->cur_threads; i > pool->max_threads; i--) {
- qemu_sem_post(&pool->sem);
+ qemu_cond_signal(&pool->request_cond);
}
qemu_mutex_unlock(&pool->lock);
@@ -351,7 +337,7 @@ static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
pool->completion_bh = aio_bh_new(ctx, thread_pool_completion_bh, pool);
qemu_mutex_init(&pool->lock);
qemu_cond_init(&pool->worker_stopped);
- qemu_sem_init(&pool->sem, 0);
+ qemu_cond_init(&pool->request_cond);
pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);
QLIST_INIT(&pool->head);
@@ -383,16 +369,16 @@ void thread_pool_free(ThreadPool *pool)
pool->new_threads = 0;
/* Wait for worker threads to terminate */
- pool->stopping = true;
+ pool->max_threads = 0;
+ qemu_cond_broadcast(&pool->request_cond);
while (pool->cur_threads > 0) {
- qemu_sem_post(&pool->sem);
qemu_cond_wait(&pool->worker_stopped, &pool->lock);
}
qemu_mutex_unlock(&pool->lock);
qemu_bh_delete(pool->completion_bh);
- qemu_sem_destroy(&pool->sem);
+ qemu_cond_destroy(&pool->request_cond);
qemu_cond_destroy(&pool->worker_stopped);
qemu_mutex_destroy(&pool->lock);
g_free(pool);