Valentin David pushed to branch valentindavid/cache_server_fill_up-1.2 at BuildStream / buildstream
Commits:
- 1c4b410f by Valentin David at 2018-11-29T16:17:21Z
- 2259dff1 by Valentin David at 2018-11-29T16:17:27Z
- d6b661e1 by Valentin David at 2018-11-29T16:17:27Z
- dbca7c70 by Valentin David at 2018-11-29T16:17:27Z
- 65ea6832 by Valentin David at 2018-11-29T16:17:27Z

4 changed files:
- buildstream/_artifactcache/cascache.py
- buildstream/_artifactcache/casserver.py
- tests/frontend/push.py
- tests/testutils/artifactshare.py

Changes:
buildstream/_artifactcache/cascache.py:

@@ -500,6 +500,41 @@ class CASCache(ArtifactCache):
         # first element of this list will be the file modified earliest.
         return [ref for _, ref in sorted(zip(mtimes, refs))]
 
+    # list_objects():
+    #
+    # List cached objects in Least Recently Modified (LRM) order.
+    #
+    # Returns:
+    #     (list) - A list of objects and timestamps in LRM order
+    #
+    def list_objects(self):
+        objs = []
+        mtimes = []
+
+        for root, _, files in os.walk(os.path.join(self.casdir, 'objects')):
+            for filename in files:
+                obj_path = os.path.join(root, filename)
+                try:
+                    mtimes.append(os.path.getmtime(obj_path))
+                except FileNotFoundError:
+                    pass
+                else:
+                    objs.append(obj_path)
+
+        # NOTE: Sorted will sort from earliest to latest, thus the
+        # first element of this list will be the file modified earliest.
+        return sorted(zip(mtimes, objs))
+
+    def clean_up_refs_until(self, time):
+        ref_heads = os.path.join(self.casdir, 'refs', 'heads')
+
+        for root, _, files in os.walk(ref_heads):
+            for filename in files:
+                ref_path = os.path.join(root, filename)
+                # Obtain the mtime (the time a file was last modified)
+                if os.path.getmtime(ref_path) < time:
+                    os.unlink(ref_path)
+
     # remove():
     #
     # Removes the given symbolic ref from the repo.
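
Note: list_objects() returns (mtime, path) tuples sorted oldest-first, which is the same ordering clean_up_refs_until() relies on. A minimal sketch of how the two combine (assuming `cas` is a CASCache instance; names here are illustrative, not part of the change):

    # Sketch only: the evict-oldest bookkeeping as the server-side cleaner uses it.
    objects = cas.list_objects()          # [(mtime, path), ...], oldest first
    if objects:
        last_mtime, _ = objects[0]
        # Drop refs last modified before the oldest surviving object, so no
        # ref is left pointing at content that expiry may have removed.
        cas.clean_up_refs_until(last_mtime)
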
@@ -577,6 +612,10 @@ class CASCache(ArtifactCache):
 
         return pruned
 
+    def update_tree_mtime(self, tree):
+        reachable = set()
+        self._reachable_refs_dir(reachable, tree, update_mtime=True)
+
     ################################################
     #             Local Private Methods            #
     ################################################
@@ -718,10 +757,13 @@ class CASCache(ArtifactCache):
                 a += 1
                 b += 1
 
-    def _reachable_refs_dir(self, reachable, tree):
+    def _reachable_refs_dir(self, reachable, tree, update_mtime=False):
         if tree.hash in reachable:
             return
 
+        if update_mtime:
+            os.utime(self.objpath(tree))
+
         reachable.add(tree.hash)
 
         directory = remote_execution_pb2.Directory()
@@ -730,10 +772,12 @@ class CASCache(ArtifactCache):
             directory.ParseFromString(f.read())
 
         for filenode in directory.files:
+            if update_mtime:
+                os.utime(self.objpath(filenode.digest))
             reachable.add(filenode.digest.hash)
 
         for dirnode in directory.directories:
-            self._reachable_refs_dir(reachable, dirnode.digest)
+            self._reachable_refs_dir(reachable, dirnode.digest, update_mtime=update_mtime)
 
     def _initialize_remote(self, remote_spec, q):
         try:
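
Note: update_tree_mtime() walks a whole tree and touches every reachable object, so pulling an artifact marks all of its content as recently used and protects it from LRM expiry. The intended call pattern, mirroring the GetReference change in casserver.py below (`cas` and `key` are illustrative):

    # Sketch: refresh both the ref and everything it points at.
    tree = cas.resolve_ref(key, update_mtime=True)
    cas.update_tree_mtime(tree)   # raises FileNotFoundError if objects were evicted
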
buildstream/_artifactcache/casserver.py:

@@ -24,6 +24,8 @@ import signal
 import sys
 import tempfile
 import uuid
+import errno
+import threading
 
 import click
 import grpc
@@ -56,7 +58,9 @@ class ArtifactTooLargeException(Exception):
 #     repo (str): Path to CAS repository
 #     enable_push (bool): Whether to allow blob uploads and artifact updates
 #
-def create_server(repo, *, enable_push):
+def create_server(repo, *, enable_push,
+                  max_head_size=int(10e9),
+                  min_head_size=int(2e9)):
     context = Context()
     context.artifactdir = os.path.abspath(repo)
 
@@ -66,11 +70,13 @@ def create_server(repo, *, enable_push):
     max_workers = (os.cpu_count() or 1) * 5
     server = grpc.server(futures.ThreadPoolExecutor(max_workers))
 
+    cache_cleaner = _CacheCleaner(artifactcache, max_head_size, min_head_size)
+
     bytestream_pb2_grpc.add_ByteStreamServicer_to_server(
-        _ByteStreamServicer(artifactcache, enable_push=enable_push), server)
+        _ByteStreamServicer(artifactcache, cache_cleaner, enable_push=enable_push), server)
 
     remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
-        _ContentAddressableStorageServicer(artifactcache, enable_push=enable_push), server)
+        _ContentAddressableStorageServicer(artifactcache, cache_cleaner, enable_push=enable_push), server)
 
     remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
         _CapabilitiesServicer(), server)
@@ -88,9 +94,19 @@ def create_server(repo, *, enable_push):
 @click.option('--client-certs', help="Public client certificates for TLS (PEM-encoded)")
 @click.option('--enable-push', default=False, is_flag=True,
               help="Allow clients to upload blobs and update artifact cache")
+@click.option('--head-room-min', type=click.INT,
+              help="Disk head room minimum in bytes",
+              default=2e9)
+@click.option('--head-room-max', type=click.INT,
+              help="Disk head room maximum in bytes",
+              default=10e9)
 @click.argument('repo')
-def server_main(repo, port, server_key, server_cert, client_certs, enable_push):
-    server = create_server(repo, enable_push=enable_push)
+def server_main(repo, port, server_key, server_cert, client_certs, enable_push,
+                head_room_min, head_room_max):
+    server = create_server(repo,
+                           max_head_size=head_room_max,
+                           min_head_size=head_room_min,
+                           enable_push=enable_push)
 
     use_tls = bool(server_key)
 
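
Note: with these options a cache server can bound its disk usage from the command line. An illustrative invocation (assuming the usual bst-artifact-server entry point; port, path and sizes are made up):

    bst-artifact-server --port 11001 --enable-push \
        --head-room-min 2000000000 \
        --head-room-max 10000000000 \
        /srv/buildstream/artifacts
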
@@ -132,10 +148,11 @@ def server_main(repo, port, server_key, server_cert, client_certs, enable_push):
 
 
 class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
-    def __init__(self, cas, *, enable_push):
+    def __init__(self, cas, cache_cleaner, *, enable_push):
         super().__init__()
         self.cas = cas
         self.enable_push = enable_push
+        self.cache_cleaner = cache_cleaner
 
     def Read(self, request, context):
         resource_name = request.resource_name
@@ -193,17 +210,34 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
                         context.set_code(grpc.StatusCode.NOT_FOUND)
                         return response
 
-                    try:
-                        _clean_up_cache(self.cas, client_digest.size_bytes)
-                    except ArtifactTooLargeException as e:
-                        context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
-                        context.set_details(str(e))
-                        return response
+                    while True:
+                        if client_digest.size_bytes == 0:
+                            break
+                        try:
+                            self.cache_cleaner.clean_up(client_digest.size_bytes)
+                        except ArtifactTooLargeException as e:
+                            context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
+                            context.set_details(str(e))
+                            return response
+
+                        try:
+                            os.posix_fallocate(out.fileno(), 0, client_digest.size_bytes)
+                            break
+                        except OSError as e:
+                            # Multiple uploads can happen at the same time
+                            if e.errno != errno.ENOSPC:
+                                raise
+
                 elif request.resource_name:
                     # If it is set on subsequent calls, it **must** match the value of the first request.
                     if request.resource_name != resource_name:
                         context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                         return response
+
+                if (offset + len(request.data)) > client_digest.size_bytes:
+                    context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
+                    return response
+
                 out.write(request.data)
                 offset += len(request.data)
                 if request.finish_write:
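
Note: the loop above reserves the full blob size with os.posix_fallocate() before accepting any data, and reruns the cleaner whenever the allocation fails with ENOSPC, which can happen when concurrent uploads grab space a cleanup pass just freed. The same reserve-then-retry pattern in isolation (a sketch; `clean_up` stands in for _CacheCleaner.clean_up):

    import errno
    import os

    def reserve_space(out, size, clean_up):
        # Keep cleaning until the full size can actually be reserved on disk.
        while True:
            clean_up(size)                 # may raise ArtifactTooLargeException
            try:
                os.posix_fallocate(out.fileno(), 0, size)
                return                     # the blob's disk space is now guaranteed
            except OSError as e:
                if e.errno != errno.ENOSPC:
                    raise                  # only ENOSPC means we raced another upload
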
@@ -224,18 +258,26 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
 
 
 class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
-    def __init__(self, cas, *, enable_push):
+    def __init__(self, cas, cache_cleaner, *, enable_push):
         super().__init__()
         self.cas = cas
         self.enable_push = enable_push
+        self.cache_cleaner = cache_cleaner
 
     def FindMissingBlobs(self, request, context):
         response = remote_execution_pb2.FindMissingBlobsResponse()
         for digest in request.blob_digests:
-            if not _has_object(self.cas, digest):
-                d = response.missing_blob_digests.add()
-                d.hash = digest.hash
-                d.size_bytes = digest.size_bytes
+            objpath = self.cas.objpath(digest)
+            try:
+                os.utime(objpath)
+            except OSError as e:
+                if e.errno != errno.ENOENT:
+                    raise
+                else:
+                    d = response.missing_blob_digests.add()
+                    d.hash = digest.hash
+                    d.size_bytes = digest.size_bytes
+
        return response
 
     def BatchReadBlobs(self, request, context):
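
Note: replacing the plain existence check with os.utime() makes FindMissingBlobs do double duty: blobs the client already has get their mtime refreshed, keeping them out of the LRM eviction window, while a missing path surfaces as ENOENT and is reported as missing. The check-and-touch idiom on its own (a self-contained sketch):

    import errno
    import os

    def check_and_touch(objpath):
        # Returns True if the blob exists (refreshing its mtime as a side
        # effect), False if it is missing and should be uploaded.
        try:
            os.utime(objpath)
            return True
        except OSError as e:
            if e.errno != errno.ENOENT:
                raise
            return False
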
@@ -289,7 +331,7 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
                 continue
 
             try:
-                _clean_up_cache(self.cas, digest.size_bytes)
+                self.cache_cleaner.clean_up(digest.size_bytes)
 
                 with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
                     out.write(blob_request.data)
| 332 | 374 | 
 | 
| 333 | 375 | 
         try:
 | 
| 334 | 376 | 
             tree = self.cas.resolve_ref(request.key, update_mtime=True)
 | 
| 377 | 
+            try:
 | 
|
| 378 | 
+                self.cas.update_tree_mtime(tree)
 | 
|
| 379 | 
+            except FileNotFoundError:
 | 
|
| 380 | 
+                self.cas.remove(request.key, defer_prune=True)
 | 
|
| 381 | 
+                context.set_code(grpc.StatusCode.NOT_FOUND)
 | 
|
| 382 | 
+                return response
 | 
|
| 335 | 383 | 
 | 
| 336 | 384 | 
             response.digest.hash = tree.hash
 | 
| 337 | 385 | 
             response.digest.size_bytes = tree.size_bytes
 | 
@@ -404,60 +452,79 @@ def _digest_from_upload_resource_name(resource_name):
         return None
 
 
-def _has_object(cas, digest):
-    objpath = cas.objpath(digest)
-    return os.path.exists(objpath)
+class _CacheCleaner:
 
+    __cleanup_cache_lock = threading.Lock()
 
-# _clean_up_cache()
-#
-# Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
-# is enough space for the incoming artifact
-#
-# Args:
-#   cas: CASCache object
-#   object_size: The size of the object being received in bytes
-#
-# Returns:
-#   int: The total bytes removed on the filesystem
-#
-def _clean_up_cache(cas, object_size):
-    # Determine the available disk space, in bytes, of the file system
-    # which mounts the repo
-    stats = os.statvfs(cas.casdir)
-    buffer_ = int(2e9)                # Add a 2 GB buffer
-    free_disk_space = (stats.f_bavail * stats.f_bsize) - buffer_
-    total_disk_space = (stats.f_blocks * stats.f_bsize) - buffer_
-
-    if object_size > total_disk_space:
-        raise ArtifactTooLargeException("Artifact of size: {} is too large for "
-                                        "the filesystem which mounts the remote "
-                                        "cache".format(object_size))
-
-    if object_size <= free_disk_space:
-        # No need to clean up
-        return 0
-
-    # obtain a list of LRP artifacts
-    LRP_artifacts = cas.list_artifacts()
-
-    removed_size = 0  # in bytes
-    while object_size - removed_size > free_disk_space:
-        try:
-            to_remove = LRP_artifacts.pop(0)  # The first element in the list is the LRP artifact
-        except IndexError:
-            # This exception is caught if there are no more artifacts in the list
-            # LRP_artifacts. This means the the artifact is too large for the filesystem
-            # so we abort the process
-            raise ArtifactTooLargeException("Artifact of size {} is too large for "
-                                            "the filesystem which mounts the remote "
-                                            "cache".format(object_size))
+    def __init__(self, cas, max_head_size, min_head_size=int(2e9)):
+        self.__cas = cas
+        self.__max_head_size = max_head_size
+        self.__min_head_size = min_head_size
 
-        removed_size += cas.remove(to_remove, defer_prune=False)
+    def __has_space(self, object_size):
+        stats = os.statvfs(self.__cas.casdir)
+        free_disk_space = (stats.f_bavail * stats.f_bsize) - self.__min_head_size
+        total_disk_space = (stats.f_blocks * stats.f_bsize) - self.__min_head_size
 
-    if removed_size > 0:
-        logging.info("Successfully removed {} bytes from the cache".format(removed_size))
-    else:
-        logging.info("No artifacts were removed from the cache.")
+        if object_size > total_disk_space:
+            raise ArtifactTooLargeException("Artifact of size: {} is too large for "
+                                            "the filesystem which mounts the remote "
+                                            "cache".format(object_size))
 
-    return removed_size
+        return object_size <= free_disk_space
+
+    # clean_up()
+    #
+    # Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
+    # is enough space for the incoming artifact
+    #
+    # Args:
+    #   object_size: The size of the object being received in bytes
+    #
+    # Returns:
+    #   int: The total bytes removed on the filesystem
+    #
+    def clean_up(self, object_size):
+        if self.__has_space(object_size):
+            return 0
+
+        with _CacheCleaner.__cleanup_cache_lock:
+            if self.__has_space(object_size):
+                # Another thread has done the cleanup for us
+                return 0
+
+            stats = os.statvfs(self.__cas.casdir)
+            target_disk_space = (stats.f_bavail * stats.f_bsize) - self.__max_head_size
+
+            # Obtain a list of LRP objects
+            LRP_objects = self.__cas.list_objects()
+
+            removed_size = 0  # in bytes
+            last_mtime = 0
+
+            while object_size - removed_size > target_disk_space:
+                try:
+                    last_mtime, to_remove = LRP_objects.pop(0)  # The first element in the list is the LRP object
+                except IndexError:
+                    # This exception is raised when there are no more objects in
+                    # LRP_objects, which means the artifact is too large for the
+                    # filesystem, so we abort the process
+                    raise ArtifactTooLargeException("Artifact of size {} is too large for "
+                                                    "the filesystem which mounts the remote "
+                                                    "cache".format(object_size))
+
+                try:
+                    size = os.stat(to_remove).st_size
+                    os.unlink(to_remove)
+                    removed_size += size
+                except FileNotFoundError:
+                    pass
+
+            self.__cas.clean_up_refs_until(last_mtime)
+
+            if removed_size > 0:
+                logging.info("Successfully removed {} bytes from the cache".format(removed_size))
+            else:
+                logging.info("No artifacts were removed from the cache.")
+
+            return removed_size
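
Note on the two thresholds: __has_space() only demands min_head_size of head room before accepting an object, while clean_up() evicts until max_head_size of head room would remain, so one cleanup pass buys several pushes' worth of slack instead of cleaning on every upload. A worked example with the defaults (all numbers illustrative):

    # Hypothetical filesystem state, using the default head-room settings.
    free_space    = int(3e9)     # 3 GB currently available
    object_size   = int(1.5e9)   # incoming blob
    min_head_size = int(2e9)
    max_head_size = int(10e9)

    # __has_space(): 3e9 - 2e9 = 1e9 usable; 1.5e9 > 1e9, so cleanup runs.
    # clean_up(): target_disk_space = 3e9 - 10e9 = -7e9, so objects are
    # unlinked until removed_size >= 1.5e9 - (-7e9) = 8.5e9 bytes.
    # Free space is then ~11.5e9; after writing the 1.5e9 blob,
    # the full 10 GB of head room is still available.
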
tests/frontend/push.py:

@@ -208,6 +208,8 @@ def test_artifact_expires(cli, datafiles, tmpdir):
     # Create an artifact share (remote artifact cache) in the tmpdir/artifactshare
     # Mock a file system with 12 MB free disk space
     with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare'),
+                               min_head_size=int(2e9),
+                               max_head_size=int(2e9),
                                total_space=int(10e9), free_space=(int(12e6) + int(2e9))) as share:
 
         # Configure bst to push to the cache
| 291 | 293 | 
     # Create an artifact share (remote cache) in tmpdir/artifactshare
 | 
| 292 | 294 | 
     # Mock a file system with 12 MB free disk space
 | 
| 293 | 295 | 
     with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare'),
 | 
| 296 | 
+                               min_head_size=int(2e9),
 | 
|
| 297 | 
+                               max_head_size=int(2e9),
 | 
|
| 294 | 298 | 
                                total_space=int(10e9), free_space=(int(12e6) + int(2e9))) as share:
 | 
| 295 | 299 | 
 | 
| 296 | 300 | 
         # Configure bst to push to the cache
 | 
| ... | ... | @@ -29,7 +29,11 @@ from buildstream._exceptions import ArtifactError | 
| 29 | 29 | 
 #
 | 
| 30 | 30 | 
 class ArtifactShare():
 | 
| 31 | 31 | 
 | 
| 32 | 
-    def __init__(self, directory, *, total_space=None, free_space=None):
 | 
|
| 32 | 
+    def __init__(self, directory, *,
 | 
|
| 33 | 
+                 total_space=None,
 | 
|
| 34 | 
+                 free_space=None,
 | 
|
| 35 | 
+                 min_head_size=int(2e9),
 | 
|
| 36 | 
+                 max_head_size=int(10e9)):
 | 
|
| 33 | 37 | 
 | 
| 34 | 38 | 
         # The working directory for the artifact share (in case it
 | 
| 35 | 39 | 
         # needs to do something outside of it's backend's storage folder).
 | 
| ... | ... | @@ -53,6 +57,9 @@ class ArtifactShare(): | 
| 53 | 57 | 
         self.total_space = total_space
 | 
| 54 | 58 | 
         self.free_space = free_space
 | 
| 55 | 59 | 
 | 
| 60 | 
+        self.max_head_size = max_head_size
 | 
|
| 61 | 
+        self.min_head_size = min_head_size
 | 
|
| 62 | 
+  | 
|
| 56 | 63 | 
         q = Queue()
 | 
| 57 | 64 | 
 | 
| 58 | 65 | 
         self.process = Process(target=self.run, args=(q,))
 | 
| ... | ... | @@ -76,7 +83,10 @@ class ArtifactShare(): | 
| 76 | 83 | 
                 self.free_space = self.total_space
 | 
| 77 | 84 | 
             os.statvfs = self._mock_statvfs
 | 
| 78 | 85 | 
 | 
| 79 | 
-        server = create_server(self.repodir, enable_push=True)
 | 
|
| 86 | 
+        server = create_server(self.repodir,
 | 
|
| 87 | 
+                               max_head_size=self.max_head_size,
 | 
|
| 88 | 
+                               min_head_size=self.min_head_size,
 | 
|
| 89 | 
+                               enable_push=True)
 | 
|
| 80 | 90 | 
         port = server.add_insecure_port('localhost:0')
 | 
| 81 | 91 | 
 | 
| 82 | 92 | 
         server.start()
 | 
@@ -118,6 +128,15 @@ class ArtifactShare():
 
         try:
             tree = self.cas.resolve_ref(artifact_key)
+            reachable = set()
+            try:
+                self.cas._reachable_refs_dir(reachable, tree, update_mtime=False)
+            except FileNotFoundError:
+                return False
+            for digest in reachable:
+                object_name = os.path.join(self.cas.casdir, 'objects', digest[:2], digest[2:])
+                if not os.path.exists(object_name):
+                    return False
             return True
         except ArtifactError:
             return False
@@ -149,8 +168,11 @@ class ArtifactShare():
 # Create an ArtifactShare for use in a test case
 #
 @contextmanager
-def create_artifact_share(directory, *, total_space=None, free_space=None):
-    share = ArtifactShare(directory, total_space=total_space, free_space=free_space)
+def create_artifact_share(directory, *, total_space=None, free_space=None,
+                          min_head_size=int(2e9),
+                          max_head_size=int(10e9)):
+    share = ArtifactShare(directory, total_space=total_space, free_space=free_space,
+                          min_head_size=min_head_size, max_head_size=max_head_size)
     try:
         yield share
     finally: