summaryrefslogtreecommitdiffstats
path: root/tools/libvmdk.py
blob: b151429ce1b0ef5fa392cf84021ef66171f5ce40 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
import io
import logging
import os
import struct
import tempfile

from collections import namedtuple
from subprocess import run
from . import log, rm, rmdir

# Public API of this module: only `mount` is exported via `import *`.
__all__ = ["mount"]

# Module-level logger, named after this module's import path.
L = logging.getLogger(__name__)


@log
def mount(path):
    """Mount a VMware Virtual Machine Disk (VMDK) file as a RAW image in the
    local filesystem with read-only support using `libvmdk`.

    See also:
    https://github.com/libyal/libvmdk/wiki/Mounting

    If the name recorded in the embedded descriptor differs from the file's
    current name, the image is mounted through a temporary symlink that
    carries the recorded name. The symlink and its directory are removed
    again before this function returns, whether or not mounting succeeded.

    Args:
        path (str): Path to the VMDK file.

    Returns:
        Path to the directory containing a single virtual file named `vmdk1`,
        or `None` if `vmdkmount` could not be executed or did not produce a
        mount point.
    """
    # See also: https://github.com/libyal/libvmdk/issues/7
    # libvmdk currently can't mount VMDK files with type "monolithicSparse"
    # if they've been renamed. There is already a created issue that has been
    # marked as solved and closed. Since libvmdk still can't handle renamed
    # single "monolithicSparse" images, extract name manually from descriptor,
    # create a symlink in case of mismatch and try to mount image through
    # this symlink.
    name = _extract_name_from_descriptor(path)
    if name:
        # The name comes from the (untrusted) image itself. Keep only its
        # final component so that an absolute path or "../" sequence cannot
        # place the symlink outside of the temporary directory.
        name = os.path.basename(name)

    tempdir = None
    symlink = None
    try:
        if name and name != os.path.basename(path):
            L.debug("%s has been renamed", path)
            tempdir = tempfile.mkdtemp()
            link = os.path.join(tempdir, name)
            # A relative symlink target is resolved relative to the directory
            # that contains the link, not the current working directory, so
            # the target has to be absolute to stay valid inside `tempdir`.
            os.symlink(os.path.abspath(path), link)
            symlink = link
            L.debug("created symlink with original name: %s", symlink)
            path = symlink

        mp = tempfile.mkdtemp()
        cmd = ["vmdkmount", path, mp]
        try:
            p = run(cmd, capture_output=True, check=False, text=True)
        except Exception as e:
            L.error("failed to execute command %s: %r", cmd, e)
            rmdir(mp)
            return None

        if os.path.ismount(mp):
            return mp

        out, err = p.stdout.strip(), p.stderr.strip()
        L.error("retcode: %d, stdout: %s, stderr: %s", p.returncode, out, err)
        rmdir(mp)
        return None
    finally:
        # Single cleanup path for every exit (success, failure, exception):
        # remove only what was actually created.
        if symlink is not None:
            rm(symlink)
        if tempdir is not None:
            rmdir(tempdir)


@log
def _extract_name_from_descriptor(path):
    """Return the extent file name recorded in an embedded VMDK descriptor.

    Reads the sparse extent header at the start of `path`, locates the
    embedded text descriptor and returns the file name of the first `RW`
    extent line.

    VMware Virtual Disks Virtual Disk Format 1.1
    https://www.vmware.com/app/vmdk/?src=vmdk

    typedef uint64  SectorType;
    typedef uint8   Bool;

    typedef struct SparseExtentHeader {
        uint32      magicNumber;
        uint32      version;
        uint32      flags;
        SectorType  capacity;
        SectorType  grainSize;
        SectorType  descriptorOffset;
        SectorType  descriptorSize;
        uint32      numGTEsPerGT;
        SectorType  rgdOffset;
        SectorType  gdOffset;
        SectorType  overHead;
        Bool        uncleanShutdown;
        char        singleEndLineChar;
        char        nonEndLineChar;
        char        doubleEndLineChar1;
        char        doubleEndLineChar2;
        uint16      compressAlgorithm;
        uint8       pad[433];
    } SparseExtentHeader;

    #define SPARSE_MAGICNUMBER 0x564d444b /* 'V' 'M' 'D' 'K' */

    Args:
        path (str): Path to the VMDK file.

    Returns:
        `None` if the file does not start with a complete sparse extent
        header or the header declares no embedded descriptor. Otherwise the
        file name of the first `RW` extent line, or an empty string if no
        such line is found or a `createType` other than `monolithicSparse`
        is encountered first.
    """
    sector_size = 512
    # Little-endian, no alignment padding. Only the first byte of the
    # 433-byte pad area is read; the padding carries no information.
    fmt = "<IIIQQQQIQQQ?ccccHB"
    header_size = struct.calcsize(fmt)
    Header = namedtuple(
        "Header",
        "magic version flags capacity grain_size desc_offset desc_size "
        "num_gtes_per_gt rgd_offset gd_offset overhead is_dirty "
        "single_end_line_char non_end_line_char first_dbl_end_line_char "
        "second_dbl_end_line_char compress_algorithm pad"
    )

    with open(path, "rb") as fh:
        raw = fh.read(header_size)
        # 0x564d444b stored little-endian reads as b"KDMV" on disk.
        if raw[:4] != b"KDMV":
            return None
        # Magic present but header cut short: treat as "not a usable VMDK"
        # instead of failing inside struct.
        if len(raw) < header_size:
            return None

        header = Header._make(struct.unpack_from(fmt, raw))

        # check if the descriptor is embedded
        if not header.desc_offset:
            return None

        # Offset and size come from the file and cannot be trusted: clamp
        # both to the real file size so a corrupt header can neither trigger
        # a huge allocation nor an out-of-range seek.
        file_size = fh.seek(0, io.SEEK_END)
        start = min(header.desc_offset * sector_size, file_size)
        length = min(header.desc_size * sector_size, file_size - start)

        # extract disc descriptor file
        fh.seek(start)
        data = fh.read(length)

    # The descriptor is NUL-padded text. Undecodable bytes are kept as
    # surrogate escapes so that one stray byte does not abort parsing and a
    # non-UTF-8 file name still round-trips to the same bytes on disk.
    descriptor = data.split(b"\x00", 1)[0].decode(
        "utf-8", errors="surrogateescape"
    )

    for line in descriptor.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue

        if line.startswith("createType"):
            # Tolerate optional whitespace around "=".
            create_type = line.partition("=")[2].strip().strip('"')
            if create_type != "monolithicSparse":
                break
            continue

        if line.startswith("RW "):
            # Extent line: RW <sectors> <type> "<file name>"
            # Split on whitespace runs, keeping the file name (which may
            # itself contain spaces) in one piece.
            fields = line.split(None, 3)
            if len(fields) < 4:
                continue
            return fields[3].strip('"')

    return ""