summaryrefslogblamecommitdiffstats
path: root/tests/image-fuzzer/runner.py
blob: 2fc010fd9d7b2548e2dac58a3b1295d217347baf (plain) (tree)
1
                      

























                                                                       
           
             
         







                                 

                                                                                     

























                                                                             




                                                
                       





                                                  


                                                      

                                                      


                                        

                     







                                                            

































                                                                             
                                                           




                                                                    

                                                                     













                                                                              
                                                                         





                                                                 
                            
                                                                                 
                                                               















                                                                              
                                                                        





                                                                             
                                

























                                                                               

                                                                         











                                                                    
                                                                       














                                                                          
                                    

                                                        
                                

                                                                           
                                        




                                                               


                                                                   



                                                       


                                                                          













                                                                  
                 



                                                                              
 
                
                                                                            


                                                                  
                                                                            











                                                                               
                                                                               
                                                               

                                                                
 



                                                                            

                                 
                                                                           
                                        

                                                                    

                                              







                                                                              
            



















                                                                             




                                                                           
        
                                                                 
                                                                               
                                                                               
                             
                                                                                              






                   
                   






                                         
                                                           

                                                                                






                                            

                                         


                                        
                                                           

                                                                                


                           

                                                                 













                                                                          
                            

                                                                       





                                                                   


                                                
            
                                                                          





                                               
#!/usr/bin/env python3

# Tool for running fuzz tests
#
# Copyright (C) 2014 Maria Kustova <maria.k@catit.be>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

import sys
import os
import signal
import subprocess
import random
import shutil
from itertools import count
import time
import getopt
import io
import resource

try:
    import json
except ImportError:
    try:
        import simplejson as json
    except ImportError:
        print("Warning: Module for JSON processing is not found.\n" \
            "'--config' and '--command' options are not supported.", file=sys.stderr)

# Backing file sizes in MB
MAX_BACKING_FILE_SIZE = 10
MIN_BACKING_FILE_SIZE = 1


def multilog(msg, *output):
    """ Write an object to all of specified file descriptors."""
    for fd in output:
        fd.write(msg)
        fd.flush()


def str_signal(sig):
    """ Convert a numeric value of a system signal to the string one
    defined by the current operational system.
    """
    for k, v in signal.__dict__.items():
        if v == sig:
            return k


def run_app(fd, q_args):
    """Start an application with specified arguments and return its exit code
    or kill signal depending on the result of execution.
    """

    class Alarm(Exception):
        """Exception for signal.alarm events."""
        pass

    def handler(*args):
        """Notify that an alarm event occurred."""
        raise Alarm

    signal.signal(signal.SIGALRM, handler)
    signal.alarm(600)
    term_signal = signal.SIGKILL
    devnull = open('/dev/null', 'r+')
    process = subprocess.Popen(q_args, stdin=devnull,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE,
                               errors='replace')
    try:
        out, err = process.communicate()
        signal.alarm(0)
        fd.write(out)
        fd.write(err)
        fd.flush()
        return process.returncode

    except Alarm:
        os.kill(process.pid, term_signal)
        fd.write('The command was terminated by timeout.\n')
        fd.flush()
        return -term_signal


class TestException(Exception):
    """Exception for errors risen by TestEnv objects."""
    pass


class TestEnv(object):

    """Test object.

    The class sets up test environment, generates backing and test images
    and executes application under tests with specified arguments and a test
    image provided.

    All logs are collected.

    The summary log will contain short descriptions and statuses of tests in
    a run.

    The test log will include application (e.g. 'qemu-img') logs besides info
    sent to the summary log.
    """

    def __init__(self, test_id, seed, work_dir, run_log,
                 cleanup=True, log_all=False):
        """Set test environment in a specified work directory.

        Path to qemu-img and qemu-io will be retrieved from 'QEMU_IMG' and
        'QEMU_IO' environment variables.
        """
        if seed is not None:
            self.seed = seed
        else:
            self.seed = str(random.randint(0, sys.maxsize))
        random.seed(self.seed)

        self.init_path = os.getcwd()
        self.work_dir = work_dir
        self.current_dir = os.path.join(work_dir, 'test-' + test_id)
        self.qemu_img = \
            os.environ.get('QEMU_IMG', 'qemu-img').strip().split(' ')
        self.qemu_io = os.environ.get('QEMU_IO', 'qemu-io').strip().split(' ')
        self.commands = [['qemu-img', 'check', '-f', 'qcow2', '$test_img'],
                         ['qemu-img', 'info', '-f', 'qcow2', '$test_img'],
                         ['qemu-io', '$test_img', '-c', 'read $off $len'],
                         ['qemu-io', '$test_img', '-c', 'write $off $len'],
                         ['qemu-io', '$test_img', '-c',
                          'aio_read $off $len'],
                         ['qemu-io', '$test_img', '-c',
                          'aio_write $off $len'],
                         ['qemu-io', '$test_img', '-c', 'flush'],
                         ['qemu-io', '$test_img', '-c',
                          'discard $off $len'],
                         ['qemu-io', '$test_img', '-c',
                          'truncate $off']]
        for fmt in ['raw', 'vmdk', 'vdi', 'qcow2', 'file', 'qed', 'vpc']:
            self.commands.append(
                ['qemu-img', 'convert', '-f', 'qcow2', '-O', fmt,
                 '$test_img', 'converted_image.' + fmt])

        try:
            os.makedirs(self.current_dir)
        except OSError as e:
            print("Error: The working directory '%s' cannot be used. Reason: %s"\
                % (self.work_dir, e.strerror), file=sys.stderr)
            raise TestException
        self.log = open(os.path.join(self.current_dir, "test.log"), "w")
        self.parent_log = open(run_log, "a")
        self.failed = False
        self.cleanup = cleanup
        self.log_all = log_all

    def _create_backing_file(self):
        """Create a backing file in the current directory.

        Return a tuple of a backing file name and format.

        Format of a backing file is randomly chosen from all formats supported
        by 'qemu-img create'.
        """
        # All formats supported by the 'qemu-img create' command.
        backing_file_fmt = random.choice(['raw', 'vmdk', 'vdi', 'qcow2',
                                          'file', 'qed', 'vpc'])
        backing_file_name = 'backing_img.' + backing_file_fmt
        backing_file_size = random.randint(MIN_BACKING_FILE_SIZE,
                                           MAX_BACKING_FILE_SIZE) * (1 << 20)
        cmd = self.qemu_img + ['create', '-f', backing_file_fmt,
                               backing_file_name, str(backing_file_size)]
        temp_log = io.StringIO()
        retcode = run_app(temp_log, cmd)
        if retcode == 0:
            temp_log.close()
            return (backing_file_name, backing_file_fmt)
        else:
            multilog("Warning: The %s backing file was not created.\n\n"
                     % backing_file_fmt, sys.stderr, self.log, self.parent_log)
            self.log.write("Log for the failure:\n" + temp_log.getvalue() +
                           '\n\n')
            temp_log.close()
            return (None, None)

    def execute(self, input_commands=None, fuzz_config=None):
        """ Execute a test.

        The method creates backing and test images, runs test app and analyzes
        its exit status. If the application was killed by a signal, the test
        is marked as failed.
        """
        if input_commands is None:
            commands = self.commands
        else:
            commands = input_commands

        os.chdir(self.current_dir)
        backing_file_name, backing_file_fmt = self._create_backing_file()
        img_size = image_generator.create_image(
            'test.img', backing_file_name, backing_file_fmt, fuzz_config)
        for item in commands:
            shutil.copy('test.img', 'copy.img')
            # 'off' and 'len' are multiple of the sector size
            sector_size = 512
            start = random.randrange(0, img_size + 1, sector_size)
            end = random.randrange(start, img_size + 1, sector_size)

            if item[0] == 'qemu-img':
                current_cmd = list(self.qemu_img)
            elif item[0] == 'qemu-io':
                current_cmd = list(self.qemu_io)
            else:
                multilog("Warning: test command '%s' is not defined.\n"
                         % item[0], sys.stderr, self.log, self.parent_log)
                continue
            # Replace all placeholders with their real values
            for v in item[1:]:
                c = (v
                     .replace('$test_img', 'copy.img')
                     .replace('$off', str(start))
                     .replace('$len', str(end - start)))
                current_cmd.append(c)

            # Log string with the test header
            test_summary = "Seed: %s\nCommand: %s\nTest directory: %s\n" \
                           "Backing file: %s\n" \
                           % (self.seed, " ".join(current_cmd),
                              self.current_dir, backing_file_name)
            temp_log = io.StringIO()
            try:
                retcode = run_app(temp_log, current_cmd)
            except OSError as e:
                multilog("%sError: Start of '%s' failed. Reason: %s\n\n"
                         % (test_summary, os.path.basename(current_cmd[0]),
                            e.strerror),
                         sys.stderr, self.log, self.parent_log)
                raise TestException

            if retcode < 0:
                self.log.write(temp_log.getvalue())
                multilog("%sFAIL: Test terminated by signal %s\n\n"
                         % (test_summary, str_signal(-retcode)),
                         sys.stderr, self.log, self.parent_log)
                self.failed = True
            else:
                if self.log_all:
                    self.log.write(temp_log.getvalue())
                    multilog("%sPASS: Application exited with the code " \
                             "'%d'\n\n" % (test_summary, retcode),
                             sys.stdout, self.log, self.parent_log)
            temp_log.close()
            os.remove('copy.img')

    def finish(self):
        """Restore the test environment after a test execution."""
        self.log.close()
        self.parent_log.close()
        os.chdir(self.init_path)
        if self.cleanup and not self.failed:
            shutil.rmtree(self.current_dir)

if __name__ == '__main__':

    def usage():
        print("""
        Usage: runner.py [OPTION...] TEST_DIR IMG_GENERATOR

        Set up test environment in TEST_DIR and run a test in it. A module for
        test image generation should be specified via IMG_GENERATOR.

        Example:
          runner.py -c '[["qemu-img", "info", "$test_img"]]' /tmp/test qcow2

        Optional arguments:
          -h, --help                    display this help and exit
          -d, --duration=NUMBER         finish tests after NUMBER of seconds
          -c, --command=JSON            run tests for all commands specified in
                                        the JSON array
          -s, --seed=STRING             seed for a test image generation,
                                        by default will be generated randomly
          --config=JSON                 take fuzzer configuration from the JSON
                                        array
          -k, --keep_passed             don't remove folders of passed tests
          -v, --verbose                 log information about passed tests

        JSON:

        '--command' accepts a JSON array of commands. Each command presents
        an application under test with all its parameters as a list of strings,
        e.g. ["qemu-io", "$test_img", "-c", "write $off $len"].

        Supported application aliases: 'qemu-img' and 'qemu-io'.

        Supported argument aliases: $test_img for the fuzzed image, $off
        for an offset, $len for length.

        Values for $off and $len will be generated based on the virtual disk
        size of the fuzzed image.

        Paths to 'qemu-img' and 'qemu-io' are retrevied from 'QEMU_IMG' and
        'QEMU_IO' environment variables.

        '--config' accepts a JSON array of fields to be fuzzed, e.g.
        '[["header"], ["header", "version"]]'.

        Each of the list elements can consist of a complex image element only
        as ["header"] or ["feature_name_table"] or an exact field as
        ["header", "version"]. In the first case random portion of the element
        fields will be fuzzed, in the second one the specified field will be
        fuzzed always.

        If '--config' argument is specified, fields not listed in
        the configuration array will not be fuzzed.
        """)

    def run_test(test_id, seed, work_dir, run_log, cleanup, log_all,
                 command, fuzz_config):
        """Setup environment for one test and execute this test."""
        try:
            test = TestEnv(test_id, seed, work_dir, run_log, cleanup,
                           log_all)
        except TestException:
            sys.exit(1)

        # Python 2.4 doesn't support 'finally' and 'except' in the same 'try'
        # block
        try:
            try:
                test.execute(command, fuzz_config)
            except TestException:
                sys.exit(1)
        finally:
            test.finish()

    def should_continue(duration, start_time):
        """Return True if a new test can be started and False otherwise."""
        current_time = int(time.time())
        return (duration is None) or (current_time - start_time < duration)

    try:
        opts, args = getopt.gnu_getopt(sys.argv[1:], 'c:hs:kvd:',
                                       ['command=', 'help', 'seed=', 'config=',
                                        'keep_passed', 'verbose', 'duration='])
    except getopt.error as e:
        print("Error: %s\n\nTry 'runner.py --help' for more information" % e, file=sys.stderr)
        sys.exit(1)

    command = None
    cleanup = True
    log_all = False
    seed = None
    config = None
    duration = None
    for opt, arg in opts:
        if opt in ('-h', '--help'):
            usage()
            sys.exit()
        elif opt in ('-c', '--command'):
            try:
                command = json.loads(arg)
            except (TypeError, ValueError, NameError) as e:
                print("Error: JSON array of test commands cannot be loaded.\n" \
                    "Reason: %s" % e, file=sys.stderr)
                sys.exit(1)
        elif opt in ('-k', '--keep_passed'):
            cleanup = False
        elif opt in ('-v', '--verbose'):
            log_all = True
        elif opt in ('-s', '--seed'):
            seed = arg
        elif opt in ('-d', '--duration'):
            duration = int(arg)
        elif opt == '--config':
            try:
                config = json.loads(arg)
            except (TypeError, ValueError, NameError) as e:
                print("Error: JSON array with the fuzzer configuration cannot" \
                    " be loaded\nReason: %s" % e, file=sys.stderr)
                sys.exit(1)

    if not len(args) == 2:
        print("Expected two parameters\nTry 'runner.py --help'" \
            " for more information.", file=sys.stderr)
        sys.exit(1)

    work_dir = os.path.realpath(args[0])
    # run_log is created in 'main', because multiple tests are expected to
    # log in it
    run_log = os.path.join(work_dir, 'run.log')

    # Add the path to the image generator module to sys.path
    sys.path.append(os.path.realpath(os.path.dirname(args[1])))
    # Remove a script extension from image generator module if any
    generator_name = os.path.splitext(os.path.basename(args[1]))[0]

    try:
        image_generator = __import__(generator_name)
    except ImportError as e:
        print("Error: The image generator '%s' cannot be imported.\n" \
            "Reason: %s" % (generator_name, e), file=sys.stderr)
        sys.exit(1)

    # Enable core dumps
    resource.setrlimit(resource.RLIMIT_CORE, (-1, -1))
    # If a seed is specified, only one test will be executed.
    # Otherwise runner will terminate after a keyboard interruption
    start_time = int(time.time())
    test_id = count(1)
    while should_continue(duration, start_time):
        try:
            run_test(str(next(test_id)), seed, work_dir, run_log, cleanup,
                     log_all, command, config)
        except (KeyboardInterrupt, SystemExit):
            sys.exit(1)

        if seed is not None:
            break