|
1
|
+import os
|
|
2
|
+import pytest
|
|
3
|
+from tests.testutils import cli
|
|
4
|
+
|
|
5
|
+from buildstream.storage import CasBasedDirectory
|
|
6
|
+from buildstream.storage import FileBasedDirectory
|
|
7
|
+
|
|
8
|
+
|
|
9
|
+class FakeContext():
|
|
10
|
+ def __init__(self):
|
|
11
|
+ self.config_cache_quota = "65536"
|
|
12
|
+
|
|
13
|
+ def get_projects(self):
|
|
14
|
+ return []
|
|
15
|
+
|
|
16
|
+# This is a set of example file system contents. The test attempts to import
|
|
17
|
+# each on top of each other to test importing works consistently.
|
|
18
|
+# Each tuple is defined as (<filename>, <type>, <content>). Type can be
|
|
19
|
+# 'F' (file), 'S' (symlink) or 'D' (directory) with content being the contents
|
|
20
|
+# for a file or the destination for a symlink.
|
|
21
|
+root_filesets = [
|
|
22
|
+ [('a/b/c/textfile1', 'F', 'This is textfile 1\n')],
|
|
23
|
+ [('a/b/c/textfile1', 'F', 'This is the replacement textfile 1\n')],
|
|
24
|
+ [('a/b/d', 'D', '')],
|
|
25
|
+ [('a/b/c', 'S', '/a/b/d')],
|
|
26
|
+ [('a/b/d', 'D', ''), ('a/b/c', 'S', '/a/b/d')],
|
|
27
|
+]
|
|
28
|
+
|
|
29
|
+empty_hash_ref = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
|
|
30
|
+
|
|
31
|
+
|
|
32
|
+def generate_import_roots(directory):
|
|
33
|
+ for fileset in range(1, len(root_filesets) + 1):
|
|
34
|
+ rootname = "root{}".format(fileset)
|
|
35
|
+ rootdir = os.path.join(directory, "content", rootname)
|
|
36
|
+
|
|
37
|
+ for (path, typesymbol, content) in root_filesets[fileset - 1]:
|
|
38
|
+ if typesymbol == 'F':
|
|
39
|
+ (dirnames, filename) = os.path.split(path)
|
|
40
|
+ os.makedirs(os.path.join(rootdir, dirnames), exist_ok=True)
|
|
41
|
+ with open(os.path.join(rootdir, dirnames, filename), "wt") as f:
|
|
42
|
+ f.write(content)
|
|
43
|
+ elif typesymbol == 'D':
|
|
44
|
+ os.makedirs(os.path.join(rootdir, path), exist_ok=True)
|
|
45
|
+ elif typesymbol == 'S':
|
|
46
|
+ (dirnames, filename) = os.path.split(path)
|
|
47
|
+ os.makedirs(os.path.join(rootdir, dirnames), exist_ok=True)
|
|
48
|
+ os.symlink(content, os.path.join(rootdir, path))
|
|
49
|
+
|
|
50
|
+
|
|
51
|
+def file_contents(path):
|
|
52
|
+ with open(path, "r") as f:
|
|
53
|
+ result = f.read()
|
|
54
|
+ return result
|
|
55
|
+
|
|
56
|
+
|
|
57
|
+def file_contents_are(path, contents):
|
|
58
|
+ return file_contents(path) == contents
|
|
59
|
+
|
|
60
|
+
|
|
61
|
+def create_new_casdir(root_number, fake_context, tmpdir):
|
|
62
|
+ d = CasBasedDirectory(fake_context)
|
|
63
|
+ d.import_files(os.path.join(tmpdir, "content", "root{}".format(root_number)))
|
|
64
|
+ assert d.ref.hash != empty_hash_ref
|
|
65
|
+ return d
|
|
66
|
+
|
|
67
|
+def create_new_filedir(root_number, tmpdir):
|
|
68
|
+ root = os.path.join(tmpdir, "vdir")
|
|
69
|
+ os.makedirs(root)
|
|
70
|
+ d = FileBasedDirectory(root)
|
|
71
|
+ d.import_files(os.path.join(tmpdir, "content", "root{}".format(root_number)))
|
|
72
|
+ return d
|
|
73
|
+
|
|
74
|
+
|
|
75
|
+def combinations(integer_range):
|
|
76
|
+ for x in integer_range:
|
|
77
|
+ for y in integer_range:
|
|
78
|
+ yield (x, y)
|
|
79
|
+
|
|
80
|
+
|
|
81
|
+def resolve_symlinks(path, root):
|
|
82
|
+ """ A function to resolve symlinks inside 'path' components apart from the last one.
|
|
83
|
+ For example, resolve_symlinks('/a/b/c/d', '/a/b')
|
|
84
|
+ will return '/a/b/f/d' if /a/b/c is a symlink to /a/b/f. The final component of
|
|
85
|
+ 'path' is not resolved, because we typically want to inspect the symlink found
|
|
86
|
+ at that path, not its target.
|
|
87
|
+
|
|
88
|
+ """
|
|
89
|
+ components = path.split(os.path.sep)
|
|
90
|
+ location = root
|
|
91
|
+ for i in range(0, len(components) - 1):
|
|
92
|
+ location = os.path.join(location, components[i])
|
|
93
|
+ if os.path.islink(location):
|
|
94
|
+ # Resolve the link, add on all the remaining components
|
|
95
|
+ target = os.path.join(os.readlink(location))
|
|
96
|
+ tail = os.path.sep.join(components[i + 1:])
|
|
97
|
+
|
|
98
|
+ if target.startswith(os.path.sep):
|
|
99
|
+ # Absolute link - relative to root
|
|
100
|
+ location = os.path.join(root, target, tail)
|
|
101
|
+ else:
|
|
102
|
+ # Relative link - relative to symlink location
|
|
103
|
+ location = os.path.join(location, target)
|
|
104
|
+ return resolve_symlinks(location, root)
|
|
105
|
+ # If we got here, no symlinks were found. Add on the final component and return.
|
|
106
|
+ location = os.path.join(location, components[-1])
|
|
107
|
+ return location
|
|
108
|
+
|
|
109
|
+
|
|
110
|
+def directory_not_empty(path):
|
|
111
|
+ return os.listdir(path)
|
|
112
|
+
|
|
113
|
+
|
|
114
|
+@pytest.mark.parametrize("original,overlay", combinations([1, 2, 3, 4, 5]))
|
|
115
|
+def test_cas_import(cli, tmpdir, original, overlay):
|
|
116
|
+ fake_context = FakeContext()
|
|
117
|
+ fake_context.artifactdir = tmpdir
|
|
118
|
+ # Create some fake content
|
|
119
|
+ generate_import_roots(tmpdir)
|
|
120
|
+
|
|
121
|
+ d = create_new_casdir(original, fake_context, tmpdir)
|
|
122
|
+ d2 = create_new_casdir(overlay, fake_context, tmpdir)
|
|
123
|
+ d.import_files(d2)
|
|
124
|
+ d.export_files(os.path.join(tmpdir, "output"))
|
|
125
|
+
|
|
126
|
+ for item in root_filesets[overlay - 1]:
|
|
127
|
+ (path, typename, content) = item
|
|
128
|
+ realpath = resolve_symlinks(path, os.path.join(tmpdir, "output"))
|
|
129
|
+ if typename == 'F':
|
|
130
|
+ if os.path.isdir(realpath) and directory_not_empty(realpath):
|
|
131
|
+ # The file should not have overwritten the directory in this case.
|
|
132
|
+ pass
|
|
133
|
+ else:
|
|
134
|
+ assert os.path.isfile(realpath), "{} did not exist in the combined virtual directory".format(path)
|
|
135
|
+ assert file_contents_are(realpath, content)
|
|
136
|
+ elif typename == 'S':
|
|
137
|
+ if os.path.isdir(realpath) and directory_not_empty(realpath):
|
|
138
|
+ # The symlink should not have overwritten the directory in this case.
|
|
139
|
+ pass
|
|
140
|
+ else:
|
|
141
|
+ assert os.path.islink(realpath)
|
|
142
|
+ assert os.readlink(realpath) == content
|
|
143
|
+ elif typename == 'D':
|
|
144
|
+ # Note that isdir accepts symlinks to dirs, so a symlink to a dir is acceptable.
|
|
145
|
+ assert os.path.isdir(realpath)
|
|
146
|
+
|
|
147
|
+
|
|
148
|
+@pytest.mark.parametrize("root", [1, 2, 3, 4, 5])
|
|
149
|
+def test_directory_listing(cli, tmpdir, root):
|
|
150
|
+ fake_context = FakeContext()
|
|
151
|
+ fake_context.artifactdir = tmpdir
|
|
152
|
+ # Create some fake content
|
|
153
|
+ generate_import_roots(tmpdir)
|
|
154
|
+
|
|
155
|
+ d = create_new_filedir(root, tmpdir)
|
|
156
|
+ filelist = list(d.list_relative_paths())
|
|
157
|
+
|
|
158
|
+ d2 = create_new_casdir(root, fake_context, tmpdir)
|
|
159
|
+ filelist2 = list(d2.list_relative_paths())
|
|
160
|
+
|
|
161
|
+ print("filelist for root {} via FileBasedDirectory:".format(root))
|
|
162
|
+ print("{}".format(filelist))
|
|
163
|
+ print("filelist for root {} via CasBasedDirectory:".format(root))
|
|
164
|
+ print("{}".format(filelist2))
|
|
165
|
+ assert(filelist==filelist2)
|