Martin Blanchard pushed to branch jmac/remote_execution_client at BuildStream / buildstream
Commits:
-
9b327eb6
by Ben Brewer at 2018-09-04T14:41:23Z
-
3e67e64a
by Javier Jardón at 2018-09-04T16:43:13Z
-
3409609e
by Daniel Silverstone at 2018-09-04T16:55:51Z
-
7b32e1ec
by Tristan Maat at 2018-09-04T17:20:55Z
-
695c147d
by Jim MacArthur at 2018-09-05T14:34:02Z
-
9aea070e
by Martin Blanchard at 2018-09-05T14:34:03Z
-
41c08baa
by Martin Blanchard at 2018-09-05T14:34:03Z
-
73e302c7
by Martin Blanchard at 2018-09-05T14:34:03Z
-
4a8d6c70
by Jim MacArthur at 2018-09-05T14:34:03Z
-
7d560ec3
by Jim MacArthur at 2018-09-05T14:34:03Z
-
e8adef8c
by Jim MacArthur at 2018-09-05T14:34:03Z
-
2b66ce43
by Jim MacArthur at 2018-09-05T14:34:03Z
-
f16fd627
by Jim MacArthur at 2018-09-05T14:34:04Z
-
8ee4ef4c
by Jim MacArthur at 2018-09-05T14:34:04Z
-
11714d14
by Jim MacArthur at 2018-09-05T14:34:04Z
-
1b62f0d2
by Jim MacArthur at 2018-09-05T14:34:04Z
-
6499c622
by Martin Blanchard at 2018-09-05T14:34:04Z
-
90581058
by Martin Blanchard at 2018-09-05T14:34:04Z
-
3f11a3da
by Martin Blanchard at 2018-09-05T14:34:04Z
-
c6c8119d
by Martin Blanchard at 2018-09-05T14:34:04Z
-
92061282
by Martin Blanchard at 2018-09-05T14:34:04Z
-
a22b8bcc
by Martin Blanchard at 2018-09-05T14:34:04Z
-
41da4a0d
by Martin Blanchard at 2018-09-05T14:34:04Z
27 changed files:
- buildstream/_artifactcache/cascache.py
- buildstream/_project.py
- buildstream/_scheduler/jobs/job.py
- buildstream/buildelement.py
- buildstream/data/projectconfig.yaml
- buildstream/element.py
- buildstream/plugins/elements/autotools.py
- buildstream/plugins/elements/cmake.py
- buildstream/plugins/elements/make.py
- buildstream/plugins/elements/meson.py
- buildstream/plugins/elements/qmake.py
- buildstream/sandbox/__init__.py
- + buildstream/sandbox/_sandboxremote.py
- buildstream/sandbox/sandbox.py
- buildstream/storage/_casbaseddirectory.py
- doc/source/format_project.rst
- doc/source/install_artifacts.rst
- + tests/artifactcache/project/elements/compose-all.bst
- + tests/artifactcache/project/elements/import-bin.bst
- + tests/artifactcache/project/elements/import-dev.bst
- + tests/artifactcache/project/elements/target.bst
- + tests/artifactcache/project/files/bin-files/usr/bin/hello
- + tests/artifactcache/project/files/dev-files/usr/include/pony.h
- + tests/artifactcache/project/project.conf
- + tests/artifactcache/pull.py
- + tests/artifactcache/push.py
- tests/testutils/artifactshare.py
Changes:
... | ... | @@ -19,6 +19,7 @@ |
19 | 19 |
|
20 | 20 |
import hashlib
|
21 | 21 |
import itertools
|
22 |
+import io
|
|
22 | 23 |
import multiprocessing
|
23 | 24 |
import os
|
24 | 25 |
import signal
|
... | ... | @@ -76,6 +77,7 @@ class CASCache(ArtifactCache): |
76 | 77 |
################################################
|
77 | 78 |
# Implementation of abstract methods #
|
78 | 79 |
################################################
|
80 |
+ |
|
79 | 81 |
def contains(self, element, key):
|
80 | 82 |
refpath = self._refpath(self.get_artifact_fullname(element, key))
|
81 | 83 |
|
... | ... | @@ -259,6 +261,25 @@ class CASCache(ArtifactCache): |
259 | 261 |
|
260 | 262 |
return False
|
261 | 263 |
|
264 |
+ def pull_tree(self, project, digest):
|
|
265 |
+ """ Pull a single Tree rather than an artifact.
|
|
266 |
+ Does not update local refs. """
|
|
267 |
+ |
|
268 |
+ for remote in self._remotes[project]:
|
|
269 |
+ try:
|
|
270 |
+ remote.init()
|
|
271 |
+ |
|
272 |
+ digest = self._fetch_tree(remote, digest)
|
|
273 |
+ |
|
274 |
+ # no need to pull from additional remotes
|
|
275 |
+ return digest
|
|
276 |
+ |
|
277 |
+ except grpc.RpcError as e:
|
|
278 |
+ if e.code() != grpc.StatusCode.NOT_FOUND:
|
|
279 |
+ raise
|
|
280 |
+ |
|
281 |
+ return None
|
|
282 |
+ |
|
262 | 283 |
def link_key(self, element, oldkey, newkey):
|
263 | 284 |
oldref = self.get_artifact_fullname(element, oldkey)
|
264 | 285 |
newref = self.get_artifact_fullname(element, newkey)
|
... | ... | @@ -267,8 +288,46 @@ class CASCache(ArtifactCache): |
267 | 288 |
|
268 | 289 |
self.set_ref(newref, tree)
|
269 | 290 |
|
291 |
+ def _push_refs_to_remote(self, refs, remote):
|
|
292 |
+ skipped_remote = True
|
|
293 |
+ try:
|
|
294 |
+ for ref in refs:
|
|
295 |
+ tree = self.resolve_ref(ref)
|
|
296 |
+ |
|
297 |
+ # Check whether ref is already on the server in which case
|
|
298 |
+ # there is no need to push the artifact
|
|
299 |
+ try:
|
|
300 |
+ request = buildstream_pb2.GetReferenceRequest()
|
|
301 |
+ request.key = ref
|
|
302 |
+ response = remote.ref_storage.GetReference(request)
|
|
303 |
+ |
|
304 |
+ if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
|
|
305 |
+ # ref is already on the server with the same tree
|
|
306 |
+ continue
|
|
307 |
+ |
|
308 |
+ except grpc.RpcError as e:
|
|
309 |
+ if e.code() != grpc.StatusCode.NOT_FOUND:
|
|
310 |
+ # Intentionally re-raise RpcError for outer except block.
|
|
311 |
+ raise
|
|
312 |
+ |
|
313 |
+ self._send_directory(remote, tree)
|
|
314 |
+ |
|
315 |
+ request = buildstream_pb2.UpdateReferenceRequest()
|
|
316 |
+ request.keys.append(ref)
|
|
317 |
+ request.digest.hash = tree.hash
|
|
318 |
+ request.digest.size_bytes = tree.size_bytes
|
|
319 |
+ remote.ref_storage.UpdateReference(request)
|
|
320 |
+ |
|
321 |
+ skipped_remote = False
|
|
322 |
+ except grpc.RpcError as e:
|
|
323 |
+ if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
|
|
324 |
+ raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
|
|
325 |
+ |
|
326 |
+ return not skipped_remote
|
|
327 |
+ |
|
270 | 328 |
def push(self, element, keys):
|
271 |
- refs = [self.get_artifact_fullname(element, key) for key in keys]
|
|
329 |
+ |
|
330 |
+ refs = [self.get_artifact_fullname(element, key) for key in list(keys)]
|
|
272 | 331 |
|
273 | 332 |
project = element._get_project()
|
274 | 333 |
|
... | ... | @@ -278,97 +337,80 @@ class CASCache(ArtifactCache): |
278 | 337 |
|
279 | 338 |
for remote in push_remotes:
|
280 | 339 |
remote.init()
|
281 |
- skipped_remote = True
|
|
282 |
- element.info("Pushing {} -> {}".format(element._get_brief_display_key(), remote.spec.url))
|
|
283 | 340 |
|
284 |
- try:
|
|
285 |
- for ref in refs:
|
|
286 |
- tree = self.resolve_ref(ref)
|
|
287 |
- |
|
288 |
- # Check whether ref is already on the server in which case
|
|
289 |
- # there is no need to push the artifact
|
|
290 |
- try:
|
|
291 |
- request = buildstream_pb2.GetReferenceRequest()
|
|
292 |
- request.key = ref
|
|
293 |
- response = remote.ref_storage.GetReference(request)
|
|
294 |
- |
|
295 |
- if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
|
|
296 |
- # ref is already on the server with the same tree
|
|
297 |
- continue
|
|
298 |
- |
|
299 |
- except grpc.RpcError as e:
|
|
300 |
- if e.code() != grpc.StatusCode.NOT_FOUND:
|
|
301 |
- # Intentionally re-raise RpcError for outer except block.
|
|
302 |
- raise
|
|
303 |
- |
|
304 |
- missing_blobs = {}
|
|
305 |
- required_blobs = self._required_blobs(tree)
|
|
306 |
- |
|
307 |
- # Limit size of FindMissingBlobs request
|
|
308 |
- for required_blobs_group in _grouper(required_blobs, 512):
|
|
309 |
- request = remote_execution_pb2.FindMissingBlobsRequest()
|
|
310 |
- |
|
311 |
- for required_digest in required_blobs_group:
|
|
312 |
- d = request.blob_digests.add()
|
|
313 |
- d.hash = required_digest.hash
|
|
314 |
- d.size_bytes = required_digest.size_bytes
|
|
315 |
- |
|
316 |
- response = remote.cas.FindMissingBlobs(request)
|
|
317 |
- for digest in response.missing_blob_digests:
|
|
318 |
- d = remote_execution_pb2.Digest()
|
|
319 |
- d.hash = digest.hash
|
|
320 |
- d.size_bytes = digest.size_bytes
|
|
321 |
- missing_blobs[d.hash] = d
|
|
322 |
- |
|
323 |
- # Upload any blobs missing on the server
|
|
324 |
- skipped_remote = False
|
|
325 |
- for digest in missing_blobs.values():
|
|
326 |
- uuid_ = uuid.uuid4()
|
|
327 |
- resource_name = '/'.join(['uploads', str(uuid_), 'blobs',
|
|
328 |
- digest.hash, str(digest.size_bytes)])
|
|
329 |
- |
|
330 |
- def request_stream(resname):
|
|
331 |
- with open(self.objpath(digest), 'rb') as f:
|
|
332 |
- assert os.fstat(f.fileno()).st_size == digest.size_bytes
|
|
333 |
- offset = 0
|
|
334 |
- finished = False
|
|
335 |
- remaining = digest.size_bytes
|
|
336 |
- while not finished:
|
|
337 |
- chunk_size = min(remaining, 64 * 1024)
|
|
338 |
- remaining -= chunk_size
|
|
339 |
- |
|
340 |
- request = bytestream_pb2.WriteRequest()
|
|
341 |
- request.write_offset = offset
|
|
342 |
- # max. 64 kB chunks
|
|
343 |
- request.data = f.read(chunk_size)
|
|
344 |
- request.resource_name = resname
|
|
345 |
- request.finish_write = remaining <= 0
|
|
346 |
- yield request
|
|
347 |
- offset += chunk_size
|
|
348 |
- finished = request.finish_write
|
|
349 |
- response = remote.bytestream.Write(request_stream(resource_name))
|
|
350 |
- |
|
351 |
- request = buildstream_pb2.UpdateReferenceRequest()
|
|
352 |
- request.keys.append(ref)
|
|
353 |
- request.digest.hash = tree.hash
|
|
354 |
- request.digest.size_bytes = tree.size_bytes
|
|
355 |
- remote.ref_storage.UpdateReference(request)
|
|
356 |
- |
|
357 |
- pushed = True
|
|
358 |
- |
|
359 |
- except grpc.RpcError as e:
|
|
360 |
- if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
|
|
361 |
- raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
|
|
341 |
+ element.info("Pushing {} -> {}".format(element._get_brief_display_key(), remote.spec.url))
|
|
362 | 342 |
|
363 |
- if skipped_remote:
|
|
343 |
+ if self._push_refs_to_remote(refs, remote):
|
|
344 |
+ pushed = True
|
|
345 |
+ else:
|
|
364 | 346 |
self.context.message(Message(
|
365 | 347 |
None,
|
366 | 348 |
MessageType.SKIPPED,
|
367 | 349 |
"Remote ({}) already has {} cached".format(
|
368 | 350 |
remote.spec.url, element._get_brief_display_key())
|
369 | 351 |
))
|
352 |
+ |
|
353 |
+ return pushed
|
|
354 |
+ |
|
355 |
+ def push_directory(self, project, directory):
|
|
356 |
+ |
|
357 |
+ push_remotes = [r for r in self._remotes[project] if r.spec.push]
|
|
358 |
+ |
|
359 |
+ if directory.ref is None:
|
|
360 |
+ return None
|
|
361 |
+ |
|
362 |
+ for remote in push_remotes:
|
|
363 |
+ remote.init()
|
|
364 |
+ |
|
365 |
+ self._send_directory(remote, directory.ref)
|
|
366 |
+ |
|
367 |
+ return directory.ref
|
|
368 |
+ |
|
369 |
+ def push_message(self, project, message):
|
|
370 |
+ |
|
371 |
+ push_remotes = [r for r in self._remotes[project] if r.spec.push]
|
|
372 |
+ |
|
373 |
+ message_buffer = message.SerializeToString()
|
|
374 |
+ message_sha = hashlib.sha256(message_buffer)
|
|
375 |
+ message_digest = remote_execution_pb2.Digest()
|
|
376 |
+ message_digest.hash = message_sha.hexdigest()
|
|
377 |
+ message_digest.size_bytes = len(message_buffer)
|
|
378 |
+ |
|
379 |
+ for remote in push_remotes:
|
|
380 |
+ remote.init()
|
|
381 |
+ |
|
382 |
+ with io.BytesIO(message_buffer) as b:
|
|
383 |
+ self._send_blob(remote, message_digest, b)
|
|
384 |
+ |
|
385 |
+ return message_digest
|
|
386 |
+ |
|
387 |
+ def _verify_digest_on_remote(self, remote, digest):
|
|
388 |
+ # Check whether ref is already on the server in which case
|
|
389 |
+ # there is no need to push the artifact
|
|
390 |
+ request = remote_execution_pb2.FindMissingBlobsRequest()
|
|
391 |
+ request.blob_digests.extend([digest])
|
|
392 |
+ |
|
393 |
+ response = remote.cas.FindMissingBlobs(request)
|
|
394 |
+ if digest in response.missing_blob_digests:
|
|
395 |
+ return False
|
|
396 |
+ |
|
397 |
+ return True
|
|
398 |
+ |
|
399 |
+ def verify_digest_pushed(self, project, digest):
|
|
400 |
+ |
|
401 |
+ push_remotes = [r for r in self._remotes[project] if r.spec.push]
|
|
402 |
+ |
|
403 |
+ pushed = False
|
|
404 |
+ |
|
405 |
+ for remote in push_remotes:
|
|
406 |
+ remote.init()
|
|
407 |
+ |
|
408 |
+ if self._verify_digest_on_remote(remote, digest):
|
|
409 |
+ pushed = True
|
|
410 |
+ |
|
370 | 411 |
return pushed
|
371 | 412 |
|
413 |
+ |
|
372 | 414 |
################################################
|
373 | 415 |
# API Private Methods #
|
374 | 416 |
################################################
|
... | ... | @@ -599,6 +641,7 @@ class CASCache(ArtifactCache): |
599 | 641 |
################################################
|
600 | 642 |
# Local Private Methods #
|
601 | 643 |
################################################
|
644 |
+ |
|
602 | 645 |
def _checkout(self, dest, tree):
|
603 | 646 |
os.makedirs(dest, exist_ok=True)
|
604 | 647 |
|
... | ... | @@ -761,16 +804,16 @@ class CASCache(ArtifactCache): |
761 | 804 |
#
|
762 | 805 |
q.put(str(e))
|
763 | 806 |
|
764 |
- def _required_blobs(self, tree):
|
|
807 |
+ def _required_blobs(self, directory_digest):
|
|
765 | 808 |
# parse directory, and recursively add blobs
|
766 | 809 |
d = remote_execution_pb2.Digest()
|
767 |
- d.hash = tree.hash
|
|
768 |
- d.size_bytes = tree.size_bytes
|
|
810 |
+ d.hash = directory_digest.hash
|
|
811 |
+ d.size_bytes = directory_digest.size_bytes
|
|
769 | 812 |
yield d
|
770 | 813 |
|
771 | 814 |
directory = remote_execution_pb2.Directory()
|
772 | 815 |
|
773 |
- with open(self.objpath(tree), 'rb') as f:
|
|
816 |
+ with open(self.objpath(directory_digest), 'rb') as f:
|
|
774 | 817 |
directory.ParseFromString(f.read())
|
775 | 818 |
|
776 | 819 |
for filenode in directory.files:
|
... | ... | @@ -782,16 +825,16 @@ class CASCache(ArtifactCache): |
782 | 825 |
for dirnode in directory.directories:
|
783 | 826 |
yield from self._required_blobs(dirnode.digest)
|
784 | 827 |
|
785 |
- def _fetch_blob(self, remote, digest, out):
|
|
828 |
+ def _fetch_blob(self, remote, digest, stream):
|
|
786 | 829 |
resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)])
|
787 | 830 |
request = bytestream_pb2.ReadRequest()
|
788 | 831 |
request.resource_name = resource_name
|
789 | 832 |
request.read_offset = 0
|
790 | 833 |
for response in remote.bytestream.Read(request):
|
791 |
- out.write(response.data)
|
|
834 |
+ stream.write(response.data)
|
|
835 |
+ stream.flush()
|
|
792 | 836 |
|
793 |
- out.flush()
|
|
794 |
- assert digest.size_bytes == os.fstat(out.fileno()).st_size
|
|
837 |
+ assert digest.size_bytes == os.fstat(stream.fileno()).st_size
|
|
795 | 838 |
|
796 | 839 |
def _fetch_directory(self, remote, tree):
|
797 | 840 |
objpath = self.objpath(tree)
|
... | ... | @@ -827,6 +870,92 @@ class CASCache(ArtifactCache): |
827 | 870 |
digest = self.add_object(path=out.name)
|
828 | 871 |
assert digest.hash == tree.hash
|
829 | 872 |
|
873 |
+ def _fetch_tree(self, remote, digest):
|
|
874 |
+ # download but do not store the Tree object
|
|
875 |
+ with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
|
|
876 |
+ self._fetch_blob(remote, digest, out)
|
|
877 |
+ |
|
878 |
+ tree = remote_execution_pb2.Tree()
|
|
879 |
+ |
|
880 |
+ with open(out.name, 'rb') as f:
|
|
881 |
+ tree.ParseFromString(f.read())
|
|
882 |
+ |
|
883 |
+ tree.children.extend([tree.root])
|
|
884 |
+ for directory in tree.children:
|
|
885 |
+ for filenode in directory.files:
|
|
886 |
+ fileobjpath = self.objpath(filenode.digest)
|
|
887 |
+ if os.path.exists(fileobjpath):
|
|
888 |
+ # already in local cache
|
|
889 |
+ continue
|
|
890 |
+ |
|
891 |
+ with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
|
|
892 |
+ self._fetch_blob(remote, filenode.digest, f)
|
|
893 |
+ |
|
894 |
+ added_digest = self.add_object(path=f.name)
|
|
895 |
+ assert added_digest.hash == filenode.digest.hash
|
|
896 |
+ |
|
897 |
+ # place directory blob only in final location when we've downloaded
|
|
898 |
+ # all referenced blobs to avoid dangling references in the repository
|
|
899 |
+ dirbuffer = directory.SerializeToString()
|
|
900 |
+ dirdigest = self.add_object(buffer=dirbuffer)
|
|
901 |
+ assert dirdigest.size_bytes == len(dirbuffer)
|
|
902 |
+ |
|
903 |
+ return dirdigest
|
|
904 |
+ |
|
905 |
+ def _send_blob(self, remote, digest, stream, u_uid=uuid.uuid4()):
|
|
906 |
+ resource_name = '/'.join(['uploads', str(u_uid), 'blobs',
|
|
907 |
+ digest.hash, str(digest.size_bytes)])
|
|
908 |
+ |
|
909 |
+ def request_stream(resname, instream):
|
|
910 |
+ offset = 0
|
|
911 |
+ finished = False
|
|
912 |
+ remaining = digest.size_bytes
|
|
913 |
+ while not finished:
|
|
914 |
+ chunk_size = min(remaining, 64 * 1024)
|
|
915 |
+ remaining -= chunk_size
|
|
916 |
+ |
|
917 |
+ request = bytestream_pb2.WriteRequest()
|
|
918 |
+ request.write_offset = offset
|
|
919 |
+ # max. 64 kB chunks
|
|
920 |
+ request.data = instream.read(chunk_size)
|
|
921 |
+ request.resource_name = resname
|
|
922 |
+ request.finish_write = remaining <= 0
|
|
923 |
+ |
|
924 |
+ yield request
|
|
925 |
+ |
|
926 |
+ offset += chunk_size
|
|
927 |
+ finished = request.finish_write
|
|
928 |
+ |
|
929 |
+ response = remote.bytestream.Write(request_stream(resource_name, stream))
|
|
930 |
+ |
|
931 |
+ assert response.committed_size == digest.size_bytes
|
|
932 |
+ |
|
933 |
+ def _send_directory(self, remote, digest, u_uid=uuid.uuid4()):
|
|
934 |
+ required_blobs = self._required_blobs(digest)
|
|
935 |
+ |
|
936 |
+ missing_blobs = dict()
|
|
937 |
+ # Limit size of FindMissingBlobs request
|
|
938 |
+ for required_blobs_group in _grouper(required_blobs, 512):
|
|
939 |
+ request = remote_execution_pb2.FindMissingBlobsRequest()
|
|
940 |
+ |
|
941 |
+ for required_digest in required_blobs_group:
|
|
942 |
+ d = request.blob_digests.add()
|
|
943 |
+ d.hash = required_digest.hash
|
|
944 |
+ d.size_bytes = required_digest.size_bytes
|
|
945 |
+ |
|
946 |
+ response = remote.cas.FindMissingBlobs(request)
|
|
947 |
+ for missing_digest in response.missing_blob_digests:
|
|
948 |
+ d = remote_execution_pb2.Digest()
|
|
949 |
+ d.hash = missing_digest.hash
|
|
950 |
+ d.size_bytes = missing_digest.size_bytes
|
|
951 |
+ missing_blobs[d.hash] = d
|
|
952 |
+ |
|
953 |
+ # Upload any blobs missing on the server
|
|
954 |
+ for blob_digest in missing_blobs.values():
|
|
955 |
+ with open(self.objpath(blob_digest), 'rb') as f:
|
|
956 |
+ assert os.fstat(f.fileno()).st_size == blob_digest.size_bytes
|
|
957 |
+ self._send_blob(remote, blob_digest, f, u_uid=u_uid)
|
|
958 |
+ |
|
830 | 959 |
|
831 | 960 |
# Represents a single remote CAS cache.
|
832 | 961 |
#
|
... | ... | @@ -129,6 +129,7 @@ class Project(): |
129 | 129 |
|
130 | 130 |
self.artifact_cache_specs = None
|
131 | 131 |
self._sandbox = None
|
132 |
+ self._remote_execution = None
|
|
132 | 133 |
self._splits = None
|
133 | 134 |
|
134 | 135 |
self._context.add_project(self)
|
... | ... | @@ -471,7 +472,7 @@ class Project(): |
471 | 472 |
'aliases', 'name',
|
472 | 473 |
'artifacts', 'options',
|
473 | 474 |
'fail-on-overlap', 'shell', 'fatal-warnings',
|
474 |
- 'ref-storage', 'sandbox', 'mirrors'
|
|
475 |
+ 'ref-storage', 'sandbox', 'mirrors', 'remote-execution'
|
|
475 | 476 |
])
|
476 | 477 |
|
477 | 478 |
#
|
... | ... | @@ -489,6 +490,9 @@ class Project(): |
489 | 490 |
# Load sandbox configuration
|
490 | 491 |
self._sandbox = _yaml.node_get(config, Mapping, 'sandbox')
|
491 | 492 |
|
493 |
+ # Load remote execution configuration
|
|
494 |
+ self._remote_execution = _yaml.node_get(config, Mapping, 'remote-execution')
|
|
495 |
+ |
|
492 | 496 |
# Load project split rules
|
493 | 497 |
self._splits = _yaml.node_get(config, Mapping, 'split-rules')
|
494 | 498 |
|
... | ... | @@ -109,7 +109,7 @@ class Job(): |
109 | 109 |
# Private members
|
110 | 110 |
#
|
111 | 111 |
self._scheduler = scheduler # The scheduler
|
112 |
- self._queue = multiprocessing.Queue() # A message passing queue
|
|
112 |
+ self._queue = None # A message passing queue
|
|
113 | 113 |
self._process = None # The Process object
|
114 | 114 |
self._watcher = None # Child process watcher
|
115 | 115 |
self._listening = False # Whether the parent is currently listening
|
... | ... | @@ -130,6 +130,8 @@ class Job(): |
130 | 130 |
#
|
131 | 131 |
def spawn(self):
|
132 | 132 |
|
133 |
+ self._queue = multiprocessing.Queue()
|
|
134 |
+ |
|
133 | 135 |
self._tries += 1
|
134 | 136 |
self._parent_start_listening()
|
135 | 137 |
|
... | ... | @@ -552,6 +554,9 @@ class Job(): |
552 | 554 |
self.parent_complete(returncode == RC_OK, self._result)
|
553 | 555 |
self._scheduler.job_completed(self, returncode == RC_OK)
|
554 | 556 |
|
557 |
+ # Force the deletion of the queue and process objects to try and clean up FDs
|
|
558 |
+ self._queue = self._process = None
|
|
559 |
+ |
|
555 | 560 |
# _parent_process_envelope()
|
556 | 561 |
#
|
557 | 562 |
# Processes a message Envelope deserialized form the message queue.
|
... | ... | @@ -155,6 +155,9 @@ class BuildElement(Element): |
155 | 155 |
command_dir = build_root
|
156 | 156 |
sandbox.set_work_directory(command_dir)
|
157 | 157 |
|
158 |
+ # Tell sandbox which directory is preserved in the finished artifact
|
|
159 |
+ sandbox.set_output_directory(install_root)
|
|
160 |
+ |
|
158 | 161 |
# Setup environment
|
159 | 162 |
sandbox.set_environment(self.get_environment())
|
160 | 163 |
|
... | ... | @@ -204,3 +204,6 @@ shell: |
204 | 204 |
# Command to run when `bst shell` does not provide a command
|
205 | 205 |
#
|
206 | 206 |
command: [ 'sh', '-i' ]
|
207 |
+ |
|
208 |
+remote-execution:
|
|
209 |
+ url: ""
|
|
\ No newline at end of file |
... | ... | @@ -95,6 +95,7 @@ from . import _site |
95 | 95 |
from ._platform import Platform
|
96 | 96 |
from .plugin import CoreWarnings
|
97 | 97 |
from .sandbox._config import SandboxConfig
|
98 |
+from .sandbox._sandboxremote import SandboxRemote
|
|
98 | 99 |
|
99 | 100 |
from .storage.directory import Directory
|
100 | 101 |
from .storage._filebaseddirectory import FileBasedDirectory
|
... | ... | @@ -250,6 +251,9 @@ class Element(Plugin): |
250 | 251 |
# Extract Sandbox config
|
251 | 252 |
self.__sandbox_config = self.__extract_sandbox_config(meta)
|
252 | 253 |
|
254 |
+ # Extract remote execution URL
|
|
255 |
+ self.__remote_execution_url = self.__extract_remote_execution_config(meta)
|
|
256 |
+ |
|
253 | 257 |
def __lt__(self, other):
|
254 | 258 |
return self.name < other.name
|
255 | 259 |
|
... | ... | @@ -1570,6 +1574,8 @@ class Element(Plugin): |
1570 | 1574 |
finally:
|
1571 | 1575 |
if collect is not None:
|
1572 | 1576 |
try:
|
1577 |
+ # Sandbox will probably have replaced its virtual directory, so get it again
|
|
1578 |
+ sandbox_vroot = sandbox.get_virtual_directory()
|
|
1573 | 1579 |
collectvdir = sandbox_vroot.descend(collect.lstrip(os.sep).split(os.sep))
|
1574 | 1580 |
except VirtualDirectoryError:
|
1575 | 1581 |
# No collect directory existed
|
... | ... | @@ -2146,7 +2152,32 @@ class Element(Plugin): |
2146 | 2152 |
project = self._get_project()
|
2147 | 2153 |
platform = Platform.get_platform()
|
2148 | 2154 |
|
2149 |
- if directory is not None and os.path.exists(directory):
|
|
2155 |
+ if self.__remote_execution_url and self.BST_VIRTUAL_DIRECTORY:
|
|
2156 |
+ if not self.__artifacts.has_push_remotes(element=self):
|
|
2157 |
+ # Give an early warning if remote execution will not work
|
|
2158 |
+ raise ElementError("Artifact {} is configured to use remote execution but has no push remotes. "
|
|
2159 |
+ .format(self.name) +
|
|
2160 |
+ "The remote artifact server(s) may not be correctly configured or contactable.")
|
|
2161 |
+ |
|
2162 |
+ self.info("Using a remote sandbox for artifact {}".format(self.name))
|
|
2163 |
+ |
|
2164 |
+ sandbox = SandboxRemote(context, project,
|
|
2165 |
+ directory,
|
|
2166 |
+ stdout=stdout,
|
|
2167 |
+ stderr=stderr,
|
|
2168 |
+ config=config,
|
|
2169 |
+ server_url=self.__remote_execution_url,
|
|
2170 |
+ allow_real_directory=False)
|
|
2171 |
+ yield sandbox
|
|
2172 |
+ |
|
2173 |
+ elif directory is not None and os.path.exists(directory):
|
|
2174 |
+ if self.__remote_execution_url:
|
|
2175 |
+ self.warn("Artifact {} is configured to use remote execution but element plugin does not support it."
|
|
2176 |
+ .format(self.name), detail="Element plugin '{kind}' does not support virtual directories."
|
|
2177 |
+ .format(kind=self.get_kind()), warning_token="remote-failure")
|
|
2178 |
+ |
|
2179 |
+ self.info("Falling back to local sandbox for artifact {}".format(self.name))
|
|
2180 |
+ |
|
2150 | 2181 |
sandbox = platform.create_sandbox(context, project,
|
2151 | 2182 |
directory,
|
2152 | 2183 |
stdout=stdout,
|
... | ... | @@ -2318,6 +2349,20 @@ class Element(Plugin): |
2318 | 2349 |
return SandboxConfig(self.node_get_member(sandbox_config, int, 'build-uid'),
|
2319 | 2350 |
self.node_get_member(sandbox_config, int, 'build-gid'))
|
2320 | 2351 |
|
2352 |
+ # Remote execution configuration data (server URL), to be used by the remote sandbox.
|
|
2353 |
+ #
|
|
2354 |
+ def __extract_remote_execution_config(self, meta):
|
|
2355 |
+ if self.__is_junction:
|
|
2356 |
+ return None
|
|
2357 |
+ else:
|
|
2358 |
+ project = self._get_project()
|
|
2359 |
+ project.ensure_fully_loaded()
|
|
2360 |
+ if project._remote_execution:
|
|
2361 |
+ rexec_config = _yaml.node_chain_copy(project._remote_execution)
|
|
2362 |
+ return self.node_get_member(rexec_config, str, 'url')
|
|
2363 |
+ else:
|
|
2364 |
+ return None
|
|
2365 |
+ |
|
2321 | 2366 |
# This makes a special exception for the split rules, which
|
2322 | 2367 |
# elements may extend but whos defaults are defined in the project.
|
2323 | 2368 |
#
|
... | ... | @@ -57,7 +57,8 @@ from buildstream import BuildElement |
57 | 57 |
|
58 | 58 |
# Element implementation for the 'autotools' kind.
|
59 | 59 |
class AutotoolsElement(BuildElement):
|
60 |
- pass
|
|
60 |
+ # Supports virtual directories (required for remote execution)
|
|
61 |
+ BST_VIRTUAL_DIRECTORY = True
|
|
61 | 62 |
|
62 | 63 |
|
63 | 64 |
# Plugin entry point
|
... | ... | @@ -56,7 +56,8 @@ from buildstream import BuildElement |
56 | 56 |
|
57 | 57 |
# Element implementation for the 'cmake' kind.
|
58 | 58 |
class CMakeElement(BuildElement):
|
59 |
- pass
|
|
59 |
+ # Supports virtual directories (required for remote execution)
|
|
60 |
+ BST_VIRTUAL_DIRECTORY = True
|
|
60 | 61 |
|
61 | 62 |
|
62 | 63 |
# Plugin entry point
|
... | ... | @@ -38,7 +38,8 @@ from buildstream import BuildElement |
38 | 38 |
|
39 | 39 |
# Element implementation for the 'make' kind.
|
40 | 40 |
class MakeElement(BuildElement):
|
41 |
- pass
|
|
41 |
+ # Supports virtual directories (required for remote execution)
|
|
42 |
+ BST_VIRTUAL_DIRECTORY = True
|
|
42 | 43 |
|
43 | 44 |
|
44 | 45 |
# Plugin entry point
|
... | ... | @@ -53,7 +53,8 @@ from buildstream import BuildElement |
53 | 53 |
|
54 | 54 |
# Element implementation for the 'meson' kind.
|
55 | 55 |
class MesonElement(BuildElement):
|
56 |
- pass
|
|
56 |
+ # Supports virtual directories (required for remote execution)
|
|
57 |
+ BST_VIRTUAL_DIRECTORY = True
|
|
57 | 58 |
|
58 | 59 |
|
59 | 60 |
# Plugin entry point
|
... | ... | @@ -33,7 +33,8 @@ from buildstream import BuildElement |
33 | 33 |
|
34 | 34 |
# Element implementation for the 'qmake' kind.
|
35 | 35 |
class QMakeElement(BuildElement):
|
36 |
- pass
|
|
36 |
+ # Supports virtual directories (required for remote execution)
|
|
37 |
+ BST_VIRTUAL_DIRECTORY = True
|
|
37 | 38 |
|
38 | 39 |
|
39 | 40 |
# Plugin entry point
|
... | ... | @@ -20,3 +20,4 @@ |
20 | 20 |
from .sandbox import Sandbox, SandboxFlags
|
21 | 21 |
from ._sandboxchroot import SandboxChroot
|
22 | 22 |
from ._sandboxbwrap import SandboxBwrap
|
23 |
+from ._sandboxremote import SandboxRemote
|
1 |
+#!/usr/bin/env python3
|
|
2 |
+#
|
|
3 |
+# Copyright (C) 2018 Bloomberg LP
|
|
4 |
+#
|
|
5 |
+# This program is free software; you can redistribute it and/or
|
|
6 |
+# modify it under the terms of the GNU Lesser General Public
|
|
7 |
+# License as published by the Free Software Foundation; either
|
|
8 |
+# version 2 of the License, or (at your option) any later version.
|
|
9 |
+#
|
|
10 |
+# This library is distributed in the hope that it will be useful,
|
|
11 |
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
12 |
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
13 |
+# Lesser General Public License for more details.
|
|
14 |
+#
|
|
15 |
+# You should have received a copy of the GNU Lesser General Public
|
|
16 |
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
|
|
17 |
+#
|
|
18 |
+# Authors:
|
|
19 |
+# Jim MacArthur <jim macarthur codethink co uk>
|
|
20 |
+ |
|
21 |
+import os
|
|
22 |
+import re
|
|
23 |
+from urllib.parse import urlparse
|
|
24 |
+ |
|
25 |
+import grpc
|
|
26 |
+ |
|
27 |
+from . import Sandbox
|
|
28 |
+from ..storage._filebaseddirectory import FileBasedDirectory
|
|
29 |
+from ..storage._casbaseddirectory import CasBasedDirectory
|
|
30 |
+from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
|
|
31 |
+from .._artifactcache.cascache import CASCache
|
|
32 |
+ |
|
33 |
+ |
|
34 |
+class SandboxError(Exception):
|
|
35 |
+ pass
|
|
36 |
+ |
|
37 |
+ |
|
38 |
+# SandboxRemote()
|
|
39 |
+#
|
|
40 |
+# This isn't really a sandbox, it's a stub which sends all the sources and build
|
|
41 |
+# commands to a remote server and retrieves the results from it.
|
|
42 |
+#
|
|
43 |
+class SandboxRemote(Sandbox):
|
|
44 |
+ |
|
45 |
+ def __init__(self, *args, **kwargs):
|
|
46 |
+ super().__init__(*args, **kwargs)
|
|
47 |
+ self.cascache = None
|
|
48 |
+ |
|
49 |
+ url = urlparse(kwargs['server_url'])
|
|
50 |
+ if not url.scheme or not url.hostname or not url.port:
|
|
51 |
+ raise SandboxError("Configured remote URL '{}' does not match the expected layout. "
|
|
52 |
+ .format(self.server_url) +
|
|
53 |
+ "It should be of the form <protocol>://<domain name>:<port>.")
|
|
54 |
+ elif url.scheme != 'http':
|
|
55 |
+ raise SandboxError("Configured remote '{}' uses an unsupported protocol. "
|
|
56 |
+ "Only plain HTTP is currenlty supported (no HTTPS).")
|
|
57 |
+ |
|
58 |
+ self.server_url = '{}:{}'.format(url.hostname, url.port)
|
|
59 |
+ |
|
60 |
+ def _get_cascache(self):
|
|
61 |
+ if self.cascache is None:
|
|
62 |
+ self.cascache = CASCache(self._get_context())
|
|
63 |
+ self.cascache.setup_remotes(use_config=True)
|
|
64 |
+ return self.cascache
|
|
65 |
+ |
|
66 |
+ def run_remote_command(self, command, input_root_digest, working_directory, environment):
|
|
67 |
+ # Sends an execution request to the remote execution server.
|
|
68 |
+ #
|
|
69 |
+ # This function blocks until it gets a response from the server.
|
|
70 |
+ #
|
|
71 |
+ environment_variables = [remote_execution_pb2.Command.
|
|
72 |
+ EnvironmentVariable(name=k, value=v)
|
|
73 |
+ for (k, v) in environment.items()]
|
|
74 |
+ |
|
75 |
+ # Create and send the Command object.
|
|
76 |
+ remote_command = remote_execution_pb2.Command(arguments=command,
|
|
77 |
+ working_directory=working_directory,
|
|
78 |
+ environment_variables=environment_variables,
|
|
79 |
+ output_files=[],
|
|
80 |
+ output_directories=[self._output_directory],
|
|
81 |
+ platform=None)
|
|
82 |
+ |
|
83 |
+ cascache = self._get_cascache()
|
|
84 |
+ # Upload the Command message to the remote CAS server
|
|
85 |
+ command_digest = cascache.push_message(self._get_project(), remote_command)
|
|
86 |
+ if not command_digest or not cascache.verify_digest_pushed(self._get_project(), command_digest):
|
|
87 |
+ # Command push failed
|
|
88 |
+ return None
|
|
89 |
+ |
|
90 |
+ # Create and send the action.
|
|
91 |
+ action = remote_execution_pb2.Action(command_digest=command_digest,
|
|
92 |
+ input_root_digest=input_root_digest,
|
|
93 |
+ timeout=None,
|
|
94 |
+ do_not_cache=False)
|
|
95 |
+ |
|
96 |
+ # Upload the Action message to the remote CAS server
|
|
97 |
+ action_digest = cascache.push_message(self._get_project(), action)
|
|
98 |
+ if not action_digest or not cascache.verify_digest_pushed(self._get_project(), action_digest):
|
|
99 |
+ # Action push failed
|
|
100 |
+ return None
|
|
101 |
+ |
|
102 |
+ # Next, try to create a communication channel to the BuildGrid server.
|
|
103 |
+ channel = grpc.insecure_channel(self.server_url)
|
|
104 |
+ stub = remote_execution_pb2_grpc.ExecutionStub(channel)
|
|
105 |
+ request = remote_execution_pb2.ExecuteRequest(action_digest=action_digest,
|
|
106 |
+ skip_cache_lookup=False)
|
|
107 |
+ try:
|
|
108 |
+ operation_iterator = stub.Execute(request)
|
|
109 |
+ except grpc.RpcError:
|
|
110 |
+ return None
|
|
111 |
+ |
|
112 |
+ operation = None
|
|
113 |
+ with self._get_context().timed_activity("Waiting for the remote build to complete"):
|
|
114 |
+ # It is advantageous to check operation_iterator.code() is grpc.StatusCode.OK here,
|
|
115 |
+ # which will check the server is actually contactable. However, calling it when the
|
|
116 |
+ # server is available seems to cause .code() to hang forever.
|
|
117 |
+ for operation in operation_iterator:
|
|
118 |
+ if operation.done:
|
|
119 |
+ break
|
|
120 |
+ |
|
121 |
+ return operation
|
|
122 |
+ |
|
123 |
+ def process_job_output(self, output_directories, output_files):
|
|
124 |
+ # Reads the remote execution server response to an execution request.
|
|
125 |
+ #
|
|
126 |
+ # output_directories is an array of OutputDirectory objects.
|
|
127 |
+ # output_files is an array of OutputFile objects.
|
|
128 |
+ #
|
|
129 |
+ # We only specify one output_directory, so it's an error
|
|
130 |
+ # for there to be any output files or more than one directory at the moment.
|
|
131 |
+ #
|
|
132 |
+ if output_files:
|
|
133 |
+ raise SandboxError("Output files were returned when we didn't request any.")
|
|
134 |
+ elif not output_directories:
|
|
135 |
+ error_text = "No output directory was returned from the build server."
|
|
136 |
+ raise SandboxError(error_text)
|
|
137 |
+ elif len(output_directories) > 1:
|
|
138 |
+ error_text = "More than one output directory was returned from the build server: {}."
|
|
139 |
+ raise SandboxError(error_text.format(output_directories))
|
|
140 |
+ |
|
141 |
+ tree_digest = output_directories[0].tree_digest
|
|
142 |
+ if tree_digest is None or not tree_digest.hash:
|
|
143 |
+ raise SandboxError("Output directory structure had no digest attached.")
|
|
144 |
+ |
|
145 |
+ cascache = self._get_cascache()
|
|
146 |
+ # Now do a pull to ensure we have the necessary parts.
|
|
147 |
+ dir_digest = cascache.pull_tree(self._get_project(), tree_digest)
|
|
148 |
+ if dir_digest is None or not dir_digest.hash or not dir_digest.size_bytes:
|
|
149 |
+ raise SandboxError("Output directory structure pulling from remote failed.")
|
|
150 |
+ |
|
151 |
+ path_components = os.path.split(self._output_directory)
|
|
152 |
+ |
|
153 |
+ # Now what we have is a digest for the output. Once we return, the calling process will
|
|
154 |
+ # attempt to descend into our directory and find that directory, so we need to overwrite
|
|
155 |
+ # that.
|
|
156 |
+ |
|
157 |
+ if not path_components:
|
|
158 |
+ # The artifact wants the whole directory; we could just return the returned hash in its
|
|
159 |
+ # place, but we don't have a means to do that yet.
|
|
160 |
+ raise SandboxError("Unimplemented: Output directory is empty or equal to the sandbox root.")
|
|
161 |
+ |
|
162 |
+ # At the moment, we will get the whole directory back in the first directory argument and we need
|
|
163 |
+ # to replace the sandbox's virtual directory with that. Creating a new virtual directory object
|
|
164 |
+ # from another hash will be interesting, though...
|
|
165 |
+ |
|
166 |
+ new_dir = CasBasedDirectory(self._get_context(), ref=dir_digest)
|
|
167 |
+ self._set_virtual_directory(new_dir)
|
|
168 |
+ |
|
169 |
+ def run(self, command, flags, *, cwd=None, env=None):
|
|
170 |
+ # Upload sources
|
|
171 |
+ upload_vdir = self.get_virtual_directory()
|
|
172 |
+ |
|
173 |
+ if isinstance(upload_vdir, FileBasedDirectory):
|
|
174 |
+ # Make a new temporary directory to put source in
|
|
175 |
+ upload_vdir = CasBasedDirectory(self._get_context(), ref=None)
|
|
176 |
+ upload_vdir.import_files(self.get_virtual_directory()._get_underlying_directory())
|
|
177 |
+ |
|
178 |
+ upload_vdir.recalculate_hash()
|
|
179 |
+ |
|
180 |
+ cascache = self._get_cascache()
|
|
181 |
+ # Now, push that key (without necessarily needing a ref) to the remote.
|
|
182 |
+ vdir_digest = cascache.push_directory(self._get_project(), upload_vdir)
|
|
183 |
+ if not vdir_digest or not cascache.verify_digest_pushed(self._get_project(), vdir_digest):
|
|
184 |
+ raise SandboxError("Failed to verify that source has been pushed to the remote artifact cache.")
|
|
185 |
+ |
|
186 |
+ # Set up environment and working directory
|
|
187 |
+ if cwd is None:
|
|
188 |
+ cwd = self._get_work_directory()
|
|
189 |
+ |
|
190 |
+ if cwd is None:
|
|
191 |
+ cwd = '/'
|
|
192 |
+ |
|
193 |
+ if env is None:
|
|
194 |
+ env = self._get_environment()
|
|
195 |
+ |
|
196 |
+ # We want command args as a list of strings
|
|
197 |
+ if isinstance(command, str):
|
|
198 |
+ command = [command]
|
|
199 |
+ |
|
200 |
+ # Now transmit the command to execute
|
|
201 |
+ operation = self.run_remote_command(command, upload_vdir.ref, cwd, env)
|
|
202 |
+ |
|
203 |
+ if operation is None:
|
|
204 |
+ # Failure of remote execution, usually due to an error in BuildStream
|
|
205 |
+ # NB This error could be raised in __run_remote_command
|
|
206 |
+ raise SandboxError("No response returned from server")
|
|
207 |
+ |
|
208 |
+ assert(not operation.HasField('error') and operation.HasField('response'))
|
|
209 |
+ |
|
210 |
+ execution_response = remote_execution_pb2.ExecuteResponse()
|
|
211 |
+ # The response is expected to be an ExecutionResponse message
|
|
212 |
+ assert(operation.response.Is(execution_response.DESCRIPTOR))
|
|
213 |
+ |
|
214 |
+ operation.response.Unpack(execution_response)
|
|
215 |
+ |
|
216 |
+ if execution_response.status.code != 0:
|
|
217 |
+ # A normal error during the build: the remote execution system
|
|
218 |
+ # has worked correctly but the command failed.
|
|
219 |
+ # execution_response.error also contains 'message' (str) and
|
|
220 |
+ # 'details' (iterator of Any) which we ignore at the moment.
|
|
221 |
+ return execution_response.status.code
|
|
222 |
+ |
|
223 |
+ action_result = execution_response.result
|
|
224 |
+ |
|
225 |
+ self.process_job_output(action_result.output_directories, action_result.output_files)
|
|
226 |
+ |
|
227 |
+ return 0
|
... | ... | @@ -99,9 +99,11 @@ class Sandbox(): |
99 | 99 |
self.__stdout = kwargs['stdout']
|
100 | 100 |
self.__stderr = kwargs['stderr']
|
101 | 101 |
|
102 |
- # Setup the directories. Root should be available to subclasses, hence
|
|
103 |
- # being single-underscore. The others are private to this class.
|
|
102 |
+ # Setup the directories. Root and output_directory should be
|
|
103 |
+ # available to subclasses, hence being single-underscore. The
|
|
104 |
+ # others are private to this class.
|
|
104 | 105 |
self._root = os.path.join(directory, 'root')
|
106 |
+ self._output_directory = None
|
|
105 | 107 |
self.__directory = directory
|
106 | 108 |
self.__scratch = os.path.join(self.__directory, 'scratch')
|
107 | 109 |
for directory_ in [self._root, self.__scratch]:
|
... | ... | @@ -144,11 +146,17 @@ class Sandbox(): |
144 | 146 |
self._vdir = FileBasedDirectory(self._root)
|
145 | 147 |
return self._vdir
|
146 | 148 |
|
149 |
+ def _set_virtual_directory(self, virtual_directory):
|
|
150 |
+ """ Sets virtual directory. Useful after remote execution
|
|
151 |
+ has rewritten the working directory.
|
|
152 |
+ """
|
|
153 |
+ self._vdir = virtual_directory
|
|
154 |
+ |
|
147 | 155 |
def set_environment(self, environment):
|
148 | 156 |
"""Sets the environment variables for the sandbox
|
149 | 157 |
|
150 | 158 |
Args:
|
151 |
- directory (dict): The environment variables to use in the sandbox
|
|
159 |
+ environment (dict): The environment variables to use in the sandbox
|
|
152 | 160 |
"""
|
153 | 161 |
self.__env = environment
|
154 | 162 |
|
... | ... | @@ -160,6 +168,15 @@ class Sandbox(): |
160 | 168 |
"""
|
161 | 169 |
self.__cwd = directory
|
162 | 170 |
|
171 |
+ def set_output_directory(self, directory):
|
|
172 |
+ """Sets the output directory - the directory which is preserved
|
|
173 |
+ as an artifact after assembly.
|
|
174 |
+ |
|
175 |
+ Args:
|
|
176 |
+ directory (str): An absolute path within the sandbox
|
|
177 |
+ """
|
|
178 |
+ self._output_directory = directory
|
|
179 |
+ |
|
163 | 180 |
def mark_directory(self, directory, *, artifact=False):
|
164 | 181 |
"""Marks a sandbox directory and ensures it will exist
|
165 | 182 |
|
... | ... | @@ -543,6 +543,15 @@ class CasBasedDirectory(Directory): |
543 | 543 |
filelist.append(k)
|
544 | 544 |
return filelist
|
545 | 545 |
|
546 |
+ def recalculate_hash(self):
|
|
547 |
+ """ Recalcuates the hash for this directory and store the results in
|
|
548 |
+ the cache. If this directory has a parent, tell it to
|
|
549 |
+ recalculate (since changing this directory changes an entry in
|
|
550 |
+ the parent). Hashes for subdirectories also get recalculated.
|
|
551 |
+ """
|
|
552 |
+ self._recalculate_recursing_up()
|
|
553 |
+ self._recalculate_recursing_down()
|
|
554 |
+ |
|
546 | 555 |
def _get_identifier(self):
|
547 | 556 |
path = ""
|
548 | 557 |
if self.parent:
|
... | ... | @@ -204,6 +204,24 @@ with an artifact share. |
204 | 204 |
You can also specify a list of caches here; earlier entries in the list
|
205 | 205 |
will have higher priority than later ones.
|
206 | 206 |
|
207 |
+Remote execution
|
|
208 |
+~~~~~~~~~~~~~~~~
|
|
209 |
+BuildStream supports remote execution using the Google Remote Execution API
|
|
210 |
+(REAPI). A description of how remote execution works is beyond the scope
|
|
211 |
+of this document, but you can specify a remote server complying with the REAPI
|
|
212 |
+using the `remote-execution` option:
|
|
213 |
+ |
|
214 |
+.. code:: yaml
|
|
215 |
+ |
|
216 |
+ remote-execution:
|
|
217 |
+ |
|
218 |
+ # A url defining a remote execution server
|
|
219 |
+ url: http://buildserver.example.com:50051
|
|
220 |
+ |
|
221 |
+The url should contain a hostname and port separated by ':'. Only plain HTTP is
|
|
222 |
+currently suported (no HTTPS).
|
|
223 |
+ |
|
224 |
+The Remote Execution API can be found via https://github.com/bazelbuild/remote-apis.
|
|
207 | 225 |
|
208 | 226 |
.. _project_essentials_mirrors:
|
209 | 227 |
|
... | ... | @@ -161,13 +161,13 @@ Below are two examples of how to run the cache server as a systemd service, one |
161 | 161 |
|
162 | 162 |
[Service]
|
163 | 163 |
Environment="LC_ALL=C.UTF-8"
|
164 |
- ExecStart=/usr/local/bin/bst-artifact-server --port 11001 --server-key {{certs_path}}/privkey.pem --
|
|
165 |
- server-cert {{certs_path}}/fullchain.pem {{artifacts_path}}
|
|
164 |
+ ExecStart=/usr/local/bin/bst-artifact-server --port 11001 --server-key {{certs_path}}/server.key --server-cert {{certs_path}}/server.crt {{artifacts_path}}
|
|
166 | 165 |
User=artifacts
|
167 | 166 |
|
168 | 167 |
[Install]
|
169 | 168 |
WantedBy=multi-user.target
|
170 | 169 |
|
170 |
+.. code:: ini
|
|
171 | 171 |
|
172 | 172 |
#
|
173 | 173 |
# Pull/Push
|
... | ... | @@ -178,9 +178,7 @@ Below are two examples of how to run the cache server as a systemd service, one |
178 | 178 |
|
179 | 179 |
[Service]
|
180 | 180 |
Environment="LC_ALL=C.UTF-8"
|
181 |
- ExecStart=/usr/local/bin/bst-artifact-server --port 11002 --server-key {{certs_path}}/privkey.pem --
|
|
182 |
- server-cert {{certs_path}}/fullchain.pem --client-certs /home/artifacts/authorized.crt --enable-push /
|
|
183 |
- {{artifacts_path}}
|
|
181 |
+ ExecStart=/usr/local/bin/bst-artifact-server --port 11002 --server-key {{certs_path}}/server.key --server-cert {{certs_path}}/server.crt --client-certs {{certs_path}}/authorized.crt --enable-push {{artifacts_path}}
|
|
184 | 182 |
User=artifacts
|
185 | 183 |
|
186 | 184 |
[Install]
|
... | ... | @@ -188,11 +186,16 @@ Below are two examples of how to run the cache server as a systemd service, one |
188 | 186 |
|
189 | 187 |
Here we define when systemd should start the service, which is after the networking stack has been started, we then define how to run the cache with the desired configuration, under the artifacts user. The {{ }} are there to denote where you should change these files to point to your desired locations.
|
190 | 188 |
|
189 |
+For more information on systemd services see:
|
|
190 |
+`Creating Systemd Service Files <https://www.devdungeon.com/content/creating-systemd-service-files>`_.
|
|
191 |
+ |
|
191 | 192 |
User configuration
|
192 | 193 |
~~~~~~~~~~~~~~~~~~
|
193 | 194 |
The user configuration for artifacts is documented with the rest
|
194 | 195 |
of the :ref:`user configuration documentation <user_config>`.
|
195 | 196 |
|
197 |
+Note that for self-signed certificates, the public key fields are mandatory.
|
|
198 |
+ |
|
196 | 199 |
Assuming you have the same setup used in this document, and that your
|
197 | 200 |
host is reachable on the internet as ``artifacts.com`` (for example),
|
198 | 201 |
then a user can use the following user configuration:
|
1 |
+kind: compose
|
|
2 |
+ |
|
3 |
+depends:
|
|
4 |
+- filename: import-bin.bst
|
|
5 |
+ type: build
|
|
6 |
+- filename: import-dev.bst
|
|
7 |
+ type: build
|
|
8 |
+ |
|
9 |
+config:
|
|
10 |
+ # Dont try running the sandbox, we dont have a
|
|
11 |
+ # runtime to run anything in this context.
|
|
12 |
+ integrate: False
|
1 |
+kind: import
|
|
2 |
+sources:
|
|
3 |
+- kind: local
|
|
4 |
+ path: files/bin-files
|
1 |
+kind: import
|
|
2 |
+sources:
|
|
3 |
+- kind: local
|
|
4 |
+ path: files/dev-files
|
1 |
+kind: stack
|
|
2 |
+description: |
|
|
3 |
+ |
|
4 |
+ Main stack target for the bst build test
|
|
5 |
+ |
|
6 |
+depends:
|
|
7 |
+- import-bin.bst
|
|
8 |
+- import-dev.bst
|
|
9 |
+- compose-all.bst
|
1 |
+#!/bin/bash
|
|
2 |
+ |
|
3 |
+echo "Hello !"
|
1 |
+#ifndef __PONY_H__
|
|
2 |
+#define __PONY_H__
|
|
3 |
+ |
|
4 |
+#define PONY_BEGIN "Once upon a time, there was a pony."
|
|
5 |
+#define PONY_END "And they lived happily ever after, the end."
|
|
6 |
+ |
|
7 |
+#define MAKE_PONY(story) \
|
|
8 |
+ PONY_BEGIN \
|
|
9 |
+ story \
|
|
10 |
+ PONY_END
|
|
11 |
+ |
|
12 |
+#endif /* __PONY_H__ */
|
1 |
+# Project config for frontend build test
|
|
2 |
+name: test
|
|
3 |
+ |
|
4 |
+element-path: elements
|
1 |
+import hashlib
|
|
2 |
+import os
|
|
3 |
+import pytest
|
|
4 |
+ |
|
5 |
+from buildstream._artifactcache.artifactcache import ArtifactCacheSpec
|
|
6 |
+from buildstream._artifactcache.cascache import CASCache
|
|
7 |
+from buildstream._context import Context
|
|
8 |
+from buildstream._project import Project
|
|
9 |
+from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
|
10 |
+ |
|
11 |
+from tests.testutils import cli, create_artifact_share
|
|
12 |
+ |
|
13 |
+ |
|
14 |
+# Project directory
|
|
15 |
+DATA_DIR = os.path.join(
|
|
16 |
+ os.path.dirname(os.path.realpath(__file__)),
|
|
17 |
+ "project",
|
|
18 |
+)
|
|
19 |
+ |
|
20 |
+ |
|
21 |
+# Handle messages from the pipeline
|
|
22 |
+def message_handler(message, context):
|
|
23 |
+ pass
|
|
24 |
+ |
|
25 |
+ |
|
26 |
+def tree_maker(cas, tree, directory):
|
|
27 |
+ if tree.root.ByteSize() == 0:
|
|
28 |
+ tree.root.CopyFrom(directory)
|
|
29 |
+ |
|
30 |
+ for directory_node in directory.directories:
|
|
31 |
+ child_directory = tree.children.add()
|
|
32 |
+ |
|
33 |
+ with open(cas.objpath(directory_node.digest), 'rb') as f:
|
|
34 |
+ child_directory.ParseFromString(f.read())
|
|
35 |
+ |
|
36 |
+ tree_maker(cas, tree, child_directory)
|
|
37 |
+ |
|
38 |
+ |
|
39 |
+@pytest.mark.datafiles(DATA_DIR)
|
|
40 |
+def test_pull(cli, tmpdir, datafiles):
|
|
41 |
+ project_dir = str(datafiles)
|
|
42 |
+ |
|
43 |
+ # Set up an artifact cache.
|
|
44 |
+ with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
|
|
45 |
+ # Configure artifact share
|
|
46 |
+ cli.configure({
|
|
47 |
+ 'scheduler': {
|
|
48 |
+ 'pushers': 1
|
|
49 |
+ },
|
|
50 |
+ 'artifacts': {
|
|
51 |
+ 'url': share.repo,
|
|
52 |
+ 'push': True,
|
|
53 |
+ }
|
|
54 |
+ })
|
|
55 |
+ |
|
56 |
+ # First build the project with the artifact cache configured
|
|
57 |
+ result = cli.run(project=project_dir, args=['build', 'target.bst'])
|
|
58 |
+ result.assert_success()
|
|
59 |
+ |
|
60 |
+ # Assert that we are now cached locally
|
|
61 |
+ assert cli.get_element_state(project_dir, 'target.bst') == 'cached'
|
|
62 |
+ # Assert that we shared/pushed the cached artifact
|
|
63 |
+ element_key = cli.get_element_key(project_dir, 'target.bst')
|
|
64 |
+ assert share.has_artifact('test', 'target.bst', element_key)
|
|
65 |
+ |
|
66 |
+ # Delete the artifact locally
|
|
67 |
+ cli.remove_artifact_from_cache(project_dir, 'target.bst')
|
|
68 |
+ |
|
69 |
+ # Assert that we are not cached locally anymore
|
|
70 |
+ assert cli.get_element_state(project_dir, 'target.bst') != 'cached'
|
|
71 |
+ |
|
72 |
+ # Fake minimal context
|
|
73 |
+ context = Context()
|
|
74 |
+ context.set_message_handler(message_handler)
|
|
75 |
+ context.sched_pushers = 1
|
|
76 |
+ context.artifactdir = os.path.join(tmpdir, 'cache', 'artifacts')
|
|
77 |
+ context.artifact_cache_specs = [ArtifactCacheSpec(url=share.repo,
|
|
78 |
+ push=True)]
|
|
79 |
+ |
|
80 |
+ # Load the project and CAS cache
|
|
81 |
+ project = Project(project_dir, context)
|
|
82 |
+ project.ensure_fully_loaded()
|
|
83 |
+ cas = CASCache(context)
|
|
84 |
+ |
|
85 |
+ # Assert that the element's artifact is **not** cached
|
|
86 |
+ element = project.load_elements(['target.bst'], cas)[0]
|
|
87 |
+ element_key = cli.get_element_key(project_dir, 'target.bst')
|
|
88 |
+ assert not cas.contains(element, element_key)
|
|
89 |
+ |
|
90 |
+ # Manually setup the CAS remote
|
|
91 |
+ cas.setup_remotes(use_config=True)
|
|
92 |
+ cas.initialize_remotes()
|
|
93 |
+ assert cas.has_push_remotes()
|
|
94 |
+ |
|
95 |
+ # Pull the artifact
|
|
96 |
+ pulled = cas.pull(element, element_key)
|
|
97 |
+ assert pulled is True
|
|
98 |
+ assert cas.contains(element, element_key)
|
|
99 |
+ |
|
100 |
+ # Finally, close the opened gRPC channels properly!
|
|
101 |
+ for remote in cas._remotes[project]:
|
|
102 |
+ if remote.channel:
|
|
103 |
+ remote.channel.close()
|
|
104 |
+ |
|
105 |
+ |
|
106 |
+@pytest.mark.datafiles(DATA_DIR)
|
|
107 |
+def test_pull_tree(cli, tmpdir, datafiles):
|
|
108 |
+ project_dir = str(datafiles)
|
|
109 |
+ |
|
110 |
+ # Set up an artifact cache.
|
|
111 |
+ with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
|
|
112 |
+ # Configure artifact share
|
|
113 |
+ cli.configure({
|
|
114 |
+ 'scheduler': {
|
|
115 |
+ 'pushers': 1
|
|
116 |
+ },
|
|
117 |
+ 'artifacts': {
|
|
118 |
+ 'url': share.repo,
|
|
119 |
+ 'push': True,
|
|
120 |
+ }
|
|
121 |
+ })
|
|
122 |
+ |
|
123 |
+ # First build the project with the artifact cache configured
|
|
124 |
+ result = cli.run(project=project_dir, args=['build', 'target.bst'])
|
|
125 |
+ result.assert_success()
|
|
126 |
+ |
|
127 |
+ # Assert that we are now cached locally
|
|
128 |
+ assert cli.get_element_state(project_dir, 'target.bst') == 'cached'
|
|
129 |
+ # Assert that we shared/pushed the cached artifact
|
|
130 |
+ element_key = cli.get_element_key(project_dir, 'target.bst')
|
|
131 |
+ assert share.has_artifact('test', 'target.bst', element_key)
|
|
132 |
+ |
|
133 |
+ # Fake minimal context
|
|
134 |
+ context = Context()
|
|
135 |
+ context.set_message_handler(message_handler)
|
|
136 |
+ context.sched_pushers = 1
|
|
137 |
+ context.artifactdir = os.path.join(tmpdir, 'cache', 'artifacts')
|
|
138 |
+ context.artifact_cache_specs = [ArtifactCacheSpec(url=share.repo,
|
|
139 |
+ push=True)]
|
|
140 |
+ |
|
141 |
+ # Load the project and CAS cache
|
|
142 |
+ project = Project(project_dir, context)
|
|
143 |
+ project.ensure_fully_loaded()
|
|
144 |
+ cas = CASCache(context)
|
|
145 |
+ |
|
146 |
+ # Assert that the element's artifact is cached
|
|
147 |
+ element = project.load_elements(['target.bst'], cas)[0]
|
|
148 |
+ element_key = cli.get_element_key(project_dir, 'target.bst')
|
|
149 |
+ assert cas.contains(element, element_key)
|
|
150 |
+ |
|
151 |
+ # Manually setup the CAS remote
|
|
152 |
+ cas.setup_remotes(use_config=True)
|
|
153 |
+ cas.initialize_remotes()
|
|
154 |
+ assert cas.has_push_remotes(element=element)
|
|
155 |
+ |
|
156 |
+ # Retrieve the Directory object from the cached artifact
|
|
157 |
+ artifact_ref = cas.get_artifact_fullname(element, element_key)
|
|
158 |
+ artifact_digest = cas.resolve_ref(artifact_ref)
|
|
159 |
+ |
|
160 |
+ directory = remote_execution_pb2.Directory()
|
|
161 |
+ |
|
162 |
+ with open(cas.objpath(artifact_digest), 'rb') as f:
|
|
163 |
+ directory.ParseFromString(f.read())
|
|
164 |
+ |
|
165 |
+ # Build the Tree object while we are still cached
|
|
166 |
+ tree = remote_execution_pb2.Tree()
|
|
167 |
+ tree_maker(cas, tree, directory)
|
|
168 |
+ |
|
169 |
+ # Push the Tree as a regular message
|
|
170 |
+ tree_digest = cas.push_message(project, tree)
|
|
171 |
+ |
|
172 |
+ # Now delete the artifact locally
|
|
173 |
+ cli.remove_artifact_from_cache(project_dir, 'target.bst')
|
|
174 |
+ |
|
175 |
+ # Assert that we are not cached locally anymore
|
|
176 |
+ assert cli.get_element_state(project_dir, 'target.bst') != 'cached'
|
|
177 |
+ |
|
178 |
+ # Pull the artifact using the Tree object
|
|
179 |
+ directory_digest = cas.pull_tree(project, tree_digest)
|
|
180 |
+ assert directory_digest == artifact_digest
|
|
181 |
+ |
|
182 |
+ # Ensure the entire Tree stucture has been pulled
|
|
183 |
+ assert os.path.exists(cas.objpath(directory_digest))
|
|
184 |
+ for child_directory in tree.children:
|
|
185 |
+ child_blob = child_directory.SerializeToString()
|
|
186 |
+ |
|
187 |
+ child_digest = remote_execution_pb2.Digest()
|
|
188 |
+ child_digest.hash = hashlib.sha256(child_blob).hexdigest()
|
|
189 |
+ child_digest.size_bytes = len(child_blob)
|
|
190 |
+ |
|
191 |
+ assert os.path.exists(cas.objpath(child_digest))
|
|
192 |
+ |
|
193 |
+ # Finally, close the opened gRPC channels properly!
|
|
194 |
+ for remote in cas._remotes[project]:
|
|
195 |
+ if remote.channel:
|
|
196 |
+ remote.channel.close()
|
1 |
+import os
|
|
2 |
+import pytest
|
|
3 |
+ |
|
4 |
+from pluginbase import PluginBase
|
|
5 |
+from buildstream._artifactcache.artifactcache import ArtifactCacheSpec
|
|
6 |
+from buildstream._artifactcache.cascache import CASCache
|
|
7 |
+from buildstream._context import Context
|
|
8 |
+from buildstream._project import Project
|
|
9 |
+from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
|
10 |
+from buildstream.storage._casbaseddirectory import CasBasedDirectory
|
|
11 |
+ |
|
12 |
+from tests.testutils import cli, create_artifact_share
|
|
13 |
+ |
|
14 |
+ |
|
15 |
+# Project directory
|
|
16 |
+DATA_DIR = os.path.join(
|
|
17 |
+ os.path.dirname(os.path.realpath(__file__)),
|
|
18 |
+ "project",
|
|
19 |
+)
|
|
20 |
+ |
|
21 |
+ |
|
22 |
+# Handle messages from the pipeline
|
|
23 |
+def message_handler(message, context):
|
|
24 |
+ pass
|
|
25 |
+ |
|
26 |
+ |
|
27 |
+@pytest.mark.datafiles(DATA_DIR)
|
|
28 |
+def test_push(cli, tmpdir, datafiles):
|
|
29 |
+ project_dir = str(datafiles)
|
|
30 |
+ |
|
31 |
+ # First build the project without the artifact cache configured
|
|
32 |
+ result = cli.run(project=project_dir, args=['build', 'target.bst'])
|
|
33 |
+ result.assert_success()
|
|
34 |
+ |
|
35 |
+ # Assert that we are now cached locally
|
|
36 |
+ assert cli.get_element_state(project_dir, 'target.bst') == 'cached'
|
|
37 |
+ |
|
38 |
+ # Set up an artifact cache.
|
|
39 |
+ with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
|
|
40 |
+ # Fake minimal context
|
|
41 |
+ context = Context()
|
|
42 |
+ context.set_message_handler(message_handler)
|
|
43 |
+ context.sched_pushers = 1
|
|
44 |
+ context.artifactdir = os.path.join(tmpdir, 'cache', 'artifacts')
|
|
45 |
+ context.artifact_cache_specs = [ArtifactCacheSpec(url=share.repo,
|
|
46 |
+ push=True)]
|
|
47 |
+ |
|
48 |
+ # Load the project and CAS cache
|
|
49 |
+ project = Project(project_dir, context)
|
|
50 |
+ project.ensure_fully_loaded()
|
|
51 |
+ cas = CASCache(context)
|
|
52 |
+ |
|
53 |
+ # Assert that the element's artifact is cached
|
|
54 |
+ element = project.load_elements(['target.bst'], cas)[0]
|
|
55 |
+ element_key = cli.get_element_key(project_dir, 'target.bst')
|
|
56 |
+ assert cas.contains(element, element_key)
|
|
57 |
+ |
|
58 |
+ # Manually setup the CAS remote
|
|
59 |
+ cas.setup_remotes(use_config=True)
|
|
60 |
+ cas.initialize_remotes()
|
|
61 |
+ assert cas.has_push_remotes(element=element)
|
|
62 |
+ |
|
63 |
+ # Push the element's artifact
|
|
64 |
+ pushed = cas.push(element, [element_key])
|
|
65 |
+ assert pushed is True
|
|
66 |
+ assert share.has_artifact('test', 'target.bst', element_key)
|
|
67 |
+ |
|
68 |
+ # Finally, close the opened gRPC channels properly!
|
|
69 |
+ for remote in cas._remotes[project]:
|
|
70 |
+ if remote.channel:
|
|
71 |
+ remote.channel.close()
|
|
72 |
+ |
|
73 |
+ |
|
74 |
+@pytest.mark.datafiles(DATA_DIR)
|
|
75 |
+def test_push_directory(cli, tmpdir, datafiles):
|
|
76 |
+ project_dir = str(datafiles)
|
|
77 |
+ |
|
78 |
+ # First build the project without the artifact cache configured
|
|
79 |
+ result = cli.run(project=project_dir, args=['build', 'target.bst'])
|
|
80 |
+ result.assert_success()
|
|
81 |
+ |
|
82 |
+ # Assert that we are now cached locally
|
|
83 |
+ assert cli.get_element_state(project_dir, 'target.bst') == 'cached'
|
|
84 |
+ |
|
85 |
+ # Set up an artifact cache.
|
|
86 |
+ with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
|
|
87 |
+ # Fake minimal context
|
|
88 |
+ context = Context()
|
|
89 |
+ context.set_message_handler(message_handler)
|
|
90 |
+ context.sched_pushers = 1
|
|
91 |
+ context.artifactdir = os.path.join(tmpdir, 'cache', 'artifacts')
|
|
92 |
+ context.artifact_cache_specs = [ArtifactCacheSpec(url=share.repo,
|
|
93 |
+ push=True)]
|
|
94 |
+ |
|
95 |
+ # Load the project and CAS cache
|
|
96 |
+ project = Project(project_dir, context)
|
|
97 |
+ project.ensure_fully_loaded()
|
|
98 |
+ cas = CASCache(context)
|
|
99 |
+ |
|
100 |
+ # Assert that the element's artifact is cached
|
|
101 |
+ element = project.load_elements(['target.bst'], cas)[0]
|
|
102 |
+ element_key = cli.get_element_key(project_dir, 'target.bst')
|
|
103 |
+ assert cas.contains(element, element_key)
|
|
104 |
+ |
|
105 |
+ # Manually setup the CAS remote
|
|
106 |
+ cas.setup_remotes(use_config=True)
|
|
107 |
+ cas.initialize_remotes()
|
|
108 |
+ assert cas.has_push_remotes(element=element)
|
|
109 |
+ |
|
110 |
+ # Recreate the CasBasedDirectory object from the cached artifact
|
|
111 |
+ artifact_ref = cas.get_artifact_fullname(element, element_key)
|
|
112 |
+ artifact_digest = cas.resolve_ref(artifact_ref)
|
|
113 |
+ |
|
114 |
+ directory = CasBasedDirectory(context, ref=artifact_digest)
|
|
115 |
+ |
|
116 |
+ # Push the CasBasedDirectory object
|
|
117 |
+ directory_digest = cas.push_directory(project, directory)
|
|
118 |
+ assert directory_digest == artifact_digest
|
|
119 |
+ assert share.has_object(directory_digest)
|
|
120 |
+ |
|
121 |
+ # Finally, close the opened gRPC channels properly!
|
|
122 |
+ for remote in cas._remotes[project]:
|
|
123 |
+ if remote.channel:
|
|
124 |
+ remote.channel.close()
|
|
125 |
+ |
|
126 |
+ |
|
127 |
+@pytest.mark.datafiles(DATA_DIR)
|
|
128 |
+def test_push_message(cli, tmpdir, datafiles):
|
|
129 |
+ project_dir = str(datafiles)
|
|
130 |
+ |
|
131 |
+ # Set up an artifact cache.
|
|
132 |
+ with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
|
|
133 |
+ # Fake minimal context
|
|
134 |
+ context = Context()
|
|
135 |
+ context.set_message_handler(message_handler)
|
|
136 |
+ context.sched_pushers = 1
|
|
137 |
+ context.artifactdir = os.path.join(tmpdir, 'cache', 'artifacts')
|
|
138 |
+ context.artifact_cache_specs = [ArtifactCacheSpec(url=share.repo,
|
|
139 |
+ push=True)]
|
|
140 |
+ |
|
141 |
+ # Load the project and CAS cache
|
|
142 |
+ project = Project(project_dir, context)
|
|
143 |
+ project.ensure_fully_loaded()
|
|
144 |
+ cas = CASCache(context)
|
|
145 |
+ |
|
146 |
+ # Manually setup the CAS remote
|
|
147 |
+ cas.setup_remotes(use_config=True)
|
|
148 |
+ cas.initialize_remotes()
|
|
149 |
+ assert cas.has_push_remotes()
|
|
150 |
+ |
|
151 |
+ # Create an example message object
|
|
152 |
+ command = remote_execution_pb2.Command(arguments=['/usr/bin/gcc', '--help'],
|
|
153 |
+ working_directory='/buildstream-build',
|
|
154 |
+ output_directories=['/buildstream-install'])
|
|
155 |
+ |
|
156 |
+ # Push the message object
|
|
157 |
+ digest = cas.push_message(project, command)
|
|
158 |
+ assert digest
|
|
159 |
+ assert share.has_object(digest)
|
|
160 |
+ |
|
161 |
+ # Finally, close the opened gRPC channels properly!
|
|
162 |
+ for remote in cas._remotes[project]:
|
|
163 |
+ if remote.channel:
|
|
164 |
+ remote.channel.close()
|
... | ... | @@ -15,6 +15,7 @@ from buildstream._artifactcache.cascache import CASCache |
15 | 15 |
from buildstream._artifactcache.casserver import create_server
|
16 | 16 |
from buildstream._context import Context
|
17 | 17 |
from buildstream._exceptions import ArtifactError
|
18 |
+from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
|
18 | 19 |
|
19 | 20 |
|
20 | 21 |
# ArtifactShare()
|
... | ... | @@ -87,6 +88,23 @@ class ArtifactShare(): |
87 | 88 |
# Sleep until termination by signal
|
88 | 89 |
signal.pause()
|
89 | 90 |
|
91 |
+ # has_object():
|
|
92 |
+ #
|
|
93 |
+ # Checks whether the object is present in the share
|
|
94 |
+ #
|
|
95 |
+ # Args:
|
|
96 |
+ # digest (str): The object's digest
|
|
97 |
+ #
|
|
98 |
+ # Returns:
|
|
99 |
+ # (bool): True if the object exists in the share, otherwise false.
|
|
100 |
+ def has_object(self, digest):
|
|
101 |
+ |
|
102 |
+ assert isinstance(digest, remote_execution_pb2.Digest)
|
|
103 |
+ |
|
104 |
+ object_path = self.cas.objpath(digest)
|
|
105 |
+ |
|
106 |
+ return os.path.exists(object_path)
|
|
107 |
+ |
|
90 | 108 |
# has_artifact():
|
91 | 109 |
#
|
92 | 110 |
# Checks whether the artifact is present in the share
|