Valentin David pushed to branch valentindavid/cache_server_fill_up at BuildStream / buildstream

Commits:

- 94b7483f by Valentin David at 2018-11-05T10:35:43Z

4 changed files:

- buildstream/_artifactcache/cascache.py
- buildstream/_artifactcache/casserver.py
- tests/frontend/push.py
- tests/testutils/artifactshare.py

Changes:
buildstream/_artifactcache/cascache.py:

@@ -593,6 +593,41 @@ class CASCache(ArtifactCache):
         # first element of this list will be the file modified earliest.
         return [ref for _, ref in sorted(zip(mtimes, refs))]
 
+    # list_objects():
+    #
+    # List cached objects in Least Recently Modified (LRM) order.
+    #
+    # Returns:
+    #     (list) - A list of objects and timestamps in LRM order
+    #
+    def list_objects(self):
+        objs = []
+        mtimes = []
+
+        for root, _, files in os.walk(os.path.join(self.casdir, 'objects')):
+            for filename in files:
+                obj_path = os.path.join(root, filename)
+                try:
+                    mtimes.append(os.path.getmtime(obj_path))
+                except FileNotFoundError:
+                    pass
+                else:
+                    objs.append(obj_path)
+
+        # NOTE: Sorted will sort from earliest to latest, thus the
+        # first element of this list will be the file modified earliest.
+        return sorted(zip(mtimes, objs))
+
+    def clean_up_refs_until(self, time):
+        ref_heads = os.path.join(self.casdir, 'refs', 'heads')
+
+        for root, _, files in os.walk(ref_heads):
+            for filename in files:
+                ref_path = os.path.join(root, filename)
+                # Obtain the mtime (the time a file was last modified)
+                if os.path.getmtime(ref_path) < time:
+                    os.unlink(ref_path)
+
     # remove():
     #
     # Removes the given symbolic ref from the repo.
@@ -831,6 +866,9 @@ class CASCache(ArtifactCache):
         if tree.hash in reachable:
             return
 
+        if update_mtime:
+            os.utime(self.objpath(tree))
+
         reachable.add(tree.hash)
 
         directory = remote_execution_pb2.Directory()
@@ -844,8 +882,6 @@ class CASCache(ArtifactCache):
             reachable.add(filenode.digest.hash)
 
         for dirnode in directory.directories:
-            if update_mtime:
-                os.utime(self.objpath(dirnode.digest))
             self._reachable_refs_dir(reachable, dirnode.digest, update_mtime=update_mtime)
 
     def _initialize_remote(self, remote_spec, q):
buildstream/_artifactcache/casserver.py:

@@ -24,9 +24,9 @@ import signal
 import sys
 import tempfile
 import uuid
-import time
 import errno
 import ctypes
+import threading
 
 import click
 import grpc
@@ -62,7 +62,9 @@ class ArtifactTooLargeException(Exception):
 # repo (str): Path to CAS repository
 # enable_push (bool): Whether to allow blob uploads and artifact updates
 #
-def create_server(repo, *, enable_push):
+def create_server(repo, *, enable_push,
+                  max_head_size=int(10e9),
+                  min_head_size=int(2e9)):
     context = Context()
     context.artifactdir = os.path.abspath(repo)
 
@@ -72,11 +74,13 @@ def create_server(repo, *, enable_push):
     max_workers = (os.cpu_count() or 1) * 5
     server = grpc.server(futures.ThreadPoolExecutor(max_workers))
 
+    cache_cleaner = _CacheCleaner(artifactcache, max_head_size, min_head_size)
+
     bytestream_pb2_grpc.add_ByteStreamServicer_to_server(
-        _ByteStreamServicer(artifactcache, enable_push=enable_push), server)
+        _ByteStreamServicer(artifactcache, cache_cleaner, enable_push=enable_push), server)
 
     remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
-        _ContentAddressableStorageServicer(artifactcache, enable_push=enable_push), server)
+        _ContentAddressableStorageServicer(artifactcache, cache_cleaner, enable_push=enable_push), server)
 
     remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
         _CapabilitiesServicer(), server)
@@ -94,9 +98,19 @@ def create_server(repo, *, enable_push):
 @click.option('--client-certs', help="Public client certificates for TLS (PEM-encoded)")
 @click.option('--enable-push', default=False, is_flag=True,
               help="Allow clients to upload blobs and update artifact cache")
+@click.option('--head-room-min', type=click.INT,
+              help="Disk head room minimum in bytes",
+              default=2e9)
+@click.option('--head-room-max', type=click.INT,
+              help="Disk head room maximum in bytes",
+              default=10e9)
 @click.argument('repo')
-def server_main(repo, port, server_key, server_cert, client_certs, enable_push):
-    server = create_server(repo, enable_push=enable_push)
+def server_main(repo, port, server_key, server_cert, client_certs, enable_push,
+                head_room_min, head_room_max):
+    server = create_server(repo,
+                           max_head_size=head_room_max,
+                           min_head_size=head_room_min,
+                           enable_push=enable_push)
 
     use_tls = bool(server_key)
 
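Beyond the new `--head-room-min`/`--head-room-max` CLI flags, embedders can pass the same knobs programmatically. A minimal sketch, assuming a hypothetical repo path and mirroring how the test harness below drives `create_server`:

from buildstream._artifactcache.casserver import create_server

# Sketch: run the server with explicit head-room bounds (values in bytes).
server = create_server('/srv/artifacts/repo',    # hypothetical repo path
                       min_head_size=int(2e9),   # start cleaning below 2 GB head room
                       max_head_size=int(10e9),  # clean until 10 GB head room is free
                       enable_push=True)
port = server.add_insecure_port('localhost:0')
server.start()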
@@ -168,11 +182,12 @@ class _FallocateCall:
 
 
 class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
-    def __init__(self, cas, *, enable_push):
+    def __init__(self, cas, cache_cleaner, *, enable_push):
         super().__init__()
         self.cas = cas
         self.enable_push = enable_push
         self.fallocate = _FallocateCall()
+        self.cache_cleaner = cache_cleaner
 
     def Read(self, request, context):
         resource_name = request.resource_name
@@ -234,7 +249,7 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
             if client_digest.size_bytes == 0:
                 break
             try:
-                _clean_up_cache(self.cas, client_digest.size_bytes)
+                self.cache_cleaner.clean_up(client_digest.size_bytes)
             except ArtifactTooLargeException as e:
                 context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
                 context.set_details(str(e))
@@ -280,10 +295,11 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
 
 
 class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
-    def __init__(self, cas, *, enable_push):
+    def __init__(self, cas, cache_cleaner, *, enable_push):
         super().__init__()
         self.cas = cas
         self.enable_push = enable_push
+        self.cache_cleaner = cache_cleaner
 
     def FindMissingBlobs(self, request, context):
         response = remote_execution_pb2.FindMissingBlobsResponse()
@@ -352,7 +368,7 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
                 continue
 
             try:
-                _clean_up_cache(self.cas, digest.size_bytes)
+                self.cache_cleaner.clean_up(digest.size_bytes)
 
                 with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
                     out.write(blob_request.data)
@@ -473,63 +489,80 @@ def _digest_from_upload_resource_name(resource_name):
     return None
 
 
-# _clean_up_cache()
-#
-# Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
-# is enough space for the incoming artifact
-#
-# Args:
-#   cas: CASCache object
-#   object_size: The size of the object being received in bytes
-#
-# Returns:
-#   int: The total bytes removed on the filesystem
-#
-def _clean_up_cache(cas, object_size):
-    # Determine the available disk space, in bytes, of the file system
-    # which mounts the repo
-    stats = os.statvfs(cas.casdir)
-    buffer_ = int(2e9)  # Add a 2 GB buffer
-    free_disk_space = (stats.f_bavail * stats.f_bsize) - buffer_
-    total_disk_space = (stats.f_blocks * stats.f_bsize) - buffer_
-
-    if object_size > total_disk_space:
-        raise ArtifactTooLargeException("Artifact of size: {} is too large for "
-                                        "the filesystem which mounts the remote "
-                                        "cache".format(object_size))
-
-    if object_size <= free_disk_space:
-        # No need to clean up
-        return 0
-
-    # obtain a list of LRP artifacts
-    LRP_artifacts = cas.list_artifacts()
-
-    keep_after = time.time() - _OBJECT_MIN_AGE
-
-    removed_size = 0  # in bytes
-    if object_size - removed_size > free_disk_space:
-        # First we try to see if some unreferenced objects became old
-        # enough to be removed.
-        removed_size += cas.prune(keep_after=keep_after)
-
-    while object_size - removed_size > free_disk_space:
-        try:
-            to_remove = LRP_artifacts.pop(0)  # The first element in the list is the LRP artifact
-        except IndexError:
-            # This exception is caught if there are no more artifacts in the list
-            # LRP_artifacts. This means the the artifact is too large for the filesystem
-            # so we abort the process
-            raise ArtifactTooLargeException("Artifact of size {} is too large for "
-                                            "the filesystem which mounts the remote "
-                                            "cache".format(object_size))
+class _CacheCleaner:
 
-        cas.remove(to_remove, defer_prune=True)
-        removed_size += cas.prune(keep_after=keep_after)
+    __cleanup_cache_lock = threading.Lock()
 
-    if removed_size > 0:
-        logging.info("Successfully removed {} bytes from the cache".format(removed_size))
-    else:
-        logging.info("No artifacts were removed from the cache.")
+    def __init__(self, cas, max_head_size, min_head_size=int(2e9)):
+        self.__cas = cas
+        self.__max_head_size = max_head_size
+        self.__min_head_size = min_head_size
+
+    def __has_space(self, object_size):
+        stats = os.statvfs(self.__cas.casdir)
+        free_disk_space = (stats.f_bavail * stats.f_bsize) - self.__min_head_size
+        total_disk_space = (stats.f_blocks * stats.f_bsize) - self.__min_head_size
+
+        if object_size > total_disk_space:
+            raise ArtifactTooLargeException("Artifact of size: {} is too large for "
+                                            "the filesystem which mounts the remote "
+                                            "cache".format(object_size))
 
-    return removed_size
+        return object_size <= free_disk_space
+
+    # clean_up():
+    #
+    # Keep removing Least Recently Modified (LRM) objects from the cache until
+    # there is enough space for the incoming artifact
+    #
+    # Args:
+    #   object_size: The size of the object being received in bytes
+    #
+    # Returns:
+    #   int: The total bytes removed on the filesystem
+    #
+    def clean_up(self, object_size):
+        if self.__has_space(object_size):
+            return 0
+
+        with _CacheCleaner.__cleanup_cache_lock:
+            if self.__has_space(object_size):
+                # Another thread has done the cleanup for us
+                return 0
+
+            stats = os.statvfs(self.__cas.casdir)
+            target_disk_space = (stats.f_bavail * stats.f_bsize) - self.__max_head_size
+
+            # Obtain a list of objects in LRM order
+            LRP_objects = self.__cas.list_objects()
+
+            removed_size = 0  # in bytes
+
+            last_mtime = 0
+
+            while object_size - removed_size > target_disk_space:
+                try:
+                    last_mtime, to_remove = LRP_objects.pop(0)  # The first element is the LRM object
+                except IndexError:
+                    # This exception is caught if there are no more objects in
+                    # LRP_objects. This means the artifact is too large for the
+                    # filesystem so we abort the process
+                    raise ArtifactTooLargeException("Artifact of size {} is too large for "
+                                                    "the filesystem which mounts the remote "
+                                                    "cache".format(object_size))
+
+                try:
+                    size = os.stat(to_remove).st_size
+                    os.unlink(to_remove)
+                    removed_size += size
+                except FileNotFoundError:
+                    pass
+
+            self.__cas.clean_up_refs_until(last_mtime)
+
+            if removed_size > 0:
+                logging.info("Successfully removed {} bytes from the cache".format(removed_size))
+            else:
+                logging.info("No artifacts were removed from the cache.")
+
+            return removed_size
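The min/max split gives the cleaner hysteresis: a cleanup is triggered only when the incoming object no longer fits under `min_head_size` of head room, but once triggered it evicts all the way down to `max_head_size` of head room, so a burst of uploads pays for one cleanup pass instead of one per blob. A worked sketch of the two thresholds, using hypothetical numbers:

# Worked example of the thresholds in __has_space() and clean_up().
min_head_size = int(2e9)     # default --head-room-min
max_head_size = int(10e9)    # default --head-room-max

free = int(3e9)              # hypothetical f_bavail * f_bsize from statvfs
object_size = int(1.5e9)     # incoming blob

# Trigger (from __has_space): clean only if the blob does not fit
# under min_head_size of head room.
needs_cleanup = object_size > free - min_head_size    # 1.5e9 > 1e9 -> True

# Target (from clean_up): evict while
#   object_size - removed_size > free - max_head_size
# i.e. until at least object_size - free + max_head_size bytes are removed.
must_remove = object_size - (free - max_head_size)    # 8.5e9 bytes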
tests/frontend/push.py:

@@ -230,6 +230,8 @@ def test_artifact_expires(cli, datafiles, tmpdir):
     # Create an artifact share (remote artifact cache) in the tmpdir/artifactshare
     # Mock a file system with 12 MB free disk space
     with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare'),
+                               min_head_size=int(2e9),
+                               max_head_size=int(2e9),
                                total_space=int(10e9), free_space=(int(12e6) + int(2e9))) as share:
 
         # Configure bst to push to the cache
@@ -253,8 +255,6 @@ def test_artifact_expires(cli, datafiles, tmpdir):
         assert cli.get_element_state(project, 'element2.bst') == 'cached'
         assert_shared(cli, share, project, 'element2.bst')
 
-        share.make_all_objects_older()
-
         # Create and build another element of 5 MB (This will exceed the free disk space available)
         create_element_size('element3.bst', project, element_path, [], int(5e6))
         result = cli.run(project=project, args=['build', 'element3.bst'])
@@ -315,6 +315,8 @@ def test_recently_pulled_artifact_does_not_expire(cli, datafiles, tmpdir):
     # Create an artifact share (remote cache) in tmpdir/artifactshare
     # Mock a file system with 12 MB free disk space
     with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare'),
+                               min_head_size=int(2e9),
+                               max_head_size=int(2e9),
                                total_space=int(10e9), free_space=(int(12e6) + int(2e9))) as share:
 
         # Configure bst to push to the cache
@@ -352,7 +354,6 @@ def test_recently_pulled_artifact_does_not_expire(cli, datafiles, tmpdir):
         assert cli.get_element_state(project, 'element1.bst') == 'cached'
 
         wait_for_cache_granularity()
-        share.make_all_objects_older()
 
         # Create and build the element3 (of 5 MB)
         create_element_size('element3.bst', project, element_path, [], int(5e6))
tests/testutils/artifactshare.py:

@@ -29,7 +29,11 @@ from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
 #
 class ArtifactShare():
 
-    def __init__(self, directory, *, total_space=None, free_space=None):
+    def __init__(self, directory, *,
+                 total_space=None,
+                 free_space=None,
+                 min_head_size=int(2e9),
+                 max_head_size=int(10e9)):
 
         # The working directory for the artifact share (in case it
         # needs to do something outside of its backend's storage folder).
@@ -53,6 +57,9 @@ class ArtifactShare():
         self.total_space = total_space
         self.free_space = free_space
 
+        self.max_head_size = max_head_size
+        self.min_head_size = min_head_size
+
         q = Queue()
 
         self.process = Process(target=self.run, args=(q,))
@@ -76,7 +83,10 @@ class ArtifactShare():
             self.free_space = self.total_space
         os.statvfs = self._mock_statvfs
 
-        server = create_server(self.repodir, enable_push=True)
+        server = create_server(self.repodir,
+                               max_head_size=self.max_head_size,
+                               min_head_size=self.min_head_size,
+                               enable_push=True)
         port = server.add_insecure_port('localhost:0')
 
         server.start()
@@ -134,19 +144,19 @@ class ArtifactShare():
 
         try:
             tree = self.cas.resolve_ref(artifact_key)
+            reachable = set()
+            try:
+                self.cas._reachable_refs_dir(reachable, tree, update_mtime=False)
+            except FileNotFoundError:
+                return False
+            for digest in reachable:
+                object_name = os.path.join(self.cas.casdir, 'objects', digest[:2], digest[2:])
+                if not os.path.exists(object_name):
+                    return False
             return True
         except ArtifactError:
             return False
 
-    def make_all_objects_older(self):
-        for root, dirs, files in os.walk(os.path.join(self.cas.casdir, 'objects')):
-            for name in files:
-                fullname = os.path.join(root, name)
-                st = os.stat(fullname)
-                mtime = st.st_mtime - 6 * 60 * 60
-                atime = st.st_atime - 6 * 60 * 60
-                os.utime(fullname, times=(atime, mtime))
-
     # close():
     #
     # Remove the artifact share.
@@ -174,8 +184,11 @@ class ArtifactShare():
 # Create an ArtifactShare for use in a test case
 #
 @contextmanager
-def create_artifact_share(directory, *, total_space=None, free_space=None):
-    share = ArtifactShare(directory, total_space=total_space, free_space=free_space)
+def create_artifact_share(directory, *, total_space=None, free_space=None,
+                          min_head_size=int(2e9),
+                          max_head_size=int(10e9)):
+    share = ArtifactShare(directory, total_space=total_space, free_space=free_space,
+                          min_head_size=min_head_size, max_head_size=max_head_size)
     try:
         yield share
     finally: