@@ -44,6 +44,11 @@ from .._exceptions import ArtifactError
 from . import ArtifactCache
 
 
+# The default limit for gRPC messages is 4 MiB.
+# Limit payload to 1 MiB to leave sufficient headroom for metadata.
+_MAX_PAYLOAD_BYTES = 1024 * 1024
+
+
 # A CASCache manages artifacts in a CAS repository as specified in the
 # Remote Execution API.
 #
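The new constant above is what the later hunks use to bound both batch requests and ByteStream write chunks. As a rough illustration of the arithmetic (a standalone sketch, not part of the patch; the helper name is invented), splitting a payload into chunks no larger than the cap keeps every message comfortably below the 4 MiB gRPC default:

# Illustrative sketch only: bound each message payload by the 1 MiB cap.
_MAX_PAYLOAD_BYTES = 1024 * 1024  # same value the patch defines

def iter_chunks(data, chunk_size=_MAX_PAYLOAD_BYTES):
    # Yield consecutive slices of at most chunk_size bytes.
    for offset in range(0, len(data), chunk_size):
        yield data[offset:offset + chunk_size]

blob = b"x" * (3 * _MAX_PAYLOAD_BYTES + 123)
chunks = list(iter_chunks(blob))
assert len(chunks) == 4
assert all(len(chunk) <= _MAX_PAYLOAD_BYTES for chunk in chunks)
assert b"".join(chunks) == blob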
@@ -854,6 +859,80 @@ class CASCache(ArtifactCache):
 
             assert digest.size_bytes == os.fstat(stream.fileno()).st_size
 
+    # _ensure_blob():
+    #
+    # Fetch and add blob if it's not already local.
+    #
+    # Args:
+    #     remote (Remote): The remote to use.
+    #     digest (Digest): Digest object for the blob to fetch.
+    #
+    # Returns:
+    #     (str): The path of the object
+    #
+    def _ensure_blob(self, remote, digest):
+        objpath = self.objpath(digest)
+        if os.path.exists(objpath):
+            # already in local repository
+            return objpath
+
+        with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
+            self._fetch_blob(remote, digest, f)
+
+            added_digest = self.add_object(path=f.name)
+            assert added_digest.hash == digest.hash
+
+        return objpath
+
+    def _batch_download_complete(self, batch):
+        for digest, data in batch.send():
+            with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
+                f.write(data)
+                f.flush()
+
+                added_digest = self.add_object(path=f.name)
+                assert added_digest.hash == digest.hash
+
+    # Helper function for _fetch_directory().
+    def _fetch_directory_batch(self, remote, batch, fetch_queue, fetch_next_queue):
+        self._batch_download_complete(batch)
+
+        # All previously scheduled directories are now locally available,
+        # move them to the processing queue.
+        fetch_queue.extend(fetch_next_queue)
+        fetch_next_queue.clear()
+        return _CASBatchRead(remote)
+
+    # Helper function for _fetch_directory().
+    def _fetch_directory_node(self, remote, digest, batch, fetch_queue, fetch_next_queue, *, recursive=False):
+        in_local_cache = os.path.exists(self.objpath(digest))
+
+        if in_local_cache:
+            # Skip download, already in local cache.
+            pass
+        elif (digest.size_bytes >= remote.max_batch_total_size_bytes or
+                not remote.batch_read_supported):
+            # Too large for batch request, download in independent request.
+            self._ensure_blob(remote, digest)
+            in_local_cache = True
+        else:
+            if not batch.add(digest):
+                # Not enough space left in batch request.
+                # Complete pending batch first.
+                batch = self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
+                batch.add(digest)
+
+        if recursive:
+            if in_local_cache:
+                # Add directory to processing queue.
+                fetch_queue.append(digest)
+            else:
+                # Directory will be available after completing pending batch.
+                # Add directory to deferred processing queue.
+                fetch_next_queue.append(digest)
+
+        return batch
+
     # _fetch_directory():
     #
     # Fetches remote directory and adds it to content addressable store.
@@ -867,39 +946,32 @@ class CASCache(ArtifactCache):
     #     dir_digest (Digest): Digest object for the directory to fetch.
     #
     def _fetch_directory(self, remote, dir_digest):
-        objpath = self.objpath(dir_digest)
-        if os.path.exists(objpath):
-            # already in local cache
-            return
-
-        with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
-            self._fetch_blob(remote, dir_digest, out)
-
-            directory = remote_execution_pb2.Directory()
+        fetch_queue = [dir_digest]
+        fetch_next_queue = []
+        batch = _CASBatchRead(remote)
 
-            with open(out.name, 'rb') as f:
-                directory.ParseFromString(f.read())
+        while len(fetch_queue) + len(fetch_next_queue) > 0:
+            if len(fetch_queue) == 0:
+                batch = self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
 
-            for filenode in directory.files:
-                fileobjpath = self.objpath(filenode.digest)
-                if os.path.exists(fileobjpath):
-                    # already in local cache
-                    continue
+            dir_digest = fetch_queue.pop(0)
 
-                with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
-                    self._fetch_blob(remote, filenode.digest, f)
+            objpath = self._ensure_blob(remote, dir_digest)
 
-                    digest = self.add_object(path=f.name)
-                    assert digest.hash == filenode.digest.hash
+            directory = remote_execution_pb2.Directory()
+            with open(objpath, 'rb') as f:
+                directory.ParseFromString(f.read())
 
             for dirnode in directory.directories:
-                self._fetch_directory(remote, dirnode.digest)
+                batch = self._fetch_directory_node(remote, dirnode.digest, batch,
+                                                   fetch_queue, fetch_next_queue, recursive=True)
+
+            for filenode in directory.files:
+                batch = self._fetch_directory_node(remote, filenode.digest, batch,
+                                                   fetch_queue, fetch_next_queue)
 
-        # Place directory blob only in final location when we've
-        # downloaded all referenced blobs to avoid dangling
-        # references in the repository.
-        digest = self.add_object(path=out.name)
-        assert digest.hash == dir_digest.hash
+        # Fetch final batch
+        self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
 
     def _fetch_tree(self, remote, digest):
         # download but do not store the Tree object
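The rewritten _fetch_directory() above replaces per-file requests and recursion with an explicit work queue, a deferred queue for blobs that are still sitting in a pending batch, and a batch object that is flushed whenever it fills up or the work queue runs dry. The following self-contained sketch (toy types and invented names, not code from the patch) shows the same traversal pattern in isolation:

# Illustrative sketch of the two-queue, batched traversal used by _fetch_directory().
class ToyBatch:
    # Stand-in for _CASBatchRead: queue keys until a size limit is reached.
    def __init__(self, limit=3):
        self._limit = limit
        self._keys = []

    def add(self, key):
        if len(self._keys) >= self._limit:
            return False              # batch full, caller must flush first
        self._keys.append(key)
        return True

    def send(self):
        keys, self._keys = self._keys, []
        return keys


def fetch_tree(root, source, store):
    # source: key -> (payload, child_keys); store: the "local cache" being filled.
    fetch_queue, fetch_next_queue = [root], []
    batch = ToyBatch()

    def complete_batch(current):
        for key in current.send():
            store[key] = source[key]  # "download" everything queued in the batch
        # Deferred keys are now locally available; move them to the work queue.
        fetch_queue.extend(fetch_next_queue)
        fetch_next_queue.clear()
        return ToyBatch()

    while fetch_queue or fetch_next_queue:
        if not fetch_queue:
            batch = complete_batch(batch)

        key = fetch_queue.pop(0)
        store.setdefault(key, source[key])       # ensure the node itself is local
        _, children = store[key]

        for child in children:
            if child in store:
                fetch_queue.append(child)        # ready to process now
            else:
                if not batch.add(child):
                    batch = complete_batch(batch)
                    batch.add(child)
                fetch_next_queue.append(child)   # process once the batch is flushed

    complete_batch(batch)                        # flush any leftovers


source = {
    "root": ("dir", ["d1", "f1"]),
    "d1": ("dir", ["f2", "f3"]),
    "f1": ("file", []), "f2": ("file", []), "f3": ("file", []),
}
store = {}
fetch_tree("root", source, store)
assert set(store) == set(source)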
@@ -914,16 +986,7 @@ class CASCache(ArtifactCache):
             tree.children.extend([tree.root])
             for directory in tree.children:
                 for filenode in directory.files:
-                    fileobjpath = self.objpath(filenode.digest)
-                    if os.path.exists(fileobjpath):
-                        # already in local cache
-                        continue
-
-                    with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
-                        self._fetch_blob(remote, filenode.digest, f)
-
-                        added_digest = self.add_object(path=f.name)
-                        assert added_digest.hash == filenode.digest.hash
+                    self._ensure_blob(remote, filenode.digest)
 
                 # place directory blob only in final location when we've downloaded
                 # all referenced blobs to avoid dangling references in the repository
@@ -942,12 +1005,12 @@ class CASCache(ArtifactCache):
             finished = False
             remaining = digest.size_bytes
             while not finished:
-                chunk_size = min(remaining, 64 * 1024)
+                chunk_size = min(remaining, _MAX_PAYLOAD_BYTES)
                 remaining -= chunk_size
 
                 request = bytestream_pb2.WriteRequest()
                 request.write_offset = offset
-                # max. 64 kB chunks
+                # max. _MAX_PAYLOAD_BYTES chunks
                 request.data = instream.read(chunk_size)
                 request.resource_name = resname
                 request.finish_write = remaining <= 0
@@ -1035,11 +1098,78 @@ class _CASRemote():
 
             self.bytestream = bytestream_pb2_grpc.ByteStreamStub(self.channel)
             self.cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel)
+            self.capabilities = remote_execution_pb2_grpc.CapabilitiesStub(self.channel)
             self.ref_storage = buildstream_pb2_grpc.ReferenceStorageStub(self.channel)
 
+            self.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES
+            try:
+                request = remote_execution_pb2.GetCapabilitiesRequest()
+                response = self.capabilities.GetCapabilities(request)
+                server_max_batch_total_size_bytes = response.cache_capabilities.max_batch_total_size_bytes
+                if 0 < server_max_batch_total_size_bytes < self.max_batch_total_size_bytes:
+                    self.max_batch_total_size_bytes = server_max_batch_total_size_bytes
+            except grpc.RpcError as e:
+                # Simply use the defaults for servers that don't implement GetCapabilities()
+                if e.code() != grpc.StatusCode.UNIMPLEMENTED:
+                    raise
+
+            # Check whether the server supports BatchReadBlobs()
+            self.batch_read_supported = False
+            try:
+                request = remote_execution_pb2.BatchReadBlobsRequest()
+                response = self.cas.BatchReadBlobs(request)
+                self.batch_read_supported = True
+            except grpc.RpcError as e:
+                if e.code() != grpc.StatusCode.UNIMPLEMENTED:
+                    raise
+
             self._initialized = True
 
 
+# Represents a batch of blobs queued for fetching.
+#
+class _CASBatchRead():
+    def __init__(self, remote):
+        self._remote = remote
+        self._max_total_size_bytes = remote.max_batch_total_size_bytes
+        self._request = remote_execution_pb2.BatchReadBlobsRequest()
+        self._size = 0
+        self._sent = False
+
+    def add(self, digest):
+        assert not self._sent
+
+        new_batch_size = self._size + digest.size_bytes
+        if new_batch_size > self._max_total_size_bytes:
+            # Not enough space left in current batch
+            return False
+
+        request_digest = self._request.digests.add()
+        request_digest.hash = digest.hash
+        request_digest.size_bytes = digest.size_bytes
+        self._size = new_batch_size
+        return True
+
+    def send(self):
+        assert not self._sent
+        self._sent = True
+
+        if len(self._request.digests) == 0:
+            return
+
+        batch_response = self._remote.cas.BatchReadBlobs(self._request)
+
+        for response in batch_response.responses:
+            if response.status.code != grpc.StatusCode.OK.value[0]:
+                raise ArtifactError("Failed to download blob {}: {}".format(
+                    response.digest.hash, response.status.code))
+            if response.digest.size_bytes != len(response.data):
+                raise ArtifactError("Failed to download blob {}: expected {} bytes, received {} bytes".format(
+                    response.digest.hash, response.digest.size_bytes, len(response.data)))
+
+            yield (response.digest, response.data)
+
+
 def _grouper(iterable, n):
     while True:
         try:
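Finally, the _CASRemote.init() changes above negotiate the batch size: the client starts from its own 1 MiB cap and only lowers it when the server advertises a smaller, non-zero max_batch_total_size_bytes; an UNIMPLEMENTED response leaves the defaults in place. A tiny restatement of that clamping rule (a sketch with invented names, not part of the patch):

# Illustrative sketch only: clamp the client cap by the server's advertised limit.
_MAX_PAYLOAD_BYTES = 1024 * 1024

def negotiated_batch_limit(server_limit, client_limit=_MAX_PAYLOAD_BYTES):
    # A server limit of 0 is treated as "no server-side limit".
    if 0 < server_limit < client_limit:
        return server_limit
    return client_limit

assert negotiated_batch_limit(0) == _MAX_PAYLOAD_BYTES                 # no limit advertised
assert negotiated_batch_limit(256 * 1024) == 256 * 1024                # server is stricter
assert negotiated_batch_limit(64 * 1024 * 1024) == _MAX_PAYLOAD_BYTES  # server is looser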