[Notes] [Git][BuildStream/buildstream][raoul/802-refactor-artifactcache] 5 commits: cas: move initialization check to CASRemote




Raoul Hidalgo Charman pushed to branch raoul/802-refactor-artifactcache at BuildStream / buildstream

Commits:

4 changed files:

Changes:

  • buildstream/_artifactcache.py
    @@ -19,14 +19,12 @@
     
     import multiprocessing
     import os
    -import signal
     import string
     from collections.abc import Mapping
     
     from .types import _KeyStrength
     from ._exceptions import ArtifactError, CASError, LoadError, LoadErrorReason
     from ._message import Message, MessageType
    -from . import _signals
     from . import utils
     from . import _yaml
     
    @@ -375,20 +373,8 @@ class ArtifactCache():
             remotes = {}
             q = multiprocessing.Queue()
             for remote_spec in remote_specs:
    -            # Use subprocess to avoid creation of gRPC threads in main BuildStream process
    -            # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
    -            p = multiprocessing.Process(target=self.cas.initialize_remote, args=(remote_spec, q))
     
    -            try:
    -                # Keep SIGINT blocked in the child process
    -                with _signals.blocked([signal.SIGINT], ignore=False):
    -                    p.start()
    -
    -                error = q.get()
    -                p.join()
    -            except KeyboardInterrupt:
    -                utils._kill_process_tree(p.pid)
    -                raise
    +            error = CASRemote.check_remote(remote_spec, q)
     
                 if error and on_failure:
                     on_failure(remote_spec.url, error)
    @@ -747,7 +733,7 @@ class ArtifactCache():
                                     "servers are configured as push remotes.")
     
             for remote in push_remotes:
    -            message_digest = self.cas.push_message(remote, message)
    +            message_digest = remote.push_message(message)
     
             return message_digest

  • buildstream/_cas/cascache.py
    @@ -28,14 +28,13 @@ import contextlib
     
     import grpc
     
    -from .._protos.google.bytestream import bytestream_pb2
     from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
     from .._protos.buildstream.v2 import buildstream_pb2
     
     from .. import utils
     from .._exceptions import CASError
     
    -from .casremote import CASRemote, BlobNotFound, _CASBatchRead, _CASBatchUpdate, _MAX_PAYLOAD_BYTES
    +from .casremote import BlobNotFound, _CASBatchRead, _CASBatchUpdate
     
     
     # A CASCache manages a CAS repository as specified in the Remote Execution API.
    @@ -185,29 +184,6 @@ class CASCache():
     
             return modified, removed, added
     
    -    def initialize_remote(self, remote_spec, q):
    -        try:
    -            remote = CASRemote(remote_spec)
    -            remote.init()
    -
    -            request = buildstream_pb2.StatusRequest()
    -            response = remote.ref_storage.Status(request)
    -
    -            if remote_spec.push and not response.allow_updates:
    -                q.put('CAS server does not allow push')
    -            else:
    -                # No error
    -                q.put(None)
    -
    -        except grpc.RpcError as e:
    -            # str(e) is too verbose for errors reported to the user
    -            q.put(e.details())
    -
    -        except Exception as e:               # pylint: disable=broad-except
    -            # Whatever happens, we need to return it to the calling process
    -            #
    -            q.put(str(e))
    -
         # pull():
         #
         # Pull a ref from a remote repository.
    @@ -355,50 +331,6 @@ class CASCache():
     
             self._send_directory(remote, directory.ref)
     
    -    # push_message():
    -    #
    -    # Push the given protobuf message to a remote.
    -    #
    -    # Args:
    -    #     remote (CASRemote): The remote to push to
    -    #     message (Message): A protobuf message to push.
    -    #
    -    # Raises:
    -    #     (CASError): if there was an error
    -    #
    -    def push_message(self, remote, message):
    -
    -        message_buffer = message.SerializeToString()
    -        message_digest = utils._message_digest(message_buffer)
    -
    -        remote.init()
    -
    -        with io.BytesIO(message_buffer) as b:
    -            self._send_blob(remote, message_digest, b)
    -
    -        return message_digest
    -
    -    # verify_digest_on_remote():
    -    #
    -    # Check whether the object is already on the server in which case
    -    # there is no need to upload it.
    -    #
    -    # Args:
    -    #     remote (CASRemote): The remote to check
    -    #     digest (Digest): The object digest.
    -    #
    -    def verify_digest_on_remote(self, remote, digest):
    -        remote.init()
    -
    -        request = remote_execution_pb2.FindMissingBlobsRequest()
    -        request.blob_digests.extend([digest])
    -
    -        response = remote.cas.FindMissingBlobs(request)
    -        if digest in response.missing_blob_digests:
    -            return False
    -
    -        return True
    -
         # objpath():
         #
         # Return the path of an object based on its digest.
    @@ -849,17 +781,6 @@ class CASCache():
             for dirnode in directory.directories:
                 yield from self._required_blobs(dirnode.digest)
     
    -    def _fetch_blob(self, remote, digest, stream):
    -        resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)])
    -        request = bytestream_pb2.ReadRequest()
    -        request.resource_name = resource_name
    -        request.read_offset = 0
    -        for response in remote.bytestream.Read(request):
    -            stream.write(response.data)
    -        stream.flush()
    -
    -        assert digest.size_bytes == os.fstat(stream.fileno()).st_size
    -
         # _ensure_blob():
         #
         # Fetch and add blob if it's not already local.
    @@ -878,7 +799,7 @@ class CASCache():
                 return objpath
     
             with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
    -            self._fetch_blob(remote, digest, f)
    +            remote._fetch_blob(digest, f)
     
                 added_digest = self.add_object(path=f.name, link_directly=True)
                 assert added_digest.hash == digest.hash
    @@ -985,7 +906,7 @@ class CASCache():
         def _fetch_tree(self, remote, digest):
             # download but do not store the Tree object
             with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
    -            self._fetch_blob(remote, digest, out)
    +            remote._fetch_blob(digest, out)
     
                 tree = remote_execution_pb2.Tree()
     
    @@ -1005,34 +926,6 @@ class CASCache():
     
             return dirdigest
     
    -    def _send_blob(self, remote, digest, stream, u_uid=uuid.uuid4()):
    -        resource_name = '/'.join(['uploads', str(u_uid), 'blobs',
    -                                  digest.hash, str(digest.size_bytes)])
    -
    -        def request_stream(resname, instream):
    -            offset = 0
    -            finished = False
    -            remaining = digest.size_bytes
    -            while not finished:
    -                chunk_size = min(remaining, _MAX_PAYLOAD_BYTES)
    -                remaining -= chunk_size
    -
    -                request = bytestream_pb2.WriteRequest()
    -                request.write_offset = offset
    -                # max. _MAX_PAYLOAD_BYTES chunks
    -                request.data = instream.read(chunk_size)
    -                request.resource_name = resname
    -                request.finish_write = remaining <= 0
    -
    -                yield request
    -
    -                offset += chunk_size
    -                finished = request.finish_write
    -
    -        response = remote.bytestream.Write(request_stream(resource_name, stream))
    -
    -        assert response.committed_size == digest.size_bytes
    -
         def _send_directory(self, remote, digest, u_uid=uuid.uuid4()):
             required_blobs = self._required_blobs(digest)
     
    @@ -1066,7 +959,7 @@ class CASCache():
                     if (digest.size_bytes >= remote.max_batch_total_size_bytes or
                             not remote.batch_update_supported):
                         # Too large for batch request, upload in independent request.
    -                    self._send_blob(remote, digest, f, u_uid=u_uid)
    +                    remote._send_blob(digest, f, u_uid=u_uid)
                     else:
                         if not batch.add(digest, f):
                             # Not enough space left in batch request.
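With the transport moved out, CASCache now only walks directories and decides batching, while the CASRemote owns the ByteStream calls. The resource names those calls use follow the Remote Execution API ByteStream convention: reads address 'blobs/{hash}/{size_bytes}' and writes address 'uploads/{uuid}/blobs/{hash}/{size_bytes}'. A small sketch of just the naming, assuming a digest object with the same fields as remote_execution_pb2.Digest:

    import uuid
    from collections import namedtuple

    Digest = namedtuple('Digest', ['hash', 'size_bytes'])  # stand-in for the proto

    def read_resource_name(digest):
        # Matches what _fetch_blob passes to bytestream.Read()
        return '/'.join(['blobs', digest.hash, str(digest.size_bytes)])

    def write_resource_name(digest, u_uid=None):
        # Matches what _send_blob passes to bytestream.Write()
        return '/'.join(['uploads', str(u_uid or uuid.uuid4()), 'blobs',
                         digest.hash, str(digest.size_bytes)])

    d = Digest(hash='deadbeef', size_bytes=4)
    assert read_resource_name(d) == 'blobs/deadbeef/4'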
    

  • buildstream/_cas/casremote.py
    @@ -1,16 +1,22 @@
     from collections import namedtuple
     import os
    +import io
    +import multiprocessing
    +import signal
     from urllib.parse import urlparse
    +import uuid
     
     import grpc
     
     from .. import _yaml
     from .._protos.google.rpc import code_pb2
    -from .._protos.google.bytestream import bytestream_pb2_grpc
    +from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
     from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    -from .._protos.buildstream.v2 import buildstream_pb2_grpc
    +from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc
     
     from .._exceptions import CASError, LoadError, LoadErrorReason
    +from .. import _signals
    +from .. import utils
     
     # The default limit for gRPC messages is 4 MiB.
     # Limit payload to 1 MiB to leave sufficient headroom for metadata.
    @@ -157,6 +163,137 @@ class CASRemote():
     
                 self._initialized = True
     
    +    # check_remote():
    +    #
    +    # Used when checking whether remote_specs work from the buildstream main
    +    # process; runs the check in a separate process to avoid creation of gRPC
    +    # threads in the main BuildStream process.
    +    # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
    +    @classmethod
    +    def check_remote(cls, remote_spec, q):
    +
    +        def __check_remote():
    +            try:
    +                remote = cls(remote_spec)
    +                remote.init()
    +
    +                request = buildstream_pb2.StatusRequest()
    +                response = remote.ref_storage.Status(request)
    +
    +                if remote_spec.push and not response.allow_updates:
    +                    q.put('CAS server does not allow push')
    +                else:
    +                    # No error
    +                    q.put(None)
    +
    +            except grpc.RpcError as e:
    +                # str(e) is too verbose for errors reported to the user
    +                q.put(e.details())
    +
    +            except Exception as e:               # pylint: disable=broad-except
    +                # Whatever happens, we need to return it to the calling process
    +                #
    +                q.put(str(e))
    +
    +        p = multiprocessing.Process(target=__check_remote)
    +
    +        try:
    +            # Keep SIGINT blocked in the child process
    +            with _signals.blocked([signal.SIGINT], ignore=False):
    +                p.start()
    +
    +            error = q.get()
    +            p.join()
    +        except KeyboardInterrupt:
    +            utils._kill_process_tree(p.pid)
    +            raise
    +
    +        return error
    +
    +    # verify_digest_on_remote():
    +    #
    +    # Check whether the object is already on the server in which case
    +    # there is no need to upload it.
    +    #
    +    # Args:
    +    #     digest (Digest): The object digest.
    +    #
    +    def verify_digest_on_remote(self, digest):
    +        self.init()
    +
    +        request = remote_execution_pb2.FindMissingBlobsRequest()
    +        request.blob_digests.extend([digest])
    +
    +        response = self.cas.FindMissingBlobs(request)
    +        if digest in response.missing_blob_digests:
    +            return False
    +
    +        return True
    +
    +    # push_message():
    +    #
    +    # Push the given protobuf message to a remote.
    +    #
    +    # Args:
    +    #     message (Message): A protobuf message to push.
    +    #
    +    # Raises:
    +    #     (CASError): if there was an error
    +    #
    +    def push_message(self, message):
    +
    +        message_buffer = message.SerializeToString()
    +        message_digest = utils._message_digest(message_buffer)
    +
    +        self.init()
    +
    +        with io.BytesIO(message_buffer) as b:
    +            self._send_blob(message_digest, b)
    +
    +        return message_digest
    +
    +    ################################################
    +    #             Local Private Methods            #
    +    ################################################
    +    def _fetch_blob(self, digest, stream):
    +        resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)])
    +        request = bytestream_pb2.ReadRequest()
    +        request.resource_name = resource_name
    +        request.read_offset = 0
    +        for response in self.bytestream.Read(request):
    +            stream.write(response.data)
    +        stream.flush()
    +
    +        assert digest.size_bytes == os.fstat(stream.fileno()).st_size
    +
    +    def _send_blob(self, digest, stream, u_uid=uuid.uuid4()):
    +        resource_name = '/'.join(['uploads', str(u_uid), 'blobs',
    +                                  digest.hash, str(digest.size_bytes)])
    +
    +        def request_stream(resname, instream):
    +            offset = 0
    +            finished = False
    +            remaining = digest.size_bytes
    +            while not finished:
    +                chunk_size = min(remaining, _MAX_PAYLOAD_BYTES)
    +                remaining -= chunk_size
    +
    +                request = bytestream_pb2.WriteRequest()
    +                request.write_offset = offset
    +                # max. _MAX_PAYLOAD_BYTES chunks
    +                request.data = instream.read(chunk_size)
    +                request.resource_name = resname
    +                request.finish_write = remaining <= 0
    +
    +                yield request
    +
    +                offset += chunk_size
    +                finished = request.finish_write
    +
    +        response = self.bytestream.Write(request_stream(resource_name, stream))
    +
    +        assert response.committed_size == digest.size_bytes
    +
     
     # Represents a batch of blobs queued for fetching.
     #

  • buildstream/sandbox/_sandboxremote.py
    @@ -305,17 +305,17 @@ class SandboxRemote(Sandbox):
                 except grpc.RpcError as e:
                     raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e
     
    -            if not cascache.verify_digest_on_remote(casremote, upload_vdir.ref):
    +            if not casremote.verify_digest_on_remote(upload_vdir.ref):
                     raise SandboxError("Failed to verify that source has been pushed to the remote artifact cache.")
     
                 # Push command and action
                 try:
    -                cascache.push_message(casremote, command_proto)
    +                casremote.push_message(command_proto)
                 except grpc.RpcError as e:
                     raise SandboxError("Failed to push command to remote: {}".format(e))
     
                 try:
    -                cascache.push_message(casremote, action)
    +                casremote.push_message(action)
                 except grpc.RpcError as e:
                     raise SandboxError("Failed to push action to remote: {}".format(e))
     
    
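After the refactor the remote sandbox talks to the CASRemote directly for these checks and uploads; the local CASCache is no longer an intermediary. Roughly, the calling pattern is now (a sketch with setup and error handling trimmed):

    casremote = CASRemote(remote_spec)   # the sandbox's configured remote spec

    if not casremote.verify_digest_on_remote(upload_vdir.ref):
        raise SandboxError("Failed to verify that source has been pushed "
                           "to the remote artifact cache.")

    casremote.push_message(command_proto)
    casremote.push_message(action)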


