Jürg Billeter pushed to branch master at BuildStream / buildstream
Commits:
- 26e1a3c7 by Jürg Billeter at 2018-10-01T14:58:06Z
- f47895c0 by Jürg Billeter at 2018-10-01T14:58:06Z
- cf00c0a1 by Jürg Billeter at 2018-10-01T15:32:30Z
2 changed files:
Changes:
@@ -1048,10 +1048,29 @@ class CASCache(ArtifactCache):
                 missing_blobs[d.hash] = d

         # Upload any blobs missing on the server
-        for blob_digest in missing_blobs.values():
-            with open(self.objpath(blob_digest), 'rb') as f:
-                assert os.fstat(f.fileno()).st_size == blob_digest.size_bytes
-                self._send_blob(remote, blob_digest, f, u_uid=u_uid)
+        self._send_blobs(remote, missing_blobs.values(), u_uid)
+
+    def _send_blobs(self, remote, digests, u_uid=uuid.uuid4()):
+        batch = _CASBatchUpdate(remote)
+
+        for digest in digests:
+            with open(self.objpath(digest), 'rb') as f:
+                assert os.fstat(f.fileno()).st_size == digest.size_bytes
+
+                if (digest.size_bytes >= remote.max_batch_total_size_bytes or
+                        not remote.batch_update_supported):
+                    # Too large for batch request, upload in independent request.
+                    self._send_blob(remote, digest, f, u_uid=u_uid)
+                else:
+                    if not batch.add(digest, f):
+                        # Not enough space left in batch request.
+                        # Complete pending batch first.
+                        batch.send()
+                        batch = _CASBatchUpdate(remote)
+                        batch.add(digest, f)
+
+        # Send final batch
+        batch.send()


 # Represents a single remote CAS cache.
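The control flow in _send_blobs() above is a flush-and-restart batching
pattern: fill the current batch until an item no longer fits, flush it,
start a new one, and flush whatever remains at the end. A minimal,
self-contained sketch of the same idea, with a hypothetical size limit
and callables standing in for _CASBatchUpdate and the actual gRPC calls:

    MAX_TOTAL = 4 * 1024 * 1024   # hypothetical batch size limit

    def send_in_batches(sizes, send_batch, send_single):
        batch, total = [], 0
        for size in sizes:
            if size >= MAX_TOTAL:
                # Too large for any batch, send on its own
                send_single(size)
            elif total + size > MAX_TOTAL:
                # Batch full: flush it, then start a new one
                send_batch(batch)
                batch, total = [size], size
            else:
                batch.append(size)
                total += size
        send_batch(batch)         # final, possibly partial batch

    # Example: three 3 MiB blobs yield three single-blob batches.
    send_in_batches([3 << 20] * 3, print, print)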
@@ -1126,6 +1145,17 @@ class _CASRemote():
                     if e.code() != grpc.StatusCode.UNIMPLEMENTED:
                         raise

+            # Check whether the server supports BatchUpdateBlobs()
+            self.batch_update_supported = False
+            try:
+                request = remote_execution_pb2.BatchUpdateBlobsRequest()
+                response = self.cas.BatchUpdateBlobs(request)
+                self.batch_update_supported = True
+            except grpc.RpcError as e:
+                if (e.code() != grpc.StatusCode.UNIMPLEMENTED and
+                        e.code() != grpc.StatusCode.PERMISSION_DENIED):
+                    raise
+
             self._initialized = True

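This probe works because an empty BatchUpdateBlobs() request is cheap: a
server that implements the method accepts it, one that does not fails
with UNIMPLEMENTED, and a push-disabled server fails with
PERMISSION_DENIED (matching the server change further below). A generic
sketch of the probe pattern, where call_rpc is a hypothetical
zero-argument wrapper around the probe call:

    import grpc

    def rpc_supported(call_rpc):
        # Issue a no-op request and classify the failure mode.
        try:
            call_rpc()
            return True
        except grpc.RpcError as e:
            if e.code() in (grpc.StatusCode.UNIMPLEMENTED,
                            grpc.StatusCode.PERMISSION_DENIED):
                return False
            raise  # unrelated error, propagate

    # Hypothetical usage with a CAS stub:
    # supported = rpc_supported(
    #     lambda: cas.BatchUpdateBlobs(
    #         remote_execution_pb2.BatchUpdateBlobsRequest()))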
@@ -1173,6 +1203,46 @@ class _CASBatchRead():
             yield (response.digest, response.data)


+# Represents a batch of blobs queued for upload.
+#
+class _CASBatchUpdate():
+    def __init__(self, remote):
+        self._remote = remote
+        self._max_total_size_bytes = remote.max_batch_total_size_bytes
+        self._request = remote_execution_pb2.BatchUpdateBlobsRequest()
+        self._size = 0
+        self._sent = False
+
+    def add(self, digest, stream):
+        assert not self._sent
+
+        new_batch_size = self._size + digest.size_bytes
+        if new_batch_size > self._max_total_size_bytes:
+            # Not enough space left in current batch
+            return False
+
+        blob_request = self._request.requests.add()
+        blob_request.digest.hash = digest.hash
+        blob_request.digest.size_bytes = digest.size_bytes
+        blob_request.data = stream.read(digest.size_bytes)
+        self._size = new_batch_size
+        return True
+
+    def send(self):
+        assert not self._sent
+        self._sent = True
+
+        if len(self._request.requests) == 0:
+            return
+
+        batch_response = self._remote.cas.BatchUpdateBlobs(self._request)
+
+        for response in batch_response.responses:
+            if response.status.code != grpc.StatusCode.OK.value[0]:
+                raise ArtifactError("Failed to upload blob {}: {}".format(
+                    response.digest.hash, response.status.code))
+
+
 def _grouper(iterable, n):
     while True:
         try:
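One detail in _CASBatchUpdate.send(): each grpc.StatusCode member wraps
an (integer code, name string) pair, so grpc.StatusCode.OK.value[0] is
the plain integer 0 that the google.rpc Status message in each batch
response carries in its code field. For example:

    import grpc

    # StatusCode members are (integer code, string name) pairs; the
    # integer matches the google.rpc.Code value used in Status.code.
    assert grpc.StatusCode.OK.value == (0, 'ok')
    assert grpc.StatusCode.OK.value[0] == 0
    assert grpc.StatusCode.FAILED_PRECONDITION.value[0] == 9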
@@ -68,7 +68,7 @@ def create_server(repo, *, enable_push):
         _ByteStreamServicer(artifactcache, enable_push=enable_push), server)

     remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
-        _ContentAddressableStorageServicer(artifactcache), server)
+        _ContentAddressableStorageServicer(artifactcache, enable_push=enable_push), server)

     remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
         _CapabilitiesServicer(), server)
@@ -222,9 +222,10 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):


 class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
-    def __init__(self, cas):
+    def __init__(self, cas, *, enable_push):
         super().__init__()
         self.cas = cas
+        self.enable_push = enable_push

     def FindMissingBlobs(self, request, context):
         response = remote_execution_pb2.FindMissingBlobsResponse()
@@ -260,6 +261,46 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):

         return response

+    def BatchUpdateBlobs(self, request, context):
+        response = remote_execution_pb2.BatchUpdateBlobsResponse()
+
+        if not self.enable_push:
+            context.set_code(grpc.StatusCode.PERMISSION_DENIED)
+            return response
+
+        batch_size = 0
+
+        for blob_request in request.requests:
+            digest = blob_request.digest
+
+            batch_size += digest.size_bytes
+            if batch_size > _MAX_PAYLOAD_BYTES:
+                context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
+                return response
+
+            blob_response = response.responses.add()
+            blob_response.digest.hash = digest.hash
+            blob_response.digest.size_bytes = digest.size_bytes
+
+            if len(blob_request.data) != digest.size_bytes:
+                blob_response.status.code = grpc.StatusCode.FAILED_PRECONDITION
+                continue
+
+            try:
+                _clean_up_cache(self.cas, digest.size_bytes)
+
+                with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
+                    out.write(blob_request.data)
+                    out.flush()
+                    server_digest = self.cas.add_object(path=out.name)
+                    if server_digest.hash != digest.hash:
+                        blob_response.status.code = grpc.StatusCode.FAILED_PRECONDITION
+
+            except ArtifactTooLargeException:
+                blob_response.status.code = grpc.StatusCode.RESOURCE_EXHAUSTED
+
+        return response
+

 class _CapabilitiesServicer(remote_execution_pb2_grpc.CapabilitiesServicer):
     def GetCapabilities(self, request, context):
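To round this off, a sketch of a client exercising the new
BatchUpdateBlobs() endpoint. The address is hypothetical and the import
paths assume BuildStream's vendored remote execution protos; a server
from create_server() with enable_push=True would need to be listening
there:

    import hashlib

    import grpc

    from buildstream._protos.build.bazel.remote.execution.v2 import (
        remote_execution_pb2, remote_execution_pb2_grpc)

    # Hypothetical endpoint running the CAS server defined above
    channel = grpc.insecure_channel('localhost:50051')
    cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(channel)

    data = b'hello'
    request = remote_execution_pb2.BatchUpdateBlobsRequest()
    blob = request.requests.add()
    blob.digest.hash = hashlib.sha256(data).hexdigest()
    blob.digest.size_bytes = len(data)
    blob.data = data

    response = cas.BatchUpdateBlobs(request)
    for r in response.responses:
        # status.code is a google.rpc.Code integer; 0 means OK
        print(r.digest.hash, r.status.code)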