Jürg Billeter pushed to branch juerg/cas-batch at BuildStream / buildstream
Commits:
-
14e1a3b3
by Jürg Billeter at 2018-10-01T08:18:05Z
-
232662f1
by Jürg Billeter at 2018-10-01T08:53:06Z
-
f447aedd
by Tiago Gomes at 2018-10-01T10:33:11Z
-
682dddce
by Tiago Gomes at 2018-10-01T10:35:12Z
-
fafa8136
by Tiago Gomes at 2018-10-01T10:59:54Z
-
26e1a3c7
by Jürg Billeter at 2018-10-01T14:58:06Z
-
f47895c0
by Jürg Billeter at 2018-10-01T14:58:06Z
5 changed files:
- .gitlab-ci.yml
- buildstream/_artifactcache/cascache.py
- buildstream/_artifactcache/casserver.py
- buildstream/_platform/darwin.py
- buildstream/_platform/platform.py
Changes:
... | ... | @@ -161,14 +161,14 @@ docs: |
161 | 161 |
.overnight-tests: &overnight-tests-template
|
162 | 162 |
stage: test
|
163 | 163 |
variables:
|
164 |
- bst_ext_url: git+https://gitlab.com/BuildStream/bst-external.git
|
|
165 |
- bst_ext_ref: 1d6ab71151b93c8cbc0a91a36ffe9270f3b835f1 # 0.5.1
|
|
166 |
- fd_sdk_ref: 88d7c22c2281b987faa02edd57df80d430eecf1f # 18.08.12
|
|
164 |
+ BST_EXT_URL: git+https://gitlab.com/BuildStream/bst-external.git
|
|
165 |
+ BST_EXT_REF: 1d6ab71151b93c8cbc0a91a36ffe9270f3b835f1 # 0.5.1
|
|
166 |
+ FD_SDK_REF: 88d7c22c2281b987faa02edd57df80d430eecf1f # 18.08.11-35-g88d7c22c
|
|
167 | 167 |
before_script:
|
168 | 168 |
- (cd dist && ./unpack.sh && cd buildstream && pip3 install .)
|
169 |
- - pip3 install --user -e ${bst_ext_url}@${bst_ext_ref}#egg=bst_ext
|
|
169 |
+ - pip3 install --user -e ${BST_EXT_URL}@${BST_EXT_REF}#egg=bst_ext
|
|
170 | 170 |
- git clone https://gitlab.com/freedesktop-sdk/freedesktop-sdk.git
|
171 |
- - git -C freedesktop-sdk checkout ${fd_sdk_ref}
|
|
171 |
+ - git -C freedesktop-sdk checkout ${FD_SDK_REF}
|
|
172 | 172 |
only:
|
173 | 173 |
- schedules
|
174 | 174 |
|
... | ... | @@ -1048,10 +1048,29 @@ class CASCache(ArtifactCache): |
1048 | 1048 |
missing_blobs[d.hash] = d
|
1049 | 1049 |
|
1050 | 1050 |
# Upload any blobs missing on the server
|
1051 |
- for blob_digest in missing_blobs.values():
|
|
1052 |
- with open(self.objpath(blob_digest), 'rb') as f:
|
|
1053 |
- assert os.fstat(f.fileno()).st_size == blob_digest.size_bytes
|
|
1054 |
- self._send_blob(remote, blob_digest, f, u_uid=u_uid)
|
|
1051 |
+ self._send_blobs(remote, missing_blobs.values(), u_uid)
|
|
1052 |
+ |
|
1053 |
+ def _send_blobs(self, remote, digests, u_uid=uuid.uuid4()):
|
|
1054 |
+ batch = _CASBatchUpdate(remote)
|
|
1055 |
+ |
|
1056 |
+ for digest in digests:
|
|
1057 |
+ with open(self.objpath(digest), 'rb') as f:
|
|
1058 |
+ assert os.fstat(f.fileno()).st_size == digest.size_bytes
|
|
1059 |
+ |
|
1060 |
+ if (digest.size_bytes >= remote.max_batch_total_size_bytes or
|
|
1061 |
+ not remote.batch_update_supported):
|
|
1062 |
# Too large for a batch request, or batch updates are unsupported: upload in an independent request.
|
|
1063 |
+ self._send_blob(remote, digest, f, u_uid=u_uid)
|
|
1064 |
+ else:
|
|
1065 |
+ if not batch.add(digest, f):
|
|
1066 |
+ # Not enough space left in batch request.
|
|
1067 |
+ # Complete pending batch first.
|
|
1068 |
+ batch.send()
|
|
1069 |
+ batch = _CASBatchUpdate(remote)
|
|
1070 |
+ batch.add(digest, f)
|
|
1071 |
+ |
|
1072 |
+ # Send final batch
|
|
1073 |
+ batch.send()
|
|
1055 | 1074 |
|
1056 | 1075 |
|
1057 | 1076 |
# Represents a single remote CAS cache.
|
... | ... | @@ -1126,6 +1145,17 @@ class _CASRemote(): |
1126 | 1145 |
if e.code() != grpc.StatusCode.UNIMPLEMENTED:
|
1127 | 1146 |
raise
|
1128 | 1147 |
|
1148 |
+ # Check whether the server supports BatchUpdateBlobs()
|
|
1149 |
+ self.batch_update_supported = False
|
|
1150 |
+ try:
|
|
1151 |
+ request = remote_execution_pb2.BatchUpdateBlobsRequest()
|
|
1152 |
+ response = self.cas.BatchUpdateBlobs(request)
|
|
1153 |
+ self.batch_update_supported = True
|
|
1154 |
+ except grpc.RpcError as e:
|
|
1155 |
+ if (e.code() != grpc.StatusCode.UNIMPLEMENTED and
|
|
1156 |
+ e.code() != grpc.StatusCode.PERMISSION_DENIED):
|
|
1157 |
+ raise
|
|
1158 |
+ |
|
1129 | 1159 |
self._initialized = True
|
1130 | 1160 |
|
1131 | 1161 |
|
... | ... | @@ -1173,6 +1203,46 @@ class _CASBatchRead(): |
1173 | 1203 |
yield (response.digest, response.data)
|
1174 | 1204 |
|
1175 | 1205 |
|
1206 |
+# Represents a batch of blobs queued for upload.
|
|
1207 |
+#
|
|
1208 |
+class _CASBatchUpdate():
|
|
1209 |
+ def __init__(self, remote):
|
|
1210 |
+ self._remote = remote
|
|
1211 |
+ self._max_total_size_bytes = remote.max_batch_total_size_bytes
|
|
1212 |
+ self._request = remote_execution_pb2.BatchUpdateBlobsRequest()
|
|
1213 |
+ self._size = 0
|
|
1214 |
+ self._sent = False
|
|
1215 |
+ |
|
1216 |
+ def add(self, digest, stream):
|
|
1217 |
+ assert not self._sent
|
|
1218 |
+ |
|
1219 |
+ new_batch_size = self._size + digest.size_bytes
|
|
1220 |
+ if new_batch_size > self._max_total_size_bytes:
|
|
1221 |
+ # Not enough space left in current batch
|
|
1222 |
+ return False
|
|
1223 |
+ |
|
1224 |
+ blob_request = self._request.requests.add()
|
|
1225 |
+ blob_request.digest.hash = digest.hash
|
|
1226 |
+ blob_request.digest.size_bytes = digest.size_bytes
|
|
1227 |
+ blob_request.data = stream.read(digest.size_bytes)
|
|
1228 |
+ self._size = new_batch_size
|
|
1229 |
+ return True
|
|
1230 |
+ |
|
1231 |
+ def send(self):
|
|
1232 |
+ assert not self._sent
|
|
1233 |
+ self._sent = True
|
|
1234 |
+ |
|
1235 |
+ if len(self._request.requests) == 0:
|
|
1236 |
+ return
|
|
1237 |
+ |
|
1238 |
+ batch_response = self._remote.cas.BatchUpdateBlobs(self._request)
|
|
1239 |
+ |
|
1240 |
+ for response in batch_response.responses:
|
|
1241 |
+ if response.status.code != grpc.StatusCode.OK.value[0]:
|
|
1242 |
+ raise ArtifactError("Failed to upload blob {}: {}".format(
|
|
1243 |
+ response.digest.hash, response.status.code))
|
|
1244 |
+ |
|
1245 |
+ |
|
1176 | 1246 |
def _grouper(iterable, n):
|
1177 | 1247 |
while True:
|
1178 | 1248 |
try:
|
... | ... | @@ -68,7 +68,7 @@ def create_server(repo, *, enable_push): |
68 | 68 |
_ByteStreamServicer(artifactcache, enable_push=enable_push), server)
|
69 | 69 |
|
70 | 70 |
remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
|
71 |
- _ContentAddressableStorageServicer(artifactcache), server)
|
|
71 |
+ _ContentAddressableStorageServicer(artifactcache, enable_push=enable_push), server)
|
|
72 | 72 |
|
73 | 73 |
remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
|
74 | 74 |
_CapabilitiesServicer(), server)
|
... | ... | @@ -222,9 +222,10 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer): |
222 | 222 |
|
223 | 223 |
|
224 | 224 |
class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
|
225 |
- def __init__(self, cas):
|
|
225 |
+ def __init__(self, cas, *, enable_push):
|
|
226 | 226 |
super().__init__()
|
227 | 227 |
self.cas = cas
|
228 |
+ self.enable_push = enable_push
|
|
228 | 229 |
|
229 | 230 |
def FindMissingBlobs(self, request, context):
|
230 | 231 |
response = remote_execution_pb2.FindMissingBlobsResponse()
|
... | ... | @@ -260,6 +261,46 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres |
260 | 261 |
|
261 | 262 |
return response
|
262 | 263 |
|
264 |
+ def BatchUpdateBlobs(self, request, context):
|
|
265 |
+ response = remote_execution_pb2.BatchUpdateBlobsResponse()
|
|
266 |
+ |
|
267 |
+ if not self.enable_push:
|
|
268 |
+ context.set_code(grpc.StatusCode.PERMISSION_DENIED)
|
|
269 |
+ return response
|
|
270 |
+ |
|
271 |
+ batch_size = 0
|
|
272 |
+ |
|
273 |
+ for blob_request in request.requests:
|
|
274 |
+ digest = blob_request.digest
|
|
275 |
+ |
|
276 |
+ batch_size += digest.size_bytes
|
|
277 |
+ if batch_size > _MAX_PAYLOAD_BYTES:
|
|
278 |
+ context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
|
279 |
+ return response
|
|
280 |
+ |
|
281 |
+ blob_response = response.responses.add()
|
|
282 |
+ blob_response.digest.hash = digest.hash
|
|
283 |
+ blob_response.digest.size_bytes = digest.size_bytes
|
|
284 |
+ |
|
285 |
+ if len(blob_request.data) != digest.size_bytes:
|
|
286 |
+ blob_response.status.code = grpc.StatusCode.FAILED_PRECONDITION
|
|
287 |
+ continue
|
|
288 |
+ |
|
289 |
+ try:
|
|
290 |
+ _clean_up_cache(self.cas, digest.size_bytes)
|
|
291 |
+ |
|
292 |
+ with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
|
|
293 |
+ out.write(blob_request.data)
|
|
294 |
+ out.flush()
|
|
295 |
+ server_digest = self.cas.add_object(path=out.name)
|
|
296 |
+ if server_digest.hash != digest.hash:
|
|
297 |
+ blob_response.status.code = grpc.StatusCode.FAILED_PRECONDITION
|
|
298 |
+ |
|
299 |
+ except ArtifactTooLargeException:
|
|
300 |
+ blob_response.status.code = grpc.StatusCode.RESOURCE_EXHAUSTED
|
|
301 |
+ |
|
302 |
+ return response
|
|
303 |
+ |
|
263 | 304 |
|
264 | 305 |
class _CapabilitiesServicer(remote_execution_pb2_grpc.CapabilitiesServicer):
|
265 | 306 |
def GetCapabilities(self, request, context):
|
... | ... | @@ -41,10 +41,11 @@ class Darwin(Platform): |
41 | 41 |
return True
|
42 | 42 |
|
43 | 43 |
def get_cpu_count(self, cap=None):
|
44 |
- if cap < os.cpu_count():
|
|
45 |
- return cap
|
|
44 |
+ cpu_count = os.cpu_count()
|
|
45 |
+ if cap is None:
|
|
46 |
+ return cpu_count
|
|
46 | 47 |
else:
|
47 |
- return os.cpu_count()
|
|
48 |
+ return min(cpu_count, cap)
|
|
48 | 49 |
|
49 | 50 |
def set_resource_limits(self, soft_limit=OPEN_MAX, hard_limit=None):
|
50 | 51 |
super().set_resource_limits(soft_limit)
|
... | ... | @@ -67,7 +67,11 @@ class Platform(): |
67 | 67 |
return cls._instance
|
68 | 68 |
|
69 | 69 |
def get_cpu_count(self, cap=None):
|
70 |
- return min(len(os.sched_getaffinity(0)), cap)
|
|
70 |
+ cpu_count = len(os.sched_getaffinity(0))
|
|
71 |
+ if cap is None:
|
|
72 |
+ return cpu_count
|
|
73 |
+ else:
|
|
74 |
+ return min(cpu_count, cap)
|
|
71 | 75 |
|
72 | 76 |
##################################################################
|
73 | 77 |
# Sandbox functions #
|