[Notes] [Git][BuildStream/buildstream][raoul/802-refactor-artifactcache] casremote: move subprocess initialization to a class method.



Title: GitLab

Raoul Hidalgo Charman pushed to branch raoul/802-refactor-artifactcache at BuildStream / buildstream

Commits:

3 changed files:

Changes:

  • buildstream/_artifactcache.py
    ... ... @@ -19,14 +19,12 @@
    19 19
     
    
    20 20
     import multiprocessing
    
    21 21
     import os
    
    22
    -import signal
    
    23 22
     import string
    
    24 23
     from collections.abc import Mapping
    
    25 24
     
    
    26 25
     from .types import _KeyStrength
    
    27 26
     from ._exceptions import ArtifactError, CASError, LoadError, LoadErrorReason
    
    28 27
     from ._message import Message, MessageType
    
    29
    -from . import _signals
    
    30 28
     from . import utils
    
    31 29
     from . import _yaml
    
    32 30
     
    
    ... ... @@ -375,20 +373,8 @@ class ArtifactCache():
    375 373
             remotes = {}
    
    376 374
             q = multiprocessing.Queue()
    
    377 375
             for remote_spec in remote_specs:
    
    378
    -            # Use subprocess to avoid creation of gRPC threads in main BuildStream process
    
    379
    -            # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
    
    380
    -            p = multiprocessing.Process(target=self.cas.initialize_remote, args=(remote_spec, q))
    
    381 376
     
    
    382
    -            try:
    
    383
    -                # Keep SIGINT blocked in the child process
    
    384
    -                with _signals.blocked([signal.SIGINT], ignore=False):
    
    385
    -                    p.start()
    
    386
    -
    
    387
    -                error = q.get()
    
    388
    -                p.join()
    
    389
    -            except KeyboardInterrupt:
    
    390
    -                utils._kill_process_tree(p.pid)
    
    391
    -                raise
    
    377
    +            error = CASRemote.check_remote(remote_spec, q)
    
    392 378
     
    
    393 379
                 if error and on_failure:
    
    394 380
                     on_failure(remote_spec.url, error)
    

  • buildstream/_cas/cascache.py
    ... ... @@ -35,7 +35,7 @@ from .._protos.buildstream.v2 import buildstream_pb2
    35 35
     from .. import utils
    
    36 36
     from .._exceptions import CASError
    
    37 37
     
    
    38
    -from .casremote import CASRemote, BlobNotFound, _CASBatchRead, _CASBatchUpdate, _MAX_PAYLOAD_BYTES
    
    38
    +from .casremote import BlobNotFound, _CASBatchRead, _CASBatchUpdate, _MAX_PAYLOAD_BYTES
    
    39 39
     
    
    40 40
     
    
    41 41
     # A CASCache manages a CAS repository as specified in the Remote Execution API.
    
    ... ... @@ -185,29 +185,6 @@ class CASCache():
    185 185
     
    
    186 186
             return modified, removed, added
    
    187 187
     
    
    188
    -    def initialize_remote(self, remote_spec, q):
    
    189
    -        try:
    
    190
    -            remote = CASRemote(remote_spec)
    
    191
    -            remote.init()
    
    192
    -
    
    193
    -            request = buildstream_pb2.StatusRequest()
    
    194
    -            response = remote.ref_storage.Status(request)
    
    195
    -
    
    196
    -            if remote_spec.push and not response.allow_updates:
    
    197
    -                q.put('CAS server does not allow push')
    
    198
    -            else:
    
    199
    -                # No error
    
    200
    -                q.put(None)
    
    201
    -
    
    202
    -        except grpc.RpcError as e:
    
    203
    -            # str(e) is too verbose for errors reported to the user
    
    204
    -            q.put(e.details())
    
    205
    -
    
    206
    -        except Exception as e:               # pylint: disable=broad-except
    
    207
    -            # Whatever happens, we need to return it to the calling process
    
    208
    -            #
    
    209
    -            q.put(str(e))
    
    210
    -
    
    211 188
         # pull():
    
    212 189
         #
    
    213 190
         # Pull a ref from a remote repository.
    

  • buildstream/_cas/casremote.py
    1 1
     from collections import namedtuple
    
    2 2
     import os
    
    3
    +import multiprocessing
    
    4
    +import signal
    
    3 5
     from urllib.parse import urlparse
    
    4 6
     
    
    5 7
     import grpc
    
    ... ... @@ -8,9 +10,11 @@ from .. import _yaml
    8 10
     from .._protos.google.rpc import code_pb2
    
    9 11
     from .._protos.google.bytestream import bytestream_pb2_grpc
    
    10 12
     from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    
    11
    -from .._protos.buildstream.v2 import buildstream_pb2_grpc
    
    13
    +from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc
    
    12 14
     
    
    13 15
     from .._exceptions import CASError, LoadError, LoadErrorReason
    
    16
    +from .. import _signals
    
    17
    +from .. import utils
    
    14 18
     
    
    15 19
     # The default limit for gRPC messages is 4 MiB.
    
    16 20
     # Limit payload to 1 MiB to leave sufficient headroom for metadata.
    
    ... ... @@ -157,6 +161,53 @@ class CASRemote():
    157 161
     
    
    158 162
                 self._initialized = True
    
    159 163
     
    
    164
    +    # check_remote
    
    165
    +    #
    
    166
    +    # Used when checking whether remote_specs work in the buildstream main
    
    167
    +    # thread, runs this in a separate process to avoid creation of gRPC threads
    
    168
    +    # in the main BuildStream process
    
    169
    +    # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
    
    170
    +    @classmethod
    
    171
    +    def check_remote(cls, remote_spec, q):
    
    172
    +
    
    173
    +        def __check_remote():
    
    174
    +            try:
    
    175
    +                remote = cls(remote_spec)
    
    176
    +                remote.init()
    
    177
    +
    
    178
    +                request = buildstream_pb2.StatusRequest()
    
    179
    +                response = remote.ref_storage.Status(request)
    
    180
    +
    
    181
    +                if remote_spec.push and not response.allow_updates:
    
    182
    +                    q.put('CAS server does not allow push')
    
    183
    +                else:
    
    184
    +                    # No error
    
    185
    +                    q.put(None)
    
    186
    +
    
    187
    +            except grpc.RpcError as e:
    
    188
    +                # str(e) is too verbose for errors reported to the user
    
    189
    +                q.put(e.details())
    
    190
    +
    
    191
    +            except Exception as e:               # pylint: disable=broad-except
    
    192
    +                # Whatever happens, we need to return it to the calling process
    
    193
    +                #
    
    194
    +                q.put(str(e))
    
    195
    +
    
    196
    +        p = multiprocessing.Process(target=__check_remote)
    
    197
    +
    
    198
    +        try:
    
    199
    +            # Keep SIGINT blocked in the child process
    
    200
    +            with _signals.blocked([signal.SIGINT], ignore=False):
    
    201
    +                p.start()
    
    202
    +
    
    203
    +            error = q.get()
    
    204
    +            p.join()
    
    205
    +        except KeyboardInterrupt:
    
    206
    +            utils._kill_process_tree(p.pid)
    
    207
    +            raise
    
    208
    +
    
    209
    +        return error
    
    210
    +
    
    160 211
     
    
    161 212
     # Represents a batch of blobs queued for fetching.
    
    162 213
     #
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]