@@ -43,6 +43,11 @@ from .._exceptions import ArtifactError
 from . import ArtifactCache


+# The default limit for gRPC messages is 4 MiB.
+# Limit payload to 1 MiB to leave sufficient headroom for metadata.
+_MAX_PAYLOAD_BYTES = 1024 * 1024
+
+
 # A CASCache manages artifacts in a CAS repository as specified in the
 # Remote Execution API.
 #
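
For context (not part of the patch): gRPC rejects messages above its default 4 MiB limit, so capping the data payload at 1 MiB keeps each ByteStream message well under that limit even once resource names and other fields are included. A rough sketch of the arithmetic, where the helper name chunk_count is purely illustrative:

GRPC_DEFAULT_MAX_MESSAGE_BYTES = 4 * 1024 * 1024   # gRPC's built-in default
_MAX_PAYLOAD_BYTES = 1024 * 1024                    # cap introduced by the patch

def chunk_count(blob_size_bytes):
    # Number of ByteStream messages needed to transfer a blob of this size.
    return max(1, -(-blob_size_bytes // _MAX_PAYLOAD_BYTES))  # ceiling division

assert _MAX_PAYLOAD_BYTES < GRPC_DEFAULT_MAX_MESSAGE_BYTES
assert chunk_count(3 * 1024 * 1024) == 3
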
@@ -115,7 +120,7 @@ class CASCache(ArtifactCache):
     def commit(self, element, content, keys):
         refs = [self.get_artifact_fullname(element, key) for key in keys]

-        tree = self._create_tree(content)
+        tree = self._commit_directory(content)

         for ref in refs:
             self.set_ref(ref, tree)
@@ -330,12 +335,12 @@ class CASCache(ArtifactCache):
                 finished = False
                 remaining = digest.size_bytes
                 while not finished:
-                    chunk_size = min(remaining, 64 * 1024)
+                    chunk_size = min(remaining, _MAX_PAYLOAD_BYTES)
                     remaining -= chunk_size

                     request = bytestream_pb2.WriteRequest()
                     request.write_offset = offset
-                    # max. 64 kB chunks
+                    # max. _MAX_PAYLOAD_BYTES chunks
                     request.data = f.read(chunk_size)
                     request.resource_name = resname
                     request.finish_write = remaining <= 0
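
The hunk above only swaps the hard-coded 64 kB chunk size for _MAX_PAYLOAD_BYTES. For readers unfamiliar with the ByteStream protocol, a minimal standalone sketch of such a chunked upload generator could look as follows; the function name upload_chunks and its parameters are illustrative, assuming BuildStream's bundled bytestream_pb2 protos:

from buildstream._protos.google.bytestream import bytestream_pb2

def upload_chunks(f, resource_name, size_bytes, max_payload=1024 * 1024):
    # Yield WriteRequests for a ByteStream Write() call, each carrying at
    # most max_payload bytes, with finish_write set on the final message.
    offset = 0
    remaining = size_bytes
    finished = False
    while not finished:
        chunk_size = min(remaining, max_payload)
        remaining -= chunk_size

        request = bytestream_pb2.WriteRequest()
        request.write_offset = offset
        request.data = f.read(chunk_size)
        request.resource_name = resource_name
        request.finish_write = remaining <= 0

        offset += chunk_size
        finished = request.finish_write
        yield request
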
@@ -623,7 +628,21 @@ class CASCache(ArtifactCache):
     def _refpath(self, ref):
         return os.path.join(self.casdir, 'refs', 'heads', ref)

-    def _create_tree(self, path, *, digest=None):
+    # _commit_directory():
+    #
+    # Adds local directory to content addressable store.
+    #
+    # Adds files, symbolic links and recursively other directories in
+    # a local directory to the content addressable store.
+    #
+    # Args:
+    #     path (str): Path to the directory to add.
+    #     dir_digest (Digest): An optional Digest object to use.
+    #
+    # Returns:
+    #     (Digest): Digest object for the directory added.
+    #
+    def _commit_directory(self, path, *, dir_digest=None):
         directory = remote_execution_pb2.Directory()

         for name in sorted(os.listdir(path)):
@@ -632,7 +651,7 @@ class CASCache(ArtifactCache):
             if stat.S_ISDIR(mode):
                 dirnode = directory.directories.add()
                 dirnode.name = name
-                self._create_tree(full_path, digest=dirnode.digest)
+                self._commit_directory(full_path, dir_digest=dirnode.digest)
             elif stat.S_ISREG(mode):
                 filenode = directory.files.add()
                 filenode.name = name
@@ -645,7 +664,8 @@ class CASCache(ArtifactCache):
             else:
                 raise ArtifactError("Unsupported file type for {}".format(full_path))

-        return self.add_object(digest=digest, buffer=directory.SerializeToString())
+        return self.add_object(digest=dir_digest,
+                               buffer=directory.SerializeToString())

     def _get_subdir(self, tree, subdir):
         head, name = os.path.split(subdir)
@@ -788,39 +808,119 @@ class CASCache(ArtifactCache):
         out.flush()
         assert digest.size_bytes == os.fstat(out.fileno()).st_size

-    def _fetch_directory(self, remote, tree):
-        objpath = self.objpath(tree)
+    # _ensure_blob():
+    #
+    # Fetch and add blob if it's not already local.
+    #
+    # Args:
+    #     remote (Remote): The remote to use.
+    #     digest (Digest): Digest object for the blob to fetch.
+    #
+    # Returns:
+    #     (str): The path of the object
+    #
+    def _ensure_blob(self, remote, digest):
+        objpath = self.objpath(digest)
         if os.path.exists(objpath):
-            # already in local cache
-            return
+            # already in local repository
+            return objpath

-        with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
-            self._fetch_blob(remote, tree, out)
+        with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
+            self._fetch_blob(remote, digest, f)

-            directory = remote_execution_pb2.Directory()
+            added_digest = self.add_object(path=f.name)
+            assert added_digest.hash == digest.hash

-            with open(out.name, 'rb') as f:
-                directory.ParseFromString(f.read())
+        return objpath

-            for filenode in directory.files:
-                fileobjpath = self.objpath(tree)
-                if os.path.exists(fileobjpath):
-                    # already in local cache
-                    continue
+    def _batch_download_complete(self, batch):
+        for digest, data in batch.send():
+            with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
+                f.write(data)
+                f.flush()
+
+                added_digest = self.add_object(path=f.name)
+                assert added_digest.hash == digest.hash
+
+    # Helper function for _fetch_directory().
+    def _fetch_directory_batch(self, remote, batch, fetch_queue, fetch_next_queue):
+        self._batch_download_complete(batch)

-                with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
-                    self._fetch_blob(remote, filenode.digest, f)
+        # All previously scheduled directories are now locally available,
+        # move them to the processing queue.
+        fetch_queue.extend(fetch_next_queue)
+        fetch_next_queue.clear()
+        return _CASBatchRead(remote)

-                    digest = self.add_object(path=f.name)
-                    assert digest.hash == filenode.digest.hash
+    # Helper function for _fetch_directory().
+    def _fetch_directory_node(self, remote, digest, batch, fetch_queue, fetch_next_queue, *, recursive=False):
+        in_local_cache = os.path.exists(self.objpath(digest))
+
+        if in_local_cache:
+            # Skip download, already in local cache.
+            pass
+        elif (digest.size_bytes >= remote.max_batch_total_size_bytes or
+                not remote.batch_read_supported):
+            # Too large for batch request, download in independent request.
+            self._ensure_blob(remote, digest)
+            in_local_cache = True
+        else:
+            if not batch.add(digest):
+                # Not enough space left in batch request.
+                # Complete pending batch first.
+                batch = self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
+                batch.add(digest)
+
+        if recursive:
+            if in_local_cache:
+                # Add directory to processing queue.
+                fetch_queue.append(digest)
+            else:
+                # Directory will be available after completing pending batch.
+                # Add directory to deferred processing queue.
+                fetch_next_queue.append(digest)
+
+        return batch
+
+    # _fetch_directory():
+    #
+    # Fetches remote directory and adds it to content addressable store.
+    #
+    # Fetches files, symbolic links and recursively other directories in
+    # the remote directory and adds them to the content addressable
+    # store.
+    #
+    # Args:
+    #     remote (Remote): The remote to use.
+    #     dir_digest (Digest): Digest object for the directory to fetch.
+    #
+    def _fetch_directory(self, remote, dir_digest):
+        fetch_queue = [dir_digest]
+        fetch_next_queue = []
+        batch = _CASBatchRead(remote)
+
+        while len(fetch_queue) + len(fetch_next_queue) > 0:
+            if len(fetch_queue) == 0:
+                batch = self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
+
+            dir_digest = fetch_queue.pop(0)
+
+            objpath = self._ensure_blob(remote, dir_digest)
+
+            directory = remote_execution_pb2.Directory()
+            with open(objpath, 'rb') as f:
+                directory.ParseFromString(f.read())

             for dirnode in directory.directories:
-                self._fetch_directory(remote, dirnode.digest)
+                batch = self._fetch_directory_node(remote, dirnode.digest, batch,
+                                                   fetch_queue, fetch_next_queue, recursive=True)
+
+            for filenode in directory.files:
+                batch = self._fetch_directory_node(remote, filenode.digest, batch,
+                                                   fetch_queue, fetch_next_queue)

-            # place directory blob only in final location when we've downloaded
-            # all referenced blobs to avoid dangling references in the repository
-            digest = self.add_object(path=out.name)
-            assert digest.hash == tree.hash
+        # Fetch final batch
+        self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)


 # Represents a single remote CAS cache.
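
The rewritten fetch logic above is easier to follow outside diff form. Below is a simplified, self-contained model of the same two-queue idea, where plain strings and lists stand in for digests and gRPC batches and every name is illustrative: directories whose blobs are already local go straight onto fetch_queue, while directories waiting on a pending batch sit in fetch_next_queue until the batch is flushed.

def fetch_directory(root, children, batch_limit=3):
    # children maps a directory name to the names it references.
    fetch_queue = [root]      # directories whose blobs are locally available
    fetch_next_queue = []     # directories waiting on the pending batch
    batch = []                # blob downloads queued for the next batch
    local = set()             # blobs already downloaded

    def flush_batch():
        # Completing a batch makes its blobs local, so deferred directories
        # can move over to the processing queue.
        local.update(batch)
        batch.clear()
        fetch_queue.extend(fetch_next_queue)
        fetch_next_queue.clear()

    while fetch_queue or fetch_next_queue:
        if not fetch_queue:
            flush_batch()
        directory = fetch_queue.pop(0)
        local.add(directory)   # stands in for _ensure_blob()

        for child in children.get(directory, []):
            if child in local:
                fetch_queue.append(child)
                continue
            if len(batch) >= batch_limit:
                flush_batch()
            batch.append(child)
            fetch_next_queue.append(child)

    flush_batch()
    return local

For example, fetch_directory('root', {'root': ['a', 'b'], 'a': ['c']}) visits every referenced entry while issuing at most batch_limit downloads per batch.
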
@@ -870,11 +970,78 @@ class _CASRemote():

             self.bytestream = bytestream_pb2_grpc.ByteStreamStub(self.channel)
             self.cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel)
+            self.capabilities = remote_execution_pb2_grpc.CapabilitiesStub(self.channel)
             self.ref_storage = buildstream_pb2_grpc.ReferenceStorageStub(self.channel)

+            self.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES
+            try:
+                request = remote_execution_pb2.GetCapabilitiesRequest()
+                response = self.capabilities.GetCapabilities(request)
+                server_max_batch_total_size_bytes = response.cache_capabilities.max_batch_total_size_bytes
+                if 0 < server_max_batch_total_size_bytes < self.max_batch_total_size_bytes:
+                    self.max_batch_total_size_bytes = server_max_batch_total_size_bytes
+            except grpc.RpcError as e:
+                # Simply use the defaults for servers that don't implement GetCapabilities()
+                if e.code() != grpc.StatusCode.UNIMPLEMENTED:
+                    raise
+
+            # Check whether the server supports BatchReadBlobs()
+            self.batch_read_supported = False
+            try:
+                request = remote_execution_pb2.BatchReadBlobsRequest()
+                response = self.cas.BatchReadBlobs(request)
+                self.batch_read_supported = True
+            except grpc.RpcError as e:
+                if e.code() != grpc.StatusCode.UNIMPLEMENTED:
+                    raise
+
             self._initialized = True


+# Represents a batch of blobs queued for fetching.
+#
+class _CASBatchRead():
+    def __init__(self, remote):
+        self._remote = remote
+        self._max_total_size_bytes = remote.max_batch_total_size_bytes
+        self._request = remote_execution_pb2.BatchReadBlobsRequest()
+        self._size = 0
+        self._sent = False
+
+    def add(self, digest):
+        assert not self._sent
+
+        new_batch_size = self._size + digest.size_bytes
+        if new_batch_size > self._max_total_size_bytes:
+            # Not enough space left in current batch
+            return False
+
+        request_digest = self._request.digests.add()
+        request_digest.hash = digest.hash
+        request_digest.size_bytes = digest.size_bytes
+        self._size = new_batch_size
+        return True
+
+    def send(self):
+        assert not self._sent
+        self._sent = True
+
+        if len(self._request.digests) == 0:
+            return
+
+        batch_response = self._remote.cas.BatchReadBlobs(self._request)
+
+        for response in batch_response.responses:
+            if response.status.code != grpc.StatusCode.OK.value[0]:
+                raise ArtifactError("Failed to download blob {}: {}".format(
+                    response.digest.hash, response.status.code))
+            if response.digest.size_bytes != len(response.data):
+                raise ArtifactError("Failed to download blob {}: expected {} bytes, received {} bytes".format(
+                    response.digest.hash, response.digest.size_bytes, len(response.data)))
+
+            yield (response.digest, response.data)
+
+
 def _grouper(iterable, n):
     while True:
         try: