Valentin David pushed to branch master at BuildStream / buildstream
Commits:
- 26cdee08 by Valentin David at 2018-11-28T14:29:52Z
- 58ca298f by Valentin David at 2018-11-28T14:29:52Z
- 227fa26d by Valentin David at 2018-11-28T14:29:52Z
- 5ef19a0b by Valentin David at 2018-11-28T14:29:52Z
- 8d2946ff by Valentin David at 2018-11-28T14:29:52Z
- b587953a by Valentin David at 2018-11-28T14:29:52Z
- a64f667d by Valentin David at 2018-11-28T14:29:52Z
- 353b90dd by Valentin David at 2018-11-28T14:29:52Z
- ba9afa98 by Valentin David at 2018-11-28T14:29:52Z
- 9a458402 by Valentin David at 2018-11-29T08:47:40Z
4 changed files:
- buildstream/_artifactcache/cascache.py
- buildstream/_artifactcache/casserver.py
- tests/frontend/push.py
- tests/testutils/artifactshare.py
Changes:
buildstream/_artifactcache/cascache.py:

@@ -25,6 +25,7 @@ import os
 import stat
 import tempfile
 import uuid
+import contextlib
 from urllib.parse import urlparse

 import grpc

@@ -88,6 +89,13 @@ class CASRemoteSpec(namedtuple('CASRemoteSpec', 'url push server_cert client_key
 CASRemoteSpec.__new__.__defaults__ = (None, None, None)


+class BlobNotFound(CASError):
+
+    def __init__(self, blob, msg):
+        self.blob = blob
+        super().__init__(msg)
+
+
 # A CASCache manages a CAS repository as specified in the Remote Execution API.
 #
 # Args:
@@ -299,6 +307,8 @@ class CASCache():
                raise CASError("Failed to pull ref {}: {}".format(ref, e)) from e
             else:
                 return False
+        except BlobNotFound as e:
+            return False

     # pull_tree():
     #
@@ -471,13 +481,14 @@ class CASCache():
     #     digest (Digest): An optional Digest object to populate
     #     path (str): Path to file to add
     #     buffer (bytes): Byte buffer to add
+    #     link_directly (bool): Whether file given by path can be linked
     #
     # Returns:
     #     (Digest): The digest of the added object
     #
     # Either `path` or `buffer` must be passed, but not both.
     #
-    def add_object(self, *, digest=None, path=None, buffer=None):
+    def add_object(self, *, digest=None, path=None, buffer=None, link_directly=False):
         # Exactly one of the two parameters has to be specified
         assert (path is None) != (buffer is None)

@@ -487,28 +498,34 @@ class CASCache():
         try:
             h = hashlib.sha256()
             # Always write out new file to avoid corruption if input file is modified
-            with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
-                # Set mode bits to 0644
-                os.chmod(out.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
-
-                if path:
-                    with open(path, 'rb') as f:
-                        for chunk in iter(lambda: f.read(4096), b""):
-                            h.update(chunk)
-                            out.write(chunk)
+            with contextlib.ExitStack() as stack:
+                if path is not None and link_directly:
+                    tmp = stack.enter_context(open(path, 'rb'))
+                    for chunk in iter(lambda: tmp.read(4096), b""):
+                        h.update(chunk)
                 else:
-                    h.update(buffer)
-                    out.write(buffer)
+                    tmp = stack.enter_context(tempfile.NamedTemporaryFile(dir=self.tmpdir))
+                    # Set mode bits to 0644
+                    os.chmod(tmp.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)

-                out.flush()
+                    if path:
+                        with open(path, 'rb') as f:
+                            for chunk in iter(lambda: f.read(4096), b""):
+                                h.update(chunk)
+                                tmp.write(chunk)
+                    else:
+                        h.update(buffer)
+                        tmp.write(buffer)
+
+                    tmp.flush()

                 digest.hash = h.hexdigest()
-                digest.size_bytes = os.fstat(out.fileno()).st_size
+                digest.size_bytes = os.fstat(tmp.fileno()).st_size

                 # Place file at final location
                 objpath = self.objpath(digest)
                 os.makedirs(os.path.dirname(objpath), exist_ok=True)
-                os.link(out.name, objpath)
+                os.link(tmp.name, objpath)

         except FileExistsError as e:
             # We can ignore the failed link() if the object is already in the repo.
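
For reference, a minimal sketch of the two calling modes the new link_directly flag gives add_object(); the cas instance and path below are illustrative and not taken from the commit:

    digest = cas.add_object(path=some_path)                      # default: copy into a fresh temp file, then link it in
    digest = cas.add_object(path=some_path, link_directly=True)  # hash the file in place and hard-link it into the store

The second form skips the extra copy and appears to be intended for files that were already created in the cache's tmpdir, as in the fetch paths changed further down.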
@@ -606,6 +623,41 @@ class CASCache():
         # first ref of this list will be the file modified earliest.
         return [ref for _, ref in sorted(zip(mtimes, refs))]

+    # list_objects():
+    #
+    # List cached objects in Least Recently Modified (LRM) order.
+    #
+    # Returns:
+    #     (list) - A list of objects and timestamps in LRM order
+    #
+    def list_objects(self):
+        objs = []
+        mtimes = []
+
+        for root, _, files in os.walk(os.path.join(self.casdir, 'objects')):
+            for filename in files:
+                obj_path = os.path.join(root, filename)
+                try:
+                    mtimes.append(os.path.getmtime(obj_path))
+                except FileNotFoundError:
+                    pass
+                else:
+                    objs.append(obj_path)
+
+        # NOTE: Sorted will sort from earliest to latest, thus the
+        # first element of this list will be the file modified earliest.
+        return sorted(zip(mtimes, objs))
+
+    def clean_up_refs_until(self, time):
+        ref_heads = os.path.join(self.casdir, 'refs', 'heads')
+
+        for root, _, files in os.walk(ref_heads):
+            for filename in files:
+                ref_path = os.path.join(root, filename)
+                # Obtain the mtime (the time a file was last modified)
+                if os.path.getmtime(ref_path) < time:
+                    os.unlink(ref_path)
+
     # remove():
     #
     # Removes the given symbolic ref from the repo.
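
A rough sketch of how these two helpers are meant to be combined, mirroring the _CacheCleaner.clean_up() logic added to casserver.py further down; cas and bytes_needed are illustrative names:

    removed = 0
    last_mtime = 0
    lrm_objects = cas.list_objects()               # [(mtime, path), ...], oldest first
    while removed < bytes_needed and lrm_objects:
        last_mtime, obj_path = lrm_objects.pop(0)  # least recently modified object
        size = os.stat(obj_path).st_size
        os.unlink(obj_path)
        removed += size
    cas.clean_up_refs_until(last_mtime)            # refs older than the last pruned object can no longer resolve fully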
@@ -665,6 +717,10 @@ class CASCache():

         return pruned

+    def update_tree_mtime(self, tree):
+        reachable = set()
+        self._reachable_refs_dir(reachable, tree, update_mtime=True)
+
     ################################################
     #             Local Private Methods            #
     ################################################
@@ -811,10 +867,13 @@ class CASCache():
                 a += 1
                 b += 1

-    def _reachable_refs_dir(self, reachable, tree):
+    def _reachable_refs_dir(self, reachable, tree, update_mtime=False):
         if tree.hash in reachable:
             return

+        if update_mtime:
+            os.utime(self.objpath(tree))
+
         reachable.add(tree.hash)

         directory = remote_execution_pb2.Directory()

@@ -823,10 +882,12 @@ class CASCache():
             directory.ParseFromString(f.read())

         for filenode in directory.files:
+            if update_mtime:
+                os.utime(self.objpath(filenode.digest))
             reachable.add(filenode.digest.hash)

         for dirnode in directory.directories:
-            self._reachable_refs_dir(reachable, dirnode.digest)
+            self._reachable_refs_dir(reachable, dirnode.digest, update_mtime=update_mtime)

     def _required_blobs(self, directory_digest):
         # parse directory, and recursively add blobs
@@ -880,7 +941,7 @@ class CASCache():
         with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
             self._fetch_blob(remote, digest, f)

-            added_digest = self.add_object(path=f.name)
+            added_digest = self.add_object(path=f.name, link_directly=True)
             assert added_digest.hash == digest.hash

         return objpath

@@ -891,7 +952,7 @@ class CASCache():
             f.write(data)
             f.flush()

-            added_digest = self.add_object(path=f.name)
+            added_digest = self.add_object(path=f.name, link_directly=True)
             assert added_digest.hash == digest.hash

     # Helper function for _fetch_directory().
@@ -1203,6 +1264,9 @@ class _CASBatchRead():
         batch_response = self._remote.cas.BatchReadBlobs(self._request)

         for response in batch_response.responses:
+            if response.status.code == code_pb2.NOT_FOUND:
+                raise BlobNotFound(response.digest.hash, "Failed to download blob {}: {}".format(
+                    response.digest.hash, response.status.code))
             if response.status.code != code_pb2.OK:
                 raise CASError("Failed to download blob {}: {}".format(
                     response.digest.hash, response.status.code))
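
Taken together with the pull() change above, the intent appears to be that a blob the remote reports as NOT_FOUND is raised as BlobNotFound and treated as a cache miss instead of a hard failure. Roughly, with a placeholder for the real fetch calls:

    # inside CASCache.pull(), simplified
    try:
        fetch_tree_and_blobs(ref)   # placeholder; the real code goes through _fetch_directory() and _CASBatchRead
    except BlobNotFound:
        return False                # act as if the remote simply did not have the artifact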
buildstream/_artifactcache/casserver.py:

@@ -24,6 +24,8 @@ import signal
 import sys
 import tempfile
 import uuid
+import errno
+import threading

 import click
 import grpc

@@ -31,6 +33,7 @@ import grpc
 from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
 from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
 from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc
+from .._protos.google.rpc import code_pb2

 from .._exceptions import CASError


@@ -55,18 +58,22 @@ class ArtifactTooLargeException(Exception):
 #     repo (str): Path to CAS repository
 #     enable_push (bool): Whether to allow blob uploads and artifact updates
 #
-def create_server(repo, *, enable_push):
+def create_server(repo, *, enable_push,
+                  max_head_size=int(10e9),
+                  min_head_size=int(2e9)):
     cas = CASCache(os.path.abspath(repo))

     # Use max_workers default from Python 3.5+
     max_workers = (os.cpu_count() or 1) * 5
     server = grpc.server(futures.ThreadPoolExecutor(max_workers))

+    cache_cleaner = _CacheCleaner(cas, max_head_size, min_head_size)
+
     bytestream_pb2_grpc.add_ByteStreamServicer_to_server(
-        _ByteStreamServicer(cas, enable_push=enable_push), server)
+        _ByteStreamServicer(cas, cache_cleaner, enable_push=enable_push), server)

     remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
-        _ContentAddressableStorageServicer(cas, enable_push=enable_push), server)
+        _ContentAddressableStorageServicer(cas, cache_cleaner, enable_push=enable_push), server)

     remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
         _CapabilitiesServicer(), server)
@@ -84,9 +91,19 @@ def create_server(repo, *, enable_push):
 @click.option('--client-certs', help="Public client certificates for TLS (PEM-encoded)")
 @click.option('--enable-push', default=False, is_flag=True,
               help="Allow clients to upload blobs and update artifact cache")
+@click.option('--head-room-min', type=click.INT,
+              help="Disk head room minimum in bytes",
+              default=2e9)
+@click.option('--head-room-max', type=click.INT,
+              help="Disk head room maximum in bytes",
+              default=10e9)
 @click.argument('repo')
-def server_main(repo, port, server_key, server_cert, client_certs, enable_push):
-    server = create_server(repo, enable_push=enable_push)
+def server_main(repo, port, server_key, server_cert, client_certs, enable_push,
+                head_room_min, head_room_max):
+    server = create_server(repo,
+                           max_head_size=head_room_max,
+                           min_head_size=head_room_min,
+                           enable_push=enable_push)

     use_tls = bool(server_key)

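
A minimal sketch of starting a cache server with the new head-room knobs through the Python API; the repository path and port are illustrative, and the same values are exposed on the command line as --head-room-min and --head-room-max:

    from buildstream._artifactcache.casserver import create_server

    server = create_server('/srv/artifact-cache',
                           enable_push=True,
                           min_head_size=int(2e9),    # cleanup kicks in when an upload would leave less head room than this
                           max_head_size=int(10e9))   # a cleanup then frees space until roughly this much head room is back
    server.add_insecure_port('localhost:50052')
    server.start()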
@@ -128,10 +145,11 @@ def server_main(repo, port, server_key, server_cert, client_certs, enable_push):


 class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
-    def __init__(self, cas, *, enable_push):
+    def __init__(self, cas, cache_cleaner, *, enable_push):
         super().__init__()
         self.cas = cas
         self.enable_push = enable_push
+        self.cache_cleaner = cache_cleaner

     def Read(self, request, context):
         resource_name = request.resource_name
@@ -189,17 +207,34 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
                        context.set_code(grpc.StatusCode.NOT_FOUND)
                        return response

-                    try:
-                        _clean_up_cache(self.cas, client_digest.size_bytes)
-                    except ArtifactTooLargeException as e:
-                        context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
-                        context.set_details(str(e))
-                        return response
+                    while True:
+                        if client_digest.size_bytes == 0:
+                            break
+                        try:
+                            self.cache_cleaner.clean_up(client_digest.size_bytes)
+                        except ArtifactTooLargeException as e:
+                            context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
+                            context.set_details(str(e))
+                            return response
+
+                        try:
+                            os.posix_fallocate(out.fileno(), 0, client_digest.size_bytes)
+                            break
+                        except OSError as e:
+                            # Multiple upload can happen in the same time
+                            if e.errno != errno.ENOSPC:
+                                raise
+
                 elif request.resource_name:
                     # If it is set on subsequent calls, it **must** match the value of the first request.
                     if request.resource_name != resource_name:
                         context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                         return response
+
+                if (offset + len(request.data)) > client_digest.size_bytes:
+                    context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
+                    return response
+
                 out.write(request.data)
                 offset += len(request.data)
                 if request.finish_write:
@@ -207,7 +242,7 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
                        context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                        return response
                    out.flush()
-                    digest = self.cas.add_object(path=out.name)
+                    digest = self.cas.add_object(path=out.name, link_directly=True)
                    if digest.hash != client_digest.hash:
                        context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                        return response

@@ -220,18 +255,26 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):


 class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
-    def __init__(self, cas, *, enable_push):
+    def __init__(self, cas, cache_cleaner, *, enable_push):
         super().__init__()
         self.cas = cas
         self.enable_push = enable_push
+        self.cache_cleaner = cache_cleaner

     def FindMissingBlobs(self, request, context):
         response = remote_execution_pb2.FindMissingBlobsResponse()
         for digest in request.blob_digests:
-            if not _has_object(self.cas, digest):
-                d = response.missing_blob_digests.add()
-                d.hash = digest.hash
-                d.size_bytes = digest.size_bytes
+            objpath = self.cas.objpath(digest)
+            try:
+                os.utime(objpath)
+            except OSError as e:
+                if e.errno != errno.ENOENT:
+                    raise
+                else:
+                    d = response.missing_blob_digests.add()
+                    d.hash = digest.hash
+                    d.size_bytes = digest.size_bytes
+
         return response

     def BatchReadBlobs(self, request, context):
@@ -250,12 +293,12 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres
             try:
                 with open(self.cas.objpath(digest), 'rb') as f:
                     if os.fstat(f.fileno()).st_size != digest.size_bytes:
-                        blob_response.status.code = grpc.StatusCode.NOT_FOUND
+                        blob_response.status.code = code_pb2.NOT_FOUND
                         continue

                     blob_response.data = f.read(digest.size_bytes)
             except FileNotFoundError:
-                blob_response.status.code = grpc.StatusCode.NOT_FOUND
+                blob_response.status.code = code_pb2.NOT_FOUND

         return response

@@ -285,7 +328,7 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres
                 continue

             try:
-                _clean_up_cache(self.cas, digest.size_bytes)
+                self.cache_cleaner.clean_up(digest.size_bytes)

                 with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
                     out.write(blob_request.data)
@@ -328,6 +371,12 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer):

         try:
             tree = self.cas.resolve_ref(request.key, update_mtime=True)
+            try:
+                self.cas.update_tree_mtime(tree)
+            except FileNotFoundError:
+                self.cas.remove(request.key, defer_prune=True)
+                context.set_code(grpc.StatusCode.NOT_FOUND)
+                return response

             response.digest.hash = tree.hash
             response.digest.size_bytes = tree.size_bytes
@@ -400,60 +449,79 @@ def _digest_from_upload_resource_name(resource_name):
         return None


-def _has_object(cas, digest):
-    objpath = cas.objpath(digest)
-    return os.path.exists(objpath)
+class _CacheCleaner:

+    __cleanup_cache_lock = threading.Lock()

-# _clean_up_cache()
-#
-# Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
-# is enough space for the incoming artifact
-#
-# Args:
-#   cas: CASCache object
-#   object_size: The size of the object being received in bytes
-#
-# Returns:
-#   int: The total bytes removed on the filesystem
-#
-def _clean_up_cache(cas, object_size):
-    # Determine the available disk space, in bytes, of the file system
-    # which mounts the repo
-    stats = os.statvfs(cas.casdir)
-    buffer_ = int(2e9)  # Add a 2 GB buffer
-    free_disk_space = (stats.f_bfree * stats.f_bsize) - buffer_
-    total_disk_space = (stats.f_blocks * stats.f_bsize) - buffer_
-
-    if object_size > total_disk_space:
-        raise ArtifactTooLargeException("Artifact of size: {} is too large for "
-                                        "the filesystem which mounts the remote "
-                                        "cache".format(object_size))
-
-    if object_size <= free_disk_space:
-        # No need to clean up
-        return 0
-
-    # obtain a list of LRP artifacts
-    LRP_artifacts = cas.list_refs()
-
-    removed_size = 0  # in bytes
-    while object_size - removed_size > free_disk_space:
-        try:
-            to_remove = LRP_artifacts.pop(0)  # The first element in the list is the LRP artifact
-        except IndexError:
-            # This exception is caught if there are no more artifacts in the list
-            # LRP_artifacts. This means the the artifact is too large for the filesystem
-            # so we abort the process
-            raise ArtifactTooLargeException("Artifact of size {} is too large for "
-                                            "the filesystem which mounts the remote "
-                                            "cache".format(object_size))
+    def __init__(self, cas, max_head_size, min_head_size=int(2e9)):
+        self.__cas = cas
+        self.__max_head_size = max_head_size
+        self.__min_head_size = min_head_size

-        removed_size += cas.remove(to_remove, defer_prune=False)
+    def __has_space(self, object_size):
+        stats = os.statvfs(self.__cas.casdir)
+        free_disk_space = (stats.f_bavail * stats.f_bsize) - self.__min_head_size
+        total_disk_space = (stats.f_blocks * stats.f_bsize) - self.__min_head_size

-    if removed_size > 0:
-        logging.info("Successfully removed {} bytes from the cache".format(removed_size))
-    else:
-        logging.info("No artifacts were removed from the cache.")
+        if object_size > total_disk_space:
+            raise ArtifactTooLargeException("Artifact of size: {} is too large for "
+                                            "the filesystem which mounts the remote "
+                                            "cache".format(object_size))

-    return removed_size
+        return object_size <= free_disk_space
+
+    # _clean_up_cache()
+    #
+    # Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
+    # is enough space for the incoming artifact
+    #
+    # Args:
+    #   object_size: The size of the object being received in bytes
+    #
+    # Returns:
+    #   int: The total bytes removed on the filesystem
+    #
+    def clean_up(self, object_size):
+        if self.__has_space(object_size):
+            return 0
+
+        with _CacheCleaner.__cleanup_cache_lock:
+            if self.__has_space(object_size):
+                # Another thread has done the cleanup for us
+                return 0
+
+            stats = os.statvfs(self.__cas.casdir)
+            target_disk_space = (stats.f_bavail * stats.f_bsize) - self.__max_head_size
+
+            # obtain a list of LRP artifacts
+            LRP_objects = self.__cas.list_objects()
+
+            removed_size = 0  # in bytes
+            last_mtime = 0
+
+            while object_size - removed_size > target_disk_space:
+                try:
+                    last_mtime, to_remove = LRP_objects.pop(0)  # The first element in the list is the LRP artifact
+                except IndexError:
+                    # This exception is caught if there are no more artifacts in the list
+                    # LRP_artifacts. This means the the artifact is too large for the filesystem
+                    # so we abort the process
+                    raise ArtifactTooLargeException("Artifact of size {} is too large for "
+                                                    "the filesystem which mounts the remote "
+                                                    "cache".format(object_size))
+
+                try:
+                    size = os.stat(to_remove).st_size
+                    os.unlink(to_remove)
+                    removed_size += size
+                except FileNotFoundError:
+                    pass
+
+            self.__cas.clean_up_refs_until(last_mtime)
+
+            if removed_size > 0:
+                logging.info("Successfully removed {} bytes from the cache".format(removed_size))
+            else:
+                logging.info("No artifacts were removed from the cache.")
+
+            return removed_size
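
Condensed, the servicers drive the cleaner as sketched below, mirroring the create_server() and Write() changes above; the size value is illustrative:

    cleaner = _CacheCleaner(cas, int(10e9), int(2e9))   # max_head_size, min_head_size

    try:
        cleaner.clean_up(client_digest.size_bytes)       # make room before accepting the upload
    except ArtifactTooLargeException:
        ...                                              # the servicers answer RESOURCE_EXHAUSTED here

Note the double __has_space() check around the class-level lock: when several uploads race, only one thread actually walks and prunes the object store while the others re-check and return early.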
tests/frontend/push.py:

@@ -230,6 +230,8 @@ def test_artifact_expires(cli, datafiles, tmpdir):
     # Create an artifact share (remote artifact cache) in the tmpdir/artifactshare
     # Mock a file system with 12 MB free disk space
     with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare'),
+                               min_head_size=int(2e9),
+                               max_head_size=int(2e9),
                                total_space=int(10e9), free_space=(int(12e6) + int(2e9))) as share:

         # Configure bst to push to the cache

@@ -313,6 +315,8 @@ def test_recently_pulled_artifact_does_not_expire(cli, datafiles, tmpdir):
     # Create an artifact share (remote cache) in tmpdir/artifactshare
     # Mock a file system with 12 MB free disk space
     with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare'),
+                               min_head_size=int(2e9),
+                               max_head_size=int(2e9),
                                total_space=int(10e9), free_space=(int(12e6) + int(2e9))) as share:

         # Configure bst to push to the cache
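
The values pinned here appear to reproduce the old fixed 2 GB buffer so the existing expiry expectations still hold; the arithmetic behind the mocked figures, roughly:

    free_space = int(12e6) + int(2e9)     # as configured above
    min_head_size = int(2e9)
    usable = free_space - min_head_size   # about 12 MB left for artifacts, matching the comment in the test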
tests/testutils/artifactshare.py:

@@ -29,7 +29,11 @@ from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution
 #
 class ArtifactShare():

-    def __init__(self, directory, *, total_space=None, free_space=None):
+    def __init__(self, directory, *,
+                 total_space=None,
+                 free_space=None,
+                 min_head_size=int(2e9),
+                 max_head_size=int(10e9)):

         # The working directory for the artifact share (in case it
         # needs to do something outside of its backend's storage folder).

@@ -50,6 +54,9 @@ class ArtifactShare():
         self.total_space = total_space
         self.free_space = free_space

+        self.max_head_size = max_head_size
+        self.min_head_size = min_head_size
+
         q = Queue()

         self.process = Process(target=self.run, args=(q,))
@@ -74,7 +81,10 @@ class ArtifactShare():
                self.free_space = self.total_space
            os.statvfs = self._mock_statvfs

-        server = create_server(self.repodir, enable_push=True)
+        server = create_server(self.repodir,
+                               max_head_size=self.max_head_size,
+                               min_head_size=self.min_head_size,
+                               enable_push=True)
         port = server.add_insecure_port('localhost:0')

         server.start()
@@ -136,6 +146,15 @@ class ArtifactShare():

         try:
             tree = self.cas.resolve_ref(artifact_key)
+            reachable = set()
+            try:
+                self.cas._reachable_refs_dir(reachable, tree, update_mtime=False)
+            except FileNotFoundError:
+                return None
+            for digest in reachable:
+                object_name = os.path.join(self.cas.casdir, 'objects', digest[:2], digest[2:])
+                if not os.path.exists(object_name):
+                    return None
             return tree
         except CASError:
             return None
@@ -167,8 +186,11 @@ class ArtifactShare():
 # Create an ArtifactShare for use in a test case
 #
 @contextmanager
-def create_artifact_share(directory, *, total_space=None, free_space=None):
-    share = ArtifactShare(directory, total_space=total_space, free_space=free_space)
+def create_artifact_share(directory, *, total_space=None, free_space=None,
+                          min_head_size=int(2e9),
+                          max_head_size=int(10e9)):
+    share = ArtifactShare(directory, total_space=total_space, free_space=free_space,
+                          min_head_size=min_head_size, max_head_size=max_head_size)
     try:
         yield share
     finally: