#!/usr/bin/env python3 # group: rw # # Test whether the backing BDSs are correct after completion of a # mirror block job; in "existing" modes (drive-mirror with # mode=existing and blockdev-mirror) the backing chain should not be # overridden. # # Copyright (C) 2016 Red Hat, Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. # import os import iotests from iotests import qemu_img back0_img = os.path.join(iotests.test_dir, 'back0.' + iotests.imgfmt) back1_img = os.path.join(iotests.test_dir, 'back1.' + iotests.imgfmt) back2_img = os.path.join(iotests.test_dir, 'back2.' + iotests.imgfmt) source_img = os.path.join(iotests.test_dir, 'source.' + iotests.imgfmt) target_img = os.path.join(iotests.test_dir, 'target.' + iotests.imgfmt) # Class variables for controlling its behavior: # # existing: If True, explicitly create the target image and blockdev-add it # target_backing: If existing is True: Use this filename as the backing file # of the target image # (None: no backing file) # target_blockdev_backing: If existing is True: Pass this dict as "backing" # for the blockdev-add command # (None: do not pass "backing") # target_real_backing: If existing is True: The real filename of the backing # image during runtime, only makes sense if # target_blockdev_backing is not None # (None: same as target_backing) # target_open_with_backing: If True, the target image is added with its backing # chain opened right away. If False, blockdev-add # opens it without a backing file and job completion # is supposed to open the backing chain. # use_iothread: If True, an iothread is configured for the virtio-blk device # that uses the image being mirrored class BaseClass(iotests.QMPTestCase): target_blockdev_backing = None target_real_backing = None target_open_with_backing = True use_iothread = False def setUp(self): qemu_img('create', '-f', iotests.imgfmt, back0_img, '1440K') qemu_img('create', '-f', iotests.imgfmt, '-b', back0_img, '-F', iotests.imgfmt, back1_img) qemu_img('create', '-f', iotests.imgfmt, '-b', back1_img, '-F', iotests.imgfmt, back2_img) qemu_img('create', '-f', iotests.imgfmt, '-b', back2_img, '-F', iotests.imgfmt, source_img) self.vm = iotests.VM() # Add the BDS via blockdev-add so it stays around after the mirror block # job has been completed blockdev = {'node-name': 'source', 'driver': iotests.imgfmt, 'file': {'driver': 'file', 'filename': source_img}} self.vm.add_blockdev(self.vm.qmp_to_opts(blockdev)) if self.use_iothread: self.vm.add_object('iothread,id=iothread0') iothread = ",iothread=iothread0" else: iothread = "" self.vm.add_device('virtio-scsi%s' % iothread) self.vm.add_device('scsi-hd,id=qdev0,drive=source') self.vm.launch() self.assertIntactSourceBackingChain() if self.existing: if self.target_backing: qemu_img('create', '-f', iotests.imgfmt, '-b', self.target_backing, '-F', 'raw', target_img, '1440K') else: qemu_img('create', '-f', iotests.imgfmt, target_img, '1440K') if self.cmd == 'blockdev-mirror': options = { 'node-name': 'target', 'driver': iotests.imgfmt, 'file': { 'driver': 'file', 'node-name': 'target-file', 'filename': target_img } } if not self.target_open_with_backing: options['backing'] = None elif self.target_blockdev_backing: options['backing'] = self.target_blockdev_backing result = self.vm.qmp('blockdev-add', **options) self.assert_qmp(result, 'return', {}) def tearDown(self): self.vm.shutdown() os.remove(source_img) os.remove(back2_img) os.remove(back1_img) os.remove(back0_img) try: os.remove(target_img) except OSError: pass def findBlockNode(self, node_name, qdev=None): if qdev: result = self.vm.qmp('query-block') for device in result['return']: if device['qdev'] == qdev: if node_name: self.assert_qmp(device, 'inserted/node-name', node_name) return device['inserted'] else: result = self.vm.qmp('query-named-block-nodes') for node in result['return']: if node['node-name'] == node_name: return node self.fail('Cannot find node %s/%s' % (qdev, node_name)) def assertIntactSourceBackingChain(self): node = self.findBlockNode('source') self.assert_qmp(node, 'image' + '/backing-image' * 0 + '/filename', source_img) self.assert_qmp(node, 'image' + '/backing-image' * 1 + '/filename', back2_img) self.assert_qmp(node, 'image' + '/backing-image' * 2 + '/filename', back1_img) self.assert_qmp(node, 'image' + '/backing-image' * 3 + '/filename', back0_img) self.assert_qmp_absent(node, 'image' + '/backing-image' * 4) def assertCorrectBackingImage(self, node, default_image): if self.existing: if self.target_real_backing: image = self.target_real_backing else: image = self.target_backing else: image = default_image if image: self.assert_qmp(node, 'image/backing-image/filename', image) else: self.assert_qmp_absent(node, 'image/backing-image') # Class variables for controlling its behavior: # # cmd: Mirroring command to execute, either drive-mirror or blockdev-mirror class MirrorBaseClass(BaseClass): def openBacking(self): pass def runMirror(self, sync): if self.cmd == 'blockdev-mirror': result = self.vm.qmp(self.cmd, job_id='mirror-job', device='source', sync=sync, target='target', auto_finalize=False) else: if self.existing: mode = 'existing' else: mode = 'absolute-paths' result = self.vm.qmp(self.cmd, job_id='mirror-job', device='source', sync=sync, target=target_img, format=iotests.imgfmt, mode=mode, node_name='target', auto_finalize=False) self.assert_qmp(result, 'return', {}) self.vm.run_job('mirror-job', auto_finalize=False, pre_finalize=self.openBacking, auto_dismiss=True) def testFull(self): self.runMirror('full') node = self.findBlockNode('target', 'qdev0') self.assertCorrectBackingImage(node, None) self.assertIntactSourceBackingChain() def testTop(self): self.runMirror('top') node = self.findBlockNode('target', 'qdev0') self.assertCorrectBackingImage(node, back2_img) self.assertIntactSourceBackingChain() def testNone(self): self.runMirror('none') node = self.findBlockNode('target', 'qdev0') self.assertCorrectBackingImage(node, source_img) self.assertIntactSourceBackingChain() class TestDriveMirrorAbsolutePaths(MirrorBaseClass): cmd = 'drive-mirror' existing = False class TestDriveMirrorExistingNoBacking(MirrorBaseClass): cmd = 'drive-mirror' existing = True target_backing = None class TestDriveMirrorExistingBacking(MirrorBaseClass): cmd = 'drive-mirror' existing = True target_backing = 'null-co://' class TestBlockdevMirrorNoBacking(MirrorBaseClass): cmd = 'blockdev-mirror' existing = True target_backing = None class TestBlockdevMirrorBacking(MirrorBaseClass): cmd = 'blockdev-mirror' existing = True target_backing = 'null-co://' class TestBlockdevMirrorForcedBacking(MirrorBaseClass): cmd = 'blockdev-mirror' existing = True target_backing = None target_blockdev_backing = { 'driver': 'null-co' } target_real_backing = 'null-co://' # Attach the backing chain only during completion, with blockdev-reopen class TestBlockdevMirrorReopen(MirrorBaseClass): cmd = 'blockdev-mirror' existing = True target_backing = 'null-co://' target_open_with_backing = False def openBacking(self): if not self.target_open_with_backing: result = self.vm.qmp('blockdev-add', node_name="backing", driver="null-co") self.assert_qmp(result, 'return', {}) result = self.vm.qmp('x-blockdev-reopen', node_name="target", driver=iotests.imgfmt, file="target-file", backing="backing") self.assert_qmp(result, 'return', {}) class TestBlockdevMirrorReopenIothread(TestBlockdevMirrorReopen): use_iothread = True # Attach the backing chain only during completion, with blockdev-snapshot class TestBlockdevMirrorSnapshot(MirrorBaseClass): cmd = 'blockdev-mirror' existing = True target_backing = 'null-co://' target_open_with_backing = False def openBacking(self): if not self.target_open_with_backing: result = self.vm.qmp('blockdev-add', node_name="backing", driver="null-co") self.assert_qmp(result, 'return', {}) result = self.vm.qmp('blockdev-snapshot', node="backing", overlay="target") self.assert_qmp(result, 'return', {}) class TestBlockdevMirrorSnapshotIothread(TestBlockdevMirrorSnapshot): use_iothread = True class TestCommit(BaseClass): existing = False def testCommit(self): result = self.vm.qmp('block-commit', job_id='commit-job', device='source', base=back1_img) self.assert_qmp(result, 'return', {}) self.vm.event_wait('BLOCK_JOB_READY') result = self.vm.qmp('block-job-complete', device='commit-job') self.assert_qmp(result, 'return', {}) self.vm.event_wait('BLOCK_JOB_COMPLETED') node = self.findBlockNode(None, 'qdev0') self.assert_qmp(node, 'image' + '/backing-image' * 0 + '/filename', back1_img) self.assert_qmp(node, 'image' + '/backing-image' * 1 + '/filename', back0_img) self.assert_qmp_absent(node, 'image' + '/backing-image' * 2 + '/filename') self.assertIntactSourceBackingChain() BaseClass = None MirrorBaseClass = None if __name__ == '__main__': iotests.main(supported_fmts=['qcow2'], supported_protocols=['file'])