import io import logging import os import struct import tempfile from collections import namedtuple from subprocess import run from . import log, rm, rmdir __all__ = ["mount"] L = logging.getLogger(__name__) @log def mount(path): """Mount a VMware Virtual Machine Disk (VMDK) file as a RAW image in the local filesystem with read-only support using `libvmdk`. See also: https://github.com/libyal/libvmdk/wiki/Mounting Args: path (str): Path to the VMDK file. Returns: Path to the directory containing a single virtual file named `vmdk1`. """ # See also: https://github.com/libyal/libvmdk/issues/7 # libvmdk currently can't mount VMDK files with type "monolithicSparse" # if they've been renamed. There is already a created issue that has been # marked as solved and closed. Since libvmdk still can't handle renamed # single "monolithicSparse" images, extract name manually from descriptor, # create a symlink in case of mismatch and try to mount image through # this symlink. name = _extract_name_from_descriptor(path) if renamed := (name and name != os.path.basename(path)): L.debug("%s has been renamed", path) tempdir = tempfile.mkdtemp() symlink = os.path.join(tempdir, name) os.symlink(path, symlink) L.debug("created symlink with original name: %s", symlink) path = symlink mp = tempfile.mkdtemp() cmd = ["vmdkmount", path, mp] try: p = run(cmd, capture_output=True, check=False, text=True) except Exception as e: L.error("failed to execute command %s: %r", cmd, e) rmdir(mp) if renamed: rm(symlink) rmdir(tempdir) return None if renamed: rm(symlink) rmdir(tempdir) if os.path.ismount(mp): return mp out, err = p.stdout.strip(), p.stderr.strip() L.error("retcode: %d, stdout: %s, stderr: %s", p.returncode, out, err) rmdir(mp) return None @log def _extract_name_from_descriptor(path): """ VMware Virtual Disks Virtual Disk Format 1.1 https://www.vmware.com/app/vmdk/?src=vmdk typedef uint64 SectorType; typedef uint8 Bool; typedef struct SparseExtentHeader { uint32 magicNumber; uint32 version; uint32 flags; SectorType capacity; SectorType grainSize; SectorType descriptorOffset; SectorType descriptorSize; uint32 numGTEsPerGT; SectorType rgdOffset; SectorType gdOffset; SectorType overHead; Bool uncleanShutdown; char singleEndLineChar; char nonEndLineChar; char doubleEndLineChar1; char doubleEndLineChar2; uint16 compressAlgorithm; uint8 pad[433]; } SparseExtentHeader; #define SPARSE_MAGICNUMBER 0x564d444b /* 'V' 'M' 'D' 'K' */ """ with open(path, "rb") as fh: name = "" if not (magic := fh.read(4)) or magic != b"KDMV": return None fh.seek(-4, io.SEEK_CUR) fmt = " 0): return None # extract disc descriptor file fh.seek(header.desc_offset * 512) data = fh.read(header.desc_size * 512) descriptor = data.split(b"\x00", 1)[0].decode() for line in descriptor.splitlines(): if not (line := line.strip()) or line.startswith("#"): continue if line.startswith("createType"): if line[11:].strip('"') != "monolithicSparse": break else: continue if line.startswith("RW "): extent = line.split(" ", 3) name = extent[3].strip('"') break return name