Tristan Van Berkom pushed to branch juerg/cas-batch-1.2 at BuildStream / buildstream
Commits:

- a009dcbe by Tristan Van Berkom at 2018-10-02T11:51:19Z
- 7da5104b by Tristan Van Berkom at 2018-10-02T13:18:54Z
- 6e820362 by Tristan Van Berkom at 2018-10-03T07:35:03Z
- 9568824f by Jim MacArthur at 2018-10-03T07:35:51Z
- 4a67e4e3 by Jürg Billeter at 2018-10-03T07:35:51Z
- f585b233 by Jürg Billeter at 2018-10-03T07:35:51Z

4 changed files:
- buildstream/_artifactcache/cascache.py
- buildstream/_artifactcache/casserver.py
- buildstream/plugins/sources/git.py
- buildstream/source.py
Changes:
=====================================
buildstream/_artifactcache/cascache.py
=====================================
@@ -81,6 +81,7 @@ class CASCache(ArtifactCache):
     ################################################
     #  Implementation of abstract methods          #
     ################################################
+
     def contains(self, element, key):
         refpath = self._refpath(self.get_artifact_fullname(element, key))
 
@@ -156,6 +157,7 @@ class CASCache(ArtifactCache):
         q = multiprocessing.Queue()
         for remote_spec in remote_specs:
             # Use subprocess to avoid creation of gRPC threads in main BuildStream process
+            # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
             p = multiprocessing.Process(target=self._initialize_remote, args=(remote_spec, q))
 
             try:
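
Note on the fork_support link added above: once a gRPC channel has started its worker threads in a process, a later fork() of that process can deadlock, which is why remote initialization happens in a throwaway child process that reports back over a queue. A minimal sketch of that pattern (check_remote() and the URL are placeholders, not BuildStream code):

    import multiprocessing

    def check_remote(url, queue):
        # All gRPC objects live only in this child process; the parent never
        # creates gRPC threads, so it stays safe to fork again later.
        import grpc
        try:
            channel = grpc.insecure_channel(url)
            grpc.channel_ready_future(channel).result(timeout=5)
            queue.put(None)            # success, nothing to report
        except Exception as e:
            queue.put(str(e))          # report the failure back to the parent

    if __name__ == '__main__':
        q = multiprocessing.Queue()
        p = multiprocessing.Process(target=check_remote, args=('localhost:50051', q))
        p.start()
        error = q.get()
        p.join()
        if error:
            print("Remote not usable:", error)
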
@@ -268,109 +270,69 @@ class CASCache(ArtifactCache):
 
         self.set_ref(newref, tree)
 
+    def _push_refs_to_remote(self, refs, remote):
+        skipped_remote = True
+        try:
+            for ref in refs:
+                tree = self.resolve_ref(ref)
+
+                # Check whether ref is already on the server in which case
+                # there is no need to push the artifact
+                try:
+                    request = buildstream_pb2.GetReferenceRequest()
+                    request.key = ref
+                    response = remote.ref_storage.GetReference(request)
+
+                    if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
+                        # ref is already on the server with the same tree
+                        continue
+
+                except grpc.RpcError as e:
+                    if e.code() != grpc.StatusCode.NOT_FOUND:
+                        # Intentionally re-raise RpcError for outer except block.
+                        raise
+
+                self._send_directory(remote, tree)
+
+                request = buildstream_pb2.UpdateReferenceRequest()
+                request.keys.append(ref)
+                request.digest.hash = tree.hash
+                request.digest.size_bytes = tree.size_bytes
+                remote.ref_storage.UpdateReference(request)
+
+                skipped_remote = False
+        except grpc.RpcError as e:
+            if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
+                raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
+
+        return not skipped_remote
+
     def push(self, element, keys):
-        refs = [self.get_artifact_fullname(element, key) for key in keys]
+
+        refs = [self.get_artifact_fullname(element, key) for key in list(keys)]
 
         project = element._get_project()
 
         push_remotes = [r for r in self._remotes[project] if r.spec.push]
 
         pushed = False
-        display_key = element._get_brief_display_key()
+
         for remote in push_remotes:
             remote.init()
-            skipped_remote = True
+            display_key = element._get_brief_display_key()
             element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url))
 
-            try:
-                for ref in refs:
-                    tree = self.resolve_ref(ref)
-
-                    # Check whether ref is already on the server in which case
-                    # there is no need to push the artifact
-                    try:
-                        request = buildstream_pb2.GetReferenceRequest()
-                        request.key = ref
-                        response = remote.ref_storage.GetReference(request)
-
-                        if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
-                            # ref is already on the server with the same tree
-                            continue
-
-                    except grpc.RpcError as e:
-                        if e.code() != grpc.StatusCode.NOT_FOUND:
-                            # Intentionally re-raise RpcError for outer except block.
-                            raise
-
-                    missing_blobs = {}
-                    required_blobs = self._required_blobs(tree)
-
-                    # Limit size of FindMissingBlobs request
-                    for required_blobs_group in _grouper(required_blobs, 512):
-                        request = remote_execution_pb2.FindMissingBlobsRequest()
-
-                        for required_digest in required_blobs_group:
-                            d = request.blob_digests.add()
-                            d.hash = required_digest.hash
-                            d.size_bytes = required_digest.size_bytes
-
-                        response = remote.cas.FindMissingBlobs(request)
-                        for digest in response.missing_blob_digests:
-                            d = remote_execution_pb2.Digest()
-                            d.hash = digest.hash
-                            d.size_bytes = digest.size_bytes
-                            missing_blobs[d.hash] = d
-
-                    # Upload any blobs missing on the server
-                    skipped_remote = False
-                    for digest in missing_blobs.values():
-                        uuid_ = uuid.uuid4()
-                        resource_name = '/'.join(['uploads', str(uuid_), 'blobs',
-                                                  digest.hash, str(digest.size_bytes)])
-
-                        def request_stream(resname):
-                            with open(self.objpath(digest), 'rb') as f:
-                                assert os.fstat(f.fileno()).st_size == digest.size_bytes
-                                offset = 0
-                                finished = False
-                                remaining = digest.size_bytes
-                                while not finished:
-                                    chunk_size = min(remaining, _MAX_PAYLOAD_BYTES)
-                                    remaining -= chunk_size
-
-                                    request = bytestream_pb2.WriteRequest()
-                                    request.write_offset = offset
-                                    # max. _MAX_PAYLOAD_BYTES chunks
-                                    request.data = f.read(chunk_size)
-                                    request.resource_name = resname
-                                    request.finish_write = remaining <= 0
-                                    yield request
-                                    offset += chunk_size
-                                    finished = request.finish_write
-                        response = remote.bytestream.Write(request_stream(resource_name))
-
-                    request = buildstream_pb2.UpdateReferenceRequest()
-                    request.keys.append(ref)
-                    request.digest.hash = tree.hash
-                    request.digest.size_bytes = tree.size_bytes
-                    remote.ref_storage.UpdateReference(request)
-
-                    pushed = True
-
-                if not skipped_remote:
-                    element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
-
-            except grpc.RpcError as e:
-                if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
-                    raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
-
-            if skipped_remote:
+            if self._push_refs_to_remote(refs, remote):
+                element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
+                pushed = True
+            else:
                 self.context.message(Message(
                     None,
                     MessageType.INFO,
                     "Remote ({}) already has {} cached".format(
                         remote.spec.url, element._get_brief_display_key())
                 ))
+
         return pushed
 
     ################################################
@@ -599,6 +561,7 @@ class CASCache(ArtifactCache):
     ################################################
     #             Local Private Methods            #
     ################################################
+
     def _checkout(self, dest, tree):
         os.makedirs(dest, exist_ok=True)
 
@@ -776,16 +739,16 @@ class CASCache(ArtifactCache):
         #
         q.put(str(e))
 
-    def _required_blobs(self, tree):
+    def _required_blobs(self, directory_digest):
         # parse directory, and recursively add blobs
         d = remote_execution_pb2.Digest()
-        d.hash = tree.hash
-        d.size_bytes = tree.size_bytes
+        d.hash = directory_digest.hash
+        d.size_bytes = directory_digest.size_bytes
         yield d
 
         directory = remote_execution_pb2.Directory()
 
-        with open(self.objpath(tree), 'rb') as f:
+        with open(self.objpath(directory_digest), 'rb') as f:
             directory.ParseFromString(f.read())
 
         for filenode in directory.files:
@@ -797,16 +760,16 @@ class CASCache(ArtifactCache):
         for dirnode in directory.directories:
             yield from self._required_blobs(dirnode.digest)
 
-    def _fetch_blob(self, remote, digest, out):
+    def _fetch_blob(self, remote, digest, stream):
         resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)])
         request = bytestream_pb2.ReadRequest()
         request.resource_name = resource_name
         request.read_offset = 0
         for response in remote.bytestream.Read(request):
-            out.write(response.data)
+            stream.write(response.data)
+        stream.flush()
 
-        out.flush()
-        assert digest.size_bytes == os.fstat(out.fileno()).st_size
+        assert digest.size_bytes == os.fstat(stream.fileno()).st_size
 
     # _ensure_blob():
     #
@@ -922,6 +885,79 @@ class CASCache(ArtifactCache):
             # Fetch final batch
             self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
 
+    def _send_blob(self, remote, digest, stream, u_uid=uuid.uuid4()):
+        resource_name = '/'.join(['uploads', str(u_uid), 'blobs',
+                                  digest.hash, str(digest.size_bytes)])
+
+        def request_stream(resname, instream):
+            offset = 0
+            finished = False
+            remaining = digest.size_bytes
+            while not finished:
+                chunk_size = min(remaining, _MAX_PAYLOAD_BYTES)
+                remaining -= chunk_size
+
+                request = bytestream_pb2.WriteRequest()
+                request.write_offset = offset
+                # max. _MAX_PAYLOAD_BYTES chunks
+                request.data = instream.read(chunk_size)
+                request.resource_name = resname
+                request.finish_write = remaining <= 0
+
+                yield request
+
+                offset += chunk_size
+                finished = request.finish_write
+
+        response = remote.bytestream.Write(request_stream(resource_name, stream))
+
+        assert response.committed_size == digest.size_bytes
+
+    def _send_directory(self, remote, digest, u_uid=uuid.uuid4()):
+        required_blobs = self._required_blobs(digest)
+
+        missing_blobs = dict()
+        # Limit size of FindMissingBlobs request
+        for required_blobs_group in _grouper(required_blobs, 512):
+            request = remote_execution_pb2.FindMissingBlobsRequest()
+
+            for required_digest in required_blobs_group:
+                d = request.blob_digests.add()
+                d.hash = required_digest.hash
+                d.size_bytes = required_digest.size_bytes
+
+            response = remote.cas.FindMissingBlobs(request)
+            for missing_digest in response.missing_blob_digests:
+                d = remote_execution_pb2.Digest()
+                d.hash = missing_digest.hash
+                d.size_bytes = missing_digest.size_bytes
+                missing_blobs[d.hash] = d
+
+        # Upload any blobs missing on the server
+        self._send_blobs(remote, missing_blobs.values(), u_uid)
+
+    def _send_blobs(self, remote, digests, u_uid=uuid.uuid4()):
+        batch = _CASBatchUpdate(remote)
+
+        for digest in digests:
+            with open(self.objpath(digest), 'rb') as f:
+                assert os.fstat(f.fileno()).st_size == digest.size_bytes
+
+                if (digest.size_bytes >= remote.max_batch_total_size_bytes or
+                        not remote.batch_update_supported):
+                    # Too large for batch request, upload in independent request.
+                    self._send_blob(remote, digest, f, u_uid=u_uid)
+                else:
+                    if not batch.add(digest, f):
+                        # Not enough space left in batch request.
+                        # Complete pending batch first.
+                        batch.send()
+                        batch = _CASBatchUpdate(remote)
+                        batch.add(digest, f)
+
+        # Send final batch
+        batch.send()
+
 
 # Represents a single remote CAS cache.
 #
@@ -995,6 +1031,17 @@ class _CASRemote():
                 if e.code() != grpc.StatusCode.UNIMPLEMENTED:
                     raise
 
+            # Check whether the server supports BatchUpdateBlobs()
+            self.batch_update_supported = False
+            try:
+                request = remote_execution_pb2.BatchUpdateBlobsRequest()
+                response = self.cas.BatchUpdateBlobs(request)
+                self.batch_update_supported = True
+            except grpc.RpcError as e:
+                if (e.code() != grpc.StatusCode.UNIMPLEMENTED and
+                        e.code() != grpc.StatusCode.PERMISSION_DENIED):
+                    raise
+
             self._initialized = True
 
 
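
Note on the probe above: it sends an empty BatchUpdateBlobsRequest purely to classify the server's answer. UNIMPLEMENTED or PERMISSION_DENIED means the client should fall back to ByteStream writes; any other RpcError is a real failure and propagates. The same idiom in isolation, as a rough sketch (probe_capability is an illustrative name, not a BuildStream API):

    import grpc

    def probe_capability(call, request,
                         tolerated=(grpc.StatusCode.UNIMPLEMENTED,
                                    grpc.StatusCode.PERMISSION_DENIED)):
        # Issue a throwaway request; a tolerated status code is a clean
        # negative answer, anything else is re-raised as a real error.
        try:
            call(request)
            return True
        except grpc.RpcError as e:
            if e.code() not in tolerated:
                raise
            return False

    # e.g. batch_update_supported = probe_capability(
    #          cas_stub.BatchUpdateBlobs, remote_execution_pb2.BatchUpdateBlobsRequest())
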
@@ -1042,6 +1089,46 @@ class _CASBatchRead():
             yield (response.digest, response.data)
 
 
+# Represents a batch of blobs queued for upload.
+#
+class _CASBatchUpdate():
+    def __init__(self, remote):
+        self._remote = remote
+        self._max_total_size_bytes = remote.max_batch_total_size_bytes
+        self._request = remote_execution_pb2.BatchUpdateBlobsRequest()
+        self._size = 0
+        self._sent = False
+
+    def add(self, digest, stream):
+        assert not self._sent
+
+        new_batch_size = self._size + digest.size_bytes
+        if new_batch_size > self._max_total_size_bytes:
+            # Not enough space left in current batch
+            return False
+
+        blob_request = self._request.requests.add()
+        blob_request.digest.hash = digest.hash
+        blob_request.digest.size_bytes = digest.size_bytes
+        blob_request.data = stream.read(digest.size_bytes)
+        self._size = new_batch_size
+        return True
+
+    def send(self):
+        assert not self._sent
+        self._sent = True
+
+        if len(self._request.requests) == 0:
+            return
+
+        batch_response = self._remote.cas.BatchUpdateBlobs(self._request)
+
+        for response in batch_response.responses:
+            if response.status.code != grpc.StatusCode.OK.value[0]:
+                raise ArtifactError("Failed to upload blob {}: {}".format(
+                    response.digest.hash, response.status.code))
+
+
 def _grouper(iterable, n):
     while True:
         try:
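
Note on the upload path as a whole: for every blob that FindMissingBlobs reports as absent, the client either packs it into a shared BatchUpdateBlobs request or streams it individually over ByteStream.Write, depending on the remote's max_batch_total_size_bytes and on whether the capability probe succeeded. A simplified, self-contained sketch of that decision only (plan_uploads and its parameters are illustrative, not BuildStream APIs):

    def plan_uploads(blobs, max_batch_total_size, batch_supported):
        # blobs: iterable of (digest_hash, size_bytes) tuples.
        # Yields ('stream', blob) for blobs that must go over ByteStream,
        # and ('batch', [blob, ...]) for groups that fit one batch request.
        batch, batch_size = [], 0
        for blob in blobs:
            _, size = blob
            if size >= max_batch_total_size or not batch_supported:
                yield ('stream', blob)        # too large, or batching unsupported
                continue
            if batch_size + size > max_batch_total_size:
                yield ('batch', batch)        # flush the pending batch first
                batch, batch_size = [], 0
            batch.append(blob)
            batch_size += size
        if batch:
            yield ('batch', batch)            # final partial batch

    # Example with a 1 MiB limit and mixed blob sizes:
    for action in plan_uploads([('a', 400000), ('b', 700000), ('c', 2000000)],
                               max_batch_total_size=1048576, batch_supported=True):
        print(action)
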
=====================================
buildstream/_artifactcache/casserver.py
=====================================
@@ -70,7 +70,7 @@ def create_server(repo, *, enable_push):
         _ByteStreamServicer(artifactcache, enable_push=enable_push), server)
 
     remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
-        _ContentAddressableStorageServicer(artifactcache), server)
+        _ContentAddressableStorageServicer(artifactcache, enable_push=enable_push), server)
 
     remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
         _CapabilitiesServicer(), server)
@@ -224,9 +224,10 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
 
 
 class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
-    def __init__(self, cas):
+    def __init__(self, cas, *, enable_push):
         super().__init__()
         self.cas = cas
+        self.enable_push = enable_push
 
     def FindMissingBlobs(self, request, context):
         response = remote_execution_pb2.FindMissingBlobsResponse()
@@ -262,6 +263,46 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres
 
         return response
 
+    def BatchUpdateBlobs(self, request, context):
+        response = remote_execution_pb2.BatchUpdateBlobsResponse()
+
+        if not self.enable_push:
+            context.set_code(grpc.StatusCode.PERMISSION_DENIED)
+            return response
+
+        batch_size = 0
+
+        for blob_request in request.requests:
+            digest = blob_request.digest
+
+            batch_size += digest.size_bytes
+            if batch_size > _MAX_PAYLOAD_BYTES:
+                context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
+                return response
+
+            blob_response = response.responses.add()
+            blob_response.digest.hash = digest.hash
+            blob_response.digest.size_bytes = digest.size_bytes
+
+            if len(blob_request.data) != digest.size_bytes:
+                blob_response.status.code = grpc.StatusCode.FAILED_PRECONDITION
+                continue
+
+            try:
+                _clean_up_cache(self.cas, digest.size_bytes)
+
+                with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
+                    out.write(blob_request.data)
+                    out.flush()
+                    server_digest = self.cas.add_object(path=out.name)
+                    if server_digest.hash != digest.hash:
+                        blob_response.status.code = grpc.StatusCode.FAILED_PRECONDITION
+
+            except ArtifactTooLargeException:
+                blob_response.status.code = grpc.StatusCode.RESOURCE_EXHAUSTED
+
+        return response
+
 
 class _CapabilitiesServicer(remote_execution_pb2_grpc.CapabilitiesServicer):
     def GetCapabilities(self, request, context):
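
Note on error reporting in the servicer above: conditions that invalidate the whole call (push disabled, total payload over _MAX_PAYLOAD_BYTES) are signalled via context.set_code(), while a failure storing one blob only marks that blob's response.status and the rest of the batch proceeds. A stripped-down sketch of that split (handle_batch and store_blob are illustrative stand-ins, not the real servicer):

    import grpc

    def handle_batch(blobs, *, enable_push, max_payload, store_blob):
        # blobs: list of (digest_hash, data) pairs; store_blob() raises on failure.
        # Returns (call_level_code, per_blob_results).
        if not enable_push:
            return grpc.StatusCode.PERMISSION_DENIED, []
        results, total = [], 0
        for digest_hash, data in blobs:
            total += len(data)
            if total > max_payload:
                return grpc.StatusCode.INVALID_ARGUMENT, []
            try:
                store_blob(digest_hash, data)
                results.append((digest_hash, grpc.StatusCode.OK))
            except Exception:
                results.append((digest_hash, grpc.StatusCode.FAILED_PRECONDITION))
        return grpc.StatusCode.OK, results
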
=====================================
buildstream/plugins/sources/git.py
=====================================
@@ -164,10 +164,18 @@ class GitMirror(SourceFetcher):
                          cwd=self.mirror)
 
     def fetch(self, alias_override=None):
-        self.ensure(alias_override)
-        if not self.has_ref():
-            self._fetch(alias_override)
-        self.assert_ref()
+        # Resolve the URL for the message
+        resolved_url = self.source.translate_url(self.url,
+                                                 alias_override=alias_override,
+                                                 primary=self.primary)
+
+        with self.source.timed_activity("Fetching from {}"
+                                        .format(resolved_url),
+                                        silent_nested=True):
+            self.ensure(alias_override)
+            if not self.has_ref():
+                self._fetch(alias_override)
+            self.assert_ref()
 
     def has_ref(self):
         if not self.ref:
=====================================
buildstream/source.py
=====================================
@@ -585,28 +585,48 @@ class Source(Plugin):
     #
     def _fetch(self):
         project = self._get_project()
-        source_fetchers = self.get_source_fetchers()
+        context = self._get_context()
+
+        # Silence the STATUS messages which might happen as a result
+        # of checking the source fetchers.
+        with context.silence():
+            source_fetchers = self.get_source_fetchers()
 
         # Use the source fetchers if they are provided
         #
         if source_fetchers:
-            for fetcher in source_fetchers:
-                alias = fetcher._get_alias()
-                for uri in project.get_alias_uris(alias, first_pass=self.__first_pass):
-                    try:
-                        fetcher.fetch(uri)
-                    # FIXME: Need to consider temporary vs. permanent failures,
-                    # and how this works with retries.
-                    except BstError as e:
-                        last_error = e
-                        continue
-
-                    # No error, we're done with this fetcher
-                    break
 
-                else:
-                    # No break occurred, raise the last detected error
-                    raise last_error
+            # Use a contorted loop here, this is to allow us to
+            # silence the messages which can result from consuming
+            # the items of source_fetchers, if it happens to be a generator.
+            #
+            source_fetchers = iter(source_fetchers)
+            try:
+
+                while True:
+
+                    with context.silence():
+                        fetcher = next(source_fetchers)
+
+                    alias = fetcher._get_alias()
+                    for uri in project.get_alias_uris(alias, first_pass=self.__first_pass):
+                        try:
+                            fetcher.fetch(uri)
+                        # FIXME: Need to consider temporary vs. permanent failures,
+                        # and how this works with retries.
+                        except BstError as e:
+                            last_error = e
+                            continue
+
+                        # No error, we're done with this fetcher
+                        break
+
+                    else:
+                        # No break occurred, raise the last detected error
+                        raise last_error
+
+            except StopIteration:
+                pass
 
         # Default codepath is to reinstantiate the Source
         #
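
Note on the "contorted loop" above: get_source_fetchers() may return a generator, and merely advancing it can emit status messages, so each next() call has to happen inside context.silence(); a plain for loop cannot wrap only the iteration step. The same shape in isolation (silence() and fetchers() are stand-ins for the BuildStream machinery):

    import contextlib

    @contextlib.contextmanager
    def silence():
        # Stand-in for BuildStream's context.silence(); the real version
        # suppresses status messages emitted inside this region.
        yield

    def fetchers():
        # A generator whose iteration itself has side effects we want silenced.
        for name in ("git", "tar"):
            print("probing", name)        # would be a STATUS message in BuildStream
            yield name

    it = iter(fetchers())
    try:
        while True:
            with silence():               # silence only the step that advances the generator
                fetcher = next(it)
            print("fetching with", fetcher)   # normal, unsilenced work
    except StopIteration:
        pass
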