Valentin David pushed to branch valentindavid/cache_server_fill_up-1.2 at BuildStream / buildstream
Commits:
-
fd97b01f
by Chandan Singh at 2018-10-29T13:40:26Z
-
f89a8ab9
by Tristan Van Berkom at 2018-10-29T14:28:57Z
-
97376136
by Valentin David at 2018-11-08T12:49:45Z
-
60349072
by Valentin David at 2018-11-14T14:57:34Z
-
9126287e
by Valentin David at 2018-11-19T09:37:58Z
-
af6b0f5b
by Valentin David at 2018-11-19T13:11:56Z
-
300011b2
by Valentin David at 2018-11-19T13:11:56Z
-
9252a181
by Valentin David at 2018-11-19T13:11:56Z
-
73cf8e4a
by Valentin David at 2018-11-19T13:11:56Z
-
160a5c17
by Valentin David at 2018-11-19T13:11:56Z
-
b4f261b9
by Valentin David at 2018-11-19T13:11:56Z
-
1aef2941
by Valentin David at 2018-11-19T13:11:56Z
-
445ee525
by Valentin David at 2018-11-19T13:11:56Z
18 changed files:
- buildstream/_artifactcache/cascache.py
- buildstream/_artifactcache/casserver.py
- buildstream/element.py
- buildstream/scriptelement.py
- tests/frontend/buildcheckout.py
- + tests/frontend/project/elements/checkout-deps.bst
- + tests/frontend/project/files/etc-files/etc/buildstream/config
- tests/frontend/push.py
- + tests/integration/project/elements/script/corruption-2.bst
- + tests/integration/project/elements/script/corruption-image.bst
- + tests/integration/project/elements/script/corruption-integration.bst
- + tests/integration/project/elements/script/corruption.bst
- + tests/integration/project/elements/script/marked-tmpdir.bst
- + tests/integration/project/elements/script/no-tmpdir.bst
- + tests/integration/project/elements/script/tmpdir.bst
- + tests/integration/project/files/canary
- tests/integration/script.py
- tests/testutils/artifactshare.py
Changes:
... | ... | @@ -26,6 +26,7 @@ import stat |
26 | 26 |
import tempfile
|
27 | 27 |
import uuid
|
28 | 28 |
import errno
|
29 |
+import contextlib
|
|
29 | 30 |
from urllib.parse import urlparse
|
30 | 31 |
|
31 | 32 |
import grpc
|
... | ... | @@ -48,6 +49,13 @@ from . import ArtifactCache |
48 | 49 |
_MAX_PAYLOAD_BYTES = 1024 * 1024
|
49 | 50 |
|
50 | 51 |
|
52 |
class BlobNotFound(ArtifactError):
    """Artifact error that also carries the identifier of a missing blob.

    Args:
       blob: Identifier of the blob that could not be found; stored on
             the ``blob`` attribute
       msg (str): Message handed on to ArtifactError
    """

    def __init__(self, blob, msg):
        # Record the blob before delegating, so the attribute is present
        # by the time the base class initializer runs.
        self.blob = blob
        super().__init__(msg)
|
|
57 |
+ |
|
58 |
+ |
|
51 | 59 |
# A CASCache manages artifacts in a CAS repository as specified in the
|
52 | 60 |
# Remote Execution API.
|
53 | 61 |
#
|
... | ... | @@ -259,6 +267,10 @@ class CASCache(ArtifactCache): |
259 | 267 |
element.info("Remote ({}) does not have {} cached".format(
|
260 | 268 |
remote.spec.url, element._get_brief_display_key()
|
261 | 269 |
))
|
270 |
+ except BlobNotFound as e:
|
|
271 |
+ element.info("Remote ({}) does not have {} cached".format(
|
|
272 |
+ remote.spec.url, element._get_brief_display_key()
|
|
273 |
+ ))
|
|
262 | 274 |
|
263 | 275 |
return False
|
264 | 276 |
|
... | ... | @@ -360,13 +372,14 @@ class CASCache(ArtifactCache): |
360 | 372 |
# digest (Digest): An optional Digest object to populate
|
361 | 373 |
# path (str): Path to file to add
|
362 | 374 |
# buffer (bytes): Byte buffer to add
|
375 |
+ # link_directly (bool): Whether file given by path can be linked
|
|
363 | 376 |
#
|
364 | 377 |
# Returns:
|
365 | 378 |
# (Digest): The digest of the added object
|
366 | 379 |
#
|
367 | 380 |
# Either `path` or `buffer` must be passed, but not both.
|
368 | 381 |
#
|
369 |
- def add_object(self, *, digest=None, path=None, buffer=None):
|
|
382 |
+ def add_object(self, *, digest=None, path=None, buffer=None, link_directly=False):
|
|
370 | 383 |
# Exactly one of the two parameters has to be specified
|
371 | 384 |
assert (path is None) != (buffer is None)
|
372 | 385 |
|
... | ... | @@ -376,28 +389,34 @@ class CASCache(ArtifactCache): |
376 | 389 |
try:
|
377 | 390 |
h = hashlib.sha256()
|
378 | 391 |
# Always write out new file to avoid corruption if input file is modified
|
379 |
- with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
|
|
380 |
- # Set mode bits to 0644
|
|
381 |
- os.chmod(out.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
|
|
382 |
- |
|
383 |
- if path:
|
|
384 |
- with open(path, 'rb') as f:
|
|
385 |
- for chunk in iter(lambda: f.read(4096), b""):
|
|
386 |
- h.update(chunk)
|
|
387 |
- out.write(chunk)
|
|
392 |
+ with contextlib.ExitStack() as stack:
|
|
393 |
+ if path is not None and link_directly:
|
|
394 |
+ tmp = stack.enter_context(open(path, 'rb'))
|
|
395 |
+ for chunk in iter(lambda: tmp.read(4096), b""):
|
|
396 |
+ h.update(chunk)
|
|
388 | 397 |
else:
|
389 |
- h.update(buffer)
|
|
390 |
- out.write(buffer)
|
|
398 |
+ tmp = stack.enter_context(tempfile.NamedTemporaryFile(dir=self.tmpdir))
|
|
399 |
+ # Set mode bits to 0644
|
|
400 |
+ os.chmod(tmp.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
|
|
391 | 401 |
|
392 |
- out.flush()
|
|
402 |
+ if path:
|
|
403 |
+ with open(path, 'rb') as f:
|
|
404 |
+ for chunk in iter(lambda: f.read(4096), b""):
|
|
405 |
+ h.update(chunk)
|
|
406 |
+ tmp.write(chunk)
|
|
407 |
+ else:
|
|
408 |
+ h.update(buffer)
|
|
409 |
+ tmp.write(buffer)
|
|
410 |
+ |
|
411 |
+ tmp.flush()
|
|
393 | 412 |
|
394 | 413 |
digest.hash = h.hexdigest()
|
395 |
- digest.size_bytes = os.fstat(out.fileno()).st_size
|
|
414 |
+ digest.size_bytes = os.fstat(tmp.fileno()).st_size
|
|
396 | 415 |
|
397 | 416 |
# Place file at final location
|
398 | 417 |
objpath = self.objpath(digest)
|
399 | 418 |
os.makedirs(os.path.dirname(objpath), exist_ok=True)
|
400 |
- os.link(out.name, objpath)
|
|
419 |
+ os.link(tmp.name, objpath)
|
|
401 | 420 |
|
402 | 421 |
except FileExistsError as e:
|
403 | 422 |
# We can ignore the failed link() if the object is already in the repo.
|
... | ... | @@ -481,6 +500,41 @@ class CASCache(ArtifactCache): |
481 | 500 |
# first element of this list will be the file modified earliest.
|
482 | 501 |
return [ref for _, ref in sorted(zip(mtimes, refs))]
|
483 | 502 |
|
503 |
# list_objects():
#
# Enumerate every file found below `<casdir>/objects`, ordered from the
# least recently modified to the most recently modified.
#
# A file which no longer exists by the time its mtime is read is
# silently left out of the result.
#
# Returns:
#     (list) - `(mtime, path)` tuples in ascending order; entries with
#              equal mtimes are ordered by path
#
def list_objects(self):
    objects_root = os.path.join(self.casdir, 'objects')
    entries = []

    for dirpath, _, filenames in os.walk(objects_root):
        for name in filenames:
            candidate = os.path.join(dirpath, name)
            try:
                stamp = os.path.getmtime(candidate)
            except FileNotFoundError:
                # Listed by os.walk() but gone now; nothing to report
                continue
            entries.append((stamp, candidate))

    # Tuples compare by mtime first, so the earliest modified file
    # ends up at the front of the list.
    entries.sort()
    return entries
|
|
527 |
+ |
|
528 |
# clean_up_refs_until():
#
# Delete every ref below `<casdir>/refs/heads` whose modification time
# is strictly older than the given timestamp.
#
# Refs which cannot be found anymore when they are examined (for example
# because they were removed concurrently) are skipped instead of aborting
# the whole sweep.
#
# Args:
#     time (float): Cut-off, compared against os.path.getmtime() values
#
def clean_up_refs_until(self, time):
    ref_heads = os.path.join(self.casdir, 'refs', 'heads')

    for root, _, files in os.walk(ref_heads):
        for filename in files:
            ref_path = os.path.join(root, filename)
            try:
                # Obtain the mtime (the time a file was last modified)
                if os.path.getmtime(ref_path) < time:
                    os.unlink(ref_path)
            except FileNotFoundError:
                # Already gone between listing and stat/unlink; there is
                # nothing left to remove for this entry.
                pass
|
|
537 |
+ |
|
484 | 538 |
# remove():
|
485 | 539 |
#
|
486 | 540 |
# Removes the given symbolic ref from the repo.
|
... | ... | @@ -558,6 +612,10 @@ class CASCache(ArtifactCache): |
558 | 612 |
|
559 | 613 |
return pruned
|
560 | 614 |
|
615 |
# update_tree_mtime():
#
# Walk the directory tree rooted at `tree` through _reachable_refs_dir()
# with mtime updating enabled. The set of visited hashes is only needed
# for the duration of the walk and is discarded afterwards.
#
# Args:
#     tree (Digest): Digest of the root directory to walk
#
def update_tree_mtime(self, tree):
    self._reachable_refs_dir(set(), tree, update_mtime=True)
|
|
618 |
+ |
|
561 | 619 |
################################################
|
562 | 620 |
# Local Private Methods #
|
563 | 621 |
################################################
|
... | ... | @@ -699,10 +757,13 @@ class CASCache(ArtifactCache): |
699 | 757 |
a += 1
|
700 | 758 |
b += 1
|
701 | 759 |
|
702 |
- def _reachable_refs_dir(self, reachable, tree):
|
|
760 |
+ def _reachable_refs_dir(self, reachable, tree, update_mtime=False):
|
|
703 | 761 |
if tree.hash in reachable:
|
704 | 762 |
return
|
705 | 763 |
|
764 |
+ if update_mtime:
|
|
765 |
+ os.utime(self.objpath(tree))
|
|
766 |
+ |
|
706 | 767 |
reachable.add(tree.hash)
|
707 | 768 |
|
708 | 769 |
directory = remote_execution_pb2.Directory()
|
... | ... | @@ -711,10 +772,12 @@ class CASCache(ArtifactCache): |
711 | 772 |
directory.ParseFromString(f.read())
|
712 | 773 |
|
713 | 774 |
for filenode in directory.files:
|
775 |
+ if update_mtime:
|
|
776 |
+ os.utime(self.objpath(filenode.digest))
|
|
714 | 777 |
reachable.add(filenode.digest.hash)
|
715 | 778 |
|
716 | 779 |
for dirnode in directory.directories:
|
717 |
- self._reachable_refs_dir(reachable, dirnode.digest)
|
|
780 |
+ self._reachable_refs_dir(reachable, dirnode.digest, update_mtime=update_mtime)
|
|
718 | 781 |
|
719 | 782 |
def _initialize_remote(self, remote_spec, q):
|
720 | 783 |
try:
|
... | ... | @@ -791,7 +854,7 @@ class CASCache(ArtifactCache): |
791 | 854 |
with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
|
792 | 855 |
self._fetch_blob(remote, digest, f)
|
793 | 856 |
|
794 |
- added_digest = self.add_object(path=f.name)
|
|
857 |
+ added_digest = self.add_object(path=f.name, link_directly=True)
|
|
795 | 858 |
assert added_digest.hash == digest.hash
|
796 | 859 |
|
797 | 860 |
return objpath
|
... | ... | @@ -802,7 +865,7 @@ class CASCache(ArtifactCache): |
802 | 865 |
f.write(data)
|
803 | 866 |
f.flush()
|
804 | 867 |
|
805 |
- added_digest = self.add_object(path=f.name)
|
|
868 |
+ added_digest = self.add_object(path=f.name, link_directly=True)
|
|
806 | 869 |
assert added_digest.hash == digest.hash
|
807 | 870 |
|
808 | 871 |
# Helper function for _fetch_directory().
|
... | ... | @@ -1079,6 +1142,9 @@ class _CASBatchRead(): |
1079 | 1142 |
batch_response = self._remote.cas.BatchReadBlobs(self._request)
|
1080 | 1143 |
|
1081 | 1144 |
for response in batch_response.responses:
|
1145 |
+ if response.status.code == grpc.StatusCode.NOT_FOUND.value[0]:
|
|
1146 |
+ raise BlobNotFound(response.digest.hash, "Failed to download blob {}: {}".format(
|
|
1147 |
+ response.digest.hash, response.status.code))
|
|
1082 | 1148 |
if response.status.code != grpc.StatusCode.OK.value[0]:
|
1083 | 1149 |
raise ArtifactError("Failed to download blob {}: {}".format(
|
1084 | 1150 |
response.digest.hash, response.status.code))
|
... | ... | @@ -24,6 +24,9 @@ import signal |
24 | 24 |
import sys
|
25 | 25 |
import tempfile
|
26 | 26 |
import uuid
|
27 |
+import errno
|
|
28 |
+import ctypes
|
|
29 |
+import threading
|
|
27 | 30 |
|
28 | 31 |
import click
|
29 | 32 |
import grpc
|
... | ... | @@ -56,7 +59,9 @@ class ArtifactTooLargeException(Exception): |
56 | 59 |
# repo (str): Path to CAS repository
|
57 | 60 |
# enable_push (bool): Whether to allow blob uploads and artifact updates
|
58 | 61 |
#
|
59 |
-def create_server(repo, *, enable_push):
|
|
62 |
+def create_server(repo, *, enable_push,
|
|
63 |
+ max_head_size=int(10e9),
|
|
64 |
+ min_head_size=int(2e9)):
|
|
60 | 65 |
context = Context()
|
61 | 66 |
context.artifactdir = os.path.abspath(repo)
|
62 | 67 |
|
... | ... | @@ -66,11 +71,13 @@ def create_server(repo, *, enable_push): |
66 | 71 |
max_workers = (os.cpu_count() or 1) * 5
|
67 | 72 |
server = grpc.server(futures.ThreadPoolExecutor(max_workers))
|
68 | 73 |
|
74 |
+ cache_cleaner = _CacheCleaner(artifactcache, max_head_size, min_head_size)
|
|
75 |
+ |
|
69 | 76 |
bytestream_pb2_grpc.add_ByteStreamServicer_to_server(
|
70 |
- _ByteStreamServicer(artifactcache, enable_push=enable_push), server)
|
|
77 |
+ _ByteStreamServicer(artifactcache, cache_cleaner, enable_push=enable_push), server)
|
|
71 | 78 |
|
72 | 79 |
remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
|
73 |
- _ContentAddressableStorageServicer(artifactcache, enable_push=enable_push), server)
|
|
80 |
+ _ContentAddressableStorageServicer(artifactcache, cache_cleaner, enable_push=enable_push), server)
|
|
74 | 81 |
|
75 | 82 |
remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
|
76 | 83 |
_CapabilitiesServicer(), server)
|
... | ... | @@ -88,9 +95,19 @@ def create_server(repo, *, enable_push): |
88 | 95 |
@click.option('--client-certs', help="Public client certificates for TLS (PEM-encoded)")
|
89 | 96 |
@click.option('--enable-push', default=False, is_flag=True,
|
90 | 97 |
help="Allow clients to upload blobs and update artifact cache")
|
98 |
+@click.option('--head-room-min', type=click.INT,
|
|
99 |
+ help="Disk head room minimum in bytes",
|
|
100 |
+ default=2e9)
|
|
101 |
+@click.option('--head-room-max', type=click.INT,
|
|
102 |
+ help="Disk head room maximum in bytes",
|
|
103 |
+ default=10e9)
|
|
91 | 104 |
@click.argument('repo')
|
92 |
-def server_main(repo, port, server_key, server_cert, client_certs, enable_push):
|
|
93 |
- server = create_server(repo, enable_push=enable_push)
|
|
105 |
+def server_main(repo, port, server_key, server_cert, client_certs, enable_push,
|
|
106 |
+ head_room_min, head_room_max):
|
|
107 |
+ server = create_server(repo,
|
|
108 |
+ max_head_size=head_room_max,
|
|
109 |
+ min_head_size=head_room_min,
|
|
110 |
+ enable_push=enable_push)
|
|
94 | 111 |
|
95 | 112 |
use_tls = bool(server_key)
|
96 | 113 |
|
... | ... | @@ -131,11 +148,43 @@ def server_main(repo, port, server_key, server_cert, client_certs, enable_push): |
131 | 148 |
server.stop(0)
|
132 | 149 |
|
133 | 150 |
|
151 |
+class _FallocateCall:
|
|
152 |
+ |
|
153 |
+ FALLOC_FL_KEEP_SIZE = 1
|
|
154 |
+ FALLOC_FL_PUNCH_HOLE = 2
|
|
155 |
+ FALLOC_FL_NO_HIDE_STALE = 4
|
|
156 |
+ FALLOC_FL_COLLAPSE_RANGE = 8
|
|
157 |
+ FALLOC_FL_ZERO_RANGE = 16
|
|
158 |
+ FALLOC_FL_INSERT_RANGE = 32
|
|
159 |
+ FALLOC_FL_UNSHARE_RANGE = 64
|
|
160 |
+ |
|
161 |
+ def __init__(self):
|
|
162 |
+ self.libc = ctypes.CDLL("libc.so.6", use_errno=True)
|
|
163 |
+ try:
|
|
164 |
+ self.fallocate64 = self.libc.fallocate64
|
|
165 |
+ except AttributeError:
|
|
166 |
+ self.fallocate = self.libc.fallocate
|
|
167 |
+ |
|
168 |
+ def __call__(self, fd, mode, offset, length):
|
|
169 |
+ if hasattr(self, 'fallocate64'):
|
|
170 |
+ ret = self.fallocate64(ctypes.c_int(fd), ctypes.c_int(mode),
|
|
171 |
+ ctypes.c_int64(offset), ctypes.c_int64(length))
|
|
172 |
+ else:
|
|
173 |
+ ret = self.fallocate(ctypes.c_int(fd), ctypes.c_int(mode),
|
|
174 |
+ ctypes.c_int(offset), ctypes.c_int(length))
|
|
175 |
+ if ret == -1:
|
|
176 |
+ err = ctypes.get_errno()
|
|
177 |
+ raise OSError(errno, os.strerror(err))
|
|
178 |
+ return ret
|
|
179 |
+ |
|
180 |
+ |
|
134 | 181 |
class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
|
135 |
- def __init__(self, cas, *, enable_push):
|
|
182 |
+ def __init__(self, cas, cache_cleaner, *, enable_push):
|
|
136 | 183 |
super().__init__()
|
137 | 184 |
self.cas = cas
|
138 | 185 |
self.enable_push = enable_push
|
186 |
+ self.fallocate = _FallocateCall()
|
|
187 |
+ self.cache_cleaner = cache_cleaner
|
|
139 | 188 |
|
140 | 189 |
def Read(self, request, context):
|
141 | 190 |
resource_name = request.resource_name
|
... | ... | @@ -193,17 +242,34 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer): |
193 | 242 |
context.set_code(grpc.StatusCode.NOT_FOUND)
|
194 | 243 |
return response
|
195 | 244 |
|
196 |
- try:
|
|
197 |
- _clean_up_cache(self.cas, client_digest.size_bytes)
|
|
198 |
- except ArtifactTooLargeException as e:
|
|
199 |
- context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
|
|
200 |
- context.set_details(str(e))
|
|
201 |
- return response
|
|
245 |
+ while True:
|
|
246 |
+ if client_digest.size_bytes == 0:
|
|
247 |
+ break
|
|
248 |
+ try:
|
|
249 |
+ self.cache_cleaner.clean_up(client_digest.size_bytes)
|
|
250 |
+ except ArtifactTooLargeException as e:
|
|
251 |
+ context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
|
|
252 |
+ context.set_details(str(e))
|
|
253 |
+ return response
|
|
254 |
+ |
|
255 |
+ try:
|
|
256 |
+ self.fallocate(out.fileno(), 0, 0, client_digest.size_bytes)
|
|
257 |
+ break
|
|
258 |
+ except OSError as e:
|
|
259 |
+ # Multiple uploads can happen at the same time
|
|
260 |
+ if e.errno != errno.ENOSPC:
|
|
261 |
+ raise
|
|
262 |
+ |
|
202 | 263 |
elif request.resource_name:
|
203 | 264 |
# If it is set on subsequent calls, it **must** match the value of the first request.
|
204 | 265 |
if request.resource_name != resource_name:
|
205 | 266 |
context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
|
206 | 267 |
return response
|
268 |
+ |
|
269 |
+ if (offset + len(request.data)) > client_digest.size_bytes:
|
|
270 |
+ context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
|
|
271 |
+ return response
|
|
272 |
+ |
|
207 | 273 |
out.write(request.data)
|
208 | 274 |
offset += len(request.data)
|
209 | 275 |
if request.finish_write:
|
... | ... | @@ -211,7 +277,7 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer): |
211 | 277 |
context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
|
212 | 278 |
return response
|
213 | 279 |
out.flush()
|
214 |
- digest = self.cas.add_object(path=out.name)
|
|
280 |
+ digest = self.cas.add_object(path=out.name, link_directly=True)
|
|
215 | 281 |
if digest.hash != client_digest.hash:
|
216 | 282 |
context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
|
217 | 283 |
return response
|
... | ... | @@ -224,18 +290,26 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer): |
224 | 290 |
|
225 | 291 |
|
226 | 292 |
class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
|
227 |
- def __init__(self, cas, *, enable_push):
|
|
293 |
+ def __init__(self, cas, cache_cleaner, *, enable_push):
|
|
228 | 294 |
super().__init__()
|
229 | 295 |
self.cas = cas
|
230 | 296 |
self.enable_push = enable_push
|
297 |
+ self.cache_cleaner = cache_cleaner
|
|
231 | 298 |
|
232 | 299 |
def FindMissingBlobs(self, request, context):
|
233 | 300 |
response = remote_execution_pb2.FindMissingBlobsResponse()
|
234 | 301 |
for digest in request.blob_digests:
|
235 |
- if not _has_object(self.cas, digest):
|
|
236 |
- d = response.missing_blob_digests.add()
|
|
237 |
- d.hash = digest.hash
|
|
238 |
- d.size_bytes = digest.size_bytes
|
|
302 |
+ objpath = self.cas.objpath(digest)
|
|
303 |
+ try:
|
|
304 |
+ os.utime(objpath)
|
|
305 |
+ except OSError as e:
|
|
306 |
+ if e.errno != errno.ENOENT:
|
|
307 |
+ raise
|
|
308 |
+ else:
|
|
309 |
+ d = response.missing_blob_digests.add()
|
|
310 |
+ d.hash = digest.hash
|
|
311 |
+ d.size_bytes = digest.size_bytes
|
|
312 |
+ |
|
239 | 313 |
return response
|
240 | 314 |
|
241 | 315 |
def BatchReadBlobs(self, request, context):
|
... | ... | @@ -254,12 +328,12 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres |
254 | 328 |
try:
|
255 | 329 |
with open(self.cas.objpath(digest), 'rb') as f:
|
256 | 330 |
if os.fstat(f.fileno()).st_size != digest.size_bytes:
|
257 |
- blob_response.status.code = grpc.StatusCode.NOT_FOUND
|
|
331 |
+ blob_response.status.code = grpc.StatusCode.NOT_FOUND.value[0]
|
|
258 | 332 |
continue
|
259 | 333 |
|
260 | 334 |
blob_response.data = f.read(digest.size_bytes)
|
261 | 335 |
except FileNotFoundError:
|
262 |
- blob_response.status.code = grpc.StatusCode.NOT_FOUND
|
|
336 |
+ blob_response.status.code = grpc.StatusCode.NOT_FOUND.value[0]
|
|
263 | 337 |
|
264 | 338 |
return response
|
265 | 339 |
|
... | ... | @@ -289,7 +363,7 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres |
289 | 363 |
continue
|
290 | 364 |
|
291 | 365 |
try:
|
292 |
- _clean_up_cache(self.cas, digest.size_bytes)
|
|
366 |
+ self.cache_cleaner.clean_up(digest.size_bytes)
|
|
293 | 367 |
|
294 | 368 |
with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
|
295 | 369 |
out.write(blob_request.data)
|
... | ... | @@ -332,6 +406,12 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer): |
332 | 406 |
|
333 | 407 |
try:
|
334 | 408 |
tree = self.cas.resolve_ref(request.key, update_mtime=True)
|
409 |
+ try:
|
|
410 |
+ self.cas.update_tree_mtime(tree)
|
|
411 |
+ except FileNotFoundError:
|
|
412 |
+ self.cas.remove(request.key, defer_prune=True)
|
|
413 |
+ context.set_code(grpc.StatusCode.NOT_FOUND)
|
|
414 |
+ return response
|
|
335 | 415 |
|
336 | 416 |
response.digest.hash = tree.hash
|
337 | 417 |
response.digest.size_bytes = tree.size_bytes
|
... | ... | @@ -404,60 +484,79 @@ def _digest_from_upload_resource_name(resource_name): |
404 | 484 |
return None
|
405 | 485 |
|
406 | 486 |
|
407 |
-def _has_object(cas, digest):
|
|
408 |
- objpath = cas.objpath(digest)
|
|
409 |
- return os.path.exists(objpath)
|
|
487 |
+class _CacheCleaner:
|
|
410 | 488 |
|
489 |
+ __cleanup_cache_lock = threading.Lock()
|
|
411 | 490 |
|
412 |
-# _clean_up_cache()
|
|
413 |
-#
|
|
414 |
-# Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
|
|
415 |
-# is enough space for the incoming artifact
|
|
416 |
-#
|
|
417 |
-# Args:
|
|
418 |
-# cas: CASCache object
|
|
419 |
-# object_size: The size of the object being received in bytes
|
|
420 |
-#
|
|
421 |
-# Returns:
|
|
422 |
-# int: The total bytes removed on the filesystem
|
|
423 |
-#
|
|
424 |
-def _clean_up_cache(cas, object_size):
|
|
425 |
- # Determine the available disk space, in bytes, of the file system
|
|
426 |
- # which mounts the repo
|
|
427 |
- stats = os.statvfs(cas.casdir)
|
|
428 |
- buffer_ = int(2e9) # Add a 2 GB buffer
|
|
429 |
- free_disk_space = (stats.f_bfree * stats.f_bsize) - buffer_
|
|
430 |
- total_disk_space = (stats.f_blocks * stats.f_bsize) - buffer_
|
|
431 |
- |
|
432 |
- if object_size > total_disk_space:
|
|
433 |
- raise ArtifactTooLargeException("Artifact of size: {} is too large for "
|
|
434 |
- "the filesystem which mounts the remote "
|
|
435 |
- "cache".format(object_size))
|
|
436 |
- |
|
437 |
- if object_size <= free_disk_space:
|
|
438 |
- # No need to clean up
|
|
439 |
- return 0
|
|
440 |
- |
|
441 |
- # obtain a list of LRP artifacts
|
|
442 |
- LRP_artifacts = cas.list_artifacts()
|
|
443 |
- |
|
444 |
- removed_size = 0 # in bytes
|
|
445 |
- while object_size - removed_size > free_disk_space:
|
|
446 |
- try:
|
|
447 |
- to_remove = LRP_artifacts.pop(0) # The first element in the list is the LRP artifact
|
|
448 |
- except IndexError:
|
|
449 |
- # This exception is caught if there are no more artifacts in the list
|
|
450 |
- # LRP_artifacts. This means the the artifact is too large for the filesystem
|
|
451 |
- # so we abort the process
|
|
452 |
- raise ArtifactTooLargeException("Artifact of size {} is too large for "
|
|
453 |
- "the filesystem which mounts the remote "
|
|
454 |
- "cache".format(object_size))
|
|
491 |
+ def __init__(self, cas, max_head_size, min_head_size=int(2e9)):
|
|
492 |
+ self.__cas = cas
|
|
493 |
+ self.__max_head_size = max_head_size
|
|
494 |
+ self.__min_head_size = min_head_size
|
|
455 | 495 |
|
456 |
- removed_size += cas.remove(to_remove, defer_prune=False)
|
|
496 |
+ def __has_space(self, object_size):
|
|
497 |
+ stats = os.statvfs(self.__cas.casdir)
|
|
498 |
+ free_disk_space = (stats.f_bavail * stats.f_bsize) - self.__min_head_size
|
|
499 |
+ total_disk_space = (stats.f_blocks * stats.f_bsize) - self.__min_head_size
|
|
457 | 500 |
|
458 |
- if removed_size > 0:
|
|
459 |
- logging.info("Successfully removed {} bytes from the cache".format(removed_size))
|
|
460 |
- else:
|
|
461 |
- logging.info("No artifacts were removed from the cache.")
|
|
501 |
+ if object_size > total_disk_space:
|
|
502 |
+ raise ArtifactTooLargeException("Artifact of size: {} is too large for "
|
|
503 |
+ "the filesystem which mounts the remote "
|
|
504 |
+ "cache".format(object_size))
|
|
462 | 505 |
|
463 |
- return removed_size
|
|
506 |
+ return object_size <= free_disk_space
|
|
507 |
+ |
|
508 |
+ # _clean_up_cache()
|
|
509 |
+ #
|
|
510 |
+ # Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
|
|
511 |
+ # is enough space for the incoming artifact
|
|
512 |
+ #
|
|
513 |
+ # Args:
|
|
514 |
+ # object_size: The size of the object being received in bytes
|
|
515 |
+ #
|
|
516 |
+ # Returns:
|
|
517 |
+ # int: The total bytes removed on the filesystem
|
|
518 |
+ #
|
|
519 |
+ def clean_up(self, object_size):
|
|
520 |
+ if self.__has_space(object_size):
|
|
521 |
+ return 0
|
|
522 |
+ |
|
523 |
+ with _CacheCleaner.__cleanup_cache_lock:
|
|
524 |
+ if self.__has_space(object_size):
|
|
525 |
+ # Another thread has done the cleanup for us
|
|
526 |
+ return 0
|
|
527 |
+ |
|
528 |
+ stats = os.statvfs(self.__cas.casdir)
|
|
529 |
+ target_disk_space = (stats.f_bavail * stats.f_bsize) - self.__max_head_size
|
|
530 |
+ |
|
531 |
+ # obtain a list of LRP artifacts
|
|
532 |
+ LRP_objects = self.__cas.list_objects()
|
|
533 |
+ |
|
534 |
+ removed_size = 0 # in bytes
|
|
535 |
+ last_mtime = 0
|
|
536 |
+ |
|
537 |
+ while object_size - removed_size > target_disk_space:
|
|
538 |
+ try:
|
|
539 |
+ last_mtime, to_remove = LRP_objects.pop(0) # The first element in the list is the LRP artifact
|
|
540 |
+ except IndexError:
|
|
541 |
+ # This exception is caught if there are no more artifacts in the list
|
|
542 |
+ # LRP_artifacts. This means the the artifact is too large for the filesystem
|
|
543 |
+ # so we abort the process
|
|
544 |
+ raise ArtifactTooLargeException("Artifact of size {} is too large for "
|
|
545 |
+ "the filesystem which mounts the remote "
|
|
546 |
+ "cache".format(object_size))
|
|
547 |
+ |
|
548 |
+ try:
|
|
549 |
+ size = os.stat(to_remove).st_size
|
|
550 |
+ os.unlink(to_remove)
|
|
551 |
+ removed_size += size
|
|
552 |
+ except FileNotFoundError:
|
|
553 |
+ pass
|
|
554 |
+ |
|
555 |
+ self.__cas.clean_up_refs_until(last_mtime)
|
|
556 |
+ |
|
557 |
+ if removed_size > 0:
|
|
558 |
+ logging.info("Successfully removed {} bytes from the cache".format(removed_size))
|
|
559 |
+ else:
|
|
560 |
+ logging.info("No artifacts were removed from the cache.")
|
|
561 |
+ |
|
562 |
+ return removed_size
|
... | ... | @@ -434,7 +434,7 @@ class Element(Plugin): |
434 | 434 |
visited=visited, recursed=True)
|
435 | 435 |
|
436 | 436 |
# Yield self only at the end, after anything needed has been traversed
|
437 |
- if should_yield and (recurse or recursed) and (scope == Scope.ALL or scope == Scope.RUN):
|
|
437 |
+ if should_yield and (recurse or recursed) and scope != Scope.BUILD:
|
|
438 | 438 |
yield self
|
439 | 439 |
|
440 | 440 |
def search(self, scope, name):
|
... | ... | @@ -1289,17 +1289,21 @@ class Element(Plugin): |
1289 | 1289 |
if scope == Scope.BUILD:
|
1290 | 1290 |
self.stage(sandbox)
|
1291 | 1291 |
elif scope == Scope.RUN:
|
1292 |
- # Stage deps in the sandbox root
|
|
1293 | 1292 |
if deps == 'run':
|
1294 |
- with self.timed_activity("Staging dependencies", silent_nested=True):
|
|
1295 |
- self.stage_dependency_artifacts(sandbox, scope)
|
|
1296 |
- |
|
1297 |
- # Run any integration commands provided by the dependencies
|
|
1298 |
- # once they are all staged and ready
|
|
1299 |
- if integrate:
|
|
1300 |
- with self.timed_activity("Integrating sandbox"):
|
|
1301 |
- for dep in self.dependencies(scope):
|
|
1302 |
- dep.integrate(sandbox)
|
|
1293 |
+ dependency_scope = Scope.RUN
|
|
1294 |
+ else:
|
|
1295 |
+ dependency_scope = None
|
|
1296 |
+ |
|
1297 |
+ # Stage deps in the sandbox root
|
|
1298 |
+ with self.timed_activity("Staging dependencies", silent_nested=True):
|
|
1299 |
+ self.stage_dependency_artifacts(sandbox, dependency_scope)
|
|
1300 |
+ |
|
1301 |
+ # Run any integration commands provided by the dependencies
|
|
1302 |
+ # once they are all staged and ready
|
|
1303 |
+ if integrate:
|
|
1304 |
+ with self.timed_activity("Integrating sandbox"):
|
|
1305 |
+ for dep in self.dependencies(dependency_scope):
|
|
1306 |
+ dep.integrate(sandbox)
|
|
1303 | 1307 |
|
1304 | 1308 |
yield sandbox
|
1305 | 1309 |
|
... | ... | @@ -201,16 +201,20 @@ class ScriptElement(Element): |
201 | 201 |
# Setup environment
|
202 | 202 |
sandbox.set_environment(self.get_environment())
|
203 | 203 |
|
204 |
+ # Tell the sandbox to mount the install root
|
|
205 |
+ directories = {self.__install_root: False}
|
|
206 |
+ |
|
204 | 207 |
# Mark the artifact directories in the layout
|
205 | 208 |
for item in self.__layout:
|
206 |
- if item['destination'] != '/':
|
|
207 |
- if item['element']:
|
|
208 |
- sandbox.mark_directory(item['destination'], artifact=True)
|
|
209 |
- else:
|
|
210 |
- sandbox.mark_directory(item['destination'])
|
|
211 |
- |
|
212 |
- # Tell the sandbox to mount the install root
|
|
213 |
- sandbox.mark_directory(self.__install_root)
|
|
209 |
+ destination = item['destination']
|
|
210 |
+ was_artifact = directories.get(destination, False)
|
|
211 |
+ directories[destination] = item['element'] or was_artifact
|
|
212 |
+ |
|
213 |
+ for directory, artifact in directories.items():
|
|
214 |
+ # Root does not need to be marked as it is always mounted
|
|
215 |
+ # with artifact (unless explicitly marked non-artifact)
|
|
216 |
+ if directory != '/':
|
|
217 |
+ sandbox.mark_directory(directory, artifact=artifact)
|
|
214 | 218 |
|
215 | 219 |
def stage(self, sandbox):
|
216 | 220 |
|
... | ... | @@ -65,9 +65,10 @@ def test_build_checkout(datafiles, cli, strict, hardlinks): |
65 | 65 |
def test_build_checkout_deps(datafiles, cli, deps):
|
66 | 66 |
project = os.path.join(datafiles.dirname, datafiles.basename)
|
67 | 67 |
checkout = os.path.join(cli.directory, 'checkout')
|
68 |
+ element_name = "checkout-deps.bst"
|
|
68 | 69 |
|
69 | 70 |
# First build it
|
70 |
- result = cli.run(project=project, args=['build', 'target.bst'])
|
|
71 |
+ result = cli.run(project=project, args=['build', element_name])
|
|
71 | 72 |
result.assert_success()
|
72 | 73 |
|
73 | 74 |
# Assert that after a successful build, the builddir is empty
|
... | ... | @@ -76,20 +77,15 @@ def test_build_checkout_deps(datafiles, cli, deps): |
76 | 77 |
assert not os.listdir(builddir)
|
77 | 78 |
|
78 | 79 |
# Now check it out
|
79 |
- result = cli.run(project=project, args=['checkout', 'target.bst', '--deps', deps, checkout])
|
|
80 |
+ result = cli.run(project=project, args=['checkout', element_name, '--deps', deps, checkout])
|
|
80 | 81 |
result.assert_success()
|
81 | 82 |
|
82 |
- # Check that the executable hello file is found in the checkout
|
|
83 |
- filename = os.path.join(checkout, 'usr', 'bin', 'hello')
|
|
84 |
- |
|
85 |
- if deps == "run":
|
|
86 |
- assert os.path.exists(filename)
|
|
87 |
- else:
|
|
88 |
- assert not os.path.exists(filename)
|
|
89 |
- |
|
90 |
- # Check that the executable hello file is found in the checkout
|
|
91 |
- filename = os.path.join(checkout, 'usr', 'include', 'pony.h')
|
|
83 |
+ # Verify output of this element
|
|
84 |
+ filename = os.path.join(checkout, 'etc', 'buildstream', 'config')
|
|
85 |
+ assert os.path.exists(filename)
|
|
92 | 86 |
|
87 |
+ # Verify output of this element's runtime dependencies
|
|
88 |
+ filename = os.path.join(checkout, 'usr', 'bin', 'hello')
|
|
93 | 89 |
if deps == "run":
|
94 | 90 |
assert os.path.exists(filename)
|
95 | 91 |
else:
|
1 |
+kind: import
|
|
2 |
+description: It is important for this element to have both build and runtime dependencies
|
|
3 |
+sources:
|
|
4 |
+- kind: local
|
|
5 |
+ path: files/etc-files
|
|
6 |
+depends:
|
|
7 |
+- filename: import-dev.bst
|
|
8 |
+ type: build
|
|
9 |
+- filename: import-bin.bst
|
|
10 |
+ type: runtime
|
1 |
+config
|
... | ... | @@ -208,6 +208,8 @@ def test_artifact_expires(cli, datafiles, tmpdir): |
208 | 208 |
# Create an artifact share (remote artifact cache) in the tmpdir/artifactshare
|
209 | 209 |
# Mock a file system with 12 MB free disk space
|
210 | 210 |
with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare'),
|
211 |
+ min_head_size=int(2e9),
|
|
212 |
+ max_head_size=int(2e9),
|
|
211 | 213 |
total_space=int(10e9), free_space=(int(12e6) + int(2e9))) as share:
|
212 | 214 |
|
213 | 215 |
# Configure bst to push to the cache
|
... | ... | @@ -291,6 +293,8 @@ def test_recently_pulled_artifact_does_not_expire(cli, datafiles, tmpdir): |
291 | 293 |
# Create an artifact share (remote cache) in tmpdir/artifactshare
|
292 | 294 |
# Mock a file system with 12 MB free disk space
|
293 | 295 |
with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare'),
|
296 |
+ min_head_size=int(2e9),
|
|
297 |
+ max_head_size=int(2e9),
|
|
294 | 298 |
total_space=int(10e9), free_space=(int(12e6) + int(2e9))) as share:
|
295 | 299 |
|
296 | 300 |
# Configure bst to push to the cache
|
1 |
+kind: script
|
|
2 |
+ |
|
3 |
+depends:
|
|
4 |
+- filename: base.bst
|
|
5 |
+ type: build
|
|
6 |
+- filename: script/corruption-image.bst
|
|
7 |
+ type: build
|
|
8 |
+ |
|
9 |
+config:
|
|
10 |
+ commands:
|
|
11 |
+ - echo smashed >>/canary
|
1 |
+kind: import
|
|
2 |
+sources:
|
|
3 |
+- kind: local
|
|
4 |
+ path: files/canary
|
1 |
+kind: stack
|
|
2 |
+ |
|
3 |
+public:
|
|
4 |
+ bst:
|
|
5 |
+ integration-commands:
|
|
6 |
+ - echo smashed >>/canary
|
|
7 |
+ |
1 |
+kind: script
|
|
2 |
+ |
|
3 |
+depends:
|
|
4 |
+- filename: base.bst
|
|
5 |
+ type: build
|
|
6 |
+- filename: script/corruption-image.bst
|
|
7 |
+ type: build
|
|
8 |
+- filename: script/corruption-integration.bst
|
|
9 |
+ type: build
|
|
10 |
+ |
|
11 |
+variables:
|
|
12 |
+ install-root: "/"
|
|
13 |
+ |
|
14 |
+config:
|
|
15 |
+ layout:
|
|
16 |
+ - element: base.bst
|
|
17 |
+ destination: "/"
|
|
18 |
+ - element: script/corruption-image.bst
|
|
19 |
+ destination: "/"
|
|
20 |
+ - element: script/corruption-integration.bst
|
|
21 |
+ destination: "/"
|
1 |
+kind: compose
|
|
2 |
+ |
|
3 |
+depends:
|
|
4 |
+- filename: base.bst
|
|
5 |
+ type: build
|
|
6 |
+ |
|
7 |
+public:
|
|
8 |
+ bst:
|
|
9 |
+ split-rules:
|
|
10 |
+ remove:
|
|
11 |
+ - "/tmp/**"
|
|
12 |
+ - "/tmp"
|
1 |
+kind: filter
|
|
2 |
+ |
|
3 |
+depends:
|
|
4 |
+- filename: script/marked-tmpdir.bst
|
|
5 |
+ type: build
|
|
6 |
+ |
|
7 |
+config:
|
|
8 |
+ exclude:
|
|
9 |
+ - remove
|
|
10 |
+ include-orphans: True
|
|
11 |
+ |
|
12 |
+ |
1 |
+kind: script
|
|
2 |
+ |
|
3 |
+depends:
|
|
4 |
+- filename: script/no-tmpdir.bst
|
|
5 |
+ type: build
|
|
6 |
+ |
|
7 |
+config:
|
|
8 |
+ commands:
|
|
9 |
+ - |
|
|
10 |
+ mkdir -p /tmp/blah
|
1 |
+alive
|
... | ... | @@ -155,3 +155,70 @@ def test_script_layout(cli, tmpdir, datafiles): |
155 | 155 |
text = f.read()
|
156 | 156 |
|
157 | 157 |
assert text == "Hi\n"
|
158 |
+ |
|
159 |
+ |
|
160 |
+@pytest.mark.datafiles(DATA_DIR)
|
|
161 |
+def test_regression_cache_corruption(cli, tmpdir, datafiles):
|
|
162 |
+ project = str(datafiles)
|
|
163 |
+ checkout_original = os.path.join(cli.directory, 'checkout-original')
|
|
164 |
+ checkout_after = os.path.join(cli.directory, 'checkout-after')
|
|
165 |
+ element_name = 'script/corruption.bst'
|
|
166 |
+ canary_element_name = 'script/corruption-image.bst'
|
|
167 |
+ |
|
168 |
+ res = cli.run(project=project, args=['build', canary_element_name])
|
|
169 |
+ assert res.exit_code == 0
|
|
170 |
+ |
|
171 |
+ res = cli.run(project=project, args=['checkout', canary_element_name,
|
|
172 |
+ checkout_original])
|
|
173 |
+ assert res.exit_code == 0
|
|
174 |
+ |
|
175 |
+ with open(os.path.join(checkout_original, 'canary')) as f:
|
|
176 |
+ assert f.read() == 'alive\n'
|
|
177 |
+ |
|
178 |
+ res = cli.run(project=project, args=['build', element_name])
|
|
179 |
+ assert res.exit_code == 0
|
|
180 |
+ |
|
181 |
+ res = cli.run(project=project, args=['checkout', canary_element_name,
|
|
182 |
+ checkout_after])
|
|
183 |
+ assert res.exit_code == 0
|
|
184 |
+ |
|
185 |
+ with open(os.path.join(checkout_after, 'canary')) as f:
|
|
186 |
+ assert f.read() == 'alive\n'
|
|
187 |
+ |
|
188 |
+ |
|
189 |
+@pytest.mark.datafiles(DATA_DIR)
|
|
190 |
+def test_regression_tmpdir(cli, tmpdir, datafiles):
|
|
191 |
+ project = str(datafiles)
|
|
192 |
+ element_name = 'script/tmpdir.bst'
|
|
193 |
+ |
|
194 |
+ res = cli.run(project=project, args=['build', element_name])
|
|
195 |
+ assert res.exit_code == 0
|
|
196 |
+ |
|
197 |
+ |
|
198 |
+@pytest.mark.datafiles(DATA_DIR)
|
|
199 |
+def test_regression_cache_corruption_2(cli, tmpdir, datafiles):
|
|
200 |
+ project = str(datafiles)
|
|
201 |
+ checkout_original = os.path.join(cli.directory, 'checkout-original')
|
|
202 |
+ checkout_after = os.path.join(cli.directory, 'checkout-after')
|
|
203 |
+ element_name = 'script/corruption-2.bst'
|
|
204 |
+ canary_element_name = 'script/corruption-image.bst'
|
|
205 |
+ |
|
206 |
+ res = cli.run(project=project, args=['build', canary_element_name])
|
|
207 |
+ assert res.exit_code == 0
|
|
208 |
+ |
|
209 |
+ res = cli.run(project=project, args=['checkout', canary_element_name,
|
|
210 |
+ checkout_original])
|
|
211 |
+ assert res.exit_code == 0
|
|
212 |
+ |
|
213 |
+ with open(os.path.join(checkout_original, 'canary')) as f:
|
|
214 |
+ assert f.read() == 'alive\n'
|
|
215 |
+ |
|
216 |
+ res = cli.run(project=project, args=['build', element_name])
|
|
217 |
+ assert res.exit_code == 0
|
|
218 |
+ |
|
219 |
+ res = cli.run(project=project, args=['checkout', canary_element_name,
|
|
220 |
+ checkout_after])
|
|
221 |
+ assert res.exit_code == 0
|
|
222 |
+ |
|
223 |
+ with open(os.path.join(checkout_after, 'canary')) as f:
|
|
224 |
+ assert f.read() == 'alive\n'
|
... | ... | @@ -29,7 +29,11 @@ from buildstream._exceptions import ArtifactError |
29 | 29 |
#
|
30 | 30 |
class ArtifactShare():
|
31 | 31 |
|
32 |
- def __init__(self, directory, *, total_space=None, free_space=None):
|
|
32 |
+ def __init__(self, directory, *,
|
|
33 |
+ total_space=None,
|
|
34 |
+ free_space=None,
|
|
35 |
+ min_head_size=int(2e9),
|
|
36 |
+ max_head_size=int(10e9)):
|
|
33 | 37 |
|
34 | 38 |
# The working directory for the artifact share (in case it
|
35 | 39 |
# needs to do something outside of its backend's storage folder).
|
... | ... | @@ -53,6 +57,9 @@ class ArtifactShare(): |
53 | 57 |
self.total_space = total_space
|
54 | 58 |
self.free_space = free_space
|
55 | 59 |
|
60 |
+ self.max_head_size = max_head_size
|
|
61 |
+ self.min_head_size = min_head_size
|
|
62 |
+ |
|
56 | 63 |
q = Queue()
|
57 | 64 |
|
58 | 65 |
self.process = Process(target=self.run, args=(q,))
|
... | ... | @@ -76,7 +83,10 @@ class ArtifactShare(): |
76 | 83 |
self.free_space = self.total_space
|
77 | 84 |
os.statvfs = self._mock_statvfs
|
78 | 85 |
|
79 |
- server = create_server(self.repodir, enable_push=True)
|
|
86 |
+ server = create_server(self.repodir,
|
|
87 |
+ max_head_size=self.max_head_size,
|
|
88 |
+ min_head_size=self.min_head_size,
|
|
89 |
+ enable_push=True)
|
|
80 | 90 |
port = server.add_insecure_port('localhost:0')
|
81 | 91 |
|
82 | 92 |
server.start()
|
... | ... | @@ -118,6 +128,15 @@ class ArtifactShare(): |
118 | 128 |
|
119 | 129 |
try:
|
120 | 130 |
tree = self.cas.resolve_ref(artifact_key)
|
131 |
+ reachable = set()
|
|
132 |
+ try:
|
|
133 |
+ self.cas._reachable_refs_dir(reachable, tree, update_mtime=False)
|
|
134 |
+ except FileNotFoundError:
|
|
135 |
+ return False
|
|
136 |
+ for digest in reachable:
|
|
137 |
+ object_name = os.path.join(self.cas.casdir, 'objects', digest[:2], digest[2:])
|
|
138 |
+ if not os.path.exists(object_name):
|
|
139 |
+ return False
|
|
121 | 140 |
return True
|
122 | 141 |
except ArtifactError:
|
123 | 142 |
return False
|
... | ... | @@ -149,8 +168,11 @@ class ArtifactShare(): |
149 | 168 |
# Create an ArtifactShare for use in a test case
|
150 | 169 |
#
|
151 | 170 |
@contextmanager
|
152 |
-def create_artifact_share(directory, *, total_space=None, free_space=None):
|
|
153 |
- share = ArtifactShare(directory, total_space=total_space, free_space=free_space)
|
|
171 |
+def create_artifact_share(directory, *, total_space=None, free_space=None,
|
|
172 |
+ min_head_size=int(2e9),
|
|
173 |
+ max_head_size=int(10e9)):
|
|
174 |
+ share = ArtifactShare(directory, total_space=total_space, free_space=free_space,
|
|
175 |
+ min_head_size=min_head_size, max_head_size=max_head_size)
|
|
154 | 176 |
try:
|
155 | 177 |
yield share
|
156 | 178 |
finally:
|