@@ -81,6 +81,7 @@ class CASCache(ArtifactCache):
     ################################################
     #     Implementation of abstract methods      #
     ################################################
+
     def contains(self, element, key):
         refpath = self._refpath(self.get_artifact_fullname(element, key))
 
@@ -156,6 +157,7 @@ class CASCache(ArtifactCache):
         q = multiprocessing.Queue()
         for remote_spec in remote_specs:
             # Use subprocess to avoid creation of gRPC threads in main BuildStream process
+            # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
             p = multiprocessing.Process(target=self._initialize_remote, args=(remote_spec, q))
 
             try:
@@ -268,109 +270,69 @@ class CASCache(ArtifactCache):
 
         self.set_ref(newref, tree)
 
+    def _push_refs_to_remote(self, refs, remote):
+        skipped_remote = True
+        try:
+            for ref in refs:
+                tree = self.resolve_ref(ref)
+
+                # Check whether ref is already on the server in which case
+                # there is no need to push the artifact
+                try:
+                    request = buildstream_pb2.GetReferenceRequest()
+                    request.key = ref
+                    response = remote.ref_storage.GetReference(request)
+
+                    if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
+                        # ref is already on the server with the same tree
+                        continue
+
+                except grpc.RpcError as e:
+                    if e.code() != grpc.StatusCode.NOT_FOUND:
+                        # Intentionally re-raise RpcError for outer except block.
+                        raise
+
+                self._send_directory(remote, tree)
+
+                request = buildstream_pb2.UpdateReferenceRequest()
+                request.keys.append(ref)
+                request.digest.hash = tree.hash
+                request.digest.size_bytes = tree.size_bytes
+                remote.ref_storage.UpdateReference(request)
+
+                skipped_remote = False
+        except grpc.RpcError as e:
+            if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
+                raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
+
+        return not skipped_remote
+
     def push(self, element, keys):
-        refs = [self.get_artifact_fullname(element, key) for key in keys]
+
+        refs = [self.get_artifact_fullname(element, key) for key in list(keys)]
 
         project = element._get_project()
 
         push_remotes = [r for r in self._remotes[project] if r.spec.push]
 
         pushed = False
-        display_key = element._get_brief_display_key()
+
         for remote in push_remotes:
             remote.init()
-            skipped_remote = True
+            display_key = element._get_brief_display_key()
             element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url))
 
-            try:
-                for ref in refs:
-                    tree = self.resolve_ref(ref)
-
-                    # Check whether ref is already on the server in which case
-                    # there is no need to push the artifact
-                    try:
-                        request = buildstream_pb2.GetReferenceRequest()
-                        request.key = ref
-                        response = remote.ref_storage.GetReference(request)
-
-                        if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
-                            # ref is already on the server with the same tree
-                            continue
-
-                    except grpc.RpcError as e:
-                        if e.code() != grpc.StatusCode.NOT_FOUND:
-                            # Intentionally re-raise RpcError for outer except block.
-                            raise
-
-                    missing_blobs = {}
-                    required_blobs = self._required_blobs(tree)
-
-                    # Limit size of FindMissingBlobs request
-                    for required_blobs_group in _grouper(required_blobs, 512):
-                        request = remote_execution_pb2.FindMissingBlobsRequest()
-
-                        for required_digest in required_blobs_group:
-                            d = request.blob_digests.add()
-                            d.hash = required_digest.hash
-                            d.size_bytes = required_digest.size_bytes
-
-                        response = remote.cas.FindMissingBlobs(request)
-                        for digest in response.missing_blob_digests:
-                            d = remote_execution_pb2.Digest()
-                            d.hash = digest.hash
-                            d.size_bytes = digest.size_bytes
-                            missing_blobs[d.hash] = d
-
-                    # Upload any blobs missing on the server
-                    skipped_remote = False
-                    for digest in missing_blobs.values():
-                        uuid_ = uuid.uuid4()
-                        resource_name = '/'.join(['uploads', str(uuid_), 'blobs',
-                                                  digest.hash, str(digest.size_bytes)])
-
-                        def request_stream(resname):
-                            with open(self.objpath(digest), 'rb') as f:
-                                assert os.fstat(f.fileno()).st_size == digest.size_bytes
-                                offset = 0
-                                finished = False
-                                remaining = digest.size_bytes
-                                while not finished:
-                                    chunk_size = min(remaining, _MAX_PAYLOAD_BYTES)
-                                    remaining -= chunk_size
-
-                                    request = bytestream_pb2.WriteRequest()
-                                    request.write_offset = offset
-                                    # max. _MAX_PAYLOAD_BYTES chunks
-                                    request.data = f.read(chunk_size)
-                                    request.resource_name = resname
-                                    request.finish_write = remaining <= 0
-                                    yield request
-                                    offset += chunk_size
-                                    finished = request.finish_write
-                        response = remote.bytestream.Write(request_stream(resource_name))
-
-                        request = buildstream_pb2.UpdateReferenceRequest()
-                        request.keys.append(ref)
-                        request.digest.hash = tree.hash
-                        request.digest.size_bytes = tree.size_bytes
-                        remote.ref_storage.UpdateReference(request)
-
-                        pushed = True
-
-                    if not skipped_remote:
-                        element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
-
-            except grpc.RpcError as e:
-                if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
-                    raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
-
-            if skipped_remote:
+            if self._push_refs_to_remote(refs, remote):
+                element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
+                pushed = True
+            else:
                 self.context.message(Message(
                     None,
                     MessageType.INFO,
                     "Remote ({}) already has {} cached".format(
                         remote.spec.url, element._get_brief_display_key())
                 ))
+
         return pushed
 
     ################################################
@@ -599,6 +561,7 @@ class CASCache(ArtifactCache):
     ################################################
     #             Local Private Methods            #
     ################################################
+
     def _checkout(self, dest, tree):
         os.makedirs(dest, exist_ok=True)
 
@@ -776,16 +739,16 @@ class CASCache(ArtifactCache):
             #
             q.put(str(e))
 
-    def _required_blobs(self, tree):
+    def _required_blobs(self, directory_digest):
         # parse directory, and recursively add blobs
         d = remote_execution_pb2.Digest()
-        d.hash = tree.hash
-        d.size_bytes = tree.size_bytes
+        d.hash = directory_digest.hash
+        d.size_bytes = directory_digest.size_bytes
         yield d
 
         directory = remote_execution_pb2.Directory()
 
-        with open(self.objpath(tree), 'rb') as f:
+        with open(self.objpath(directory_digest), 'rb') as f:
             directory.ParseFromString(f.read())
 
         for filenode in directory.files:
@@ -797,16 +760,16 @@ class CASCache(ArtifactCache):
         for dirnode in directory.directories:
             yield from self._required_blobs(dirnode.digest)
 
-    def _fetch_blob(self, remote, digest, out):
+    def _fetch_blob(self, remote, digest, stream):
         resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)])
         request = bytestream_pb2.ReadRequest()
         request.resource_name = resource_name
         request.read_offset = 0
         for response in remote.bytestream.Read(request):
-            out.write(response.data)
+            stream.write(response.data)
+            stream.flush()
 
-        out.flush()
-        assert digest.size_bytes == os.fstat(out.fileno()).st_size
+        assert digest.size_bytes == os.fstat(stream.fileno()).st_size
 
     # _ensure_blob():
     #
@@ -922,6 +885,79 @@ class CASCache(ArtifactCache):
         # Fetch final batch
         self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
 
+    def _send_blob(self, remote, digest, stream, u_uid=uuid.uuid4()):
+        resource_name = '/'.join(['uploads', str(u_uid), 'blobs',
+                                  digest.hash, str(digest.size_bytes)])
+
+        def request_stream(resname, instream):
+            offset = 0
+            finished = False
+            remaining = digest.size_bytes
+            while not finished:
+                chunk_size = min(remaining, _MAX_PAYLOAD_BYTES)
+                remaining -= chunk_size
+
+                request = bytestream_pb2.WriteRequest()
+                request.write_offset = offset
+                # max. _MAX_PAYLOAD_BYTES chunks
+                request.data = instream.read(chunk_size)
+                request.resource_name = resname
+                request.finish_write = remaining <= 0
+
+                yield request
+
+                offset += chunk_size
+                finished = request.finish_write
+
+        response = remote.bytestream.Write(request_stream(resource_name, stream))
+
+        assert response.committed_size == digest.size_bytes
+
+    def _send_directory(self, remote, digest, u_uid=uuid.uuid4()):
+        required_blobs = self._required_blobs(digest)
+
+        missing_blobs = dict()
+        # Limit size of FindMissingBlobs request
+        for required_blobs_group in _grouper(required_blobs, 512):
+            request = remote_execution_pb2.FindMissingBlobsRequest()
+
+            for required_digest in required_blobs_group:
+                d = request.blob_digests.add()
+                d.hash = required_digest.hash
+                d.size_bytes = required_digest.size_bytes
+
+            response = remote.cas.FindMissingBlobs(request)
+            for missing_digest in response.missing_blob_digests:
+                d = remote_execution_pb2.Digest()
+                d.hash = missing_digest.hash
+                d.size_bytes = missing_digest.size_bytes
+                missing_blobs[d.hash] = d
+
+        # Upload any blobs missing on the server
+        self._send_blobs(remote, missing_blobs.values(), u_uid)
+
+    def _send_blobs(self, remote, digests, u_uid=uuid.uuid4()):
+        batch = _CASBatchUpdate(remote)
+
+        for digest in digests:
+            with open(self.objpath(digest), 'rb') as f:
+                assert os.fstat(f.fileno()).st_size == digest.size_bytes
+
+                if (digest.size_bytes >= remote.max_batch_total_size_bytes or
+                        not remote.batch_update_supported):
+                    # Too large for batch request, upload in independent request.
+                    self._send_blob(remote, digest, f, u_uid=u_uid)
+                else:
+                    if not batch.add(digest, f):
+                        # Not enough space left in batch request.
+                        # Complete pending batch first.
+                        batch.send()
+                        batch = _CASBatchUpdate(remote)
+                        batch.add(digest, f)
+
+        # Send final batch
+        batch.send()
+
 
 # Represents a single remote CAS cache.
 #
@@ -995,6 +1031,17 @@ class _CASRemote():
                 if e.code() != grpc.StatusCode.UNIMPLEMENTED:
                     raise
 
+            # Check whether the server supports BatchUpdateBlobs()
+            self.batch_update_supported = False
+            try:
+                request = remote_execution_pb2.BatchUpdateBlobsRequest()
+                response = self.cas.BatchUpdateBlobs(request)
+                self.batch_update_supported = True
+            except grpc.RpcError as e:
+                if (e.code() != grpc.StatusCode.UNIMPLEMENTED and
+                        e.code() != grpc.StatusCode.PERMISSION_DENIED):
+                    raise
+
             self._initialized = True
 
 
@@ -1042,6 +1089,46 @@ class _CASBatchRead():
             yield (response.digest, response.data)
 
 
+# Represents a batch of blobs queued for upload.
+#
+class _CASBatchUpdate():
+    def __init__(self, remote):
+        self._remote = remote
+        self._max_total_size_bytes = remote.max_batch_total_size_bytes
+        self._request = remote_execution_pb2.BatchUpdateBlobsRequest()
+        self._size = 0
+        self._sent = False
+
+    def add(self, digest, stream):
+        assert not self._sent
+
+        new_batch_size = self._size + digest.size_bytes
+        if new_batch_size > self._max_total_size_bytes:
+            # Not enough space left in current batch
+            return False
+
+        blob_request = self._request.requests.add()
+        blob_request.digest.hash = digest.hash
+        blob_request.digest.size_bytes = digest.size_bytes
+        blob_request.data = stream.read(digest.size_bytes)
+        self._size = new_batch_size
+        return True
+
+    def send(self):
+        assert not self._sent
+        self._sent = True
+
+        if len(self._request.requests) == 0:
+            return
+
+        batch_response = self._remote.cas.BatchUpdateBlobs(self._request)
+
+        for response in batch_response.responses:
+            if response.status.code != grpc.StatusCode.OK.value[0]:
+                raise ArtifactError("Failed to upload blob {}: {}".format(
+                    response.digest.hash, response.status.code))
+
+
 def _grouper(iterable, n):
     while True:
         try: