[Notes] [Git][BuildStream/buildstream][valentindavid/cache_server_fill_up] Move cas server from ref-based to object-based garbage collection.




Valentin David pushed to branch valentindavid/cache_server_fill_up at BuildStream / buildstream

Commits:

2 changed files:

Changes:

  • buildstream/_artifactcache/cascache.py
    @@ -583,7 +583,7 @@ class CASCache(ArtifactCache):
             mtimes = []

             for root, _, files in os.walk(ref_heads):
    -            for filename in files:
    +             for filename in files:
                     ref_path = os.path.join(root, filename)
                     refs.append(os.path.relpath(ref_path, ref_heads))
                     # Obtain the mtime (the time a file was last modified)
    @@ -593,6 +593,41 @@ class CASCache(ArtifactCache):
             # first element of this list will be the file modified earliest.
             return [ref for _, ref in sorted(zip(mtimes, refs))]

    +    # list_objects():
    +    #
    +    # List cached objects in Least Recently Modified (LRM) order.
    +    #
    +    # Returns:
    +    #     (list) - A list of (mtime, path) tuples for cached objects, in LRM order
    +    #
    +    def list_objects(self):
    +        objs = []
    +        mtimes = []
    +
    +        for root, _, files in os.walk(os.path.join(self.casdir, 'objects')):
    +            for filename in files:
    +                obj_path = os.path.join(root, filename)
    +                try:
    +                    mtimes.append(os.path.getmtime(obj_path))
    +                except FileNotFoundError:
    +                    pass
    +                else:
    +                    objs.append(obj_path)
    +
    +        # NOTE: Sorted will sort from earliest to latest, thus the
    +        # first element of this list will be the file modified earliest.
    +        return sorted(zip(mtimes, objs))
    +
    +    def clean_up_refs_until(self, time):
    +        ref_heads = os.path.join(self.casdir, 'refs', 'heads')
    +
    +        for root, _, files in os.walk(ref_heads):
    +            for filename in files:
    +                ref_path = os.path.join(root, filename)
    +                # Obtain the mtime (the time a file was last modified)
    +                if os.path.getmtime(ref_path) < time:
    +                    os.unlink(ref_path)
    +
         # remove():
         #
         # Removes the given symbolic ref from the repo.
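    The ordering in list_objects() is plain tuple comparison: sorted(zip(mtimes, objs))
    compares the mtime element first, so the head of the returned list is always the
    least recently modified (LRM) object. A small runnable illustration of that
    invariant (the directory and file names are invented for the example, not part of
    the commit):

        import os
        import tempfile
        import time

        with tempfile.TemporaryDirectory() as objects_dir:
            # Create three files and backdate their mtimes so that 'a' ends up
            # the least recently modified and 'c' the most recently modified.
            for i, name in enumerate(['a', 'b', 'c']):
                path = os.path.join(objects_dir, name)
                with open(path, 'w') as f:
                    f.write(name)
                past = time.time() - (3 - i) * 60
                os.utime(path, (past, past))

            # Same walk-and-collect shape as list_objects()
            entries = []
            for root, _, files in os.walk(objects_dir):
                for filename in files:
                    obj_path = os.path.join(root, filename)
                    entries.append((os.path.getmtime(obj_path), obj_path))

            # Tuples compare element-wise, so sorting orders by mtime first:
            # 'a' prints first, exactly the LRM order list_objects() returns.
            for mtime, obj_path in sorted(entries):
                print(mtime, obj_path)

    This ordering is also what makes clean_up_refs_until() safe to call with the mtime
    of the last removed object: a ref file older than that mtime may point at objects
    that have just been deleted, so it is unlinked rather than left dangling.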

  • buildstream/_artifactcache/casserver.py
    @@ -27,6 +27,7 @@ import uuid
     import time
     import errno
     import ctypes
    +import threading

     import click
     import grpc
    @@ -62,7 +63,7 @@ class ArtifactTooLargeException(Exception):
     #     repo (str): Path to CAS repository
     #     enable_push (bool): Whether to allow blob uploads and artifact updates
     #
    -def create_server(repo, *, enable_push):
    +def create_server(repo, max_head_size, min_head_size, *, enable_push):
         context = Context()
         context.artifactdir = os.path.abspath(repo)

    @@ -72,11 +73,13 @@ def create_server(repo, *, enable_push):
         max_workers = (os.cpu_count() or 1) * 5
         server = grpc.server(futures.ThreadPoolExecutor(max_workers))

    +    cache_cleaner = _CacheCleaner(artifactcache, max_head_size, min_head_size)
    +
         bytestream_pb2_grpc.add_ByteStreamServicer_to_server(
    -        _ByteStreamServicer(artifactcache, enable_push=enable_push), server)
    +        _ByteStreamServicer(artifactcache, cache_cleaner, enable_push=enable_push), server)

         remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
    -        _ContentAddressableStorageServicer(artifactcache, enable_push=enable_push), server)
    +        _ContentAddressableStorageServicer(artifactcache, cache_cleaner, enable_push=enable_push), server)

         remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
             _CapabilitiesServicer(), server)
    @@ -94,9 +97,16 @@ def create_server(repo, *, enable_push):
     @click.option('--client-certs', help="Public client certificates for TLS (PEM-encoded)")
     @click.option('--enable-push', default=False, is_flag=True,
                   help="Allow clients to upload blobs and update artifact cache")
    +@click.option('--head-room-min', type=click.INT,
    +              help="Disk head room minimum in bytes",
    +              default=2e9)
    +@click.option('--head-room-max', type=click.INT,
    +              help="Disk head room maximum in bytes",
    +              default=10e9)
     @click.argument('repo')
    -def server_main(repo, port, server_key, server_cert, client_certs, enable_push):
    -    server = create_server(repo, enable_push=enable_push)
    +def server_main(repo, port, server_key, server_cert, client_certs, enable_push,
    +                head_room_min, head_room_max):
    +    server = create_server(repo, head_room_max, head_room_min, enable_push=enable_push)

         use_tls = bool(server_key)

    @@ -168,11 +178,12 @@ class _FallocateCall:


     class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
    -    def __init__(self, cas, *, enable_push):
    +    def __init__(self, cas, cache_cleaner, *, enable_push):
             super().__init__()
             self.cas = cas
             self.enable_push = enable_push
             self.fallocate = _FallocateCall()
    +        self.cache_cleaner = cache_cleaner

         def Read(self, request, context):
             resource_name = request.resource_name
    @@ -234,7 +245,7 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
                             if client_digest.size_bytes == 0:
                                 break
                             try:
    -                            _clean_up_cache(self.cas, client_digest.size_bytes)
    +                            self.cache_cleaner.clean_up(client_digest.size_bytes)
                             except ArtifactTooLargeException as e:
                                 context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
                                 context.set_details(str(e))
    @@ -280,10 +291,11 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):


     class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
    -    def __init__(self, cas, *, enable_push):
    +    def __init__(self, cas, cache_cleaner, *, enable_push):
             super().__init__()
             self.cas = cas
             self.enable_push = enable_push
    +        self.cache_cleaner = cache_cleaner

         def FindMissingBlobs(self, request, context):
             response = remote_execution_pb2.FindMissingBlobsResponse()
    @@ -352,7 +364,7 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres
                     continue

                 try:
    -                _clean_up_cache(self.cas, digest.size_bytes)
    +                self.cache_cleaner.clean_up(digest.size_bytes)

                     with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
                         out.write(blob_request.data)
    @@ -473,63 +485,80 @@ def _digest_from_upload_resource_name(resource_name):
             return None


    -# _clean_up_cache()
    -#
    -# Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
    -# is enough space for the incoming artifact
    -#
    -# Args:
    -#   cas: CASCache object
    -#   object_size: The size of the object being received in bytes
    -#
    -# Returns:
    -#   int: The total bytes removed on the filesystem
    -#
    -def _clean_up_cache(cas, object_size):
    -    # Determine the available disk space, in bytes, of the file system
    -    # which mounts the repo
    -    stats = os.statvfs(cas.casdir)
    -    buffer_ = int(2e9)                # Add a 2 GB buffer
    -    free_disk_space = (stats.f_bavail * stats.f_bsize) - buffer_
    -    total_disk_space = (stats.f_blocks * stats.f_bsize) - buffer_
    -
    -    if object_size > total_disk_space:
    -        raise ArtifactTooLargeException("Artifact of size: {} is too large for "
    -                                        "the filesystem which mounts the remote "
    -                                        "cache".format(object_size))
    -
    -    if object_size <= free_disk_space:
    -        # No need to clean up
    -        return 0
    -
    -    # obtain a list of LRP artifacts
    -    LRP_artifacts = cas.list_artifacts()
    -
    -    keep_after = time.time() - _OBJECT_MIN_AGE
    -
    -    removed_size = 0  # in bytes
    -    if object_size - removed_size > free_disk_space:
    -        # First we try to see if some unreferenced objects became old
    -        # enough to be removed.
    -        removed_size += cas.prune(keep_after=keep_after)
    -
    -    while object_size - removed_size > free_disk_space:
    -        try:
    -            to_remove = LRP_artifacts.pop(0)  # The first element in the list is the LRP artifact
    -        except IndexError:
    -            # This exception is caught if there are no more artifacts in the list
    -            # LRP_artifacts. This means the artifact is too large for the filesystem
    -            # so we abort the process
    -            raise ArtifactTooLargeException("Artifact of size {} is too large for "
    -                                            "the filesystem which mounts the remote "
    -                                            "cache".format(object_size))
    +class _CacheCleaner:

    -        cas.remove(to_remove, defer_prune=True)
    -        removed_size += cas.prune(keep_after=keep_after)
    +    __cleanup_cache_lock = threading.Lock()

    -    if removed_size > 0:
    -        logging.info("Successfully removed {} bytes from the cache".format(removed_size))
    -    else:
    -        logging.info("No artifacts were removed from the cache.")
    +    def __init__(self, cas, max_head_size, min_head_size=int(2e9)):
    +        self.__cas = cas
    +        self.__max_head_size = max_head_size
    +        self.__min_head_size = min_head_size
    +
    +    def __has_space(self, object_size):
    +        stats = os.statvfs(self.__cas.casdir)
    +        free_disk_space = (stats.f_bavail * stats.f_bsize) - self.__min_head_size
    +        total_disk_space = (stats.f_blocks * stats.f_bsize) - self.__min_head_size
    +
    +        if object_size > total_disk_space:
    +            raise ArtifactTooLargeException("Artifact of size: {} is too large for "
    +                                            "the filesystem which mounts the remote "
    +                                            "cache".format(object_size))

    -    return removed_size
    +        return object_size <= free_disk_space
    +
    +    # clean_up():
    +    #
    +    # Keep removing Least Recently Modified (LRM) objects from the cache
    +    # until there is enough space for the incoming artifact
    +    #
    +    # Args:
    +    #   object_size: The size of the object being received in bytes
    +    #
    +    # Returns:
    +    #   int: The total bytes removed on the filesystem
    +    #
    +    def clean_up(self, object_size):
    +        if self.__has_space(object_size):
    +            return 0
    +
    +        with _CacheCleaner.__cleanup_cache_lock:
    +            if self.__has_space(object_size):
    +                # Another thread has done the cleanup for us
    +                return 0
    +
    +            stats = os.statvfs(self.__cas.casdir)
    +            target_disk_space = (stats.f_bavail * stats.f_bsize) - self.__max_head_size
    +
    +            # Obtain a list of objects in LRM order
    +            LRP_objects = self.__cas.list_objects()
    +
    +            removed_size = 0  # in bytes
    +
    +            last_mtime = 0
    +
    +            while object_size - removed_size > target_disk_space:
    +                try:
    +                    last_mtime, to_remove = LRP_objects.pop(0)  # The first element in the list is the LRM object
    +                except IndexError:
    +                    # This exception is caught if there are no more objects in
    +                    # the list, which means the artifact is too large for the
    +                    # filesystem, so we abort the process
    +                    raise ArtifactTooLargeException("Artifact of size {} is too large for "
    +                                                    "the filesystem which mounts the remote "
    +                                                    "cache".format(object_size))
    +
    +                try:
    +                    size = os.stat(to_remove).st_size
    +                    os.unlink(to_remove)
    +                    removed_size += size
    +                except FileNotFoundError:
    +                    pass
    +
    +            self.__cas.clean_up_refs_until(last_mtime)
    +
    +            if removed_size > 0:
    +                logging.info("Successfully removed {} bytes from the cache".format(removed_size))
    +            else:
    +                logging.info("No artifacts were removed from the cache.")
    +
    +            return removed_size

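Two ideas carry the new _CacheCleaner. The first is double-checked locking: free space is tested without the lock (so the common case takes no lock at all), then re-tested after acquiring it, so concurrent uploads do not each start their own cleanup. The second is head-room hysteresis: a cleanup triggers when an upload would leave less than the minimum head room free, but keeps deleting until roughly the maximum head room would be free, so cleanups run in occasional batches instead of once per upload. A standalone sketch of that shape, with invented names and a simplified deletion step rather than the commit's code:

    import os
    import threading

    class HeadRoomCleaner:
        # One lock shared by all instances, as in _CacheCleaner
        _cleanup_lock = threading.Lock()

        def __init__(self, path, max_head_size, min_head_size):
            self.path = path
            self.max_head_size = max_head_size  # clean until this much is free
            self.min_head_size = min_head_size  # start cleaning below this

        def _free_bytes(self):
            stats = os.statvfs(self.path)
            return stats.f_bavail * stats.f_bsize

        def clean_up(self, incoming_size):
            # Fast path: no lock when there is already enough room
            if self._free_bytes() - self.min_head_size >= incoming_size:
                return
            with HeadRoomCleaner._cleanup_lock:
                # Re-check under the lock: another thread may have
                # cleaned up while we were waiting
                if self._free_bytes() - self.min_head_size >= incoming_size:
                    return
                # Hysteresis: free well past the trigger point so the
                # next uploads do not each pay for another cleanup
                while self._free_bytes() - self.max_head_size < incoming_size:
                    self._remove_one_lrm_file()

        def _remove_one_lrm_file(self):
            # Simplified stand-in for the commit's LRM loop: delete the
            # single least recently modified file under self.path
            candidates = []
            for root, _, files in os.walk(self.path):
                for name in files:
                    full = os.path.join(root, name)
                    try:
                        candidates.append((os.path.getmtime(full), full))
                    except FileNotFoundError:
                        pass
            if not candidates:
                raise RuntimeError("nothing left to delete; object cannot fit")
            _, oldest = min(candidates)
            os.unlink(oldest)

One difference worth noting: the commit's clean_up() reads statvfs once under the lock and computes a fixed byte target, while the sketch re-reads free space on every iteration; the fixed target avoids repeated statvfs calls during a long deletion run.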

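Assuming the usual bst-artifact-server entry point for server_main() (the console-script name is not shown in this diff), the new knobs are given in bytes, for example:

    bst-artifact-server --enable-push \
        --head-room-min 2000000000 \
        --head-room-max 10000000000 \
        /srv/artifact-cache

where /srv/artifact-cache is an illustrative repo path. With the defaults (2e9 and 10e9), an upload that would leave less than about 2 GB free triggers a cleanup that keeps deleting LRM objects until the upload would still leave about 10 GB free.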
