[Notes] [Git][BuildStream/buildstream][valentindavid/cache_server_fill_up] 15 commits: BREAK: manual.yaml: don't set any default env vars




Valentin David pushed to branch valentindavid/cache_server_fill_up at BuildStream / buildstream

Commits:

8 changed files:

  • NEWS
  • buildstream/_artifactcache/cascache.py
  • buildstream/_artifactcache/casserver.py
  • buildstream/_versions.py
  • buildstream/_yaml.py
  • buildstream/plugins/elements/manual.yaml
  • tests/frontend/push.py
  • tests/testutils/artifactshare.py

Changes:

  • NEWS
    @@ -2,6 +2,12 @@
     buildstream 1.3.1
     =================

    +  o BREAKING CHANGE: The 'manual' element lost its default 'MAKEFLAGS' and 'V'
    +    environment variables. There is already a 'make' element with the same
    +    variables. Note that this is a breaking change, it will require users to
    +    make changes to their .bst files if they are expecting these environment
    +    variables to be set.
    +
       o Failed builds are included in the cache as well.
         `bst checkout` will provide anything in `%{install-root}`.
         A build including cached fails will cause any dependant elements

  • buildstream/_artifactcache/cascache.py
    @@ -27,6 +27,7 @@ import stat
     import tempfile
     import uuid
     import errno
    +import contextlib
     from urllib.parse import urlparse

     import grpc

    @@ -49,6 +50,13 @@ from . import ArtifactCache
     _MAX_PAYLOAD_BYTES = 1024 * 1024


    +class BlobNotFound(ArtifactError):
    +
    +    def __init__(self, blob, msg):
    +        self.blob = blob
    +        super().__init__(msg)
    +
    +
     # A CASCache manages artifacts in a CAS repository as specified in the
     # Remote Execution API.
     #

    @@ -264,6 +272,10 @@ class CASCache(ArtifactCache):
                         element.info("Remote ({}) does not have {} cached".format(
                             remote.spec.url, element._get_brief_display_key()
                         ))
    +            except BlobNotFound as e:
    +                element.info("Remote ({}) does not have {} cached (blob {} missing)".format(
    +                    remote.spec.url, element._get_brief_display_key(), e.blob
    +                ))

             return False


    @@ -452,13 +464,14 @@ class CASCache(ArtifactCache):
         #     digest (Digest): An optional Digest object to populate
         #     path (str): Path to file to add
         #     buffer (bytes): Byte buffer to add
    +    #     link_directly (bool): Whether file given by path can be linked
         #
         # Returns:
         #     (Digest): The digest of the added object
         #
         # Either `path` or `buffer` must be passed, but not both.
         #
    -    def add_object(self, *, digest=None, path=None, buffer=None):
    +    def add_object(self, *, digest=None, path=None, buffer=None, link_directly=False):
             # Exactly one of the two parameters has to be specified
             assert (path is None) != (buffer is None)


    @@ -468,28 +481,34 @@
             try:
                 h = hashlib.sha256()
                 # Always write out new file to avoid corruption if input file is modified
    -            with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
    -                # Set mode bits to 0644
    -                os.chmod(out.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
    -
    -                if path:
    -                    with open(path, 'rb') as f:
    -                        for chunk in iter(lambda: f.read(4096), b""):
    -                            h.update(chunk)
    -                            out.write(chunk)
    +            with contextlib.ExitStack() as stack:
    +                if path is not None and link_directly:
    +                    tmp = stack.enter_context(open(path, 'rb'))
    +                    for chunk in iter(lambda: tmp.read(4096), b""):
    +                        h.update(chunk)
                     else:
    -                    h.update(buffer)
    -                    out.write(buffer)
    +                    tmp = stack.enter_context(tempfile.NamedTemporaryFile(dir=self.tmpdir))
    +                    # Set mode bits to 0644
    +                    os.chmod(tmp.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)

    -                out.flush()
    +                    if path:
    +                        with open(path, 'rb') as f:
    +                            for chunk in iter(lambda: f.read(4096), b""):
    +                                h.update(chunk)
    +                                tmp.write(chunk)
    +                    else:
    +                        h.update(buffer)
    +                        tmp.write(buffer)
    +
    +                    tmp.flush()

                     digest.hash = h.hexdigest()
    -                digest.size_bytes = os.fstat(out.fileno()).st_size
    +                digest.size_bytes = os.fstat(tmp.fileno()).st_size

                     # Place file at final location
                     objpath = self.objpath(digest)
                     os.makedirs(os.path.dirname(objpath), exist_ok=True)
    -                os.link(out.name, objpath)
    +                os.link(tmp.name, objpath)

             except FileExistsError as e:
                 # We can ignore the failed link() if the object is already in the repo.

    @@ -574,6 +593,41 @@ class CASCache(ArtifactCache):
             # first element of this list will be the file modified earliest.
             return [ref for _, ref in sorted(zip(mtimes, refs))]

    +    # list_objects():
    +    #
    +    # List cached objects in Least Recently Modified (LRM) order.
    +    #
    +    # Returns:
    +    #     (list) - A list of objects and timestamps in LRM order
    +    #
    +    def list_objects(self):
    +        objs = []
    +        mtimes = []
    +
    +        for root, _, files in os.walk(os.path.join(self.casdir, 'objects')):
    +            for filename in files:
    +                obj_path = os.path.join(root, filename)
    +                try:
    +                    mtimes.append(os.path.getmtime(obj_path))
    +                except FileNotFoundError:
    +                    pass
    +                else:
    +                    objs.append(obj_path)
    +
    +        # NOTE: Sorted will sort from earliest to latest, thus the
    +        # first element of this list will be the file modified earliest.
    +        return sorted(zip(mtimes, objs))
    +
    +    def clean_up_refs_until(self, time):
    +        ref_heads = os.path.join(self.casdir, 'refs', 'heads')
    +
    +        for root, _, files in os.walk(ref_heads):
    +            for filename in files:
    +                ref_path = os.path.join(root, filename)
    +                # Obtain the mtime (the time a file was last modified)
    +                if os.path.getmtime(ref_path) < time:
    +                    os.unlink(ref_path)
    +
         # remove():
         #
         # Removes the given symbolic ref from the repo.

    @@ -625,7 +679,12 @@ class CASCache(ArtifactCache):
         #
         # Prune unreachable objects from the repo.
         #
    -    def prune(self):
    +    # Args:
    +    #    keep_after (int|None): timestamp after which unreachable objects
    +    #                           are kept. None if no unreachable object
    +    #                           should be kept.
    +    #
    +    def prune(self, keep_after=None):
             ref_heads = os.path.join(self.casdir, 'refs', 'heads')

             pruned = 0

    @@ -646,11 +705,19 @@
                     objhash = os.path.basename(root) + filename
                     if objhash not in reachable:
                         obj_path = os.path.join(root, filename)
    +                    if keep_after:
    +                        st = os.stat(obj_path)
    +                        if st.st_mtime >= keep_after:
    +                            continue
                         pruned += os.stat(obj_path).st_size
                         os.unlink(obj_path)

             return pruned

    +    def update_tree_mtime(self, tree):
    +        reachable = set()
    +        self._reachable_refs_dir(reachable, tree, update_mtime=True)
    +
         ################################################
         #             Local Private Methods            #
         ################################################

    @@ -795,7 +862,7 @@
                     a += 1
                     b += 1

    -    def _reachable_refs_dir(self, reachable, tree):
    +    def _reachable_refs_dir(self, reachable, tree, update_mtime=False):
             if tree.hash in reachable:
                 return


    @@ -807,10 +874,14 @@
                 directory.ParseFromString(f.read())

             for filenode in directory.files:
    +            if update_mtime:
    +                os.utime(self.objpath(filenode.digest))
                 reachable.add(filenode.digest.hash)

             for dirnode in directory.directories:
    -            self._reachable_refs_dir(reachable, dirnode.digest)
    +            if update_mtime:
    +                os.utime(self.objpath(dirnode.digest))
    +            self._reachable_refs_dir(reachable, dirnode.digest, update_mtime=update_mtime)

         def _initialize_remote(self, remote_spec, q):
             try:

    @@ -887,7 +958,7 @@
             with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
                 self._fetch_blob(remote, digest, f)

    -            added_digest = self.add_object(path=f.name)
    +            added_digest = self.add_object(path=f.name, link_directly=True)
                 assert added_digest.hash == digest.hash

             return objpath

    @@ -898,7 +969,7 @@
                     f.write(data)
                     f.flush()

    -                added_digest = self.add_object(path=f.name)
    +                added_digest = self.add_object(path=f.name, link_directly=True)
                     assert added_digest.hash == digest.hash

         # Helper function for _fetch_directory().

    @@ -1202,6 +1273,9 @@ class _CASBatchRead():
             batch_response = self._remote.cas.BatchReadBlobs(self._request)

             for response in batch_response.responses:
    +            if response.status.code == code_pb2.NOT_FOUND:
    +                raise BlobNotFound(response.digest.hash, "Failed to download blob {}: {}".format(
    +                    response.digest.hash, response.status.code))
                 if response.status.code != code_pb2.OK:
                     raise ArtifactError("Failed to download blob {}: {}".format(
                         response.digest.hash, response.status.code))

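A note on the add_object() change above: contextlib.ExitStack is what lets the method enter exactly one of two context managers chosen at runtime. With link_directly, the incoming file is opened and hashed in place and later hard-linked into the object store; otherwise the data is first copied into a NamedTemporaryFile. A minimal standalone sketch of that pattern (the helper name and return value are illustrative, not from the codebase):

    import contextlib
    import hashlib
    import tempfile

    def sha256_of(path=None, buffer=None, link_directly=False, tmpdir="."):
        # Enter exactly one of two context managers, chosen at runtime,
        # mirroring the ExitStack pattern used in CASCache.add_object().
        h = hashlib.sha256()
        with contextlib.ExitStack() as stack:
            if path is not None and link_directly:
                # Hash the existing file in place; no temporary copy is made.
                tmp = stack.enter_context(open(path, 'rb'))
                for chunk in iter(lambda: tmp.read(4096), b""):
                    h.update(chunk)
            else:
                # Copy the data into a fresh temporary file while hashing it.
                tmp = stack.enter_context(tempfile.NamedTemporaryFile(dir=tmpdir))
                if path is not None:
                    with open(path, 'rb') as f:
                        for chunk in iter(lambda: f.read(4096), b""):
                            h.update(chunk)
                            tmp.write(chunk)
                else:
                    h.update(buffer)
                    tmp.write(buffer)
                tmp.flush()
            # Both branches leave `tmp` open here; the ExitStack closes it
            # when the with block unwinds.
            return h.hexdigest()
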
  • buildstream/_artifactcache/casserver.py
    @@ -24,6 +24,9 @@ import signal
     import sys
     import tempfile
     import uuid
    +import errno
    +import ctypes
    +import threading

     import click
     import grpc

    @@ -31,6 +34,7 @@ import grpc
     from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
     from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
     from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc
    +from .._protos.google.rpc import code_pb2

     from .._exceptions import ArtifactError
     from .._context import Context

    @@ -40,6 +44,10 @@ from .._context import Context
     # Limit payload to 1 MiB to leave sufficient headroom for metadata.
     _MAX_PAYLOAD_BYTES = 1024 * 1024

    +# The minimum age in seconds for objects before they can be cleaned
    +# up.
    +_OBJECT_MIN_AGE = 6 * 60 * 60
    +

     # Trying to push an artifact that is too large
     class ArtifactTooLargeException(Exception):

    @@ -54,7 +62,7 @@ class ArtifactTooLargeException(Exception):
     #     repo (str): Path to CAS repository
     #     enable_push (bool): Whether to allow blob uploads and artifact updates
     #
    -def create_server(repo, *, enable_push):
    +def create_server(repo, max_head_size, min_head_size, *, enable_push):
         context = Context()
         context.artifactdir = os.path.abspath(repo)


    @@ -64,11 +72,13 @@ def create_server(repo, *, enable_push):
         max_workers = (os.cpu_count() or 1) * 5
         server = grpc.server(futures.ThreadPoolExecutor(max_workers))

    +    cache_cleaner = _CacheCleaner(artifactcache, max_head_size, min_head_size)
    +
         bytestream_pb2_grpc.add_ByteStreamServicer_to_server(
    -        _ByteStreamServicer(artifactcache, enable_push=enable_push), server)
    +        _ByteStreamServicer(artifactcache, cache_cleaner, enable_push=enable_push), server)

         remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
    -        _ContentAddressableStorageServicer(artifactcache, enable_push=enable_push), server)
    +        _ContentAddressableStorageServicer(artifactcache, cache_cleaner, enable_push=enable_push), server)

         remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
             _CapabilitiesServicer(), server)

    @@ -86,9 +96,16 @@ def create_server(repo, *, enable_push):
     @click.option('--client-certs', help="Public client certificates for TLS (PEM-encoded)")
     @click.option('--enable-push', default=False, is_flag=True,
                   help="Allow clients to upload blobs and update artifact cache")
    +@click.option('--head-room-min', type=click.INT,
    +              help="Disk head room minimum in bytes",
    +              default=2e9)
    +@click.option('--head-room-max', type=click.INT,
    +              help="Disk head room maximum in bytes",
    +              default=10e9)
     @click.argument('repo')
    -def server_main(repo, port, server_key, server_cert, client_certs, enable_push):
    -    server = create_server(repo, enable_push=enable_push)
    +def server_main(repo, port, server_key, server_cert, client_certs, enable_push,
    +                head_room_min, head_room_max):
    +    server = create_server(repo, head_room_max, head_room_min, enable_push=enable_push)

         use_tls = bool(server_key)


    @@ -129,11 +146,43 @@ def server_main(repo, port, server_key, server_cert, client_certs, enable_push):
             server.stop(0)


    +class _FallocateCall:
    +
    +    FALLOC_FL_KEEP_SIZE = 1
    +    FALLOC_FL_PUNCH_HOLE = 2
    +    FALLOC_FL_NO_HIDE_STALE = 4
    +    FALLOC_FL_COLLAPSE_RANGE = 8
    +    FALLOC_FL_ZERO_RANGE = 16
    +    FALLOC_FL_INSERT_RANGE = 32
    +    FALLOC_FL_UNSHARE_RANGE = 64
    +
    +    def __init__(self):
    +        self.libc = ctypes.CDLL("libc.so.6", use_errno=True)
    +        try:
    +            self.fallocate64 = self.libc.fallocate64
    +        except AttributeError:
    +            self.fallocate = self.libc.fallocate
    +
    +    def __call__(self, fd, mode, offset, length):
    +        if hasattr(self, 'fallocate64'):
    +            ret = self.fallocate64(ctypes.c_int(fd), ctypes.c_int(mode),
    +                                   ctypes.c_int64(offset), ctypes.c_int64(length))
    +        else:
    +            ret = self.fallocate(ctypes.c_int(fd), ctypes.c_int(mode),
    +                                 ctypes.c_int(offset), ctypes.c_int(length))
    +        if ret == -1:
    +            err = ctypes.get_errno()
    +            raise OSError(errno, os.strerror(err))
    +        return ret
    +
    +
     class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
    -    def __init__(self, cas, *, enable_push):
    +    def __init__(self, cas, cache_cleaner, *, enable_push):
             super().__init__()
             self.cas = cas
             self.enable_push = enable_push
    +        self.fallocate = _FallocateCall()
    +        self.cache_cleaner = cache_cleaner

         def Read(self, request, context):
             resource_name = request.resource_name

    @@ -191,25 +240,44 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
                             context.set_code(grpc.StatusCode.NOT_FOUND)
                             return response

    -                    try:
    -                        _clean_up_cache(self.cas, client_digest.size_bytes)
    -                    except ArtifactTooLargeException as e:
    -                        context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
    -                        context.set_details(str(e))
    -                        return response
    +                    while True:
    +                        if client_digest.size_bytes == 0:
    +                            break
    +                        try:
    +                            self.cache_cleaner.clean_up(client_digest.size_bytes)
    +                        except ArtifactTooLargeException as e:
    +                            context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
    +                            context.set_details(str(e))
    +                            return response
    +
    +                        try:
    +                            self.fallocate(out.fileno(), 0, 0, client_digest.size_bytes)
    +                            break
    +                        except OSError as e:
    +                            # Multiple upload can happen in the same time
    +                            if e.errno != errno.ENOSPC:
    +                                raise
    +
                     elif request.resource_name:
                         # If it is set on subsequent calls, it **must** match the value of the first request.
                         if request.resource_name != resource_name:
                             context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                             return response
    +
    +                if (offset + len(request.data)) > client_digest.size_bytes:
    +                    context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
    +                    return response
    +
                     out.write(request.data)
    +
                     offset += len(request.data)
    +
                     if request.finish_write:
                         if client_digest.size_bytes != offset:
                             context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                             return response
                         out.flush()
    -                    digest = self.cas.add_object(path=out.name)
    +                    digest = self.cas.add_object(path=out.name, link_directly=True)
                         if digest.hash != client_digest.hash:
                             context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                             return response

    @@ -222,18 +290,26 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):


     class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
    -    def __init__(self, cas, *, enable_push):
    +    def __init__(self, cas, cache_cleaner, *, enable_push):
             super().__init__()
             self.cas = cas
             self.enable_push = enable_push
    +        self.cache_cleaner = cache_cleaner

         def FindMissingBlobs(self, request, context):
             response = remote_execution_pb2.FindMissingBlobsResponse()
             for digest in request.blob_digests:
    -            if not _has_object(self.cas, digest):
    -                d = response.missing_blob_digests.add()
    -                d.hash = digest.hash
    -                d.size_bytes = digest.size_bytes
    +            objpath = self.cas.objpath(digest)
    +            try:
    +                os.utime(objpath)
    +            except OSError as e:
    +                if e.errno != errno.ENOENT:
    +                    raise
    +                else:
    +                    d = response.missing_blob_digests.add()
    +                    d.hash = digest.hash
    +                    d.size_bytes = digest.size_bytes
    +
             return response

         def BatchReadBlobs(self, request, context):

    @@ -252,12 +328,12 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
                 try:
                     with open(self.cas.objpath(digest), 'rb') as f:
                         if os.fstat(f.fileno()).st_size != digest.size_bytes:
    -                        blob_response.status.code = grpc.StatusCode.NOT_FOUND
    +                        blob_response.status.code = code_pb2.NOT_FOUND
                             continue

                         blob_response.data = f.read(digest.size_bytes)
                 except FileNotFoundError:
    -                blob_response.status.code = grpc.StatusCode.NOT_FOUND
    +                blob_response.status.code = code_pb2.NOT_FOUND

             return response


    @@ -287,7 +363,7 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
                     continue

                 try:
    -                _clean_up_cache(self.cas, digest.size_bytes)
    +                self.cache_cleaner.clean_up(digest.size_bytes)

                     with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
                         out.write(blob_request.data)

    @@ -330,6 +406,12 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer):

             try:
                 tree = self.cas.resolve_ref(request.key, update_mtime=True)
    +            try:
    +                self.cas.update_tree_mtime(tree)
    +            except FileNotFoundError:
    +                self.cas.remove(request.key, defer_prune=True)
    +                context.set_code(grpc.StatusCode.NOT_FOUND)
    +                return response

                 response.digest.hash = tree.hash
                 response.digest.size_bytes = tree.size_bytes

    @@ -402,60 +484,80 @@ def _digest_from_upload_resource_name(resource_name):
             return None


    -def _has_object(cas, digest):
    -    objpath = cas.objpath(digest)
    -    return os.path.exists(objpath)
    +class _CacheCleaner:

    +    __cleanup_cache_lock = threading.Lock()

    -# _clean_up_cache()
    -#
    -# Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
    -# is enough space for the incoming artifact
    -#
    -# Args:
    -#   cas: CASCache object
    -#   object_size: The size of the object being received in bytes
    -#
    -# Returns:
    -#   int: The total bytes removed on the filesystem
    -#
    -def _clean_up_cache(cas, object_size):
    -    # Determine the available disk space, in bytes, of the file system
    -    # which mounts the repo
    -    stats = os.statvfs(cas.casdir)
    -    buffer_ = int(2e9)                # Add a 2 GB buffer
    -    free_disk_space = (stats.f_bfree * stats.f_bsize) - buffer_
    -    total_disk_space = (stats.f_blocks * stats.f_bsize) - buffer_
    -
    -    if object_size > total_disk_space:
    -        raise ArtifactTooLargeException("Artifact of size: {} is too large for "
    -                                        "the filesystem which mounts the remote "
    -                                        "cache".format(object_size))
    -
    -    if object_size <= free_disk_space:
    -        # No need to clean up
    -        return 0
    -
    -    # obtain a list of LRP artifacts
    -    LRP_artifacts = cas.list_artifacts()
    -
    -    removed_size = 0  # in bytes
    -    while object_size - removed_size > free_disk_space:
    -        try:
    -            to_remove = LRP_artifacts.pop(0)  # The first element in the list is the LRP artifact
    -        except IndexError:
    -            # This exception is caught if there are no more artifacts in the list
    -            # LRP_artifacts. This means the the artifact is too large for the filesystem
    -            # so we abort the process
    -            raise ArtifactTooLargeException("Artifact of size {} is too large for "
    -                                            "the filesystem which mounts the remote "
    -                                            "cache".format(object_size))
    +    def __init__(self, cas, max_head_size, min_head_size=int(2e9)):
    +        self.__cas = cas
    +        self.__max_head_size = max_head_size
    +        self.__min_head_size = min_head_size

    -        removed_size += cas.remove(to_remove, defer_prune=False)
    +    def __has_space(self, object_size):
    +        stats = os.statvfs(self.__cas.casdir)
    +        free_disk_space = (stats.f_bavail * stats.f_bsize) - self.__min_head_size
    +        total_disk_space = (stats.f_blocks * stats.f_bsize) - self.__min_head_size

    -    if removed_size > 0:
    -        logging.info("Successfully removed {} bytes from the cache".format(removed_size))
    -    else:
    -        logging.info("No artifacts were removed from the cache.")
    +        if object_size > total_disk_space:
    +            raise ArtifactTooLargeException("Artifact of size: {} is too large for "
    +                                            "the filesystem which mounts the remote "
    +                                            "cache".format(object_size))

    -    return removed_size
    +        return object_size <= free_disk_space
    +
    +    # _clean_up_cache()
    +    #
    +    # Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
    +    # is enough space for the incoming artifact
    +    #
    +    # Args:
    +    #   object_size: The size of the object being received in bytes
    +    #
    +    # Returns:
    +    #   int: The total bytes removed on the filesystem
    +    #
    +    def clean_up(self, object_size):
    +        if self.__has_space(object_size):
    +            return 0
    +
    +        with _CacheCleaner.__cleanup_cache_lock:
    +            if self.__has_space(object_size):
    +                # Another thread has done the cleanup for us
    +                return 0
    +
    +            stats = os.statvfs(self.__cas.casdir)
    +            target_disk_space = (stats.f_bavail * stats.f_bsize) - self.__max_head_size
    +
    +            # obtain a list of LRP artifacts
    +            LRP_objects = self.__cas.list_objects()
    +
    +            removed_size = 0  # in bytes
    +
    +            last_mtime = 0
    +
    +            while object_size - removed_size > target_disk_space:
    +                try:
    +                    last_mtime, to_remove = LRP_objects.pop(0)  # The first element in the list is the LRP artifact
    +                except IndexError:
    +                    # This exception is caught if there are no more artifacts in the list
    +                    # LRP_artifacts. This means the the artifact is too large for the filesystem
    +                    # so we abort the process
    +                    raise ArtifactTooLargeException("Artifact of size {} is too large for "
    +                                                    "the filesystem which mounts the remote "
    +                                                    "cache".format(object_size))
    +
    +                try:
    +                    size = os.stat(to_remove).st_size
    +                    os.unlink(to_remove)
    +                    removed_size += size
    +                except FileNotFoundError:
    +                    pass
    +
    +            self.__cas.clean_up_refs_until(last_mtime)
    +
    +            if removed_size > 0:
    +                logging.info("Successfully removed {} bytes from the cache".format(removed_size))
    +            else:
    +                logging.info("No artifacts were removed from the cache.")
    +
    +            return removed_size

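Reading the _CacheCleaner added above: the server now works with two headroom thresholds. __has_space() treats an upload as fitting when the space reported by statvfs, minus --head-room-min (2 GB by default), still covers the object; note it uses f_bavail, which excludes blocks reserved for root, where the old _clean_up_cache() used f_bfree. When that check fails, clean_up() expires least-recently-used objects until roughly --head-room-max (10 GB by default) of slack is available, so cleanup does not have to run on every upload. A rough standalone sketch of the space check under those assumptions (plain RuntimeError in place of ArtifactTooLargeException):

    import os

    def has_space(casdir, object_size, min_head_size=int(2e9)):
        # Sketch of the headroom test in _CacheCleaner.__has_space():
        # compare the incoming object against the space available to
        # unprivileged users (f_bavail), less a reserved headroom.
        stats = os.statvfs(casdir)
        free_disk_space = (stats.f_bavail * stats.f_bsize) - min_head_size
        total_disk_space = (stats.f_blocks * stats.f_bsize) - min_head_size

        if object_size > total_disk_space:
            # The object could never fit, even on an empty filesystem.
            raise RuntimeError("object of size {} cannot fit".format(object_size))

        return object_size <= free_disk_space
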
  • buildstream/_versions.py
    @@ -23,7 +23,7 @@
     # This version is bumped whenever enhancements are made
     # to the `project.conf` format or the core element format.
     #
    -BST_FORMAT_VERSION = 17
    +BST_FORMAT_VERSION = 18


     # The base BuildStream artifact version

  • buildstream/_yaml.py
    @@ -1049,6 +1049,12 @@ class ChainMap(collections.ChainMap):
             for key in clearable:
                 del self[key]

    +    def get(self, key, default=None):
    +        try:
    +            return self[key]
    +        except KeyError:
    +            return default
    +

     def node_chain_copy(source):
         copy = ChainMap({}, source)

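The ChainMap.get() override above gives BuildStream's ChainMap the usual dict.get() contract: item lookup with a fallback default instead of a KeyError. A small standalone illustration of that contract (a plain collections.ChainMap subclass, not the BuildStream class):

    import collections

    class GetChainMap(collections.ChainMap):
        # Same shape as the method added to buildstream/_yaml.py: turn a
        # KeyError from item lookup into the caller-supplied default.
        def get(self, key, default=None):
            try:
                return self[key]
            except KeyError:
                return default

    cm = GetChainMap({'a': 1}, {'b': 2})
    assert cm.get('a') == 1
    assert cm.get('b') == 2
    assert cm.get('missing', 'fallback') == 'fallback'
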
  • buildstream/plugins/elements/manual.yaml
    -# No variables added for the manual element by default, set
    -# this if you plan to use make, and the sources cannot handle
    -# parallelization.
    -#
    -# variables:
    -#
    -#   notparallel: True
    -
     # Manual build element does not provide any default
     # build commands
     config:

    @@ -28,14 +20,3 @@ config:
       strip-commands:
       - |
         %{strip-binaries}
    -
    -# Use max-jobs CPUs for building and enable verbosity
    -environment:
    -  MAKEFLAGS: -j%{max-jobs}
    -  V: 1
    -
    -# And dont consider MAKEFLAGS or V as something which may
    -# affect build output.
    -environment-nocache:
    -- MAKEFLAGS
    -- V

  • tests/frontend/push.py
    @@ -253,6 +253,8 @@ def test_artifact_expires(cli, datafiles, tmpdir):
             assert cli.get_element_state(project, 'element2.bst') == 'cached'
             assert_shared(cli, share, project, 'element2.bst')

    +        share.make_all_objects_older()
    +
             # Create and build another element of 5 MB (This will exceed the free disk space available)
             create_element_size('element3.bst', project, element_path, [], int(5e6))
             result = cli.run(project=project, args=['build', 'element3.bst'])

    @@ -350,6 +352,7 @@ def test_recently_pulled_artifact_does_not_expire(cli, datafiles, tmpdir):
             assert cli.get_element_state(project, 'element1.bst') == 'cached'

             wait_for_cache_granularity()
    +        share.make_all_objects_older()

             # Create and build the element3 (of 5 MB)
             create_element_size('element3.bst', project, element_path, [], int(5e6))

  • tests/testutils/artifactshare.py
    @@ -138,6 +138,15 @@ class ArtifactShare():
             except ArtifactError:
                 return False

    +    def make_all_objects_older(self):
    +        for root, dirs, files in os.walk(os.path.join(self.cas.casdir, 'objects')):
    +            for name in files:
    +                fullname = os.path.join(root, name)
    +                st = os.stat(fullname)
    +                mtime = st.st_mtime - 6 * 60 * 60
    +                atime = st.st_atime - 6 * 60 * 60
    +                os.utime(fullname, times=(atime, mtime))
    +
         # close():
         #
         # Remove the artifact share.

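make_all_objects_older() back-dates every object in the test share by six hours, the same span as the _OBJECT_MIN_AGE constant added to casserver.py, presumably so the expiry tests above see objects that look old enough to be cleanup candidates. A tiny sketch of the same back-dating trick on a single file (the helper name is illustrative):

    import os
    import tempfile
    import time

    def age_file(path, seconds=6 * 60 * 60):
        # Shift both atime and mtime into the past, as
        # ArtifactShare.make_all_objects_older() does per object.
        st = os.stat(path)
        os.utime(path, times=(st.st_atime - seconds, st.st_mtime - seconds))

    # Usage: a freshly created file then reports an mtime six hours back.
    fd, path = tempfile.mkstemp()
    os.close(fd)
    age_file(path)
    assert os.stat(path).st_mtime <= time.time() - 6 * 60 * 60 + 1
    os.unlink(path)
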

