[Notes] [Git][BuildStream/buildstream][valentindavid/cache_server_fill_up] Move cas server from ref-based to object-based garbage collection.




Valentin David pushed to branch valentindavid/cache_server_fill_up at BuildStream / buildstream

Commits:

2 changed files:

  • buildstream/_artifactcache/cascache.py
  • buildstream/_artifactcache/casserver.py

Changes:

  • buildstream/_artifactcache/cascache.py
    @@ -593,6 +593,41 @@ class CASCache(ArtifactCache):
                # first element of this list will be the file modified earliest.
                return [ref for _, ref in sorted(zip(mtimes, refs))]

    +    # list_objects():
    +    #
    +    # List cached objects in Least Recently Modified (LRM) order.
    +    #
    +    # Returns:
    +    #     (list) - A list of (timestamp, object path) tuples in LRM order
    +    #
    +    def list_objects(self):
    +        objs = []
    +        mtimes = []
    +
    +        for root, _, files in os.walk(os.path.join(self.casdir, 'objects')):
    +            for filename in files:
    +                obj_path = os.path.join(root, filename)
    +                try:
    +                    mtimes.append(os.path.getmtime(obj_path))
    +                except FileNotFoundError:
    +                    pass
    +                else:
    +                    objs.append(obj_path)
    +
    +        # NOTE: sorted() sorts from earliest to latest, so the first
    +        # element of this list will be the file modified earliest.
    +        return sorted(zip(mtimes, objs))
    +
    +    def clean_up_refs_until(self, time):
    +        ref_heads = os.path.join(self.casdir, 'refs', 'heads')
    +
    +        for root, _, files in os.walk(ref_heads):
    +            for filename in files:
    +                ref_path = os.path.join(root, filename)
    +                # Remove refs that were last modified before the given time
    +                if os.path.getmtime(ref_path) < time:
    +                    os.unlink(ref_path)
    +
        # remove():
        #
        # Removes the given symbolic ref from the repo.

  • buildstream/_artifactcache/casserver.py
    @@ -24,9 +24,9 @@ import signal
     import sys
     import tempfile
     import uuid
    -import time
     import errno
     import ctypes
    +import threading

     import click
     import grpc
    
    @@ -62,7 +62,9 @@ class ArtifactTooLargeException(Exception):
     #     repo (str): Path to CAS repository
     #     enable_push (bool): Whether to allow blob uploads and artifact updates
     #
    -def create_server(repo, *, enable_push):
    +def create_server(repo, *, enable_push,
    +                  max_head_size=int(10e9),
    +                  min_head_size=int(2e9)):
         context = Context()
         context.artifactdir = os.path.abspath(repo)
    
    @@ -72,11 +74,13 @@ def create_server(repo, *, enable_push):
         max_workers = (os.cpu_count() or 1) * 5
         server = grpc.server(futures.ThreadPoolExecutor(max_workers))

    +    cache_cleaner = _CacheCleaner(artifactcache, max_head_size, min_head_size)
    +
         bytestream_pb2_grpc.add_ByteStreamServicer_to_server(
    -        _ByteStreamServicer(artifactcache, enable_push=enable_push), server)
    +        _ByteStreamServicer(artifactcache, cache_cleaner, enable_push=enable_push), server)

         remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
    -        _ContentAddressableStorageServicer(artifactcache, enable_push=enable_push), server)
    +        _ContentAddressableStorageServicer(artifactcache, cache_cleaner, enable_push=enable_push), server)

         remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
             _CapabilitiesServicer(), server)
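    A single _CacheCleaner instance is shared by both servicers, so all
    uploads contend on one cleanup lock. A minimal embedding sketch,
    assuming create_server() returns the grpc server as server_main()
    below expects (the repo path is hypothetical):

        server = create_server('/srv/artifacts',
                               enable_push=True,
                               max_head_size=int(10e9),   # 10 GB
                               min_head_size=int(2e9))    # 2 GB
        server.add_insecure_port('localhost:50052')
        server.start()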
    
    @@ -94,9 +98,19 @@ def create_server(repo, *, enable_push):
     @click.option('--client-certs', help="Public client certificates for TLS (PEM-encoded)")
     @click.option('--enable-push', default=False, is_flag=True,
                   help="Allow clients to upload blobs and update artifact cache")
    +@click.option('--head-room-min', type=click.INT,
    +              help="Disk head room minimum in bytes",
    +              default=2e9)
    +@click.option('--head-room-max', type=click.INT,
    +              help="Disk head room maximum in bytes",
    +              default=10e9)
     @click.argument('repo')
    -def server_main(repo, port, server_key, server_cert, client_certs, enable_push):
    -    server = create_server(repo, enable_push=enable_push)
    +def server_main(repo, port, server_key, server_cert, client_certs, enable_push,
    +                head_room_min, head_room_max):
    +    server = create_server(repo,
    +                           max_head_size=head_room_max,
    +                           min_head_size=head_room_min,
    +                           enable_push=enable_push)

         use_tls = bool(server_key)
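    The cleaner's thresholds are now operator-tunable: the server starts
    evicting once free space falls below --head-room-min and keeps
    evicting until --head-room-max would be free again. Assuming the
    entry point is installed as bst-artifact-server, an invocation might
    look like: bst-artifact-server --enable-push --head-room-min
    2000000000 --head-room-max 10000000000 /srv/artifacts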
     
    
    @@ -168,11 +182,12 @@ class _FallocateCall:


     class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
    -    def __init__(self, cas, *, enable_push):
    +    def __init__(self, cas, cache_cleaner, *, enable_push):
             super().__init__()
             self.cas = cas
             self.enable_push = enable_push
             self.fallocate = _FallocateCall()
    +        self.cache_cleaner = cache_cleaner

         def Read(self, request, context):
             resource_name = request.resource_name
    
    @@ -234,7 +249,7 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
                             if client_digest.size_bytes == 0:
                                 break
                             try:
    -                            _clean_up_cache(self.cas, client_digest.size_bytes)
    +                            self.cache_cleaner.clean_up(client_digest.size_bytes)
                             except ArtifactTooLargeException as e:
                                 context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
                                 context.set_details(str(e))
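    Every upload now funnels through the shared cleaner before space is
    reserved. Because many Write streams can hit this path concurrently,
    clean_up() (see the last hunk below) uses double-checked locking. A
    generic sketch of the pattern, with hypothetical has_space/do_cleanup
    callables standing in for the real methods:

        import threading

        _lock = threading.Lock()

        def clean_up(has_space, do_cleanup, needed_bytes):
            # Fast path: no lock taken when there is already enough space.
            if has_space(needed_bytes):
                return
            with _lock:
                # Re-check under the lock: another uploader may have
                # finished a cleanup while this thread was waiting.
                if has_space(needed_bytes):
                    return
                do_cleanup(needed_bytes)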
    
    @@ -280,10 +295,11 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):


     class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
    -    def __init__(self, cas, *, enable_push):
    +    def __init__(self, cas, cache_cleaner, *, enable_push):
             super().__init__()
             self.cas = cas
             self.enable_push = enable_push
    +        self.cache_cleaner = cache_cleaner

         def FindMissingBlobs(self, request, context):
             response = remote_execution_pb2.FindMissingBlobsResponse()
    
    @@ -352,7 +368,7 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
                     continue

                 try:
    -                _clean_up_cache(self.cas, digest.size_bytes)
    +                self.cache_cleaner.clean_up(digest.size_bytes)

                     with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
                         out.write(blob_request.data)
    
    @@ -473,63 +489,80 @@ def _digest_from_upload_resource_name(resource_name):
             return None


    -# _clean_up_cache()
    -#
    -# Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
    -# is enough space for the incoming artifact
    -#
    -# Args:
    -#   cas: CASCache object
    -#   object_size: The size of the object being received in bytes
    -#
    -# Returns:
    -#   int: The total bytes removed on the filesystem
    -#
    -def _clean_up_cache(cas, object_size):
    -    # Determine the available disk space, in bytes, of the file system
    -    # which mounts the repo
    -    stats = os.statvfs(cas.casdir)
    -    buffer_ = int(2e9)                # Add a 2 GB buffer
    -    free_disk_space = (stats.f_bavail * stats.f_bsize) - buffer_
    -    total_disk_space = (stats.f_blocks * stats.f_bsize) - buffer_
    -
    -    if object_size > total_disk_space:
    -        raise ArtifactTooLargeException("Artifact of size: {} is too large for "
    -                                        "the filesystem which mounts the remote "
    -                                        "cache".format(object_size))
    -
    -    if object_size <= free_disk_space:
    -        # No need to clean up
    -        return 0
    -
    -    # obtain a list of LRP artifacts
    -    LRP_artifacts = cas.list_artifacts()
    -
    -    keep_after = time.time() - _OBJECT_MIN_AGE
    -
    -    removed_size = 0  # in bytes
    -    if object_size - removed_size > free_disk_space:
    -        # First we try to see if some unreferenced objects became old
    -        # enough to be removed.
    -        removed_size += cas.prune(keep_after=keep_after)
    -
    -    while object_size - removed_size > free_disk_space:
    -        try:
    -            to_remove = LRP_artifacts.pop(0)  # The first element in the list is the LRP artifact
    -        except IndexError:
    -            # This exception is caught if there are no more artifacts in the list
    -            # LRP_artifacts. This means the the artifact is too large for the filesystem
    -            # so we abort the process
    -            raise ArtifactTooLargeException("Artifact of size {} is too large for "
    -                                            "the filesystem which mounts the remote "
    -                                            "cache".format(object_size))
    +class _CacheCleaner:

    -        cas.remove(to_remove, defer_prune=True)
    -        removed_size += cas.prune(keep_after=keep_after)
    +    __cleanup_cache_lock = threading.Lock()

    -    if removed_size > 0:
    -        logging.info("Successfully removed {} bytes from the cache".format(removed_size))
    -    else:
    -        logging.info("No artifacts were removed from the cache.")
    +    def __init__(self, cas, max_head_size, min_head_size=int(2e9)):
    +        self.__cas = cas
    +        self.__max_head_size = max_head_size
    +        self.__min_head_size = min_head_size
    +
    +    def __has_space(self, object_size):
    +        stats = os.statvfs(self.__cas.casdir)
    +        free_disk_space = (stats.f_bavail * stats.f_bsize) - self.__min_head_size
    +        total_disk_space = (stats.f_blocks * stats.f_bsize) - self.__min_head_size
    +
    +        if object_size > total_disk_space:
    +            raise ArtifactTooLargeException("Artifact of size: {} is too large for "
    +                                            "the filesystem which mounts the remote "
    +                                            "cache".format(object_size))

    -    return removed_size
    +        return object_size <= free_disk_space
    +
    +    # clean_up():
    +    #
    +    # Keep removing Least Recently Pushed (LRP) objects from the cache until
    +    # there is enough space for the incoming artifact
    +    #
    +    # Args:
    +    #   object_size: The size of the object being received in bytes
    +    #
    +    # Returns:
    +    #   int: The total bytes removed on the filesystem
    +    #
    +    def clean_up(self, object_size):
    +        if self.__has_space(object_size):
    +            return 0
    +
    +        with _CacheCleaner.__cleanup_cache_lock:
    +            if self.__has_space(object_size):
    +                # Another thread has done the cleanup for us
    +                return 0
    +
    +            stats = os.statvfs(self.__cas.casdir)
    +            target_disk_space = (stats.f_bavail * stats.f_bsize) - self.__max_head_size
    +
    +            # Obtain a list of LRP objects
    +            LRP_objects = self.__cas.list_objects()
    +
    +            removed_size = 0  # in bytes
    +
    +            last_mtime = 0
    +
    +            while object_size - removed_size > target_disk_space:
    +                try:
    +                    last_mtime, to_remove = LRP_objects.pop(0)  # The first element in the list is the LRP object
    +                except IndexError:
    +                    # This exception is raised when there are no more objects in
    +                    # LRP_objects, which means the artifact is too large for the
    +                    # filesystem, so we abort the process
    +                    raise ArtifactTooLargeException("Artifact of size {} is too large for "
    +                                                    "the filesystem which mounts the remote "
    +                                                    "cache".format(object_size))
    +
    +                try:
    +                    size = os.stat(to_remove).st_size
    +                    os.unlink(to_remove)
    +                    removed_size += size
    +                except FileNotFoundError:
    +                    pass
    +
    +            self.__cas.clean_up_refs_until(last_mtime)
    +
    +            if removed_size > 0:
    +                logging.info("Successfully removed {} bytes from the cache".format(removed_size))
    +            else:
    +                logging.info("No artifacts were removed from the cache.")
    +
    +            return removed_size
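    To make the two thresholds concrete, here is the arithmetic from
    __has_space() and clean_up() with made-up statvfs figures (a 100 GB
    filesystem with 5 GB free and the default 2 GB/10 GB headroom):

        f_bsize = 4096                        # filesystem block size in bytes
        f_blocks = int(100e9) // f_bsize      # total blocks (~100 GB)
        f_bavail = int(5e9) // f_bsize        # available blocks (~5 GB free)

        min_head_size = int(2e9)              # --head-room-min default
        max_head_size = int(10e9)             # --head-room-max default

        # __has_space(): an upload fits only if it leaves min_head_size free.
        free_disk_space = (f_bavail * f_bsize) - min_head_size   # ~3 GB
        print(int(4e9) <= free_disk_space)    # False: a 4 GB push triggers cleanup

        # clean_up(): eviction aims for max_head_size of slack, so a burst
        # of uploads does not pay the cleanup cost on every push.
        target_disk_space = (f_bavail * f_bsize) - max_head_size
        print(target_disk_space)              # negative here: objects keep being
                                              # evicted until ~10 GB would be free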


