Valentin David pushed to branch valentindavid/cache_server_fill_up at BuildStream / buildstream
Commits:
- b97ec9b8 by Valentin David at 2018-11-28T13:25:13Z
- 8e2cab79 by Valentin David at 2018-11-28T13:25:16Z
- 76a5b4cb by Valentin David at 2018-11-28T13:25:16Z
- e4f065e6 by Valentin David at 2018-11-28T13:25:16Z
- ee0fae1d by Valentin David at 2018-11-28T13:25:16Z
4 changed files:
- buildstream/_artifactcache/cascache.py
- buildstream/_artifactcache/casserver.py
- tests/frontend/push.py
- tests/testutils/artifactshare.py
Changes:
--- a/buildstream/_artifactcache/cascache.py
+++ b/buildstream/_artifactcache/cascache.py
@@ -623,6 +623,41 @@ class CASCache():
         # first ref of this list will be the file modified earliest.
         return [ref for _, ref in sorted(zip(mtimes, refs))]
 
+    # list_objects():
+    #
+    # List cached objects in Least Recently Modified (LRM) order.
+    #
+    # Returns:
+    #     (list) - A list of objects and timestamps in LRM order
+    #
+    def list_objects(self):
+        objs = []
+        mtimes = []
+
+        for root, _, files in os.walk(os.path.join(self.casdir, 'objects')):
+            for filename in files:
+                obj_path = os.path.join(root, filename)
+                try:
+                    mtimes.append(os.path.getmtime(obj_path))
+                except FileNotFoundError:
+                    pass
+                else:
+                    objs.append(obj_path)
+
+        # NOTE: Sorted will sort from earliest to latest, thus the
+        # first element of this list will be the file modified earliest.
+        return sorted(zip(mtimes, objs))
+
+    def clean_up_refs_until(self, time):
+        ref_heads = os.path.join(self.casdir, 'refs', 'heads')
+
+        for root, _, files in os.walk(ref_heads):
+            for filename in files:
+                ref_path = os.path.join(root, filename)
+                # Obtain the mtime (the time a file was last modified)
+                if os.path.getmtime(ref_path) < time:
+                    os.unlink(ref_path)
+
     # remove():
     #
     # Removes the given symbolic ref from the repo.
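Note: list_objects() returns (mtime, path) tuples sorted oldest first, and clean_up_refs_until() drops any ref older than a given timestamp. A rough sketch of how a caller might combine the two, assuming a CASCache instance named cas and an illustrative bytes_needed budget (the helper name and policy here are hypothetical; the real logic lands in casserver.py below):

    import os

    def free_up(cas, bytes_needed):
        removed = 0
        last_mtime = 0
        for mtime, obj_path in cas.list_objects():
            if removed >= bytes_needed:
                break
            try:
                size = os.stat(obj_path).st_size
                os.unlink(obj_path)      # delete the oldest object first
                removed += size
                last_mtime = mtime
            except FileNotFoundError:
                pass                     # a concurrent writer removed it already
        # Refs last touched before the newest deleted object may now dangle,
        # so expire those refs as well.
        cas.clean_up_refs_until(last_mtime)
        return removed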
@@ -682,6 +717,10 @@ class CASCache():
 
         return pruned
 
+    def update_tree_mtime(self, tree):
+        reachable = set()
+        self._reachable_refs_dir(reachable, tree, update_mtime=True)
+
     ################################################
     #             Local Private Methods            #
     ################################################
@@ -828,10 +867,13 @@ class CASCache():
                 a += 1
                 b += 1
 
-    def _reachable_refs_dir(self, reachable, tree):
+    def _reachable_refs_dir(self, reachable, tree, update_mtime=False):
         if tree.hash in reachable:
             return
 
+        if update_mtime:
+            os.utime(self.objpath(tree))
+
         reachable.add(tree.hash)
 
         directory = remote_execution_pb2.Directory()
@@ -840,10 +882,12 @@ class CASCache():
             directory.ParseFromString(f.read())
 
         for filenode in directory.files:
+            if update_mtime:
+                os.utime(self.objpath(filenode.digest))
             reachable.add(filenode.digest.hash)
 
         for dirnode in directory.directories:
-            self._reachable_refs_dir(reachable, dirnode.digest)
+            self._reachable_refs_dir(reachable, dirnode.digest, update_mtime=update_mtime)
 
     def _required_blobs(self, directory_digest):
         # parse directory, and recursively add blobs
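The update_mtime flag threads through the recursion above so that update_tree_mtime() touches every object reachable from a tree. The point is to protect recently pulled artifacts from the least-recently-modified sweep: whenever a ref is served, its whole object graph is made "young" again. Roughly, for illustration only (the actual call site is the GetReference change further down):

    # Serving a pull: refresh the ref's mtime, then touch every blob the
    # tree refers to so none of them looks stale to the cleaner.
    tree = cas.resolve_ref(key, update_mtime=True)
    cas.update_tree_mtime(tree)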
--- a/buildstream/_artifactcache/casserver.py
+++ b/buildstream/_artifactcache/casserver.py
@@ -24,6 +24,9 @@ import signal
 import sys
 import tempfile
 import uuid
+import errno
+import ctypes
+import threading
 
 import click
 import grpc
@@ -56,18 +59,22 @@ class ArtifactTooLargeException(Exception):
 # repo (str): Path to CAS repository
 # enable_push (bool): Whether to allow blob uploads and artifact updates
 #
-def create_server(repo, *, enable_push):
+def create_server(repo, *, enable_push,
+                  max_head_size=int(10e9),
+                  min_head_size=int(2e9)):
     cas = CASCache(os.path.abspath(repo))
 
     # Use max_workers default from Python 3.5+
     max_workers = (os.cpu_count() or 1) * 5
     server = grpc.server(futures.ThreadPoolExecutor(max_workers))
 
+    cache_cleaner = _CacheCleaner(cas, max_head_size, min_head_size)
+
     bytestream_pb2_grpc.add_ByteStreamServicer_to_server(
-        _ByteStreamServicer(cas, enable_push=enable_push), server)
+        _ByteStreamServicer(cas, cache_cleaner, enable_push=enable_push), server)
 
     remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
-        _ContentAddressableStorageServicer(cas, enable_push=enable_push), server)
+        _ContentAddressableStorageServicer(cas, cache_cleaner, enable_push=enable_push), server)
 
     remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
         _CapabilitiesServicer(), server)
@@ -85,9 +92,19 @@ def create_server(repo, *, enable_push):
 @click.option('--client-certs', help="Public client certificates for TLS (PEM-encoded)")
 @click.option('--enable-push', default=False, is_flag=True,
               help="Allow clients to upload blobs and update artifact cache")
+@click.option('--head-room-min', type=click.INT,
+              help="Disk head room minimum in bytes",
+              default=2e9)
+@click.option('--head-room-max', type=click.INT,
+              help="Disk head room maximum in bytes",
+              default=10e9)
 @click.argument('repo')
-def server_main(repo, port, server_key, server_cert, client_certs, enable_push):
-    server = create_server(repo, enable_push=enable_push)
+def server_main(repo, port, server_key, server_cert, client_certs, enable_push,
+                head_room_min, head_room_max):
+    server = create_server(repo,
+                           max_head_size=head_room_max,
+                           min_head_size=head_room_min,
+                           enable_push=enable_push)
 
     use_tls = bool(server_key)
 
@@ -129,10 +146,11 @@ def server_main(repo, port, server_key, server_cert, client_certs, enable_push):
 
 
 class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
-    def __init__(self, cas, *, enable_push):
+    def __init__(self, cas, cache_cleaner, *, enable_push):
         super().__init__()
         self.cas = cas
         self.enable_push = enable_push
+        self.cache_cleaner = cache_cleaner
 
     def Read(self, request, context):
         resource_name = request.resource_name
@@ -190,17 +208,34 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
                         context.set_code(grpc.StatusCode.NOT_FOUND)
                         return response
 
-                    try:
-                        _clean_up_cache(self.cas, client_digest.size_bytes)
-                    except ArtifactTooLargeException as e:
-                        context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
-                        context.set_details(str(e))
-                        return response
+                    while True:
+                        if client_digest.size_bytes == 0:
+                            break
+                        try:
+                            self.cache_cleaner.clean_up(client_digest.size_bytes)
+                        except ArtifactTooLargeException as e:
+                            context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
+                            context.set_details(str(e))
+                            return response
+
+                        try:
+                            os.posix_fallocate(out.fileno(), 0, client_digest.size_bytes)
+                            break
+                        except OSError as e:
+                            # Multiple uploads can happen at the same time
+                            if e.errno != errno.ENOSPC:
+                                raise
+
                 elif request.resource_name:
                     # If it is set on subsequent calls, it **must** match the value of the first request.
                     if request.resource_name != resource_name:
                         context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                         return response
+
+                if (offset + len(request.data)) > client_digest.size_bytes:
+                    context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
+                    return response
+
                 out.write(request.data)
                 offset += len(request.data)
                 if request.finish_write:
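The Write path above now reserves the full object size with os.posix_fallocate() before accepting data, and loops: if the allocation fails with ENOSPC (several uploads can race for the same free space), it runs the cleaner again and retries. The same reserve-then-retry pattern in isolation, assuming a clean_up(size) callable like the one in the diff (names here are illustrative):

    import errno
    import os

    def reserve_space(fd, size, clean_up):
        while True:
            if size == 0:
                return                       # nothing to reserve
            clean_up(size)                   # raises if the object can never fit
            try:
                os.posix_fallocate(fd, 0, size)
                return                       # space is now guaranteed for this upload
            except OSError as e:
                if e.errno != errno.ENOSPC:
                    raise
                # Another upload consumed the space we just freed; clean up again.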
@@ -221,18 +256,26 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
 
 
 class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
-    def __init__(self, cas, *, enable_push):
+    def __init__(self, cas, cache_cleaner, *, enable_push):
         super().__init__()
         self.cas = cas
         self.enable_push = enable_push
+        self.cache_cleaner = cache_cleaner
 
     def FindMissingBlobs(self, request, context):
         response = remote_execution_pb2.FindMissingBlobsResponse()
         for digest in request.blob_digests:
-            if not _has_object(self.cas, digest):
-                d = response.missing_blob_digests.add()
-                d.hash = digest.hash
-                d.size_bytes = digest.size_bytes
+            objpath = self.cas.objpath(digest)
+            try:
+                os.utime(objpath)
+            except OSError as e:
+                if e.errno != errno.ENOENT:
+                    raise
+                else:
+                    d = response.missing_blob_digests.add()
+                    d.hash = digest.hash
+                    d.size_bytes = digest.size_bytes
+
         return response
 
     def BatchReadBlobs(self, request, context):
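FindMissingBlobs now doubles as a keep-alive: for every blob the client is about to rely on, the server refreshes the object's mtime, and only blobs whose os.utime() fails with ENOENT are reported missing. The same existence-check-plus-touch idiom as a tiny standalone helper (the helper name is hypothetical):

    import errno
    import os

    def touch_if_present(path):
        # True  -> the object exists and its mtime was refreshed.
        # False -> the object is missing; any other error propagates.
        try:
            os.utime(path)
            return True
        except OSError as e:
            if e.errno != errno.ENOENT:
                raise
            return False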
@@ -286,7 +329,7 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
                 continue
 
             try:
-                _clean_up_cache(self.cas, digest.size_bytes)
+                self.cache_cleaner.clean_up(digest.size_bytes)
 
                 with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
                     out.write(blob_request.data)
@@ -329,6 +372,12 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer):
 
         try:
             tree = self.cas.resolve_ref(request.key, update_mtime=True)
+            try:
+                self.cas.update_tree_mtime(tree)
+            except FileNotFoundError:
+                self.cas.remove(request.key, defer_prune=True)
+                context.set_code(grpc.StatusCode.NOT_FOUND)
+                return response
 
             response.digest.hash = tree.hash
             response.digest.size_bytes = tree.size_bytes
@@ -401,60 +450,79 @@ def _digest_from_upload_resource_name(resource_name):
     return None
 
 
-def _has_object(cas, digest):
-    objpath = cas.objpath(digest)
-    return os.path.exists(objpath)
+class _CacheCleaner:
 
+    __cleanup_cache_lock = threading.Lock()
 
-# _clean_up_cache()
-#
-# Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
-# is enough space for the incoming artifact
-#
-# Args:
-#   cas: CASCache object
-#   object_size: The size of the object being received in bytes
-#
-# Returns:
-#   int: The total bytes removed on the filesystem
-#
-def _clean_up_cache(cas, object_size):
-    # Determine the available disk space, in bytes, of the file system
-    # which mounts the repo
-    stats = os.statvfs(cas.casdir)
-    buffer_ = int(2e9)  # Add a 2 GB buffer
-    free_disk_space = (stats.f_bavail * stats.f_bsize) - buffer_
-    total_disk_space = (stats.f_blocks * stats.f_bsize) - buffer_
-
-    if object_size > total_disk_space:
-        raise ArtifactTooLargeException("Artifact of size: {} is too large for "
-                                        "the filesystem which mounts the remote "
-                                        "cache".format(object_size))
-
-    if object_size <= free_disk_space:
-        # No need to clean up
-        return 0
-
-    # obtain a list of LRP artifacts
-    LRP_artifacts = cas.list_refs()
-
-    removed_size = 0  # in bytes
-    while object_size - removed_size > free_disk_space:
-        try:
-            to_remove = LRP_artifacts.pop(0)  # The first element in the list is the LRP artifact
-        except IndexError:
-            # This exception is caught if there are no more artifacts in the list
-            # LRP_artifacts. This means the artifact is too large for the filesystem
-            # so we abort the process
-            raise ArtifactTooLargeException("Artifact of size {} is too large for "
-                                            "the filesystem which mounts the remote "
-                                            "cache".format(object_size))
+    def __init__(self, cas, max_head_size, min_head_size=int(2e9)):
+        self.__cas = cas
+        self.__max_head_size = max_head_size
+        self.__min_head_size = min_head_size
 
-        removed_size += cas.remove(to_remove, defer_prune=False)
+    def __has_space(self, object_size):
+        stats = os.statvfs(self.__cas.casdir)
+        free_disk_space = (stats.f_bavail * stats.f_bsize) - self.__min_head_size
+        total_disk_space = (stats.f_blocks * stats.f_bsize) - self.__min_head_size
 
-    if removed_size > 0:
-        logging.info("Successfully removed {} bytes from the cache".format(removed_size))
-    else:
-        logging.info("No artifacts were removed from the cache.")
+        if object_size > total_disk_space:
+            raise ArtifactTooLargeException("Artifact of size: {} is too large for "
+                                            "the filesystem which mounts the remote "
+                                            "cache".format(object_size))
 
-    return removed_size
+        return object_size <= free_disk_space
+
+    # clean_up():
+    #
+    # Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
+    # is enough space for the incoming artifact
+    #
+    # Args:
+    #   object_size: The size of the object being received in bytes
+    #
+    # Returns:
+    #   int: The total bytes removed on the filesystem
+    #
+    def clean_up(self, object_size):
+        if self.__has_space(object_size):
+            return 0
+
+        with _CacheCleaner.__cleanup_cache_lock:
+            if self.__has_space(object_size):
+                # Another thread has done the cleanup for us
+                return 0
+
+            stats = os.statvfs(self.__cas.casdir)
+            target_disk_space = (stats.f_bavail * stats.f_bsize) - self.__max_head_size
+
+            # obtain a list of LRP objects
+            LRP_objects = self.__cas.list_objects()
+
+            removed_size = 0  # in bytes
+            last_mtime = 0
+
+            while object_size - removed_size > target_disk_space:
+                try:
+                    last_mtime, to_remove = LRP_objects.pop(0)  # The first element in the list is the LRP object
+                except IndexError:
+                    # This exception is caught if there are no more objects in the list
+                    # LRP_objects. This means that the artifact is too large for the filesystem
+                    # so we abort the process
+                    raise ArtifactTooLargeException("Artifact of size {} is too large for "
+                                                    "the filesystem which mounts the remote "
+                                                    "cache".format(object_size))
+
+                try:
+                    size = os.stat(to_remove).st_size
+                    os.unlink(to_remove)
+                    removed_size += size
+                except FileNotFoundError:
+                    pass
+
+            self.__cas.clean_up_refs_until(last_mtime)
+
+            if removed_size > 0:
+                logging.info("Successfully removed {} bytes from the cache".format(removed_size))
+            else:
+                logging.info("No artifacts were removed from the cache.")
+
+            return removed_size
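_CacheCleaner is shared by both servicers, so concurrent Write and BatchUpdateBlobs calls serialise their clean-ups on a single class-level lock, with a cheap lock-free check first and a re-check after acquiring the lock. A stripped-down sketch of that double-checked pattern on its own, with illustrative names only:

    import threading

    class DiskSpaceGuard:
        _lock = threading.Lock()

        def ensure(self, needed, has_space, reclaim):
            if has_space(needed):
                return                       # fast path, no locking
            with DiskSpaceGuard._lock:
                if has_space(needed):
                    return                   # another thread already cleaned up
                reclaim(needed)              # we hold the lock; do the work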
--- a/tests/frontend/push.py
+++ b/tests/frontend/push.py
@@ -230,6 +230,8 @@ def test_artifact_expires(cli, datafiles, tmpdir):
     # Create an artifact share (remote artifact cache) in the tmpdir/artifactshare
     # Mock a file system with 12 MB free disk space
     with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare'),
+                               min_head_size=int(2e9),
+                               max_head_size=int(2e9),
                                total_space=int(10e9), free_space=(int(12e6) + int(2e9))) as share:
 
         # Configure bst to push to the cache
@@ -313,6 +315,8 @@ def test_recently_pulled_artifact_does_not_expire(cli, datafiles, tmpdir):
     # Create an artifact share (remote cache) in tmpdir/artifactshare
     # Mock a file system with 12 MB free disk space
     with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare'),
+                               min_head_size=int(2e9),
+                               max_head_size=int(2e9),
                                total_space=int(10e9), free_space=(int(12e6) + int(2e9))) as share:
 
         # Configure bst to push to the cache
--- a/tests/testutils/artifactshare.py
+++ b/tests/testutils/artifactshare.py
@@ -29,7 +29,11 @@ from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution
 #
 class ArtifactShare():
 
-    def __init__(self, directory, *, total_space=None, free_space=None):
+    def __init__(self, directory, *,
+                 total_space=None,
+                 free_space=None,
+                 min_head_size=int(2e9),
+                 max_head_size=int(10e9)):
 
         # The working directory for the artifact share (in case it
         # needs to do something outside of its backend's storage folder).
@@ -50,6 +54,9 @@ class ArtifactShare():
         self.total_space = total_space
         self.free_space = free_space
 
+        self.max_head_size = max_head_size
+        self.min_head_size = min_head_size
+
         q = Queue()
 
         self.process = Process(target=self.run, args=(q,))
@@ -74,7 +81,10 @@ class ArtifactShare():
                 self.free_space = self.total_space
             os.statvfs = self._mock_statvfs
 
-        server = create_server(self.repodir, enable_push=True)
+        server = create_server(self.repodir,
+                               max_head_size=self.max_head_size,
+                               min_head_size=self.min_head_size,
+                               enable_push=True)
         port = server.add_insecure_port('localhost:0')
 
         server.start()
@@ -136,6 +146,15 @@ class ArtifactShare():
 
         try:
             tree = self.cas.resolve_ref(artifact_key)
+            reachable = set()
+            try:
+                self.cas._reachable_refs_dir(reachable, tree, update_mtime=False)
+            except FileNotFoundError:
+                return None
+            for digest in reachable:
+                object_name = os.path.join(self.cas.casdir, 'objects', digest[:2], digest[2:])
+                if not os.path.exists(object_name):
+                    return None
             return tree
         except CASError:
             return None
@@ -167,8 +186,11 @@ class ArtifactShare():
 # Create an ArtifactShare for use in a test case
 #
 @contextmanager
-def create_artifact_share(directory, *, total_space=None, free_space=None):
-    share = ArtifactShare(directory, total_space=total_space, free_space=free_space)
+def create_artifact_share(directory, *, total_space=None, free_space=None,
+                          min_head_size=int(2e9),
+                          max_head_size=int(10e9)):
+    share = ArtifactShare(directory, total_space=total_space, free_space=free_space,
+                          min_head_size=min_head_size, max_head_size=max_head_size)
     try:
         yield share
     finally:
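For the tests, the head-room numbers are chosen so the mocked filesystem behaves the way the test comments say: with free_space=int(12e6) + int(2e9) and min_head_size=int(2e9), the server sees 12 MB of usable space, because it subtracts the minimum head room before deciding whether an object fits. A quick check of that arithmetic (values taken from the hunks above; this is only an illustration of __has_space()'s calculation):

    min_head_size = int(2e9)
    free_space = int(12e6) + int(2e9)

    # Mirrors __has_space(): usable space is what is free minus the head room.
    usable = free_space - min_head_size
    assert usable == int(12e6)               # 12 MB, as the test comments state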
