Chandan Singh pushed to branch chandan/fix-checkout-none-1.2 at BuildStream / buildstream
Commits:
- 092038d2 by Tiago Gomes at 2018-09-26T08:16:33Z
- 075ffefa by Valentin David at 2018-09-28T09:23:19Z
- 621bd236 by Tiago Gomes at 2018-09-28T09:52:15Z
- 34e81ae1 by Tiago Gomes at 2018-09-30T06:33:34Z
- b842658c by Tiago Gomes at 2018-09-30T06:33:46Z
- e3ff069e by Tiago Gomes at 2018-09-30T06:33:49Z
- 9bca9183 by Tiago Gomes at 2018-09-30T06:34:21Z
- 2d025076 by Jürg Billeter at 2018-09-30T06:39:57Z
- 01831afe by Jürg Billeter at 2018-09-30T06:41:13Z
- 1b7245da by Jürg Billeter at 2018-09-30T06:41:18Z
- fd46a9f9 by Jürg Billeter at 2018-09-30T06:41:25Z
- 764b7517 by Jürg Billeter at 2018-10-01T15:42:08Z
- a009dcbe by Tristan Van Berkom at 2018-10-02T11:51:19Z
- 7da5104b by Tristan Van Berkom at 2018-10-02T13:18:54Z
- 6e820362 by Tristan Van Berkom at 2018-10-03T07:35:03Z
- 9568824f by Jim MacArthur at 2018-10-03T07:35:51Z
- 4a67e4e3 by Jürg Billeter at 2018-10-03T07:35:51Z
- f585b233 by Jürg Billeter at 2018-10-03T07:35:51Z
- 244e3c7c by Tristan Van Berkom at 2018-10-03T08:05:47Z
- 3f4587ab by Tristan Van Berkom at 2018-10-03T09:36:34Z
- a33fd160 by Tristan Van Berkom at 2018-10-03T10:00:50Z
- e80f435a by Tristan Van Berkom at 2018-10-03T12:06:46Z
- 013a8ad4 by Tristan Van Berkom at 2018-10-03T12:37:24Z
- 262e789f by Tristan Van Berkom at 2018-10-03T13:03:52Z
- 10d69988 by Tristan Van Berkom at 2018-10-03T13:03:56Z
- c02a1ae8 by Tristan Van Berkom at 2018-10-03T13:04:03Z
- eb92e8e9 by Tristan Van Berkom at 2018-10-03T13:44:31Z
- 26d48cc9 by Valentin David at 2018-10-04T14:55:13Z
- 96f09d48 by Valentin David at 2018-10-04T15:16:46Z
- 10abe77f by Tristan Van Berkom at 2018-10-05T07:01:57Z
- ed4a2641 by Chandan Singh at 2018-10-17T00:28:25Z
18 changed files:
- .gitlab-ci.yml
- NEWS
- buildstream/_artifactcache/artifactcache.py
- buildstream/_artifactcache/cascache.py
- buildstream/_artifactcache/casserver.py
- buildstream/_scheduler/jobs/job.py
- buildstream/_scheduler/queues/buildqueue.py
- buildstream/_scheduler/queues/queue.py
- buildstream/_scheduler/scheduler.py
- buildstream/_yaml.py
- buildstream/element.py
- buildstream/plugins/sources/git.py
- buildstream/source.py
- buildstream/utils.py
- tests/format/project.py
- tests/frontend/buildcheckout.py
- + tests/frontend/project/elements/checkout-deps.bst
- + tests/frontend/project/files/etc-files/etc/buildstream/config
Changes:
... | ... | @@ -168,7 +168,7 @@ docs: |
168 | 168 |
variables:
|
169 | 169 |
bst_ext_url: git+https://gitlab.com/BuildStream/bst-external.git
|
170 | 170 |
bst_ext_ref: 1d6ab71151b93c8cbc0a91a36ffe9270f3b835f1 # 0.5.1
|
171 |
- fd_sdk_ref: 718ea88089644a1ea5b488de0b90c2c565cb75f8 # 18.08.12
|
|
171 |
+ fd_sdk_ref: 88d7c22c2281b987faa02edd57df80d430eecf1f # 18.08.12
|
|
172 | 172 |
before_script:
|
173 | 173 |
- (cd dist && ./unpack.sh && cd buildstream && pip3 install .)
|
174 | 174 |
- pip3 install --user -e ${bst_ext_url}@${bst_ext_ref}#egg=bst_ext
|
1 |
+=================
|
|
2 |
+buildstream 1.2.3
|
|
3 |
+=================
|
|
4 |
+ |
|
5 |
+ o Fixed an unhandled exception when cleaning up a build sandbox (#153)
|
|
6 |
+ |
|
7 |
+ o Fixed race condition when calculating cache size and committing artifacts
|
|
8 |
+ |
|
9 |
+ o Fixed regression where terminating with `^C` results in a double user interrogation (#693)
|
|
10 |
+ |
|
11 |
+ o Fixed regression in summary when builds are terminated (#479)
|
|
12 |
+ |
|
13 |
+ o Fixed regression where irrelevant status messages appear from git sources
|
|
14 |
+ |
|
15 |
+ o Improved performance of artifact uploads by batching file transfers (#676/#677)
|
|
16 |
+ |
|
17 |
+ o Improved performance of artifact downloads by batching file transfers (#554)
|
|
18 |
+ |
|
19 |
+ o Fixed checks for paths which escape the project directory (#673)
|
|
20 |
+ |
|
1 | 21 |
=================
|
2 | 22 |
buildstream 1.2.2
|
3 | 23 |
=================
|
... | ... | @@ -277,7 +277,7 @@ class ArtifactCache(): |
277 | 277 |
"Please increase the cache-quota in {}."
|
278 | 278 |
.format(self.context.config_origin or default_conf))
|
279 | 279 |
|
280 |
- if self.get_quota_exceeded():
|
|
280 |
+ if self.has_quota_exceeded():
|
|
281 | 281 |
raise ArtifactError("Cache too full. Aborting.",
|
282 | 282 |
detail=detail,
|
283 | 283 |
reason="cache-too-full")
|
... | ... | @@ -364,14 +364,14 @@ class ArtifactCache(): |
364 | 364 |
self._cache_size = cache_size
|
365 | 365 |
self._write_cache_size(self._cache_size)
|
366 | 366 |
|
367 |
- # get_quota_exceeded()
|
|
367 |
+ # has_quota_exceeded()
|
|
368 | 368 |
#
|
369 | 369 |
# Checks if the current artifact cache size exceeds the quota.
|
370 | 370 |
#
|
371 | 371 |
# Returns:
|
372 | 372 |
# (bool): True if the quota is exceeded
|
373 | 373 |
#
|
374 |
- def get_quota_exceeded(self):
|
|
374 |
+ def has_quota_exceeded(self):
|
|
375 | 375 |
return self.get_cache_size() > self._cache_quota
|
376 | 376 |
|
377 | 377 |
################################################
|
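The rename above turns the quota check into a plain boolean predicate. A minimal sketch of how it reads, using an illustrative stand-in class and made-up byte values rather than the real ArtifactCache:

    # Illustrative stand-in only; the real ArtifactCache derives its cache size
    # from the artifact store on disk and its quota from user configuration.
    class FakeArtifactCache:
        def __init__(self, cache_size, cache_quota):
            self._cache_size = cache_size
            self._cache_quota = cache_quota

        def get_cache_size(self):
            return self._cache_size

        def has_quota_exceeded(self):
            # True if the quota is exceeded
            return self.get_cache_size() > self._cache_quota

    if __name__ == '__main__':
        assert FakeArtifactCache(6 * 1024 ** 3, 5 * 1024 ** 3).has_quota_exceeded()
        assert not FakeArtifactCache(4 * 1024 ** 3, 5 * 1024 ** 3).has_quota_exceeded()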
... | ... | @@ -43,6 +43,11 @@ from .._exceptions import ArtifactError |
43 | 43 |
from . import ArtifactCache
|
44 | 44 |
|
45 | 45 |
|
46 |
+# The default limit for gRPC messages is 4 MiB.
|
|
47 |
+# Limit payload to 1 MiB to leave sufficient headroom for metadata.
|
|
48 |
+_MAX_PAYLOAD_BYTES = 1024 * 1024
|
|
49 |
+ |
|
50 |
+ |
|
46 | 51 |
# A CASCache manages artifacts in a CAS repository as specified in the
|
47 | 52 |
# Remote Execution API.
|
48 | 53 |
#
|
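The new _MAX_PAYLOAD_BYTES constant keeps each streamed message well below the 4 MiB default gRPC limit, leaving headroom for per-message metadata. A minimal sketch of the chunking arithmetic this enables, independent of the real gRPC stubs (chunk_payload is an illustrative helper, not BuildStream code):

    import io

    _MAX_PAYLOAD_BYTES = 1024 * 1024  # stay well below the 4 MiB default gRPC message limit

    def chunk_payload(stream, size_bytes, chunk_size=_MAX_PAYLOAD_BYTES):
        # Yield (offset, data, finish_write) tuples, mirroring the fields of a
        # ByteStream WriteRequest without depending on the generated protos.
        offset = 0
        remaining = size_bytes
        while remaining > 0 or offset == 0:
            data = stream.read(min(remaining, chunk_size))
            remaining -= len(data)
            yield offset, data, remaining <= 0
            offset += len(data)
            if remaining <= 0:
                break

    if __name__ == '__main__':
        blob = b'x' * (3 * 1024 * 1024)          # a 3 MiB example blob
        chunks = list(chunk_payload(io.BytesIO(blob), len(blob)))
        assert len(chunks) == 3                  # three 1 MiB write requests
        assert chunks[-1][2] is True             # the last request sets finish_write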
... | ... | @@ -76,6 +81,7 @@ class CASCache(ArtifactCache): |
76 | 81 |
################################################
|
77 | 82 |
# Implementation of abstract methods #
|
78 | 83 |
################################################
|
84 |
+ |
|
79 | 85 |
def contains(self, element, key):
|
80 | 86 |
refpath = self._refpath(self.get_artifact_fullname(element, key))
|
81 | 87 |
|
... | ... | @@ -115,7 +121,7 @@ class CASCache(ArtifactCache): |
115 | 121 |
def commit(self, element, content, keys):
|
116 | 122 |
refs = [self.get_artifact_fullname(element, key) for key in keys]
|
117 | 123 |
|
118 |
- tree = self._create_tree(content)
|
|
124 |
+ tree = self._commit_directory(content)
|
|
119 | 125 |
|
120 | 126 |
for ref in refs:
|
121 | 127 |
self.set_ref(ref, tree)
|
... | ... | @@ -151,6 +157,7 @@ class CASCache(ArtifactCache): |
151 | 157 |
q = multiprocessing.Queue()
|
152 | 158 |
for remote_spec in remote_specs:
|
153 | 159 |
# Use subprocess to avoid creation of gRPC threads in main BuildStream process
|
160 |
+ # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
|
|
154 | 161 |
p = multiprocessing.Process(target=self._initialize_remote, args=(remote_spec, q))
|
155 | 162 |
|
156 | 163 |
try:
|
... | ... | @@ -263,109 +270,69 @@ class CASCache(ArtifactCache): |
263 | 270 |
|
264 | 271 |
self.set_ref(newref, tree)
|
265 | 272 |
|
273 |
+ def _push_refs_to_remote(self, refs, remote):
|
|
274 |
+ skipped_remote = True
|
|
275 |
+ try:
|
|
276 |
+ for ref in refs:
|
|
277 |
+ tree = self.resolve_ref(ref)
|
|
278 |
+ |
|
279 |
+ # Check whether ref is already on the server in which case
|
|
280 |
+ # there is no need to push the artifact
|
|
281 |
+ try:
|
|
282 |
+ request = buildstream_pb2.GetReferenceRequest()
|
|
283 |
+ request.key = ref
|
|
284 |
+ response = remote.ref_storage.GetReference(request)
|
|
285 |
+ |
|
286 |
+ if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
|
|
287 |
+ # ref is already on the server with the same tree
|
|
288 |
+ continue
|
|
289 |
+ |
|
290 |
+ except grpc.RpcError as e:
|
|
291 |
+ if e.code() != grpc.StatusCode.NOT_FOUND:
|
|
292 |
+ # Intentionally re-raise RpcError for outer except block.
|
|
293 |
+ raise
|
|
294 |
+ |
|
295 |
+ self._send_directory(remote, tree)
|
|
296 |
+ |
|
297 |
+ request = buildstream_pb2.UpdateReferenceRequest()
|
|
298 |
+ request.keys.append(ref)
|
|
299 |
+ request.digest.hash = tree.hash
|
|
300 |
+ request.digest.size_bytes = tree.size_bytes
|
|
301 |
+ remote.ref_storage.UpdateReference(request)
|
|
302 |
+ |
|
303 |
+ skipped_remote = False
|
|
304 |
+ except grpc.RpcError as e:
|
|
305 |
+ if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
|
|
306 |
+ raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
|
|
307 |
+ |
|
308 |
+ return not skipped_remote
|
|
309 |
+ |
|
266 | 310 |
def push(self, element, keys):
|
267 |
- refs = [self.get_artifact_fullname(element, key) for key in keys]
|
|
311 |
+ |
|
312 |
+ refs = [self.get_artifact_fullname(element, key) for key in list(keys)]
|
|
268 | 313 |
|
269 | 314 |
project = element._get_project()
|
270 | 315 |
|
271 | 316 |
push_remotes = [r for r in self._remotes[project] if r.spec.push]
|
272 | 317 |
|
273 | 318 |
pushed = False
|
274 |
- display_key = element._get_brief_display_key()
|
|
319 |
+ |
|
275 | 320 |
for remote in push_remotes:
|
276 | 321 |
remote.init()
|
277 |
- skipped_remote = True
|
|
322 |
+ display_key = element._get_brief_display_key()
|
|
278 | 323 |
element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url))
|
279 | 324 |
|
280 |
- try:
|
|
281 |
- for ref in refs:
|
|
282 |
- tree = self.resolve_ref(ref)
|
|
283 |
- |
|
284 |
- # Check whether ref is already on the server in which case
|
|
285 |
- # there is no need to push the artifact
|
|
286 |
- try:
|
|
287 |
- request = buildstream_pb2.GetReferenceRequest()
|
|
288 |
- request.key = ref
|
|
289 |
- response = remote.ref_storage.GetReference(request)
|
|
290 |
- |
|
291 |
- if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
|
|
292 |
- # ref is already on the server with the same tree
|
|
293 |
- continue
|
|
294 |
- |
|
295 |
- except grpc.RpcError as e:
|
|
296 |
- if e.code() != grpc.StatusCode.NOT_FOUND:
|
|
297 |
- # Intentionally re-raise RpcError for outer except block.
|
|
298 |
- raise
|
|
299 |
- |
|
300 |
- missing_blobs = {}
|
|
301 |
- required_blobs = self._required_blobs(tree)
|
|
302 |
- |
|
303 |
- # Limit size of FindMissingBlobs request
|
|
304 |
- for required_blobs_group in _grouper(required_blobs, 512):
|
|
305 |
- request = remote_execution_pb2.FindMissingBlobsRequest()
|
|
306 |
- |
|
307 |
- for required_digest in required_blobs_group:
|
|
308 |
- d = request.blob_digests.add()
|
|
309 |
- d.hash = required_digest.hash
|
|
310 |
- d.size_bytes = required_digest.size_bytes
|
|
311 |
- |
|
312 |
- response = remote.cas.FindMissingBlobs(request)
|
|
313 |
- for digest in response.missing_blob_digests:
|
|
314 |
- d = remote_execution_pb2.Digest()
|
|
315 |
- d.hash = digest.hash
|
|
316 |
- d.size_bytes = digest.size_bytes
|
|
317 |
- missing_blobs[d.hash] = d
|
|
318 |
- |
|
319 |
- # Upload any blobs missing on the server
|
|
320 |
- skipped_remote = False
|
|
321 |
- for digest in missing_blobs.values():
|
|
322 |
- uuid_ = uuid.uuid4()
|
|
323 |
- resource_name = '/'.join(['uploads', str(uuid_), 'blobs',
|
|
324 |
- digest.hash, str(digest.size_bytes)])
|
|
325 |
- |
|
326 |
- def request_stream(resname):
|
|
327 |
- with open(self.objpath(digest), 'rb') as f:
|
|
328 |
- assert os.fstat(f.fileno()).st_size == digest.size_bytes
|
|
329 |
- offset = 0
|
|
330 |
- finished = False
|
|
331 |
- remaining = digest.size_bytes
|
|
332 |
- while not finished:
|
|
333 |
- chunk_size = min(remaining, 64 * 1024)
|
|
334 |
- remaining -= chunk_size
|
|
335 |
- |
|
336 |
- request = bytestream_pb2.WriteRequest()
|
|
337 |
- request.write_offset = offset
|
|
338 |
- # max. 64 kB chunks
|
|
339 |
- request.data = f.read(chunk_size)
|
|
340 |
- request.resource_name = resname
|
|
341 |
- request.finish_write = remaining <= 0
|
|
342 |
- yield request
|
|
343 |
- offset += chunk_size
|
|
344 |
- finished = request.finish_write
|
|
345 |
- response = remote.bytestream.Write(request_stream(resource_name))
|
|
346 |
- |
|
347 |
- request = buildstream_pb2.UpdateReferenceRequest()
|
|
348 |
- request.keys.append(ref)
|
|
349 |
- request.digest.hash = tree.hash
|
|
350 |
- request.digest.size_bytes = tree.size_bytes
|
|
351 |
- remote.ref_storage.UpdateReference(request)
|
|
352 |
- |
|
353 |
- pushed = True
|
|
354 |
- |
|
355 |
- if not skipped_remote:
|
|
356 |
- element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
|
|
357 |
- |
|
358 |
- except grpc.RpcError as e:
|
|
359 |
- if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
|
|
360 |
- raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
|
|
361 |
- |
|
362 |
- if skipped_remote:
|
|
325 |
+ if self._push_refs_to_remote(refs, remote):
|
|
326 |
+ element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
|
|
327 |
+ pushed = True
|
|
328 |
+ else:
|
|
363 | 329 |
self.context.message(Message(
|
364 | 330 |
None,
|
365 | 331 |
MessageType.INFO,
|
366 | 332 |
"Remote ({}) already has {} cached".format(
|
367 | 333 |
remote.spec.url, element._get_brief_display_key())
|
368 | 334 |
))
|
335 |
+ |
|
369 | 336 |
return pushed
|
370 | 337 |
|
371 | 338 |
################################################
|
... | ... | @@ -451,7 +418,7 @@ class CASCache(ArtifactCache): |
451 | 418 |
def set_ref(self, ref, tree):
|
452 | 419 |
refpath = self._refpath(ref)
|
453 | 420 |
os.makedirs(os.path.dirname(refpath), exist_ok=True)
|
454 |
- with utils.save_file_atomic(refpath, 'wb') as f:
|
|
421 |
+ with utils.save_file_atomic(refpath, 'wb', tempdir=self.tmpdir) as f:
|
|
455 | 422 |
f.write(tree.SerializeToString())
|
456 | 423 |
|
457 | 424 |
# resolve_ref():
|
... | ... | @@ -594,6 +561,7 @@ class CASCache(ArtifactCache): |
594 | 561 |
################################################
|
595 | 562 |
# Local Private Methods #
|
596 | 563 |
################################################
|
564 |
+ |
|
597 | 565 |
def _checkout(self, dest, tree):
|
598 | 566 |
os.makedirs(dest, exist_ok=True)
|
599 | 567 |
|
... | ... | @@ -623,7 +591,21 @@ class CASCache(ArtifactCache): |
623 | 591 |
def _refpath(self, ref):
|
624 | 592 |
return os.path.join(self.casdir, 'refs', 'heads', ref)
|
625 | 593 |
|
626 |
- def _create_tree(self, path, *, digest=None):
|
|
594 |
+ # _commit_directory():
|
|
595 |
+ #
|
|
596 |
+ # Adds local directory to content addressable store.
|
|
597 |
+ #
|
|
598 |
+ # Adds files, symbolic links and recursively other directories in
|
|
599 |
+ # a local directory to the content addressable store.
|
|
600 |
+ #
|
|
601 |
+ # Args:
|
|
602 |
+ # path (str): Path to the directory to add.
|
|
603 |
+ # dir_digest (Digest): An optional Digest object to use.
|
|
604 |
+ #
|
|
605 |
+ # Returns:
|
|
606 |
+ # (Digest): Digest object for the directory added.
|
|
607 |
+ #
|
|
608 |
+ def _commit_directory(self, path, *, dir_digest=None):
|
|
627 | 609 |
directory = remote_execution_pb2.Directory()
|
628 | 610 |
|
629 | 611 |
for name in sorted(os.listdir(path)):
|
... | ... | @@ -632,7 +614,7 @@ class CASCache(ArtifactCache): |
632 | 614 |
if stat.S_ISDIR(mode):
|
633 | 615 |
dirnode = directory.directories.add()
|
634 | 616 |
dirnode.name = name
|
635 |
- self._create_tree(full_path, digest=dirnode.digest)
|
|
617 |
+ self._commit_directory(full_path, dir_digest=dirnode.digest)
|
|
636 | 618 |
elif stat.S_ISREG(mode):
|
637 | 619 |
filenode = directory.files.add()
|
638 | 620 |
filenode.name = name
|
... | ... | @@ -645,7 +627,8 @@ class CASCache(ArtifactCache): |
645 | 627 |
else:
|
646 | 628 |
raise ArtifactError("Unsupported file type for {}".format(full_path))
|
647 | 629 |
|
648 |
- return self.add_object(digest=digest, buffer=directory.SerializeToString())
|
|
630 |
+ return self.add_object(digest=dir_digest,
|
|
631 |
+ buffer=directory.SerializeToString())
|
|
649 | 632 |
|
650 | 633 |
def _get_subdir(self, tree, subdir):
|
651 | 634 |
head, name = os.path.split(subdir)
|
... | ... | @@ -756,16 +739,16 @@ class CASCache(ArtifactCache): |
756 | 739 |
#
|
757 | 740 |
q.put(str(e))
|
758 | 741 |
|
759 |
- def _required_blobs(self, tree):
|
|
742 |
+ def _required_blobs(self, directory_digest):
|
|
760 | 743 |
# parse directory, and recursively add blobs
|
761 | 744 |
d = remote_execution_pb2.Digest()
|
762 |
- d.hash = tree.hash
|
|
763 |
- d.size_bytes = tree.size_bytes
|
|
745 |
+ d.hash = directory_digest.hash
|
|
746 |
+ d.size_bytes = directory_digest.size_bytes
|
|
764 | 747 |
yield d
|
765 | 748 |
|
766 | 749 |
directory = remote_execution_pb2.Directory()
|
767 | 750 |
|
768 |
- with open(self.objpath(tree), 'rb') as f:
|
|
751 |
+ with open(self.objpath(directory_digest), 'rb') as f:
|
|
769 | 752 |
directory.ParseFromString(f.read())
|
770 | 753 |
|
771 | 754 |
for filenode in directory.files:
|
... | ... | @@ -777,50 +760,203 @@ class CASCache(ArtifactCache): |
777 | 760 |
for dirnode in directory.directories:
|
778 | 761 |
yield from self._required_blobs(dirnode.digest)
|
779 | 762 |
|
780 |
- def _fetch_blob(self, remote, digest, out):
|
|
763 |
+ def _fetch_blob(self, remote, digest, stream):
|
|
781 | 764 |
resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)])
|
782 | 765 |
request = bytestream_pb2.ReadRequest()
|
783 | 766 |
request.resource_name = resource_name
|
784 | 767 |
request.read_offset = 0
|
785 | 768 |
for response in remote.bytestream.Read(request):
|
786 |
- out.write(response.data)
|
|
769 |
+ stream.write(response.data)
|
|
770 |
+ stream.flush()
|
|
787 | 771 |
|
788 |
- out.flush()
|
|
789 |
- assert digest.size_bytes == os.fstat(out.fileno()).st_size
|
|
772 |
+ assert digest.size_bytes == os.fstat(stream.fileno()).st_size
|
|
790 | 773 |
|
791 |
- def _fetch_directory(self, remote, tree):
|
|
792 |
- objpath = self.objpath(tree)
|
|
774 |
+ # _ensure_blob():
|
|
775 |
+ #
|
|
776 |
+ # Fetch and add blob if it's not already local.
|
|
777 |
+ #
|
|
778 |
+ # Args:
|
|
779 |
+ # remote (Remote): The remote to use.
|
|
780 |
+ # digest (Digest): Digest object for the blob to fetch.
|
|
781 |
+ #
|
|
782 |
+ # Returns:
|
|
783 |
+ # (str): The path of the object
|
|
784 |
+ #
|
|
785 |
+ def _ensure_blob(self, remote, digest):
|
|
786 |
+ objpath = self.objpath(digest)
|
|
793 | 787 |
if os.path.exists(objpath):
|
794 |
- # already in local cache
|
|
795 |
- return
|
|
788 |
+ # already in local repository
|
|
789 |
+ return objpath
|
|
796 | 790 |
|
797 |
- with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
|
|
798 |
- self._fetch_blob(remote, tree, out)
|
|
791 |
+ with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
|
|
792 |
+ self._fetch_blob(remote, digest, f)
|
|
799 | 793 |
|
800 |
- directory = remote_execution_pb2.Directory()
|
|
794 |
+ added_digest = self.add_object(path=f.name)
|
|
795 |
+ assert added_digest.hash == digest.hash
|
|
801 | 796 |
|
802 |
- with open(out.name, 'rb') as f:
|
|
803 |
- directory.ParseFromString(f.read())
|
|
797 |
+ return objpath
|
|
804 | 798 |
|
805 |
- for filenode in directory.files:
|
|
806 |
- fileobjpath = self.objpath(tree)
|
|
807 |
- if os.path.exists(fileobjpath):
|
|
808 |
- # already in local cache
|
|
809 |
- continue
|
|
799 |
+ def _batch_download_complete(self, batch):
|
|
800 |
+ for digest, data in batch.send():
|
|
801 |
+ with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
|
|
802 |
+ f.write(data)
|
|
803 |
+ f.flush()
|
|
804 |
+ |
|
805 |
+ added_digest = self.add_object(path=f.name)
|
|
806 |
+ assert added_digest.hash == digest.hash
|
|
807 |
+ |
|
808 |
+ # Helper function for _fetch_directory().
|
|
809 |
+ def _fetch_directory_batch(self, remote, batch, fetch_queue, fetch_next_queue):
|
|
810 |
+ self._batch_download_complete(batch)
|
|
811 |
+ |
|
812 |
+ # All previously scheduled directories are now locally available,
|
|
813 |
+ # move them to the processing queue.
|
|
814 |
+ fetch_queue.extend(fetch_next_queue)
|
|
815 |
+ fetch_next_queue.clear()
|
|
816 |
+ return _CASBatchRead(remote)
|
|
817 |
+ |
|
818 |
+ # Helper function for _fetch_directory().
|
|
819 |
+ def _fetch_directory_node(self, remote, digest, batch, fetch_queue, fetch_next_queue, *, recursive=False):
|
|
820 |
+ in_local_cache = os.path.exists(self.objpath(digest))
|
|
810 | 821 |
|
811 |
- with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
|
|
812 |
- self._fetch_blob(remote, filenode.digest, f)
|
|
822 |
+ if in_local_cache:
|
|
823 |
+ # Skip download, already in local cache.
|
|
824 |
+ pass
|
|
825 |
+ elif (digest.size_bytes >= remote.max_batch_total_size_bytes or
|
|
826 |
+ not remote.batch_read_supported):
|
|
827 |
+ # Too large for batch request, download in independent request.
|
|
828 |
+ self._ensure_blob(remote, digest)
|
|
829 |
+ in_local_cache = True
|
|
830 |
+ else:
|
|
831 |
+ if not batch.add(digest):
|
|
832 |
+ # Not enough space left in batch request.
|
|
833 |
+ # Complete pending batch first.
|
|
834 |
+ batch = self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
|
|
835 |
+ batch.add(digest)
|
|
836 |
+ |
|
837 |
+ if recursive:
|
|
838 |
+ if in_local_cache:
|
|
839 |
+ # Add directory to processing queue.
|
|
840 |
+ fetch_queue.append(digest)
|
|
841 |
+ else:
|
|
842 |
+ # Directory will be available after completing pending batch.
|
|
843 |
+ # Add directory to deferred processing queue.
|
|
844 |
+ fetch_next_queue.append(digest)
|
|
845 |
+ |
|
846 |
+ return batch
|
|
847 |
+ |
|
848 |
+ # _fetch_directory():
|
|
849 |
+ #
|
|
850 |
+ # Fetches remote directory and adds it to content addressable store.
|
|
851 |
+ #
|
|
852 |
+ # Fetches files, symbolic links and recursively other directories in
|
|
853 |
+ # the remote directory and adds them to the content addressable
|
|
854 |
+ # store.
|
|
855 |
+ #
|
|
856 |
+ # Args:
|
|
857 |
+ # remote (Remote): The remote to use.
|
|
858 |
+ # dir_digest (Digest): Digest object for the directory to fetch.
|
|
859 |
+ #
|
|
860 |
+ def _fetch_directory(self, remote, dir_digest):
|
|
861 |
+ fetch_queue = [dir_digest]
|
|
862 |
+ fetch_next_queue = []
|
|
863 |
+ batch = _CASBatchRead(remote)
|
|
864 |
+ |
|
865 |
+ while len(fetch_queue) + len(fetch_next_queue) > 0:
|
|
866 |
+ if len(fetch_queue) == 0:
|
|
867 |
+ batch = self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
|
|
868 |
+ |
|
869 |
+ dir_digest = fetch_queue.pop(0)
|
|
813 | 870 |
|
814 |
- digest = self.add_object(path=f.name)
|
|
815 |
- assert digest.hash == filenode.digest.hash
|
|
871 |
+ objpath = self._ensure_blob(remote, dir_digest)
|
|
872 |
+ |
|
873 |
+ directory = remote_execution_pb2.Directory()
|
|
874 |
+ with open(objpath, 'rb') as f:
|
|
875 |
+ directory.ParseFromString(f.read())
|
|
816 | 876 |
|
817 | 877 |
for dirnode in directory.directories:
|
818 |
- self._fetch_directory(remote, dirnode.digest)
|
|
878 |
+ batch = self._fetch_directory_node(remote, dirnode.digest, batch,
|
|
879 |
+ fetch_queue, fetch_next_queue, recursive=True)
|
|
880 |
+ |
|
881 |
+ for filenode in directory.files:
|
|
882 |
+ batch = self._fetch_directory_node(remote, filenode.digest, batch,
|
|
883 |
+ fetch_queue, fetch_next_queue)
|
|
884 |
+ |
|
885 |
+ # Fetch final batch
|
|
886 |
+ self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
|
|
887 |
+ |
|
888 |
+ def _send_blob(self, remote, digest, stream, u_uid=uuid.uuid4()):
|
|
889 |
+ resource_name = '/'.join(['uploads', str(u_uid), 'blobs',
|
|
890 |
+ digest.hash, str(digest.size_bytes)])
|
|
891 |
+ |
|
892 |
+ def request_stream(resname, instream):
|
|
893 |
+ offset = 0
|
|
894 |
+ finished = False
|
|
895 |
+ remaining = digest.size_bytes
|
|
896 |
+ while not finished:
|
|
897 |
+ chunk_size = min(remaining, _MAX_PAYLOAD_BYTES)
|
|
898 |
+ remaining -= chunk_size
|
|
899 |
+ |
|
900 |
+ request = bytestream_pb2.WriteRequest()
|
|
901 |
+ request.write_offset = offset
|
|
902 |
+ # max. _MAX_PAYLOAD_BYTES chunks
|
|
903 |
+ request.data = instream.read(chunk_size)
|
|
904 |
+ request.resource_name = resname
|
|
905 |
+ request.finish_write = remaining <= 0
|
|
906 |
+ |
|
907 |
+ yield request
|
|
908 |
+ |
|
909 |
+ offset += chunk_size
|
|
910 |
+ finished = request.finish_write
|
|
911 |
+ |
|
912 |
+ response = remote.bytestream.Write(request_stream(resource_name, stream))
|
|
913 |
+ |
|
914 |
+ assert response.committed_size == digest.size_bytes
|
|
915 |
+ |
|
916 |
+ def _send_directory(self, remote, digest, u_uid=uuid.uuid4()):
|
|
917 |
+ required_blobs = self._required_blobs(digest)
|
|
918 |
+ |
|
919 |
+ missing_blobs = dict()
|
|
920 |
+ # Limit size of FindMissingBlobs request
|
|
921 |
+ for required_blobs_group in _grouper(required_blobs, 512):
|
|
922 |
+ request = remote_execution_pb2.FindMissingBlobsRequest()
|
|
923 |
+ |
|
924 |
+ for required_digest in required_blobs_group:
|
|
925 |
+ d = request.blob_digests.add()
|
|
926 |
+ d.hash = required_digest.hash
|
|
927 |
+ d.size_bytes = required_digest.size_bytes
|
|
928 |
+ |
|
929 |
+ response = remote.cas.FindMissingBlobs(request)
|
|
930 |
+ for missing_digest in response.missing_blob_digests:
|
|
931 |
+ d = remote_execution_pb2.Digest()
|
|
932 |
+ d.hash = missing_digest.hash
|
|
933 |
+ d.size_bytes = missing_digest.size_bytes
|
|
934 |
+ missing_blobs[d.hash] = d
|
|
935 |
+ |
|
936 |
+ # Upload any blobs missing on the server
|
|
937 |
+ self._send_blobs(remote, missing_blobs.values(), u_uid)
|
|
938 |
+ |
|
939 |
+ def _send_blobs(self, remote, digests, u_uid=uuid.uuid4()):
|
|
940 |
+ batch = _CASBatchUpdate(remote)
|
|
941 |
+ |
|
942 |
+ for digest in digests:
|
|
943 |
+ with open(self.objpath(digest), 'rb') as f:
|
|
944 |
+ assert os.fstat(f.fileno()).st_size == digest.size_bytes
|
|
945 |
+ |
|
946 |
+ if (digest.size_bytes >= remote.max_batch_total_size_bytes or
|
|
947 |
+ not remote.batch_update_supported):
|
|
948 |
+ # Too large for batch request, upload in independent request.
|
|
949 |
+ self._send_blob(remote, digest, f, u_uid=u_uid)
|
|
950 |
+ else:
|
|
951 |
+ if not batch.add(digest, f):
|
|
952 |
+ # Not enough space left in batch request.
|
|
953 |
+ # Complete pending batch first.
|
|
954 |
+ batch.send()
|
|
955 |
+ batch = _CASBatchUpdate(remote)
|
|
956 |
+ batch.add(digest, f)
|
|
819 | 957 |
|
820 |
- # place directory blob only in final location when we've downloaded
|
|
821 |
- # all referenced blobs to avoid dangling references in the repository
|
|
822 |
- digest = self.add_object(path=out.name)
|
|
823 |
- assert digest.hash == tree.hash
|
|
958 |
+ # Send final batch
|
|
959 |
+ batch.send()
|
|
824 | 960 |
|
825 | 961 |
|
826 | 962 |
# Represents a single remote CAS cache.
|
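The rewritten _fetch_directory() above drives two queues: directories whose blobs are already local go straight onto fetch_queue for processing, while everything else is grouped into a batch and parked on fetch_next_queue until that batch completes. A toy version of the same control flow, fetching from an in-memory dict instead of a CAS remote (fetch_directory, remote_blobs, local_blobs and children are all illustrative names):

    def fetch_directory(root, remote_blobs, local_blobs, children):
        # children maps a digest to the digests its directory object references.
        fetch_queue = [root]          # digests ready to be processed
        fetch_next_queue = []         # digests deferred until their batch completes
        batch = []                    # pending batch download

        def complete_batch(batch):
            for digest in batch:                      # stand-in for BatchReadBlobs()
                local_blobs[digest] = remote_blobs[digest]
            # Everything previously deferred is now local; promote it.
            fetch_queue.extend(fetch_next_queue)
            fetch_next_queue.clear()
            return []

        while fetch_queue or fetch_next_queue:
            if not fetch_queue:
                batch = complete_batch(batch)
            digest = fetch_queue.pop(0)
            if digest not in local_blobs:             # stand-in for _ensure_blob()
                local_blobs[digest] = remote_blobs[digest]
            for child in children.get(digest, ()):
                if child in local_blobs:
                    fetch_queue.append(child)         # process immediately
                else:
                    batch.append(child)               # download with the next batch
                    fetch_next_queue.append(child)
        complete_batch(batch)                         # fetch the final batch

    if __name__ == '__main__':
        remote = {'d0': b'root dir', 'd1': b'sub dir', 'f1': b'file blob'}
        local = {}
        fetch_directory('d0', remote, local, {'d0': ['d1', 'f1'], 'd1': ['f1']})
        assert set(local) == {'d0', 'd1', 'f1'}

The real code additionally flushes a batch early when adding another blob would exceed the remote's size budget; that size bookkeeping is omitted here for brevity.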
... | ... | @@ -870,11 +1006,129 @@ class _CASRemote(): |
870 | 1006 |
|
871 | 1007 |
self.bytestream = bytestream_pb2_grpc.ByteStreamStub(self.channel)
|
872 | 1008 |
self.cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel)
|
1009 |
+ self.capabilities = remote_execution_pb2_grpc.CapabilitiesStub(self.channel)
|
|
873 | 1010 |
self.ref_storage = buildstream_pb2_grpc.ReferenceStorageStub(self.channel)
|
874 | 1011 |
|
1012 |
+ self.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES
|
|
1013 |
+ try:
|
|
1014 |
+ request = remote_execution_pb2.GetCapabilitiesRequest()
|
|
1015 |
+ response = self.capabilities.GetCapabilities(request)
|
|
1016 |
+ server_max_batch_total_size_bytes = response.cache_capabilities.max_batch_total_size_bytes
|
|
1017 |
+ if 0 < server_max_batch_total_size_bytes < self.max_batch_total_size_bytes:
|
|
1018 |
+ self.max_batch_total_size_bytes = server_max_batch_total_size_bytes
|
|
1019 |
+ except grpc.RpcError as e:
|
|
1020 |
+ # Simply use the defaults for servers that don't implement GetCapabilities()
|
|
1021 |
+ if e.code() != grpc.StatusCode.UNIMPLEMENTED:
|
|
1022 |
+ raise
|
|
1023 |
+ |
|
1024 |
+ # Check whether the server supports BatchReadBlobs()
|
|
1025 |
+ self.batch_read_supported = False
|
|
1026 |
+ try:
|
|
1027 |
+ request = remote_execution_pb2.BatchReadBlobsRequest()
|
|
1028 |
+ response = self.cas.BatchReadBlobs(request)
|
|
1029 |
+ self.batch_read_supported = True
|
|
1030 |
+ except grpc.RpcError as e:
|
|
1031 |
+ if e.code() != grpc.StatusCode.UNIMPLEMENTED:
|
|
1032 |
+ raise
|
|
1033 |
+ |
|
1034 |
+ # Check whether the server supports BatchUpdateBlobs()
|
|
1035 |
+ self.batch_update_supported = False
|
|
1036 |
+ try:
|
|
1037 |
+ request = remote_execution_pb2.BatchUpdateBlobsRequest()
|
|
1038 |
+ response = self.cas.BatchUpdateBlobs(request)
|
|
1039 |
+ self.batch_update_supported = True
|
|
1040 |
+ except grpc.RpcError as e:
|
|
1041 |
+ if (e.code() != grpc.StatusCode.UNIMPLEMENTED and
|
|
1042 |
+ e.code() != grpc.StatusCode.PERMISSION_DENIED):
|
|
1043 |
+ raise
|
|
1044 |
+ |
|
875 | 1045 |
self._initialized = True
|
876 | 1046 |
|
877 | 1047 |
|
1048 |
+# Represents a batch of blobs queued for fetching.
|
|
1049 |
+#
|
|
1050 |
+class _CASBatchRead():
|
|
1051 |
+ def __init__(self, remote):
|
|
1052 |
+ self._remote = remote
|
|
1053 |
+ self._max_total_size_bytes = remote.max_batch_total_size_bytes
|
|
1054 |
+ self._request = remote_execution_pb2.BatchReadBlobsRequest()
|
|
1055 |
+ self._size = 0
|
|
1056 |
+ self._sent = False
|
|
1057 |
+ |
|
1058 |
+ def add(self, digest):
|
|
1059 |
+ assert not self._sent
|
|
1060 |
+ |
|
1061 |
+ new_batch_size = self._size + digest.size_bytes
|
|
1062 |
+ if new_batch_size > self._max_total_size_bytes:
|
|
1063 |
+ # Not enough space left in current batch
|
|
1064 |
+ return False
|
|
1065 |
+ |
|
1066 |
+ request_digest = self._request.digests.add()
|
|
1067 |
+ request_digest.hash = digest.hash
|
|
1068 |
+ request_digest.size_bytes = digest.size_bytes
|
|
1069 |
+ self._size = new_batch_size
|
|
1070 |
+ return True
|
|
1071 |
+ |
|
1072 |
+ def send(self):
|
|
1073 |
+ assert not self._sent
|
|
1074 |
+ self._sent = True
|
|
1075 |
+ |
|
1076 |
+ if len(self._request.digests) == 0:
|
|
1077 |
+ return
|
|
1078 |
+ |
|
1079 |
+ batch_response = self._remote.cas.BatchReadBlobs(self._request)
|
|
1080 |
+ |
|
1081 |
+ for response in batch_response.responses:
|
|
1082 |
+ if response.status.code != grpc.StatusCode.OK.value[0]:
|
|
1083 |
+ raise ArtifactError("Failed to download blob {}: {}".format(
|
|
1084 |
+ response.digest.hash, response.status.code))
|
|
1085 |
+ if response.digest.size_bytes != len(response.data):
|
|
1086 |
+ raise ArtifactError("Failed to download blob {}: expected {} bytes, received {} bytes".format(
|
|
1087 |
+ response.digest.hash, response.digest.size_bytes, len(response.data)))
|
|
1088 |
+ |
|
1089 |
+ yield (response.digest, response.data)
|
|
1090 |
+ |
|
1091 |
+ |
|
1092 |
+# Represents a batch of blobs queued for upload.
|
|
1093 |
+#
|
|
1094 |
+class _CASBatchUpdate():
|
|
1095 |
+ def __init__(self, remote):
|
|
1096 |
+ self._remote = remote
|
|
1097 |
+ self._max_total_size_bytes = remote.max_batch_total_size_bytes
|
|
1098 |
+ self._request = remote_execution_pb2.BatchUpdateBlobsRequest()
|
|
1099 |
+ self._size = 0
|
|
1100 |
+ self._sent = False
|
|
1101 |
+ |
|
1102 |
+ def add(self, digest, stream):
|
|
1103 |
+ assert not self._sent
|
|
1104 |
+ |
|
1105 |
+ new_batch_size = self._size + digest.size_bytes
|
|
1106 |
+ if new_batch_size > self._max_total_size_bytes:
|
|
1107 |
+ # Not enough space left in current batch
|
|
1108 |
+ return False
|
|
1109 |
+ |
|
1110 |
+ blob_request = self._request.requests.add()
|
|
1111 |
+ blob_request.digest.hash = digest.hash
|
|
1112 |
+ blob_request.digest.size_bytes = digest.size_bytes
|
|
1113 |
+ blob_request.data = stream.read(digest.size_bytes)
|
|
1114 |
+ self._size = new_batch_size
|
|
1115 |
+ return True
|
|
1116 |
+ |
|
1117 |
+ def send(self):
|
|
1118 |
+ assert not self._sent
|
|
1119 |
+ self._sent = True
|
|
1120 |
+ |
|
1121 |
+ if len(self._request.requests) == 0:
|
|
1122 |
+ return
|
|
1123 |
+ |
|
1124 |
+ batch_response = self._remote.cas.BatchUpdateBlobs(self._request)
|
|
1125 |
+ |
|
1126 |
+ for response in batch_response.responses:
|
|
1127 |
+ if response.status.code != grpc.StatusCode.OK.value[0]:
|
|
1128 |
+ raise ArtifactError("Failed to upload blob {}: {}".format(
|
|
1129 |
+ response.digest.hash, response.status.code))
|
|
1130 |
+ |
|
1131 |
+ |
|
878 | 1132 |
def _grouper(iterable, n):
|
879 | 1133 |
while True:
|
880 | 1134 |
try:
|
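_CASBatchRead and _CASBatchUpdate both follow the same pattern: add() refuses an entry that would push the batch past max_batch_total_size_bytes, and the caller then sends the pending batch and retries with a fresh one. A generic sketch of that pattern with no gRPC involved (SizeLimitedBatch and send_all are illustrative, and each item is assumed to fit within the budget on its own, since oversized blobs take the independent transfer path instead):

    class SizeLimitedBatch:
        def __init__(self, max_total_size_bytes):
            self._max = max_total_size_bytes
            self._items = []
            self._size = 0
            self._sent = False

        def add(self, name, size_bytes):
            assert not self._sent
            if self._size + size_bytes > self._max:
                return False              # not enough space left in current batch
            self._items.append(name)
            self._size += size_bytes
            return True

        def send(self):
            assert not self._sent
            self._sent = True
            return list(self._items)      # stand-in for the actual batch RPC

    def send_all(blobs, max_total_size_bytes):
        batches = []
        batch = SizeLimitedBatch(max_total_size_bytes)
        for name, size in blobs:
            if not batch.add(name, size):
                batches.append(batch.send())          # complete pending batch first
                batch = SizeLimitedBatch(max_total_size_bytes)
                batch.add(name, size)
        batches.append(batch.send())                  # send the final batch
        return batches

    if __name__ == '__main__':
        blobs = [('a', 600), ('b', 500), ('c', 200), ('d', 900)]
        assert send_all(blobs, 1000) == [['a'], ['b', 'c'], ['d']]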
... | ... | @@ -38,8 +38,9 @@ from .._context import Context |
38 | 38 |
from .cascache import CASCache
|
39 | 39 |
|
40 | 40 |
|
41 |
-# The default limit for gRPC messages is 4 MiB
|
|
42 |
-_MAX_BATCH_TOTAL_SIZE_BYTES = 4 * 1024 * 1024
|
|
41 |
+# The default limit for gRPC messages is 4 MiB.
|
|
42 |
+# Limit payload to 1 MiB to leave sufficient headroom for metadata.
|
|
43 |
+_MAX_PAYLOAD_BYTES = 1024 * 1024
|
|
43 | 44 |
|
44 | 45 |
|
45 | 46 |
# Trying to push an artifact that is too large
|
... | ... | @@ -69,7 +70,7 @@ def create_server(repo, *, enable_push): |
69 | 70 |
_ByteStreamServicer(artifactcache, enable_push=enable_push), server)
|
70 | 71 |
|
71 | 72 |
remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
|
72 |
- _ContentAddressableStorageServicer(artifactcache), server)
|
|
73 |
+ _ContentAddressableStorageServicer(artifactcache, enable_push=enable_push), server)
|
|
73 | 74 |
|
74 | 75 |
remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
|
75 | 76 |
_CapabilitiesServicer(), server)
|
... | ... | @@ -158,7 +159,7 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer): |
158 | 159 |
|
159 | 160 |
remaining = client_digest.size_bytes - request.read_offset
|
160 | 161 |
while remaining > 0:
|
161 |
- chunk_size = min(remaining, 64 * 1024)
|
|
162 |
+ chunk_size = min(remaining, _MAX_PAYLOAD_BYTES)
|
|
162 | 163 |
remaining -= chunk_size
|
163 | 164 |
|
164 | 165 |
response = bytestream_pb2.ReadResponse()
|
... | ... | @@ -223,9 +224,10 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer): |
223 | 224 |
|
224 | 225 |
|
225 | 226 |
class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
|
226 |
- def __init__(self, cas):
|
|
227 |
+ def __init__(self, cas, *, enable_push):
|
|
227 | 228 |
super().__init__()
|
228 | 229 |
self.cas = cas
|
230 |
+ self.enable_push = enable_push
|
|
229 | 231 |
|
230 | 232 |
def FindMissingBlobs(self, request, context):
|
231 | 233 |
response = remote_execution_pb2.FindMissingBlobsResponse()
|
... | ... | @@ -242,7 +244,7 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres |
242 | 244 |
|
243 | 245 |
for digest in request.digests:
|
244 | 246 |
batch_size += digest.size_bytes
|
245 |
- if batch_size > _MAX_BATCH_TOTAL_SIZE_BYTES:
|
|
247 |
+ if batch_size > _MAX_PAYLOAD_BYTES:
|
|
246 | 248 |
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
247 | 249 |
return response
|
248 | 250 |
|
... | ... | @@ -261,6 +263,46 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres |
261 | 263 |
|
262 | 264 |
return response
|
263 | 265 |
|
266 |
+ def BatchUpdateBlobs(self, request, context):
|
|
267 |
+ response = remote_execution_pb2.BatchUpdateBlobsResponse()
|
|
268 |
+ |
|
269 |
+ if not self.enable_push:
|
|
270 |
+ context.set_code(grpc.StatusCode.PERMISSION_DENIED)
|
|
271 |
+ return response
|
|
272 |
+ |
|
273 |
+ batch_size = 0
|
|
274 |
+ |
|
275 |
+ for blob_request in request.requests:
|
|
276 |
+ digest = blob_request.digest
|
|
277 |
+ |
|
278 |
+ batch_size += digest.size_bytes
|
|
279 |
+ if batch_size > _MAX_PAYLOAD_BYTES:
|
|
280 |
+ context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
|
281 |
+ return response
|
|
282 |
+ |
|
283 |
+ blob_response = response.responses.add()
|
|
284 |
+ blob_response.digest.hash = digest.hash
|
|
285 |
+ blob_response.digest.size_bytes = digest.size_bytes
|
|
286 |
+ |
|
287 |
+ if len(blob_request.data) != digest.size_bytes:
|
|
288 |
+ blob_response.status.code = grpc.StatusCode.FAILED_PRECONDITION
|
|
289 |
+ continue
|
|
290 |
+ |
|
291 |
+ try:
|
|
292 |
+ _clean_up_cache(self.cas, digest.size_bytes)
|
|
293 |
+ |
|
294 |
+ with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
|
|
295 |
+ out.write(blob_request.data)
|
|
296 |
+ out.flush()
|
|
297 |
+ server_digest = self.cas.add_object(path=out.name)
|
|
298 |
+ if server_digest.hash != digest.hash:
|
|
299 |
+ blob_response.status.code = grpc.StatusCode.FAILED_PRECONDITION
|
|
300 |
+ |
|
301 |
+ except ArtifactTooLargeException:
|
|
302 |
+ blob_response.status.code = grpc.StatusCode.RESOURCE_EXHAUSTED
|
|
303 |
+ |
|
304 |
+ return response
|
|
305 |
+ |
|
264 | 306 |
|
265 | 307 |
class _CapabilitiesServicer(remote_execution_pb2_grpc.CapabilitiesServicer):
|
266 | 308 |
def GetCapabilities(self, request, context):
|
... | ... | @@ -269,7 +311,7 @@ class _CapabilitiesServicer(remote_execution_pb2_grpc.CapabilitiesServicer): |
269 | 311 |
cache_capabilities = response.cache_capabilities
|
270 | 312 |
cache_capabilities.digest_function.append(remote_execution_pb2.SHA256)
|
271 | 313 |
cache_capabilities.action_cache_update_capabilities.update_enabled = False
|
272 |
- cache_capabilities.max_batch_total_size_bytes = _MAX_BATCH_TOTAL_SIZE_BYTES
|
|
314 |
+ cache_capabilities.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES
|
|
273 | 315 |
cache_capabilities.symlink_absolute_path_strategy = remote_execution_pb2.CacheCapabilities.ALLOWED
|
274 | 316 |
|
275 | 317 |
response.deprecated_api_version.major = 2
|
... | ... | @@ -119,6 +119,8 @@ class Job(): |
119 | 119 |
self._result = None # Return value of child action in the parent
|
120 | 120 |
self._tries = 0 # Try count, for retryable jobs
|
121 | 121 |
self._skipped_flag = False # Indicate whether the job was skipped.
|
122 |
+ self._terminated = False # Whether this job has been explicitly terminated
|
|
123 |
+ |
|
122 | 124 |
# If False, a retry will not be attempted regardless of whether _tries is less than _max_retries.
|
123 | 125 |
#
|
124 | 126 |
self._retry_flag = True
|
... | ... | @@ -188,6 +190,8 @@ class Job(): |
188 | 190 |
# Terminate the process using multiprocessing API pathway
|
189 | 191 |
self._process.terminate()
|
190 | 192 |
|
193 |
+ self._terminated = True
|
|
194 |
+ |
|
191 | 195 |
# terminate_wait()
|
192 | 196 |
#
|
193 | 197 |
# Wait for terminated jobs to complete
|
... | ... | @@ -271,18 +275,22 @@ class Job(): |
271 | 275 |
# running the integration commands).
|
272 | 276 |
#
|
273 | 277 |
# Args:
|
274 |
- # (int): The plugin identifier for this task
|
|
278 |
+ # task_id (int): The plugin identifier for this task
|
|
275 | 279 |
#
|
276 | 280 |
def set_task_id(self, task_id):
|
277 | 281 |
self._task_id = task_id
|
278 | 282 |
|
279 | 283 |
# skipped
|
280 | 284 |
#
|
285 |
+ # This will evaluate to True if the job was skipped
|
|
286 |
+ # during processing, or if it was forcefully terminated.
|
|
287 |
+ #
|
|
281 | 288 |
# Returns:
|
282 |
- # bool: True if the job was skipped while processing.
|
|
289 |
+ # (bool): Whether the job should appear as skipped
|
|
290 |
+ #
|
|
283 | 291 |
@property
|
284 | 292 |
def skipped(self):
|
285 |
- return self._skipped_flag
|
|
293 |
+ return self._skipped_flag or self._terminated
|
|
286 | 294 |
|
287 | 295 |
#######################################################
|
288 | 296 |
# Abstract Methods #
|
... | ... | @@ -65,7 +65,7 @@ class BuildQueue(Queue): |
65 | 65 |
# If the estimated size outgrows the quota, ask the scheduler
|
66 | 66 |
# to queue a job to actually check the real cache size.
|
67 | 67 |
#
|
68 |
- if artifacts.get_quota_exceeded():
|
|
68 |
+ if artifacts.has_quota_exceeded():
|
|
69 | 69 |
self._scheduler.check_cache_size()
|
70 | 70 |
|
71 | 71 |
def done(self, job, element, result, success):
|
... | ... | @@ -325,15 +325,22 @@ class Queue(): |
325 | 325 |
detail=traceback.format_exc())
|
326 | 326 |
self.failed_elements.append(element)
|
327 | 327 |
else:
|
328 |
- |
|
329 |
- # No exception occured, handle the success/failure state in the normal way
|
|
330 | 328 |
#
|
329 |
+ # No exception occurred in post processing
|
|
330 |
+ #
|
|
331 |
+ |
|
332 |
+ # Only place in the output done queue if the job
|
|
333 |
+ # was considered successful
|
|
331 | 334 |
if success:
|
332 | 335 |
self._done_queue.append(job)
|
333 |
- if not job.skipped:
|
|
334 |
- self.processed_elements.append(element)
|
|
335 |
- else:
|
|
336 |
- self.skipped_elements.append(element)
|
|
336 |
+ |
|
337 |
+ # A Job can be skipped whether or not it has failed,
|
|
338 |
+ # we want to only bookkeep them as processed or failed
|
|
339 |
+ # if they are not skipped.
|
|
340 |
+ if job.skipped:
|
|
341 |
+ self.skipped_elements.append(element)
|
|
342 |
+ elif success:
|
|
343 |
+ self.processed_elements.append(element)
|
|
337 | 344 |
else:
|
338 | 345 |
self.failed_elements.append(element)
|
339 | 346 |
|
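The reordered bookkeeping above makes "skipped" take precedence over success or failure, so a terminated job is never counted as processed. A tiny sketch of that classification rule (classify is an illustrative helper, not part of the Queue API):

    def classify(job_skipped, success):
        if job_skipped:
            return 'skipped'
        return 'processed' if success else 'failed'

    if __name__ == '__main__':
        assert classify(True, False) == 'skipped'     # e.g. a terminated job
        assert classify(False, True) == 'processed'
        assert classify(False, False) == 'failed'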
... | ... | @@ -349,7 +349,7 @@ class Scheduler(): |
349 | 349 |
platform = Platform.get_platform()
|
350 | 350 |
artifacts = platform.artifactcache
|
351 | 351 |
|
352 |
- if not artifacts.get_quota_exceeded():
|
|
352 |
+ if not artifacts.has_quota_exceeded():
|
|
353 | 353 |
return
|
354 | 354 |
|
355 | 355 |
job = CleanupJob(self, 'cleanup', 'cleanup/cleanup',
|
... | ... | @@ -387,6 +387,15 @@ class Scheduler(): |
387 | 387 |
# A loop registered event callback for keyboard interrupts
|
388 | 388 |
#
|
389 | 389 |
def _interrupt_event(self):
|
390 |
+ |
|
391 |
+ # FIXME: This should not be needed, but for some reason we receive an
|
|
392 |
+ # additional SIGINT event when the user hits ^C a second time
|
|
393 |
+ # to inform us that they really intend to terminate; even though
|
|
394 |
+ # we have disconnected our handlers at this time.
|
|
395 |
+ #
|
|
396 |
+ if self.terminated:
|
|
397 |
+ return
|
|
398 |
+ |
|
390 | 399 |
# Leave this to the frontend to decide, if no
|
391 | 400 |
# interrupt callback was specified, then just terminate.
|
392 | 401 |
if self._interrupt_callback:
|
... | ... | @@ -467,7 +467,7 @@ def node_get_project_path(node, key, project_dir, *, |
467 | 467 |
"{}: Specified path '{}' does not exist"
|
468 | 468 |
.format(provenance, path_str))
|
469 | 469 |
|
470 |
- is_inside = project_dir_path in full_resolved_path.parents or (
|
|
470 |
+ is_inside = project_dir_path.resolve() in full_resolved_path.parents or (
|
|
471 | 471 |
full_resolved_path == project_dir_path)
|
472 | 472 |
|
473 | 473 |
if path.is_absolute() or not is_inside:
|
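Resolving the project directory before the containment check matters when the project is reached through a symlink: the resolved child path never lists the unresolved, symlinked project path among its parents, so the old check rejected valid element paths. A self-contained sketch of the fixed check (is_inside and the temporary paths below are illustrative):

    import os
    import tempfile
    from pathlib import Path

    def is_inside(project_dir, child):
        project_dir_path = Path(project_dir)
        full_resolved_path = (project_dir_path / child).resolve()
        return (project_dir_path.resolve() in full_resolved_path.parents or
                full_resolved_path == project_dir_path)

    if __name__ == '__main__':
        with tempfile.TemporaryDirectory() as tmp:
            real_project = os.path.join(tmp, 'real')
            os.makedirs(os.path.join(real_project, 'elements'))
            linked_project = os.path.join(tmp, 'linked')
            os.symlink(real_project, linked_project)
            assert is_inside(linked_project, 'elements')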
... | ... | @@ -434,7 +434,7 @@ class Element(Plugin): |
434 | 434 |
visited=visited, recursed=True)
|
435 | 435 |
|
436 | 436 |
# Yield self only at the end, after anything needed has been traversed
|
437 |
- if should_yield and (recurse or recursed) and (scope == Scope.ALL or scope == Scope.RUN):
|
|
437 |
+ if should_yield and (recurse or recursed) and scope != Scope.BUILD:
|
|
438 | 438 |
yield self
|
439 | 439 |
|
440 | 440 |
def search(self, scope, name):
|
... | ... | @@ -1289,17 +1289,21 @@ class Element(Plugin): |
1289 | 1289 |
if scope == Scope.BUILD:
|
1290 | 1290 |
self.stage(sandbox)
|
1291 | 1291 |
elif scope == Scope.RUN:
|
1292 |
- # Stage deps in the sandbox root
|
|
1293 | 1292 |
if deps == 'run':
|
1294 |
- with self.timed_activity("Staging dependencies", silent_nested=True):
|
|
1295 |
- self.stage_dependency_artifacts(sandbox, scope)
|
|
1296 |
- |
|
1297 |
- # Run any integration commands provided by the dependencies
|
|
1298 |
- # once they are all staged and ready
|
|
1299 |
- if integrate:
|
|
1300 |
- with self.timed_activity("Integrating sandbox"):
|
|
1301 |
- for dep in self.dependencies(scope):
|
|
1302 |
- dep.integrate(sandbox)
|
|
1293 |
+ dependency_scope = Scope.RUN
|
|
1294 |
+ else:
|
|
1295 |
+ dependency_scope = None
|
|
1296 |
+ |
|
1297 |
+ # Stage deps in the sandbox root
|
|
1298 |
+ with self.timed_activity("Staging dependencies", silent_nested=True):
|
|
1299 |
+ self.stage_dependency_artifacts(sandbox, dependency_scope)
|
|
1300 |
+ |
|
1301 |
+ # Run any integration commands provided by the dependencies
|
|
1302 |
+ # once they are all staged and ready
|
|
1303 |
+ if integrate:
|
|
1304 |
+ with self.timed_activity("Integrating sandbox"):
|
|
1305 |
+ for dep in self.dependencies(dependency_scope):
|
|
1306 |
+ dep.integrate(sandbox)
|
|
1303 | 1307 |
|
1304 | 1308 |
yield sandbox
|
1305 | 1309 |
|
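As this editor reads the restructured block above (an assumption about intent, not a statement of the real Element API): when the requested deps are 'run' the runtime dependencies are staged alongside the element before integration, and otherwise only the element's own output is staged, which is what `bst checkout --deps none` relies on. A toy illustration with a made-up helper and data layout:

    def elements_to_stage(element, deps):
        # Hypothetical helper purely for illustration.
        if deps == 'run':
            return element['runtime_deps'] + [element['name']]
        return [element['name']]

    if __name__ == '__main__':
        element = {'name': 'checkout-deps.bst', 'runtime_deps': ['import-bin.bst']}
        assert elements_to_stage(element, 'run') == ['import-bin.bst', 'checkout-deps.bst']
        assert elements_to_stage(element, 'none') == ['checkout-deps.bst']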
... | ... | @@ -164,10 +164,18 @@ class GitMirror(SourceFetcher): |
164 | 164 |
cwd=self.mirror)
|
165 | 165 |
|
166 | 166 |
def fetch(self, alias_override=None):
|
167 |
- self.ensure(alias_override)
|
|
168 |
- if not self.has_ref():
|
|
169 |
- self._fetch(alias_override)
|
|
170 |
- self.assert_ref()
|
|
167 |
+ # Resolve the URL for the message
|
|
168 |
+ resolved_url = self.source.translate_url(self.url,
|
|
169 |
+ alias_override=alias_override,
|
|
170 |
+ primary=self.primary)
|
|
171 |
+ |
|
172 |
+ with self.source.timed_activity("Fetching from {}"
|
|
173 |
+ .format(resolved_url),
|
|
174 |
+ silent_nested=True):
|
|
175 |
+ self.ensure(alias_override)
|
|
176 |
+ if not self.has_ref():
|
|
177 |
+ self._fetch(alias_override)
|
|
178 |
+ self.assert_ref()
|
|
171 | 179 |
|
172 | 180 |
def has_ref(self):
|
173 | 181 |
if not self.ref:
|
... | ... | @@ -585,28 +585,48 @@ class Source(Plugin): |
585 | 585 |
#
|
586 | 586 |
def _fetch(self):
|
587 | 587 |
project = self._get_project()
|
588 |
- source_fetchers = self.get_source_fetchers()
|
|
588 |
+ context = self._get_context()
|
|
589 |
+ |
|
590 |
+ # Silence the STATUS messages which might happen as a result
|
|
591 |
+ # of checking the source fetchers.
|
|
592 |
+ with context.silence():
|
|
593 |
+ source_fetchers = self.get_source_fetchers()
|
|
589 | 594 |
|
590 | 595 |
# Use the source fetchers if they are provided
|
591 | 596 |
#
|
592 | 597 |
if source_fetchers:
|
593 |
- for fetcher in source_fetchers:
|
|
594 |
- alias = fetcher._get_alias()
|
|
595 |
- for uri in project.get_alias_uris(alias, first_pass=self.__first_pass):
|
|
596 |
- try:
|
|
597 |
- fetcher.fetch(uri)
|
|
598 |
- # FIXME: Need to consider temporary vs. permanent failures,
|
|
599 |
- # and how this works with retries.
|
|
600 |
- except BstError as e:
|
|
601 |
- last_error = e
|
|
602 |
- continue
|
|
603 |
- |
|
604 |
- # No error, we're done with this fetcher
|
|
605 |
- break
|
|
606 | 598 |
|
607 |
- else:
|
|
608 |
- # No break occurred, raise the last detected error
|
|
609 |
- raise last_error
|
|
599 |
+ # Use a contorted loop here, this is to allow us to
|
|
600 |
+ # silence the messages which can result from consuming
|
|
601 |
+ # the items of source_fetchers, if it happens to be a generator.
|
|
602 |
+ #
|
|
603 |
+ source_fetchers = iter(source_fetchers)
|
|
604 |
+ try:
|
|
605 |
+ |
|
606 |
+ while True:
|
|
607 |
+ |
|
608 |
+ with context.silence():
|
|
609 |
+ fetcher = next(source_fetchers)
|
|
610 |
+ |
|
611 |
+ alias = fetcher._get_alias()
|
|
612 |
+ for uri in project.get_alias_uris(alias, first_pass=self.__first_pass):
|
|
613 |
+ try:
|
|
614 |
+ fetcher.fetch(uri)
|
|
615 |
+ # FIXME: Need to consider temporary vs. permanent failures,
|
|
616 |
+ # and how this works with retries.
|
|
617 |
+ except BstError as e:
|
|
618 |
+ last_error = e
|
|
619 |
+ continue
|
|
620 |
+ |
|
621 |
+ # No error, we're done with this fetcher
|
|
622 |
+ break
|
|
623 |
+ |
|
624 |
+ else:
|
|
625 |
+ # No break occurred, raise the last detected error
|
|
626 |
+ raise last_error
|
|
627 |
+ |
|
628 |
+ except StopIteration:
|
|
629 |
+ pass
|
|
610 | 630 |
|
611 | 631 |
# Default codepath is to reinstantiate the Source
|
612 | 632 |
#
|
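The "contorted loop" above exists so that only the act of advancing the source_fetchers generator is silenced; the fetch itself still reports normally. A self-contained sketch of that pattern, with a toy silence() context manager and log list in place of the real Context messaging:

    from contextlib import contextmanager

    _silenced = False
    log = []

    @contextmanager
    def silence():
        global _silenced
        _silenced = True
        try:
            yield
        finally:
            _silenced = False

    def status(message):
        if not _silenced:
            log.append(message)

    def fetchers():
        for name in ('origin', 'mirror'):
            status('checking {}'.format(name))    # noise produced while yielding
            yield name

    def fetch_all():
        iterator = iter(fetchers())
        try:
            while True:
                with silence():                   # silence generator side effects only
                    fetcher = next(iterator)
                status('fetching from {}'.format(fetcher))
        except StopIteration:
            pass

    if __name__ == '__main__':
        fetch_all()
        assert log == ['fetching from origin', 'fetching from mirror']

The explicit iter()/next() dance is what allows the with-block to wrap generator execution; a plain for-loop would advance the generator outside any silencing context.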
... | ... | @@ -496,7 +496,7 @@ def get_bst_version(): |
496 | 496 |
|
497 | 497 |
@contextmanager
|
498 | 498 |
def save_file_atomic(filename, mode='w', *, buffering=-1, encoding=None,
|
499 |
- errors=None, newline=None, closefd=True, opener=None):
|
|
499 |
+ errors=None, newline=None, closefd=True, opener=None, tempdir=None):
|
|
500 | 500 |
"""Save a file with a temporary name and rename it into place when ready.
|
501 | 501 |
|
502 | 502 |
This is a context manager which is meant for saving data to files.
|
... | ... | @@ -523,8 +523,9 @@ def save_file_atomic(filename, mode='w', *, buffering=-1, encoding=None, |
523 | 523 |
# https://bugs.python.org/issue8604
|
524 | 524 |
|
525 | 525 |
assert os.path.isabs(filename), "The utils.save_file_atomic() parameter ``filename`` must be an absolute path"
|
526 |
- dirname = os.path.dirname(filename)
|
|
527 |
- fd, tempname = tempfile.mkstemp(dir=dirname)
|
|
526 |
+ if tempdir is None:
|
|
527 |
+ tempdir = os.path.dirname(filename)
|
|
528 |
+ fd, tempname = tempfile.mkstemp(dir=tempdir)
|
|
528 | 529 |
os.close(fd)
|
529 | 530 |
|
530 | 531 |
f = open(tempname, mode=mode, buffering=buffering, encoding=encoding,
|
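The new tempdir parameter lets callers such as CASCache.set_ref() place the temporary file in a directory of their choosing before it is renamed into place. A simplified, self-contained sketch of the same atomic-save pattern (a stand-in, not the real utils.save_file_atomic(), and it omits the buffering/encoding parameters):

    import os
    import tempfile
    from contextlib import contextmanager

    @contextmanager
    def save_file_atomic(filename, mode='w', *, tempdir=None):
        assert os.path.isabs(filename), "filename must be an absolute path"
        if tempdir is None:
            tempdir = os.path.dirname(filename)
        fd, tempname = tempfile.mkstemp(dir=tempdir)
        os.close(fd)
        try:
            with open(tempname, mode) as f:
                yield f
            os.replace(tempname, filename)   # atomic only within one filesystem
        except BaseException:
            os.remove(tempname)
            raise

    if __name__ == '__main__':
        with tempfile.TemporaryDirectory() as tmp:
            target = os.path.join(tmp, 'refs', 'heads', 'example')
            os.makedirs(os.path.dirname(target))
            with save_file_atomic(target, 'w', tempdir=tmp) as f:
                f.write('digest-goes-here\n')
            with open(target) as f:
                assert f.read() == 'digest-goes-here\n'

Keeping tempdir on the same filesystem as the final destination preserves the atomicity of the final rename, which is presumably why set_ref() now passes the cache's own tmpdir.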
... | ... | @@ -556,6 +557,9 @@ def save_file_atomic(filename, mode='w', *, buffering=-1, encoding=None, |
556 | 557 |
#
|
557 | 558 |
# Get the disk usage of a given directory in bytes.
|
558 | 559 |
#
|
560 |
+# This function assumes that files do not inadvertently
|
|
561 |
+# disappear while this function is running.
|
|
562 |
+#
|
|
559 | 563 |
# Arguments:
|
560 | 564 |
# (str) The path whose size to check.
|
561 | 565 |
#
|
... | ... | @@ -675,7 +679,7 @@ def _force_rmtree(rootpath, **kwargs): |
675 | 679 |
|
676 | 680 |
try:
|
677 | 681 |
shutil.rmtree(rootpath, **kwargs)
|
678 |
- except shutil.Error as e:
|
|
682 |
+ except OSError as e:
|
|
679 | 683 |
raise UtilError("Failed to remove cache directory '{}': {}"
|
680 | 684 |
.format(rootpath, e))
|
681 | 685 |
|
... | ... | @@ -181,3 +181,15 @@ def test_project_refs_options(cli, datafiles): |
181 | 181 |
|
182 | 182 |
# Assert that the cache keys are different
|
183 | 183 |
assert result1.output != result2.output
|
184 |
+ |
|
185 |
+ |
|
186 |
+@pytest.mark.datafiles(os.path.join(DATA_DIR, 'element-path'))
|
|
187 |
+def test_element_path_project_path_contains_symlinks(cli, datafiles, tmpdir):
|
|
188 |
+ real_project = str(datafiles)
|
|
189 |
+ linked_project = os.path.join(str(tmpdir), 'linked')
|
|
190 |
+ os.symlink(real_project, linked_project)
|
|
191 |
+ os.makedirs(os.path.join(real_project, 'elements'), exist_ok=True)
|
|
192 |
+ with open(os.path.join(real_project, 'elements', 'element.bst'), 'w') as f:
|
|
193 |
+ f.write("kind: manual\n")
|
|
194 |
+ result = cli.run(project=linked_project, args=['show', 'element.bst'])
|
|
195 |
+ result.assert_success()
|
... | ... | @@ -65,9 +65,10 @@ def test_build_checkout(datafiles, cli, strict, hardlinks): |
65 | 65 |
def test_build_checkout_deps(datafiles, cli, deps):
|
66 | 66 |
project = os.path.join(datafiles.dirname, datafiles.basename)
|
67 | 67 |
checkout = os.path.join(cli.directory, 'checkout')
|
68 |
+ element_name = "checkout-deps.bst"
|
|
68 | 69 |
|
69 | 70 |
# First build it
|
70 |
- result = cli.run(project=project, args=['build', 'target.bst'])
|
|
71 |
+ result = cli.run(project=project, args=['build', element_name])
|
|
71 | 72 |
result.assert_success()
|
72 | 73 |
|
73 | 74 |
# Assert that after a successful build, the builddir is empty
|
... | ... | @@ -76,20 +77,15 @@ def test_build_checkout_deps(datafiles, cli, deps): |
76 | 77 |
assert not os.listdir(builddir)
|
77 | 78 |
|
78 | 79 |
# Now check it out
|
79 |
- result = cli.run(project=project, args=['checkout', 'target.bst', '--deps', deps, checkout])
|
|
80 |
+ result = cli.run(project=project, args=['checkout', element_name, '--deps', deps, checkout])
|
|
80 | 81 |
result.assert_success()
|
81 | 82 |
|
82 |
- # Check that the executable hello file is found in the checkout
|
|
83 |
- filename = os.path.join(checkout, 'usr', 'bin', 'hello')
|
|
84 |
- |
|
85 |
- if deps == "run":
|
|
86 |
- assert os.path.exists(filename)
|
|
87 |
- else:
|
|
88 |
- assert not os.path.exists(filename)
|
|
89 |
- |
|
90 |
- # Check that the executable hello file is found in the checkout
|
|
91 |
- filename = os.path.join(checkout, 'usr', 'include', 'pony.h')
|
|
83 |
+ # Verify output of this element
|
|
84 |
+ filename = os.path.join(checkout, 'etc', 'buildstream', 'config')
|
|
85 |
+ assert os.path.exists(filename)
|
|
92 | 86 |
|
87 |
+ # Verify output of this element's runtime dependencies
|
|
88 |
+ filename = os.path.join(checkout, 'usr', 'bin', 'hello')
|
|
93 | 89 |
if deps == "run":
|
94 | 90 |
assert os.path.exists(filename)
|
95 | 91 |
else:
|
1 |
+kind: import
|
|
2 |
+description: It is important for this element to have both build and runtime dependencies
|
|
3 |
+sources:
|
|
4 |
+- kind: local
|
|
5 |
+ path: files/etc-files
|
|
6 |
+depends:
|
|
7 |
+- filename: import-dev.bst
|
|
8 |
+ type: build
|
|
9 |
+- filename: import-bin.bst
|
|
10 |
+ type: runtime
|
1 |
+config
|