summaryrefslogblamecommitdiffstats
path: root/scripts/oss-fuzz/minimize_qtest_trace.py
blob: a28913a2a7452a7b0c80f6cd7dc02b0194d4034b (plain) (tree)






























                                                                               






                                                                                
                                        









                                                                                


                                                 
                           















                                                                           
 



                                                                              












                                                                 























































































                                                                               
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
This takes a crashing qtest trace and tries to remove superflous operations
"""

import sys
import os
import subprocess
import time
import struct

QEMU_ARGS = None
QEMU_PATH = None
TIMEOUT = 5
CRASH_TOKEN = None

write_suffix_lookup = {"b": (1, "B"),
                       "w": (2, "H"),
                       "l": (4, "L"),
                       "q": (8, "Q")}

def usage():
    sys.exit("""\
Usage: QEMU_PATH="/path/to/qemu" QEMU_ARGS="args" {} input_trace output_trace
By default, will try to use the second-to-last line in the output to identify
whether the crash occred. Optionally, manually set a string that idenitifes the
crash by setting CRASH_TOKEN=
""".format((sys.argv[0])))

deduplication_note = """\n\
Note: While trimming the input, sometimes the mutated trace triggers a different
type crash but indicates the same bug. Under this situation, our minimizer is
incapable of recognizing and stopped from removing it. In the future, we may
use a more sophisticated crash case deduplication method.
\n"""

def check_if_trace_crashes(trace, path):
    with open(path, "w") as tracefile:
        tracefile.write("".join(trace))

    rc = subprocess.Popen("timeout -s 9 {timeout}s {qemu_path} {qemu_args} 2>&1\
    < {trace_path}".format(timeout=TIMEOUT,
                           qemu_path=QEMU_PATH,
                           qemu_args=QEMU_ARGS,
                           trace_path=path),
                          shell=True,
                          stdin=subprocess.PIPE,
                          stdout=subprocess.PIPE,
                          encoding="utf-8")
    global CRASH_TOKEN
    if CRASH_TOKEN is None:
        try:
            outs, _ = rc.communicate(timeout=5)
            CRASH_TOKEN = " ".join(outs.splitlines()[-2].split()[0:3])
        except subprocess.TimeoutExpired:
            print("subprocess.TimeoutExpired")
            return False
        print("Identifying Crashes by this string: {}".format(CRASH_TOKEN))
        global deduplication_note
        print(deduplication_note)
        return True

    for line in iter(rc.stdout.readline, ""):
        if "CLOSED" in line:
            return False
        if CRASH_TOKEN in line:
            return True

    print("\nWarning:")
    print("  There is no 'CLOSED'or CRASH_TOKEN in the stdout of subprocess.")
    print("  Usually this indicates a different type of crash.\n")
    return False


def minimize_trace(inpath, outpath):
    global TIMEOUT
    with open(inpath) as f:
        trace = f.readlines()
    start = time.time()
    if not check_if_trace_crashes(trace, outpath):
        sys.exit("The input qtest trace didn't cause a crash...")
    end = time.time()
    print("Crashed in {} seconds".format(end-start))
    TIMEOUT = (end-start)*5
    print("Setting the timeout for {} seconds".format(TIMEOUT))

    i = 0
    newtrace = trace[:]
    # For each line
    while i < len(newtrace):
        # 1.) Try to remove it completely and reproduce the crash. If it works,
        # we're done.
        prior = newtrace[i]
        print("Trying to remove {}".format(newtrace[i]))
        # Try to remove the line completely
        newtrace[i] = ""
        if check_if_trace_crashes(newtrace, outpath):
            i += 1
            continue
        newtrace[i] = prior

        # 2.) Try to replace write{bwlq} commands with a write addr, len
        # command. Since this can require swapping endianness, try both LE and
        # BE options. We do this, so we can "trim" the writes in (3)
        if (newtrace[i].startswith("write") and not
            newtrace[i].startswith("write ")):
            suffix = newtrace[i].split()[0][-1]
            assert(suffix in write_suffix_lookup)
            addr = int(newtrace[i].split()[1], 16)
            value = int(newtrace[i].split()[2], 16)
            for endianness in ['<', '>']:
                data = struct.pack("{end}{size}".format(end=endianness,
                                   size=write_suffix_lookup[suffix][1]),
                                   value)
                newtrace[i] = "write {addr} {size} 0x{data}\n".format(
                    addr=hex(addr),
                    size=hex(write_suffix_lookup[suffix][0]),
                    data=data.hex())
                if(check_if_trace_crashes(newtrace, outpath)):
                    break
            else:
                newtrace[i] = prior

        # 3.) If it is a qtest write command: write addr len data, try to split
        # it into two separate write commands. If splitting the write down the
        # middle does not work, try to move the pivot "left" and retry, until
        # there is no space left. The idea is to prune unneccessary bytes from
        # long writes, while accommodating arbitrary MemoryRegion access sizes
        # and alignments.
        if newtrace[i].startswith("write "):
            addr = int(newtrace[i].split()[1], 16)
            length = int(newtrace[i].split()[2], 16)
            data = newtrace[i].split()[3][2:]
            if length > 1:
                leftlength = int(length/2)
                rightlength = length - leftlength
                newtrace.insert(i+1, "")
                while leftlength > 0:
                    newtrace[i] = "write {addr} {size} 0x{data}\n".format(
                            addr=hex(addr),
                            size=hex(leftlength),
                            data=data[:leftlength*2])
                    newtrace[i+1] = "write {addr} {size} 0x{data}\n".format(
                            addr=hex(addr+leftlength),
                            size=hex(rightlength),
                            data=data[leftlength*2:])
                    if check_if_trace_crashes(newtrace, outpath):
                        break
                    else:
                        leftlength -= 1
                        rightlength += 1
                if check_if_trace_crashes(newtrace, outpath):
                    i -= 1
                else:
                    newtrace[i] = prior
                    del newtrace[i+1]
        i += 1
    check_if_trace_crashes(newtrace, outpath)


if __name__ == '__main__':
    if len(sys.argv) < 3:
        usage()

    QEMU_PATH = os.getenv("QEMU_PATH")
    QEMU_ARGS = os.getenv("QEMU_ARGS")
    if QEMU_PATH is None or QEMU_ARGS is None:
        usage()
    # if "accel" not in QEMU_ARGS:
    #     QEMU_ARGS += " -accel qtest"
    CRASH_TOKEN = os.getenv("CRASH_TOKEN")
    QEMU_ARGS += " -qtest stdio -monitor none -serial none "
    minimize_trace(sys.argv[1], sys.argv[2])