Raoul Hidalgo Charman pushed to branch raoul/802-refactor-artifactcache at BuildStream / buildstream
Commits:
-
e2817d89
by Raoul Hidalgo Charman at 2018-12-11T11:41:44Z
3 changed files:
Changes:
... | ... | @@ -19,14 +19,12 @@ |
19 | 19 |
|
20 | 20 |
import multiprocessing
|
21 | 21 |
import os
|
22 |
-import signal
|
|
23 | 22 |
import string
|
24 | 23 |
from collections.abc import Mapping
|
25 | 24 |
|
26 | 25 |
from .types import _KeyStrength
|
27 | 26 |
from ._exceptions import ArtifactError, CASError, LoadError, LoadErrorReason
|
28 | 27 |
from ._message import Message, MessageType
|
29 |
-from . import _signals
|
|
30 | 28 |
from . import utils
|
31 | 29 |
from . import _yaml
|
32 | 30 |
|
... | ... | @@ -375,20 +373,8 @@ class ArtifactCache(): |
375 | 373 |
remotes = {}
|
376 | 374 |
q = multiprocessing.Queue()
|
377 | 375 |
for remote_spec in remote_specs:
|
378 |
- # Use subprocess to avoid creation of gRPC threads in main BuildStream process
|
|
379 |
- # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
|
|
380 |
- p = multiprocessing.Process(target=self.cas.initialize_remote, args=(remote_spec, q))
|
|
381 | 376 |
|
382 |
- try:
|
|
383 |
- # Keep SIGINT blocked in the child process
|
|
384 |
- with _signals.blocked([signal.SIGINT], ignore=False):
|
|
385 |
- p.start()
|
|
386 |
- |
|
387 |
- error = q.get()
|
|
388 |
- p.join()
|
|
389 |
- except KeyboardInterrupt:
|
|
390 |
- utils._kill_process_tree(p.pid)
|
|
391 |
- raise
|
|
377 |
+ error = CASRemote.check_remote(remote_spec, q)
|
|
392 | 378 |
|
393 | 379 |
if error and on_failure:
|
394 | 380 |
on_failure(remote_spec.url, error)
|
... | ... | @@ -35,7 +35,7 @@ from .._protos.buildstream.v2 import buildstream_pb2 |
35 | 35 |
from .. import utils
|
36 | 36 |
from .._exceptions import CASError
|
37 | 37 |
|
38 |
-from .casremote import CASRemote, BlobNotFound, _CASBatchRead, _CASBatchUpdate, _MAX_PAYLOAD_BYTES
|
|
38 |
+from .casremote import BlobNotFound, _CASBatchRead, _CASBatchUpdate, _MAX_PAYLOAD_BYTES
|
|
39 | 39 |
|
40 | 40 |
|
41 | 41 |
# A CASCache manages a CAS repository as specified in the Remote Execution API.
|
... | ... | @@ -185,29 +185,6 @@ class CASCache(): |
185 | 185 |
|
186 | 186 |
return modified, removed, added
|
187 | 187 |
|
188 |
- def initialize_remote(self, remote_spec, q):
|
|
189 |
- try:
|
|
190 |
- remote = CASRemote(remote_spec)
|
|
191 |
- remote.init()
|
|
192 |
- |
|
193 |
- request = buildstream_pb2.StatusRequest()
|
|
194 |
- response = remote.ref_storage.Status(request)
|
|
195 |
- |
|
196 |
- if remote_spec.push and not response.allow_updates:
|
|
197 |
- q.put('CAS server does not allow push')
|
|
198 |
- else:
|
|
199 |
- # No error
|
|
200 |
- q.put(None)
|
|
201 |
- |
|
202 |
- except grpc.RpcError as e:
|
|
203 |
- # str(e) is too verbose for errors reported to the user
|
|
204 |
- q.put(e.details())
|
|
205 |
- |
|
206 |
- except Exception as e: # pylint: disable=broad-except
|
|
207 |
- # Whatever happens, we need to return it to the calling process
|
|
208 |
- #
|
|
209 |
- q.put(str(e))
|
|
210 |
- |
|
211 | 188 |
# pull():
|
212 | 189 |
#
|
213 | 190 |
# Pull a ref from a remote repository.
|
1 | 1 |
from collections import namedtuple
|
2 | 2 |
import os
|
3 |
+import multiprocessing
|
|
4 |
+import signal
|
|
3 | 5 |
from urllib.parse import urlparse
|
4 | 6 |
|
5 | 7 |
import grpc
|
... | ... | @@ -8,9 +10,11 @@ from .. import _yaml |
8 | 10 |
from .._protos.google.rpc import code_pb2
|
9 | 11 |
from .._protos.google.bytestream import bytestream_pb2_grpc
|
10 | 12 |
from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
|
11 |
-from .._protos.buildstream.v2 import buildstream_pb2_grpc
|
|
13 |
+from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc
|
|
12 | 14 |
|
13 | 15 |
from .._exceptions import CASError, LoadError, LoadErrorReason
|
16 |
+from .. import _signals
|
|
17 |
+from .. import utils
|
|
14 | 18 |
|
15 | 19 |
# The default limit for gRPC messages is 4 MiB.
|
16 | 20 |
# Limit payload to 1 MiB to leave sufficient headroom for metadata.
|
... | ... | @@ -157,6 +161,53 @@ class CASRemote(): |
157 | 161 |
|
158 | 162 |
self._initialized = True
|
159 | 163 |
|
164 |
+ # check_remote
|
|
165 |
+ #
|
|
166 |
+ # Used when checking whether remote_specs work in the buildstream main
|
|
167 |
+ # thread, runs this in a separate process to avoid creation of gRPC threads
|
|
168 |
+ # in the main BuildStream process
|
|
169 |
+ # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
|
|
170 |
+ @classmethod
|
|
171 |
+ def check_remote(cls, remote_spec, q):
|
|
172 |
+ |
|
173 |
+ def __check_remote():
|
|
174 |
+ try:
|
|
175 |
+ remote = cls(remote_spec)
|
|
176 |
+ remote.init()
|
|
177 |
+ |
|
178 |
+ request = buildstream_pb2.StatusRequest()
|
|
179 |
+ response = remote.ref_storage.Status(request)
|
|
180 |
+ |
|
181 |
+ if remote_spec.push and not response.allow_updates:
|
|
182 |
+ q.put('CAS server does not allow push')
|
|
183 |
+ else:
|
|
184 |
+ # No error
|
|
185 |
+ q.put(None)
|
|
186 |
+ |
|
187 |
+ except grpc.RpcError as e:
|
|
188 |
+ # str(e) is too verbose for errors reported to the user
|
|
189 |
+ q.put(e.details())
|
|
190 |
+ |
|
191 |
+ except Exception as e: # pylint: disable=broad-except
|
|
192 |
+ # Whatever happens, we need to return it to the calling process
|
|
193 |
+ #
|
|
194 |
+ q.put(str(e))
|
|
195 |
+ |
|
196 |
+ p = multiprocessing.Process(target=__check_remote)
|
|
197 |
+ |
|
198 |
+ try:
|
|
199 |
+ # Keep SIGINT blocked in the child process
|
|
200 |
+ with _signals.blocked([signal.SIGINT], ignore=False):
|
|
201 |
+ p.start()
|
|
202 |
+ |
|
203 |
+ error = q.get()
|
|
204 |
+ p.join()
|
|
205 |
+ except KeyboardInterrupt:
|
|
206 |
+ utils._kill_process_tree(p.pid)
|
|
207 |
+ raise
|
|
208 |
+ |
|
209 |
+ return error
|
|
210 |
+ |
|
160 | 211 |
|
161 | 212 |
# Represents a batch of blobs queued for fetching.
|
162 | 213 |
#
|