Valentin David pushed to branch valentindavid/cache_server_fill_up-1.2 at BuildStream / buildstream
Commits:
- 075ffefa by Valentin David at 2018-09-28T09:23:19Z
- 621bd236 by Tiago Gomes at 2018-09-28T09:52:15Z
- e298af53 by Valentin David at 2018-09-28T14:42:38Z
- 0a332520 by Valentin David at 2018-09-28T14:42:38Z
- 40962772 by Valentin David at 2018-09-28T14:42:38Z
- 8dbf1d34 by Valentin David at 2018-09-28T14:42:38Z
- b6ae1c70 by Valentin David at 2018-09-28T14:42:38Z
- 3909b52e by Valentin David at 2018-09-28T14:42:38Z
6 changed files:
- buildstream/_artifactcache/cascache.py
- buildstream/_artifactcache/casserver.py
- buildstream/_yaml.py
- tests/format/project.py
- tests/frontend/push.py
- tests/testutils/artifactshare.py
Changes:
--- a/buildstream/_artifactcache/cascache.py
+++ b/buildstream/_artifactcache/cascache.py
@@ -26,6 +26,7 @@ import stat
 import tempfile
 import uuid
 import errno
+import contextlib
 from urllib.parse import urlparse
 
 import grpc
@@ -393,13 +394,14 @@ class CASCache(ArtifactCache):
     #     digest (Digest): An optional Digest object to populate
     #     path (str): Path to file to add
     #     buffer (bytes): Byte buffer to add
+    #     link_directly (bool): Whether file given by path can be linked
     #
     # Returns:
     #     (Digest): The digest of the added object
     #
     # Either `path` or `buffer` must be passed, but not both.
     #
-    def add_object(self, *, digest=None, path=None, buffer=None):
+    def add_object(self, *, digest=None, path=None, buffer=None, link_directly=False):
         # Exactly one of the two parameters has to be specified
         assert (path is None) != (buffer is None)
 
@@ -409,28 +411,34 @@ class CASCache(ArtifactCache):
         try:
             h = hashlib.sha256()
             # Always write out new file to avoid corruption if input file is modified
-            with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
-                # Set mode bits to 0644
-                os.chmod(out.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
-
-                if path:
-                    with open(path, 'rb') as f:
-                        for chunk in iter(lambda: f.read(4096), b""):
-                            h.update(chunk)
-                            out.write(chunk)
+            with contextlib.ExitStack() as stack:
+                if path is not None and link_directly:
+                    tmp = stack.enter_context(open(path, 'rb'))
+                    for chunk in iter(lambda: tmp.read(4096), b""):
+                        h.update(chunk)
                 else:
-                    h.update(buffer)
-                    out.write(buffer)
+                    tmp = stack.enter_context(tempfile.NamedTemporaryFile(dir=self.tmpdir))
+                    # Set mode bits to 0644
+                    os.chmod(tmp.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
 
-                out.flush()
+                    if path:
+                        with open(path, 'rb') as f:
+                            for chunk in iter(lambda: f.read(4096), b""):
+                                h.update(chunk)
+                                tmp.write(chunk)
+                    else:
+                        h.update(buffer)
+                        tmp.write(buffer)
+
+                    tmp.flush()
 
                 digest.hash = h.hexdigest()
-                digest.size_bytes = os.fstat(out.fileno()).st_size
+                digest.size_bytes = os.fstat(tmp.fileno()).st_size
 
                 # Place file at final location
                 objpath = self.objpath(digest)
                 os.makedirs(os.path.dirname(objpath), exist_ok=True)
-                os.link(out.name, objpath)
+                os.link(tmp.name, objpath)
 
         except FileExistsError as e:
             # We can ignore the failed link() if the object is already in the repo.
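The add_object() change above pivots on contextlib.ExitStack: a single `with` scope that enters either the caller's existing file (when link_directly is set) or a fresh temporary file, and closes whichever was actually opened. A minimal standalone sketch of the pattern, with illustrative names rather than BuildStream API:

    import contextlib
    import hashlib
    import tempfile

    def digest_of(path=None, data=b"", reuse_file=False):
        h = hashlib.sha256()
        with contextlib.ExitStack() as stack:
            if path is not None and reuse_file:
                # Hash the existing file in place; no copy is made.
                f = stack.enter_context(open(path, 'rb'))
                for chunk in iter(lambda: f.read(4096), b""):
                    h.update(chunk)
            else:
                # Otherwise stage the data in a temporary file.
                tmp = stack.enter_context(tempfile.NamedTemporaryFile())
                tmp.write(data)
                h.update(data)
            # ExitStack closes whichever file object was entered.
        return h.hexdigest()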
@@ -565,7 +573,12 @@ class CASCache(ArtifactCache):
     #
     # Prune unreachable objects from the repo.
     #
-    def prune(self):
+    # Args:
+    #     keep_after (int|None): timestamp after which unreachable objects
+    #                            are kept. None if no unreachable object
+    #                            should be kept.
+    #
+    def prune(self, keep_after=None):
         ref_heads = os.path.join(self.casdir, 'refs', 'heads')
 
         pruned = 0
@@ -586,6 +599,10 @@ class CASCache(ArtifactCache):
                 objhash = os.path.basename(root) + filename
                 if objhash not in reachable:
                     obj_path = os.path.join(root, filename)
+                    if keep_after:
+                        st = os.stat(obj_path)
+                        if st.st_mtime >= keep_after:
+                            continue
                     pruned += os.stat(obj_path).st_size
                     os.unlink(obj_path)
 
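With this check, prune() spares any unreachable object whose mtime is at or after the keep_after cutoff; only objects that have been unreferenced for the whole grace window are deleted. Isolated as a predicate (a sketch; the cutoff is computed by the caller, as _clean_up_cache() does below with time.time() - _OBJECT_MIN_AGE):

    import os

    def is_prunable(obj_path, keep_after=None):
        # With no cutoff, every unreachable object may be deleted.
        if keep_after is None:
            return True
        # Otherwise only delete objects untouched since the cutoff.
        return os.stat(obj_path).st_mtime < keep_after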
--- a/buildstream/_artifactcache/casserver.py
+++ b/buildstream/_artifactcache/casserver.py
@@ -24,6 +24,9 @@ import signal
 import sys
 import tempfile
 import uuid
+import time
+import errno
+import ctypes
 
 import click
 import grpc
@@ -41,6 +44,10 @@ from .cascache import CASCache
 # The default limit for gRPC messages is 4 MiB
 _MAX_BATCH_TOTAL_SIZE_BYTES = 4 * 1024 * 1024
 
+# The minimum age in seconds for objects before they can be cleaned
+# up.
+_OBJECT_MIN_AGE = 6 * 60 * 60
+
 
 # Trying to push an artifact that is too large
 class ArtifactTooLargeException(Exception):
@@ -130,11 +137,42 @@ def server_main(repo, port, server_key, server_cert, client_certs, enable_push):
         server.stop(0)
 
 
+class _FallocateCall:
+
+    FALLOC_FL_KEEP_SIZE = 1
+    FALLOC_FL_PUNCH_HOLE = 2
+    FALLOC_FL_NO_HIDE_STALE = 4
+    FALLOC_FL_COLLAPSE_RANGE = 8
+    FALLOC_FL_ZERO_RANGE = 16
+    FALLOC_FL_INSERT_RANGE = 32
+    FALLOC_FL_UNSHARE_RANGE = 64
+
+    def __init__(self):
+        self.libc = ctypes.CDLL("libc.so.6", use_errno=True)
+        try:
+            self.fallocate64 = self.libc.fallocate64
+        except AttributeError:
+            self.fallocate = self.libc.fallocate
+
+    def __call__(self, fd, mode, offset, length):
+        if hasattr(self, 'fallocate64'):
+            ret = self.fallocate64(ctypes.c_int(fd), ctypes.c_int(mode),
+                                   ctypes.c_int64(offset), ctypes.c_int64(length))
+        else:
+            ret = self.fallocate(ctypes.c_int(fd), ctypes.c_int(mode),
+                                 ctypes.c_int(offset), ctypes.c_int(length))
+        if ret == -1:
+            err = ctypes.get_errno()
+            raise OSError(err, os.strerror(err))
+        return ret
+
+
 class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
     def __init__(self, cas, *, enable_push):
         super().__init__()
         self.cas = cas
         self.enable_push = enable_push
+        self.fallocate = _FallocateCall()
 
     def Read(self, request, context):
         resource_name = request.resource_name
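A note on the ctypes wrapper above: the standard library already offers os.posix_fallocate() (Python 3.3+), which would avoid loading libc by hand. The plausible reason for calling fallocate(2) directly is that glibc's posix_fallocate() emulates the call by writing zeroes on filesystems without native support, while the raw syscall fails fast instead. A simpler variant using only the standard library, assuming that trade-off is acceptable:

    import errno
    import os

    def try_reserve(fd, size):
        # Reserve `size` bytes up front so a concurrent upload cannot
        # consume the space first; report failure when the disk is full.
        try:
            os.posix_fallocate(fd, 0, size)
            return True
        except OSError as e:
            if e.errno != errno.ENOSPC:
                raise
            return False

This mirrors how Write() below proceeds: clean up the cache, attempt the reservation, and loop again on ENOSPC.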
@@ -192,25 +230,44 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
                         context.set_code(grpc.StatusCode.NOT_FOUND)
                         return response
 
-                    try:
-                        _clean_up_cache(self.cas, client_digest.size_bytes)
-                    except ArtifactTooLargeException as e:
-                        context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
-                        context.set_details(str(e))
-                        return response
+                    while True:
+                        if client_digest.size_bytes == 0:
+                            break
+                        try:
+                            _clean_up_cache(self.cas, client_digest.size_bytes)
+                        except ArtifactTooLargeException as e:
+                            context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
+                            context.set_details(str(e))
+                            return response
+
+                        try:
+                            self.fallocate(out.fileno(), 0, 0, client_digest.size_bytes)
+                            break
+                        except OSError as e:
+                            # Multiple uploads can happen at the same time
+                            if e.errno != errno.ENOSPC:
+                                raise
+
                 elif request.resource_name:
                     # If it is set on subsequent calls, it **must** match the value of the first request.
                     if request.resource_name != resource_name:
                         context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                         return response
+
+                if (offset + len(request.data)) > client_digest.size_bytes:
+                    context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
+                    return response
+
                 out.write(request.data)
+
                 offset += len(request.data)
+
                 if request.finish_write:
                     if client_digest.size_bytes != offset:
                         context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                         return response
                     out.flush()
-                    digest = self.cas.add_object(path=out.name)
+                    digest = self.cas.add_object(path=out.name, link_directly=True)
                     if digest.hash != client_digest.hash:
                         context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                         return response
@@ -230,10 +287,17 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
     def FindMissingBlobs(self, request, context):
         response = remote_execution_pb2.FindMissingBlobsResponse()
         for digest in request.blob_digests:
-            if not _has_object(self.cas, digest):
-                d = response.missing_blob_digests.add()
-                d.hash = digest.hash
-                d.size_bytes = digest.size_bytes
+            objpath = self.cas.objpath(digest)
+            try:
+                os.utime(objpath)
+            except OSError as e:
+                if e.errno != errno.ENOENT:
+                    raise
+                else:
+                    d = response.missing_blob_digests.add()
+                    d.hash = digest.hash
+                    d.size_bytes = digest.size_bytes
+
         return response
 
     def BatchReadBlobs(self, request, context):
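FindMissingBlobs() now does double duty: os.utime() is both the existence check and an LRU touch. Blobs a client is about to re-reference get a fresh mtime, so the age-based prune() above will not delete them while the upload that depends on them is in flight. The pattern in isolation, with a hypothetical helper name:

    import errno
    import os

    def touch_if_present(path):
        # Refresh atime/mtime; report False when the file does not exist.
        try:
            os.utime(path)
        except OSError as e:
            if e.errno != errno.ENOENT:
                raise
            return False
        return True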
@@ -362,11 +426,6 @@ def _digest_from_upload_resource_name(resource_name):
         return None
 
 
-def _has_object(cas, digest):
-    objpath = cas.objpath(digest)
-    return os.path.exists(objpath)
-
-
 # _clean_up_cache()
 #
 # Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
370 | 430 |
# _clean_up_cache()
|
371 | 431 |
#
|
372 | 432 |
# Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
|
... | ... | @@ -399,7 +459,14 @@ def _clean_up_cache(cas, object_size): |
399 | 459 |
# obtain a list of LRP artifacts
|
400 | 460 |
LRP_artifacts = cas.list_artifacts()
|
401 | 461 |
|
462 |
+ keep_after = time.time() - _OBJECT_MIN_AGE
|
|
463 |
+ |
|
402 | 464 |
removed_size = 0 # in bytes
|
465 |
+ if object_size - removed_size > free_disk_space:
|
|
466 |
+ # First we try to see if some unreferenced objects became old
|
|
467 |
+ # enough to be removed.
|
|
468 |
+ removed_size += cas.prune(keep_after=keep_after)
|
|
469 |
+ |
|
403 | 470 |
while object_size - removed_size > free_disk_space:
|
404 | 471 |
try:
|
405 | 472 |
to_remove = LRP_artifacts.pop(0) # The first element in the list is the LRP artifact
|
@@ -411,7 +477,8 @@ def _clean_up_cache(cas, object_size):
                 "the filesystem which mounts the remote "
                 "cache".format(object_size))
 
-        removed_size += cas.remove(to_remove, defer_prune=False)
+        cas.remove(to_remove, defer_prune=True)
+        removed_size += cas.prune(keep_after=keep_after)
 
     if removed_size > 0:
         logging.info("Successfully removed {} bytes from the cache".format(removed_size))
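Taken together, the two hunks above make cleanup two-phased: first reclaim unreferenced objects that have outlived the grace window, then drop least-recently-pushed refs one at a time, pruning after each removal so the freed bytes are counted immediately. Schematically (a sketch of the control flow; the real code also raises ArtifactTooLargeException when it runs out of refs to remove):

    def clean_up(cas, object_size, free_disk_space, keep_after):
        removed_size = 0
        # Phase 1: prune unreferenced objects older than the grace window.
        if object_size - removed_size > free_disk_space:
            removed_size += cas.prune(keep_after=keep_after)
        lrp_artifacts = cas.list_artifacts()  # least recently pushed first
        while object_size - removed_size > free_disk_space:
            # Phase 2: unreference the oldest ref, then reclaim its objects.
            to_remove = lrp_artifacts.pop(0)
            cas.remove(to_remove, defer_prune=True)
            removed_size += cas.prune(keep_after=keep_after)
        return removed_size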
--- a/buildstream/_yaml.py
+++ b/buildstream/_yaml.py
@@ -467,7 +467,7 @@ def node_get_project_path(node, key, project_dir, *,
                             "{}: Specified path '{}' does not exist"
                             .format(provenance, path_str))
 
-    is_inside = project_dir_path in full_resolved_path.parents or (
+    is_inside = project_dir_path.resolve() in full_resolved_path.parents or (
         full_resolved_path == project_dir_path)
 
     if path.is_absolute() or not is_inside:
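The one-call fix above matters whenever the project directory is entered through a symlink: full_resolved_path is fully resolved, so its .parents contain only resolved ancestors, and the unresolved project_dir_path can never appear among them. A runnable pathlib illustration:

    import os
    import tempfile
    from pathlib import Path

    base = Path(tempfile.mkdtemp())
    real = base / 'real'
    (real / 'elements').mkdir(parents=True)
    link = base / 'linked'
    os.symlink(real, link)

    child = (link / 'elements').resolve()    # resolves to .../real/elements
    assert link not in child.parents         # unresolved path never matches
    assert link.resolve() in child.parents   # resolving repairs the check

This is exactly the situation exercised by the new test in tests/format/project.py below.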
--- a/tests/format/project.py
+++ b/tests/format/project.py
@@ -181,3 +181,15 @@ def test_project_refs_options(cli, datafiles):
 
     # Assert that the cache keys are different
     assert result1.output != result2.output
+
+
+@pytest.mark.datafiles(os.path.join(DATA_DIR, 'element-path'))
+def test_element_path_project_path_contains_symlinks(cli, datafiles, tmpdir):
+    real_project = str(datafiles)
+    linked_project = os.path.join(str(tmpdir), 'linked')
+    os.symlink(real_project, linked_project)
+    os.makedirs(os.path.join(real_project, 'elements'), exist_ok=True)
+    with open(os.path.join(real_project, 'elements', 'element.bst'), 'w') as f:
+        f.write("kind: manual\n")
+    result = cli.run(project=linked_project, args=['show', 'element.bst'])
+    result.assert_success()
--- a/tests/frontend/push.py
+++ b/tests/frontend/push.py
@@ -231,6 +231,8 @@ def test_artifact_expires(cli, datafiles, tmpdir):
     assert cli.get_element_state(project, 'element2.bst') == 'cached'
     assert_shared(cli, share, project, 'element2.bst')
 
+    share.make_all_objects_older()
+
     # Create and build another element of 5 MB (This will exceed the free disk space available)
     create_element_size('element3.bst', project, element_path, [], int(5e6))
     result = cli.run(project=project, args=['build', 'element3.bst'])
@@ -327,6 +329,8 @@ def test_recently_pulled_artifact_does_not_expire(cli, datafiles, tmpdir):
     # Ensure element1 is cached locally
     assert cli.get_element_state(project, 'element1.bst') == 'cached'
 
+    share.make_all_objects_older()
+
     # Create and build the element3 (of 5 MB)
     create_element_size('element3.bst', project, element_path, [], int(5e6))
     result = cli.run(project=project, args=['build', 'element3.bst'])
--- a/tests/testutils/artifactshare.py
+++ b/tests/testutils/artifactshare.py
@@ -122,6 +122,15 @@ class ArtifactShare():
         except ArtifactError:
             return False
 
+    def make_all_objects_older(self):
+        for root, dirs, files in os.walk(os.path.join(self.cas.casdir, 'objects')):
+            for name in files:
+                fullname = os.path.join(root, name)
+                st = os.stat(fullname)
+                mtime = st.st_mtime - 6 * 60 * 60
+                atime = st.st_atime - 6 * 60 * 60
+                os.utime(fullname, times=(atime, mtime))
+
     # close():
     #
     # Remove the artifact share.