[Notes] [Git][BuildStream/buildstream][valentindavid/cache_server_fill_up-1.2] 36 commits: artifactcache: fix oversight




Valentin David pushed to branch valentindavid/cache_server_fill_up-1.2 at BuildStream / buildstream

Commits:

17 changed files:

Changes:

  • NEWS
    +=================
    +buildstream 1.2.3
    +=================
    +
    + o Fixed an unhandled exception when cleaning up a build sandbox (#153)
    +
    + o Fixed race condition when calculating cache size and committing artifacts
    +
    + o Fixed regression where terminating with `^C` results in a double user interrogation (#693)
    +
    + o Fixed regression in summary when builds are terminated (#479)
    +
    + o Fixed regression where irrelevant status messages appear from git sources
    +
    + o Improved performance of artifact uploads by batching file transfers (#676/#677)
    +
    + o Fixed performance of artifact downloads by batching file transfers (#554)
    +
    + o Fixed checks for paths which escape the project directory (#673)
    +
     =================
     buildstream 1.2.2
     =================
    

  • buildstream/_artifactcache/artifactcache.py
    @@ -277,7 +277,7 @@ class ArtifactCache():
                               "Please increase the cache-quota in {}."
                               .format(self.context.config_origin or default_conf))
     
    -                if self.get_quota_exceeded():
    +                if self.has_quota_exceeded():
                         raise ArtifactError("Cache too full. Aborting.",
                                             detail=detail,
                                             reason="cache-too-full")
    @@ -364,14 +364,14 @@ class ArtifactCache():
             self._cache_size = cache_size
             self._write_cache_size(self._cache_size)
     
    -    # get_quota_exceeded()
    +    # has_quota_exceeded()
         #
         # Checks if the current artifact cache size exceeds the quota.
         #
         # Returns:
         #    (bool): True if the quota is exceeded
         #
    -    def get_quota_exceeded(self):
    +    def has_quota_exceeded(self):
             return self.get_cache_size() > self._cache_quota
     
         ################################################
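
    The rename above is purely a readability change (a boolean predicate reads better as has_*); the
    check itself is just a size-versus-quota comparison, as the diff shows. A minimal standalone
    illustration of that predicate follows. This is not BuildStream code; the TinyCache class and its
    quota_bytes parameter are invented for the example.

        # Sketch: has_quota_exceeded() boils down to comparing a tracked cache
        # size against a configured quota, mirroring the method in the diff above.
        class TinyCache:
            def __init__(self, quota_bytes):
                self._quota = quota_bytes
                self._size = 0

            def add(self, nbytes):
                # Record that nbytes of artifacts were committed to the cache
                self._size += nbytes

            def has_quota_exceeded(self):
                return self._size > self._quota

        cache = TinyCache(quota_bytes=10)
        cache.add(15)
        assert cache.has_quota_exceeded()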
    

  • buildstream/_artifactcache/cascache.py
    @@ -26,6 +26,7 @@ import stat
     import tempfile
     import uuid
     import errno
    +import contextlib
     from urllib.parse import urlparse
     
     import grpc
    @@ -43,6 +44,11 @@ from .._exceptions import ArtifactError
     from . import ArtifactCache
     
     
    +# The default limit for gRPC messages is 4 MiB.
    +# Limit payload to 1 MiB to leave sufficient headroom for metadata.
    +_MAX_PAYLOAD_BYTES = 1024 * 1024
    +
    +
     # A CASCache manages artifacts in a CAS repository as specified in the
     # Remote Execution API.
     #
    @@ -76,6 +82,7 @@ class CASCache(ArtifactCache):
         ################################################
         #     Implementation of abstract methods       #
         ################################################
    +
         def contains(self, element, key):
             refpath = self._refpath(self.get_artifact_fullname(element, key))
     
    @@ -115,7 +122,7 @@ class CASCache(ArtifactCache):
         def commit(self, element, content, keys):
             refs = [self.get_artifact_fullname(element, key) for key in keys]
     
    -        tree = self._create_tree(content)
    +        tree = self._commit_directory(content)
     
             for ref in refs:
                 self.set_ref(ref, tree)
    @@ -151,6 +158,7 @@ class CASCache(ArtifactCache):
             q = multiprocessing.Queue()
             for remote_spec in remote_specs:
                 # Use subprocess to avoid creation of gRPC threads in main BuildStream process
    +            # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
                 p = multiprocessing.Process(target=self._initialize_remote, args=(remote_spec, q))
     
                 try:
    @@ -263,109 +271,69 @@ class CASCache(ArtifactCache):
     
             self.set_ref(newref, tree)
     
    +    def _push_refs_to_remote(self, refs, remote):
    +        skipped_remote = True
    +        try:
    +            for ref in refs:
    +                tree = self.resolve_ref(ref)
    +
    +                # Check whether ref is already on the server in which case
    +                # there is no need to push the artifact
    +                try:
    +                    request = buildstream_pb2.GetReferenceRequest()
    +                    request.key = ref
    +                    response = remote.ref_storage.GetReference(request)
    +
    +                    if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
    +                        # ref is already on the server with the same tree
    +                        continue
    +
    +                except grpc.RpcError as e:
    +                    if e.code() != grpc.StatusCode.NOT_FOUND:
    +                        # Intentionally re-raise RpcError for outer except block.
    +                        raise
    +
    +                self._send_directory(remote, tree)
    +
    +                request = buildstream_pb2.UpdateReferenceRequest()
    +                request.keys.append(ref)
    +                request.digest.hash = tree.hash
    +                request.digest.size_bytes = tree.size_bytes
    +                remote.ref_storage.UpdateReference(request)
    +
    +                skipped_remote = False
    +        except grpc.RpcError as e:
    +            if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
    +                raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
    +
    +        return not skipped_remote
    +
         def push(self, element, keys):
    -        refs = [self.get_artifact_fullname(element, key) for key in keys]
    +
    +        refs = [self.get_artifact_fullname(element, key) for key in list(keys)]
     
             project = element._get_project()
     
             push_remotes = [r for r in self._remotes[project] if r.spec.push]
     
             pushed = False
    -        display_key = element._get_brief_display_key()
    +
             for remote in push_remotes:
                 remote.init()
    -            skipped_remote = True
    +            display_key = element._get_brief_display_key()
                 element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url))
     
    -            try:
    -                for ref in refs:
    -                    tree = self.resolve_ref(ref)
    -
    -                    # Check whether ref is already on the server in which case
    -                    # there is no need to push the artifact
    -                    try:
    -                        request = buildstream_pb2.GetReferenceRequest()
    -                        request.key = ref
    -                        response = remote.ref_storage.GetReference(request)
    -
    -                        if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
    -                            # ref is already on the server with the same tree
    -                            continue
    -
    -                    except grpc.RpcError as e:
    -                        if e.code() != grpc.StatusCode.NOT_FOUND:
    -                            # Intentionally re-raise RpcError for outer except block.
    -                            raise
    -
    -                    missing_blobs = {}
    -                    required_blobs = self._required_blobs(tree)
    -
    -                    # Limit size of FindMissingBlobs request
    -                    for required_blobs_group in _grouper(required_blobs, 512):
    -                        request = remote_execution_pb2.FindMissingBlobsRequest()
    -
    -                        for required_digest in required_blobs_group:
    -                            d = request.blob_digests.add()
    -                            d.hash = required_digest.hash
    -                            d.size_bytes = required_digest.size_bytes
    -
    -                        response = remote.cas.FindMissingBlobs(request)
    -                        for digest in response.missing_blob_digests:
    -                            d = remote_execution_pb2.Digest()
    -                            d.hash = digest.hash
    -                            d.size_bytes = digest.size_bytes
    -                            missing_blobs[d.hash] = d
    -
    -                    # Upload any blobs missing on the server
    -                    skipped_remote = False
    -                    for digest in missing_blobs.values():
    -                        uuid_ = uuid.uuid4()
    -                        resource_name = '/'.join(['uploads', str(uuid_), 'blobs',
    -                                                  digest.hash, str(digest.size_bytes)])
    -
    -                        def request_stream(resname):
    -                            with open(self.objpath(digest), 'rb') as f:
    -                                assert os.fstat(f.fileno()).st_size == digest.size_bytes
    -                                offset = 0
    -                                finished = False
    -                                remaining = digest.size_bytes
    -                                while not finished:
    -                                    chunk_size = min(remaining, 64 * 1024)
    -                                    remaining -= chunk_size
    -
    -                                    request = bytestream_pb2.WriteRequest()
    -                                    request.write_offset = offset
    -                                    # max. 64 kB chunks
    -                                    request.data = f.read(chunk_size)
    -                                    request.resource_name = resname
    -                                    request.finish_write = remaining <= 0
    -                                    yield request
    -                                    offset += chunk_size
    -                                    finished = request.finish_write
    -                        response = remote.bytestream.Write(request_stream(resource_name))
    -
    -                    request = buildstream_pb2.UpdateReferenceRequest()
    -                    request.keys.append(ref)
    -                    request.digest.hash = tree.hash
    -                    request.digest.size_bytes = tree.size_bytes
    -                    remote.ref_storage.UpdateReference(request)
    -
    -                    pushed = True
    -
    -                if not skipped_remote:
    -                    element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
    -
    -            except grpc.RpcError as e:
    -                if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
    -                    raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
    -
    -            if skipped_remote:
    +            if self._push_refs_to_remote(refs, remote):
    +                element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
    +                pushed = True
    +            else:
                     self.context.message(Message(
                         None,
                         MessageType.INFO,
                         "Remote ({}) already has {} cached".format(
                             remote.spec.url, element._get_brief_display_key())
                     ))
    +
             return pushed
     
         ################################################
    @@ -393,13 +361,14 @@ class CASCache(ArtifactCache):
         #     digest (Digest): An optional Digest object to populate
         #     path (str): Path to file to add
         #     buffer (bytes): Byte buffer to add
    +    #     link_directly (bool): Whether file given by path can be linked
         #
         # Returns:
         #     (Digest): The digest of the added object
         #
         # Either `path` or `buffer` must be passed, but not both.
         #
    -    def add_object(self, *, digest=None, path=None, buffer=None):
    +    def add_object(self, *, digest=None, path=None, buffer=None, link_directly=False):
             # Exactly one of the two parameters has to be specified
             assert (path is None) != (buffer is None)
     
    @@ -409,28 +378,34 @@ class CASCache(ArtifactCache):
             try:
                 h = hashlib.sha256()
                 # Always write out new file to avoid corruption if input file is modified
    -            with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
    -                # Set mode bits to 0644
    -                os.chmod(out.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
    -
    -                if path:
    -                    with open(path, 'rb') as f:
    -                        for chunk in iter(lambda: f.read(4096), b""):
    -                            h.update(chunk)
    -                            out.write(chunk)
    +            with contextlib.ExitStack() as stack:
    +                if path is not None and link_directly:
    +                    tmp = stack.enter_context(open(path, 'rb'))
    +                    for chunk in iter(lambda: tmp.read(4096), b""):
    +                        h.update(chunk)
                     else:
    -                    h.update(buffer)
    -                    out.write(buffer)
    +                    tmp = stack.enter_context(tempfile.NamedTemporaryFile(dir=self.tmpdir))
    +                    # Set mode bits to 0644
    +                    os.chmod(tmp.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
    +
    +                    if path:
    +                        with open(path, 'rb') as f:
    +                            for chunk in iter(lambda: f.read(4096), b""):
    +                                h.update(chunk)
    +                                tmp.write(chunk)
    +                    else:
    +                        h.update(buffer)
    +                        tmp.write(buffer)
     
    -                out.flush()
    +                    tmp.flush()
     
                     digest.hash = h.hexdigest()
    -                digest.size_bytes = os.fstat(out.fileno()).st_size
    +                digest.size_bytes = os.fstat(tmp.fileno()).st_size
     
                     # Place file at final location
                     objpath = self.objpath(digest)
                     os.makedirs(os.path.dirname(objpath), exist_ok=True)
    -                os.link(out.name, objpath)
    +                os.link(tmp.name, objpath)
     
             except FileExistsError as e:
                 # We can ignore the failed link() if the object is already in the repo.
    @@ -451,7 +426,7 @@ class CASCache(ArtifactCache):
         def set_ref(self, ref, tree):
             refpath = self._refpath(ref)
             os.makedirs(os.path.dirname(refpath), exist_ok=True)
    -        with utils.save_file_atomic(refpath, 'wb') as f:
    +        with utils.save_file_atomic(refpath, 'wb', tempdir=self.tmpdir) as f:
                 f.write(tree.SerializeToString())
     
         # resolve_ref():
    @@ -565,7 +540,12 @@ class CASCache(ArtifactCache):
         #
         # Prune unreachable objects from the repo.
         #
    -    def prune(self):
    +    # Args:
    +    #    keep_after (int|None): timestamp after which unreachable objects
    +    #                           are kept. None if no unreachable object
    +    #                           should be kept.
    +    #
    +    def prune(self, keep_after=None):
             ref_heads = os.path.join(self.casdir, 'refs', 'heads')
     
             pruned = 0
    @@ -586,6 +566,10 @@ class CASCache(ArtifactCache):
                     objhash = os.path.basename(root) + filename
                     if objhash not in reachable:
                         obj_path = os.path.join(root, filename)
    +                    if keep_after:
    +                        st = os.stat(obj_path)
    +                        if st.st_mtime >= keep_after:
    +                            continue
                         pruned += os.stat(obj_path).st_size
                         os.unlink(obj_path)
     
    @@ -594,6 +578,7 @@ class CASCache(ArtifactCache):
         ################################################
         #             Local Private Methods            #
         ################################################
    +
         def _checkout(self, dest, tree):
             os.makedirs(dest, exist_ok=True)
     
    @@ -623,7 +608,21 @@ class CASCache(ArtifactCache):
         def _refpath(self, ref):
             return os.path.join(self.casdir, 'refs', 'heads', ref)
     
    -    def _create_tree(self, path, *, digest=None):
    +    # _commit_directory():
    +    #
    +    # Adds local directory to content addressable store.
    +    #
    +    # Adds files, symbolic links and recursively other directories in
    +    # a local directory to the content addressable store.
    +    #
    +    # Args:
    +    #     path (str): Path to the directory to add.
    +    #     dir_digest (Digest): An optional Digest object to use.
    +    #
    +    # Returns:
    +    #     (Digest): Digest object for the directory added.
    +    #
    +    def _commit_directory(self, path, *, dir_digest=None):
             directory = remote_execution_pb2.Directory()
     
             for name in sorted(os.listdir(path)):
    @@ -632,7 +631,7 @@ class CASCache(ArtifactCache):
                 if stat.S_ISDIR(mode):
                     dirnode = directory.directories.add()
                     dirnode.name = name
    -                self._create_tree(full_path, digest=dirnode.digest)
    +                self._commit_directory(full_path, dir_digest=dirnode.digest)
                 elif stat.S_ISREG(mode):
                     filenode = directory.files.add()
                     filenode.name = name
    @@ -645,7 +644,8 @@ class CASCache(ArtifactCache):
                 else:
                     raise ArtifactError("Unsupported file type for {}".format(full_path))
     
    -        return self.add_object(digest=digest, buffer=directory.SerializeToString())
    +        return self.add_object(digest=dir_digest,
    +                               buffer=directory.SerializeToString())
     
         def _get_subdir(self, tree, subdir):
             head, name = os.path.split(subdir)
    @@ -756,16 +756,16 @@ class CASCache(ArtifactCache):
                 #
                 q.put(str(e))
     
    -    def _required_blobs(self, tree):
    +    def _required_blobs(self, directory_digest):
             # parse directory, and recursively add blobs
             d = remote_execution_pb2.Digest()
    -        d.hash = tree.hash
    -        d.size_bytes = tree.size_bytes
    +        d.hash = directory_digest.hash
    +        d.size_bytes = directory_digest.size_bytes
             yield d
     
             directory = remote_execution_pb2.Directory()
     
    -        with open(self.objpath(tree), 'rb') as f:
    +        with open(self.objpath(directory_digest), 'rb') as f:
                 directory.ParseFromString(f.read())
     
             for filenode in directory.files:
    @@ -777,50 +777,203 @@ class CASCache(ArtifactCache):
             for dirnode in directory.directories:
                 yield from self._required_blobs(dirnode.digest)
     
    -    def _fetch_blob(self, remote, digest, out):
    +    def _fetch_blob(self, remote, digest, stream):
             resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)])
             request = bytestream_pb2.ReadRequest()
             request.resource_name = resource_name
             request.read_offset = 0
             for response in remote.bytestream.Read(request):
    -            out.write(response.data)
    +            stream.write(response.data)
    +        stream.flush()
     
    -        out.flush()
    -        assert digest.size_bytes == os.fstat(out.fileno()).st_size
    +        assert digest.size_bytes == os.fstat(stream.fileno()).st_size
     
    -    def _fetch_directory(self, remote, tree):
    -        objpath = self.objpath(tree)
    +    # _ensure_blob():
    +    #
    +    # Fetch and add blob if it's not already local.
    +    #
    +    # Args:
    +    #     remote (Remote): The remote to use.
    +    #     digest (Digest): Digest object for the blob to fetch.
    +    #
    +    # Returns:
    +    #     (str): The path of the object
    +    #
    +    def _ensure_blob(self, remote, digest):
    +        objpath = self.objpath(digest)
             if os.path.exists(objpath):
    -            # already in local cache
    -            return
    +            # already in local repository
    +            return objpath
     
    -        with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
    -            self._fetch_blob(remote, tree, out)
    +        with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
    +            self._fetch_blob(remote, digest, f)
     
    -            directory = remote_execution_pb2.Directory()
    +            added_digest = self.add_object(path=f.name)
    +            assert added_digest.hash == digest.hash
     
    -            with open(out.name, 'rb') as f:
    -                directory.ParseFromString(f.read())
    +        return objpath
     
    -            for filenode in directory.files:
    -                fileobjpath = self.objpath(tree)
    -                if os.path.exists(fileobjpath):
    -                    # already in local cache
    -                    continue
    +    def _batch_download_complete(self, batch):
    +        for digest, data in batch.send():
    +            with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
    +                f.write(data)
    +                f.flush()
    +
    +                added_digest = self.add_object(path=f.name)
    +                assert added_digest.hash == digest.hash
     
    -                with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
    -                    self._fetch_blob(remote, filenode.digest, f)
    +    # Helper function for _fetch_directory().
    +    def _fetch_directory_batch(self, remote, batch, fetch_queue, fetch_next_queue):
    +        self._batch_download_complete(batch)
     
    -                    digest = self.add_object(path=f.name)
    -                    assert digest.hash == filenode.digest.hash
    +        # All previously scheduled directories are now locally available,
    +        # move them to the processing queue.
    +        fetch_queue.extend(fetch_next_queue)
    +        fetch_next_queue.clear()
    +        return _CASBatchRead(remote)
    +
    +    # Helper function for _fetch_directory().
    +    def _fetch_directory_node(self, remote, digest, batch, fetch_queue, fetch_next_queue, *, recursive=False):
    +        in_local_cache = os.path.exists(self.objpath(digest))
    +
    +        if in_local_cache:
    +            # Skip download, already in local cache.
    +            pass
    +        elif (digest.size_bytes >= remote.max_batch_total_size_bytes or
    +                not remote.batch_read_supported):
    +            # Too large for batch request, download in independent request.
    +            self._ensure_blob(remote, digest)
    +            in_local_cache = True
    +        else:
    +            if not batch.add(digest):
    +                # Not enough space left in batch request.
    +                # Complete pending batch first.
    +                batch = self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
    +                batch.add(digest)
    +
    +        if recursive:
    +            if in_local_cache:
    +                # Add directory to processing queue.
    +                fetch_queue.append(digest)
    +            else:
    +                # Directory will be available after completing pending batch.
    +                # Add directory to deferred processing queue.
    +                fetch_next_queue.append(digest)
    +
    +        return batch
    +
    +    # _fetch_directory():
    +    #
    +    # Fetches remote directory and adds it to content addressable store.
    +    #
    +    # Fetches files, symbolic links and recursively other directories in
    +    # the remote directory and adds them to the content addressable
    +    # store.
    +    #
    +    # Args:
    +    #     remote (Remote): The remote to use.
    +    #     dir_digest (Digest): Digest object for the directory to fetch.
    +    #
    +    def _fetch_directory(self, remote, dir_digest):
    +        fetch_queue = [dir_digest]
    +        fetch_next_queue = []
    +        batch = _CASBatchRead(remote)
    +
    +        while len(fetch_queue) + len(fetch_next_queue) > 0:
    +            if len(fetch_queue) == 0:
    +                batch = self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
    +
    +            dir_digest = fetch_queue.pop(0)
    +
    +            objpath = self._ensure_blob(remote, dir_digest)
    +
    +            directory = remote_execution_pb2.Directory()
    +            with open(objpath, 'rb') as f:
    +                directory.ParseFromString(f.read())
     
                 for dirnode in directory.directories:
    -                self._fetch_directory(remote, dirnode.digest)
    +                batch = self._fetch_directory_node(remote, dirnode.digest, batch,
    +                                                   fetch_queue, fetch_next_queue, recursive=True)
     
    -            # place directory blob only in final location when we've downloaded
    -            # all referenced blobs to avoid dangling references in the repository
    -            digest = self.add_object(path=out.name)
    -            assert digest.hash == tree.hash
    +            for filenode in directory.files:
    +                batch = self._fetch_directory_node(remote, filenode.digest, batch,
    +                                                   fetch_queue, fetch_next_queue)
    +
    +        # Fetch final batch
    +        self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
    +
    +    def _send_blob(self, remote, digest, stream, u_uid=uuid.uuid4()):
    +        resource_name = '/'.join(['uploads', str(u_uid), 'blobs',
    +                                  digest.hash, str(digest.size_bytes)])
    +
    +        def request_stream(resname, instream):
    +            offset = 0
    +            finished = False
    +            remaining = digest.size_bytes
    +            while not finished:
    +                chunk_size = min(remaining, _MAX_PAYLOAD_BYTES)
    +                remaining -= chunk_size
    +
    +                request = bytestream_pb2.WriteRequest()
    +                request.write_offset = offset
    +                # max. _MAX_PAYLOAD_BYTES chunks
    +                request.data = instream.read(chunk_size)
    +                request.resource_name = resname
    +                request.finish_write = remaining <= 0
    +
    +                yield request
    +
    +                offset += chunk_size
    +                finished = request.finish_write
    +
    +        response = remote.bytestream.Write(request_stream(resource_name, stream))
    +
    +        assert response.committed_size == digest.size_bytes
    +
    +    def _send_directory(self, remote, digest, u_uid=uuid.uuid4()):
    +        required_blobs = self._required_blobs(digest)
    +
    +        missing_blobs = dict()
    +        # Limit size of FindMissingBlobs request
    +        for required_blobs_group in _grouper(required_blobs, 512):
    +            request = remote_execution_pb2.FindMissingBlobsRequest()
    +
    +            for required_digest in required_blobs_group:
    +                d = request.blob_digests.add()
    +                d.hash = required_digest.hash
    +                d.size_bytes = required_digest.size_bytes
    +
    +            response = remote.cas.FindMissingBlobs(request)
    +            for missing_digest in response.missing_blob_digests:
    +                d = remote_execution_pb2.Digest()
    +                d.hash = missing_digest.hash
    +                d.size_bytes = missing_digest.size_bytes
    +                missing_blobs[d.hash] = d
    +
    +        # Upload any blobs missing on the server
    +        self._send_blobs(remote, missing_blobs.values(), u_uid)
    +
    +    def _send_blobs(self, remote, digests, u_uid=uuid.uuid4()):
    +        batch = _CASBatchUpdate(remote)
    +
    +        for digest in digests:
    +            with open(self.objpath(digest), 'rb') as f:
    +                assert os.fstat(f.fileno()).st_size == digest.size_bytes
    +
    +                if (digest.size_bytes >= remote.max_batch_total_size_bytes or
    +                        not remote.batch_update_supported):
    +                    # Too large for batch request, upload in independent request.
    +                    self._send_blob(remote, digest, f, u_uid=u_uid)
    +                else:
    +                    if not batch.add(digest, f):
    +                        # Not enough space left in batch request.
    +                        # Complete pending batch first.
    +                        batch.send()
    +                        batch = _CASBatchUpdate(remote)
    +                        batch.add(digest, f)
    +
    +        # Send final batch
    +        batch.send()
     
     
     # Represents a single remote CAS cache.
    @@ -870,11 +1023,129 @@ class _CASRemote():
     
                 self.bytestream = bytestream_pb2_grpc.ByteStreamStub(self.channel)
                 self.cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel)
    +            self.capabilities = remote_execution_pb2_grpc.CapabilitiesStub(self.channel)
                 self.ref_storage = buildstream_pb2_grpc.ReferenceStorageStub(self.channel)
     
    +            self.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES
    +            try:
    +                request = remote_execution_pb2.GetCapabilitiesRequest()
    +                response = self.capabilities.GetCapabilities(request)
    +                server_max_batch_total_size_bytes = response.cache_capabilities.max_batch_total_size_bytes
    +                if 0 < server_max_batch_total_size_bytes < self.max_batch_total_size_bytes:
    +                    self.max_batch_total_size_bytes = server_max_batch_total_size_bytes
    +            except grpc.RpcError as e:
    +                # Simply use the defaults for servers that don't implement GetCapabilities()
    +                if e.code() != grpc.StatusCode.UNIMPLEMENTED:
    +                    raise
    +
    +            # Check whether the server supports BatchReadBlobs()
    +            self.batch_read_supported = False
    +            try:
    +                request = remote_execution_pb2.BatchReadBlobsRequest()
    +                response = self.cas.BatchReadBlobs(request)
    +                self.batch_read_supported = True
    +            except grpc.RpcError as e:
    +                if e.code() != grpc.StatusCode.UNIMPLEMENTED:
    +                    raise
    +
    +            # Check whether the server supports BatchUpdateBlobs()
    +            self.batch_update_supported = False
    +            try:
    +                request = remote_execution_pb2.BatchUpdateBlobsRequest()
    +                response = self.cas.BatchUpdateBlobs(request)
    +                self.batch_update_supported = True
    +            except grpc.RpcError as e:
    +                if (e.code() != grpc.StatusCode.UNIMPLEMENTED and
    +                        e.code() != grpc.StatusCode.PERMISSION_DENIED):
    +                    raise
    +
                 self._initialized = True
     
     
    +# Represents a batch of blobs queued for fetching.
    +#
    +class _CASBatchRead():
    +    def __init__(self, remote):
    +        self._remote = remote
    +        self._max_total_size_bytes = remote.max_batch_total_size_bytes
    +        self._request = remote_execution_pb2.BatchReadBlobsRequest()
    +        self._size = 0
    +        self._sent = False
    +
    +    def add(self, digest):
    +        assert not self._sent
    +
    +        new_batch_size = self._size + digest.size_bytes
    +        if new_batch_size > self._max_total_size_bytes:
    +            # Not enough space left in current batch
    +            return False
    +
    +        request_digest = self._request.digests.add()
    +        request_digest.hash = digest.hash
    +        request_digest.size_bytes = digest.size_bytes
    +        self._size = new_batch_size
    +        return True
    +
    +    def send(self):
    +        assert not self._sent
    +        self._sent = True
    +
    +        if len(self._request.digests) == 0:
    +            return
    +
    +        batch_response = self._remote.cas.BatchReadBlobs(self._request)
    +
    +        for response in batch_response.responses:
    +            if response.status.code != grpc.StatusCode.OK.value[0]:
    +                raise ArtifactError("Failed to download blob {}: {}".format(
    +                    response.digest.hash, response.status.code))
    +            if response.digest.size_bytes != len(response.data):
    +                raise ArtifactError("Failed to download blob {}: expected {} bytes, received {} bytes".format(
    +                    response.digest.hash, response.digest.size_bytes, len(response.data)))
    +
    +            yield (response.digest, response.data)
    +
    +
    +# Represents a batch of blobs queued for upload.
    +#
    +class _CASBatchUpdate():
    +    def __init__(self, remote):
    +        self._remote = remote
    +        self._max_total_size_bytes = remote.max_batch_total_size_bytes
    +        self._request = remote_execution_pb2.BatchUpdateBlobsRequest()
    +        self._size = 0
    +        self._sent = False
    +
    +    def add(self, digest, stream):
    +        assert not self._sent
    +
    +        new_batch_size = self._size + digest.size_bytes
    +        if new_batch_size > self._max_total_size_bytes:
    +            # Not enough space left in current batch
    +            return False
    +
    +        blob_request = self._request.requests.add()
    +        blob_request.digest.hash = digest.hash
    +        blob_request.digest.size_bytes = digest.size_bytes
    +        blob_request.data = stream.read(digest.size_bytes)
    +        self._size = new_batch_size
    +        return True
    +
    +    def send(self):
    +        assert not self._sent
    +        self._sent = True
    +
    +        if len(self._request.requests) == 0:
    +            return
    +
    +        batch_response = self._remote.cas.BatchUpdateBlobs(self._request)
    +
    +        for response in batch_response.responses:
    +            if response.status.code != grpc.StatusCode.OK.value[0]:
    +                raise ArtifactError("Failed to upload blob {}: {}".format(
    +                    response.digest.hash, response.status.code))
    +
    +
     def _grouper(iterable, n):
         while True:
             try:
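
    The new _CASBatchRead and _CASBatchUpdate classes above share one pattern: accumulate digests
    until adding another would exceed the negotiated max_batch_total_size_bytes, flush the pending
    batch, then start a new one (single blobs larger than the budget fall back to individual
    ByteStream requests). The following is a minimal standalone sketch of that batching pattern,
    not BuildStream code; batch_by_size and its (name, size) items are invented for the example,
    and the 1 MiB budget mirrors _MAX_PAYLOAD_BYTES.

        # Sketch: group (name, size_bytes) items into batches capped by a byte
        # budget, mirroring how the batch request classes cap payloads before send().
        _MAX_PAYLOAD_BYTES = 1024 * 1024

        def batch_by_size(items, max_total_size_bytes=_MAX_PAYLOAD_BYTES):
            batch, total = [], 0
            for name, size in items:
                if batch and total + size > max_total_size_bytes:
                    # Not enough space left; complete the pending batch first
                    yield batch
                    batch, total = [], 0
                batch.append(name)
                total += size
            if batch:
                # Send the final batch
                yield batch

        # Example: the budget forces the three blobs into two batches
        print(list(batch_by_size([("a", 700_000), ("b", 500_000), ("c", 10_000)])))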
    

  • buildstream/_artifactcache/casserver.py
    ... ... @@ -24,6 +24,10 @@ import signal
    24 24
     import sys
    
    25 25
     import tempfile
    
    26 26
     import uuid
    
    27
    +import time
    
    28
    +import errno
    
    29
    +import ctypes
    
    30
    +import faulthandler
    
    27 31
     
    
    28 32
     import click
    
    29 33
     import grpc
    
    ... ... @@ -38,8 +42,13 @@ from .._context import Context
    38 42
     from .cascache import CASCache
    
    39 43
     
    
    40 44
     
    
    41
    -# The default limit for gRPC messages is 4 MiB
    
    42
    -_MAX_BATCH_TOTAL_SIZE_BYTES = 4 * 1024 * 1024
    
    45
    +# The default limit for gRPC messages is 4 MiB.
    
    46
    +# Limit payload to 1 MiB to leave sufficient headroom for metadata.
    
    47
    +_MAX_PAYLOAD_BYTES = 1024 * 1024
    
    48
    +
    
    49
    +# The minimum age in seconds for objects before they can be cleaned
    
    50
    +# up.
    
    51
    +_OBJECT_MIN_AGE = 6 * 60 * 60
    
    43 52
     
    
    44 53
     
    
    45 54
     # Trying to push an artifact that is too large
    
    ... ... @@ -69,7 +78,7 @@ def create_server(repo, *, enable_push):
    69 78
             _ByteStreamServicer(artifactcache, enable_push=enable_push), server)
    
    70 79
     
    
    71 80
         remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
    
    72
    -        _ContentAddressableStorageServicer(artifactcache), server)
    
    81
    +        _ContentAddressableStorageServicer(artifactcache, enable_push=enable_push), server)
    
    73 82
     
    
    74 83
         remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
    
    75 84
             _CapabilitiesServicer(), server)
    
    ... ... @@ -89,6 +98,8 @@ def create_server(repo, *, enable_push):
    89 98
                   help="Allow clients to upload blobs and update artifact cache")
    
    90 99
     @click.argument('repo')
    
    91 100
     def server_main(repo, port, server_key, server_cert, client_certs, enable_push):
    
    101
    +    faulthandler.register(signal.SIGUSR1, all_threads=True)
    
    102
    +
    
    92 103
         server = create_server(repo, enable_push=enable_push)
    
    93 104
     
    
    94 105
         use_tls = bool(server_key)
    
    ... ... @@ -130,11 +141,43 @@ def server_main(repo, port, server_key, server_cert, client_certs, enable_push):
    130 141
             server.stop(0)
    
    131 142
     
    
    132 143
     
    
    144
    +class _FallocateCall:
    
    145
    +
    
    146
    +    FALLOC_FL_KEEP_SIZE = 1
    
    147
    +    FALLOC_FL_PUNCH_HOLE = 2
    
    148
    +    FALLOC_FL_NO_HIDE_STALE = 4
    
    149
    +    FALLOC_FL_COLLAPSE_RANGE = 8
    
    150
    +    FALLOC_FL_ZERO_RANGE = 16
    
    151
    +    FALLOC_FL_INSERT_RANGE = 32
    
    152
    +    FALLOC_FL_UNSHARE_RANGE = 64
    
    153
    +
    
    154
    +    def __init__(self):
    
    155
    +        self.libc = ctypes.CDLL("libc.so.6", use_errno=True)
    
    156
    +        try:
    
    157
    +            self.fallocate64 = self.libc.fallocate64
    
    158
    +        except AttributeError:
    
    159
    +            self.fallocate = self.libc.fallocate
    
    160
    +
    
    161
    +    def __call__(self, fd, mode, offset, length):
    
    162
    +        if hasattr(self, 'fallocate64'):
    
    163
    +            print(fd, mode, offset, length)
    
    164
    +            ret = self.fallocate64(ctypes.c_int(fd), ctypes.c_int(mode),
    
    165
    +                                   ctypes.c_int64(offset), ctypes.c_int64(length))
    
    166
    +        else:
    
    167
    +            ret = self.fallocate(ctypes.c_int(fd), ctypes.c_int(mode),
    
    168
    +                                 ctypes.c_int(offset), ctypes.c_int(length))
    
    169
    +        if ret == -1:
    
    170
    +            errno = ctypes.get_errno()
    
    171
    +            raise OSError(errno, os.strerror(errno))
    
    172
    +        return ret
    
    173
    +
    
    174
    +
    
    133 175
     class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
    
    134 176
         def __init__(self, cas, *, enable_push):
    
    135 177
             super().__init__()
    
    136 178
             self.cas = cas
    
    137 179
             self.enable_push = enable_push
    
    180
    +        self.fallocate = _FallocateCall()
    
    138 181
     
    
    139 182
         def Read(self, request, context):
    
    140 183
             resource_name = request.resource_name
    
    ... ... @@ -158,7 +201,7 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
    158 201
     
    
    159 202
                     remaining = client_digest.size_bytes - request.read_offset
    
    160 203
                     while remaining > 0:
    
    161
    -                    chunk_size = min(remaining, 64 * 1024)
    
    204
    +                    chunk_size = min(remaining, _MAX_PAYLOAD_BYTES)
    
    162 205
                         remaining -= chunk_size
    
    163 206
     
    
    164 207
                         response = bytestream_pb2.ReadResponse()
    
    ... ... @@ -192,25 +235,44 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
    192 235
                             context.set_code(grpc.StatusCode.NOT_FOUND)
    
    193 236
                             return response
    
    194 237
     
    
    195
    -                    try:
    
    196
    -                        _clean_up_cache(self.cas, client_digest.size_bytes)
    
    197
    -                    except ArtifactTooLargeException as e:
    
    198
    -                        context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
    
    199
    -                        context.set_details(str(e))
    
    200
    -                        return response
    
    238
    +                    while True:
    
    239
    +                        if client_digest.size_bytes == 0:
    
    240
    +                            break
    
    241
    +                        try:
    
    242
    +                            _clean_up_cache(self.cas, client_digest.size_bytes)
    
    243
    +                        except ArtifactTooLargeException as e:
    
    244
    +                            context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
    
    245
    +                            context.set_details(str(e))
    
    246
    +                            return response
    
    247
    +
    
    248
    +                        try:
    
    249
    +                            self.fallocate(out.fileno(), 0, 0, client_digest.size_bytes)
    
    250
    +                            break
    
    251
    +                        except OSError as e:
    
    252
    +                            # Multiple upload can happen in the same time
    
    253
    +                            if e.errno != errno.ENOSPC:
    
    254
    +                                raise
    
    255
    +
    
    201 256
                     elif request.resource_name:
    
    202 257
                         # If it is set on subsequent calls, it **must** match the value of the first request.
    
    203 258
                         if request.resource_name != resource_name:
    
    204 259
                             context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
    
    205 260
                             return response
    
    261
    +
    
    262
    +                if (offset + len(request.data)) > client_digest.size_bytes:
    
    263
    +                    context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
    
    264
    +                    return response
    
    265
    +
    
    206 266
                     out.write(request.data)
    
    267
    +
    
    207 268
                     offset += len(request.data)
    
    269
    +
    
    208 270
                     if request.finish_write:
    
    209 271
                         if client_digest.size_bytes != offset:
    
    210 272
                             context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
    
    211 273
                             return response
    
    212 274
                         out.flush()
    
    213
    -                    digest = self.cas.add_object(path=out.name)
    
    275
    +                    digest = self.cas.add_object(path=out.name, link_directly=True)
    
    214 276
                         if digest.hash != client_digest.hash:
    
    215 277
                             context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
    
    216 278
                             return response
    
    ... ... @@ -223,17 +285,25 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
    223 285
     
    
    224 286
     
    
    225 287
     class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
    
    226
    -    def __init__(self, cas):
    
    288
    +    def __init__(self, cas, *, enable_push):
    
    227 289
             super().__init__()
    
    228 290
             self.cas = cas
    
    291
    +        self.enable_push = enable_push
    
    229 292
     
    
    230 293
         def FindMissingBlobs(self, request, context):
    
    231 294
             response = remote_execution_pb2.FindMissingBlobsResponse()
    
    232 295
             for digest in request.blob_digests:
    
    233
    -            if not _has_object(self.cas, digest):
    
    234
    -                d = response.missing_blob_digests.add()
    
    235
    -                d.hash = digest.hash
    
    236
    -                d.size_bytes = digest.size_bytes
    
    296
    +            objpath = self.cas.objpath(digest)
    
    297
    +            try:
    
    298
    +                os.utime(objpath)
    
    299
    +            except OSError as e:
    
    300
    +                if e.errno != errno.ENOENT:
    
    301
    +                    raise
    
    302
    +                else:
    
    303
    +                    d = response.missing_blob_digests.add()
    
    304
    +                    d.hash = digest.hash
    
    305
    +                    d.size_bytes = digest.size_bytes
    
    306
    +
    
    237 307
             return response
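
The FindMissingBlobs change above folds the existence check and a timestamp refresh into one call: touching each requested object marks it as recently used, so the server-side cleanup will not prune blobs a client has just been told are present. The same idiom as a standalone helper (hypothetical name, sketch only):

        import errno
        import os

        def touch_if_present(objpath):
            # Refresh atime/mtime if the object exists; ENOENT means the blob
            # is missing and must be reported back to the client.
            try:
                os.utime(objpath)
                return True
            except OSError as e:
                if e.errno != errno.ENOENT:
                    raise
                return False
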
    
    238 308
     
    
    239 309
         def BatchReadBlobs(self, request, context):
    
    ... ... @@ -242,7 +312,7 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres
    242 312
     
    
    243 313
             for digest in request.digests:
    
    244 314
                 batch_size += digest.size_bytes
    
    245
    -            if batch_size > _MAX_BATCH_TOTAL_SIZE_BYTES:
    
    315
    +            if batch_size > _MAX_PAYLOAD_BYTES:
    
    246 316
                     context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
    
    247 317
                     return response
    
    248 318
     
    
    ... ... @@ -261,6 +331,46 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres
    261 331
     
    
    262 332
             return response
    
    263 333
     
    
    334
    +    def BatchUpdateBlobs(self, request, context):
    
    335
    +        response = remote_execution_pb2.BatchUpdateBlobsResponse()
    
    336
    +
    
    337
    +        if not self.enable_push:
    
    338
    +            context.set_code(grpc.StatusCode.PERMISSION_DENIED)
    
    339
    +            return response
    
    340
    +
    
    341
    +        batch_size = 0
    
    342
    +
    
    343
    +        for blob_request in request.requests:
    
    344
    +            digest = blob_request.digest
    
    345
    +
    
    346
    +            batch_size += digest.size_bytes
    
    347
    +            if batch_size > _MAX_PAYLOAD_BYTES:
    
    348
    +                context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
    
    349
    +                return response
    
    350
    +
    
    351
    +            blob_response = response.responses.add()
    
    352
    +            blob_response.digest.hash = digest.hash
    
    353
    +            blob_response.digest.size_bytes = digest.size_bytes
    
    354
    +
    
    355
    +            if len(blob_request.data) != digest.size_bytes:
    
    356
    +                blob_response.status.code = grpc.StatusCode.FAILED_PRECONDITION
    
    357
    +                continue
    
    358
    +
    
    359
    +            try:
    
    360
    +                _clean_up_cache(self.cas, digest.size_bytes)
    
    361
    +
    
    362
    +                with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
    
    363
    +                    out.write(blob_request.data)
    
    364
    +                    out.flush()
    
    365
    +                    server_digest = self.cas.add_object(path=out.name)
    
    366
    +                    if server_digest.hash != digest.hash:
    
    367
    +                        blob_response.status.code = grpc.StatusCode.FAILED_PRECONDITION
    
    368
    +
    
    369
    +            except ArtifactTooLargeException:
    
    370
    +                blob_response.status.code = grpc.StatusCode.RESOURCE_EXHAUSTED
    
    371
    +
    
    372
    +        return response
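
BatchUpdateBlobs rejects requests whose combined payload exceeds _MAX_PAYLOAD_BYTES, the same limit advertised through max_batch_total_size_bytes below, so a client has to group its small blobs into batches under that cap and fall back to the ByteStream Write API for anything larger. A rough client-side sketch (hypothetical batch_blobs() helper over (digest, data) pairs, not BuildStream code):

        _MAX_PAYLOAD_BYTES = 1024 * 1024  # keep below the server's advertised limit

        def batch_blobs(blobs):
            # Greedily group (digest, data) pairs so that each BatchUpdateBlobs
            # request stays under the payload cap.
            batch, batch_size = [], 0
            for digest, data in blobs:
                if len(data) > _MAX_PAYLOAD_BYTES:
                    raise ValueError("blob too large for batching; use ByteStream Write")
                if batch and batch_size + len(data) > _MAX_PAYLOAD_BYTES:
                    yield batch
                    batch, batch_size = [], 0
                batch.append((digest, data))
                batch_size += len(data)
            if batch:
                yield batch
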
    
    373
    +
    
    264 374
     
    
    265 375
     class _CapabilitiesServicer(remote_execution_pb2_grpc.CapabilitiesServicer):
    
    266 376
         def GetCapabilities(self, request, context):
    
    ... ... @@ -269,7 +379,7 @@ class _CapabilitiesServicer(remote_execution_pb2_grpc.CapabilitiesServicer):
    269 379
             cache_capabilities = response.cache_capabilities
    
    270 380
             cache_capabilities.digest_function.append(remote_execution_pb2.SHA256)
    
    271 381
             cache_capabilities.action_cache_update_capabilities.update_enabled = False
    
    272
    -        cache_capabilities.max_batch_total_size_bytes = _MAX_BATCH_TOTAL_SIZE_BYTES
    
    382
    +        cache_capabilities.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES
    
    273 383
             cache_capabilities.symlink_absolute_path_strategy = remote_execution_pb2.CacheCapabilities.ALLOWED
    
    274 384
     
    
    275 385
             response.deprecated_api_version.major = 2
    
    ... ... @@ -362,11 +472,6 @@ def _digest_from_upload_resource_name(resource_name):
    362 472
             return None
    
    363 473
     
    
    364 474
     
    
    365
    -def _has_object(cas, digest):
    
    366
    -    objpath = cas.objpath(digest)
    
    367
    -    return os.path.exists(objpath)
    
    368
    -
    
    369
    -
    
    370 475
     # _clean_up_cache()
    
    371 476
     #
    
    372 477
     # Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
    
    ... ... @@ -399,7 +504,14 @@ def _clean_up_cache(cas, object_size):
    399 504
         # obtain a list of LRP artifacts
    
    400 505
         LRP_artifacts = cas.list_artifacts()
    
    401 506
     
    
    507
    +    keep_after = time.time() - _OBJECT_MIN_AGE
    
    508
    +
    
    402 509
         removed_size = 0  # in bytes
    
    510
    +    if object_size - removed_size > free_disk_space:
    
    511
    +        # First we try to see if some unreferenced objects became old
    
    512
    +        # enough to be removed.
    
    513
    +        removed_size += cas.prune(keep_after=keep_after)
    
    514
    +
    
    403 515
         while object_size - removed_size > free_disk_space:
    
    404 516
             try:
    
    405 517
                 to_remove = LRP_artifacts.pop(0)  # The first element in the list is the LRP artifact
    
    ... ... @@ -411,7 +523,8 @@ def _clean_up_cache(cas, object_size):
    411 523
                                                 "the filesystem which mounts the remote "
    
    412 524
                                                 "cache".format(object_size))
    
    413 525
     
    
    414
    -        removed_size += cas.remove(to_remove, defer_prune=False)
    
    526
    +        cas.remove(to_remove, defer_prune=True)
    
    527
    +        removed_size += cas.prune(keep_after=keep_after)
    
    415 528
     
    
    416 529
         if removed_size > 0:
    
    417 530
             logging.info("Successfully removed {} bytes from the cache".format(removed_size))
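
Condensed, the reworked cleanup above now works in two stages: first prune unreferenced objects that have aged past the minimum object age, then evict Least Recently Pushed artifacts one at a time, pruning after each eviction, until the incoming object fits. A sketch of that loop (the wrapper name, error type and free_disk_space parameter are illustrative; cas.list_artifacts(), cas.remove() and cas.prune() are the calls used above):

        import time

        def clean_up(cas, object_size, free_disk_space, min_age_seconds):
            # Objects newer than keep_after are protected, so a blob that is
            # still being uploaded cannot be pruned out from under the writer.
            keep_after = time.time() - min_age_seconds

            removed_size = 0
            if object_size - removed_size > free_disk_space:
                # Some unreferenced objects may simply have become old enough.
                removed_size += cas.prune(keep_after=keep_after)

            lrp_artifacts = cas.list_artifacts()
            while object_size - removed_size > free_disk_space:
                if not lrp_artifacts:
                    raise RuntimeError("cannot free enough space for the incoming object")
                # Drop the Least Recently Pushed ref, then prune what it released.
                cas.remove(lrp_artifacts.pop(0), defer_prune=True)
                removed_size += cas.prune(keep_after=keep_after)
            return removed_size
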
    

  • buildstream/_scheduler/jobs/job.py
    ... ... @@ -119,6 +119,8 @@ class Job():
    119 119
             self._result = None                    # Return value of child action in the parent
    
    120 120
             self._tries = 0                        # Try count, for retryable jobs
    
    121 121
             self._skipped_flag = False             # Indicate whether the job was skipped.
    
    122
    +        self._terminated = False               # Whether this job has been explicitly terminated
    
    123
    +
    
    122 124
             # If False, a retry will not be attempted regardless of whether _tries is less than _max_retries.
    
    123 125
             #
    
    124 126
             self._retry_flag = True
    
    ... ... @@ -188,6 +190,8 @@ class Job():
    188 190
             # Terminate the process using multiprocessing API pathway
    
    189 191
             self._process.terminate()
    
    190 192
     
    
    193
    +        self._terminated = True
    
    194
    +
    
    191 195
         # terminate_wait()
    
    192 196
         #
    
    193 197
         # Wait for terminated jobs to complete
    
    ... ... @@ -271,18 +275,22 @@ class Job():
    271 275
         # running the integration commands).
    
    272 276
         #
    
    273 277
         # Args:
    
    274
    -    #     (int): The plugin identifier for this task
    
    278
    +    #     task_id (int): The plugin identifier for this task
    
    275 279
         #
    
    276 280
         def set_task_id(self, task_id):
    
    277 281
             self._task_id = task_id
    
    278 282
     
    
    279 283
         # skipped
    
    280 284
         #
    
    285
    +    # This will evaluate to True if the job was skipped
    
    286
    +    # during processing, or if it was forcefully terminated.
    
    287
    +    #
    
    281 288
         # Returns:
    
    282
    -    #    bool: True if the job was skipped while processing.
    
    289
    +    #    (bool): Whether the job should appear as skipped
    
    290
    +    #
    
    283 291
         @property
    
    284 292
         def skipped(self):
    
    285
    -        return self._skipped_flag
    
    293
    +        return self._skipped_flag or self._terminated
    
    286 294
     
    
    287 295
         #######################################################
    
    288 296
         #                  Abstract Methods                   #
    

  • buildstream/_scheduler/queues/buildqueue.py
    ... ... @@ -65,7 +65,7 @@ class BuildQueue(Queue):
    65 65
             # If the estimated size outgrows the quota, ask the scheduler
    
    66 66
             # to queue a job to actually check the real cache size.
    
    67 67
             #
    
    68
    -        if artifacts.get_quota_exceeded():
    
    68
    +        if artifacts.has_quota_exceeded():
    
    69 69
                 self._scheduler.check_cache_size()
    
    70 70
     
    
    71 71
         def done(self, job, element, result, success):
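
The comment above describes an estimate-then-verify flow: the build queue keeps a cheap running estimate of the cache size as artifacts are committed, and only when that estimate crosses the quota does it ask the scheduler for an accurate measurement (and, if still over quota, a cleanup job). Roughly, as hypothetical glue code; has_quota_exceeded() and check_cache_size() appear in this commit, the size bookkeeping calls are assumed:

        def after_artifact_commit(artifacts, scheduler, artifact_size):
            # Cheap running estimate; no disk walk happens here.
            estimate = artifacts.get_cache_size() + artifact_size
            artifacts.set_cache_size(estimate)

            # Only when the estimate exceeds the quota do we pay for a real
            # size check, scheduled as an ordinary job.
            if artifacts.has_quota_exceeded():
                scheduler.check_cache_size()
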
    

  • buildstream/_scheduler/queues/queue.py
    ... ... @@ -325,15 +325,22 @@ class Queue():
    325 325
                               detail=traceback.format_exc())
    
    326 326
                 self.failed_elements.append(element)
    
    327 327
             else:
    
    328
    -
    
    329
    -            # No exception occured, handle the success/failure state in the normal way
    
    330 328
                 #
    
    329
    +            # No exception occurred in post processing
    
    330
    +            #
    
    331
    +
    
    332
    +            # Only place in the output done queue if the job
    
    333
    +            # was considered successful
    
    331 334
                 if success:
    
    332 335
                     self._done_queue.append(job)
    
    333
    -                if not job.skipped:
    
    334
    -                    self.processed_elements.append(element)
    
    335
    -                else:
    
    336
    -                    self.skipped_elements.append(element)
    
    336
    +
    
    337
    +            # A Job can be skipped whether or not it has failed;
    
    338
    +            # we only want to bookkeep it as processed or failed
    
    339
    +            # if it was not skipped.
    
    340
    +            if job.skipped:
    
    341
    +                self.skipped_elements.append(element)
    
    342
    +            elif success:
    
    343
    +                self.processed_elements.append(element)
    
    337 344
                 else:
    
    338 345
                     self.failed_elements.append(element)
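
Together with the new _terminated flag in job.py above, this means a forcefully terminated job is counted as skipped rather than failed, which keeps terminated builds out of the failure count in the summary. The combined classification, isolated into a sketch (hypothetical function; the list arguments stand in for the queue's bookkeeping lists):

        def classify(job, element, success, done, skipped, processed, failed):
            # Successful jobs always enter the done queue, even when skipped.
            if success:
                done.append(job)

            # Job.skipped is True if the job was skipped during processing
            # or explicitly terminated.
            if job.skipped:
                skipped.append(element)
            elif success:
                processed.append(element)
            else:
                failed.append(element)
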
    
    339 346
     
    

  • buildstream/_scheduler/scheduler.py
    ... ... @@ -349,7 +349,7 @@ class Scheduler():
    349 349
             platform = Platform.get_platform()
    
    350 350
             artifacts = platform.artifactcache
    
    351 351
     
    
    352
    -        if not artifacts.get_quota_exceeded():
    
    352
    +        if not artifacts.has_quota_exceeded():
    
    353 353
                 return
    
    354 354
     
    
    355 355
             job = CleanupJob(self, 'cleanup', 'cleanup/cleanup',
    
    ... ... @@ -387,6 +387,15 @@ class Scheduler():
    387 387
         # A loop registered event callback for keyboard interrupts
    
    388 388
         #
    
    389 389
         def _interrupt_event(self):
    
    390
    +
    
    391
    +        # FIXME: This should not be needed, but for some reason we receive an
    
    392
    +        #        additional SIGINT event when the user hits ^C a second time
    
    393
    +        #        to inform us that they really intend to terminate, even though
    
    394
    +        #        we have disconnected our handlers at this time.
    
    395
    +        #
    
    396
    +        if self.terminated:
    
    397
    +            return
    
    398
    +
    
    390 399
             # Leave this to the frontend to decide, if no
    
    391 400
             # interrupt callback was specified, then just terminate.
    
    392 401
             if self._interrupt_callback:
    

  • buildstream/plugins/sources/git.py
    ... ... @@ -164,10 +164,18 @@ class GitMirror(SourceFetcher):
    164 164
                              cwd=self.mirror)
    
    165 165
     
    
    166 166
         def fetch(self, alias_override=None):
    
    167
    -        self.ensure(alias_override)
    
    168
    -        if not self.has_ref():
    
    169
    -            self._fetch(alias_override)
    
    170
    -        self.assert_ref()
    
    167
    +        # Resolve the URL for the message
    
    168
    +        resolved_url = self.source.translate_url(self.url,
    
    169
    +                                                 alias_override=alias_override,
    
    170
    +                                                 primary=self.primary)
    
    171
    +
    
    172
    +        with self.source.timed_activity("Fetching from {}"
    
    173
    +                                        .format(resolved_url),
    
    174
    +                                        silent_nested=True):
    
    175
    +            self.ensure(alias_override)
    
    176
    +            if not self.has_ref():
    
    177
    +                self._fetch(alias_override)
    
    178
    +            self.assert_ref()
    
    171 179
     
    
    172 180
         def has_ref(self):
    
    173 181
             if not self.ref:
    

  • buildstream/source.py
    ... ... @@ -585,28 +585,48 @@ class Source(Plugin):
    585 585
         #
    
    586 586
         def _fetch(self):
    
    587 587
             project = self._get_project()
    
    588
    -        source_fetchers = self.get_source_fetchers()
    
    588
    +        context = self._get_context()
    
    589
    +
    
    590
    +        # Silence the STATUS messages which might happen as a result
    
    591
    +        # of checking the source fetchers.
    
    592
    +        with context.silence():
    
    593
    +            source_fetchers = self.get_source_fetchers()
    
    589 594
     
    
    590 595
             # Use the source fetchers if they are provided
    
    591 596
             #
    
    592 597
             if source_fetchers:
    
    593
    -            for fetcher in source_fetchers:
    
    594
    -                alias = fetcher._get_alias()
    
    595
    -                for uri in project.get_alias_uris(alias, first_pass=self.__first_pass):
    
    596
    -                    try:
    
    597
    -                        fetcher.fetch(uri)
    
    598
    -                    # FIXME: Need to consider temporary vs. permanent failures,
    
    599
    -                    #        and how this works with retries.
    
    600
    -                    except BstError as e:
    
    601
    -                        last_error = e
    
    602
    -                        continue
    
    603
    -
    
    604
    -                    # No error, we're done with this fetcher
    
    605
    -                    break
    
    606 598
     
    
    607
    -                else:
    
    608
    -                    # No break occurred, raise the last detected error
    
    609
    -                    raise last_error
    
    599
    +            # Use a contorted loop here; this allows us to
    
    600
    +            # silence the messages which can result from consuming
    
    601
    +            # the items of source_fetchers, if it happens to be a generator.
    
    602
    +            #
    
    603
    +            source_fetchers = iter(source_fetchers)
    
    604
    +            try:
    
    605
    +
    
    606
    +                while True:
    
    607
    +
    
    608
    +                    with context.silence():
    
    609
    +                        fetcher = next(source_fetchers)
    
    610
    +
    
    611
    +                    alias = fetcher._get_alias()
    
    612
    +                    for uri in project.get_alias_uris(alias, first_pass=self.__first_pass):
    
    613
    +                        try:
    
    614
    +                            fetcher.fetch(uri)
    
    615
    +                        # FIXME: Need to consider temporary vs. permanent failures,
    
    616
    +                        #        and how this works with retries.
    
    617
    +                        except BstError as e:
    
    618
    +                            last_error = e
    
    619
    +                            continue
    
    620
    +
    
    621
    +                        # No error, we're done with this fetcher
    
    622
    +                        break
    
    623
    +
    
    624
    +                    else:
    
    625
    +                        # No break occurred, raise the last detected error
    
    626
    +                        raise last_error
    
    627
    +
    
    628
    +            except StopIteration:
    
    629
    +                pass
    
    610 630
     
    
    611 631
             # Default codepath is to reinstantiate the Source
    
    612 632
             #
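
The "contorted loop" above exists because get_source_fetchers() may return a generator: each next() call can run plugin code that emits status messages, so only the act of advancing the iterator is wrapped in context.silence(), while the fetching itself still reports normally. The shape of the pattern in isolation (hypothetical names; silence stands in for the context manager used above):

        def consume_quietly(items, silence, handle):
            # items may be a list or a generator; only advancing the iterator
            # is silenced, the work done per item may still emit messages.
            iterator = iter(items)
            while True:
                try:
                    with silence():
                        item = next(iterator)
                except StopIteration:
                    break
                handle(item)
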
    

  • buildstream/utils.py
    ... ... @@ -496,7 +496,7 @@ def get_bst_version():
    496 496
     
    
    497 497
     @contextmanager
    
    498 498
     def save_file_atomic(filename, mode='w', *, buffering=-1, encoding=None,
    
    499
    -                     errors=None, newline=None, closefd=True, opener=None):
    
    499
    +                     errors=None, newline=None, closefd=True, opener=None, tempdir=None):
    
    500 500
         """Save a file with a temporary name and rename it into place when ready.
    
    501 501
     
    
    502 502
         This is a context manager which is meant for saving data to files.
    
    ... ... @@ -523,8 +523,9 @@ def save_file_atomic(filename, mode='w', *, buffering=-1, encoding=None,
    523 523
         # https://bugs.python.org/issue8604
    
    524 524
     
    
    525 525
         assert os.path.isabs(filename), "The utils.save_file_atomic() parameter ``filename`` must be an absolute path"
    
    526
    -    dirname = os.path.dirname(filename)
    
    527
    -    fd, tempname = tempfile.mkstemp(dir=dirname)
    
    526
    +    if tempdir is None:
    
    527
    +        tempdir = os.path.dirname(filename)
    
    528
    +    fd, tempname = tempfile.mkstemp(dir=tempdir)
    
    528 529
         os.close(fd)
    
    529 530
     
    
    530 531
         f = open(tempname, mode=mode, buffering=buffering, encoding=encoding,
    
    ... ... @@ -556,6 +557,9 @@ def save_file_atomic(filename, mode='w', *, buffering=-1, encoding=None,
    556 557
     #
    
    557 558
     # Get the disk usage of a given directory in bytes.
    
    558 559
     #
    
    560
    +# This function assumes that files do not inadvertently
    
    561
    +# disappear while this function is running.
    
    562
    +#
    
    559 563
     # Arguments:
    
    560 564
     #     (str) The path whose size to check.
    
    561 565
     #
    
    ... ... @@ -675,7 +679,7 @@ def _force_rmtree(rootpath, **kwargs):
    675 679
     
    
    676 680
         try:
    
    677 681
             shutil.rmtree(rootpath, **kwargs)
    
    678
    -    except shutil.Error as e:
    
    682
    +    except OSError as e:
    
    679 683
             raise UtilError("Failed to remove cache directory '{}': {}"
    
    680 684
                             .format(rootpath, e))
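
The new tempdir argument to save_file_atomic() above lets callers stage the temporary file in a directory other than the destination's, with the usual caveat that it must live on the same filesystem for the final rename to stay atomic. A hedged usage sketch (paths and contents are purely illustrative):

        import os
        from buildstream import utils

        casdir = '/srv/artifacts/cas'                      # illustrative layout
        ref = os.path.join(casdir, 'refs', 'heads', 'project', 'element')

        # Stage the temporary file in a shared tmp directory on the same
        # filesystem, then rename it into place when the block exits cleanly.
        with utils.save_file_atomic(ref, 'w',
                                    tempdir=os.path.join(casdir, 'tmp')) as f:
            f.write('3a7f...\n')                           # e.g. an artifact digest
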
    
    681 685
     
    

  • doc/examples/autotools/project.conf
    ... ... @@ -9,5 +9,5 @@ element-path: elements
    9 9
     
    
    10 10
     # Define some aliases for the tarballs we download
    
    11 11
     aliases:
    
    12
    -  alpine: https://gnome7.codethink.co.uk/tarballs/
    
    12
    +  alpine: https://bst-integration-test-images.ams3.cdn.digitaloceanspaces.com/
    
    13 13
       gnu: http://ftpmirror.gnu.org/gnu/automake/

  • doc/examples/integration-commands/project.conf
    ... ... @@ -9,4 +9,4 @@ element-path: elements
    9 9
     
    
    10 10
     # Define an alias for our alpine tarball
    
    11 11
     aliases:
    
    12
    -  alpine: https://gnome7.codethink.co.uk/tarballs/
    12
    +  alpine: https://bst-integration-test-images.ams3.cdn.digitaloceanspaces.com/

  • doc/examples/running-commands/project.conf
    ... ... @@ -9,4 +9,4 @@ element-path: elements
    9 9
     
    
    10 10
     # Define an alias for our alpine tarball
    
    11 11
     aliases:
    
    12
    -  alpine: https://gnome7.codethink.co.uk/tarballs/
    12
    +  alpine: https://bst-integration-test-images.ams3.cdn.digitaloceanspaces.com/

  • tests/frontend/push.py
    ... ... @@ -231,6 +231,8 @@ def test_artifact_expires(cli, datafiles, tmpdir):
    231 231
             assert cli.get_element_state(project, 'element2.bst') == 'cached'
    
    232 232
             assert_shared(cli, share, project, 'element2.bst')
    
    233 233
     
    
    234
    +        share.make_all_objects_older()
    
    235
    +
    
    234 236
             # Create and build another element of 5 MB (This will exceed the free disk space available)
    
    235 237
             create_element_size('element3.bst', project, element_path, [], int(5e6))
    
    236 238
             result = cli.run(project=project, args=['build', 'element3.bst'])
    
    ... ... @@ -327,6 +329,8 @@ def test_recently_pulled_artifact_does_not_expire(cli, datafiles, tmpdir):
    327 329
             # Ensure element1 is cached locally
    
    328 330
             assert cli.get_element_state(project, 'element1.bst') == 'cached'
    
    329 331
     
    
    332
    +        share.make_all_objects_older()
    
    333
    +
    
    330 334
             # Create and build the element3 (of 5 MB)
    
    331 335
             create_element_size('element3.bst', project, element_path, [], int(5e6))
    
    332 336
             result = cli.run(project=project, args=['build', 'element3.bst'])
    

  • tests/integration/project/project.conf
    ... ... @@ -2,7 +2,7 @@
    2 2
     name: test
    
    3 3
     element-path: elements
    
    4 4
     aliases:
    
    5
    -  alpine: https://gnome7.codethink.co.uk/tarballs/
    
    5
    +  alpine: https://bst-integration-test-images.ams3.cdn.digitaloceanspaces.com/
    
    6 6
       project_dir: file://{project_dir}
    
    7 7
     options:
    
    8 8
       linux:
    

  • tests/testutils/artifactshare.py
    ... ... @@ -122,6 +122,15 @@ class ArtifactShare():
    122 122
             except ArtifactError:
    
    123 123
                 return False
    
    124 124
     
    
    125
    +    def make_all_objects_older(self):
    
    126
    +        for root, dirs, files in os.walk(os.path.join(self.cas.casdir, 'objects')):
    
    127
    +            for name in files:
    
    128
    +                fullname = os.path.join(root, name)
    
    129
    +                st = os.stat(fullname)
    
    130
    +                mtime = st.st_mtime - 6 * 60 * 60
    
    131
    +                atime = st.st_atime - 6 * 60 * 60
    
    132
    +                os.utime(fullname, times=(atime, mtime))
    
    133
    +
    
    125 134
         # close():
    
    126 135
         #
    
    127 136
         # Remove the artifact share.
    


