... |
... |
@@ -33,11 +33,11 @@ import grpc |
33
|
33
|
|
34
|
34
|
from .. import _yaml
|
35
|
35
|
|
|
36
|
+from .._protos.google.rpc import code_pb2
|
36
|
37
|
from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
|
37
|
38
|
from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
|
38
|
39
|
from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc
|
39
|
40
|
|
40
|
|
-from .._message import MessageType, Message
|
41
|
41
|
from .. import _signals, utils
|
42
|
42
|
from .._exceptions import ArtifactError
|
43
|
43
|
|
... |
... |
@@ -81,8 +81,9 @@ class CASCache(ArtifactCache): |
81
|
81
|
################################################
|
82
|
82
|
|
83
|
83
|
def preflight(self):
|
84
|
|
- if (not os.path.isdir(os.path.join(self.casdir, 'refs', 'heads')) or
|
85
|
|
- not os.path.isdir(os.path.join(self.casdir, 'objects'))):
|
|
84
|
+ headdir = os.path.join(self.casdir, 'refs', 'heads')
|
|
85
|
+ objdir = os.path.join(self.casdir, 'objects')
|
|
86
|
+ if not (os.path.isdir(headdir) and os.path.isdir(objdir)):
|
86
|
87
|
raise ArtifactError("CAS repository check failed for '{}'"
|
87
|
88
|
.format(self.casdir))
|
88
|
89
|
|
... |
... |
@@ -918,7 +919,7 @@ class CASCache(ArtifactCache): |
918
|
919
|
# Skip download, already in local cache.
|
919
|
920
|
pass
|
920
|
921
|
elif (digest.size_bytes >= remote.max_batch_total_size_bytes or
|
921
|
|
- not remote.batch_read_supported):
|
|
922
|
+ not remote.batch_read_supported):
|
922
|
923
|
# Too large for batch request, download in independent request.
|
923
|
924
|
self._ensure_blob(remote, digest)
|
924
|
925
|
in_local_cache = True
|
... |
... |
@@ -958,7 +959,7 @@ class CASCache(ArtifactCache): |
958
|
959
|
batch = _CASBatchRead(remote)
|
959
|
960
|
|
960
|
961
|
while len(fetch_queue) + len(fetch_next_queue) > 0:
|
961
|
|
- if len(fetch_queue) == 0:
|
|
962
|
+ if not fetch_queue:
|
962
|
963
|
batch = self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
|
963
|
964
|
|
964
|
965
|
dir_digest = fetch_queue.pop(0)
|
... |
... |
@@ -1087,6 +1088,10 @@ class _CASRemote(): |
1087
|
1088
|
self.bytestream = None
|
1088
|
1089
|
self.cas = None
|
1089
|
1090
|
self.ref_storage = None
|
|
1091
|
+ self.batch_update_supported = None
|
|
1092
|
+ self.batch_read_supported = None
|
|
1093
|
+ self.capabilities = None
|
|
1094
|
+ self.max_batch_total_size_bytes = None
|
1090
|
1095
|
|
1091
|
1096
|
def init(self):
|
1092
|
1097
|
if not self._initialized:
|
... |
... |
@@ -1191,13 +1196,13 @@ class _CASBatchRead(): |
1191
|
1196
|
assert not self._sent
|
1192
|
1197
|
self._sent = True
|
1193
|
1198
|
|
1194
|
|
- if len(self._request.digests) == 0:
|
|
1199
|
+ if not self._request.digests:
|
1195
|
1200
|
return
|
1196
|
1201
|
|
1197
|
1202
|
batch_response = self._remote.cas.BatchReadBlobs(self._request)
|
1198
|
1203
|
|
1199
|
1204
|
for response in batch_response.responses:
|
1200
|
|
- if response.status.code != grpc.StatusCode.OK.value[0]:
|
|
1205
|
+ if response.status.code != code_pb2.OK:
|
1201
|
1206
|
raise ArtifactError("Failed to download blob {}: {}".format(
|
1202
|
1207
|
response.digest.hash, response.status.code))
|
1203
|
1208
|
if response.digest.size_bytes != len(response.data):
|
... |
... |
@@ -1236,13 +1241,13 @@ class _CASBatchUpdate(): |
1236
|
1241
|
assert not self._sent
|
1237
|
1242
|
self._sent = True
|
1238
|
1243
|
|
1239
|
|
- if len(self._request.requests) == 0:
|
|
1244
|
+ if not self._request.requests:
|
1240
|
1245
|
return
|
1241
|
1246
|
|
1242
|
1247
|
batch_response = self._remote.cas.BatchUpdateBlobs(self._request)
|
1243
|
1248
|
|
1244
|
1249
|
for response in batch_response.responses:
|
1245
|
|
- if response.status.code != grpc.StatusCode.OK.value[0]:
|
|
1250
|
+ if response.status.code != code_pb2.OK:
|
1246
|
1251
|
raise ArtifactError("Failed to upload blob {}: {}".format(
|
1247
|
1252
|
response.digest.hash, response.status.code))
|
1248
|
1253
|
|