[Notes] [Git][BuildStream/buildstream][tpollard/774] 16 commits: Use f_bavail to query available space. Not f_bfree.



Tom Pollard pushed to branch tpollard/774 at BuildStream / buildstream

Commits:

18 changed files:

Changes:

  • .gitlab-ci.yml

    -image: buildstream/testsuite-debian:9-master-119-552f5fc6
    +image: buildstream/testsuite-debian:9-master-123-7ce6581b
     
     cache:
       key: "$CI_JOB_NAME-"
    @@ -140,7 +140,7 @@ tests-unix:
     
     tests-fedora-missing-deps:
       # Ensure that tests behave nicely while missing bwrap and ostree
    -  image: buildstream/testsuite-fedora:28-master-119-552f5fc6
    +  image: buildstream/testsuite-fedora:28-master-123-7ce6581b
       <<: *tests
     
       script:
    

  • buildstream/_artifactcache/cascache.py

    @@ -25,6 +25,7 @@ import os
     import stat
     import tempfile
     import uuid
    +import contextlib
     from urllib.parse import urlparse
     
     import grpc
    @@ -88,6 +89,13 @@ class CASRemoteSpec(namedtuple('CASRemoteSpec', 'url push server_cert client_key
     CASRemoteSpec.__new__.__defaults__ = (None, None, None)
     
     
    +class BlobNotFound(CASError):
    +
    +    def __init__(self, blob, msg):
    +        self.blob = blob
    +        super().__init__(msg)
    +
    +
     # A CASCache manages a CAS repository as specified in the Remote Execution API.
     #
     # Args:
    @@ -299,6 +307,8 @@ class CASCache():
                     raise CASError("Failed to pull ref {}: {}".format(ref, e)) from e
                 else:
                     return False
    +        except BlobNotFound as e:
    +            return False
     
         # pull_tree():
         #
    @@ -471,13 +481,14 @@ class CASCache():
         #     digest (Digest): An optional Digest object to populate
         #     path (str): Path to file to add
         #     buffer (bytes): Byte buffer to add
    +    #     link_directly (bool): Whether file given by path can be linked
         #
         # Returns:
         #     (Digest): The digest of the added object
         #
         # Either `path` or `buffer` must be passed, but not both.
         #
    -    def add_object(self, *, digest=None, path=None, buffer=None):
    +    def add_object(self, *, digest=None, path=None, buffer=None, link_directly=False):
             # Exactly one of the two parameters has to be specified
             assert (path is None) != (buffer is None)
     
    @@ -487,28 +498,34 @@ class CASCache():
             try:
                 h = hashlib.sha256()
                 # Always write out new file to avoid corruption if input file is modified
    -            with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
    -                # Set mode bits to 0644
    -                os.chmod(out.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
    -
    -                if path:
    -                    with open(path, 'rb') as f:
    -                        for chunk in iter(lambda: f.read(4096), b""):
    -                            h.update(chunk)
    -                            out.write(chunk)
    +            with contextlib.ExitStack() as stack:
    +                if path is not None and link_directly:
    +                    tmp = stack.enter_context(open(path, 'rb'))
    +                    for chunk in iter(lambda: tmp.read(4096), b""):
    +                        h.update(chunk)
                     else:
    -                    h.update(buffer)
    -                    out.write(buffer)
    +                    tmp = stack.enter_context(tempfile.NamedTemporaryFile(dir=self.tmpdir))
    +                    # Set mode bits to 0644
    +                    os.chmod(tmp.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
     
    -                out.flush()
    +                    if path:
    +                        with open(path, 'rb') as f:
    +                            for chunk in iter(lambda: f.read(4096), b""):
    +                                h.update(chunk)
    +                                tmp.write(chunk)
    +                    else:
    +                        h.update(buffer)
    +                        tmp.write(buffer)
    +
    +                    tmp.flush()
     
                     digest.hash = h.hexdigest()
    -                digest.size_bytes = os.fstat(out.fileno()).st_size
    +                digest.size_bytes = os.fstat(tmp.fileno()).st_size
     
                     # Place file at final location
                     objpath = self.objpath(digest)
                     os.makedirs(os.path.dirname(objpath), exist_ok=True)
    -                os.link(out.name, objpath)
    +                os.link(tmp.name, objpath)
     
             except FileExistsError as e:
                 # We can ignore the failed link() if the object is already in the repo.
    @@ -606,6 +623,41 @@ class CASCache():
             # first ref of this list will be the file modified earliest.
             return [ref for _, ref in sorted(zip(mtimes, refs))]
     
    +    # list_objects():
    +    #
    +    # List cached objects in Least Recently Modified (LRM) order.
    +    #
    +    # Returns:
    +    #     (list) - A list of objects and timestamps in LRM order
    +    #
    +    def list_objects(self):
    +        objs = []
    +        mtimes = []
    +
    +        for root, _, files in os.walk(os.path.join(self.casdir, 'objects')):
    +            for filename in files:
    +                obj_path = os.path.join(root, filename)
    +                try:
    +                    mtimes.append(os.path.getmtime(obj_path))
    +                except FileNotFoundError:
    +                    pass
    +                else:
    +                    objs.append(obj_path)
    +
    +        # NOTE: Sorted will sort from earliest to latest, thus the
    +        # first element of this list will be the file modified earliest.
    +        return sorted(zip(mtimes, objs))
    +
    +    def clean_up_refs_until(self, time):
    +        ref_heads = os.path.join(self.casdir, 'refs', 'heads')
    +
    +        for root, _, files in os.walk(ref_heads):
    +            for filename in files:
    +                ref_path = os.path.join(root, filename)
    +                # Obtain the mtime (the time a file was last modified)
    +                if os.path.getmtime(ref_path) < time:
    +                    os.unlink(ref_path)
    +
         # remove():
         #
         # Removes the given symbolic ref from the repo.
    @@ -665,6 +717,10 @@ class CASCache():
     
             return pruned
     
    +    def update_tree_mtime(self, tree):
    +        reachable = set()
    +        self._reachable_refs_dir(reachable, tree, update_mtime=True)
    +
         ################################################
         #             Local Private Methods            #
         ################################################
    @@ -811,10 +867,13 @@ class CASCache():
                     a += 1
                     b += 1
     
    -    def _reachable_refs_dir(self, reachable, tree):
    +    def _reachable_refs_dir(self, reachable, tree, update_mtime=False):
             if tree.hash in reachable:
                 return
     
    +        if update_mtime:
    +            os.utime(self.objpath(tree))
    +
             reachable.add(tree.hash)
     
             directory = remote_execution_pb2.Directory()
    @@ -823,10 +882,12 @@ class CASCache():
                 directory.ParseFromString(f.read())
     
             for filenode in directory.files:
    +            if update_mtime:
    +                os.utime(self.objpath(filenode.digest))
                 reachable.add(filenode.digest.hash)
     
             for dirnode in directory.directories:
    -            self._reachable_refs_dir(reachable, dirnode.digest)
    +            self._reachable_refs_dir(reachable, dirnode.digest, update_mtime=update_mtime)
     
         def _required_blobs(self, directory_digest):
             # parse directory, and recursively add blobs
    @@ -880,7 +941,7 @@ class CASCache():
             with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
                 self._fetch_blob(remote, digest, f)
     
    -            added_digest = self.add_object(path=f.name)
    +            added_digest = self.add_object(path=f.name, link_directly=True)
                 assert added_digest.hash == digest.hash
     
             return objpath
    @@ -891,7 +952,7 @@ class CASCache():
                     f.write(data)
                     f.flush()
     
    -                added_digest = self.add_object(path=f.name)
    +                added_digest = self.add_object(path=f.name, link_directly=True)
                     assert added_digest.hash == digest.hash
     
         # Helper function for _fetch_directory().
    @@ -1203,6 +1264,9 @@ class _CASBatchRead():
             batch_response = self._remote.cas.BatchReadBlobs(self._request)
     
             for response in batch_response.responses:
    +            if response.status.code == code_pb2.NOT_FOUND:
    +                raise BlobNotFound(response.digest.hash, "Failed to download blob {}: {}".format(
    +                    response.digest.hash, response.status.code))
                 if response.status.code != code_pb2.OK:
                     raise CASError("Failed to download blob {}: {}".format(
                         response.digest.hash, response.status.code))
    
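    Note: list_objects() returns (mtime, path) tuples for every object in the CAS, oldest
    first, and clean_up_refs_until() drops any ref older than a given timestamp. A condensed
    sketch of how the two are meant to compose, mirroring the _CacheCleaner.clean_up() logic
    added to casserver.py below (the function and variable names here are illustrative only,
    not part of the diff):

        import os

        def expire_lrm_objects(cas, bytes_needed, target_free):
            # Oldest-first list of (mtime, object path) pairs
            lrm_objects = cas.list_objects()
            removed = 0
            last_mtime = 0
            while bytes_needed - removed > target_free and lrm_objects:
                last_mtime, path = lrm_objects.pop(0)
                try:
                    size = os.stat(path).st_size
                    os.unlink(path)
                    removed += size
                except FileNotFoundError:
                    pass
            # Refs not touched since the last removed object may now be incomplete,
            # so expire those refs as well
            cas.clean_up_refs_until(last_mtime)
            return removed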

  • buildstream/_artifactcache/casserver.py

    @@ -24,6 +24,8 @@ import signal
     import sys
     import tempfile
     import uuid
    +import errno
    +import threading
     
     import click
     import grpc
    @@ -31,6 +33,7 @@ import grpc
     from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
     from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
     from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc
    +from .._protos.google.rpc import code_pb2
     
     from .._exceptions import CASError
     
    @@ -55,18 +58,22 @@ class ArtifactTooLargeException(Exception):
     #     repo (str): Path to CAS repository
     #     enable_push (bool): Whether to allow blob uploads and artifact updates
     #
    -def create_server(repo, *, enable_push):
    +def create_server(repo, *, enable_push,
    +                  max_head_size=int(10e9),
    +                  min_head_size=int(2e9)):
         cas = CASCache(os.path.abspath(repo))
     
         # Use max_workers default from Python 3.5+
         max_workers = (os.cpu_count() or 1) * 5
         server = grpc.server(futures.ThreadPoolExecutor(max_workers))
     
    +    cache_cleaner = _CacheCleaner(cas, max_head_size, min_head_size)
    +
         bytestream_pb2_grpc.add_ByteStreamServicer_to_server(
    -        _ByteStreamServicer(cas, enable_push=enable_push), server)
    +        _ByteStreamServicer(cas, cache_cleaner, enable_push=enable_push), server)
     
         remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
    -        _ContentAddressableStorageServicer(cas, enable_push=enable_push), server)
    +        _ContentAddressableStorageServicer(cas, cache_cleaner, enable_push=enable_push), server)
     
         remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
             _CapabilitiesServicer(), server)
    @@ -84,9 +91,19 @@ def create_server(repo, *, enable_push):
     @click.option('--client-certs', help="Public client certificates for TLS (PEM-encoded)")
     @click.option('--enable-push', default=False, is_flag=True,
                   help="Allow clients to upload blobs and update artifact cache")
    +@click.option('--head-room-min', type=click.INT,
    +              help="Disk head room minimum in bytes",
    +              default=2e9)
    +@click.option('--head-room-max', type=click.INT,
    +              help="Disk head room maximum in bytes",
    +              default=10e9)
     @click.argument('repo')
    -def server_main(repo, port, server_key, server_cert, client_certs, enable_push):
    -    server = create_server(repo, enable_push=enable_push)
    +def server_main(repo, port, server_key, server_cert, client_certs, enable_push,
    +                head_room_min, head_room_max):
    +    server = create_server(repo,
    +                           max_head_size=head_room_max,
    +                           min_head_size=head_room_min,
    +                           enable_push=enable_push)
     
         use_tls = bool(server_key)
     
    @@ -128,10 +145,11 @@ def server_main(repo, port, server_key, server_cert, client_certs, enable_push):
     
     
     class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
    -    def __init__(self, cas, *, enable_push):
    +    def __init__(self, cas, cache_cleaner, *, enable_push):
             super().__init__()
             self.cas = cas
             self.enable_push = enable_push
    +        self.cache_cleaner = cache_cleaner
     
         def Read(self, request, context):
             resource_name = request.resource_name
    @@ -189,17 +207,34 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
                             context.set_code(grpc.StatusCode.NOT_FOUND)
                             return response
     
    -                    try:
    -                        _clean_up_cache(self.cas, client_digest.size_bytes)
    -                    except ArtifactTooLargeException as e:
    -                        context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
    -                        context.set_details(str(e))
    -                        return response
    +                    while True:
    +                        if client_digest.size_bytes == 0:
    +                            break
    +                        try:
    +                            self.cache_cleaner.clean_up(client_digest.size_bytes)
    +                        except ArtifactTooLargeException as e:
    +                            context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
    +                            context.set_details(str(e))
    +                            return response
    +
    +                        try:
    +                            os.posix_fallocate(out.fileno(), 0, client_digest.size_bytes)
    +                            break
    +                        except OSError as e:
    +                            # Multiple upload can happen in the same time
    +                            if e.errno != errno.ENOSPC:
    +                                raise
    +
                     elif request.resource_name:
                         # If it is set on subsequent calls, it **must** match the value of the first request.
                         if request.resource_name != resource_name:
                             context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                             return response
    +
    +                if (offset + len(request.data)) > client_digest.size_bytes:
    +                    context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
    +                    return response
    +
                     out.write(request.data)
                     offset += len(request.data)
                     if request.finish_write:
    @@ -207,7 +242,7 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
                             context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                             return response
                         out.flush()
    -                    digest = self.cas.add_object(path=out.name)
    +                    digest = self.cas.add_object(path=out.name, link_directly=True)
                         if digest.hash != client_digest.hash:
                             context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                             return response
    @@ -220,18 +255,26 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
     
     
     class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
    -    def __init__(self, cas, *, enable_push):
    +    def __init__(self, cas, cache_cleaner, *, enable_push):
             super().__init__()
             self.cas = cas
             self.enable_push = enable_push
    +        self.cache_cleaner = cache_cleaner
     
         def FindMissingBlobs(self, request, context):
             response = remote_execution_pb2.FindMissingBlobsResponse()
             for digest in request.blob_digests:
    -            if not _has_object(self.cas, digest):
    -                d = response.missing_blob_digests.add()
    -                d.hash = digest.hash
    -                d.size_bytes = digest.size_bytes
    +            objpath = self.cas.objpath(digest)
    +            try:
    +                os.utime(objpath)
    +            except OSError as e:
    +                if e.errno != errno.ENOENT:
    +                    raise
    +                else:
    +                    d = response.missing_blob_digests.add()
    +                    d.hash = digest.hash
    +                    d.size_bytes = digest.size_bytes
    +
             return response
     
         def BatchReadBlobs(self, request, context):
    @@ -250,12 +293,12 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres
                 try:
                     with open(self.cas.objpath(digest), 'rb') as f:
                         if os.fstat(f.fileno()).st_size != digest.size_bytes:
    -                        blob_response.status.code = grpc.StatusCode.NOT_FOUND
    +                        blob_response.status.code = code_pb2.NOT_FOUND
                             continue
     
                         blob_response.data = f.read(digest.size_bytes)
                 except FileNotFoundError:
    -                blob_response.status.code = grpc.StatusCode.NOT_FOUND
    +                blob_response.status.code = code_pb2.NOT_FOUND
     
             return response
     
    @@ -285,7 +328,7 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres
                     continue
     
                 try:
    -                _clean_up_cache(self.cas, digest.size_bytes)
    +                self.cache_cleaner.clean_up(digest.size_bytes)
     
                     with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
                         out.write(blob_request.data)
    @@ -328,6 +371,12 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer):
     
             try:
                 tree = self.cas.resolve_ref(request.key, update_mtime=True)
    +            try:
    +                self.cas.update_tree_mtime(tree)
    +            except FileNotFoundError:
    +                self.cas.remove(request.key, defer_prune=True)
    +                context.set_code(grpc.StatusCode.NOT_FOUND)
    +                return response
     
                 response.digest.hash = tree.hash
                 response.digest.size_bytes = tree.size_bytes
    @@ -400,60 +449,79 @@ def _digest_from_upload_resource_name(resource_name):
             return None
     
     
    -def _has_object(cas, digest):
    -    objpath = cas.objpath(digest)
    -    return os.path.exists(objpath)
    +class _CacheCleaner:
     
    +    __cleanup_cache_lock = threading.Lock()
     
    -# _clean_up_cache()
    -#
    -# Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
    -# is enough space for the incoming artifact
    -#
    -# Args:
    -#   cas: CASCache object
    -#   object_size: The size of the object being received in bytes
    -#
    -# Returns:
    -#   int: The total bytes removed on the filesystem
    -#
    -def _clean_up_cache(cas, object_size):
    -    # Determine the available disk space, in bytes, of the file system
    -    # which mounts the repo
    -    stats = os.statvfs(cas.casdir)
    -    buffer_ = int(2e9)                # Add a 2 GB buffer
    -    free_disk_space = (stats.f_bfree * stats.f_bsize) - buffer_
    -    total_disk_space = (stats.f_blocks * stats.f_bsize) - buffer_
    -
    -    if object_size > total_disk_space:
    -        raise ArtifactTooLargeException("Artifact of size: {} is too large for "
    -                                        "the filesystem which mounts the remote "
    -                                        "cache".format(object_size))
    -
    -    if object_size <= free_disk_space:
    -        # No need to clean up
    -        return 0
    -
    -    # obtain a list of LRP artifacts
    -    LRP_artifacts = cas.list_refs()
    -
    -    removed_size = 0  # in bytes
    -    while object_size - removed_size > free_disk_space:
    -        try:
    -            to_remove = LRP_artifacts.pop(0)  # The first element in the list is the LRP artifact
    -        except IndexError:
    -            # This exception is caught if there are no more artifacts in the list
    -            # LRP_artifacts. This means the the artifact is too large for the filesystem
    -            # so we abort the process
    -            raise ArtifactTooLargeException("Artifact of size {} is too large for "
    -                                            "the filesystem which mounts the remote "
    -                                            "cache".format(object_size))
    +    def __init__(self, cas, max_head_size, min_head_size=int(2e9)):
    +        self.__cas = cas
    +        self.__max_head_size = max_head_size
    +        self.__min_head_size = min_head_size
     
    -        removed_size += cas.remove(to_remove, defer_prune=False)
    +    def __has_space(self, object_size):
    +        stats = os.statvfs(self.__cas.casdir)
    +        free_disk_space = (stats.f_bavail * stats.f_bsize) - self.__min_head_size
    +        total_disk_space = (stats.f_blocks * stats.f_bsize) - self.__min_head_size
     
    -    if removed_size > 0:
    -        logging.info("Successfully removed {} bytes from the cache".format(removed_size))
    -    else:
    -        logging.info("No artifacts were removed from the cache.")
    +        if object_size > total_disk_space:
    +            raise ArtifactTooLargeException("Artifact of size: {} is too large for "
    +                                            "the filesystem which mounts the remote "
    +                                            "cache".format(object_size))
     
    -    return removed_size
    +        return object_size <= free_disk_space
    +
    +    # _clean_up_cache()
    +    #
    +    # Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
    +    # is enough space for the incoming artifact
    +    #
    +    # Args:
    +    #   object_size: The size of the object being received in bytes
    +    #
    +    # Returns:
    +    #   int: The total bytes removed on the filesystem
    +    #
    +    def clean_up(self, object_size):
    +        if self.__has_space(object_size):
    +            return 0
    +
    +        with _CacheCleaner.__cleanup_cache_lock:
    +            if self.__has_space(object_size):
    +                # Another thread has done the cleanup for us
    +                return 0
    +
    +            stats = os.statvfs(self.__cas.casdir)
    +            target_disk_space = (stats.f_bavail * stats.f_bsize) - self.__max_head_size
    +
    +            # obtain a list of LRP artifacts
    +            LRP_objects = self.__cas.list_objects()
    +
    +            removed_size = 0  # in bytes
    +            last_mtime = 0
    +
    +            while object_size - removed_size > target_disk_space:
    +                try:
    +                    last_mtime, to_remove = LRP_objects.pop(0)  # The first element in the list is the LRP artifact
    +                except IndexError:
    +                    # This exception is caught if there are no more artifacts in the list
    +                    # LRP_artifacts. This means the the artifact is too large for the filesystem
    +                    # so we abort the process
    +                    raise ArtifactTooLargeException("Artifact of size {} is too large for "
    +                                                    "the filesystem which mounts the remote "
    +                                                    "cache".format(object_size))
    +
    +                try:
    +                    size = os.stat(to_remove).st_size
    +                    os.unlink(to_remove)
    +                    removed_size += size
    +                except FileNotFoundError:
    +                    pass
    +
    +            self.__cas.clean_up_refs_until(last_mtime)
    +
    +            if removed_size > 0:
    +                logging.info("Successfully removed {} bytes from the cache".format(removed_size))
    +            else:
    +                logging.info("No artifacts were removed from the cache.")
    +
    +            return removed_size

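    Note on the subject commit: os.statvfs() reports f_bfree as all free blocks, including
    those reserved for the superuser, while f_bavail is what an unprivileged process can
    actually allocate, so the headroom checks above now use f_bavail. Roughly, clean_up()
    starts evicting once less than --head-room-min bytes of usable space would remain, and
    keeps evicting until the incoming object fits with --head-room-max bytes to spare. A
    minimal illustration of the space check (names here are illustrative, not part of the
    diff):

        import os

        def usable_headroom(path, min_head_size=int(2e9)):
            stats = os.statvfs(path)
            # f_bavail, not f_bfree: only count space a non-root process can use
            usable = stats.f_bavail * stats.f_bsize
            return usable - min_head_size
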
  • buildstream/_frontend/cli.py

    @@ -469,6 +469,10 @@ def push(app, elements, deps, remote):
         The default destination is the highest priority configured cache. You can
         override this by passing a different cache URL with the `--remote` flag.
     
    +    If bst has been configured to include build trees on artifact pulls,
    +    an attempt will be made to pull any required build trees to avoid the
    +    skipping of partial artifacts being pushed.
    +
         Specify `--deps` to control which artifacts to push:
     
         \b
    

  • buildstream/_stream.py

    @@ -327,6 +327,10 @@ class Stream():
         # If `remote` specified as None, then regular configuration will be used
         # to determine where to push artifacts to.
         #
    +    # If any of the given targets are missing their expected buildtree artifact,
    +    # a pull queue will be created if user context and availavble remotes allow for
    +    # attempting to fetch them.
    +    #
         def push(self, targets, *,
                  selection=PipelineSelection.NONE,
                  remote=None):
    @@ -345,8 +349,17 @@ class Stream():
                 raise StreamError("No artifact caches available for pushing artifacts")
     
             self._pipeline.assert_consistent(elements)
    -        self._add_queue(PushQueue(self._scheduler))
    -        self._enqueue_plan(elements)
    +
    +        # Check if we require a pull queue, with given artifact state and context
    +        require_buildtrees = self._buildtree_pull_required(elements)
    +        if require_buildtrees:
    +            self._message(MessageType.INFO, "Attempting to fetch missing artifact buildtrees")
    +            self._add_queue(PullQueue(self._scheduler))
    +            self._enqueue_plan(require_buildtrees)
    +
    +        push_queue = PushQueue(self._scheduler)
    +        self._add_queue(push_queue)
    +        self._enqueue_plan(elements, queue=push_queue)
             self._run()
     
         # checkout()
    @@ -1237,3 +1250,26 @@ class Stream():
                 parts.append(element.normal_name)
     
             return os.path.join(directory, *reversed(parts))
    +
    +    # _buildtree_pull_required()
    +    #
    +    # Check if current task, given config, requires element buildtree artifact
    +    #
    +    # Args:
    +    #    elements (list): elements to check if buildtrees are required
    +    #
    +    # Returns:
    +    #    (list): elements requiring buildtrees
    +    #
    +    def _buildtree_pull_required(self, elements):
    +        required_list = []
    +
    +        # If context is set to not pull buildtrees, or no fetch remotes, return empty list
    +        if not (self._context.pull_buildtrees or self._artifacts.has_fetch_remotes()):
    +            return required_list
    +
    +        for element in elements:
    +            if not element._cached_buildtree():
    +                required_list.append(element)
    +
    +        return required_list

  • buildstream/element.py

    @@ -1998,6 +1998,17 @@ class Element(Plugin):
         def _get_source_element(self):
             return self
     
    +    # _cached_buildtree()
    +    #
    +    # Check if the element has an expected cached buildtree artifact
    +    #
    +    # Returns:
    +    #     (bool): True if artifact cached with buildtree, False if
    +    #             element not cached or missing expected buildtree
    +    #
    +    def _cached_buildtree(self):
    +        return self.__cached_buildtree()
    +
         #############################################################
         #                   Private Local Methods                   #
         #############################################################
    @@ -2777,10 +2788,10 @@ class Element(Plugin):
     
             if not self._cached():
                 return False
    -        elif context.get_strict():
    -            if not self.__artifacts.contains_subdir_artifact(self, self.__strict_cache_key, 'buildtree'):
    -                return False
    -        elif not self.__artifacts.contains_subdir_artifact(self, self.__weak_cache_key, 'buildtree'):
    +
    +        key_strength = _KeyStrength.STRONG if context.get_strict() else _KeyStrength.WEAK
    +        if not self.__artifacts.contains_subdir_artifact(self, self._get_cache_key(strength=key_strength),
    +                                                         'buildtree'):
                 return False
     
             return True
    

  • buildstream/plugins/sources/_downloadablefilesource.py

    @@ -5,16 +5,77 @@ import urllib.request
     import urllib.error
     import contextlib
     import shutil
    +import netrc
     
     from buildstream import Source, SourceError, Consistency
     from buildstream import utils
     
     
    +class _NetrcFTPOpener(urllib.request.FTPHandler):
    +
    +    def __init__(self, netrc_config):
    +        self.netrc = netrc_config
    +
    +    def _split(self, netloc):
    +        userpass, hostport = urllib.parse.splituser(netloc)
    +        host, port = urllib.parse.splitport(hostport)
    +        if userpass:
    +            user, passwd = urllib.parse.splitpasswd(userpass)
    +        else:
    +            user = None
    +            passwd = None
    +        return host, port, user, passwd
    +
    +    def _unsplit(self, host, port, user, passwd):
    +        if port:
    +            host = '{}:{}'.format(host, port)
    +        if user:
    +            if passwd:
    +                user = '{}:{}'.format(user, passwd)
    +            host = '{}@{}'.format(user, host)
    +
    +        return host
    +
    +    def ftp_open(self, req):
    +        host, port, user, passwd = self._split(req.host)
    +
    +        if user is None and self.netrc:
    +            entry = self.netrc.authenticators(host)
    +            if entry:
    +                user, _, passwd = entry
    +
    +        req.host = self._unsplit(host, port, user, passwd)
    +
    +        return super().ftp_open(req)
    +
    +
    +class _NetrcPasswordManager:
    +
    +    def __init__(self, netrc_config):
    +        self.netrc = netrc_config
    +
    +    def add_password(self, realm, uri, user, passwd):
    +        pass
    +
    +    def find_user_password(self, realm, authuri):
    +        if not self.netrc:
    +            return None, None
    +        parts = urllib.parse.urlsplit(authuri)
    +        entry = self.netrc.authenticators(parts.hostname)
    +        if not entry:
    +            return None, None
    +        else:
    +            login, _, password = entry
    +            return login, password
    +
    +
     class DownloadableFileSource(Source):
         # pylint: disable=attribute-defined-outside-init
     
         COMMON_CONFIG_KEYS = Source.COMMON_CONFIG_KEYS + ['url', 'ref', 'etag']
     
    +    __urlopener = None
    +
         def configure(self, node):
             self.original_url = self.node_get_member(node, str, 'url')
             self.ref = self.node_get_member(node, str, 'ref', None)
    @@ -118,7 +179,8 @@ class DownloadableFileSource(Source):
                     if etag and self.get_consistency() == Consistency.CACHED:
                         request.add_header('If-None-Match', etag)
     
    -                with contextlib.closing(urllib.request.urlopen(request)) as response:
    +                opener = self.__get_urlopener()
    +                with contextlib.closing(opener.open(request)) as response:
                         info = response.info()
     
                         etag = info['ETag'] if 'ETag' in info else None
    @@ -164,3 +226,19 @@ class DownloadableFileSource(Source):
     
         def _get_mirror_file(self, sha=None):
             return os.path.join(self._get_mirror_dir(), sha or self.ref)
    +
    +    def __get_urlopener(self):
    +        if not DownloadableFileSource.__urlopener:
    +            try:
    +                netrc_config = netrc.netrc()
    +            except FileNotFoundError:
    +                DownloadableFileSource.__urlopener = urllib.request.build_opener()
    +            except netrc.NetrcParseError as e:
    +                self.warn('{}: While reading .netrc: {}'.format(self, e))
    +                return urllib.request.build_opener()
    +            else:
    +                netrc_pw_mgr = _NetrcPasswordManager(netrc_config)
    +                http_auth = urllib.request.HTTPBasicAuthHandler(netrc_pw_mgr)
    +                ftp_handler = _NetrcFTPOpener(netrc_config)
    +                DownloadableFileSource.__urlopener = urllib.request.build_opener(http_auth, ftp_handler)
    +        return DownloadableFileSource.__urlopener

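    Note: netrc.netrc() with no argument reads $HOME/.netrc, which is why the new tests
    below point HOME at a throwaway directory before writing an entry like:

        machine 127.0.0.1
        login testuser
        password 12345

    A minimal sketch of the lookup both new handlers rely on (assuming such an entry
    exists; values are examples only):

        import netrc

        entry = netrc.netrc().authenticators('127.0.0.1')
        if entry:
            login, _, password = entry   # the account field is ignored
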
  • buildstream/sandbox/_sandboxremote.py

    @@ -139,8 +139,7 @@ class SandboxRemote(Sandbox):
     
             # Upload the Command message to the remote CAS server
             command_digest = cascache.push_message(casremote, remote_command)
    -        if not command_digest or not cascache.verify_digest_on_remote(casremote, command_digest):
    -            raise SandboxError("Failed pushing build command to remote CAS.")
    +
             # Create and send the action.
             action = remote_execution_pb2.Action(command_digest=command_digest,
                                                  input_root_digest=input_root_digest,
    @@ -149,8 +148,6 @@ class SandboxRemote(Sandbox):
     
             # Upload the Action message to the remote CAS server
             action_digest = cascache.push_message(casremote, action)
    -        if not action_digest or not cascache.verify_digest_on_remote(casremote, action_digest):
    -            raise SandboxError("Failed pushing build action to remote CAS.")
     
             # Next, try to create a communication channel to the BuildGrid server.
             url = urlparse(self.exec_url)
    @@ -299,15 +296,11 @@ class SandboxRemote(Sandbox):
     
             casremote = CASRemote(self.storage_remote_spec)
             # Now, push that key (without necessarily needing a ref) to the remote.
    -
             try:
                 cascache.push_directory(casremote, upload_vdir)
             except grpc.RpcError as e:
                 raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e
     
    -        if not cascache.verify_digest_on_remote(casremote, upload_vdir.ref):
    -            raise SandboxError("Failed to verify that source has been pushed to the remote artifact cache.")
    -
             # Now transmit the command to execute
             operation = self.run_remote_command(command, upload_vdir.ref, cwd, env)
     
    

  • dev-requirements.txt

    @@ -9,3 +9,4 @@ pytest-pep8
     pytest-pylint
     pytest-xdist
     pytest-timeout
    +pyftpdlib

  • tests/frontend/push.py

    @@ -230,6 +230,8 @@ def test_artifact_expires(cli, datafiles, tmpdir):
         # Create an artifact share (remote artifact cache) in the tmpdir/artifactshare
         # Mock a file system with 12 MB free disk space
         with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare'),
    +                               min_head_size=int(2e9),
    +                               max_head_size=int(2e9),
                                    total_space=int(10e9), free_space=(int(12e6) + int(2e9))) as share:
     
             # Configure bst to push to the cache
    @@ -313,6 +315,8 @@ def test_recently_pulled_artifact_does_not_expire(cli, datafiles, tmpdir):
         # Create an artifact share (remote cache) in tmpdir/artifactshare
         # Mock a file system with 12 MB free disk space
         with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare'),
    +                               min_head_size=int(2e9),
    +                               max_head_size=int(2e9),
                                    total_space=int(10e9), free_space=(int(12e6) + int(2e9))) as share:
     
             # Configure bst to push to the cache
    

  • tests/integration/pullbuildtrees.py

    @@ -38,7 +38,8 @@ def test_pullbuildtrees(cli, tmpdir, datafiles, integration_cache):
     
         # Create artifact shares for pull & push testing
         with create_artifact_share(os.path.join(str(tmpdir), 'share1')) as share1,\
    -        create_artifact_share(os.path.join(str(tmpdir), 'share2')) as share2:
    +        create_artifact_share(os.path.join(str(tmpdir), 'share2')) as share2,\
    +        create_artifact_share(os.path.join(str(tmpdir), 'share3')) as share3:
             cli.configure({
                 'artifacts': {'url': share1.repo, 'push': True},
                 'artifactdir': os.path.join(str(tmpdir), 'artifacts')
    @@ -123,6 +124,32 @@ def test_pullbuildtrees(cli, tmpdir, datafiles, integration_cache):
             assert share2.has_artifact('test', element_name, cli.get_element_key(project, element_name))
             default_state(cli, tmpdir, share1)
     
    +        # Assert that bst push will automatically attempt to pull a missing buildtree
    +        # if pull-buildtrees is set, however as share3 is the only defined remote and is empty,
    +        # assert that no element artifact buildtrees are pulled (no available remote buildtree) and thus the
    +        # artifact cannot be pushed.
    +        result = cli.run(project=project, args=['pull', element_name])
    +        assert element_name in result.get_pulled_elements()
    +        cli.configure({'artifacts': {'url': share3.repo, 'push': True}})
    +        result = cli.run(project=project, args=['--pull-buildtrees', 'push', element_name])
    +        assert "Attempting to fetch missing artifact buildtrees" in result.stderr
    +        assert element_name not in result.get_pulled_elements()
    +        assert not os.path.isdir(buildtreedir)
    +        assert element_name not in result.get_pushed_elements()
    +        assert not share3.has_artifact('test', element_name, cli.get_element_key(project, element_name))
    +
    +        # Assert that if we add an extra remote that has the buildtree artfact cached, bst push will
    +        # automatically attempt to pull it and will be successful, leading to the full artifact being pushed
    +        # to the empty share3. This gives the ability to attempt push currently partial artifacts to a remote,
    +        # without exlipictly requiring a bst pull.
    +        cli.configure({'artifacts': [{'url': share1.repo, 'push': False}, {'url': share3.repo, 'push': True}]})
    +        result = cli.run(project=project, args=['--pull-buildtrees', 'push', element_name])
    +        assert "Attempting to fetch missing artifact buildtrees" in result.stderr
    +        assert element_name in result.get_pulled_elements()
    +        assert os.path.isdir(buildtreedir)
    +        assert element_name in result.get_pushed_elements()
    +        assert share3.has_artifact('test', element_name, cli.get_element_key(project, element_name))
    +
     
     # Ensure that only valid pull-buildtrees boolean options make it through the loading
     # process.
    

  • tests/sources/remote.py

    @@ -5,6 +5,7 @@ import pytest
     from buildstream._exceptions import ErrorDomain
     from buildstream import _yaml
     from tests.testutils import cli
    +from tests.testutils.file_server import create_file_server
     
     DATA_DIR = os.path.join(
         os.path.dirname(os.path.realpath(__file__)),
    @@ -22,6 +23,16 @@ def generate_project(project_dir, tmpdir):
         }, project_file)
     
     
    +def generate_project_file_server(server, project_dir):
    +    project_file = os.path.join(project_dir, "project.conf")
    +    _yaml.dump({
    +        'name': 'foo',
    +        'aliases': {
    +            'tmpdir': server.base_url()
    +        }
    +    }, project_file)
    +
    +
     # Test that without ref, consistency is set appropriately.
     @pytest.mark.datafiles(os.path.join(DATA_DIR, 'no-ref'))
     def test_no_ref(cli, tmpdir, datafiles):
    @@ -164,3 +175,35 @@ def test_executable(cli, tmpdir, datafiles):
         assert (mode & stat.S_IEXEC)
         # Assert executable by anyone
         assert(mode & (stat.S_IEXEC | stat.S_IXGRP | stat.S_IXOTH))
    +
    +
    +@pytest.mark.parametrize('server_type', ('FTP', 'HTTP'))
    +@pytest.mark.datafiles(os.path.join(DATA_DIR, 'single-file'))
    +def test_use_netrc(cli, datafiles, server_type, tmpdir):
    +    fake_home = os.path.join(str(tmpdir), 'fake_home')
    +    os.makedirs(fake_home, exist_ok=True)
    +    project = str(datafiles)
    +    checkoutdir = os.path.join(str(tmpdir), 'checkout')
    +
    +    os.environ['HOME'] = fake_home
    +    with open(os.path.join(fake_home, '.netrc'), 'wb') as f:
    +        os.fchmod(f.fileno(), 0o700)
    +        f.write(b'machine 127.0.0.1\n')
    +        f.write(b'login testuser\n')
    +        f.write(b'password 12345\n')
    +
    +    with create_file_server(server_type) as server:
    +        server.add_user('testuser', '12345', project)
    +        generate_project_file_server(server, project)
    +
    +        server.start()
    +
    +        result = cli.run(project=project, args=['fetch', 'target.bst'])
    +        result.assert_success()
    +        result = cli.run(project=project, args=['build', 'target.bst'])
    +        result.assert_success()
    +        result = cli.run(project=project, args=['checkout', 'target.bst', checkoutdir])
    +        result.assert_success()
    +
    +        checkout_file = os.path.join(checkoutdir, 'file')
    +        assert(os.path.exists(checkout_file))

  • tests/sources/tar.py

    @@ -3,11 +3,13 @@ import pytest
     import tarfile
     import tempfile
     import subprocess
    +import urllib.parse
     from shutil import copyfile, rmtree
     
     from buildstream._exceptions import ErrorDomain
     from buildstream import _yaml
     from tests.testutils import cli
    +from tests.testutils.file_server import create_file_server
     from tests.testutils.site import HAVE_LZIP
     from . import list_dir_contents
     
    @@ -49,6 +51,16 @@ def generate_project(project_dir, tmpdir):
         }, project_file)
     
     
    +def generate_project_file_server(base_url, project_dir):
    +    project_file = os.path.join(project_dir, "project.conf")
    +    _yaml.dump({
    +        'name': 'foo',
    +        'aliases': {
    +            'tmpdir': base_url
    +        }
    +    }, project_file)
    +
    +
     # Test that without ref, consistency is set appropriately.
     @pytest.mark.datafiles(os.path.join(DATA_DIR, 'no-ref'))
     def test_no_ref(cli, tmpdir, datafiles):
    @@ -302,3 +314,77 @@ def test_read_only_dir(cli, tmpdir, datafiles):
                 else:
                     os.remove(path)
             rmtree(str(tmpdir), onerror=make_dir_writable)
    +
    +
    +@pytest.mark.parametrize('server_type', ('FTP', 'HTTP'))
    +@pytest.mark.datafiles(os.path.join(DATA_DIR, 'fetch'))
    +def test_use_netrc(cli, datafiles, server_type, tmpdir):
    +    file_server_files = os.path.join(str(tmpdir), 'file_server')
    +    fake_home = os.path.join(str(tmpdir), 'fake_home')
    +    os.makedirs(file_server_files, exist_ok=True)
    +    os.makedirs(fake_home, exist_ok=True)
    +    project = str(datafiles)
    +    checkoutdir = os.path.join(str(tmpdir), 'checkout')
    +
    +    os.environ['HOME'] = fake_home
    +    with open(os.path.join(fake_home, '.netrc'), 'wb') as f:
    +        os.fchmod(f.fileno(), 0o700)
    +        f.write(b'machine 127.0.0.1\n')
    +        f.write(b'login testuser\n')
    +        f.write(b'password 12345\n')
    +
    +    with create_file_server(server_type) as server:
    +        server.add_user('testuser', '12345', file_server_files)
    +        generate_project_file_server(server.base_url(), project)
    +
    +        src_tar = os.path.join(file_server_files, 'a.tar.gz')
    +        _assemble_tar(os.path.join(str(datafiles), 'content'), 'a', src_tar)
    +
    +        server.start()
    +
    +        result = cli.run(project=project, args=['track', 'target.bst'])
    +        result.assert_success()
    +        result = cli.run(project=project, args=['fetch', 'target.bst'])
    +        result.assert_success()
    +        result = cli.run(project=project, args=['build', 'target.bst'])
    +        result.assert_success()
    +        result = cli.run(project=project, args=['checkout', 'target.bst', checkoutdir])
    +        result.assert_success()
    +
    +        original_dir = os.path.join(str(datafiles), 'content', 'a')
    +        original_contents = list_dir_contents(original_dir)
    +        checkout_contents = list_dir_contents(checkoutdir)
    +        assert(checkout_contents == original_contents)
    +
    +
    +@pytest.mark.parametrize('server_type', ('FTP', 'HTTP'))
    +@pytest.mark.datafiles(os.path.join(DATA_DIR, 'fetch'))
    +def test_netrc_already_specified_user(cli, datafiles, server_type, tmpdir):
    +    file_server_files = os.path.join(str(tmpdir), 'file_server')
    +    fake_home = os.path.join(str(tmpdir), 'fake_home')
    +    os.makedirs(file_server_files, exist_ok=True)
    +    os.makedirs(fake_home, exist_ok=True)
    
    367
    +    project = str(datafiles)
    
    368
    +    checkoutdir = os.path.join(str(tmpdir), 'checkout')
    
    369
    +
    
    370
    +    os.environ['HOME'] = fake_home
    
    371
    +    with open(os.path.join(fake_home, '.netrc'), 'wb') as f:
    
    372
    +        os.fchmod(f.fileno(), 0o700)
    
    373
    +        f.write(b'machine 127.0.0.1\n')
    
    374
    +        f.write(b'login testuser\n')
    
    375
    +        f.write(b'password 12345\n')
    
    376
    +
    
    377
    +    with create_file_server(server_type) as server:
    
    378
    +        server.add_user('otheruser', '12345', file_server_files)
    
    379
    +        parts = urllib.parse.urlsplit(server.base_url())
    
    380
    +        base_url = urllib.parse.urlunsplit([parts[0]] + ['otheruser@{}'.format(parts[1])] + list(parts[2:]))
    
    381
    +        generate_project_file_server(base_url, project)
    
    382
    +
    
    383
    +        src_tar = os.path.join(file_server_files, 'a.tar.gz')
    
    384
    +        _assemble_tar(os.path.join(str(datafiles), 'content'), 'a', src_tar)
    
    385
    +
    
    386
    +        server.start()
    
    387
    +
    
    388
    +        result = cli.run(project=project, args=['track', 'target.bst'])
    
    389
    +        result.assert_main_error(ErrorDomain.STREAM, None)
    
    390
    +        result.assert_task_error(ErrorDomain.SOURCE, None)
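test_netrc_already_specified_user rebuilds the alias URL so that it carries an explicit user, which conflicts with the .netrc entry and is expected to make the fetch fail. A standalone sketch of that URL rewrite with urllib.parse, using an illustrative port in place of server.base_url():

    import urllib.parse

    # Illustrative stand-in for server.base_url(); only the port differs in practice.
    base_url = 'ftp://127.0.0.1:2121'

    parts = urllib.parse.urlsplit(base_url)
    # Prefix the netloc with an explicit user, exactly as the test does.
    with_user = urllib.parse.urlunsplit(
        [parts[0]] + ['otheruser@{}'.format(parts[1])] + list(parts[2:]))

    assert with_user == 'ftp://otheruser@127.0.0.1:2121'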

  • tests/sources/zip.py
    ... ... @@ -5,6 +5,7 @@ import zipfile
    5 5
     from buildstream._exceptions import ErrorDomain
    
    6 6
     from buildstream import _yaml
    
    7 7
     from tests.testutils import cli
    
    8
    +from tests.testutils.file_server import create_file_server
    
    8 9
     from . import list_dir_contents
    
    9 10
     
    
    10 11
     DATA_DIR = os.path.join(
    
    ... ... @@ -35,6 +36,16 @@ def generate_project(project_dir, tmpdir):
    35 36
         }, project_file)
    
    36 37
     
    
    37 38
     
    
    39
    +def generate_project_file_server(server, project_dir):
    
    40
    +    project_file = os.path.join(project_dir, "project.conf")
    
    41
    +    _yaml.dump({
    
    42
    +        'name': 'foo',
    
    43
    +        'aliases': {
    
    44
    +            'tmpdir': server.base_url()
    
    45
    +        }
    
    46
    +    }, project_file)
    
    47
    +
    
    48
    +
    
    38 49
     # Test that without ref, consistency is set appropriately.
    
    39 50
     @pytest.mark.datafiles(os.path.join(DATA_DIR, 'no-ref'))
    
    40 51
     def test_no_ref(cli, tmpdir, datafiles):
    
    ... ... @@ -176,3 +187,44 @@ def test_stage_explicit_basedir(cli, tmpdir, datafiles):
    176 187
         original_contents = list_dir_contents(original_dir)
    
    177 188
         checkout_contents = list_dir_contents(checkoutdir)
    
    178 189
         assert(checkout_contents == original_contents)
    
    190
    +
    
    191
    +
    
    192
    +@pytest.mark.parametrize('server_type', ('FTP', 'HTTP'))
    
    193
    +@pytest.mark.datafiles(os.path.join(DATA_DIR, 'fetch'))
    
    194
    +def test_use_netrc(cli, datafiles, server_type, tmpdir):
    
    195
    +    file_server_files = os.path.join(str(tmpdir), 'file_server')
    
    196
    +    fake_home = os.path.join(str(tmpdir), 'fake_home')
    
    197
    +    os.makedirs(file_server_files, exist_ok=True)
    
    198
    +    os.makedirs(fake_home, exist_ok=True)
    
    199
    +    project = str(datafiles)
    
    200
    +    checkoutdir = os.path.join(str(tmpdir), 'checkout')
    
    201
    +
    
    202
    +    os.environ['HOME'] = fake_home
    
    203
    +    with open(os.path.join(fake_home, '.netrc'), 'wb') as f:
    
    204
    +        os.fchmod(f.fileno(), 0o700)
    
    205
    +        f.write(b'machine 127.0.0.1\n')
    
    206
    +        f.write(b'login testuser\n')
    
    207
    +        f.write(b'password 12345\n')
    
    208
    +
    
    209
    +    with create_file_server(server_type) as server:
    
    210
    +        server.add_user('testuser', '12345', file_server_files)
    
    211
    +        generate_project_file_server(server, project)
    
    212
    +
    
    213
    +        src_zip = os.path.join(file_server_files, 'a.zip')
    
    214
    +        _assemble_zip(os.path.join(str(datafiles), 'content'), src_zip)
    
    215
    +
    
    216
    +        server.start()
    
    217
    +
    
    218
    +        result = cli.run(project=project, args=['track', 'target.bst'])
    
    219
    +        result.assert_success()
    
    220
    +        result = cli.run(project=project, args=['fetch', 'target.bst'])
    
    221
    +        result.assert_success()
    
    222
    +        result = cli.run(project=project, args=['build', 'target.bst'])
    
    223
    +        result.assert_success()
    
    224
    +        result = cli.run(project=project, args=['checkout', 'target.bst', checkoutdir])
    
    225
    +        result.assert_success()
    
    226
    +
    
    227
    +        original_dir = os.path.join(str(datafiles), 'content', 'a')
    
    228
    +        original_contents = list_dir_contents(original_dir)
    
    229
    +        checkout_contents = list_dir_contents(checkoutdir)
    
    230
    +        assert(checkout_contents == original_contents)
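_assemble_zip is defined earlier in this test module and is not part of this diff; a plausible, self-contained sketch of that kind of archive assembly with the standard-library zipfile module (the real helper may differ in compression and path handling):

    import os
    import zipfile

    def assemble_zip_sketch(workingdir, dstfile):
        # Add every file under workingdir to the archive, storing paths
        # relative to workingdir.
        with zipfile.ZipFile(dstfile, 'w') as archive:
            for root, _dirs, files in os.walk(workingdir):
                for name in files:
                    path = os.path.join(root, name)
                    archive.write(path, arcname=os.path.relpath(path, workingdir))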

  • tests/testutils/artifactshare.py
    ... ... @@ -29,7 +29,11 @@ from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution
    29 29
     #
    
    30 30
     class ArtifactShare():
    
    31 31
     
    
    32
    -    def __init__(self, directory, *, total_space=None, free_space=None):
    
    32
    +    def __init__(self, directory, *,
    
    33
    +                 total_space=None,
    
    34
    +                 free_space=None,
    
    35
    +                 min_head_size=int(2e9),
    
    36
    +                 max_head_size=int(10e9)):
    
    33 37
     
    
    34 38
             # The working directory for the artifact share (in case it
    
    35 39
             # needs to do something outside of its backend's storage folder).
    
    ... ... @@ -50,6 +54,9 @@ class ArtifactShare():
    50 54
             self.total_space = total_space
    
    51 55
             self.free_space = free_space
    
    52 56
     
    
    57
    +        self.max_head_size = max_head_size
    
    58
    +        self.min_head_size = min_head_size
    
    59
    +
    
    53 60
             q = Queue()
    
    54 61
     
    
    55 62
             self.process = Process(target=self.run, args=(q,))
    
    ... ... @@ -74,7 +81,10 @@ class ArtifactShare():
    74 81
                         self.free_space = self.total_space
    
    75 82
                     os.statvfs = self._mock_statvfs
    
    76 83
     
    
    77
    -            server = create_server(self.repodir, enable_push=True)
    
    84
    +            server = create_server(self.repodir,
    
    85
    +                                   max_head_size=self.max_head_size,
    
    86
    +                                   min_head_size=self.min_head_size,
    
    87
    +                                   enable_push=True)
    
    78 88
                 port = server.add_insecure_port('localhost:0')
    
    79 89
     
    
    80 90
                 server.start()
    
    ... ... @@ -136,6 +146,15 @@ class ArtifactShare():
    136 146
     
    
    137 147
             try:
    
    138 148
                 tree = self.cas.resolve_ref(artifact_key)
    
    149
    +            reachable = set()
    
    150
    +            try:
    
    151
    +                self.cas._reachable_refs_dir(reachable, tree, update_mtime=False)
    
    152
    +            except FileNotFoundError:
    
    153
    +                return None
    
    154
    +            for digest in reachable:
    
    155
    +                object_name = os.path.join(self.cas.casdir, 'objects', digest[:2], digest[2:])
    
    156
    +                if not os.path.exists(object_name):
    
    157
    +                    return None
    
    139 158
                 return tree
    
    140 159
             except CASError:
    
    141 160
                 return None
    
    ... ... @@ -167,8 +186,11 @@ class ArtifactShare():
    167 186
     # Create an ArtifactShare for use in a test case
    
    168 187
     #
    
    169 188
     @contextmanager
    
    170
    -def create_artifact_share(directory, *, total_space=None, free_space=None):
    
    171
    -    share = ArtifactShare(directory, total_space=total_space, free_space=free_space)
    
    189
    +def create_artifact_share(directory, *, total_space=None, free_space=None,
    
    190
    +                          min_head_size=int(2e9),
    
    191
    +                          max_head_size=int(10e9)):
    
    192
    +    share = ArtifactShare(directory, total_space=total_space, free_space=free_space,
    
    193
    +                          min_head_size=min_head_size, max_head_size=max_head_size)
    
    172 194
         try:
    
    173 195
             yield share
    
    174 196
         finally:
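The new min_head_size and max_head_size keyword arguments are threaded straight through to create_server(); the defaults stay at int(2e9) and int(10e9). A minimal usage sketch with an illustrative directory and sizes:

    from tests.testutils.artifactshare import create_artifact_share

    # Directory and sizes are illustrative only.
    with create_artifact_share('/tmp/test-share',
                               min_head_size=int(1e9),
                               max_head_size=int(4e9)) as share:
        # The artifact lookup changed above now also requires every blob
        # reachable from a ref to be present in the CAS objects directory
        # before the artifact is reported as available.
        pass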
    

  • tests/testutils/file_server.py
    1
    +from contextlib import contextmanager
    
    2
    +
    
    3
    +from .ftp_server import SimpleFtpServer
    
    4
    +from .http_server import SimpleHttpServer
    
    5
    +
    
    6
    +
    
    7
    +@contextmanager
    
    8
    +def create_file_server(file_server_type):
    
    9
    +    if file_server_type == 'FTP':
    
    10
    +        server = SimpleFtpServer()
    
    11
    +    elif file_server_type == 'HTTP':
    
    12
    +        server = SimpleHttpServer()
    
    13
    +    else:
    
    14
    +        assert False
    
    15
    +
    
    16
    +    try:
    
    17
    +        yield server
    
    18
    +    finally:
    
    19
    +        server.stop()
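Both server classes expose the same small surface (add_user(), allow_anonymous(), start(), stop(), base_url()), so the source tests can be parametrized over the protocol. A minimal usage sketch outside pytest, with illustrative credentials and directory:

    from tests.testutils.file_server import create_file_server

    with create_file_server('HTTP') as server:   # or 'FTP'
        server.add_user('testuser', '12345', '/tmp/served-files')
        server.start()
        url = server.base_url()   # e.g. 'http://127.0.0.1:<port>'
        # ... point a project alias at url and run the fetch under test ...
    # server.stop() runs automatically when the context manager exits.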

  • tests/testutils/ftp_server.py
    1
    +import multiprocessing
    
    2
    +
    
    3
    +from pyftpdlib.authorizers import DummyAuthorizer
    
    4
    +from pyftpdlib.handlers import FTPHandler
    
    5
    +from pyftpdlib.servers import FTPServer
    
    6
    +
    
    7
    +
    
    8
    +class SimpleFtpServer(multiprocessing.Process):
    
    9
    +    def __init__(self):
    
    10
    +        super().__init__()
    
    11
    +        self.authorizer = DummyAuthorizer()
    
    12
    +        handler = FTPHandler
    
    13
    +        handler.authorizer = self.authorizer
    
    14
    +        self.server = FTPServer(('127.0.0.1', 0), handler)
    
    15
    +
    
    16
    +    def run(self):
    
    17
    +        self.server.serve_forever()
    
    18
    +
    
    19
    +    def stop(self):
    
    20
    +        self.server.close_all()
    
    21
    +        self.server.close()
    
    22
    +        self.terminate()
    
    23
    +        self.join()
    
    24
    +
    
    25
    +    def allow_anonymous(self, cwd):
    
    26
    +        self.authorizer.add_anonymous(cwd)
    
    27
    +
    
    28
    +    def add_user(self, user, password, cwd):
    
    29
    +        self.authorizer.add_user(user, password, cwd, perm='elradfmwMT')
    
    30
    +
    
    31
    +    def base_url(self):
    
    32
    +        return 'ftp://127.0.0.1:{}'.format(self.server.address[1])
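One way to sanity-check SimpleFtpServer by hand is the standard-library ftplib client. This sketch assumes a served directory that already exists and the same illustrative credentials as the tests:

    import ftplib

    from tests.testutils.ftp_server import SimpleFtpServer

    server = SimpleFtpServer()
    server.add_user('testuser', '12345', '/tmp/served-files')  # directory must exist
    server.start()
    try:
        client = ftplib.FTP()
        # server.server.address is the (host, port) bound in __init__.
        client.connect('127.0.0.1', server.server.address[1])
        client.login('testuser', '12345')
        print(client.nlst())   # list the served directory
        client.quit()
    finally:
        server.stop()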

  • tests/testutils/http_server.py
    1
    +import multiprocessing
    
    2
    +import os
    
    3
    +import posixpath
    
    4
    +import html
    
    5
    +import threading
    
    6
    +import base64
    
    7
    +from http.server import SimpleHTTPRequestHandler, HTTPServer, HTTPStatus
    
    8
    +
    
    9
    +
    
    10
    +class Unauthorized(Exception):
    
    11
    +    pass
    
    12
    +
    
    13
    +
    
    14
    +class RequestHandler(SimpleHTTPRequestHandler):
    
    15
    +
    
    16
    +    def get_root_dir(self):
    
    17
    +        authorization = self.headers.get('authorization')
    
    18
    +        if not authorization:
    
    19
    +            if not self.server.anonymous_dir:
    
    20
    +                raise Unauthorized('unauthorized')
    
    21
    +            return self.server.anonymous_dir
    
    22
    +        else:
    
    23
    +            authorization = authorization.split()
    
    24
    +            if len(authorization) != 2 or authorization[0].lower() != 'basic':
    
    25
    +                raise Unauthorized('unauthorized')
    
    26
    +            try:
    
    27
    +                decoded = base64.decodebytes(authorization[1].encode('ascii'))
    
    28
    +                user, password = decoded.decode('ascii').split(':')
    
    29
    +                expected_password, directory = self.server.users[user]
    
    30
    +                if password == expected_password:
    
    31
    +                    return directory
    
    32
    +            except:
    
    33
    +                raise Unauthorized('unauthorized')
    
    34
    +            return None
    
    35
    +
    
    36
    +    def unauthorized(self):
    
    37
    +        shortmsg, longmsg = self.responses[HTTPStatus.UNAUTHORIZED]
    
    38
    +        self.send_response(HTTPStatus.UNAUTHORIZED, shortmsg)
    
    39
    +        self.send_header('Connection', 'close')
    
    40
    +
    
    41
    +        content = (self.error_message_format % {
    
    42
    +            'code': HTTPStatus.UNAUTHORIZED,
    
    43
    +            'message': html.escape(longmsg, quote=False),
    
    44
    +            'explain': html.escape(longmsg, quote=False)
    
    45
    +        })
    
    46
    +        body = content.encode('UTF-8', 'replace')
    
    47
    +        self.send_header('Content-Type', self.error_content_type)
    
    48
    +        self.send_header('Content-Length', str(len(body)))
    
    49
    +        self.send_header('WWW-Authenticate', 'Basic realm="{}"'.format(self.server.realm))
    
    50
    +        self.end_headers()
    
    52
    +
    
    53
    +        if self.command != 'HEAD' and body:
    
    54
    +            self.wfile.write(body)
    
    55
    +
    
    56
    +    def do_GET(self):
    
    57
    +        try:
    
    58
    +            super().do_GET()
    
    59
    +        except Unauthorized:
    
    60
    +            self.unauthorized()
    
    61
    +
    
    62
    +    def do_HEAD(self):
    
    63
    +        try:
    
    64
    +            super().do_HEAD()
    
    65
    +        except Unauthorized:
    
    66
    +            self.unauthorized()
    
    67
    +
    
    68
    +    def translate_path(self, path):
    
    69
    +        path = path.split('?', 1)[0]
    
    70
    +        path = path.split('#', 1)[0]
    
    71
    +        path = posixpath.normpath(path)
    
    72
    +        assert(posixpath.isabs(path))
    
    73
    +        path = posixpath.relpath(path, '/')
    
    74
    +        return os.path.join(self.get_root_dir(), path)
    
    75
    +
    
    76
    +
    
    77
    +class AuthHTTPServer(HTTPServer):
    
    78
    +    def __init__(self, *args, **kwargs):
    
    79
    +        self.users = {}
    
    80
    +        self.anonymous_dir = None
    
    81
    +        self.realm = 'Realm'
    
    82
    +        super().__init__(*args, **kwargs)
    
    83
    +
    
    84
    +
    
    85
    +class SimpleHttpServer(multiprocessing.Process):
    
    86
    +    def __init__(self):
    
    87
    +        self.__stop = multiprocessing.Queue()
    
    88
    +        super().__init__()
    
    89
    +        self.server = AuthHTTPServer(('127.0.0.1', 0), RequestHandler)
    
    90
    +        self.started = False
    
    91
    +
    
    92
    +    def start(self):
    
    93
    +        self.started = True
    
    94
    +        super().start()
    
    95
    +
    
    96
    +    def run(self):
    
    97
    +        t = threading.Thread(target=self.server.serve_forever)
    
    98
    +        t.start()
    
    99
    +        self.__stop.get()
    
    100
    +        self.server.shutdown()
    
    101
    +        t.join()
    
    102
    +
    
    103
    +    def stop(self):
    
    104
    +        if not self.started:
    
    105
    +            return
    
    106
    +        self.__stop.put(None)
    
    107
    +        self.terminate()
    
    108
    +        self.join()
    
    109
    +
    
    110
    +    def allow_anonymous(self, cwd):
    
    111
    +        self.server.anonymous_dir = cwd
    
    112
    +
    
    113
    +    def add_user(self, user, password, cwd):
    
    114
    +        self.server.users[user] = (password, cwd)
    
    115
    +
    
    116
    +    def base_url(self):
    
    117
    +        return 'http://127.0.0.1:{}'.format(self.server.server_port)
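RequestHandler only honours HTTP Basic authentication, decoding base64("user:password") in get_root_dir() above. The sketch below builds the matching Authorization header with the standard library; URL, port and credentials are illustrative:

    import base64
    import urllib.request

    # Illustrative URL; in the tests the port comes from server.base_url().
    url = 'http://127.0.0.1:8000/file'

    # Basic auth token, exactly what get_root_dir() expects to decode.
    token = base64.b64encode(b'testuser:12345').decode('ascii')
    request = urllib.request.Request(url, headers={'Authorization': 'Basic ' + token})

    with urllib.request.urlopen(request) as response:
        data = response.read()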


