1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
|
import io
import logging
import os
import struct
import tempfile
from collections import namedtuple
from subprocess import run
from . import log, rm, rmdir
__all__ = ["mount"]
L = logging.getLogger(__name__)
@log
def mount(path):
"""Mount a VMware Virtual Machine Disk (VMDK) file as a RAW image in the
local filesystem with read-only support using `libvmdk`.
See also:
https://github.com/libyal/libvmdk/wiki/Building
https://github.com/libyal/libvmdk/wiki/Mounting
Make sure you have downloaded the latest source distribution package from
https://github.com/libyal/libvmdk/releases, e.g.
libvmdk-alpha-20210807.tar.gz and built it as follows:
$ sudo apt install build-essential libfuse-dev
$ tar xfv libvmdk-alpha-20210807.tar.gz
$ cd libvmdk-20210807/
$ ./configure
$ make
$ sudo make install
$ sudo ldconfig
Args:
path (str): Path to the VMDK file.
Returns:
Path to the directory containing a single virtual file named `vmdk1`.
"""
# See also: https://github.com/libyal/libvmdk/issues/7
# libvmdk currently can't mount VMDK files with type "monolithicSparse"
# if they've been renamed. There is already a created issue that has been
# marked as solved and closed. Since libvmdk still can't handle renamed
# single "monolithicSparse" images, extract name manually from descriptor,
# create a symlink in case of mismatch and try to mount image through
# this symlink.
name = _extract_name_from_descriptor(path)
if renamed := (name and name != os.path.basename(path)):
L.debug("%s has been renamed", path)
tempdir = tempfile.mkdtemp()
symlink = os.path.join(tempdir, name)
os.symlink(path, symlink)
L.debug("created symlink with original name: %s", symlink)
path = symlink
mp = tempfile.mkdtemp()
cmd = ["vmdkmount", path, mp]
try:
p = run(cmd, capture_output=True, check=False, text=True)
except Exception as e:
L.error("failed to execute command %s: %r", cmd, e)
rmdir(mp)
if renamed:
rm(symlink)
rmdir(tempdir)
return None
if renamed:
rm(symlink)
rmdir(tempdir)
if os.path.ismount(mp):
return mp
out, err = p.stdout.strip(), p.stderr.strip()
L.error("retcode: %d, stdout: %s, stderr: %s", p.returncode, out, err)
rmdir(mp)
return None
@log
def _extract_name_from_descriptor(path):
"""
VMware Virtual Disks Virtual Disk Format 1.1
https://www.vmware.com/app/vmdk/?src=vmdk
typedef uint64 SectorType;
typedef uint8 Bool;
typedef struct SparseExtentHeader {
uint32 magicNumber;
uint32 version;
uint32 flags;
SectorType capacity;
SectorType grainSize;
SectorType descriptorOffset;
SectorType descriptorSize;
uint32 numGTEsPerGT;
SectorType rgdOffset;
SectorType gdOffset;
SectorType overHead;
Bool uncleanShutdown;
char singleEndLineChar;
char nonEndLineChar;
char doubleEndLineChar1;
char doubleEndLineChar2;
uint16 compressAlgorithm;
uint8 pad[433];
} SparseExtentHeader;
#define SPARSE_MAGICNUMBER 0x564d444b /* 'V' 'M' 'D' 'K' */
"""
with open(path, "rb") as fh:
name = ""
if not (magic := fh.read(4)) or magic != b"KDMV":
return None
fh.seek(-4, io.SEEK_CUR)
fmt = "<IIIQQQQIQQQ?ccccHB"
size = struct.calcsize(fmt)
data = fh.read(size)
Header = namedtuple(
"Header",
"magic version flags capacity grain_size desc_offset desc_size "
"num_gtes_per_gt rgd_offset gd_offset overhead is_dirty "
"single_end_line_char non_end_line_char first_dbl_end_line_char "
"second_dbl_end_line_char compress_algorithm pad"
)
header = Header._make(struct.unpack_from(fmt, data))
# check if the descriptor is embedded
if not (header.desc_offset > 0):
return None
# extract disc descriptor file
fh.seek(header.desc_offset * 512)
data = fh.read(header.desc_size * 512)
descriptor = data.split(b"\x00", 1)[0].decode()
for line in descriptor.splitlines():
if not (line := line.strip()) or line.startswith("#"):
continue
if line.startswith("createType"):
if line[11:].strip('"') != "monolithicSparse":
break
else:
continue
if line.startswith("RW "):
extent = line.split(" ", 3)
name = extent[3].strip('"')
break
return name
|