summaryrefslogtreecommitdiffstats
path: root/migration/ram.c
diff options
context:
space:
mode:
Diffstat (limited to 'migration/ram.c')
-rw-r--r--migration/ram.c251
1 files changed, 141 insertions, 110 deletions
diff --git a/migration/ram.c b/migration/ram.c
index 42fb8ac6d6..815bc0e11a 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -253,8 +253,8 @@ static struct BitmapRcu {
} *migration_bitmap_rcu;
struct CompressParam {
- bool start;
bool done;
+ bool quit;
QEMUFile *file;
QemuMutex mutex;
QemuCond cond;
@@ -264,7 +264,8 @@ struct CompressParam {
typedef struct CompressParam CompressParam;
struct DecompressParam {
- bool start;
+ bool done;
+ bool quit;
QemuMutex mutex;
QemuCond cond;
void *des;
@@ -279,45 +280,47 @@ static QemuThread *compress_threads;
* one of the compression threads has finished the compression.
* comp_done_lock is used to co-work with comp_done_cond.
*/
-static QemuMutex *comp_done_lock;
-static QemuCond *comp_done_cond;
+static QemuMutex comp_done_lock;
+static QemuCond comp_done_cond;
/* The empty QEMUFileOps will be used by file in CompressParam */
static const QEMUFileOps empty_ops = { };
static bool compression_switch;
-static bool quit_comp_thread;
-static bool quit_decomp_thread;
static DecompressParam *decomp_param;
static QemuThread *decompress_threads;
+static QemuMutex decomp_done_lock;
+static QemuCond decomp_done_cond;
-static int do_compress_ram_page(CompressParam *param);
+static int do_compress_ram_page(QEMUFile *f, RAMBlock *block,
+ ram_addr_t offset);
static void *do_data_compress(void *opaque)
{
CompressParam *param = opaque;
+ RAMBlock *block;
+ ram_addr_t offset;
- while (!quit_comp_thread) {
- qemu_mutex_lock(&param->mutex);
- /* Re-check the quit_comp_thread in case of
- * terminate_compression_threads is called just before
- * qemu_mutex_lock(&param->mutex) and after
- * while(!quit_comp_thread), re-check it here can make
- * sure the compression thread terminate as expected.
- */
- while (!param->start && !quit_comp_thread) {
+ qemu_mutex_lock(&param->mutex);
+ while (!param->quit) {
+ if (param->block) {
+ block = param->block;
+ offset = param->offset;
+ param->block = NULL;
+ qemu_mutex_unlock(&param->mutex);
+
+ do_compress_ram_page(param->file, block, offset);
+
+ qemu_mutex_lock(&comp_done_lock);
+ param->done = true;
+ qemu_cond_signal(&comp_done_cond);
+ qemu_mutex_unlock(&comp_done_lock);
+
+ qemu_mutex_lock(&param->mutex);
+ } else {
qemu_cond_wait(&param->cond, &param->mutex);
}
- if (!quit_comp_thread) {
- do_compress_ram_page(param);
- }
- param->start = false;
- qemu_mutex_unlock(&param->mutex);
-
- qemu_mutex_lock(comp_done_lock);
- param->done = true;
- qemu_cond_signal(comp_done_cond);
- qemu_mutex_unlock(comp_done_lock);
}
+ qemu_mutex_unlock(&param->mutex);
return NULL;
}
@@ -327,9 +330,9 @@ static inline void terminate_compression_threads(void)
int idx, thread_count;
thread_count = migrate_compress_threads();
- quit_comp_thread = true;
for (idx = 0; idx < thread_count; idx++) {
qemu_mutex_lock(&comp_param[idx].mutex);
+ comp_param[idx].quit = true;
qemu_cond_signal(&comp_param[idx].cond);
qemu_mutex_unlock(&comp_param[idx].mutex);
}
@@ -350,16 +353,12 @@ void migrate_compress_threads_join(void)
qemu_mutex_destroy(&comp_param[i].mutex);
qemu_cond_destroy(&comp_param[i].cond);
}
- qemu_mutex_destroy(comp_done_lock);
- qemu_cond_destroy(comp_done_cond);
+ qemu_mutex_destroy(&comp_done_lock);
+ qemu_cond_destroy(&comp_done_cond);
g_free(compress_threads);
g_free(comp_param);
- g_free(comp_done_cond);
- g_free(comp_done_lock);
compress_threads = NULL;
comp_param = NULL;
- comp_done_cond = NULL;
- comp_done_lock = NULL;
}
void migrate_compress_threads_create(void)
@@ -369,21 +368,19 @@ void migrate_compress_threads_create(void)
if (!migrate_use_compression()) {
return;
}
- quit_comp_thread = false;
compression_switch = true;
thread_count = migrate_compress_threads();
compress_threads = g_new0(QemuThread, thread_count);
comp_param = g_new0(CompressParam, thread_count);
- comp_done_cond = g_new0(QemuCond, 1);
- comp_done_lock = g_new0(QemuMutex, 1);
- qemu_cond_init(comp_done_cond);
- qemu_mutex_init(comp_done_lock);
+ qemu_cond_init(&comp_done_cond);
+ qemu_mutex_init(&comp_done_lock);
for (i = 0; i < thread_count; i++) {
/* com_param[i].file is just used as a dummy buffer to save data, set
* it's ops to empty.
*/
comp_param[i].file = qemu_fopen_ops(NULL, &empty_ops);
comp_param[i].done = true;
+ comp_param[i].quit = false;
qemu_mutex_init(&comp_param[i].mutex);
qemu_cond_init(&comp_param[i].cond);
qemu_thread_create(compress_threads + i, "compress",
@@ -805,41 +802,27 @@ static int ram_save_page(QEMUFile *f, PageSearchStatus *pss,
return pages;
}
-static int do_compress_ram_page(CompressParam *param)
+static int do_compress_ram_page(QEMUFile *f, RAMBlock *block,
+ ram_addr_t offset)
{
int bytes_sent, blen;
- uint8_t *p;
- RAMBlock *block = param->block;
- ram_addr_t offset = param->offset;
-
- p = block->host + (offset & TARGET_PAGE_MASK);
+ uint8_t *p = block->host + (offset & TARGET_PAGE_MASK);
- bytes_sent = save_page_header(param->file, block, offset |
+ bytes_sent = save_page_header(f, block, offset |
RAM_SAVE_FLAG_COMPRESS_PAGE);
- blen = qemu_put_compression_data(param->file, p, TARGET_PAGE_SIZE,
+ blen = qemu_put_compression_data(f, p, TARGET_PAGE_SIZE,
migrate_compress_level());
- bytes_sent += blen;
+ if (blen < 0) {
+ bytes_sent = 0;
+ qemu_file_set_error(migrate_get_current()->to_dst_file, blen);
+ error_report("compressed data failed!");
+ } else {
+ bytes_sent += blen;
+ }
return bytes_sent;
}
-static inline void start_compression(CompressParam *param)
-{
- param->done = false;
- qemu_mutex_lock(&param->mutex);
- param->start = true;
- qemu_cond_signal(&param->cond);
- qemu_mutex_unlock(&param->mutex);
-}
-
-static inline void start_decompression(DecompressParam *param)
-{
- qemu_mutex_lock(&param->mutex);
- param->start = true;
- qemu_cond_signal(&param->cond);
- qemu_mutex_unlock(&param->mutex);
-}
-
static uint64_t bytes_transferred;
static void flush_compressed_data(QEMUFile *f)
@@ -850,18 +833,22 @@ static void flush_compressed_data(QEMUFile *f)
return;
}
thread_count = migrate_compress_threads();
+
+ qemu_mutex_lock(&comp_done_lock);
for (idx = 0; idx < thread_count; idx++) {
- if (!comp_param[idx].done) {
- qemu_mutex_lock(comp_done_lock);
- while (!comp_param[idx].done && !quit_comp_thread) {
- qemu_cond_wait(comp_done_cond, comp_done_lock);
- }
- qemu_mutex_unlock(comp_done_lock);
+ while (!comp_param[idx].done) {
+ qemu_cond_wait(&comp_done_cond, &comp_done_lock);
}
- if (!quit_comp_thread) {
+ }
+ qemu_mutex_unlock(&comp_done_lock);
+
+ for (idx = 0; idx < thread_count; idx++) {
+ qemu_mutex_lock(&comp_param[idx].mutex);
+ if (!comp_param[idx].quit) {
len = qemu_put_qemu_file(f, comp_param[idx].file);
bytes_transferred += len;
}
+ qemu_mutex_unlock(&comp_param[idx].mutex);
}
}
@@ -879,13 +866,16 @@ static int compress_page_with_multi_thread(QEMUFile *f, RAMBlock *block,
int idx, thread_count, bytes_xmit = -1, pages = -1;
thread_count = migrate_compress_threads();
- qemu_mutex_lock(comp_done_lock);
+ qemu_mutex_lock(&comp_done_lock);
while (true) {
for (idx = 0; idx < thread_count; idx++) {
if (comp_param[idx].done) {
+ comp_param[idx].done = false;
bytes_xmit = qemu_put_qemu_file(f, comp_param[idx].file);
+ qemu_mutex_lock(&comp_param[idx].mutex);
set_compress_params(&comp_param[idx], block, offset);
- start_compression(&comp_param[idx]);
+ qemu_cond_signal(&comp_param[idx].cond);
+ qemu_mutex_unlock(&comp_param[idx].mutex);
pages = 1;
acct_info.norm_pages++;
*bytes_transferred += bytes_xmit;
@@ -895,10 +885,10 @@ static int compress_page_with_multi_thread(QEMUFile *f, RAMBlock *block,
if (pages > 0) {
break;
} else {
- qemu_cond_wait(comp_done_cond, comp_done_lock);
+ qemu_cond_wait(&comp_done_cond, &comp_done_lock);
}
}
- qemu_mutex_unlock(comp_done_lock);
+ qemu_mutex_unlock(&comp_done_lock);
return pages;
}
@@ -919,24 +909,20 @@ static int ram_save_compressed_page(QEMUFile *f, PageSearchStatus *pss,
uint64_t *bytes_transferred)
{
int pages = -1;
- uint64_t bytes_xmit;
+ uint64_t bytes_xmit = 0;
uint8_t *p;
- int ret;
+ int ret, blen;
RAMBlock *block = pss->block;
ram_addr_t offset = pss->offset;
p = block->host + offset;
- bytes_xmit = 0;
ret = ram_control_save_page(f, block->offset,
offset, TARGET_PAGE_SIZE, &bytes_xmit);
if (bytes_xmit) {
*bytes_transferred += bytes_xmit;
pages = 1;
}
- if (block == last_sent_block) {
- offset |= RAM_SAVE_FLAG_CONTINUE;
- }
if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
if (ret != RAM_SAVE_CONTROL_DELAYED) {
if (bytes_xmit > 0) {
@@ -956,17 +942,22 @@ static int ram_save_compressed_page(QEMUFile *f, PageSearchStatus *pss,
flush_compressed_data(f);
pages = save_zero_page(f, block, offset, p, bytes_transferred);
if (pages == -1) {
- set_compress_params(&comp_param[0], block, offset);
- /* Use the qemu thread to compress the data to make sure the
- * first page is sent out before other pages
- */
- bytes_xmit = do_compress_ram_page(&comp_param[0]);
- acct_info.norm_pages++;
- qemu_put_qemu_file(f, comp_param[0].file);
- *bytes_transferred += bytes_xmit;
- pages = 1;
+ /* Make sure the first page is sent out before other pages */
+ bytes_xmit = save_page_header(f, block, offset |
+ RAM_SAVE_FLAG_COMPRESS_PAGE);
+ blen = qemu_put_compression_data(f, p, TARGET_PAGE_SIZE,
+ migrate_compress_level());
+ if (blen > 0) {
+ *bytes_transferred += bytes_xmit + blen;
+ acct_info.norm_pages++;
+ pages = 1;
+ } else {
+ qemu_file_set_error(f, blen);
+ error_report("compressed data failed!");
+ }
}
} else {
+ offset |= RAM_SAVE_FLAG_CONTINUE;
pages = save_zero_page(f, block, offset, p, bytes_transferred);
if (pages == -1) {
pages = compress_page_with_multi_thread(f, block, offset,
@@ -2191,29 +2182,59 @@ static void *do_data_decompress(void *opaque)
{
DecompressParam *param = opaque;
unsigned long pagesize;
+ uint8_t *des;
+ int len;
+
+ qemu_mutex_lock(&param->mutex);
+ while (!param->quit) {
+ if (param->des) {
+ des = param->des;
+ len = param->len;
+ param->des = 0;
+ qemu_mutex_unlock(&param->mutex);
- while (!quit_decomp_thread) {
- qemu_mutex_lock(&param->mutex);
- while (!param->start && !quit_decomp_thread) {
- qemu_cond_wait(&param->cond, &param->mutex);
pagesize = TARGET_PAGE_SIZE;
- if (!quit_decomp_thread) {
- /* uncompress() will return failed in some case, especially
- * when the page is dirted when doing the compression, it's
- * not a problem because the dirty page will be retransferred
- * and uncompress() won't break the data in other pages.
- */
- uncompress((Bytef *)param->des, &pagesize,
- (const Bytef *)param->compbuf, param->len);
- }
- param->start = false;
+ /* uncompress() will return failed in some case, especially
+ * when the page is dirted when doing the compression, it's
+ * not a problem because the dirty page will be retransferred
+ * and uncompress() won't break the data in other pages.
+ */
+ uncompress((Bytef *)des, &pagesize,
+ (const Bytef *)param->compbuf, len);
+
+ qemu_mutex_lock(&decomp_done_lock);
+ param->done = true;
+ qemu_cond_signal(&decomp_done_cond);
+ qemu_mutex_unlock(&decomp_done_lock);
+
+ qemu_mutex_lock(&param->mutex);
+ } else {
+ qemu_cond_wait(&param->cond, &param->mutex);
}
- qemu_mutex_unlock(&param->mutex);
}
+ qemu_mutex_unlock(&param->mutex);
return NULL;
}
+static void wait_for_decompress_done(void)
+{
+ int idx, thread_count;
+
+ if (!migrate_use_compression()) {
+ return;
+ }
+
+ thread_count = migrate_decompress_threads();
+ qemu_mutex_lock(&decomp_done_lock);
+ for (idx = 0; idx < thread_count; idx++) {
+ while (!decomp_param[idx].done) {
+ qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
+ }
+ }
+ qemu_mutex_unlock(&decomp_done_lock);
+}
+
void migrate_decompress_threads_create(void)
{
int i, thread_count;
@@ -2221,11 +2242,14 @@ void migrate_decompress_threads_create(void)
thread_count = migrate_decompress_threads();
decompress_threads = g_new0(QemuThread, thread_count);
decomp_param = g_new0(DecompressParam, thread_count);
- quit_decomp_thread = false;
+ qemu_mutex_init(&decomp_done_lock);
+ qemu_cond_init(&decomp_done_cond);
for (i = 0; i < thread_count; i++) {
qemu_mutex_init(&decomp_param[i].mutex);
qemu_cond_init(&decomp_param[i].cond);
decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
+ decomp_param[i].done = true;
+ decomp_param[i].quit = false;
qemu_thread_create(decompress_threads + i, "decompress",
do_data_decompress, decomp_param + i,
QEMU_THREAD_JOINABLE);
@@ -2236,10 +2260,10 @@ void migrate_decompress_threads_join(void)
{
int i, thread_count;
- quit_decomp_thread = true;
thread_count = migrate_decompress_threads();
for (i = 0; i < thread_count; i++) {
qemu_mutex_lock(&decomp_param[i].mutex);
+ decomp_param[i].quit = true;
qemu_cond_signal(&decomp_param[i].cond);
qemu_mutex_unlock(&decomp_param[i].mutex);
}
@@ -2261,20 +2285,27 @@ static void decompress_data_with_multi_threads(QEMUFile *f,
int idx, thread_count;
thread_count = migrate_decompress_threads();
+ qemu_mutex_lock(&decomp_done_lock);
while (true) {
for (idx = 0; idx < thread_count; idx++) {
- if (!decomp_param[idx].start) {
+ if (decomp_param[idx].done) {
+ decomp_param[idx].done = false;
+ qemu_mutex_lock(&decomp_param[idx].mutex);
qemu_get_buffer(f, decomp_param[idx].compbuf, len);
decomp_param[idx].des = host;
decomp_param[idx].len = len;
- start_decompression(&decomp_param[idx]);
+ qemu_cond_signal(&decomp_param[idx].cond);
+ qemu_mutex_unlock(&decomp_param[idx].mutex);
break;
}
}
if (idx < thread_count) {
break;
+ } else {
+ qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
}
}
+ qemu_mutex_unlock(&decomp_done_lock);
}
/*
@@ -2325,7 +2356,6 @@ static int ram_load_postcopy(QEMUFile *f)
ret = -EINVAL;
break;
}
- page_buffer = host;
/*
* Postcopy requires that we place whole host pages atomically.
* To make it atomic, the data is read into a temporary page
@@ -2541,6 +2571,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
}
}
+ wait_for_decompress_done();
rcu_read_unlock();
DPRINTF("Completed load of VM with exit code %d seq iteration "
"%" PRIu64 "\n", ret, seq_iter);