[Notes] [Git][BuildStream/buildstream][valentindavid/cache_server_fill_up-1.2] 13 commits: Ensure `--deps=none` option works for `bst checkout`




Valentin David pushed to branch valentindavid/cache_server_fill_up-1.2 at BuildStream / buildstream

Commits:

18 changed files:

  • buildstream/_artifactcache/cascache.py
  • buildstream/_artifactcache/casserver.py
  • buildstream/element.py
  • buildstream/scriptelement.py
  • tests/frontend/buildcheckout.py
  • tests/frontend/project/elements/checkout-deps.bst
  • tests/frontend/project/files/etc-files/etc/buildstream/config
  • tests/frontend/push.py
  • tests/integration/project/elements/script/corruption-2.bst
  • tests/integration/project/elements/script/corruption-image.bst
  • tests/integration/project/elements/script/corruption-integration.bst
  • tests/integration/project/elements/script/corruption.bst
  • tests/integration/project/elements/script/marked-tmpdir.bst
  • tests/integration/project/elements/script/no-tmpdir.bst
  • tests/integration/project/elements/script/tmpdir.bst
  • tests/integration/project/files/canary
  • tests/integration/script.py
  • tests/testutils/artifactshare.py

Changes:

  • buildstream/_artifactcache/cascache.py

    @@ -26,6 +26,7 @@ import stat
     import tempfile
     import uuid
     import errno
    +import contextlib
     from urllib.parse import urlparse

     import grpc
    @@ -48,6 +49,13 @@ from . import ArtifactCache
     _MAX_PAYLOAD_BYTES = 1024 * 1024


    +class BlobNotFound(ArtifactError):
    +
    +    def __init__(self, blob, msg):
    +        self.blob = blob
    +        super().__init__(msg)
    +
    +
     # A CASCache manages artifacts in a CAS repository as specified in the
     # Remote Execution API.
     #
    @@ -259,6 +267,10 @@ class CASCache(ArtifactCache):
                         element.info("Remote ({}) does not have {} cached".format(
                             remote.spec.url, element._get_brief_display_key()
                         ))
    +            except BlobNotFound as e:
    +                element.info("Remote ({}) does not have {} cached".format(
    +                    remote.spec.url, element._get_brief_display_key()
    +                ))

             return False

    @@ -360,13 +372,14 @@ class CASCache(ArtifactCache):
         #     digest (Digest): An optional Digest object to populate
         #     path (str): Path to file to add
         #     buffer (bytes): Byte buffer to add
    +    #     link_directly (bool): Whether file given by path can be linked
         #
         # Returns:
         #     (Digest): The digest of the added object
         #
         # Either `path` or `buffer` must be passed, but not both.
         #
    -    def add_object(self, *, digest=None, path=None, buffer=None):
    +    def add_object(self, *, digest=None, path=None, buffer=None, link_directly=False):
             # Exactly one of the two parameters has to be specified
             assert (path is None) != (buffer is None)

    @@ -376,28 +389,34 @@ class CASCache(ArtifactCache):
             try:
                 h = hashlib.sha256()
                 # Always write out new file to avoid corruption if input file is modified
    -            with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
    -                # Set mode bits to 0644
    -                os.chmod(out.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
    -
    -                if path:
    -                    with open(path, 'rb') as f:
    -                        for chunk in iter(lambda: f.read(4096), b""):
    -                            h.update(chunk)
    -                            out.write(chunk)
    +            with contextlib.ExitStack() as stack:
    +                if path is not None and link_directly:
    +                    tmp = stack.enter_context(open(path, 'rb'))
    +                    for chunk in iter(lambda: tmp.read(4096), b""):
    +                        h.update(chunk)
                     else:
    -                    h.update(buffer)
    -                    out.write(buffer)
    +                    tmp = stack.enter_context(tempfile.NamedTemporaryFile(dir=self.tmpdir))
    +                    # Set mode bits to 0644
    +                    os.chmod(tmp.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)

    -                out.flush()
    +                    if path:
    +                        with open(path, 'rb') as f:
    +                            for chunk in iter(lambda: f.read(4096), b""):
    +                                h.update(chunk)
    +                                tmp.write(chunk)
    +                    else:
    +                        h.update(buffer)
    +                        tmp.write(buffer)
    +
    +                    tmp.flush()

                     digest.hash = h.hexdigest()
    -                digest.size_bytes = os.fstat(out.fileno()).st_size
    +                digest.size_bytes = os.fstat(tmp.fileno()).st_size

                     # Place file at final location
                     objpath = self.objpath(digest)
                     os.makedirs(os.path.dirname(objpath), exist_ok=True)
    -                os.link(out.name, objpath)
    +                os.link(tmp.name, objpath)

             except FileExistsError as e:
                 # We can ignore the failed link() if the object is already in the repo.
    @@ -481,6 +500,41 @@ class CASCache(ArtifactCache):
             # first element of this list will be the file modified earliest.
             return [ref for _, ref in sorted(zip(mtimes, refs))]

    +    # list_objects():
    +    #
    +    # List cached objects in Least Recently Modified (LRM) order.
    +    #
    +    # Returns:
    +    #     (list) - A list of objects and timestamps in LRM order
    +    #
    +    def list_objects(self):
    +        objs = []
    +        mtimes = []
    +
    +        for root, _, files in os.walk(os.path.join(self.casdir, 'objects')):
    +            for filename in files:
    +                obj_path = os.path.join(root, filename)
    +                try:
    +                    mtimes.append(os.path.getmtime(obj_path))
    +                except FileNotFoundError:
    +                    pass
    +                else:
    +                    objs.append(obj_path)
    +
    +        # NOTE: Sorted will sort from earliest to latest, thus the
    +        # first element of this list will be the file modified earliest.
    +        return sorted(zip(mtimes, objs))
    +
    +    def clean_up_refs_until(self, time):
    +        ref_heads = os.path.join(self.casdir, 'refs', 'heads')
    +
    +        for root, _, files in os.walk(ref_heads):
    +            for filename in files:
    +                ref_path = os.path.join(root, filename)
    +                # Obtain the mtime (the time a file was last modified)
    +                if os.path.getmtime(ref_path) < time:
    +                    os.unlink(ref_path)
    +
         # remove():
         #
         # Removes the given symbolic ref from the repo.
    @@ -558,6 +612,10 @@ class CASCache(ArtifactCache):

             return pruned

    +    def update_tree_mtime(self, tree):
    +        reachable = set()
    +        self._reachable_refs_dir(reachable, tree, update_mtime=True)
    +
         ################################################
         #             Local Private Methods            #
         ################################################
    @@ -699,10 +757,13 @@ class CASCache(ArtifactCache):
                 a += 1
                 b += 1

    -    def _reachable_refs_dir(self, reachable, tree):
    +    def _reachable_refs_dir(self, reachable, tree, update_mtime=False):
             if tree.hash in reachable:
                 return

    +        if update_mtime:
    +            os.utime(self.objpath(tree))
    +
             reachable.add(tree.hash)

             directory = remote_execution_pb2.Directory()
    @@ -711,10 +772,12 @@ class CASCache(ArtifactCache):
                 directory.ParseFromString(f.read())

             for filenode in directory.files:
    +            if update_mtime:
    +                os.utime(self.objpath(filenode.digest))
                 reachable.add(filenode.digest.hash)

             for dirnode in directory.directories:
    -            self._reachable_refs_dir(reachable, dirnode.digest)
    +            self._reachable_refs_dir(reachable, dirnode.digest, update_mtime=update_mtime)

         def _initialize_remote(self, remote_spec, q):
             try:
    @@ -791,7 +854,7 @@ class CASCache(ArtifactCache):
             with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
                 self._fetch_blob(remote, digest, f)

    -            added_digest = self.add_object(path=f.name)
    +            added_digest = self.add_object(path=f.name, link_directly=True)
                 assert added_digest.hash == digest.hash

             return objpath
    @@ -802,7 +865,7 @@ class CASCache(ArtifactCache):
                     f.write(data)
                     f.flush()

    -                added_digest = self.add_object(path=f.name)
    +                added_digest = self.add_object(path=f.name, link_directly=True)
                     assert added_digest.hash == digest.hash

         # Helper function for _fetch_directory().
    @@ -1079,6 +1142,9 @@ class _CASBatchRead():
             batch_response = self._remote.cas.BatchReadBlobs(self._request)

             for response in batch_response.responses:
    +            if response.status.code == grpc.StatusCode.NOT_FOUND.value[0]:
    +                raise BlobNotFound(response.digest.hash, "Failed to download blob {}: {}".format(
    +                    response.digest.hash, response.status.code))
                 if response.status.code != grpc.StatusCode.OK.value[0]:
                     raise ArtifactError("Failed to download blob {}: {}".format(
                         response.digest.hash, response.status.code))

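A note on the `contextlib.ExitStack` pattern introduced in `add_object()` above: which context manager gets entered is only known at runtime (hash the source file in place when it may be hard-linked, otherwise stage a copy in a temporary file), and the stack closes whichever one was entered. A minimal standalone sketch of the same idea follows; the `hash_for_store` name and file handling are illustrative, not BuildStream API:

    import contextlib
    import hashlib
    import tempfile

    def hash_for_store(path=None, buffer=None, link_directly=False):
        # Hash the data; only copy it into a temporary file when it
        # cannot be linked into the object store directly.
        h = hashlib.sha256()
        with contextlib.ExitStack() as stack:
            if path is not None and link_directly:
                # The file itself will be hard-linked, so hashing it
                # in place is enough.
                src = stack.enter_context(open(path, 'rb'))
                for chunk in iter(lambda: src.read(4096), b""):
                    h.update(chunk)
            else:
                # Stage a fresh copy, since the original data may change.
                tmp = stack.enter_context(tempfile.NamedTemporaryFile())
                if buffer is None:
                    buffer = stack.enter_context(open(path, 'rb')).read()
                h.update(buffer)
                tmp.write(buffer)
                tmp.flush()
        return h.hexdigest()
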
  • buildstream/_artifactcache/casserver.py

    @@ -24,6 +24,9 @@ import signal
     import sys
     import tempfile
     import uuid
    +import errno
    +import ctypes
    +import threading

     import click
     import grpc
    @@ -56,7 +59,9 @@ class ArtifactTooLargeException(Exception):
     #     repo (str): Path to CAS repository
     #     enable_push (bool): Whether to allow blob uploads and artifact updates
     #
    -def create_server(repo, *, enable_push):
    +def create_server(repo, *, enable_push,
    +                  max_head_size=int(10e9),
    +                  min_head_size=int(2e9)):
         context = Context()
         context.artifactdir = os.path.abspath(repo)

    @@ -66,11 +71,13 @@ def create_server(repo, *, enable_push):
         max_workers = (os.cpu_count() or 1) * 5
         server = grpc.server(futures.ThreadPoolExecutor(max_workers))

    +    cache_cleaner = _CacheCleaner(artifactcache, max_head_size, min_head_size)
    +
         bytestream_pb2_grpc.add_ByteStreamServicer_to_server(
    -        _ByteStreamServicer(artifactcache, enable_push=enable_push), server)
    +        _ByteStreamServicer(artifactcache, cache_cleaner, enable_push=enable_push), server)

         remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
    -        _ContentAddressableStorageServicer(artifactcache, enable_push=enable_push), server)
    +        _ContentAddressableStorageServicer(artifactcache, cache_cleaner, enable_push=enable_push), server)

         remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
             _CapabilitiesServicer(), server)
    @@ -88,9 +95,19 @@ def create_server(repo, *, enable_push):
     @click.option('--client-certs', help="Public client certificates for TLS (PEM-encoded)")
     @click.option('--enable-push', default=False, is_flag=True,
                   help="Allow clients to upload blobs and update artifact cache")
    +@click.option('--head-room-min', type=click.INT,
    +              help="Disk head room minimum in bytes",
    +              default=2e9)
    +@click.option('--head-room-max', type=click.INT,
    +              help="Disk head room maximum in bytes",
    +              default=10e9)
     @click.argument('repo')
    -def server_main(repo, port, server_key, server_cert, client_certs, enable_push):
    -    server = create_server(repo, enable_push=enable_push)
    +def server_main(repo, port, server_key, server_cert, client_certs, enable_push,
    +                head_room_min, head_room_max):
    +    server = create_server(repo,
    +                           max_head_size=head_room_max,
    +                           min_head_size=head_room_min,
    +                           enable_push=enable_push)

         use_tls = bool(server_key)

    @@ -131,11 +148,43 @@ def server_main(repo, port, server_key, server_cert, client_certs, enable_push):
             server.stop(0)


    +class _FallocateCall:
    +
    +    FALLOC_FL_KEEP_SIZE = 1
    +    FALLOC_FL_PUNCH_HOLE = 2
    +    FALLOC_FL_NO_HIDE_STALE = 4
    +    FALLOC_FL_COLLAPSE_RANGE = 8
    +    FALLOC_FL_ZERO_RANGE = 16
    +    FALLOC_FL_INSERT_RANGE = 32
    +    FALLOC_FL_UNSHARE_RANGE = 64
    +
    +    def __init__(self):
    +        self.libc = ctypes.CDLL("libc.so.6", use_errno=True)
    +        try:
    +            self.fallocate64 = self.libc.fallocate64
    +        except AttributeError:
    +            self.fallocate = self.libc.fallocate
    +
    +    def __call__(self, fd, mode, offset, length):
    +        if hasattr(self, 'fallocate64'):
    +            ret = self.fallocate64(ctypes.c_int(fd), ctypes.c_int(mode),
    +                                   ctypes.c_int64(offset), ctypes.c_int64(length))
    +        else:
    +            ret = self.fallocate(ctypes.c_int(fd), ctypes.c_int(mode),
    +                                 ctypes.c_int(offset), ctypes.c_int(length))
    +        if ret == -1:
    +            err = ctypes.get_errno()
    +            raise OSError(err, os.strerror(err))
    +        return ret
    +
    +
     class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
    -    def __init__(self, cas, *, enable_push):
    +    def __init__(self, cas, cache_cleaner, *, enable_push):
             super().__init__()
             self.cas = cas
             self.enable_push = enable_push
    +        self.fallocate = _FallocateCall()
    +        self.cache_cleaner = cache_cleaner

         def Read(self, request, context):
             resource_name = request.resource_name
    @@ -193,17 +242,34 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
                             context.set_code(grpc.StatusCode.NOT_FOUND)
                             return response

    -                    try:
    -                        _clean_up_cache(self.cas, client_digest.size_bytes)
    -                    except ArtifactTooLargeException as e:
    -                        context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
    -                        context.set_details(str(e))
    -                        return response
    +                    while True:
    +                        if client_digest.size_bytes == 0:
    +                            break
    +                        try:
    +                            self.cache_cleaner.clean_up(client_digest.size_bytes)
    +                        except ArtifactTooLargeException as e:
    +                            context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
    +                            context.set_details(str(e))
    +                            return response
    +
    +                        try:
    +                            self.fallocate(out.fileno(), 0, 0, client_digest.size_bytes)
    +                            break
    +                        except OSError as e:
    +                            # Multiple uploads can happen at the same time
    +                            if e.errno != errno.ENOSPC:
    +                                raise
    +
                     elif request.resource_name:
                         # If it is set on subsequent calls, it **must** match the value of the first request.
                         if request.resource_name != resource_name:
                             context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                             return response
    +
    +                if (offset + len(request.data)) > client_digest.size_bytes:
    +                    context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
    +                    return response
    +
                     out.write(request.data)
                     offset += len(request.data)
                     if request.finish_write:
    @@ -211,7 +277,7 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
                             context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                             return response
                         out.flush()
    -                    digest = self.cas.add_object(path=out.name)
    +                    digest = self.cas.add_object(path=out.name, link_directly=True)
                         if digest.hash != client_digest.hash:
                             context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                             return response
    @@ -224,18 +290,26 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):


     class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
    -    def __init__(self, cas, *, enable_push):
    +    def __init__(self, cas, cache_cleaner, *, enable_push):
             super().__init__()
             self.cas = cas
             self.enable_push = enable_push
    +        self.cache_cleaner = cache_cleaner

         def FindMissingBlobs(self, request, context):
             response = remote_execution_pb2.FindMissingBlobsResponse()
             for digest in request.blob_digests:
    -            if not _has_object(self.cas, digest):
    -                d = response.missing_blob_digests.add()
    -                d.hash = digest.hash
    -                d.size_bytes = digest.size_bytes
    +            objpath = self.cas.objpath(digest)
    +            try:
    +                os.utime(objpath)
    +            except OSError as e:
    +                if e.errno != errno.ENOENT:
    +                    raise
    +                else:
    +                    d = response.missing_blob_digests.add()
    +                    d.hash = digest.hash
    +                    d.size_bytes = digest.size_bytes
    +
             return response

         def BatchReadBlobs(self, request, context):
    @@ -254,12 +328,12 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres
                 try:
                     with open(self.cas.objpath(digest), 'rb') as f:
                         if os.fstat(f.fileno()).st_size != digest.size_bytes:
    -                        blob_response.status.code = grpc.StatusCode.NOT_FOUND
    +                        blob_response.status.code = grpc.StatusCode.NOT_FOUND.value[0]
                             continue

                         blob_response.data = f.read(digest.size_bytes)
                 except FileNotFoundError:
    -                blob_response.status.code = grpc.StatusCode.NOT_FOUND
    +                blob_response.status.code = grpc.StatusCode.NOT_FOUND.value[0]

             return response

    @@ -289,7 +363,7 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres
                     continue

                 try:
    -                _clean_up_cache(self.cas, digest.size_bytes)
    +                self.cache_cleaner.clean_up(digest.size_bytes)

                     with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
                         out.write(blob_request.data)
    @@ -332,6 +406,12 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer):

             try:
                 tree = self.cas.resolve_ref(request.key, update_mtime=True)
    +            try:
    +                self.cas.update_tree_mtime(tree)
    +            except FileNotFoundError:
    +                self.cas.remove(request.key, defer_prune=True)
    +                context.set_code(grpc.StatusCode.NOT_FOUND)
    +                return response

                 response.digest.hash = tree.hash
                 response.digest.size_bytes = tree.size_bytes
    @@ -404,60 +484,79 @@ def _digest_from_upload_resource_name(resource_name):
             return None


    -def _has_object(cas, digest):
    -    objpath = cas.objpath(digest)
    -    return os.path.exists(objpath)
    -
    -
    -# _clean_up_cache()
    -#
    -# Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
    -# is enough space for the incoming artifact
    -#
    -# Args:
    -#   cas: CASCache object
    -#   object_size: The size of the object being received in bytes
    -#
    -# Returns:
    -#   int: The total bytes removed on the filesystem
    -#
    -def _clean_up_cache(cas, object_size):
    -    # Determine the available disk space, in bytes, of the file system
    -    # which mounts the repo
    -    stats = os.statvfs(cas.casdir)
    -    buffer_ = int(2e9)                # Add a 2 GB buffer
    -    free_disk_space = (stats.f_bfree * stats.f_bsize) - buffer_
    -    total_disk_space = (stats.f_blocks * stats.f_bsize) - buffer_
    -
    -    if object_size > total_disk_space:
    -        raise ArtifactTooLargeException("Artifact of size: {} is too large for "
    -                                        "the filesystem which mounts the remote "
    -                                        "cache".format(object_size))
    -
    -    if object_size <= free_disk_space:
    -        # No need to clean up
    -        return 0
    -
    -    # obtain a list of LRP artifacts
    -    LRP_artifacts = cas.list_artifacts()
    -
    -    removed_size = 0  # in bytes
    -    while object_size - removed_size > free_disk_space:
    -        try:
    -            to_remove = LRP_artifacts.pop(0)  # The first element in the list is the LRP artifact
    -        except IndexError:
    -            # This exception is caught if there are no more artifacts in the list
    -            # LRP_artifacts. This means the artifact is too large for the filesystem
    -            # so we abort the process
    -            raise ArtifactTooLargeException("Artifact of size {} is too large for "
    -                                            "the filesystem which mounts the remote "
    -                                            "cache".format(object_size))
    -
    -        removed_size += cas.remove(to_remove, defer_prune=False)
    -
    -    if removed_size > 0:
    -        logging.info("Successfully removed {} bytes from the cache".format(removed_size))
    -    else:
    -        logging.info("No artifacts were removed from the cache.")
    -
    -    return removed_size
    +class _CacheCleaner:
    +
    +    __cleanup_cache_lock = threading.Lock()
    +
    +    def __init__(self, cas, max_head_size, min_head_size=int(2e9)):
    +        self.__cas = cas
    +        self.__max_head_size = max_head_size
    +        self.__min_head_size = min_head_size
    +
    +    def __has_space(self, object_size):
    +        stats = os.statvfs(self.__cas.casdir)
    +        free_disk_space = (stats.f_bavail * stats.f_bsize) - self.__min_head_size
    +        total_disk_space = (stats.f_blocks * stats.f_bsize) - self.__min_head_size
    +
    +        if object_size > total_disk_space:
    +            raise ArtifactTooLargeException("Artifact of size: {} is too large for "
    +                                            "the filesystem which mounts the remote "
    +                                            "cache".format(object_size))
    +
    +        return object_size <= free_disk_space
    +
    +    # clean_up()
    +    #
    +    # Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
    +    # is enough space for the incoming artifact
    +    #
    +    # Args:
    +    #   object_size: The size of the object being received in bytes
    +    #
    +    # Returns:
    +    #   int: The total bytes removed on the filesystem
    +    #
    +    def clean_up(self, object_size):
    +        if self.__has_space(object_size):
    +            return 0
    +
    +        with _CacheCleaner.__cleanup_cache_lock:
    +            if self.__has_space(object_size):
    +                # Another thread has done the cleanup for us
    +                return 0
    +
    +            stats = os.statvfs(self.__cas.casdir)
    +            target_disk_space = (stats.f_bavail * stats.f_bsize) - self.__max_head_size
    +
    +            # Obtain a list of LRP objects
    +            LRP_objects = self.__cas.list_objects()
    +
    +            removed_size = 0  # in bytes
    +            last_mtime = 0
    +
    +            while object_size - removed_size > target_disk_space:
    +                try:
    +                    last_mtime, to_remove = LRP_objects.pop(0)  # The first element in the list is the LRP object
    +                except IndexError:
    +                    # This exception is caught if there are no more objects in the
    +                    # list, which means the artifact is too large for the filesystem,
    +                    # so we abort the process
    +                    raise ArtifactTooLargeException("Artifact of size {} is too large for "
    +                                                    "the filesystem which mounts the remote "
    +                                                    "cache".format(object_size))
    +
    +                try:
    +                    size = os.stat(to_remove).st_size
    +                    os.unlink(to_remove)
    +                    removed_size += size
    +                except FileNotFoundError:
    +                    pass
    +
    +            self.__cas.clean_up_refs_until(last_mtime)
    +
    +            if removed_size > 0:
    +                logging.info("Successfully removed {} bytes from the cache".format(removed_size))
    +            else:
    +                logging.info("No artifacts were removed from the cache.")
    +
    +            return removed_size

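The `_FallocateCall` helper above is what makes the server's disk accounting reliable: the upload handler reserves the full blob size with fallocate(2) before any data arrives, so ENOSPC surfaces immediately and the clean-up/retry loop can respond, instead of the stream failing halfway through. A standalone sketch of the same ctypes call (Linux/glibc only; the `reserve()` helper is ours, not part of the patch):

    import ctypes
    import os
    import tempfile

    # use_errno=True lets us read errno after a failed libc call.
    libc = ctypes.CDLL("libc.so.6", use_errno=True)

    def reserve(fd, length):
        # mode=0 allocates real disk blocks (unlike ftruncate, which
        # produces a sparse file), so ENOSPC is raised here, up front.
        ret = libc.fallocate(ctypes.c_int(fd), ctypes.c_int(0),
                             ctypes.c_int64(0), ctypes.c_int64(length))
        if ret == -1:
            err = ctypes.get_errno()
            raise OSError(err, os.strerror(err))

    with tempfile.NamedTemporaryFile() as out:
        reserve(out.fileno(), 1024 * 1024)  # claim 1 MiB before writing
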
  • buildstream/element.py

    @@ -434,7 +434,7 @@ class Element(Plugin):
                                                     visited=visited, recursed=True)

             # Yield self only at the end, after anything needed has been traversed
    -        if should_yield and (recurse or recursed) and (scope == Scope.ALL or scope == Scope.RUN):
    +        if should_yield and (recurse or recursed) and scope != Scope.BUILD:
                 yield self

         def search(self, scope, name):
    @@ -1289,17 +1289,21 @@ class Element(Plugin):
                     if scope == Scope.BUILD:
                         self.stage(sandbox)
                     elif scope == Scope.RUN:
    -                    # Stage deps in the sandbox root
                         if deps == 'run':
    -                        with self.timed_activity("Staging dependencies", silent_nested=True):
    -                            self.stage_dependency_artifacts(sandbox, scope)
    -
    -                        # Run any integration commands provided by the dependencies
    -                        # once they are all staged and ready
    -                        if integrate:
    -                            with self.timed_activity("Integrating sandbox"):
    -                                for dep in self.dependencies(scope):
    -                                    dep.integrate(sandbox)
    +                        dependency_scope = Scope.RUN
    +                    else:
    +                        dependency_scope = None
    +
    +                    # Stage deps in the sandbox root
    +                    with self.timed_activity("Staging dependencies", silent_nested=True):
    +                        self.stage_dependency_artifacts(sandbox, dependency_scope)
    +
    +                    # Run any integration commands provided by the dependencies
    +                    # once they are all staged and ready
    +                    if integrate:
    +                        with self.timed_activity("Integrating sandbox"):
    +                            for dep in self.dependencies(dependency_scope):
    +                                dep.integrate(sandbox)

                 yield sandbox


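The two hunks above cooperate to make `bst checkout --deps none` work: when `deps` is not 'run', `dependency_scope` becomes `None`, which matches none of the traversal branches inside `dependencies()`, while the relaxed `scope != Scope.BUILD` condition still lets the element yield itself. A stubbed-down sketch of that behaviour (the real `Scope` enum and traversal live in buildstream/element.py; this stand-in only mimics the condition):

    from enum import Enum

    class Scope(Enum):
        ALL = 1
        BUILD = 2
        RUN = 3

    # Stub of the traversal condition patched above: scope=None matches
    # no dependency branch, so nothing is traversed, yet the element
    # still yields itself because None != Scope.BUILD.
    def dependencies(element, scope):
        if scope in (Scope.ALL, Scope.BUILD):
            yield from ()  # would recurse into build dependencies here
        if scope in (Scope.ALL, Scope.RUN):
            yield from ()  # would recurse into runtime dependencies here
        if scope != Scope.BUILD:
            yield element

    assert list(dependencies('target', None)) == ['target']       # --deps none
    assert list(dependencies('target', Scope.RUN)) == ['target']  # --deps run
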
  • buildstream/scriptelement.py

    @@ -201,16 +201,20 @@ class ScriptElement(Element):
             # Setup environment
             sandbox.set_environment(self.get_environment())

    +        # Tell the sandbox to mount the install root
    +        directories = {self.__install_root: False}
    +
             # Mark the artifact directories in the layout
             for item in self.__layout:
    -            if item['destination'] != '/':
    -                if item['element']:
    -                    sandbox.mark_directory(item['destination'], artifact=True)
    -                else:
    -                    sandbox.mark_directory(item['destination'])
    -
    -        # Tell the sandbox to mount the install root
    -        sandbox.mark_directory(self.__install_root)
    +            destination = item['destination']
    +            was_artifact = directories.get(destination, False)
    +            directories[destination] = item['element'] or was_artifact
    +
    +        for directory, artifact in directories.items():
    +            # Root does not need to be marked as it is always mounted
    +            # with artifact (unless explicitly marked non-artifact)
    +            if directory != '/':
    +                sandbox.mark_directory(directory, artifact=artifact)

         def stage(self, sandbox):


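The rewritten marking loop above changes precedence deliberately: if any layout item mounting a destination comes from an element, that directory stays marked as an artifact, and a later element-less mount can no longer downgrade it. A small illustration of the merge (the layout entries are made up):

    install_root = '/buildstream/install'
    layout = [
        {'destination': '/buildstream/install', 'element': 'dep.bst'},
        {'destination': '/buildstream/install', 'element': None},
    ]

    # The install root starts out explicitly non-artifact; an element
    # mount upgrades it, and the plain mount cannot undo that.
    directories = {install_root: False}
    for item in layout:
        destination = item['destination']
        was_artifact = directories.get(destination, False)
        directories[destination] = bool(item['element']) or was_artifact

    assert directories == {'/buildstream/install': True}
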
  • tests/frontend/buildcheckout.py

    @@ -65,9 +65,10 @@ def test_build_checkout(datafiles, cli, strict, hardlinks):
     def test_build_checkout_deps(datafiles, cli, deps):
         project = os.path.join(datafiles.dirname, datafiles.basename)
         checkout = os.path.join(cli.directory, 'checkout')
    +    element_name = "checkout-deps.bst"

         # First build it
    -    result = cli.run(project=project, args=['build', 'target.bst'])
    +    result = cli.run(project=project, args=['build', element_name])
         result.assert_success()

         # Assert that after a successful build, the builddir is empty
    @@ -76,20 +77,15 @@ def test_build_checkout_deps(datafiles, cli, deps):
         assert not os.listdir(builddir)

         # Now check it out
    -    result = cli.run(project=project, args=['checkout', 'target.bst', '--deps', deps, checkout])
    +    result = cli.run(project=project, args=['checkout', element_name, '--deps', deps, checkout])
         result.assert_success()

    -    # Check that the executable hello file is found in the checkout
    -    filename = os.path.join(checkout, 'usr', 'bin', 'hello')
    -
    -    if deps == "run":
    -        assert os.path.exists(filename)
    -    else:
    -        assert not os.path.exists(filename)
    -
    -    # Check that the executable hello file is found in the checkout
    -    filename = os.path.join(checkout, 'usr', 'include', 'pony.h')
    +    # Verify output of this element
    +    filename = os.path.join(checkout, 'etc', 'buildstream', 'config')
    +    assert os.path.exists(filename)

    +    # Verify output of this element's runtime dependencies
    +    filename = os.path.join(checkout, 'usr', 'bin', 'hello')
         if deps == "run":
             assert os.path.exists(filename)
         else:

  • tests/frontend/project/elements/checkout-deps.bst

    +kind: import
    +description: It is important for this element to have both build and runtime dependencies
    +sources:
    +- kind: local
    +  path: files/etc-files
    +depends:
    +- filename: import-dev.bst
    +  type: build
    +- filename: import-bin.bst
    +  type: runtime

  • tests/frontend/project/files/etc-files/etc/buildstream/config

    +config

  • tests/frontend/push.py

    @@ -208,6 +208,8 @@ def test_artifact_expires(cli, datafiles, tmpdir):
         # Create an artifact share (remote artifact cache) in the tmpdir/artifactshare
         # Mock a file system with 12 MB free disk space
         with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare'),
    +                               min_head_size=int(2e9),
    +                               max_head_size=int(2e9),
                                    total_space=int(10e9), free_space=(int(12e6) + int(2e9))) as share:

             # Configure bst to push to the cache
    @@ -291,6 +293,8 @@ def test_recently_pulled_artifact_does_not_expire(cli, datafiles, tmpdir):
         # Create an artifact share (remote cache) in tmpdir/artifactshare
         # Mock a file system with 12 MB free disk space
         with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare'),
    +                               min_head_size=int(2e9),
    +                               max_head_size=int(2e9),
                                    total_space=int(10e9), free_space=(int(12e6) + int(2e9))) as share:

             # Configure bst to push to the cache

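Passing `min_head_size == max_head_size` pins the server's cleaner to a single threshold, which keeps these expiry tests deterministic. With the mocked filesystem, the head-room arithmetic works out as follows (a worked restatement of `_CacheCleaner.__has_space()`, not part of the test):

    free_space = int(12e6) + int(2e9)   # mocked f_bavail * f_bsize
    total_space = int(10e9)             # mocked f_blocks * f_bsize
    min_head_size = int(2e9)

    # Once the 2 GB head room is held back, only 12 MB remain usable:
    # small artifacts fit, and the next push forces expiry of the
    # least recently used objects.
    usable = free_space - min_head_size
    assert usable == int(12e6)
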
  • tests/integration/project/elements/script/corruption-2.bst

    +kind: script
    +
    +depends:
    +- filename: base.bst
    +  type: build
    +- filename: script/corruption-image.bst
    +  type: build
    +
    +config:
    +  commands:
    +  - echo smashed >>/canary

  • tests/integration/project/elements/script/corruption-image.bst

    +kind: import
    +sources:
    +- kind: local
    +  path: files/canary

  • tests/integration/project/elements/script/corruption-integration.bst

    +kind: stack
    +
    +public:
    +  bst:
    +    integration-commands:
    +      - echo smashed >>/canary
    +

  • tests/integration/project/elements/script/corruption.bst

    +kind: script
    +
    +depends:
    +- filename: base.bst
    +  type: build
    +- filename: script/corruption-image.bst
    +  type: build
    +- filename: script/corruption-integration.bst
    +  type: build
    +
    +variables:
    +  install-root: "/"
    +
    +config:
    +  layout:
    +  - element: base.bst
    +    destination: "/"
    +  - element: script/corruption-image.bst
    +    destination: "/"
    +  - element: script/corruption-integration.bst
    +    destination: "/"

  • tests/integration/project/elements/script/marked-tmpdir.bst

    +kind: compose
    +
    +depends:
    +- filename: base.bst
    +  type: build
    +
    +public:
    +  bst:
    +    split-rules:
    +      remove:
    +        - "/tmp/**"
    +        - "/tmp"

  • tests/integration/project/elements/script/no-tmpdir.bst

    +kind: filter
    +
    +depends:
    +- filename: script/marked-tmpdir.bst
    +  type: build
    +
    +config:
    +  exclude:
    +  - remove
    +  include-orphans: True

  • tests/integration/project/elements/script/tmpdir.bst

    +kind: script
    +
    +depends:
    +- filename: script/no-tmpdir.bst
    +  type: build
    +
    +config:
    +  commands:
    +  - |
    +    mkdir -p /tmp/blah

  • tests/integration/project/files/canary

    +alive

  • tests/integration/script.py

    @@ -155,3 +155,70 @@ def test_script_layout(cli, tmpdir, datafiles):
             text = f.read()

         assert text == "Hi\n"
    +
    +
    +@pytest.mark.datafiles(DATA_DIR)
    +def test_regression_cache_corruption(cli, tmpdir, datafiles):
    +    project = str(datafiles)
    +    checkout_original = os.path.join(cli.directory, 'checkout-original')
    +    checkout_after = os.path.join(cli.directory, 'checkout-after')
    +    element_name = 'script/corruption.bst'
    +    canary_element_name = 'script/corruption-image.bst'
    +
    +    res = cli.run(project=project, args=['build', canary_element_name])
    +    assert res.exit_code == 0
    +
    +    res = cli.run(project=project, args=['checkout', canary_element_name,
    +                                         checkout_original])
    +    assert res.exit_code == 0
    +
    +    with open(os.path.join(checkout_original, 'canary')) as f:
    +        assert f.read() == 'alive\n'
    +
    +    res = cli.run(project=project, args=['build', element_name])
    +    assert res.exit_code == 0
    +
    +    res = cli.run(project=project, args=['checkout', canary_element_name,
    +                                         checkout_after])
    +    assert res.exit_code == 0
    +
    +    with open(os.path.join(checkout_after, 'canary')) as f:
    +        assert f.read() == 'alive\n'
    +
    +
    +@pytest.mark.datafiles(DATA_DIR)
    +def test_regression_tmpdir(cli, tmpdir, datafiles):
    +    project = str(datafiles)
    +    element_name = 'script/tmpdir.bst'
    +
    +    res = cli.run(project=project, args=['build', element_name])
    +    assert res.exit_code == 0
    +
    +
    +@pytest.mark.datafiles(DATA_DIR)
    +def test_regression_cache_corruption_2(cli, tmpdir, datafiles):
    +    project = str(datafiles)
    +    checkout_original = os.path.join(cli.directory, 'checkout-original')
    +    checkout_after = os.path.join(cli.directory, 'checkout-after')
    +    element_name = 'script/corruption-2.bst'
    +    canary_element_name = 'script/corruption-image.bst'
    +
    +    res = cli.run(project=project, args=['build', canary_element_name])
    +    assert res.exit_code == 0
    +
    +    res = cli.run(project=project, args=['checkout', canary_element_name,
    +                                         checkout_original])
    +    assert res.exit_code == 0
    +
    +    with open(os.path.join(checkout_original, 'canary')) as f:
    +        assert f.read() == 'alive\n'
    +
    +    res = cli.run(project=project, args=['build', element_name])
    +    assert res.exit_code == 0
    +
    +    res = cli.run(project=project, args=['checkout', canary_element_name,
    +                                         checkout_after])
    +    assert res.exit_code == 0
    +
    +    with open(os.path.join(checkout_after, 'canary')) as f:
    +        assert f.read() == 'alive\n'

  • tests/testutils/artifactshare.py

    @@ -29,7 +29,11 @@ from buildstream._exceptions import ArtifactError
     #
     class ArtifactShare():

    -    def __init__(self, directory, *, total_space=None, free_space=None):
    +    def __init__(self, directory, *,
    +                 total_space=None,
    +                 free_space=None,
    +                 min_head_size=int(2e9),
    +                 max_head_size=int(10e9)):

             # The working directory for the artifact share (in case it
             # needs to do something outside of its backend's storage folder).
    @@ -53,6 +57,9 @@ class ArtifactShare():
             self.total_space = total_space
             self.free_space = free_space

    +        self.max_head_size = max_head_size
    +        self.min_head_size = min_head_size
    +
             q = Queue()

             self.process = Process(target=self.run, args=(q,))
    @@ -76,7 +83,10 @@ class ArtifactShare():
                     self.free_space = self.total_space
                 os.statvfs = self._mock_statvfs

    -        server = create_server(self.repodir, enable_push=True)
    +        server = create_server(self.repodir,
    +                               max_head_size=self.max_head_size,
    +                               min_head_size=self.min_head_size,
    +                               enable_push=True)
             port = server.add_insecure_port('localhost:0')

             server.start()
    @@ -118,6 +128,15 @@ class ArtifactShare():

             try:
                 tree = self.cas.resolve_ref(artifact_key)
    +            reachable = set()
    +            try:
    +                self.cas._reachable_refs_dir(reachable, tree, update_mtime=False)
    +            except FileNotFoundError:
    +                return False
    +            for digest in reachable:
    +                object_name = os.path.join(self.cas.casdir, 'objects', digest[:2], digest[2:])
    +                if not os.path.exists(object_name):
    +                    return False
                 return True
             except ArtifactError:
                 return False
    @@ -149,8 +168,11 @@ class ArtifactShare():
     # Create an ArtifactShare for use in a test case
     #
     @contextmanager
    -def create_artifact_share(directory, *, total_space=None, free_space=None):
    -    share = ArtifactShare(directory, total_space=total_space, free_space=free_space)
    +def create_artifact_share(directory, *, total_space=None, free_space=None,
    +                          min_head_size=int(2e9),
    +                          max_head_size=int(10e9)):
    +    share = ArtifactShare(directory, total_space=total_space, free_space=free_space,
    +                          min_head_size=min_head_size, max_head_size=max_head_size)
         try:
             yield share
         finally:

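The stricter artifact check in the test utility matters because the server can now expire individual objects while their ref briefly survives, so resolving a ref alone no longer proves an artifact is complete; the utility therefore walks every reachable digest and verifies the object file still exists. A self-contained sketch of that completeness walk (`casdir` and the digest are stand-ins, not BuildStream API):

    import os

    def artifact_complete(casdir, reachable_digests):
        # A ref only counts as present if every object reachable from
        # its tree is still on disk under objects/xx/rest-of-digest.
        for digest in reachable_digests:
            object_name = os.path.join(casdir, 'objects', digest[:2], digest[2:])
            if not os.path.exists(object_name):
                return False
        return True

    print(artifact_complete('/tmp/share/cas', {'deadbeef' + '0' * 56}))  # False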