[Notes] [Git][BuildStream/buildstream][valentindavid/cache_server_fill_up] Move cas server from ref-based to object-based garbage collection.



Valentin David pushed to branch valentindavid/cache_server_fill_up at BuildStream / buildstream

Commits:

4 changed files:

Changes:

  • buildstream/_artifactcache/cascache.py
    @@ -593,6 +593,41 @@ class CASCache(ArtifactCache):
             # first element of this list will be the file modified earliest.
             return [ref for _, ref in sorted(zip(mtimes, refs))]
    
    +    # list_objects():
    +    #
    +    # List cached objects in Least Recently Modified (LRM) order.
    +    #
    +    # Returns:
    +    #     (list) - A list of objects and timestamps in LRM order
    +    #
    +    def list_objects(self):
    +        objs = []
    +        mtimes = []
    +
    +        for root, _, files in os.walk(os.path.join(self.casdir, 'objects')):
    +            for filename in files:
    +                obj_path = os.path.join(root, filename)
    +                try:
    +                    mtimes.append(os.path.getmtime(obj_path))
    +                except FileNotFoundError:
    +                    pass
    +                else:
    +                    objs.append(obj_path)
    +
    +        # NOTE: Sorted will sort from earliest to latest, thus the
    +        # first element of this list will be the file modified earliest.
    +        return sorted(zip(mtimes, objs))
    +
    +    def clean_up_refs_until(self, time):
    +        ref_heads = os.path.join(self.casdir, 'refs', 'heads')
    +
    +        for root, _, files in os.walk(ref_heads):
    +            for filename in files:
    +                ref_path = os.path.join(root, filename)
    +                # Obtain the mtime (the time a file was last modified)
    +                if os.path.getmtime(ref_path) < time:
    +                    os.unlink(ref_path)
    +
         # remove():
         #
         # Removes the given symbolic ref from the repo.

    @@ -831,6 +866,9 @@ class CASCache(ArtifactCache):
             if tree.hash in reachable:
                 return
    
    +        if update_mtime:
    +            os.utime(self.objpath(tree))
    +
             reachable.add(tree.hash)
    
             directory = remote_execution_pb2.Directory()

    @@ -844,8 +882,6 @@ class CASCache(ArtifactCache):
                 reachable.add(filenode.digest.hash)
    
             for dirnode in directory.directories:
    -            if update_mtime:
    -                os.utime(self.objpath(dirnode.digest))
                 self._reachable_refs_dir(reachable, dirnode.digest, update_mtime=update_mtime)
    
         def _initialize_remote(self, remote_spec, q):

  • buildstream/_artifactcache/casserver.py
    @@ -24,9 +24,9 @@ import signal
     import sys
     import tempfile
     import uuid
    -import time
     import errno
     import ctypes
    +import threading
    
     import click
     import grpc

    @@ -62,7 +62,9 @@ class ArtifactTooLargeException(Exception):
     #     repo (str): Path to CAS repository
     #     enable_push (bool): Whether to allow blob uploads and artifact updates
     #
    -def create_server(repo, *, enable_push):
    +def create_server(repo, *, enable_push,
    +                  max_head_size=int(10e9),
    +                  min_head_size=int(2e9)):
         context = Context()
         context.artifactdir = os.path.abspath(repo)
    

    @@ -72,11 +74,13 @@ def create_server(repo, *, enable_push):
         max_workers = (os.cpu_count() or 1) * 5
         server = grpc.server(futures.ThreadPoolExecutor(max_workers))
    
    +    cache_cleaner = _CacheCleaner(artifactcache, max_head_size, min_head_size)
    +
         bytestream_pb2_grpc.add_ByteStreamServicer_to_server(
    -        _ByteStreamServicer(artifactcache, enable_push=enable_push), server)
    +        _ByteStreamServicer(artifactcache, cache_cleaner, enable_push=enable_push), server)
    
         remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
    -        _ContentAddressableStorageServicer(artifactcache, enable_push=enable_push), server)
    +        _ContentAddressableStorageServicer(artifactcache, cache_cleaner, enable_push=enable_push), server)
    
         remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
             _CapabilitiesServicer(), server)

    @@ -94,9 +98,19 @@ def create_server(repo, *, enable_push):
     @click.option('--client-certs', help="Public client certificates for TLS (PEM-encoded)")
     @click.option('--enable-push', default=False, is_flag=True,
                   help="Allow clients to upload blobs and update artifact cache")
    +@click.option('--head-room-min', type=click.INT,
    +              help="Disk head room minimum in bytes",
    +              default=2e9)
    +@click.option('--head-room-max', type=click.INT,
    +              help="Disk head room maximum in bytes",
    +              default=10e9)
     @click.argument('repo')
    -def server_main(repo, port, server_key, server_cert, client_certs, enable_push):
    -    server = create_server(repo, enable_push=enable_push)
    +def server_main(repo, port, server_key, server_cert, client_certs, enable_push,
    +                head_room_min, head_room_max):
    +    server = create_server(repo,
    +                           max_head_size=head_room_max,
    +                           min_head_size=head_room_min,
    +                           enable_push=enable_push)
    
         use_tls = bool(server_key)
    

    @@ -168,11 +182,12 @@ class _FallocateCall:
    
    
     class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
    -    def __init__(self, cas, *, enable_push):
    +    def __init__(self, cas, cache_cleaner, *, enable_push):
             super().__init__()
             self.cas = cas
             self.enable_push = enable_push
             self.fallocate = _FallocateCall()
    +        self.cache_cleaner = cache_cleaner
    
         def Read(self, request, context):
             resource_name = request.resource_name

    @@ -234,7 +249,7 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
                             if client_digest.size_bytes == 0:
                                 break
                             try:
    -                            _clean_up_cache(self.cas, client_digest.size_bytes)
    +                            self.cache_cleaner.clean_up(client_digest.size_bytes)
                             except ArtifactTooLargeException as e:
                                 context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
                                 context.set_details(str(e))

    @@ -280,10 +295,11 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
    
    
     class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
    -    def __init__(self, cas, *, enable_push):
    +    def __init__(self, cas, cache_cleaner, *, enable_push):
             super().__init__()
             self.cas = cas
             self.enable_push = enable_push
    +        self.cache_cleaner = cache_cleaner
    
         def FindMissingBlobs(self, request, context):
             response = remote_execution_pb2.FindMissingBlobsResponse()

    @@ -352,7 +368,7 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
                     continue
    
                 try:
    -                _clean_up_cache(self.cas, digest.size_bytes)
    +                self.cache_cleaner.clean_up(digest.size_bytes)
    
                     with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
                         out.write(blob_request.data)

    @@ -473,63 +489,80 @@ def _digest_from_upload_resource_name(resource_name):
             return None
    
    
    -# _clean_up_cache()
    -#
    -# Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
    -# is enough space for the incoming artifact
    -#
    -# Args:
    -#   cas: CASCache object
    -#   object_size: The size of the object being received in bytes
    -#
    -# Returns:
    -#   int: The total bytes removed on the filesystem
    -#
    -def _clean_up_cache(cas, object_size):
    -    # Determine the available disk space, in bytes, of the file system
    -    # which mounts the repo
    -    stats = os.statvfs(cas.casdir)
    -    buffer_ = int(2e9)                # Add a 2 GB buffer
    -    free_disk_space = (stats.f_bavail * stats.f_bsize) - buffer_
    -    total_disk_space = (stats.f_blocks * stats.f_bsize) - buffer_
    -
    -    if object_size > total_disk_space:
    -        raise ArtifactTooLargeException("Artifact of size: {} is too large for "
    -                                        "the filesystem which mounts the remote "
    -                                        "cache".format(object_size))
    -
    -    if object_size <= free_disk_space:
    -        # No need to clean up
    -        return 0
    -
    -    # obtain a list of LRP artifacts
    -    LRP_artifacts = cas.list_artifacts()
    -
    -    keep_after = time.time() - _OBJECT_MIN_AGE
    -
    -    removed_size = 0  # in bytes
    -    if object_size - removed_size > free_disk_space:
    -        # First we try to see if some unreferenced objects became old
    -        # enough to be removed.
    -        removed_size += cas.prune(keep_after=keep_after)
    -
    -    while object_size - removed_size > free_disk_space:
    -        try:
    -            to_remove = LRP_artifacts.pop(0)  # The first element in the list is the LRP artifact
    -        except IndexError:
    -            # This exception is caught if there are no more artifacts in the list
    -            # LRP_artifacts. This means the the artifact is too large for the filesystem
    -            # so we abort the process
    -            raise ArtifactTooLargeException("Artifact of size {} is too large for "
    -                                            "the filesystem which mounts the remote "
    -                                            "cache".format(object_size))
    +class _CacheCleaner:
    
    -        cas.remove(to_remove, defer_prune=True)
    -        removed_size += cas.prune(keep_after=keep_after)
    +    __cleanup_cache_lock = threading.Lock()
    
    -    if removed_size > 0:
    -        logging.info("Successfully removed {} bytes from the cache".format(removed_size))
    -    else:
    -        logging.info("No artifacts were removed from the cache.")
    +    def __init__(self, cas, max_head_size, min_head_size=int(2e9)):
    +        self.__cas = cas
    +        self.__max_head_size = max_head_size
    +        self.__min_head_size = min_head_size
    +
    +    def __has_space(self, object_size):
    +        stats = os.statvfs(self.__cas.casdir)
    +        free_disk_space = (stats.f_bavail * stats.f_bsize) - self.__min_head_size
    +        total_disk_space = (stats.f_blocks * stats.f_bsize) - self.__min_head_size
    +
    +        if object_size > total_disk_space:
    +            raise ArtifactTooLargeException("Artifact of size: {} is too large for "
    +                                            "the filesystem which mounts the remote "
    +                                            "cache".format(object_size))
    
    -    return removed_size
    +        return object_size <= free_disk_space
    +
    +    # _clean_up_cache()
    +    #
    +    # Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
    +    # is enough space for the incoming artifact
    +    #
    +    # Args:
    +    #   object_size: The size of the object being received in bytes
    +    #
    +    # Returns:
    +    #   int: The total bytes removed on the filesystem
    +    #
    +    def clean_up(self, object_size):
    +        if self.__has_space(object_size):
    +            return 0
    +
    +        with _CacheCleaner.__cleanup_cache_lock:
    +            if self.__has_space(object_size):
    +                # Another thread has done the cleanup for us
    +                return 0
    +
    +            stats = os.statvfs(self.__cas.casdir)
    +            target_disk_space = (stats.f_bavail * stats.f_bsize) - self.__max_head_size
    +
    +            # obtain a list of LRP artifacts
    +            LRP_objects = self.__cas.list_objects()
    +
    +            removed_size = 0  # in bytes
    +
    +            last_mtime = 0
    +
    +            while object_size - removed_size > target_disk_space:
    +                try:
    +                    last_mtime, to_remove = LRP_objects.pop(0)  # The first element in the list is the LRP artifact
    +                except IndexError:
    +                    # This exception is caught if there are no more artifacts in the list
    +                    # LRP_artifacts. This means the the artifact is too large for the filesystem
    +                    # so we abort the process
    +                    raise ArtifactTooLargeException("Artifact of size {} is too large for "
    +                                                    "the filesystem which mounts the remote "
    +                                                    "cache".format(object_size))
    +
    +                try:
    +                    size = os.stat(to_remove).st_size
    +                    os.unlink(to_remove)
    +                    removed_size += size
    +                except FileNotFoundError:
    +                    pass
    +
    +            self.__cas.clean_up_refs_until(last_mtime)
    +
    +            if removed_size > 0:
    +                logging.info("Successfully removed {} bytes from the cache".format(removed_size))
    +            else:
    +                logging.info("No artifacts were removed from the cache.")
    +
    +            return removed_size
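
    Note: with this change the cleanup thresholds become configurable.
    An upload only fits if it is no larger than the reported free space
    minus the minimum head room, and when it does not fit the cleaner
    removes least recently modified objects until the object fits with
    roughly the maximum head room to spare, then expires any ref older
    than the last object it deleted. A rough sketch of embedding the
    server with these knobs (the repository path and port below are
    placeholders, and the import path is assumed from the file layout
    shown above; shutdown handling is omitted):

        from buildstream._artifactcache.casserver import create_server

        server = create_server('/srv/cas-repo',          # placeholder repository path
                               enable_push=True,
                               min_head_size=int(2e9),   # reserve ~2 GB before cleaning
                               max_head_size=int(10e9))  # clean until ~10 GB head room is back
        port = server.add_insecure_port('localhost:50051')
        server.start()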

  • tests/frontend/push.py
    @@ -230,6 +230,8 @@ def test_artifact_expires(cli, datafiles, tmpdir):
         # Create an artifact share (remote artifact cache) in the tmpdir/artifactshare
         # Mock a file system with 12 MB free disk space
         with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare'),
    +                               min_head_size=int(2e9),
    +                               max_head_size=int(2e9),
                                    total_space=int(10e9), free_space=(int(12e6) + int(2e9))) as share:
    
             # Configure bst to push to the cache

    @@ -253,8 +255,6 @@ def test_artifact_expires(cli, datafiles, tmpdir):
             assert cli.get_element_state(project, 'element2.bst') == 'cached'
             assert_shared(cli, share, project, 'element2.bst')
    
    -        share.make_all_objects_older()
    -
             # Create and build another element of 5 MB (This will exceed the free disk space available)
             create_element_size('element3.bst', project, element_path, [], int(5e6))
             result = cli.run(project=project, args=['build', 'element3.bst'])

    @@ -315,6 +315,8 @@ def test_recently_pulled_artifact_does_not_expire(cli, datafiles, tmpdir):
         # Create an artifact share (remote cache) in tmpdir/artifactshare
         # Mock a file system with 12 MB free disk space
         with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare'),
    +                               min_head_size=int(2e9),
    +                               max_head_size=int(2e9),
                                    total_space=int(10e9), free_space=(int(12e6) + int(2e9))) as share:
    
             # Configure bst to push to the cache

    @@ -352,7 +354,6 @@ def test_recently_pulled_artifact_does_not_expire(cli, datafiles, tmpdir):
             assert cli.get_element_state(project, 'element1.bst') == 'cached'
    
             wait_for_cache_granularity()
    -        share.make_all_objects_older()
    
             # Create and build the element3 (of 5 MB)
             create_element_size('element3.bst', project, element_path, [], int(5e6))

  • tests/testutils/artifactshare.py
    @@ -29,7 +29,11 @@ from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution
     #
     class ArtifactShare():
    
    -    def __init__(self, directory, *, total_space=None, free_space=None):
    +    def __init__(self, directory, *,
    +                 total_space=None,
    +                 free_space=None,
    +                 min_head_size=int(2e9),
    +                 max_head_size=int(10e9)):
    
             # The working directory for the artifact share (in case it
             # needs to do something outside of its backend's storage folder).

    @@ -53,6 +57,9 @@ class ArtifactShare():
             self.total_space = total_space
             self.free_space = free_space
    
    +        self.max_head_size = max_head_size
    +        self.min_head_size = min_head_size
    +
             q = Queue()
    
             self.process = Process(target=self.run, args=(q,))

    @@ -76,7 +83,10 @@ class ArtifactShare():
                     self.free_space = self.total_space
                 os.statvfs = self._mock_statvfs
    
    -        server = create_server(self.repodir, enable_push=True)
    +        server = create_server(self.repodir,
    +                               max_head_size=self.max_head_size,
    +                               min_head_size=self.min_head_size,
    +                               enable_push=True)
             port = server.add_insecure_port('localhost:0')
    
             server.start()

    @@ -134,19 +144,19 @@ class ArtifactShare():
    
             try:
                 tree = self.cas.resolve_ref(artifact_key)
    +            reachable = set()
    +            try:
    +                self.cas._reachable_refs_dir(reachable, tree, update_mtime=False)
    +            except FileNotFoundError:
    +                return False
    +            for digest in reachable:
    +                object_name = os.path.join(self.cas.casdir, 'objects', digest[:2], digest[2:])
    +                if not os.path.exists(object_name):
    +                    return False
                 return True
             except ArtifactError:
                 return False
    
    -    def make_all_objects_older(self):
    -        for root, dirs, files in os.walk(os.path.join(self.cas.casdir, 'objects')):
    -            for name in files:
    -                fullname = os.path.join(root, name)
    -                st = os.stat(fullname)
    -                mtime = st.st_mtime - 6 * 60 * 60
    -                atime = st.st_atime - 6 * 60 * 60
    -                os.utime(fullname, times=(atime, mtime))
    -
         # close():
         #
         # Remove the artifact share.

    @@ -174,8 +184,11 @@
     # Create an ArtifactShare for use in a test case
     #
     @contextmanager
    -def create_artifact_share(directory, *, total_space=None, free_space=None):
    -    share = ArtifactShare(directory, total_space=total_space, free_space=free_space)
    +def create_artifact_share(directory, *, total_space=None, free_space=None,
    +                          min_head_size=int(2e9),
    +                          max_head_size=int(10e9)):
    +    share = ArtifactShare(directory, total_space=total_space, free_space=free_space,
    +                          min_head_size=min_head_size, max_head_size=max_head_size)
         try:
             yield share
         finally:


