Tristan Van Berkom pushed to branch juerg/cas-batch-1.2 at BuildStream / buildstream

Commits:

- a009dcbe by Tristan Van Berkom at 2018-10-02T11:51:19Z
- 7da5104b by Tristan Van Berkom at 2018-10-02T13:18:54Z
- 6e820362 by Tristan Van Berkom at 2018-10-03T07:35:03Z
- 9568824f by Jim MacArthur at 2018-10-03T07:35:51Z
- 4a67e4e3 by Jürg Billeter at 2018-10-03T07:35:51Z
- f585b233 by Jürg Billeter at 2018-10-03T07:35:51Z

4 changed files:

- buildstream/_artifactcache/cascache.py
- buildstream/_artifactcache/casserver.py
- buildstream/plugins/sources/git.py
- buildstream/source.py

Changes:
buildstream/_artifactcache/cascache.py:

@@ -81,6 +81,7 @@ class CASCache(ArtifactCache):
     ################################################
     #     Implementation of abstract methods       #
     ################################################
+
     def contains(self, element, key):
         refpath = self._refpath(self.get_artifact_fullname(element, key))

@@ -156,6 +157,7 @@ class CASCache(ArtifactCache):
         q = multiprocessing.Queue()
         for remote_spec in remote_specs:
             # Use subprocess to avoid creation of gRPC threads in main BuildStream process
+            # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
             p = multiprocessing.Process(target=self._initialize_remote, args=(remote_spec, q))

             try:
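For background, the fork-support caveat referenced in the new comment is why each remote is probed in a child process: gRPC channels must not exist in the parent around the fork. A minimal, BuildStream-independent sketch of that pattern (the endpoint and the check_remote helper are invented for illustration):

    import multiprocessing

    import grpc

    def check_remote(endpoint, queue):
        # All gRPC channel and stub creation happens only in this child
        # process, so no gRPC threads are created in the parent process.
        channel = grpc.insecure_channel(endpoint)
        try:
            grpc.channel_ready_future(channel).result(timeout=5)
            queue.put(None)                  # success
        except grpc.FutureTimeoutError as e:
            queue.put(str(e))                # report the failure as a string

    if __name__ == '__main__':
        q = multiprocessing.Queue()
        p = multiprocessing.Process(target=check_remote, args=('localhost:50051', q))
        p.start()
        error = q.get()
        p.join()
        print("remote ok" if error is None else "remote error: {}".format(error))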
@@ -268,109 +270,69 @@ class CASCache(ArtifactCache):

         self.set_ref(newref, tree)

+    def _push_refs_to_remote(self, refs, remote):
+        skipped_remote = True
+        try:
+            for ref in refs:
+                tree = self.resolve_ref(ref)
+
+                # Check whether ref is already on the server in which case
+                # there is no need to push the artifact
+                try:
+                    request = buildstream_pb2.GetReferenceRequest()
+                    request.key = ref
+                    response = remote.ref_storage.GetReference(request)
+
+                    if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
+                        # ref is already on the server with the same tree
+                        continue
+
+                except grpc.RpcError as e:
+                    if e.code() != grpc.StatusCode.NOT_FOUND:
+                        # Intentionally re-raise RpcError for outer except block.
+                        raise
+
+                self._send_directory(remote, tree)
+
+                request = buildstream_pb2.UpdateReferenceRequest()
+                request.keys.append(ref)
+                request.digest.hash = tree.hash
+                request.digest.size_bytes = tree.size_bytes
+                remote.ref_storage.UpdateReference(request)
+
+                skipped_remote = False
+        except grpc.RpcError as e:
+            if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
+                raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
+
+        return not skipped_remote
+
     def push(self, element, keys):
-        refs = [self.get_artifact_fullname(element, key) for key in keys]
+
+        refs = [self.get_artifact_fullname(element, key) for key in list(keys)]

         project = element._get_project()

         push_remotes = [r for r in self._remotes[project] if r.spec.push]

         pushed = False
-        display_key = element._get_brief_display_key()
+
         for remote in push_remotes:
             remote.init()
-            skipped_remote = True
+            display_key = element._get_brief_display_key()
             element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url))

-            try:
-                for ref in refs:
-                    tree = self.resolve_ref(ref)
-
-                    # Check whether ref is already on the server in which case
-                    # there is no need to push the artifact
-                    try:
-                        request = buildstream_pb2.GetReferenceRequest()
-                        request.key = ref
-                        response = remote.ref_storage.GetReference(request)
-
-                        if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
-                            # ref is already on the server with the same tree
-                            continue
-
-                    except grpc.RpcError as e:
-                        if e.code() != grpc.StatusCode.NOT_FOUND:
-                            # Intentionally re-raise RpcError for outer except block.
-                            raise
-
-                    missing_blobs = {}
-                    required_blobs = self._required_blobs(tree)
-
-                    # Limit size of FindMissingBlobs request
-                    for required_blobs_group in _grouper(required_blobs, 512):
-                        request = remote_execution_pb2.FindMissingBlobsRequest()
-
-                        for required_digest in required_blobs_group:
-                            d = request.blob_digests.add()
-                            d.hash = required_digest.hash
-                            d.size_bytes = required_digest.size_bytes
-
-                        response = remote.cas.FindMissingBlobs(request)
-                        for digest in response.missing_blob_digests:
-                            d = remote_execution_pb2.Digest()
-                            d.hash = digest.hash
-                            d.size_bytes = digest.size_bytes
-                            missing_blobs[d.hash] = d
-
-                    # Upload any blobs missing on the server
-                    skipped_remote = False
-                    for digest in missing_blobs.values():
-                        uuid_ = uuid.uuid4()
-                        resource_name = '/'.join(['uploads', str(uuid_), 'blobs',
-                                                  digest.hash, str(digest.size_bytes)])
-
-                        def request_stream(resname):
-                            with open(self.objpath(digest), 'rb') as f:
-                                assert os.fstat(f.fileno()).st_size == digest.size_bytes
-                                offset = 0
-                                finished = False
-                                remaining = digest.size_bytes
-                                while not finished:
-                                    chunk_size = min(remaining, _MAX_PAYLOAD_BYTES)
-                                    remaining -= chunk_size
-
-                                    request = bytestream_pb2.WriteRequest()
-                                    request.write_offset = offset
-                                    # max. _MAX_PAYLOAD_BYTES chunks
-                                    request.data = f.read(chunk_size)
-                                    request.resource_name = resname
-                                    request.finish_write = remaining <= 0
-                                    yield request
-                                    offset += chunk_size
-                                    finished = request.finish_write
-                        response = remote.bytestream.Write(request_stream(resource_name))
-
-                    request = buildstream_pb2.UpdateReferenceRequest()
-                    request.keys.append(ref)
-                    request.digest.hash = tree.hash
-                    request.digest.size_bytes = tree.size_bytes
-                    remote.ref_storage.UpdateReference(request)
-
-                    pushed = True
-
-                if not skipped_remote:
-                    element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
-
-            except grpc.RpcError as e:
-                if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
-                    raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
-
-            if skipped_remote:
+            if self._push_refs_to_remote(refs, remote):
+                element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
+                pushed = True
+            else:
                 self.context.message(Message(
                     None,
                     MessageType.INFO,
                     "Remote ({}) already has {} cached".format(
                         remote.spec.url, element._get_brief_display_key())
                 ))
+
         return pushed

     ################################################
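For illustration, the boolean returned by the new _push_refs_to_remote() encodes "at least one ref was actually transferred to this remote". A tiny, BuildStream-independent sketch of that contract (push_refs, already_on_server and send are all invented names for this example):

    def push_refs(refs, already_on_server, send):
        skipped_remote = True
        for ref in refs:
            if ref in already_on_server:
                continue                 # server already has this ref, nothing to do
            send(ref)                    # upload the directory tree, then update the ref
            skipped_remote = False
        return not skipped_remote

    # Only 'b' needs transferring, so the remote counts as pushed (True);
    # when everything is already present the caller logs "already has ... cached".
    assert push_refs(['a', 'b'], {'a'}, send=lambda ref: None) is True
    assert push_refs(['a'], {'a'}, send=lambda ref: None) is False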
@@ -599,6 +561,7 @@ class CASCache(ArtifactCache):
     ################################################
     #             Local Private Methods            #
     ################################################
+
     def _checkout(self, dest, tree):
         os.makedirs(dest, exist_ok=True)

@@ -776,16 +739,16 @@ class CASCache(ArtifactCache):
             #
             q.put(str(e))

-    def _required_blobs(self, tree):
+    def _required_blobs(self, directory_digest):
         # parse directory, and recursively add blobs
         d = remote_execution_pb2.Digest()
-        d.hash = tree.hash
-        d.size_bytes = tree.size_bytes
+        d.hash = directory_digest.hash
+        d.size_bytes = directory_digest.size_bytes
         yield d

         directory = remote_execution_pb2.Directory()

-        with open(self.objpath(tree), 'rb') as f:
+        with open(self.objpath(directory_digest), 'rb') as f:
             directory.ParseFromString(f.read())

         for filenode in directory.files:
@@ -797,16 +760,16 @@ class CASCache(ArtifactCache):
         for dirnode in directory.directories:
             yield from self._required_blobs(dirnode.digest)

-    def _fetch_blob(self, remote, digest, out):
+    def _fetch_blob(self, remote, digest, stream):
         resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)])
         request = bytestream_pb2.ReadRequest()
         request.resource_name = resource_name
         request.read_offset = 0
         for response in remote.bytestream.Read(request):
-            out.write(response.data)
+            stream.write(response.data)
+        stream.flush()

-        out.flush()
-        assert digest.size_bytes == os.fstat(out.fileno()).st_size
+        assert digest.size_bytes == os.fstat(stream.fileno()).st_size

     # _ensure_blob():
     #
@@ -922,6 +885,79 @@ class CASCache(ArtifactCache):
         # Fetch final batch
         self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)

+    def _send_blob(self, remote, digest, stream, u_uid=uuid.uuid4()):
+        resource_name = '/'.join(['uploads', str(u_uid), 'blobs',
+                                  digest.hash, str(digest.size_bytes)])
+
+        def request_stream(resname, instream):
+            offset = 0
+            finished = False
+            remaining = digest.size_bytes
+            while not finished:
+                chunk_size = min(remaining, _MAX_PAYLOAD_BYTES)
+                remaining -= chunk_size
+
+                request = bytestream_pb2.WriteRequest()
+                request.write_offset = offset
+                # max. _MAX_PAYLOAD_BYTES chunks
+                request.data = instream.read(chunk_size)
+                request.resource_name = resname
+                request.finish_write = remaining <= 0
+
+                yield request
+
+                offset += chunk_size
+                finished = request.finish_write
+
+        response = remote.bytestream.Write(request_stream(resource_name, stream))
+
+        assert response.committed_size == digest.size_bytes
+
+    def _send_directory(self, remote, digest, u_uid=uuid.uuid4()):
+        required_blobs = self._required_blobs(digest)
+
+        missing_blobs = dict()
+        # Limit size of FindMissingBlobs request
+        for required_blobs_group in _grouper(required_blobs, 512):
+            request = remote_execution_pb2.FindMissingBlobsRequest()
+
+            for required_digest in required_blobs_group:
+                d = request.blob_digests.add()
+                d.hash = required_digest.hash
+                d.size_bytes = required_digest.size_bytes
+
+            response = remote.cas.FindMissingBlobs(request)
+            for missing_digest in response.missing_blob_digests:
+                d = remote_execution_pb2.Digest()
+                d.hash = missing_digest.hash
+                d.size_bytes = missing_digest.size_bytes
+                missing_blobs[d.hash] = d
+
+        # Upload any blobs missing on the server
+        self._send_blobs(remote, missing_blobs.values(), u_uid)
+
+    def _send_blobs(self, remote, digests, u_uid=uuid.uuid4()):
+        batch = _CASBatchUpdate(remote)
+
+        for digest in digests:
+            with open(self.objpath(digest), 'rb') as f:
+                assert os.fstat(f.fileno()).st_size == digest.size_bytes
+
+                if (digest.size_bytes >= remote.max_batch_total_size_bytes or
+                        not remote.batch_update_supported):
+                    # Too large for batch request, upload in independent request.
+                    self._send_blob(remote, digest, f, u_uid=u_uid)
+                else:
+                    if not batch.add(digest, f):
+                        # Not enough space left in batch request.
+                        # Complete pending batch first.
+                        batch.send()
+                        batch = _CASBatchUpdate(remote)
+                        batch.add(digest, f)
+
+        # Send final batch
+        batch.send()
+

 # Represents a single remote CAS cache.
 #
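As a side note, _send_blob() splits a blob into ByteStream WriteRequests of at most _MAX_PAYLOAD_BYTES each. A standalone sketch of just that chunking arithmetic (the 1 MiB limit is assumed for this example; the real limit comes from the remote's configuration):

    _MAX_PAYLOAD_BYTES = 1024 * 1024

    def chunk_offsets(size_bytes, limit=_MAX_PAYLOAD_BYTES):
        # Yields (write_offset, chunk_length, finish_write) triples, mirroring
        # the fields set on each WriteRequest in request_stream() above.
        offset = 0
        remaining = size_bytes
        finished = False
        while not finished:
            chunk_size = min(remaining, limit)
            remaining -= chunk_size
            finished = remaining <= 0
            yield offset, chunk_size, finished
            offset += chunk_size

    # A 2.5 MiB blob becomes three requests; only the last sets finish_write.
    chunks = list(chunk_offsets(int(2.5 * 1024 * 1024)))
    assert len(chunks) == 3 and chunks[-1][2] is True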
@@ -995,6 +1031,17 @@ class _CASRemote():
                 if e.code() != grpc.StatusCode.UNIMPLEMENTED:
                     raise

+            # Check whether the server supports BatchUpdateBlobs()
+            self.batch_update_supported = False
+            try:
+                request = remote_execution_pb2.BatchUpdateBlobsRequest()
+                response = self.cas.BatchUpdateBlobs(request)
+                self.batch_update_supported = True
+            except grpc.RpcError as e:
+                if (e.code() != grpc.StatusCode.UNIMPLEMENTED and
+                        e.code() != grpc.StatusCode.PERMISSION_DENIED):
+                    raise
+
             self._initialized = True

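The capability check above probes the server with an empty BatchUpdateBlobs request and treats UNIMPLEMENTED or PERMISSION_DENIED as "batch upload unavailable". A generic version of that probe pattern, as a sketch (probe_supported is not a BuildStream function):

    import grpc

    def probe_supported(call, request,
                        tolerated=(grpc.StatusCode.UNIMPLEMENTED,
                                   grpc.StatusCode.PERMISSION_DENIED)):
        # Issue a trivial request; map the tolerated error codes to
        # "feature unavailable" and re-raise anything unexpected.
        try:
            call(request)
            return True
        except grpc.RpcError as e:
            if e.code() in tolerated:
                return False
            raise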
@@ -1042,6 +1089,46 @@ class _CASBatchRead():
             yield (response.digest, response.data)


+# Represents a batch of blobs queued for upload.
+#
+class _CASBatchUpdate():
+    def __init__(self, remote):
+        self._remote = remote
+        self._max_total_size_bytes = remote.max_batch_total_size_bytes
+        self._request = remote_execution_pb2.BatchUpdateBlobsRequest()
+        self._size = 0
+        self._sent = False
+
+    def add(self, digest, stream):
+        assert not self._sent
+
+        new_batch_size = self._size + digest.size_bytes
+        if new_batch_size > self._max_total_size_bytes:
+            # Not enough space left in current batch
+            return False
+
+        blob_request = self._request.requests.add()
+        blob_request.digest.hash = digest.hash
+        blob_request.digest.size_bytes = digest.size_bytes
+        blob_request.data = stream.read(digest.size_bytes)
+        self._size = new_batch_size
+        return True
+
+    def send(self):
+        assert not self._sent
+        self._sent = True
+
+        if len(self._request.requests) == 0:
+            return
+
+        batch_response = self._remote.cas.BatchUpdateBlobs(self._request)
+
+        for response in batch_response.responses:
+            if response.status.code != grpc.StatusCode.OK.value[0]:
+                raise ArtifactError("Failed to upload blob {}: {}".format(
+                    response.digest.hash, response.status.code))
+
+
 def _grouper(iterable, n):
     while True:
         try:
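To make the add()/send() protocol concrete, here is a small self-contained stand-in that mimics how _send_blobs() drives a batch (FakeBatch and the 10-byte limit are invented for this sketch; the real class reads blob data and talks to the remote CAS):

    class FakeBatch:
        def __init__(self, limit=10):
            self._limit = limit
            self._size = 0
            self.items = []

        def add(self, name, size):
            if self._size + size > self._limit:
                return False             # would overflow: caller must send() first
            self.items.append(name)
            self._size += size
            return True

        def send(self):
            print("sending batch:", self.items)

    batch = FakeBatch()
    for name, size in [("a", 4), ("b", 4), ("c", 4)]:
        if not batch.add(name, size):
            batch.send()                 # flush the full batch
            batch = FakeBatch()
            batch.add(name, size)
    batch.send()                         # send the final, possibly partial batch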
buildstream/_artifactcache/casserver.py:

@@ -70,7 +70,7 @@ def create_server(repo, *, enable_push):
         _ByteStreamServicer(artifactcache, enable_push=enable_push), server)

     remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
-        _ContentAddressableStorageServicer(artifactcache), server)
+        _ContentAddressableStorageServicer(artifactcache, enable_push=enable_push), server)

     remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
         _CapabilitiesServicer(), server)
@@ -224,9 +224,10 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):


 class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
-    def __init__(self, cas):
+    def __init__(self, cas, *, enable_push):
         super().__init__()
         self.cas = cas
+        self.enable_push = enable_push

     def FindMissingBlobs(self, request, context):
         response = remote_execution_pb2.FindMissingBlobsResponse()
@@ -262,6 +263,46 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):

         return response

+    def BatchUpdateBlobs(self, request, context):
+        response = remote_execution_pb2.BatchUpdateBlobsResponse()
+
+        if not self.enable_push:
+            context.set_code(grpc.StatusCode.PERMISSION_DENIED)
+            return response
+
+        batch_size = 0
+
+        for blob_request in request.requests:
+            digest = blob_request.digest
+
+            batch_size += digest.size_bytes
+            if batch_size > _MAX_PAYLOAD_BYTES:
+                context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
+                return response
+
+            blob_response = response.responses.add()
+            blob_response.digest.hash = digest.hash
+            blob_response.digest.size_bytes = digest.size_bytes
+
+            if len(blob_request.data) != digest.size_bytes:
+                blob_response.status.code = grpc.StatusCode.FAILED_PRECONDITION
+                continue
+
+            try:
+                _clean_up_cache(self.cas, digest.size_bytes)
+
+                with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
+                    out.write(blob_request.data)
+                    out.flush()
+                    server_digest = self.cas.add_object(path=out.name)
+                    if server_digest.hash != digest.hash:
+                        blob_response.status.code = grpc.StatusCode.FAILED_PRECONDITION
+
+            except ArtifactTooLargeException:
+                blob_response.status.code = grpc.StatusCode.RESOURCE_EXHAUSTED
+
+        return response
+

 class _CapabilitiesServicer(remote_execution_pb2_grpc.CapabilitiesServicer):
     def GetCapabilities(self, request, context):
buildstream/plugins/sources/git.py:

@@ -164,10 +164,18 @@ class GitMirror(SourceFetcher):
                          cwd=self.mirror)

     def fetch(self, alias_override=None):
-        self.ensure(alias_override)
-        if not self.has_ref():
-            self._fetch(alias_override)
-        self.assert_ref()
+        # Resolve the URL for the message
+        resolved_url = self.source.translate_url(self.url,
+                                                 alias_override=alias_override,
+                                                 primary=self.primary)
+
+        with self.source.timed_activity("Fetching from {}"
+                                        .format(resolved_url),
+                                        silent_nested=True):
+            self.ensure(alias_override)
+            if not self.has_ref():
+                self._fetch(alias_override)
+            self.assert_ref()

     def has_ref(self):
         if not self.ref:
buildstream/source.py:

@@ -585,28 +585,48 @@ class Source(Plugin):
     #
     def _fetch(self):
         project = self._get_project()
-        source_fetchers = self.get_source_fetchers()
+        context = self._get_context()
+
+        # Silence the STATUS messages which might happen as a result
+        # of checking the source fetchers.
+        with context.silence():
+            source_fetchers = self.get_source_fetchers()

         # Use the source fetchers if they are provided
         #
         if source_fetchers:
-            for fetcher in source_fetchers:
-                alias = fetcher._get_alias()
-                for uri in project.get_alias_uris(alias, first_pass=self.__first_pass):
-                    try:
-                        fetcher.fetch(uri)
-                    # FIXME: Need to consider temporary vs. permanent failures,
-                    # and how this works with retries.
-                    except BstError as e:
-                        last_error = e
-                        continue
-
-                    # No error, we're done with this fetcher
-                    break

-                else:
-                    # No break occurred, raise the last detected error
-                    raise last_error
+            # Use a contorted loop here, this is to allow us to
+            # silence the messages which can result from consuming
+            # the items of source_fetchers, if it happens to be a generator.
+            #
+            source_fetchers = iter(source_fetchers)
+            try:
+
+                while True:
+
+                    with context.silence():
+                        fetcher = next(source_fetchers)
+
+                    alias = fetcher._get_alias()
+                    for uri in project.get_alias_uris(alias, first_pass=self.__first_pass):
+                        try:
+                            fetcher.fetch(uri)
+                        # FIXME: Need to consider temporary vs. permanent failures,
+                        # and how this works with retries.
+                        except BstError as e:
+                            last_error = e
+                            continue
+
+                        # No error, we're done with this fetcher
+                        break
+
+                    else:
+                        # No break occurred, raise the last detected error
+                        raise last_error
+
+            except StopIteration:
+                pass

         # Default codepath is to reinstantiate the Source
         #
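The iter()/next() loop above exists so that advancing the generator happens inside context.silence() while the fetch work itself stays audible; a plain for loop cannot wrap only the advancement step. A runnable, BuildStream-independent sketch of the same idea (silence() here is a crude stdout-swallowing stand-in, not the real context.silence()):

    import contextlib
    import io
    import sys

    @contextlib.contextmanager
    def silence():
        # Hide anything printed inside the with-block, then restore stdout.
        saved, sys.stdout = sys.stdout, io.StringIO()
        try:
            yield
        finally:
            sys.stdout = saved

    def fetchers():
        for name in ("first", "second"):
            print("noisy setup for", name)   # swallowed by silence()
            yield name

    it = iter(fetchers())
    try:
        while True:
            with silence():
                fetcher = next(it)           # generator body runs silenced
            print("fetching with", fetcher)  # the real work stays audible
    except StopIteration:
        pass
    # Prints only "fetching with first" and "fetching with second".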