... |
... |
@@ -38,6 +38,10 @@ from .._context import Context |
38
|
38
|
from .cascache import CASCache
|
39
|
39
|
|
40
|
40
|
|
|
41
|
+# The default limit for gRPC messages is 4 MiB
|
|
42
|
+_MAX_BATCH_TOTAL_SIZE_BYTES = 4 * 1024 * 1024
|
|
43
|
+
|
|
44
|
+
|
41
|
45
|
# Trying to push an artifact that is too large
|
42
|
46
|
class ArtifactTooLargeException(Exception):
    """Signals an attempt to push an artifact that is too large."""
|
... |
... |
@@ -67,6 +71,9 @@ def create_server(repo, *, enable_push): |
67
|
71
|
remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
|
68
|
72
|
_ContentAddressableStorageServicer(artifactcache), server)
|
69
|
73
|
|
|
74
|
+ remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
|
|
75
|
+ _CapabilitiesServicer(), server)
|
|
76
|
+
|
70
|
77
|
buildstream_pb2_grpc.add_ReferenceStorageServicer_to_server(
|
71
|
78
|
_ReferenceStorageServicer(artifactcache, enable_push=enable_push), server)
|
72
|
79
|
|
... |
... |
@@ -229,6 +236,48 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres |
229
|
236
|
d.size_bytes = digest.size_bytes
|
230
|
237
|
return response
|
231
|
238
|
|
|
239
|
def BatchReadBlobs(self, request, context):
    """Return the contents of several CAS blobs in a single response.

    Every digest in ``request.digests`` produces one entry in the
    response, carrying either the blob data or a NOT_FOUND status when
    the object is missing from the local CAS or its size on disk does
    not match the size stated in the digest.

    Args:
        request: BatchReadBlobsRequest listing the digests to fetch.
        context: gRPC servicer context. Its status code is set to
            INVALID_ARGUMENT when the sizes of the requested blobs add
            up to more than _MAX_BATCH_TOTAL_SIZE_BYTES.

    Returns:
        A BatchReadBlobsResponse; it is left empty when the request is
        rejected for exceeding the batch size limit.
    """
    response = remote_execution_pb2.BatchReadBlobsResponse()

    # Reject oversized requests before touching the disk, so that no
    # blobs are read for a request that is going to fail anyway.
    total_size = sum(digest.size_bytes for digest in request.digests)
    if total_size > _MAX_BATCH_TOTAL_SIZE_BYTES:
        context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
        return response

    # The per-blob status is a protobuf message whose `code` field is an
    # integer, while grpc.StatusCode members are Python enums whose
    # value is an (int, description) tuple. Use the integer part.
    not_found = grpc.StatusCode.NOT_FOUND.value[0]

    for digest in request.digests:
        blob_response = response.responses.add()
        blob_response.digest.hash = digest.hash
        blob_response.digest.size_bytes = digest.size_bytes
        try:
            with open(self.cas.objpath(digest), 'rb') as f:
                # An object whose size differs from the digest is not
                # the blob the client asked for.
                if os.fstat(f.fileno()).st_size != digest.size_bytes:
                    blob_response.status.code = not_found
                    continue

                blob_response.data = f.read(digest.size_bytes)
        except FileNotFoundError:
            blob_response.status.code = not_found

    return response
|
|
263
|
+
|
|
264
|
+
|
|
265
|
class _CapabilitiesServicer(remote_execution_pb2_grpc.CapabilitiesServicer):
    """Capabilities service reporting what this CAS server supports."""

    def GetCapabilities(self, request, context):
        """Build the ServerCapabilities message for this server.

        The reply advertises SHA256 as the digest function, disables
        action cache updates, publishes the batch size limit, allows
        absolute symlink paths and sets the major number of every API
        version field to 2.
        """
        capabilities = remote_execution_pb2.ServerCapabilities()

        cache = capabilities.cache_capabilities
        cache.digest_function.append(remote_execution_pb2.SHA256)
        cache.action_cache_update_capabilities.update_enabled = False
        cache.max_batch_total_size_bytes = _MAX_BATCH_TOTAL_SIZE_BYTES
        cache.symlink_absolute_path_strategy = remote_execution_pb2.CacheCapabilities.ALLOWED

        # All three version fields carry the same major version.
        api_versions = (
            capabilities.deprecated_api_version,
            capabilities.low_api_version,
            capabilities.high_api_version,
        )
        for api_version in api_versions:
            api_version.major = 2

        return capabilities
|
|
280
|
+
|
232
|
281
|
|
233
|
282
|
class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer):
|
234
|
283
|
def __init__(self, cas, *, enable_push):
|