Raoul Hidalgo Charman pushed to branch raoul/802-refactor-artifactcache at BuildStream / buildstream
Commits:
-
e2817d89
by Raoul Hidalgo Charman at 2018-12-11T11:41:44Z
3 changed files:
Changes:
| ... | ... | @@ -19,14 +19,12 @@ |
| 19 | 19 |
|
| 20 | 20 |
import multiprocessing
|
| 21 | 21 |
import os
|
| 22 |
-import signal
|
|
| 23 | 22 |
import string
|
| 24 | 23 |
from collections.abc import Mapping
|
| 25 | 24 |
|
| 26 | 25 |
from .types import _KeyStrength
|
| 27 | 26 |
from ._exceptions import ArtifactError, CASError, LoadError, LoadErrorReason
|
| 28 | 27 |
from ._message import Message, MessageType
|
| 29 |
-from . import _signals
|
|
| 30 | 28 |
from . import utils
|
| 31 | 29 |
from . import _yaml
|
| 32 | 30 |
|
| ... | ... | @@ -375,20 +373,8 @@ class ArtifactCache(): |
| 375 | 373 |
remotes = {}
|
| 376 | 374 |
q = multiprocessing.Queue()
|
| 377 | 375 |
for remote_spec in remote_specs:
|
| 378 |
- # Use subprocess to avoid creation of gRPC threads in main BuildStream process
|
|
| 379 |
- # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
|
|
| 380 |
- p = multiprocessing.Process(target=self.cas.initialize_remote, args=(remote_spec, q))
|
|
| 381 | 376 |
|
| 382 |
- try:
|
|
| 383 |
- # Keep SIGINT blocked in the child process
|
|
| 384 |
- with _signals.blocked([signal.SIGINT], ignore=False):
|
|
| 385 |
- p.start()
|
|
| 386 |
- |
|
| 387 |
- error = q.get()
|
|
| 388 |
- p.join()
|
|
| 389 |
- except KeyboardInterrupt:
|
|
| 390 |
- utils._kill_process_tree(p.pid)
|
|
| 391 |
- raise
|
|
| 377 |
+ error = CASRemote.check_remote(remote_spec, q)
|
|
| 392 | 378 |
|
| 393 | 379 |
if error and on_failure:
|
| 394 | 380 |
on_failure(remote_spec.url, error)
|
| ... | ... | @@ -35,7 +35,7 @@ from .._protos.buildstream.v2 import buildstream_pb2 |
| 35 | 35 |
from .. import utils
|
| 36 | 36 |
from .._exceptions import CASError
|
| 37 | 37 |
|
| 38 |
-from .casremote import CASRemote, BlobNotFound, _CASBatchRead, _CASBatchUpdate, _MAX_PAYLOAD_BYTES
|
|
| 38 |
+from .casremote import BlobNotFound, _CASBatchRead, _CASBatchUpdate, _MAX_PAYLOAD_BYTES
|
|
| 39 | 39 |
|
| 40 | 40 |
|
| 41 | 41 |
# A CASCache manages a CAS repository as specified in the Remote Execution API.
|
| ... | ... | @@ -185,29 +185,6 @@ class CASCache(): |
| 185 | 185 |
|
| 186 | 186 |
return modified, removed, added
|
| 187 | 187 |
|
| 188 |
- def initialize_remote(self, remote_spec, q):
|
|
| 189 |
- try:
|
|
| 190 |
- remote = CASRemote(remote_spec)
|
|
| 191 |
- remote.init()
|
|
| 192 |
- |
|
| 193 |
- request = buildstream_pb2.StatusRequest()
|
|
| 194 |
- response = remote.ref_storage.Status(request)
|
|
| 195 |
- |
|
| 196 |
- if remote_spec.push and not response.allow_updates:
|
|
| 197 |
- q.put('CAS server does not allow push')
|
|
| 198 |
- else:
|
|
| 199 |
- # No error
|
|
| 200 |
- q.put(None)
|
|
| 201 |
- |
|
| 202 |
- except grpc.RpcError as e:
|
|
| 203 |
- # str(e) is too verbose for errors reported to the user
|
|
| 204 |
- q.put(e.details())
|
|
| 205 |
- |
|
| 206 |
- except Exception as e: # pylint: disable=broad-except
|
|
| 207 |
- # Whatever happens, we need to return it to the calling process
|
|
| 208 |
- #
|
|
| 209 |
- q.put(str(e))
|
|
| 210 |
- |
|
| 211 | 188 |
# pull():
|
| 212 | 189 |
#
|
| 213 | 190 |
# Pull a ref from a remote repository.
|
| 1 | 1 |
from collections import namedtuple
|
| 2 | 2 |
import os
|
| 3 |
+import multiprocessing
|
|
| 4 |
+import signal
|
|
| 3 | 5 |
from urllib.parse import urlparse
|
| 4 | 6 |
|
| 5 | 7 |
import grpc
|
| ... | ... | @@ -8,9 +10,11 @@ from .. import _yaml |
| 8 | 10 |
from .._protos.google.rpc import code_pb2
|
| 9 | 11 |
from .._protos.google.bytestream import bytestream_pb2_grpc
|
| 10 | 12 |
from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
|
| 11 |
-from .._protos.buildstream.v2 import buildstream_pb2_grpc
|
|
| 13 |
+from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc
|
|
| 12 | 14 |
|
| 13 | 15 |
from .._exceptions import CASError, LoadError, LoadErrorReason
|
| 16 |
+from .. import _signals
|
|
| 17 |
+from .. import utils
|
|
| 14 | 18 |
|
| 15 | 19 |
# The default limit for gRPC messages is 4 MiB.
|
| 16 | 20 |
# Limit payload to 1 MiB to leave sufficient headroom for metadata.
|
| ... | ... | @@ -157,6 +161,53 @@ class CASRemote(): |
| 157 | 161 |
|
| 158 | 162 |
self._initialized = True
|
| 159 | 163 |
|
| 164 |
+ # check_remote
|
|
| 165 |
+ #
|
|
| 166 |
+ # Used when checking whether remote_specs work in the buildstream main
|
|
| 167 |
+ # thread, runs this in a separate process to avoid creation of gRPC threads
|
|
| 168 |
+ # in the main BuildStream process
|
|
| 169 |
+ # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
|
|
| 170 |
+ @classmethod
|
|
| 171 |
+ def check_remote(cls, remote_spec, q):
|
|
| 172 |
+ |
|
| 173 |
+ def __check_remote():
|
|
| 174 |
+ try:
|
|
| 175 |
+ remote = cls(remote_spec)
|
|
| 176 |
+ remote.init()
|
|
| 177 |
+ |
|
| 178 |
+ request = buildstream_pb2.StatusRequest()
|
|
| 179 |
+ response = remote.ref_storage.Status(request)
|
|
| 180 |
+ |
|
| 181 |
+ if remote_spec.push and not response.allow_updates:
|
|
| 182 |
+ q.put('CAS server does not allow push')
|
|
| 183 |
+ else:
|
|
| 184 |
+ # No error
|
|
| 185 |
+ q.put(None)
|
|
| 186 |
+ |
|
| 187 |
+ except grpc.RpcError as e:
|
|
| 188 |
+ # str(e) is too verbose for errors reported to the user
|
|
| 189 |
+ q.put(e.details())
|
|
| 190 |
+ |
|
| 191 |
+ except Exception as e: # pylint: disable=broad-except
|
|
| 192 |
+ # Whatever happens, we need to return it to the calling process
|
|
| 193 |
+ #
|
|
| 194 |
+ q.put(str(e))
|
|
| 195 |
+ |
|
| 196 |
+ p = multiprocessing.Process(target=__check_remote)
|
|
| 197 |
+ |
|
| 198 |
+ try:
|
|
| 199 |
+ # Keep SIGINT blocked in the child process
|
|
| 200 |
+ with _signals.blocked([signal.SIGINT], ignore=False):
|
|
| 201 |
+ p.start()
|
|
| 202 |
+ |
|
| 203 |
+ error = q.get()
|
|
| 204 |
+ p.join()
|
|
| 205 |
+ except KeyboardInterrupt:
|
|
| 206 |
+ utils._kill_process_tree(p.pid)
|
|
| 207 |
+ raise
|
|
| 208 |
+ |
|
| 209 |
+ return error
|
|
| 210 |
+ |
|
| 160 | 211 |
|
| 161 | 212 |
# Represents a batch of blobs queued for fetching.
|
| 162 | 213 |
#
|
