[Notes] [Git][BuildStream/buildstream][bst-1.2] 4 commits: cascache.py: Preparation for remote execution




Tristan Van Berkom pushed to branch bst-1.2 at BuildStream / buildstream

Commits:

2 changed files:

  • buildstream/_artifactcache/cascache.py
  • buildstream/_artifactcache/casserver.py

Changes:

  • buildstream/_artifactcache/cascache.py
@@ -81,6 +81,7 @@ class CASCache(ArtifactCache):
     ################################################
     #     Implementation of abstract methods       #
     ################################################
+
     def contains(self, element, key):
         refpath = self._refpath(self.get_artifact_fullname(element, key))
 
    
@@ -156,6 +157,7 @@ class CASCache(ArtifactCache):
         q = multiprocessing.Queue()
         for remote_spec in remote_specs:
             # Use subprocess to avoid creation of gRPC threads in main BuildStream process
+            # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
             p = multiprocessing.Process(target=self._initialize_remote, args=(remote_spec, q))
 
             try:
    
@@ -268,109 +270,69 @@ class CASCache(ArtifactCache):
 
         self.set_ref(newref, tree)
 
+    def _push_refs_to_remote(self, refs, remote):
+        skipped_remote = True
+        try:
+            for ref in refs:
+                tree = self.resolve_ref(ref)
+
+                # Check whether ref is already on the server in which case
+                # there is no need to push the artifact
+                try:
+                    request = buildstream_pb2.GetReferenceRequest()
+                    request.key = ref
+                    response = remote.ref_storage.GetReference(request)
+
+                    if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
+                        # ref is already on the server with the same tree
+                        continue
+
+                except grpc.RpcError as e:
+                    if e.code() != grpc.StatusCode.NOT_FOUND:
+                        # Intentionally re-raise RpcError for outer except block.
+                        raise
+
+                self._send_directory(remote, tree)
+
+                request = buildstream_pb2.UpdateReferenceRequest()
+                request.keys.append(ref)
+                request.digest.hash = tree.hash
+                request.digest.size_bytes = tree.size_bytes
+                remote.ref_storage.UpdateReference(request)
+
+                skipped_remote = False
+        except grpc.RpcError as e:
+            if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
+                raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
+
+        return not skipped_remote
+
     def push(self, element, keys):
-        refs = [self.get_artifact_fullname(element, key) for key in keys]
+
+        refs = [self.get_artifact_fullname(element, key) for key in list(keys)]
 
         project = element._get_project()
 
         push_remotes = [r for r in self._remotes[project] if r.spec.push]
 
         pushed = False
-        display_key = element._get_brief_display_key()
+
         for remote in push_remotes:
             remote.init()
-            skipped_remote = True
+            display_key = element._get_brief_display_key()
             element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url))
 
-            try:
-                for ref in refs:
-                    tree = self.resolve_ref(ref)
-
-                    # Check whether ref is already on the server in which case
-                    # there is no need to push the artifact
-                    try:
-                        request = buildstream_pb2.GetReferenceRequest()
-                        request.key = ref
-                        response = remote.ref_storage.GetReference(request)
-
-                        if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
-                            # ref is already on the server with the same tree
-                            continue
-
-                    except grpc.RpcError as e:
-                        if e.code() != grpc.StatusCode.NOT_FOUND:
-                            # Intentionally re-raise RpcError for outer except block.
-                            raise
-
-                    missing_blobs = {}
-                    required_blobs = self._required_blobs(tree)
-
-                    # Limit size of FindMissingBlobs request
-                    for required_blobs_group in _grouper(required_blobs, 512):
-                        request = remote_execution_pb2.FindMissingBlobsRequest()
-
-                        for required_digest in required_blobs_group:
-                            d = request.blob_digests.add()
-                            d.hash = required_digest.hash
-                            d.size_bytes = required_digest.size_bytes
-
-                        response = remote.cas.FindMissingBlobs(request)
-                        for digest in response.missing_blob_digests:
-                            d = remote_execution_pb2.Digest()
-                            d.hash = digest.hash
-                            d.size_bytes = digest.size_bytes
-                            missing_blobs[d.hash] = d
-
-                    # Upload any blobs missing on the server
-                    skipped_remote = False
-                    for digest in missing_blobs.values():
-                        uuid_ = uuid.uuid4()
-                        resource_name = '/'.join(['uploads', str(uuid_), 'blobs',
-                                                  digest.hash, str(digest.size_bytes)])
-
-                        def request_stream(resname):
-                            with open(self.objpath(digest), 'rb') as f:
-                                assert os.fstat(f.fileno()).st_size == digest.size_bytes
-                                offset = 0
-                                finished = False
-                                remaining = digest.size_bytes
-                                while not finished:
-                                    chunk_size = min(remaining, _MAX_PAYLOAD_BYTES)
-                                    remaining -= chunk_size
-
-                                    request = bytestream_pb2.WriteRequest()
-                                    request.write_offset = offset
-                                    # max. _MAX_PAYLOAD_BYTES chunks
-                                    request.data = f.read(chunk_size)
-                                    request.resource_name = resname
-                                    request.finish_write = remaining <= 0
-                                    yield request
-                                    offset += chunk_size
-                                    finished = request.finish_write
-                        response = remote.bytestream.Write(request_stream(resource_name))
-
-                    request = buildstream_pb2.UpdateReferenceRequest()
-                    request.keys.append(ref)
-                    request.digest.hash = tree.hash
-                    request.digest.size_bytes = tree.size_bytes
-                    remote.ref_storage.UpdateReference(request)
-
-                    pushed = True
-
-                if not skipped_remote:
-                    element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
-
-            except grpc.RpcError as e:
-                if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
-                    raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
-
-            if skipped_remote:
+            if self._push_refs_to_remote(refs, remote):
+                element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
+                pushed = True
+            else:
                 self.context.message(Message(
                     None,
                     MessageType.INFO,
                     "Remote ({}) already has {} cached".format(
                         remote.spec.url, element._get_brief_display_key())
                 ))
+
         return pushed
 
     ################################################
    
@@ -599,6 +561,7 @@ class CASCache(ArtifactCache):
     ################################################
     #             Local Private Methods            #
     ################################################
+
     def _checkout(self, dest, tree):
         os.makedirs(dest, exist_ok=True)
 
    
@@ -776,16 +739,16 @@ class CASCache(ArtifactCache):
             #
             q.put(str(e))
 
-    def _required_blobs(self, tree):
+    def _required_blobs(self, directory_digest):
         # parse directory, and recursively add blobs
         d = remote_execution_pb2.Digest()
-        d.hash = tree.hash
-        d.size_bytes = tree.size_bytes
+        d.hash = directory_digest.hash
+        d.size_bytes = directory_digest.size_bytes
         yield d
 
         directory = remote_execution_pb2.Directory()
 
-        with open(self.objpath(tree), 'rb') as f:
+        with open(self.objpath(directory_digest), 'rb') as f:
             directory.ParseFromString(f.read())
 
         for filenode in directory.files:
    
@@ -797,16 +760,16 @@ class CASCache(ArtifactCache):
         for dirnode in directory.directories:
             yield from self._required_blobs(dirnode.digest)
 
-    def _fetch_blob(self, remote, digest, out):
+    def _fetch_blob(self, remote, digest, stream):
         resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)])
         request = bytestream_pb2.ReadRequest()
         request.resource_name = resource_name
         request.read_offset = 0
         for response in remote.bytestream.Read(request):
-            out.write(response.data)
+            stream.write(response.data)
+        stream.flush()
 
-        out.flush()
-        assert digest.size_bytes == os.fstat(out.fileno()).st_size
+        assert digest.size_bytes == os.fstat(stream.fileno()).st_size
 
     # _ensure_blob():
     #
    
@@ -922,6 +885,79 @@ class CASCache(ArtifactCache):
         # Fetch final batch
         self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
 
+    def _send_blob(self, remote, digest, stream, u_uid=uuid.uuid4()):
+        resource_name = '/'.join(['uploads', str(u_uid), 'blobs',
+                                  digest.hash, str(digest.size_bytes)])
+
+        def request_stream(resname, instream):
+            offset = 0
+            finished = False
+            remaining = digest.size_bytes
+            while not finished:
+                chunk_size = min(remaining, _MAX_PAYLOAD_BYTES)
+                remaining -= chunk_size
+
+                request = bytestream_pb2.WriteRequest()
+                request.write_offset = offset
+                # max. _MAX_PAYLOAD_BYTES chunks
+                request.data = instream.read(chunk_size)
+                request.resource_name = resname
+                request.finish_write = remaining <= 0
+
+                yield request
+
+                offset += chunk_size
+                finished = request.finish_write
+
+        response = remote.bytestream.Write(request_stream(resource_name, stream))
+
+        assert response.committed_size == digest.size_bytes
+
+    def _send_directory(self, remote, digest, u_uid=uuid.uuid4()):
+        required_blobs = self._required_blobs(digest)
+
+        missing_blobs = dict()
+        # Limit size of FindMissingBlobs request
+        for required_blobs_group in _grouper(required_blobs, 512):
+            request = remote_execution_pb2.FindMissingBlobsRequest()
+
+            for required_digest in required_blobs_group:
+                d = request.blob_digests.add()
+                d.hash = required_digest.hash
+                d.size_bytes = required_digest.size_bytes
+
+            response = remote.cas.FindMissingBlobs(request)
+            for missing_digest in response.missing_blob_digests:
+                d = remote_execution_pb2.Digest()
+                d.hash = missing_digest.hash
+                d.size_bytes = missing_digest.size_bytes
+                missing_blobs[d.hash] = d
+
+        # Upload any blobs missing on the server
+        self._send_blobs(remote, missing_blobs.values(), u_uid)
+
+    def _send_blobs(self, remote, digests, u_uid=uuid.uuid4()):
+        batch = _CASBatchUpdate(remote)
+
+        for digest in digests:
+            with open(self.objpath(digest), 'rb') as f:
+                assert os.fstat(f.fileno()).st_size == digest.size_bytes
+
+                if (digest.size_bytes >= remote.max_batch_total_size_bytes or
+                        not remote.batch_update_supported):
+                    # Too large for batch request, upload in independent request.
+                    self._send_blob(remote, digest, f, u_uid=u_uid)
+                else:
+                    if not batch.add(digest, f):
+                        # Not enough space left in batch request.
+                        # Complete pending batch first.
+                        batch.send()
+                        batch = _CASBatchUpdate(remote)
+                        batch.add(digest, f)
+
+        # Send final batch
+        batch.send()
+
 
 # Represents a single remote CAS cache.
 #
    
@@ -995,6 +1031,17 @@ class _CASRemote():
                 if e.code() != grpc.StatusCode.UNIMPLEMENTED:
                     raise
 
+            # Check whether the server supports BatchUpdateBlobs()
+            self.batch_update_supported = False
+            try:
+                request = remote_execution_pb2.BatchUpdateBlobsRequest()
+                response = self.cas.BatchUpdateBlobs(request)
+                self.batch_update_supported = True
+            except grpc.RpcError as e:
+                if (e.code() != grpc.StatusCode.UNIMPLEMENTED and
+                        e.code() != grpc.StatusCode.PERMISSION_DENIED):
+                    raise
+
             self._initialized = True
 
 
    
@@ -1042,6 +1089,46 @@ class _CASBatchRead():
             yield (response.digest, response.data)
 
 
+# Represents a batch of blobs queued for upload.
+#
+class _CASBatchUpdate():
+    def __init__(self, remote):
+        self._remote = remote
+        self._max_total_size_bytes = remote.max_batch_total_size_bytes
+        self._request = remote_execution_pb2.BatchUpdateBlobsRequest()
+        self._size = 0
+        self._sent = False
+
+    def add(self, digest, stream):
+        assert not self._sent
+
+        new_batch_size = self._size + digest.size_bytes
+        if new_batch_size > self._max_total_size_bytes:
+            # Not enough space left in current batch
+            return False
+
+        blob_request = self._request.requests.add()
+        blob_request.digest.hash = digest.hash
+        blob_request.digest.size_bytes = digest.size_bytes
+        blob_request.data = stream.read(digest.size_bytes)
+        self._size = new_batch_size
+        return True
+
+    def send(self):
+        assert not self._sent
+        self._sent = True
+
+        if len(self._request.requests) == 0:
+            return
+
+        batch_response = self._remote.cas.BatchUpdateBlobs(self._request)
+
+        for response in batch_response.responses:
+            if response.status.code != grpc.StatusCode.OK.value[0]:
+                raise ArtifactError("Failed to upload blob {}: {}".format(
+                    response.digest.hash, response.status.code))
+
+
 def _grouper(iterable, n):
     while True:
         try:
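
For context on the diff above: the new _send_blob() helper streams a blob to the remote as a sequence of ByteStream WriteRequest chunks of at most _MAX_PAYLOAD_BYTES each, marking only the final chunk with finish_write. Below is a minimal standalone sketch of that chunking pattern, not part of the commit; ChunkRequest, chunked_requests and the deliberately tiny MAX_PAYLOAD_BYTES value are illustrative stand-ins rather than BuildStream names.

    import io
    from dataclasses import dataclass

    MAX_PAYLOAD_BYTES = 4  # deliberately tiny so the demo produces several chunks

    @dataclass
    class ChunkRequest:
        write_offset: int
        data: bytes
        finish_write: bool

    def chunked_requests(stream, size_bytes):
        # Mirrors the shape of the request_stream() generator in _send_blob():
        # emit fixed-size chunks and set finish_write only on the last one.
        offset = 0
        remaining = size_bytes
        finished = False
        while not finished:
            chunk_size = min(remaining, MAX_PAYLOAD_BYTES)
            remaining -= chunk_size
            finished = remaining <= 0
            yield ChunkRequest(offset, stream.read(chunk_size), finished)
            offset += chunk_size

    if __name__ == '__main__':
        payload = b'hello, remote CAS'
        chunks = list(chunked_requests(io.BytesIO(payload), len(payload)))
        assert b''.join(c.data for c in chunks) == payload
        assert chunks[-1].finish_write and not chunks[0].finish_write
        print("{} chunks of at most {} bytes".format(len(chunks), MAX_PAYLOAD_BYTES))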
    

  • buildstream/_artifactcache/casserver.py
@@ -70,7 +70,7 @@ def create_server(repo, *, enable_push):
         _ByteStreamServicer(artifactcache, enable_push=enable_push), server)
 
     remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
-        _ContentAddressableStorageServicer(artifactcache), server)
+        _ContentAddressableStorageServicer(artifactcache, enable_push=enable_push), server)
 
     remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
         _CapabilitiesServicer(), server)
    
@@ -224,9 +224,10 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
 
 
 class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
-    def __init__(self, cas):
+    def __init__(self, cas, *, enable_push):
         super().__init__()
         self.cas = cas
+        self.enable_push = enable_push
 
     def FindMissingBlobs(self, request, context):
         response = remote_execution_pb2.FindMissingBlobsResponse()
    
@@ -262,6 +263,46 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres
 
         return response
 
+    def BatchUpdateBlobs(self, request, context):
+        response = remote_execution_pb2.BatchUpdateBlobsResponse()
+
+        if not self.enable_push:
+            context.set_code(grpc.StatusCode.PERMISSION_DENIED)
+            return response
+
+        batch_size = 0
+
+        for blob_request in request.requests:
+            digest = blob_request.digest
+
+            batch_size += digest.size_bytes
+            if batch_size > _MAX_PAYLOAD_BYTES:
+                context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
+                return response
+
+            blob_response = response.responses.add()
+            blob_response.digest.hash = digest.hash
+            blob_response.digest.size_bytes = digest.size_bytes
+
+            if len(blob_request.data) != digest.size_bytes:
+                blob_response.status.code = grpc.StatusCode.FAILED_PRECONDITION
+                continue
+
+            try:
+                _clean_up_cache(self.cas, digest.size_bytes)
+
+                with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
+                    out.write(blob_request.data)
+                    out.flush()
+                    server_digest = self.cas.add_object(path=out.name)
+                    if server_digest.hash != digest.hash:
+                        blob_response.status.code = grpc.StatusCode.FAILED_PRECONDITION
+
+            except ArtifactTooLargeException:
+                blob_response.status.code = grpc.StatusCode.RESOURCE_EXHAUSTED
+
+        return response
+
 
 class _CapabilitiesServicer(remote_execution_pb2_grpc.CapabilitiesServicer):
     def GetCapabilities(self, request, context):
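
On the server side, the new BatchUpdateBlobs() servicer shown above rejects uploads with PERMISSION_DENIED when the server was created without push enabled, and rejects any batch whose cumulative blob size exceeds _MAX_PAYLOAD_BYTES with INVALID_ARGUMENT. A rough standalone sketch of that admission logic follows; it is not part of the commit, and batch_admission, the status strings and the MAX_PAYLOAD_BYTES value here are illustrative stand-ins.

    MAX_PAYLOAD_BYTES = 1024 * 1024  # assumed limit; casserver.py defines the real value

    def batch_admission(blob_sizes, enable_push, max_payload=MAX_PAYLOAD_BYTES):
        # Mirrors the servicer's early-exit checks: push disabled ->
        # PERMISSION_DENIED, running batch size over the limit -> INVALID_ARGUMENT.
        if not enable_push:
            return 'PERMISSION_DENIED'
        total = 0
        for size in blob_sizes:
            total += size
            if total > max_payload:
                return 'INVALID_ARGUMENT'
        return 'OK'

    if __name__ == '__main__':
        assert batch_admission([100, 200], enable_push=False) == 'PERMISSION_DENIED'
        assert batch_admission([MAX_PAYLOAD_BYTES, 1], enable_push=True) == 'INVALID_ARGUMENT'
        assert batch_admission([100, 200], enable_push=True) == 'OK'
        print("admission checks OK")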
    


