Tom Pollard pushed to branch tpollard/494 at BuildStream / buildstream
Commits:
-
0993de53
by Richard Maw at 2018-11-05T16:41:05Z
-
be8f0a54
by richardmaw-codethink at 2018-11-05T17:08:43Z
-
bfb639bf
by Jürg Billeter at 2018-11-05T17:18:12Z
-
ca855f91
by Jürg Billeter at 2018-11-05T17:18:12Z
-
15fed21c
by Jürg Billeter at 2018-11-05T17:18:12Z
-
0085d2aa
by Jürg Billeter at 2018-11-05T17:18:12Z
-
f69b1117
by Jürg Billeter at 2018-11-05T17:18:12Z
-
e398f877
by Jürg Billeter at 2018-11-05T17:18:12Z
-
626d20ae
by Jürg Billeter at 2018-11-05T17:18:12Z
-
ec04446b
by Jürg Billeter at 2018-11-05T17:51:39Z
-
78c72bb2
by Tom Pollard at 2018-11-06T10:12:40Z
-
39c1b9b8
by Tom Pollard at 2018-11-06T12:59:53Z
23 changed files:
- NEWS
- buildstream/_artifactcache/artifactcache.py
- buildstream/_artifactcache/cascache.py
- buildstream/_artifactcache/casserver.py
- buildstream/_context.py
- buildstream/_elementfactory.py
- buildstream/_exceptions.py
- buildstream/_frontend/app.py
- buildstream/_frontend/cli.py
- buildstream/_loader/loader.py
- buildstream/_pipeline.py
- buildstream/_project.py
- buildstream/_scheduler/queues/pullqueue.py
- buildstream/data/userconfig.yaml
- buildstream/element.py
- buildstream/storage/_casbaseddirectory.py
- dev-requirements.txt
- tests/artifactcache/pull.py
- tests/artifactcache/push.py
- tests/completions/completions.py
- tests/integration/build-tree.py
- + tests/integration/pullbuildtrees.py
- tests/testutils/artifactshare.py
Changes:
... | ... | @@ -38,13 +38,23 @@ buildstream 1.3.1 |
38 | 38 |
a bug fix to workspaces so they can be build in workspaces too.
|
39 | 39 |
|
40 | 40 |
o Creating a build shell through the interactive mode or `bst shell --build`
|
41 |
- will now use the cached build tree. It is now easier to debug local build
|
|
42 |
- failures.
|
|
41 |
+ will now use the cached buildtree if available locally. It is now easier to
|
|
42 |
+ debug local build failures.
|
|
43 | 43 |
|
44 | 44 |
o `bst shell --sysroot` now takes any directory that contains a sysroot,
|
45 | 45 |
instead of just a specially-formatted build-root with a `root` and `scratch`
|
46 | 46 |
subdirectory.
|
47 | 47 |
|
48 |
+ o Due to the element `build tree` being cached in the respective artifact their
|
|
49 |
+ size in some cases has significantly increased. In *most* cases the build tree
|
|
50 |
+ is not utilised when building targets, as such by default bst 'pull' & 'build'
|
|
51 |
+ will not fetch buildtrees from remotes. This behaviour can be overriden with
|
|
52 |
+ the cli main option '--pull-build-trees', or the user configuration option
|
|
53 |
+ 'pullbuildtrees = True'. The override will also add the build tree to already
|
|
54 |
+ cached artifacts. When attempting to populate an artifactcache server with
|
|
55 |
+ cached artifacts, only 'complete' elements can be pushed. If the element is
|
|
56 |
+ expected to have a populated build tree then it must be cached before pushing.
|
|
57 |
+ |
|
48 | 58 |
|
49 | 59 |
=================
|
50 | 60 |
buildstream 1.1.5
|
... | ... | @@ -17,17 +17,22 @@ |
17 | 17 |
# Authors:
|
18 | 18 |
# Tristan Maat <tristan maat codethink co uk>
|
19 | 19 |
|
20 |
+import multiprocessing
|
|
20 | 21 |
import os
|
22 |
+import signal
|
|
21 | 23 |
import string
|
22 | 24 |
from collections import namedtuple
|
23 | 25 |
from collections.abc import Mapping
|
24 | 26 |
|
25 | 27 |
from ..types import _KeyStrength
|
26 |
-from .._exceptions import ArtifactError, ImplError, LoadError, LoadErrorReason
|
|
28 |
+from .._exceptions import ArtifactError, CASError, LoadError, LoadErrorReason
|
|
27 | 29 |
from .._message import Message, MessageType
|
30 |
+from .. import _signals
|
|
28 | 31 |
from .. import utils
|
29 | 32 |
from .. import _yaml
|
30 | 33 |
|
34 |
+from .cascache import CASCache, CASRemote
|
|
35 |
+ |
|
31 | 36 |
|
32 | 37 |
CACHE_SIZE_FILE = "cache_size"
|
33 | 38 |
|
... | ... | @@ -93,7 +98,8 @@ class ArtifactCache(): |
93 | 98 |
def __init__(self, context):
|
94 | 99 |
self.context = context
|
95 | 100 |
self.extractdir = os.path.join(context.artifactdir, 'extract')
|
96 |
- self.tmpdir = os.path.join(context.artifactdir, 'tmp')
|
|
101 |
+ |
|
102 |
+ self.cas = CASCache(context.artifactdir)
|
|
97 | 103 |
|
98 | 104 |
self.global_remote_specs = []
|
99 | 105 |
self.project_remote_specs = {}
|
... | ... | @@ -104,12 +110,15 @@ class ArtifactCache(): |
104 | 110 |
self._cache_lower_threshold = None # The target cache size for a cleanup
|
105 | 111 |
self._remotes_setup = False # Check to prevent double-setup of remotes
|
106 | 112 |
|
113 |
+ # Per-project list of _CASRemote instances.
|
|
114 |
+ self._remotes = {}
|
|
115 |
+ |
|
116 |
+ self._has_fetch_remotes = False
|
|
117 |
+ self._has_push_remotes = False
|
|
118 |
+ |
|
107 | 119 |
os.makedirs(self.extractdir, exist_ok=True)
|
108 |
- os.makedirs(self.tmpdir, exist_ok=True)
|
|
109 | 120 |
|
110 |
- ################################################
|
|
111 |
- # Methods implemented on the abstract class #
|
|
112 |
- ################################################
|
|
121 |
+ self._calculate_cache_quota()
|
|
113 | 122 |
|
114 | 123 |
# get_artifact_fullname()
|
115 | 124 |
#
|
... | ... | @@ -240,8 +249,10 @@ class ArtifactCache(): |
240 | 249 |
for key in (strong_key, weak_key):
|
241 | 250 |
if key:
|
242 | 251 |
try:
|
243 |
- self.update_mtime(element, key)
|
|
244 |
- except ArtifactError:
|
|
252 |
+ ref = self.get_artifact_fullname(element, key)
|
|
253 |
+ |
|
254 |
+ self.cas.update_mtime(ref)
|
|
255 |
+ except CASError:
|
|
245 | 256 |
pass
|
246 | 257 |
|
247 | 258 |
# clean():
|
... | ... | @@ -252,7 +263,7 @@ class ArtifactCache(): |
252 | 263 |
# (int): The size of the cache after having cleaned up
|
253 | 264 |
#
|
254 | 265 |
def clean(self):
|
255 |
- artifacts = self.list_artifacts() # pylint: disable=assignment-from-no-return
|
|
266 |
+ artifacts = self.list_artifacts()
|
|
256 | 267 |
|
257 | 268 |
# Build a set of the cache keys which are required
|
258 | 269 |
# based on the required elements at cleanup time
|
... | ... | @@ -294,7 +305,7 @@ class ArtifactCache(): |
294 | 305 |
if key not in required_artifacts:
|
295 | 306 |
|
296 | 307 |
# Remove the actual artifact, if it's not required.
|
297 |
- size = self.remove(to_remove) # pylint: disable=assignment-from-no-return
|
|
308 |
+ size = self.remove(to_remove)
|
|
298 | 309 |
|
299 | 310 |
# Remove the size from the removed size
|
300 | 311 |
self.set_cache_size(self._cache_size - size)
|
... | ... | @@ -311,7 +322,7 @@ class ArtifactCache(): |
311 | 322 |
# (int): The size of the artifact cache.
|
312 | 323 |
#
|
313 | 324 |
def compute_cache_size(self):
|
314 |
- self._cache_size = self.calculate_cache_size() # pylint: disable=assignment-from-no-return
|
|
325 |
+ self._cache_size = self.cas.calculate_cache_size()
|
|
315 | 326 |
|
316 | 327 |
return self._cache_size
|
317 | 328 |
|
... | ... | @@ -380,28 +391,12 @@ class ArtifactCache(): |
380 | 391 |
def has_quota_exceeded(self):
|
381 | 392 |
return self.get_cache_size() > self._cache_quota
|
382 | 393 |
|
383 |
- ################################################
|
|
384 |
- # Abstract methods for subclasses to implement #
|
|
385 |
- ################################################
|
|
386 |
- |
|
387 | 394 |
# preflight():
|
388 | 395 |
#
|
389 | 396 |
# Preflight check.
|
390 | 397 |
#
|
391 | 398 |
def preflight(self):
|
392 |
- pass
|
|
393 |
- |
|
394 |
- # update_mtime()
|
|
395 |
- #
|
|
396 |
- # Update the mtime of an artifact.
|
|
397 |
- #
|
|
398 |
- # Args:
|
|
399 |
- # element (Element): The Element to update
|
|
400 |
- # key (str): The key of the artifact.
|
|
401 |
- #
|
|
402 |
- def update_mtime(self, element, key):
|
|
403 |
- raise ImplError("Cache '{kind}' does not implement update_mtime()"
|
|
404 |
- .format(kind=type(self).__name__))
|
|
399 |
+ self.cas.preflight()
|
|
405 | 400 |
|
406 | 401 |
# initialize_remotes():
|
407 | 402 |
#
|
... | ... | @@ -411,7 +406,59 @@ class ArtifactCache(): |
411 | 406 |
# on_failure (callable): Called if we fail to contact one of the caches.
|
412 | 407 |
#
|
413 | 408 |
def initialize_remotes(self, *, on_failure=None):
|
414 |
- pass
|
|
409 |
+ remote_specs = self.global_remote_specs
|
|
410 |
+ |
|
411 |
+ for project in self.project_remote_specs:
|
|
412 |
+ remote_specs += self.project_remote_specs[project]
|
|
413 |
+ |
|
414 |
+ remote_specs = list(utils._deduplicate(remote_specs))
|
|
415 |
+ |
|
416 |
+ remotes = {}
|
|
417 |
+ q = multiprocessing.Queue()
|
|
418 |
+ for remote_spec in remote_specs:
|
|
419 |
+ # Use subprocess to avoid creation of gRPC threads in main BuildStream process
|
|
420 |
+ # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
|
|
421 |
+ p = multiprocessing.Process(target=self.cas.initialize_remote, args=(remote_spec, q))
|
|
422 |
+ |
|
423 |
+ try:
|
|
424 |
+ # Keep SIGINT blocked in the child process
|
|
425 |
+ with _signals.blocked([signal.SIGINT], ignore=False):
|
|
426 |
+ p.start()
|
|
427 |
+ |
|
428 |
+ error = q.get()
|
|
429 |
+ p.join()
|
|
430 |
+ except KeyboardInterrupt:
|
|
431 |
+ utils._kill_process_tree(p.pid)
|
|
432 |
+ raise
|
|
433 |
+ |
|
434 |
+ if error and on_failure:
|
|
435 |
+ on_failure(remote_spec.url, error)
|
|
436 |
+ elif error:
|
|
437 |
+ raise ArtifactError(error)
|
|
438 |
+ else:
|
|
439 |
+ self._has_fetch_remotes = True
|
|
440 |
+ if remote_spec.push:
|
|
441 |
+ self._has_push_remotes = True
|
|
442 |
+ |
|
443 |
+ remotes[remote_spec.url] = CASRemote(remote_spec)
|
|
444 |
+ |
|
445 |
+ for project in self.context.get_projects():
|
|
446 |
+ remote_specs = self.global_remote_specs
|
|
447 |
+ if project in self.project_remote_specs:
|
|
448 |
+ remote_specs = list(utils._deduplicate(remote_specs + self.project_remote_specs[project]))
|
|
449 |
+ |
|
450 |
+ project_remotes = []
|
|
451 |
+ |
|
452 |
+ for remote_spec in remote_specs:
|
|
453 |
+ # Errors are already handled in the loop above,
|
|
454 |
+ # skip unreachable remotes here.
|
|
455 |
+ if remote_spec.url not in remotes:
|
|
456 |
+ continue
|
|
457 |
+ |
|
458 |
+ remote = remotes[remote_spec.url]
|
|
459 |
+ project_remotes.append(remote)
|
|
460 |
+ |
|
461 |
+ self._remotes[project] = project_remotes
|
|
415 | 462 |
|
416 | 463 |
# contains():
|
417 | 464 |
#
|
... | ... | @@ -425,8 +472,25 @@ class ArtifactCache(): |
425 | 472 |
# Returns: True if the artifact is in the cache, False otherwise
|
426 | 473 |
#
|
427 | 474 |
def contains(self, element, key):
|
428 |
- raise ImplError("Cache '{kind}' does not implement contains()"
|
|
429 |
- .format(kind=type(self).__name__))
|
|
475 |
+ ref = self.get_artifact_fullname(element, key)
|
|
476 |
+ |
|
477 |
+ return self.cas.contains(ref)
|
|
478 |
+ |
|
479 |
+ # contains_subdir_artifact():
|
|
480 |
+ #
|
|
481 |
+ # Check whether an artifact element contains a digest for a subdir
|
|
482 |
+ # which is populated in the cache, i.e non dangling.
|
|
483 |
+ #
|
|
484 |
+ # Args:
|
|
485 |
+ # element (Element): The Element to check
|
|
486 |
+ # key (str): The cache key to use
|
|
487 |
+ # subdir (str): The subdir to check
|
|
488 |
+ #
|
|
489 |
+ # Returns: True if the subdir exists & is populated in the cache, False otherwise
|
|
490 |
+ #
|
|
491 |
+ def contains_subdir_artifact(self, element, key, subdir):
|
|
492 |
+ ref = self.get_artifact_fullname(element, key)
|
|
493 |
+ return self.cas.contains_subdir_artifact(ref, subdir)
|
|
430 | 494 |
|
431 | 495 |
# list_artifacts():
|
432 | 496 |
#
|
... | ... | @@ -437,8 +501,7 @@ class ArtifactCache(): |
437 | 501 |
# `ArtifactCache.get_artifact_fullname` in LRU order
|
438 | 502 |
#
|
439 | 503 |
def list_artifacts(self):
|
440 |
- raise ImplError("Cache '{kind}' does not implement list_artifacts()"
|
|
441 |
- .format(kind=type(self).__name__))
|
|
504 |
+ return self.cas.list_refs()
|
|
442 | 505 |
|
443 | 506 |
# remove():
|
444 | 507 |
#
|
... | ... | @@ -450,9 +513,31 @@ class ArtifactCache(): |
450 | 513 |
# generated by
|
451 | 514 |
# `ArtifactCache.get_artifact_fullname`)
|
452 | 515 |
#
|
453 |
- def remove(self, artifact_name):
|
|
454 |
- raise ImplError("Cache '{kind}' does not implement remove()"
|
|
455 |
- .format(kind=type(self).__name__))
|
|
516 |
+ # Returns:
|
|
517 |
+ # (int|None) The amount of space pruned from the repository in
|
|
518 |
+ # Bytes, or None if defer_prune is True
|
|
519 |
+ #
|
|
520 |
+ def remove(self, ref):
|
|
521 |
+ |
|
522 |
+ # Remove extract if not used by other ref
|
|
523 |
+ tree = self.cas.resolve_ref(ref)
|
|
524 |
+ ref_name, ref_hash = os.path.split(ref)
|
|
525 |
+ extract = os.path.join(self.extractdir, ref_name, tree.hash)
|
|
526 |
+ keys_file = os.path.join(extract, 'meta', 'keys.yaml')
|
|
527 |
+ if os.path.exists(keys_file):
|
|
528 |
+ keys_meta = _yaml.load(keys_file)
|
|
529 |
+ keys = [keys_meta['strong'], keys_meta['weak']]
|
|
530 |
+ remove_extract = True
|
|
531 |
+ for other_hash in keys:
|
|
532 |
+ if other_hash == ref_hash:
|
|
533 |
+ continue
|
|
534 |
+ remove_extract = False
|
|
535 |
+ break
|
|
536 |
+ |
|
537 |
+ if remove_extract:
|
|
538 |
+ utils._force_rmtree(extract)
|
|
539 |
+ |
|
540 |
+ return self.cas.remove(ref)
|
|
456 | 541 |
|
457 | 542 |
# extract():
|
458 | 543 |
#
|
... | ... | @@ -464,6 +549,7 @@ class ArtifactCache(): |
464 | 549 |
# Args:
|
465 | 550 |
# element (Element): The Element to extract
|
466 | 551 |
# key (str): The cache key to use
|
552 |
+ # subdir (str): Optional specific subdir to extract
|
|
467 | 553 |
#
|
468 | 554 |
# Raises:
|
469 | 555 |
# ArtifactError: In cases there was an OSError, or if the artifact
|
... | ... | @@ -471,9 +557,12 @@ class ArtifactCache(): |
471 | 557 |
#
|
472 | 558 |
# Returns: path to extracted artifact
|
473 | 559 |
#
|
474 |
- def extract(self, element, key):
|
|
475 |
- raise ImplError("Cache '{kind}' does not implement extract()"
|
|
476 |
- .format(kind=type(self).__name__))
|
|
560 |
+ def extract(self, element, key, subdir=None):
|
|
561 |
+ ref = self.get_artifact_fullname(element, key)
|
|
562 |
+ |
|
563 |
+ path = os.path.join(self.extractdir, element._get_project().name, element.normal_name)
|
|
564 |
+ |
|
565 |
+ return self.cas.extract(ref, path, subdir=subdir)
|
|
477 | 566 |
|
478 | 567 |
# commit():
|
479 | 568 |
#
|
... | ... | @@ -485,8 +574,9 @@ class ArtifactCache(): |
485 | 574 |
# keys (list): The cache keys to use
|
486 | 575 |
#
|
487 | 576 |
def commit(self, element, content, keys):
|
488 |
- raise ImplError("Cache '{kind}' does not implement commit()"
|
|
489 |
- .format(kind=type(self).__name__))
|
|
577 |
+ refs = [self.get_artifact_fullname(element, key) for key in keys]
|
|
578 |
+ |
|
579 |
+ self.cas.commit(refs, content)
|
|
490 | 580 |
|
491 | 581 |
# diff():
|
492 | 582 |
#
|
... | ... | @@ -500,8 +590,10 @@ class ArtifactCache(): |
500 | 590 |
# subdir (str): A subdirectory to limit the comparison to
|
501 | 591 |
#
|
502 | 592 |
def diff(self, element, key_a, key_b, *, subdir=None):
|
503 |
- raise ImplError("Cache '{kind}' does not implement diff()"
|
|
504 |
- .format(kind=type(self).__name__))
|
|
593 |
+ ref_a = self.get_artifact_fullname(element, key_a)
|
|
594 |
+ ref_b = self.get_artifact_fullname(element, key_b)
|
|
595 |
+ |
|
596 |
+ return self.cas.diff(ref_a, ref_b, subdir=subdir)
|
|
505 | 597 |
|
506 | 598 |
# has_fetch_remotes():
|
507 | 599 |
#
|
... | ... | @@ -513,7 +605,16 @@ class ArtifactCache(): |
513 | 605 |
# Returns: True if any remote repositories are configured, False otherwise
|
514 | 606 |
#
|
515 | 607 |
def has_fetch_remotes(self, *, element=None):
|
516 |
- return False
|
|
608 |
+ if not self._has_fetch_remotes:
|
|
609 |
+ # No project has fetch remotes
|
|
610 |
+ return False
|
|
611 |
+ elif element is None:
|
|
612 |
+ # At least one (sub)project has fetch remotes
|
|
613 |
+ return True
|
|
614 |
+ else:
|
|
615 |
+ # Check whether the specified element's project has fetch remotes
|
|
616 |
+ remotes_for_project = self._remotes[element._get_project()]
|
|
617 |
+ return bool(remotes_for_project)
|
|
517 | 618 |
|
518 | 619 |
# has_push_remotes():
|
519 | 620 |
#
|
... | ... | @@ -525,7 +626,16 @@ class ArtifactCache(): |
525 | 626 |
# Returns: True if any remote repository is configured, False otherwise
|
526 | 627 |
#
|
527 | 628 |
def has_push_remotes(self, *, element=None):
|
528 |
- return False
|
|
629 |
+ if not self._has_push_remotes:
|
|
630 |
+ # No project has push remotes
|
|
631 |
+ return False
|
|
632 |
+ elif element is None:
|
|
633 |
+ # At least one (sub)project has push remotes
|
|
634 |
+ return True
|
|
635 |
+ else:
|
|
636 |
+ # Check whether the specified element's project has push remotes
|
|
637 |
+ remotes_for_project = self._remotes[element._get_project()]
|
|
638 |
+ return any(remote.spec.push for remote in remotes_for_project)
|
|
529 | 639 |
|
530 | 640 |
# push():
|
531 | 641 |
#
|
... | ... | @@ -542,8 +652,28 @@ class ArtifactCache(): |
542 | 652 |
# (ArtifactError): if there was an error
|
543 | 653 |
#
|
544 | 654 |
def push(self, element, keys):
|
545 |
- raise ImplError("Cache '{kind}' does not implement push()"
|
|
546 |
- .format(kind=type(self).__name__))
|
|
655 |
+ refs = [self.get_artifact_fullname(element, key) for key in list(keys)]
|
|
656 |
+ |
|
657 |
+ project = element._get_project()
|
|
658 |
+ |
|
659 |
+ push_remotes = [r for r in self._remotes[project] if r.spec.push]
|
|
660 |
+ |
|
661 |
+ pushed = False
|
|
662 |
+ |
|
663 |
+ for remote in push_remotes:
|
|
664 |
+ remote.init()
|
|
665 |
+ display_key = element._get_brief_display_key()
|
|
666 |
+ element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url))
|
|
667 |
+ |
|
668 |
+ if self.cas.push(refs, remote):
|
|
669 |
+ element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
|
|
670 |
+ pushed = True
|
|
671 |
+ else:
|
|
672 |
+ element.info("Remote ({}) already has {} cached".format(
|
|
673 |
+ remote.spec.url, element._get_brief_display_key()
|
|
674 |
+ ))
|
|
675 |
+ |
|
676 |
+ return pushed
|
|
547 | 677 |
|
548 | 678 |
# pull():
|
549 | 679 |
#
|
... | ... | @@ -553,13 +683,142 @@ class ArtifactCache(): |
553 | 683 |
# element (Element): The Element whose artifact is to be fetched
|
554 | 684 |
# key (str): The cache key to use
|
555 | 685 |
# progress (callable): The progress callback, if any
|
686 |
+ # subdir (str): The optional specific subdir to pull
|
|
687 |
+ # excluded_subdirs (list): The optional list of subdirs to not pull
|
|
556 | 688 |
#
|
557 | 689 |
# Returns:
|
558 | 690 |
# (bool): True if pull was successful, False if artifact was not available
|
559 | 691 |
#
|
560 |
- def pull(self, element, key, *, progress=None):
|
|
561 |
- raise ImplError("Cache '{kind}' does not implement pull()"
|
|
562 |
- .format(kind=type(self).__name__))
|
|
692 |
+ def pull(self, element, key, *, progress=None, subdir=None, excluded_subdirs=None):
|
|
693 |
+ ref = self.get_artifact_fullname(element, key)
|
|
694 |
+ |
|
695 |
+ project = element._get_project()
|
|
696 |
+ |
|
697 |
+ for remote in self._remotes[project]:
|
|
698 |
+ try:
|
|
699 |
+ display_key = element._get_brief_display_key()
|
|
700 |
+ element.status("Pulling artifact {} <- {}".format(display_key, remote.spec.url))
|
|
701 |
+ |
|
702 |
+ if self.cas.pull(ref, remote, progress=progress, subdir=subdir, excluded_subdirs=excluded_subdirs):
|
|
703 |
+ element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url))
|
|
704 |
+ if subdir:
|
|
705 |
+ # Attempt to extract subdir into artifact extract dir if it already exists
|
|
706 |
+ # without containing the subdir. If the respective artifact extract dir does not
|
|
707 |
+ # exist a complete extraction will complete.
|
|
708 |
+ self.extract(element, key, subdir)
|
|
709 |
+ # no need to pull from additional remotes
|
|
710 |
+ return True
|
|
711 |
+ else:
|
|
712 |
+ element.info("Remote ({}) does not have {} cached".format(
|
|
713 |
+ remote.spec.url, element._get_brief_display_key()
|
|
714 |
+ ))
|
|
715 |
+ |
|
716 |
+ except CASError as e:
|
|
717 |
+ raise ArtifactError("Failed to pull artifact {}: {}".format(
|
|
718 |
+ element._get_brief_display_key(), e)) from e
|
|
719 |
+ |
|
720 |
+ return False
|
|
721 |
+ |
|
722 |
+ # pull_tree():
|
|
723 |
+ #
|
|
724 |
+ # Pull a single Tree rather than an artifact.
|
|
725 |
+ # Does not update local refs.
|
|
726 |
+ #
|
|
727 |
+ # Args:
|
|
728 |
+ # project (Project): The current project
|
|
729 |
+ # digest (Digest): The digest of the tree
|
|
730 |
+ #
|
|
731 |
+ def pull_tree(self, project, digest):
|
|
732 |
+ for remote in self._remotes[project]:
|
|
733 |
+ digest = self.cas.pull_tree(remote, digest)
|
|
734 |
+ |
|
735 |
+ if digest:
|
|
736 |
+ # no need to pull from additional remotes
|
|
737 |
+ return digest
|
|
738 |
+ |
|
739 |
+ return None
|
|
740 |
+ |
|
741 |
+ # push_directory():
|
|
742 |
+ #
|
|
743 |
+ # Push the given virtual directory to all remotes.
|
|
744 |
+ #
|
|
745 |
+ # Args:
|
|
746 |
+ # project (Project): The current project
|
|
747 |
+ # directory (Directory): A virtual directory object to push.
|
|
748 |
+ #
|
|
749 |
+ # Raises:
|
|
750 |
+ # (ArtifactError): if there was an error
|
|
751 |
+ #
|
|
752 |
+ def push_directory(self, project, directory):
|
|
753 |
+ if self._has_push_remotes:
|
|
754 |
+ push_remotes = [r for r in self._remotes[project] if r.spec.push]
|
|
755 |
+ else:
|
|
756 |
+ push_remotes = []
|
|
757 |
+ |
|
758 |
+ if not push_remotes:
|
|
759 |
+ raise ArtifactError("push_directory was called, but no remote artifact " +
|
|
760 |
+ "servers are configured as push remotes.")
|
|
761 |
+ |
|
762 |
+ if directory.ref is None:
|
|
763 |
+ return
|
|
764 |
+ |
|
765 |
+ for remote in push_remotes:
|
|
766 |
+ self.cas.push_directory(remote, directory)
|
|
767 |
+ |
|
768 |
+ # push_message():
|
|
769 |
+ #
|
|
770 |
+ # Push the given protobuf message to all remotes.
|
|
771 |
+ #
|
|
772 |
+ # Args:
|
|
773 |
+ # project (Project): The current project
|
|
774 |
+ # message (Message): A protobuf message to push.
|
|
775 |
+ #
|
|
776 |
+ # Raises:
|
|
777 |
+ # (ArtifactError): if there was an error
|
|
778 |
+ #
|
|
779 |
+ def push_message(self, project, message):
|
|
780 |
+ |
|
781 |
+ if self._has_push_remotes:
|
|
782 |
+ push_remotes = [r for r in self._remotes[project] if r.spec.push]
|
|
783 |
+ else:
|
|
784 |
+ push_remotes = []
|
|
785 |
+ |
|
786 |
+ if not push_remotes:
|
|
787 |
+ raise ArtifactError("push_message was called, but no remote artifact " +
|
|
788 |
+ "servers are configured as push remotes.")
|
|
789 |
+ |
|
790 |
+ for remote in push_remotes:
|
|
791 |
+ message_digest = self.cas.push_message(remote, message)
|
|
792 |
+ |
|
793 |
+ return message_digest
|
|
794 |
+ |
|
795 |
+ # verify_digest_pushed():
|
|
796 |
+ #
|
|
797 |
+ # Check whether the object is already on the server in which case
|
|
798 |
+ # there is no need to upload it.
|
|
799 |
+ #
|
|
800 |
+ # Args:
|
|
801 |
+ # project (Project): The current project
|
|
802 |
+ # digest (Digest): The object digest.
|
|
803 |
+ #
|
|
804 |
+ def verify_digest_pushed(self, project, digest):
|
|
805 |
+ |
|
806 |
+ if self._has_push_remotes:
|
|
807 |
+ push_remotes = [r for r in self._remotes[project] if r.spec.push]
|
|
808 |
+ else:
|
|
809 |
+ push_remotes = []
|
|
810 |
+ |
|
811 |
+ if not push_remotes:
|
|
812 |
+ raise ArtifactError("verify_digest_pushed was called, but no remote artifact " +
|
|
813 |
+ "servers are configured as push remotes.")
|
|
814 |
+ |
|
815 |
+ pushed = False
|
|
816 |
+ |
|
817 |
+ for remote in push_remotes:
|
|
818 |
+ if self.cas.verify_digest_on_remote(remote, digest):
|
|
819 |
+ pushed = True
|
|
820 |
+ |
|
821 |
+ return pushed
|
|
563 | 822 |
|
564 | 823 |
# link_key():
|
565 | 824 |
#
|
... | ... | @@ -571,19 +830,10 @@ class ArtifactCache(): |
571 | 830 |
# newkey (str): A new cache key for the artifact
|
572 | 831 |
#
|
573 | 832 |
def link_key(self, element, oldkey, newkey):
|
574 |
- raise ImplError("Cache '{kind}' does not implement link_key()"
|
|
575 |
- .format(kind=type(self).__name__))
|
|
833 |
+ oldref = self.get_artifact_fullname(element, oldkey)
|
|
834 |
+ newref = self.get_artifact_fullname(element, newkey)
|
|
576 | 835 |
|
577 |
- # calculate_cache_size()
|
|
578 |
- #
|
|
579 |
- # Return the real artifact cache size.
|
|
580 |
- #
|
|
581 |
- # Returns:
|
|
582 |
- # (int): The size of the artifact cache.
|
|
583 |
- #
|
|
584 |
- def calculate_cache_size(self):
|
|
585 |
- raise ImplError("Cache '{kind}' does not implement calculate_cache_size()"
|
|
586 |
- .format(kind=type(self).__name__))
|
|
836 |
+ self.cas.link_ref(oldref, newref)
|
|
587 | 837 |
|
588 | 838 |
################################################
|
589 | 839 |
# Local Private Methods #
|
... | ... | @@ -20,9 +20,7 @@ |
20 | 20 |
import hashlib
|
21 | 21 |
import itertools
|
22 | 22 |
import io
|
23 |
-import multiprocessing
|
|
24 | 23 |
import os
|
25 |
-import signal
|
|
26 | 24 |
import stat
|
27 | 25 |
import tempfile
|
28 | 26 |
import uuid
|
... | ... | @@ -31,17 +29,13 @@ from urllib.parse import urlparse |
31 | 29 |
|
32 | 30 |
import grpc
|
33 | 31 |
|
34 |
-from .. import _yaml
|
|
35 |
- |
|
36 | 32 |
from .._protos.google.rpc import code_pb2
|
37 | 33 |
from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
|
38 | 34 |
from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
|
39 | 35 |
from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc
|
40 | 36 |
|
41 |
-from .. import _signals, utils
|
|
42 |
-from .._exceptions import ArtifactError
|
|
43 |
- |
|
44 |
-from . import ArtifactCache
|
|
37 |
+from .. import utils
|
|
38 |
+from .._exceptions import CASError
|
|
45 | 39 |
|
46 | 40 |
|
47 | 41 |
# The default limit for gRPC messages is 4 MiB.
|
... | ... | @@ -49,62 +43,101 @@ from . import ArtifactCache |
49 | 43 |
_MAX_PAYLOAD_BYTES = 1024 * 1024
|
50 | 44 |
|
51 | 45 |
|
52 |
-# A CASCache manages artifacts in a CAS repository as specified in the
|
|
53 |
-# Remote Execution API.
|
|
46 |
+# A CASCache manages a CAS repository as specified in the Remote Execution API.
|
|
54 | 47 |
#
|
55 | 48 |
# Args:
|
56 |
-# context (Context): The BuildStream context
|
|
49 |
+# path (str): The root directory for the CAS repository
|
|
57 | 50 |
#
|
58 |
-# Pushing is explicitly disabled by the platform in some cases,
|
|
59 |
-# like when we are falling back to functioning without using
|
|
60 |
-# user namespaces.
|
|
61 |
-#
|
|
62 |
-class CASCache(ArtifactCache):
|
|
63 |
- |
|
64 |
- def __init__(self, context):
|
|
65 |
- super().__init__(context)
|
|
51 |
+class CASCache():
|
|
66 | 52 |
|
67 |
- self.casdir = os.path.join(context.artifactdir, 'cas')
|
|
53 |
+ def __init__(self, path):
|
|
54 |
+ self.casdir = os.path.join(path, 'cas')
|
|
55 |
+ self.tmpdir = os.path.join(path, 'tmp')
|
|
68 | 56 |
os.makedirs(os.path.join(self.casdir, 'refs', 'heads'), exist_ok=True)
|
69 | 57 |
os.makedirs(os.path.join(self.casdir, 'objects'), exist_ok=True)
|
58 |
+ os.makedirs(self.tmpdir, exist_ok=True)
|
|
70 | 59 |
|
71 |
- self._calculate_cache_quota()
|
|
72 |
- |
|
73 |
- # Per-project list of _CASRemote instances.
|
|
74 |
- self._remotes = {}
|
|
75 |
- |
|
76 |
- self._has_fetch_remotes = False
|
|
77 |
- self._has_push_remotes = False
|
|
78 |
- |
|
79 |
- ################################################
|
|
80 |
- # Implementation of abstract methods #
|
|
81 |
- ################################################
|
|
82 |
- |
|
60 |
+ # preflight():
|
|
61 |
+ #
|
|
62 |
+ # Preflight check.
|
|
63 |
+ #
|
|
83 | 64 |
def preflight(self):
|
84 | 65 |
headdir = os.path.join(self.casdir, 'refs', 'heads')
|
85 | 66 |
objdir = os.path.join(self.casdir, 'objects')
|
86 | 67 |
if not (os.path.isdir(headdir) and os.path.isdir(objdir)):
|
87 |
- raise ArtifactError("CAS repository check failed for '{}'"
|
|
88 |
- .format(self.casdir))
|
|
68 |
+ raise CASError("CAS repository check failed for '{}'".format(self.casdir))
|
|
89 | 69 |
|
90 |
- def contains(self, element, key):
|
|
91 |
- refpath = self._refpath(self.get_artifact_fullname(element, key))
|
|
70 |
+ # contains():
|
|
71 |
+ #
|
|
72 |
+ # Check whether the specified ref is already available in the local CAS cache.
|
|
73 |
+ #
|
|
74 |
+ # Args:
|
|
75 |
+ # ref (str): The ref to check
|
|
76 |
+ #
|
|
77 |
+ # Returns: True if the ref is in the cache, False otherwise
|
|
78 |
+ #
|
|
79 |
+ def contains(self, ref):
|
|
80 |
+ refpath = self._refpath(ref)
|
|
92 | 81 |
|
93 | 82 |
# This assumes that the repository doesn't have any dangling pointers
|
94 | 83 |
return os.path.exists(refpath)
|
95 | 84 |
|
96 |
- def extract(self, element, key):
|
|
97 |
- ref = self.get_artifact_fullname(element, key)
|
|
85 |
+ # contains_subdir_artifact():
|
|
86 |
+ #
|
|
87 |
+ # Check whether the specified artifact element tree has a digest for a subdir
|
|
88 |
+ # which is populated in the cache, i.e non dangling.
|
|
89 |
+ #
|
|
90 |
+ # Args:
|
|
91 |
+ # ref (str): The ref to check
|
|
92 |
+ # subdir (str): The subdir to check
|
|
93 |
+ #
|
|
94 |
+ # Returns: True if the subdir exists & is populated in the cache, False otherwise
|
|
95 |
+ #
|
|
96 |
+ #
|
|
97 |
+ def contains_subdir_artifact(self, ref, subdir):
|
|
98 |
+ tree = self.resolve_ref(ref)
|
|
99 |
+ |
|
100 |
+ # This assumes that the subdir digest is present in the element tree
|
|
101 |
+ subdirdigest = self._get_subdir(tree, subdir)
|
|
102 |
+ objpath = self.objpath(subdirdigest)
|
|
103 |
+ |
|
104 |
+ # True if subdir content is cached or if empty as expected
|
|
105 |
+ return os.path.exists(objpath)
|
|
98 | 106 |
|
107 |
+ # extract():
|
|
108 |
+ #
|
|
109 |
+ # Extract cached directory for the specified ref if it hasn't
|
|
110 |
+ # already been extracted.
|
|
111 |
+ #
|
|
112 |
+ # Args:
|
|
113 |
+ # ref (str): The ref whose directory to extract
|
|
114 |
+ # path (str): The destination path
|
|
115 |
+ # subdir (str): Optional specific dir to extract
|
|
116 |
+ #
|
|
117 |
+ # Raises:
|
|
118 |
+ # CASError: In cases there was an OSError, or if the ref did not exist.
|
|
119 |
+ #
|
|
120 |
+ # Returns: path to extracted directory
|
|
121 |
+ #
|
|
122 |
+ def extract(self, ref, path, subdir=None):
|
|
99 | 123 |
tree = self.resolve_ref(ref, update_mtime=True)
|
100 | 124 |
|
101 |
- dest = os.path.join(self.extractdir, element._get_project().name,
|
|
102 |
- element.normal_name, tree.hash)
|
|
125 |
+ elementdest = dest = os.path.join(path, tree.hash)
|
|
126 |
+ |
|
127 |
+ # If artifact is already extracted, check if the optional subdir
|
|
128 |
+ # has also been extracted. If the artifact has not been extracted
|
|
129 |
+ # a full extraction would include the optional subdir
|
|
103 | 130 |
if os.path.isdir(dest):
|
104 |
- # artifact has already been extracted
|
|
105 |
- return dest
|
|
131 |
+ if subdir:
|
|
132 |
+ if not os.path.isdir(os.path.join(dest, subdir)):
|
|
133 |
+ dest = os.path.join(dest, subdir)
|
|
134 |
+ tree = self._get_subdir(tree, subdir)
|
|
135 |
+ else:
|
|
136 |
+ return dest
|
|
137 |
+ else:
|
|
138 |
+ return dest
|
|
106 | 139 |
|
107 |
- with tempfile.TemporaryDirectory(prefix='tmp', dir=self.extractdir) as tmpdir:
|
|
140 |
+ with tempfile.TemporaryDirectory(prefix='tmp', dir=self.tmpdir) as tmpdir:
|
|
108 | 141 |
checkoutdir = os.path.join(tmpdir, ref)
|
109 | 142 |
self._checkout(checkoutdir, tree)
|
110 | 143 |
|
... | ... | @@ -118,23 +151,35 @@ class CASCache(ArtifactCache): |
118 | 151 |
# If rename fails with these errors, another process beat
|
119 | 152 |
# us to it so just ignore.
|
120 | 153 |
if e.errno not in [errno.ENOTEMPTY, errno.EEXIST]:
|
121 |
- raise ArtifactError("Failed to extract artifact for ref '{}': {}"
|
|
122 |
- .format(ref, e)) from e
|
|
123 |
- |
|
124 |
- return dest
|
|
154 |
+ raise CASError("Failed to extract directory for ref '{}': {}".format(ref, e)) from e
|
|
125 | 155 |
|
126 |
- def commit(self, element, content, keys):
|
|
127 |
- refs = [self.get_artifact_fullname(element, key) for key in keys]
|
|
156 |
+ return elementdest
|
|
128 | 157 |
|
129 |
- tree = self._commit_directory(content)
|
|
158 |
+ # commit():
|
|
159 |
+ #
|
|
160 |
+ # Commit directory to cache.
|
|
161 |
+ #
|
|
162 |
+ # Args:
|
|
163 |
+ # refs (list): The refs to set
|
|
164 |
+ # path (str): The directory to import
|
|
165 |
+ #
|
|
166 |
+ def commit(self, refs, path):
|
|
167 |
+ tree = self._commit_directory(path)
|
|
130 | 168 |
|
131 | 169 |
for ref in refs:
|
132 | 170 |
self.set_ref(ref, tree)
|
133 | 171 |
|
134 |
- def diff(self, element, key_a, key_b, *, subdir=None):
|
|
135 |
- ref_a = self.get_artifact_fullname(element, key_a)
|
|
136 |
- ref_b = self.get_artifact_fullname(element, key_b)
|
|
137 |
- |
|
172 |
+ # diff():
|
|
173 |
+ #
|
|
174 |
+ # Return a list of files that have been added or modified between
|
|
175 |
+ # the refs described by ref_a and ref_b.
|
|
176 |
+ #
|
|
177 |
+ # Args:
|
|
178 |
+ # ref_a (str): The first ref
|
|
179 |
+ # ref_b (str): The second ref
|
|
180 |
+ # subdir (str): A subdirectory to limit the comparison to
|
|
181 |
+ #
|
|
182 |
+ def diff(self, ref_a, ref_b, *, subdir=None):
|
|
138 | 183 |
tree_a = self.resolve_ref(ref_a)
|
139 | 184 |
tree_b = self.resolve_ref(ref_b)
|
140 | 185 |
|
... | ... | @@ -150,158 +195,129 @@ class CASCache(ArtifactCache): |
150 | 195 |
|
151 | 196 |
return modified, removed, added
|
152 | 197 |
|
153 |
- def initialize_remotes(self, *, on_failure=None):
|
|
154 |
- remote_specs = self.global_remote_specs
|
|
155 |
- |
|
156 |
- for project in self.project_remote_specs:
|
|
157 |
- remote_specs += self.project_remote_specs[project]
|
|
158 |
- |
|
159 |
- remote_specs = list(utils._deduplicate(remote_specs))
|
|
160 |
- |
|
161 |
- remotes = {}
|
|
162 |
- q = multiprocessing.Queue()
|
|
163 |
- for remote_spec in remote_specs:
|
|
164 |
- # Use subprocess to avoid creation of gRPC threads in main BuildStream process
|
|
165 |
- # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
|
|
166 |
- p = multiprocessing.Process(target=self._initialize_remote, args=(remote_spec, q))
|
|
198 |
+ def initialize_remote(self, remote_spec, q):
|
|
199 |
+ try:
|
|
200 |
+ remote = CASRemote(remote_spec)
|
|
201 |
+ remote.init()
|
|
167 | 202 |
|
168 |
- try:
|
|
169 |
- # Keep SIGINT blocked in the child process
|
|
170 |
- with _signals.blocked([signal.SIGINT], ignore=False):
|
|
171 |
- p.start()
|
|
172 |
- |
|
173 |
- error = q.get()
|
|
174 |
- p.join()
|
|
175 |
- except KeyboardInterrupt:
|
|
176 |
- utils._kill_process_tree(p.pid)
|
|
177 |
- raise
|
|
203 |
+ request = buildstream_pb2.StatusRequest()
|
|
204 |
+ response = remote.ref_storage.Status(request)
|
|
178 | 205 |
|
179 |
- if error and on_failure:
|
|
180 |
- on_failure(remote_spec.url, error)
|
|
181 |
- elif error:
|
|
182 |
- raise ArtifactError(error)
|
|
206 |
+ if remote_spec.push and not response.allow_updates:
|
|
207 |
+ q.put('CAS server does not allow push')
|
|
183 | 208 |
else:
|
184 |
- self._has_fetch_remotes = True
|
|
185 |
- if remote_spec.push:
|
|
186 |
- self._has_push_remotes = True
|
|
209 |
+ # No error
|
|
210 |
+ q.put(None)
|
|
187 | 211 |
|
188 |
- remotes[remote_spec.url] = _CASRemote(remote_spec)
|
|
212 |
+ except grpc.RpcError as e:
|
|
213 |
+ # str(e) is too verbose for errors reported to the user
|
|
214 |
+ q.put(e.details())
|
|
189 | 215 |
|
190 |
- for project in self.context.get_projects():
|
|
191 |
- remote_specs = self.global_remote_specs
|
|
192 |
- if project in self.project_remote_specs:
|
|
193 |
- remote_specs = list(utils._deduplicate(remote_specs + self.project_remote_specs[project]))
|
|
216 |
+ except Exception as e: # pylint: disable=broad-except
|
|
217 |
+ # Whatever happens, we need to return it to the calling process
|
|
218 |
+ #
|
|
219 |
+ q.put(str(e))
|
|
194 | 220 |
|
195 |
- project_remotes = []
|
|
221 |
+ # pull():
|
|
222 |
+ #
|
|
223 |
+ # Pull a ref from a remote repository.
|
|
224 |
+ #
|
|
225 |
+ # Args:
|
|
226 |
+ # ref (str): The ref to pull
|
|
227 |
+ # remote (CASRemote): The remote repository to pull from
|
|
228 |
+ # progress (callable): The progress callback, if any
|
|
229 |
+ # subdir (str): The optional specific subdir to pull
|
|
230 |
+ # excluded_subdirs (list): The optional list of subdirs to not pull
|
|
231 |
+ #
|
|
232 |
+ # Returns:
|
|
233 |
+ # (bool): True if pull was successful, False if ref was not available
|
|
234 |
+ #
|
|
235 |
+ def pull(self, ref, remote, *, progress=None, subdir=None, excluded_subdirs=None):
|
|
236 |
+ try:
|
|
237 |
+ remote.init()
|
|
196 | 238 |
|
197 |
- for remote_spec in remote_specs:
|
|
198 |
- # Errors are already handled in the loop above,
|
|
199 |
- # skip unreachable remotes here.
|
|
200 |
- if remote_spec.url not in remotes:
|
|
201 |
- continue
|
|
239 |
+ request = buildstream_pb2.GetReferenceRequest()
|
|
240 |
+ request.key = ref
|
|
241 |
+ response = remote.ref_storage.GetReference(request)
|
|
202 | 242 |
|
203 |
- remote = remotes[remote_spec.url]
|
|
204 |
- project_remotes.append(remote)
|
|
243 |
+ tree = remote_execution_pb2.Digest()
|
|
244 |
+ tree.hash = response.digest.hash
|
|
245 |
+ tree.size_bytes = response.digest.size_bytes
|
|
205 | 246 |
|
206 |
- self._remotes[project] = project_remotes
|
|
247 |
+ # Check if the element artifact is present, if so just fetch the subdir.
|
|
248 |
+ if subdir and os.path.exists(self.objpath(tree)):
|
|
249 |
+ self._fetch_subdir(remote, tree, subdir)
|
|
250 |
+ else:
|
|
251 |
+ # Fetch artifact, excluded_subdirs determined in pullqueue
|
|
252 |
+ self._fetch_directory(remote, tree, excluded_subdirs=excluded_subdirs)
|
|
207 | 253 |
|
208 |
- def has_fetch_remotes(self, *, element=None):
|
|
209 |
- if not self._has_fetch_remotes:
|
|
210 |
- # No project has fetch remotes
|
|
211 |
- return False
|
|
212 |
- elif element is None:
|
|
213 |
- # At least one (sub)project has fetch remotes
|
|
214 |
- return True
|
|
215 |
- else:
|
|
216 |
- # Check whether the specified element's project has fetch remotes
|
|
217 |
- remotes_for_project = self._remotes[element._get_project()]
|
|
218 |
- return bool(remotes_for_project)
|
|
254 |
+ self.set_ref(ref, tree)
|
|
219 | 255 |
|
220 |
- def has_push_remotes(self, *, element=None):
|
|
221 |
- if not self._has_push_remotes:
|
|
222 |
- # No project has push remotes
|
|
223 |
- return False
|
|
224 |
- elif element is None:
|
|
225 |
- # At least one (sub)project has push remotes
|
|
226 | 256 |
return True
|
227 |
- else:
|
|
228 |
- # Check whether the specified element's project has push remotes
|
|
229 |
- remotes_for_project = self._remotes[element._get_project()]
|
|
230 |
- return any(remote.spec.push for remote in remotes_for_project)
|
|
231 |
- |
|
232 |
- def pull(self, element, key, *, progress=None):
|
|
233 |
- ref = self.get_artifact_fullname(element, key)
|
|
234 |
- |
|
235 |
- project = element._get_project()
|
|
236 |
- |
|
237 |
- for remote in self._remotes[project]:
|
|
238 |
- try:
|
|
239 |
- remote.init()
|
|
240 |
- display_key = element._get_brief_display_key()
|
|
241 |
- element.status("Pulling artifact {} <- {}".format(display_key, remote.spec.url))
|
|
242 |
- |
|
243 |
- request = buildstream_pb2.GetReferenceRequest()
|
|
244 |
- request.key = ref
|
|
245 |
- response = remote.ref_storage.GetReference(request)
|
|
246 |
- |
|
247 |
- tree = remote_execution_pb2.Digest()
|
|
248 |
- tree.hash = response.digest.hash
|
|
249 |
- tree.size_bytes = response.digest.size_bytes
|
|
250 |
- |
|
251 |
- self._fetch_directory(remote, tree)
|
|
252 |
- |
|
253 |
- self.set_ref(ref, tree)
|
|
254 |
- |
|
255 |
- element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url))
|
|
256 |
- # no need to pull from additional remotes
|
|
257 |
- return True
|
|
258 |
- |
|
259 |
- except grpc.RpcError as e:
|
|
260 |
- if e.code() != grpc.StatusCode.NOT_FOUND:
|
|
261 |
- raise ArtifactError("Failed to pull artifact {}: {}".format(
|
|
262 |
- element._get_brief_display_key(), e)) from e
|
|
263 |
- else:
|
|
264 |
- element.info("Remote ({}) does not have {} cached".format(
|
|
265 |
- remote.spec.url, element._get_brief_display_key()
|
|
266 |
- ))
|
|
267 |
- |
|
268 |
- return False
|
|
269 |
- |
|
270 |
- def pull_tree(self, project, digest):
|
|
271 |
- """ Pull a single Tree rather than an artifact.
|
|
272 |
- Does not update local refs. """
|
|
257 |
+ except grpc.RpcError as e:
|
|
258 |
+ if e.code() != grpc.StatusCode.NOT_FOUND:
|
|
259 |
+ raise CASError("Failed to pull ref {}: {}".format(ref, e)) from e
|
|
260 |
+ else:
|
|
261 |
+ return False
|
|
273 | 262 |
|
274 |
- for remote in self._remotes[project]:
|
|
275 |
- try:
|
|
276 |
- remote.init()
|
|
263 |
+ # pull_tree():
|
|
264 |
+ #
|
|
265 |
+ # Pull a single Tree rather than a ref.
|
|
266 |
+ # Does not update local refs.
|
|
267 |
+ #
|
|
268 |
+ # Args:
|
|
269 |
+ # remote (CASRemote): The remote to pull from
|
|
270 |
+ # digest (Digest): The digest of the tree
|
|
271 |
+ #
|
|
272 |
+ def pull_tree(self, remote, digest):
|
|
273 |
+ try:
|
|
274 |
+ remote.init()
|
|
277 | 275 |
|
278 |
- digest = self._fetch_tree(remote, digest)
|
|
276 |
+ digest = self._fetch_tree(remote, digest)
|
|
279 | 277 |
|
280 |
- # no need to pull from additional remotes
|
|
281 |
- return digest
|
|
278 |
+ return digest
|
|
282 | 279 |
|
283 |
- except grpc.RpcError as e:
|
|
284 |
- if e.code() != grpc.StatusCode.NOT_FOUND:
|
|
285 |
- raise
|
|
280 |
+ except grpc.RpcError as e:
|
|
281 |
+ if e.code() != grpc.StatusCode.NOT_FOUND:
|
|
282 |
+ raise
|
|
286 | 283 |
|
287 | 284 |
return None
|
288 | 285 |
|
289 |
- def link_key(self, element, oldkey, newkey):
|
|
290 |
- oldref = self.get_artifact_fullname(element, oldkey)
|
|
291 |
- newref = self.get_artifact_fullname(element, newkey)
|
|
292 |
- |
|
286 |
+ # link_ref():
|
|
287 |
+ #
|
|
288 |
+ # Add an alias for an existing ref.
|
|
289 |
+ #
|
|
290 |
+ # Args:
|
|
291 |
+ # oldref (str): An existing ref
|
|
292 |
+ # newref (str): A new ref for the same directory
|
|
293 |
+ #
|
|
294 |
+ def link_ref(self, oldref, newref):
|
|
293 | 295 |
tree = self.resolve_ref(oldref)
|
294 | 296 |
|
295 | 297 |
self.set_ref(newref, tree)
|
296 | 298 |
|
297 |
- def _push_refs_to_remote(self, refs, remote):
|
|
299 |
+ # push():
|
|
300 |
+ #
|
|
301 |
+ # Push committed refs to remote repository.
|
|
302 |
+ #
|
|
303 |
+ # Args:
|
|
304 |
+ # refs (list): The refs to push
|
|
305 |
+ # remote (CASRemote): The remote to push to
|
|
306 |
+ #
|
|
307 |
+ # Returns:
|
|
308 |
+ # (bool): True if any remote was updated, False if no pushes were required
|
|
309 |
+ #
|
|
310 |
+ # Raises:
|
|
311 |
+ # (CASError): if there was an error
|
|
312 |
+ #
|
|
313 |
+ def push(self, refs, remote):
|
|
298 | 314 |
skipped_remote = True
|
299 | 315 |
try:
|
300 | 316 |
for ref in refs:
|
301 | 317 |
tree = self.resolve_ref(ref)
|
302 | 318 |
|
303 | 319 |
# Check whether ref is already on the server in which case
|
304 |
- # there is no need to push the artifact
|
|
320 |
+ # there is no need to push the ref
|
|
305 | 321 |
try:
|
306 | 322 |
request = buildstream_pb2.GetReferenceRequest()
|
307 | 323 |
request.key = ref
|
... | ... | @@ -327,65 +343,38 @@ class CASCache(ArtifactCache): |
327 | 343 |
skipped_remote = False
|
328 | 344 |
except grpc.RpcError as e:
|
329 | 345 |
if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
|
330 |
- raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
|
|
346 |
+ raise CASError("Failed to push ref {}: {}".format(refs, e), temporary=True) from e
|
|
331 | 347 |
|
332 | 348 |
return not skipped_remote
|
333 | 349 |
|
334 |
- def push(self, element, keys):
|
|
335 |
- |
|
336 |
- refs = [self.get_artifact_fullname(element, key) for key in list(keys)]
|
|
337 |
- |
|
338 |
- project = element._get_project()
|
|
339 |
- |
|
340 |
- push_remotes = [r for r in self._remotes[project] if r.spec.push]
|
|
341 |
- |
|
342 |
- pushed = False
|
|
343 |
- |
|
344 |
- for remote in push_remotes:
|
|
345 |
- remote.init()
|
|
346 |
- display_key = element._get_brief_display_key()
|
|
347 |
- element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url))
|
|
348 |
- |
|
349 |
- if self._push_refs_to_remote(refs, remote):
|
|
350 |
- element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
|
|
351 |
- pushed = True
|
|
352 |
- else:
|
|
353 |
- element.info("Remote ({}) already has {} cached".format(
|
|
354 |
- remote.spec.url, element._get_brief_display_key()
|
|
355 |
- ))
|
|
356 |
- |
|
357 |
- return pushed
|
|
358 |
- |
|
359 |
- def push_directory(self, project, directory):
|
|
360 |
- """ Push the given virtual directory to all remotes.
|
|
361 |
- |
|
362 |
- Args:
|
|
363 |
- project (Project): The current project
|
|
364 |
- directory (Directory): A virtual directory object to push.
|
|
365 |
- |
|
366 |
- Raises: ArtifactError if no push remotes are configured.
|
|
367 |
- """
|
|
368 |
- |
|
369 |
- if self._has_push_remotes:
|
|
370 |
- push_remotes = [r for r in self._remotes[project] if r.spec.push]
|
|
371 |
- else:
|
|
372 |
- push_remotes = []
|
|
373 |
- |
|
374 |
- if not push_remotes:
|
|
375 |
- raise ArtifactError("CASCache: push_directory was called, but no remote artifact " +
|
|
376 |
- "servers are configured as push remotes.")
|
|
377 |
- |
|
378 |
- if directory.ref is None:
|
|
379 |
- return
|
|
380 |
- |
|
381 |
- for remote in push_remotes:
|
|
382 |
- remote.init()
|
|
383 |
- |
|
384 |
- self._send_directory(remote, directory.ref)
|
|
350 |
+ # push_directory():
|
|
351 |
+ #
|
|
352 |
+ # Push the given virtual directory to a remote.
|
|
353 |
+ #
|
|
354 |
+ # Args:
|
|
355 |
+ # remote (CASRemote): The remote to push to
|
|
356 |
+ # directory (Directory): A virtual directory object to push.
|
|
357 |
+ #
|
|
358 |
+ # Raises:
|
|
359 |
+ # (CASError): if there was an error
|
|
360 |
+ #
|
|
361 |
+ def push_directory(self, remote, directory):
|
|
362 |
+ remote.init()
|
|
385 | 363 |
|
386 |
- def push_message(self, project, message):
|
|
364 |
+ self._send_directory(remote, directory.ref)
|
|
387 | 365 |
|
388 |
- push_remotes = [r for r in self._remotes[project] if r.spec.push]
|
|
366 |
+ # push_message():
|
|
367 |
+ #
|
|
368 |
+ # Push the given protobuf message to a remote.
|
|
369 |
+ #
|
|
370 |
+ # Args:
|
|
371 |
+ # remote (CASRemote): The remote to push to
|
|
372 |
+ # message (Message): A protobuf message to push.
|
|
373 |
+ #
|
|
374 |
+ # Raises:
|
|
375 |
+ # (CASError): if there was an error
|
|
376 |
+ #
|
|
377 |
+ def push_message(self, remote, message):
|
|
389 | 378 |
|
390 | 379 |
message_buffer = message.SerializeToString()
|
391 | 380 |
message_sha = hashlib.sha256(message_buffer)
|
... | ... | @@ -393,17 +382,25 @@ class CASCache(ArtifactCache): |
393 | 382 |
message_digest.hash = message_sha.hexdigest()
|
394 | 383 |
message_digest.size_bytes = len(message_buffer)
|
395 | 384 |
|
396 |
- for remote in push_remotes:
|
|
397 |
- remote.init()
|
|
385 |
+ remote.init()
|
|
398 | 386 |
|
399 |
- with io.BytesIO(message_buffer) as b:
|
|
400 |
- self._send_blob(remote, message_digest, b)
|
|
387 |
+ with io.BytesIO(message_buffer) as b:
|
|
388 |
+ self._send_blob(remote, message_digest, b)
|
|
401 | 389 |
|
402 | 390 |
return message_digest
|
403 | 391 |
|
404 |
- def _verify_digest_on_remote(self, remote, digest):
|
|
405 |
- # Check whether ref is already on the server in which case
|
|
406 |
- # there is no need to push the artifact
|
|
392 |
+ # verify_digest_on_remote():
|
|
393 |
+ #
|
|
394 |
+ # Check whether the object is already on the server in which case
|
|
395 |
+ # there is no need to upload it.
|
|
396 |
+ #
|
|
397 |
+ # Args:
|
|
398 |
+ # remote (CASRemote): The remote to check
|
|
399 |
+ # digest (Digest): The object digest.
|
|
400 |
+ #
|
|
401 |
+ def verify_digest_on_remote(self, remote, digest):
|
|
402 |
+ remote.init()
|
|
403 |
+ |
|
407 | 404 |
request = remote_execution_pb2.FindMissingBlobsRequest()
|
408 | 405 |
request.blob_digests.extend([digest])
|
409 | 406 |
|
... | ... | @@ -413,24 +410,6 @@ class CASCache(ArtifactCache): |
413 | 410 |
|
414 | 411 |
return True
|
415 | 412 |
|
416 |
- def verify_digest_pushed(self, project, digest):
|
|
417 |
- |
|
418 |
- push_remotes = [r for r in self._remotes[project] if r.spec.push]
|
|
419 |
- |
|
420 |
- pushed = False
|
|
421 |
- |
|
422 |
- for remote in push_remotes:
|
|
423 |
- remote.init()
|
|
424 |
- |
|
425 |
- if self._verify_digest_on_remote(remote, digest):
|
|
426 |
- pushed = True
|
|
427 |
- |
|
428 |
- return pushed
|
|
429 |
- |
|
430 |
- ################################################
|
|
431 |
- # API Private Methods #
|
|
432 |
- ################################################
|
|
433 |
- |
|
434 | 413 |
# objpath():
|
435 | 414 |
#
|
436 | 415 |
# Return the path of an object based on its digest.
|
... | ... | @@ -496,7 +475,7 @@ class CASCache(ArtifactCache): |
496 | 475 |
pass
|
497 | 476 |
|
498 | 477 |
except OSError as e:
|
499 |
- raise ArtifactError("Failed to hash object: {}".format(e)) from e
|
|
478 |
+ raise CASError("Failed to hash object: {}".format(e)) from e
|
|
500 | 479 |
|
501 | 480 |
return digest
|
502 | 481 |
|
... | ... | @@ -537,26 +516,39 @@ class CASCache(ArtifactCache): |
537 | 516 |
return digest
|
538 | 517 |
|
539 | 518 |
except FileNotFoundError as e:
|
540 |
- raise ArtifactError("Attempt to access unavailable artifact: {}".format(e)) from e
|
|
519 |
+ raise CASError("Attempt to access unavailable ref: {}".format(e)) from e
|
|
541 | 520 |
|
542 |
- def update_mtime(self, element, key):
|
|
521 |
+ # update_mtime()
|
|
522 |
+ #
|
|
523 |
+ # Update the mtime of a ref.
|
|
524 |
+ #
|
|
525 |
+ # Args:
|
|
526 |
+ # ref (str): The ref to update
|
|
527 |
+ #
|
|
528 |
+ def update_mtime(self, ref):
|
|
543 | 529 |
try:
|
544 |
- ref = self.get_artifact_fullname(element, key)
|
|
545 | 530 |
os.utime(self._refpath(ref))
|
546 | 531 |
except FileNotFoundError as e:
|
547 |
- raise ArtifactError("Attempt to access unavailable artifact: {}".format(e)) from e
|
|
532 |
+ raise CASError("Attempt to access unavailable ref: {}".format(e)) from e
|
|
548 | 533 |
|
534 |
+ # calculate_cache_size()
|
|
535 |
+ #
|
|
536 |
+ # Return the real disk usage of the CAS cache.
|
|
537 |
+ #
|
|
538 |
+ # Returns:
|
|
539 |
+ # (int): The size of the cache.
|
|
540 |
+ #
|
|
549 | 541 |
def calculate_cache_size(self):
|
550 | 542 |
return utils._get_dir_size(self.casdir)
|
551 | 543 |
|
552 |
- # list_artifacts():
|
|
544 |
+ # list_refs():
|
|
553 | 545 |
#
|
554 |
- # List cached artifacts in Least Recently Modified (LRM) order.
|
|
546 |
+ # List refs in Least Recently Modified (LRM) order.
|
|
555 | 547 |
#
|
556 | 548 |
# Returns:
|
557 | 549 |
# (list) - A list of refs in LRM order
|
558 | 550 |
#
|
559 |
- def list_artifacts(self):
|
|
551 |
+ def list_refs(self):
|
|
560 | 552 |
# string of: /path/to/repo/refs/heads
|
561 | 553 |
ref_heads = os.path.join(self.casdir, 'refs', 'heads')
|
562 | 554 |
|
... | ... | @@ -571,7 +563,7 @@ class CASCache(ArtifactCache): |
571 | 563 |
mtimes.append(os.path.getmtime(ref_path))
|
572 | 564 |
|
573 | 565 |
# NOTE: Sorted will sort from earliest to latest, thus the
|
574 |
- # first element of this list will be the file modified earliest.
|
|
566 |
+ # first ref of this list will be the file modified earliest.
|
|
575 | 567 |
return [ref for _, ref in sorted(zip(mtimes, refs))]
|
576 | 568 |
|
577 | 569 |
# remove():
|
... | ... | @@ -590,28 +582,10 @@ class CASCache(ArtifactCache): |
590 | 582 |
#
|
591 | 583 |
def remove(self, ref, *, defer_prune=False):
|
592 | 584 |
|
593 |
- # Remove extract if not used by other ref
|
|
594 |
- tree = self.resolve_ref(ref)
|
|
595 |
- ref_name, ref_hash = os.path.split(ref)
|
|
596 |
- extract = os.path.join(self.extractdir, ref_name, tree.hash)
|
|
597 |
- keys_file = os.path.join(extract, 'meta', 'keys.yaml')
|
|
598 |
- if os.path.exists(keys_file):
|
|
599 |
- keys_meta = _yaml.load(keys_file)
|
|
600 |
- keys = [keys_meta['strong'], keys_meta['weak']]
|
|
601 |
- remove_extract = True
|
|
602 |
- for other_hash in keys:
|
|
603 |
- if other_hash == ref_hash:
|
|
604 |
- continue
|
|
605 |
- remove_extract = False
|
|
606 |
- break
|
|
607 |
- |
|
608 |
- if remove_extract:
|
|
609 |
- utils._force_rmtree(extract)
|
|
610 |
- |
|
611 | 585 |
# Remove cache ref
|
612 | 586 |
refpath = self._refpath(ref)
|
613 | 587 |
if not os.path.exists(refpath):
|
614 |
- raise ArtifactError("Could not find artifact for ref '{}'".format(ref))
|
|
588 |
+ raise CASError("Could not find ref '{}'".format(ref))
|
|
615 | 589 |
|
616 | 590 |
os.unlink(refpath)
|
617 | 591 |
|
... | ... | @@ -673,8 +647,10 @@ class CASCache(ArtifactCache): |
673 | 647 |
stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH)
|
674 | 648 |
|
675 | 649 |
for dirnode in directory.directories:
|
676 |
- fullpath = os.path.join(dest, dirnode.name)
|
|
677 |
- self._checkout(fullpath, dirnode.digest)
|
|
650 |
+ # Don't try to checkout a dangling ref
|
|
651 |
+ if os.path.exists(self.objpath(dirnode.digest)):
|
|
652 |
+ fullpath = os.path.join(dest, dirnode.name)
|
|
653 |
+ self._checkout(fullpath, dirnode.digest)
|
|
678 | 654 |
|
679 | 655 |
for symlinknode in directory.symlinks:
|
680 | 656 |
# symlink
|
... | ... | @@ -721,7 +697,7 @@ class CASCache(ArtifactCache): |
721 | 697 |
# The process serving the socket can't be cached anyway
|
722 | 698 |
pass
|
723 | 699 |
else:
|
724 |
- raise ArtifactError("Unsupported file type for {}".format(full_path))
|
|
700 |
+ raise CASError("Unsupported file type for {}".format(full_path))
|
|
725 | 701 |
|
726 | 702 |
return self.add_object(digest=dir_digest,
|
727 | 703 |
buffer=directory.SerializeToString())
|
... | ... | @@ -740,7 +716,7 @@ class CASCache(ArtifactCache): |
740 | 716 |
if dirnode.name == name:
|
741 | 717 |
return dirnode.digest
|
742 | 718 |
|
743 |
- raise ArtifactError("Subdirectory {} not found".format(name))
|
|
719 |
+ raise CASError("Subdirectory {} not found".format(name))
|
|
744 | 720 |
|
745 | 721 |
def _diff_trees(self, tree_a, tree_b, *, added, removed, modified, path=""):
|
746 | 722 |
dir_a = remote_execution_pb2.Directory()
|
... | ... | @@ -812,29 +788,6 @@ class CASCache(ArtifactCache): |
812 | 788 |
for dirnode in directory.directories:
|
813 | 789 |
self._reachable_refs_dir(reachable, dirnode.digest)
|
814 | 790 |
|
815 |
- def _initialize_remote(self, remote_spec, q):
|
|
816 |
- try:
|
|
817 |
- remote = _CASRemote(remote_spec)
|
|
818 |
- remote.init()
|
|
819 |
- |
|
820 |
- request = buildstream_pb2.StatusRequest()
|
|
821 |
- response = remote.ref_storage.Status(request)
|
|
822 |
- |
|
823 |
- if remote_spec.push and not response.allow_updates:
|
|
824 |
- q.put('Artifact server does not allow push')
|
|
825 |
- else:
|
|
826 |
- # No error
|
|
827 |
- q.put(None)
|
|
828 |
- |
|
829 |
- except grpc.RpcError as e:
|
|
830 |
- # str(e) is too verbose for errors reported to the user
|
|
831 |
- q.put(e.details())
|
|
832 |
- |
|
833 |
- except Exception as e: # pylint: disable=broad-except
|
|
834 |
- # Whatever happens, we need to return it to the calling process
|
|
835 |
- #
|
|
836 |
- q.put(str(e))
|
|
837 |
- |
|
838 | 791 |
def _required_blobs(self, directory_digest):
|
839 | 792 |
# parse directory, and recursively add blobs
|
840 | 793 |
d = remote_execution_pb2.Digest()
|
... | ... | @@ -952,11 +905,14 @@ class CASCache(ArtifactCache): |
952 | 905 |
# Args:
|
953 | 906 |
# remote (Remote): The remote to use.
|
954 | 907 |
# dir_digest (Digest): Digest object for the directory to fetch.
|
908 |
+ # excluded_subdirs (list): The optional list of subdirs to not fetch
|
|
955 | 909 |
#
|
956 |
- def _fetch_directory(self, remote, dir_digest):
|
|
910 |
+ def _fetch_directory(self, remote, dir_digest, *, excluded_subdirs=None):
|
|
957 | 911 |
fetch_queue = [dir_digest]
|
958 | 912 |
fetch_next_queue = []
|
959 | 913 |
batch = _CASBatchRead(remote)
|
914 |
+ if not excluded_subdirs:
|
|
915 |
+ excluded_subdirs = []
|
|
960 | 916 |
|
961 | 917 |
while len(fetch_queue) + len(fetch_next_queue) > 0:
|
962 | 918 |
if not fetch_queue:
|
... | ... | @@ -971,8 +927,9 @@ class CASCache(ArtifactCache): |
971 | 927 |
directory.ParseFromString(f.read())
|
972 | 928 |
|
973 | 929 |
for dirnode in directory.directories:
|
974 |
- batch = self._fetch_directory_node(remote, dirnode.digest, batch,
|
|
975 |
- fetch_queue, fetch_next_queue, recursive=True)
|
|
930 |
+ if dirnode.name not in excluded_subdirs:
|
|
931 |
+ batch = self._fetch_directory_node(remote, dirnode.digest, batch,
|
|
932 |
+ fetch_queue, fetch_next_queue, recursive=True)
|
|
976 | 933 |
|
977 | 934 |
for filenode in directory.files:
|
978 | 935 |
batch = self._fetch_directory_node(remote, filenode.digest, batch,
|
... | ... | @@ -981,6 +938,10 @@ class CASCache(ArtifactCache): |
981 | 938 |
# Fetch final batch
|
982 | 939 |
self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
|
983 | 940 |
|
941 |
+ def _fetch_subdir(self, remote, tree, subdir):
|
|
942 |
+ subdirdigest = self._get_subdir(tree, subdir)
|
|
943 |
+ self._fetch_directory(remote, subdirdigest)
|
|
944 |
+ |
|
984 | 945 |
def _fetch_tree(self, remote, digest):
|
985 | 946 |
# download but do not store the Tree object
|
986 | 947 |
with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
|
... | ... | @@ -1080,7 +1041,7 @@ class CASCache(ArtifactCache): |
1080 | 1041 |
|
1081 | 1042 |
# Represents a single remote CAS cache.
|
1082 | 1043 |
#
|
1083 |
-class _CASRemote():
|
|
1044 |
+class CASRemote():
|
|
1084 | 1045 |
def __init__(self, spec):
|
1085 | 1046 |
self.spec = spec
|
1086 | 1047 |
self._initialized = False
|
... | ... | @@ -1125,7 +1086,7 @@ class _CASRemote(): |
1125 | 1086 |
certificate_chain=client_cert_bytes)
|
1126 | 1087 |
self.channel = grpc.secure_channel('{}:{}'.format(url.hostname, port), credentials)
|
1127 | 1088 |
else:
|
1128 |
- raise ArtifactError("Unsupported URL: {}".format(self.spec.url))
|
|
1089 |
+ raise CASError("Unsupported URL: {}".format(self.spec.url))
|
|
1129 | 1090 |
|
1130 | 1091 |
self.bytestream = bytestream_pb2_grpc.ByteStreamStub(self.channel)
|
1131 | 1092 |
self.cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel)
|
... | ... | @@ -1203,10 +1164,10 @@ class _CASBatchRead(): |
1203 | 1164 |
|
1204 | 1165 |
for response in batch_response.responses:
|
1205 | 1166 |
if response.status.code != code_pb2.OK:
|
1206 |
- raise ArtifactError("Failed to download blob {}: {}".format(
|
|
1167 |
+ raise CASError("Failed to download blob {}: {}".format(
|
|
1207 | 1168 |
response.digest.hash, response.status.code))
|
1208 | 1169 |
if response.digest.size_bytes != len(response.data):
|
1209 |
- raise ArtifactError("Failed to download blob {}: expected {} bytes, received {} bytes".format(
|
|
1170 |
+ raise CASError("Failed to download blob {}: expected {} bytes, received {} bytes".format(
|
|
1210 | 1171 |
response.digest.hash, response.digest.size_bytes, len(response.data)))
|
1211 | 1172 |
|
1212 | 1173 |
yield (response.digest, response.data)
|
... | ... | @@ -1248,7 +1209,7 @@ class _CASBatchUpdate(): |
1248 | 1209 |
|
1249 | 1210 |
for response in batch_response.responses:
|
1250 | 1211 |
if response.status.code != code_pb2.OK:
|
1251 |
- raise ArtifactError("Failed to upload blob {}: {}".format(
|
|
1212 |
+ raise CASError("Failed to upload blob {}: {}".format(
|
|
1252 | 1213 |
response.digest.hash, response.status.code))
|
1253 | 1214 |
|
1254 | 1215 |
|
... | ... | @@ -32,8 +32,9 @@ from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remo |
32 | 32 |
from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
|
33 | 33 |
from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc
|
34 | 34 |
|
35 |
-from .._exceptions import ArtifactError
|
|
36 |
-from .._context import Context
|
|
35 |
+from .._exceptions import CASError
|
|
36 |
+ |
|
37 |
+from .cascache import CASCache
|
|
37 | 38 |
|
38 | 39 |
|
39 | 40 |
# The default limit for gRPC messages is 4 MiB.
|
... | ... | @@ -55,26 +56,23 @@ class ArtifactTooLargeException(Exception): |
55 | 56 |
# enable_push (bool): Whether to allow blob uploads and artifact updates
|
56 | 57 |
#
|
57 | 58 |
def create_server(repo, *, enable_push):
|
58 |
- context = Context()
|
|
59 |
- context.artifactdir = os.path.abspath(repo)
|
|
60 |
- |
|
61 |
- artifactcache = context.artifactcache
|
|
59 |
+ cas = CASCache(os.path.abspath(repo))
|
|
62 | 60 |
|
63 | 61 |
# Use max_workers default from Python 3.5+
|
64 | 62 |
max_workers = (os.cpu_count() or 1) * 5
|
65 | 63 |
server = grpc.server(futures.ThreadPoolExecutor(max_workers))
|
66 | 64 |
|
67 | 65 |
bytestream_pb2_grpc.add_ByteStreamServicer_to_server(
|
68 |
- _ByteStreamServicer(artifactcache, enable_push=enable_push), server)
|
|
66 |
+ _ByteStreamServicer(cas, enable_push=enable_push), server)
|
|
69 | 67 |
|
70 | 68 |
remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
|
71 |
- _ContentAddressableStorageServicer(artifactcache, enable_push=enable_push), server)
|
|
69 |
+ _ContentAddressableStorageServicer(cas, enable_push=enable_push), server)
|
|
72 | 70 |
|
73 | 71 |
remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
|
74 | 72 |
_CapabilitiesServicer(), server)
|
75 | 73 |
|
76 | 74 |
buildstream_pb2_grpc.add_ReferenceStorageServicer_to_server(
|
77 |
- _ReferenceStorageServicer(artifactcache, enable_push=enable_push), server)
|
|
75 |
+ _ReferenceStorageServicer(cas, enable_push=enable_push), server)
|
|
78 | 76 |
|
79 | 77 |
return server
|
80 | 78 |
|
... | ... | @@ -333,7 +331,7 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer): |
333 | 331 |
|
334 | 332 |
response.digest.hash = tree.hash
|
335 | 333 |
response.digest.size_bytes = tree.size_bytes
|
336 |
- except ArtifactError:
|
|
334 |
+ except CASError:
|
|
337 | 335 |
context.set_code(grpc.StatusCode.NOT_FOUND)
|
338 | 336 |
|
339 | 337 |
return response
|
... | ... | @@ -437,7 +435,7 @@ def _clean_up_cache(cas, object_size): |
437 | 435 |
return 0
|
438 | 436 |
|
439 | 437 |
# obtain a list of LRP artifacts
|
440 |
- LRP_artifacts = cas.list_artifacts()
|
|
438 |
+ LRP_artifacts = cas.list_refs()
|
|
441 | 439 |
|
442 | 440 |
removed_size = 0 # in bytes
|
443 | 441 |
while object_size - removed_size > free_disk_space:
|
... | ... | @@ -31,7 +31,6 @@ from ._exceptions import LoadError, LoadErrorReason, BstError |
31 | 31 |
from ._message import Message, MessageType
|
32 | 32 |
from ._profile import Topics, profile_start, profile_end
|
33 | 33 |
from ._artifactcache import ArtifactCache
|
34 |
-from ._artifactcache.cascache import CASCache
|
|
35 | 34 |
from ._workspaces import Workspaces
|
36 | 35 |
from .plugin import _plugin_lookup
|
37 | 36 |
|
... | ... | @@ -105,6 +104,9 @@ class Context(): |
105 | 104 |
# What to do when a build fails in non interactive mode
|
106 | 105 |
self.sched_error_action = 'continue'
|
107 | 106 |
|
107 |
+ # Whether or not to attempt to pull build trees globally
|
|
108 |
+ self.pull_build_trees = False
|
|
109 |
+ |
|
108 | 110 |
# Whether elements must be rebuilt when their dependencies have changed
|
109 | 111 |
self._strict_build_plan = None
|
110 | 112 |
|
... | ... | @@ -161,7 +163,7 @@ class Context(): |
161 | 163 |
_yaml.node_validate(defaults, [
|
162 | 164 |
'sourcedir', 'builddir', 'artifactdir', 'logdir',
|
163 | 165 |
'scheduler', 'artifacts', 'logging', 'projects',
|
164 |
- 'cache'
|
|
166 |
+ 'cache', 'pullbuildtrees'
|
|
165 | 167 |
])
|
166 | 168 |
|
167 | 169 |
for directory in ['sourcedir', 'builddir', 'artifactdir', 'logdir']:
|
... | ... | @@ -186,6 +188,9 @@ class Context(): |
186 | 188 |
# Load artifact share configuration
|
187 | 189 |
self.artifact_cache_specs = ArtifactCache.specs_from_config_node(defaults)
|
188 | 190 |
|
191 |
+ # Load pull build trees configuration
|
|
192 |
+ self.pull_build_trees = _yaml.node_get(defaults, bool, 'pullbuildtrees', default_value='False')
|
|
193 |
+ |
|
189 | 194 |
# Load logging config
|
190 | 195 |
logging = _yaml.node_get(defaults, Mapping, 'logging')
|
191 | 196 |
_yaml.node_validate(logging, [
|
... | ... | @@ -233,7 +238,7 @@ class Context(): |
233 | 238 |
@property
|
234 | 239 |
def artifactcache(self):
|
235 | 240 |
if not self._artifactcache:
|
236 |
- self._artifactcache = CASCache(self)
|
|
241 |
+ self._artifactcache = ArtifactCache(self)
|
|
237 | 242 |
|
238 | 243 |
return self._artifactcache
|
239 | 244 |
|
... | ... | @@ -47,7 +47,6 @@ class ElementFactory(PluginContext): |
47 | 47 |
# Args:
|
48 | 48 |
# context (object): The Context object for processing
|
49 | 49 |
# project (object): The project object
|
50 |
- # artifacts (ArtifactCache): The artifact cache
|
|
51 | 50 |
# meta (object): The loaded MetaElement
|
52 | 51 |
#
|
53 | 52 |
# Returns: A newly created Element object of the appropriate kind
|
... | ... | @@ -56,9 +55,9 @@ class ElementFactory(PluginContext): |
56 | 55 |
# PluginError (if the kind lookup failed)
|
57 | 56 |
# LoadError (if the element itself took issue with the config)
|
58 | 57 |
#
|
59 |
- def create(self, context, project, artifacts, meta):
|
|
58 |
+ def create(self, context, project, meta):
|
|
60 | 59 |
element_type, default_config = self.lookup(meta.kind)
|
61 |
- element = element_type(context, project, artifacts, meta, default_config)
|
|
60 |
+ element = element_type(context, project, meta, default_config)
|
|
62 | 61 |
version = self._format_versions.get(meta.kind, 0)
|
63 | 62 |
self._assert_plugin_format(element, version)
|
64 | 63 |
return element
|
... | ... | @@ -90,6 +90,7 @@ class ErrorDomain(Enum): |
90 | 90 |
APP = 12
|
91 | 91 |
STREAM = 13
|
92 | 92 |
VIRTUAL_FS = 14
|
93 |
+ CAS = 15
|
|
93 | 94 |
|
94 | 95 |
|
95 | 96 |
# BstError is an internal base exception class for BuildSream
|
... | ... | @@ -274,6 +275,15 @@ class ArtifactError(BstError): |
274 | 275 |
super().__init__(message, detail=detail, domain=ErrorDomain.ARTIFACT, reason=reason, temporary=True)
|
275 | 276 |
|
276 | 277 |
|
278 |
+# CASError
|
|
279 |
+#
|
|
280 |
+# Raised when errors are encountered in the CAS
|
|
281 |
+#
|
|
282 |
+class CASError(BstError):
|
|
283 |
+ def __init__(self, message, *, detail=None, reason=None, temporary=False):
|
|
284 |
+ super().__init__(message, detail=detail, domain=ErrorDomain.CAS, reason=reason, temporary=True)
|
|
285 |
+ |
|
286 |
+ |
|
277 | 287 |
# PipelineError
|
278 | 288 |
#
|
279 | 289 |
# Raised from pipeline operations
|
... | ... | @@ -182,7 +182,8 @@ class App(): |
182 | 182 |
'fetchers': 'sched_fetchers',
|
183 | 183 |
'builders': 'sched_builders',
|
184 | 184 |
'pushers': 'sched_pushers',
|
185 |
- 'network_retries': 'sched_network_retries'
|
|
185 |
+ 'network_retries': 'sched_network_retries',
|
|
186 |
+ 'pull_build_trees': 'pull_build_trees'
|
|
186 | 187 |
}
|
187 | 188 |
for cli_option, context_attr in override_map.items():
|
188 | 189 |
option_value = self._main_options.get(cli_option)
|
... | ... | @@ -219,6 +219,8 @@ def print_version(ctx, param, value): |
219 | 219 |
help="Specify a project option")
|
220 | 220 |
@click.option('--default-mirror', default=None,
|
221 | 221 |
help="The mirror to fetch from first, before attempting other mirrors")
|
222 |
+@click.option('--pull-build-trees', is_flag=True, default=None,
|
|
223 |
+ help="Include an element's build trees when pulling remote element artifacts")
|
|
222 | 224 |
@click.pass_context
|
223 | 225 |
def cli(context, **kwargs):
|
224 | 226 |
"""Build and manipulate BuildStream projects
|
... | ... | @@ -537,7 +537,7 @@ class Loader(): |
537 | 537 |
raise LoadError(LoadErrorReason.INVALID_DATA,
|
538 | 538 |
"{}: Expected junction but element kind is {}".format(filename, meta_element.kind))
|
539 | 539 |
|
540 |
- element = Element._new_from_meta(meta_element, self._context.artifactcache)
|
|
540 |
+ element = Element._new_from_meta(meta_element)
|
|
541 | 541 |
element._preflight()
|
542 | 542 |
|
543 | 543 |
sources = list(element.sources())
|
... | ... | @@ -106,7 +106,7 @@ class Pipeline(): |
106 | 106 |
|
107 | 107 |
profile_start(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, '-') for t in targets))
|
108 | 108 |
|
109 |
- elements = self._project.load_elements(targets, self._artifacts,
|
|
109 |
+ elements = self._project.load_elements(targets,
|
|
110 | 110 |
rewritable=rewritable,
|
111 | 111 |
fetch_subprojects=fetch_subprojects)
|
112 | 112 |
|
... | ... | @@ -224,18 +224,17 @@ class Project(): |
224 | 224 |
# Instantiate and return an element
|
225 | 225 |
#
|
226 | 226 |
# Args:
|
227 |
- # artifacts (ArtifactCache): The artifact cache
|
|
228 | 227 |
# meta (MetaElement): The loaded MetaElement
|
229 | 228 |
# first_pass (bool): Whether to use first pass configuration (for junctions)
|
230 | 229 |
#
|
231 | 230 |
# Returns:
|
232 | 231 |
# (Element): A newly created Element object of the appropriate kind
|
233 | 232 |
#
|
234 |
- def create_element(self, artifacts, meta, *, first_pass=False):
|
|
233 |
+ def create_element(self, meta, *, first_pass=False):
|
|
235 | 234 |
if first_pass:
|
236 |
- return self.first_pass_config.element_factory.create(self._context, self, artifacts, meta)
|
|
235 |
+ return self.first_pass_config.element_factory.create(self._context, self, meta)
|
|
237 | 236 |
else:
|
238 |
- return self.config.element_factory.create(self._context, self, artifacts, meta)
|
|
237 |
+ return self.config.element_factory.create(self._context, self, meta)
|
|
239 | 238 |
|
240 | 239 |
# create_source()
|
241 | 240 |
#
|
... | ... | @@ -305,7 +304,6 @@ class Project(): |
305 | 304 |
#
|
306 | 305 |
# Args:
|
307 | 306 |
# targets (list): Target names
|
308 |
- # artifacts (ArtifactCache): Artifact cache
|
|
309 | 307 |
# rewritable (bool): Whether the loaded files should be rewritable
|
310 | 308 |
# this is a bit more expensive due to deep copies
|
311 | 309 |
# fetch_subprojects (bool): Whether we should fetch subprojects as a part of the
|
... | ... | @@ -314,7 +312,7 @@ class Project(): |
314 | 312 |
# Returns:
|
315 | 313 |
# (list): A list of loaded Element
|
316 | 314 |
#
|
317 |
- def load_elements(self, targets, artifacts, *,
|
|
315 |
+ def load_elements(self, targets, *,
|
|
318 | 316 |
rewritable=False, fetch_subprojects=False):
|
319 | 317 |
with self._context.timed_activity("Loading elements", silent_nested=True):
|
320 | 318 |
meta_elements = self.loader.load(targets, rewritable=rewritable,
|
... | ... | @@ -323,7 +321,7 @@ class Project(): |
323 | 321 |
|
324 | 322 |
with self._context.timed_activity("Resolving elements"):
|
325 | 323 |
elements = [
|
326 |
- Element._new_from_meta(meta, artifacts)
|
|
324 |
+ Element._new_from_meta(meta)
|
|
327 | 325 |
for meta in meta_elements
|
328 | 326 |
]
|
329 | 327 |
|
... | ... | @@ -32,9 +32,20 @@ class PullQueue(Queue): |
32 | 32 |
complete_name = "Pulled"
|
33 | 33 |
resources = [ResourceType.DOWNLOAD, ResourceType.CACHE]
|
34 | 34 |
|
35 |
+ def __init__(self, scheduler):
|
|
36 |
+ super().__init__(scheduler)
|
|
37 |
+ |
|
38 |
+ # Current default exclusions on pull
|
|
39 |
+ self._excluded_subdirs = ["buildtree"]
|
|
40 |
+ self._subdir = None
|
|
41 |
+ # If build trees are to be pulled, remove the value from exclusion list
|
|
42 |
+ if self._scheduler.context.pull_build_trees:
|
|
43 |
+ self._subdir = "buildtree"
|
|
44 |
+ self._excluded_subdirs.remove(self._subdir)
|
|
45 |
+ |
|
35 | 46 |
def process(self, element):
|
36 | 47 |
# returns whether an artifact was downloaded or not
|
37 |
- if not element._pull():
|
|
48 |
+ if not element._pull(subdir=self._subdir, excluded_subdirs=self._excluded_subdirs):
|
|
38 | 49 |
raise SkipJob(self.action_name)
|
39 | 50 |
|
40 | 51 |
def status(self, element):
|
... | ... | @@ -49,7 +60,7 @@ class PullQueue(Queue): |
49 | 60 |
if not element._can_query_cache():
|
50 | 61 |
return QueueStatus.WAIT
|
51 | 62 |
|
52 |
- if element._pull_pending():
|
|
63 |
+ if element._pull_pending(subdir=self._subdir):
|
|
53 | 64 |
return QueueStatus.READY
|
54 | 65 |
else:
|
55 | 66 |
return QueueStatus.SKIP
|
... | ... | @@ -97,3 +97,5 @@ logging: |
97 | 97 |
|
98 | 98 |
[%{elapsed}][%{key}][%{element}] %{action} %{message}
|
99 | 99 |
|
100 |
+# Whether to pull buildtrees when downloading element artifacts
|
|
101 |
+pullbuildtrees: False
|
... | ... | @@ -174,7 +174,7 @@ class Element(Plugin): |
174 | 174 |
*Since: 1.4*
|
175 | 175 |
"""
|
176 | 176 |
|
177 |
- def __init__(self, context, project, artifacts, meta, plugin_conf):
|
|
177 |
+ def __init__(self, context, project, meta, plugin_conf):
|
|
178 | 178 |
|
179 | 179 |
self.__cache_key_dict = None # Dict for cache key calculation
|
180 | 180 |
self.__cache_key = None # Our cached cache key
|
... | ... | @@ -199,7 +199,7 @@ class Element(Plugin): |
199 | 199 |
self.__sources = [] # List of Sources
|
200 | 200 |
self.__weak_cache_key = None # Our cached weak cache key
|
201 | 201 |
self.__strict_cache_key = None # Our cached cache key for strict builds
|
202 |
- self.__artifacts = artifacts # Artifact cache
|
|
202 |
+ self.__artifacts = context.artifactcache # Artifact cache
|
|
203 | 203 |
self.__consistency = Consistency.INCONSISTENT # Cached overall consistency state
|
204 | 204 |
self.__strong_cached = None # Whether we have a cached artifact
|
205 | 205 |
self.__weak_cached = None # Whether we have a cached artifact
|
... | ... | @@ -872,14 +872,13 @@ class Element(Plugin): |
872 | 872 |
# and its dependencies from a meta element.
|
873 | 873 |
#
|
874 | 874 |
# Args:
|
875 |
- # artifacts (ArtifactCache): The artifact cache
|
|
876 | 875 |
# meta (MetaElement): The meta element
|
877 | 876 |
#
|
878 | 877 |
# Returns:
|
879 | 878 |
# (Element): A newly created Element instance
|
880 | 879 |
#
|
881 | 880 |
@classmethod
|
882 |
- def _new_from_meta(cls, meta, artifacts):
|
|
881 |
+ def _new_from_meta(cls, meta):
|
|
883 | 882 |
|
884 | 883 |
if not meta.first_pass:
|
885 | 884 |
meta.project.ensure_fully_loaded()
|
... | ... | @@ -887,7 +886,7 @@ class Element(Plugin): |
887 | 886 |
if meta in cls.__instantiated_elements:
|
888 | 887 |
return cls.__instantiated_elements[meta]
|
889 | 888 |
|
890 |
- element = meta.project.create_element(artifacts, meta, first_pass=meta.first_pass)
|
|
889 |
+ element = meta.project.create_element(meta, first_pass=meta.first_pass)
|
|
891 | 890 |
cls.__instantiated_elements[meta] = element
|
892 | 891 |
|
893 | 892 |
# Instantiate sources
|
... | ... | @@ -904,10 +903,10 @@ class Element(Plugin): |
904 | 903 |
|
905 | 904 |
# Instantiate dependencies
|
906 | 905 |
for meta_dep in meta.dependencies:
|
907 |
- dependency = Element._new_from_meta(meta_dep, artifacts)
|
|
906 |
+ dependency = Element._new_from_meta(meta_dep)
|
|
908 | 907 |
element.__runtime_dependencies.append(dependency)
|
909 | 908 |
for meta_dep in meta.build_dependencies:
|
910 |
- dependency = Element._new_from_meta(meta_dep, artifacts)
|
|
909 |
+ dependency = Element._new_from_meta(meta_dep)
|
|
911 | 910 |
element.__build_dependencies.append(dependency)
|
912 | 911 |
|
913 | 912 |
return element
|
... | ... | @@ -1399,9 +1398,18 @@ class Element(Plugin): |
1399 | 1398 |
.format(workspace.get_absolute_path())):
|
1400 | 1399 |
workspace.stage(temp_staging_directory)
|
1401 | 1400 |
elif self._cached():
|
1402 |
- # We have a cached buildtree to use, instead
|
|
1403 |
- artifact_base, _ = self.__extract()
|
|
1404 |
- import_dir = os.path.join(artifact_base, 'buildtree')
|
|
1401 |
+ # Check if we have a cached buildtree to use
|
|
1402 |
+ context = self._get_context()
|
|
1403 |
+ if context.get_strict():
|
|
1404 |
+ if self.__artifacts.contains_subdir_artifact(self, self.__strict_cache_key, 'buildtree'):
|
|
1405 |
+ artifact_base, _ = self.__extract()
|
|
1406 |
+ import_dir = os.path.join(artifact_base, 'buildtree')
|
|
1407 |
+ elif self.__artifacts.contains_subdir_artifact(self, self.__weak_cache_key, 'buildtree'):
|
|
1408 |
+ artifact_base, _ = self.__extract()
|
|
1409 |
+ import_dir = os.path.join(artifact_base, 'buildtree')
|
|
1410 |
+ else:
|
|
1411 |
+ self.warn("{} is cached without a buildtree, the source will be staged instead"
|
|
1412 |
+ .format(self.name))
|
|
1405 | 1413 |
else:
|
1406 | 1414 |
# No workspace, stage directly
|
1407 | 1415 |
for source in self.sources():
|
... | ... | @@ -1699,18 +1707,26 @@ class Element(Plugin): |
1699 | 1707 |
|
1700 | 1708 |
# _pull_pending()
|
1701 | 1709 |
#
|
1702 |
- # Check whether the artifact will be pulled.
|
|
1710 |
+ # Check whether the artifact will be pulled. If the pull operation is to
|
|
1711 |
+ # include a specific subdir of the element artifact (from cli or user conf)
|
|
1712 |
+ # then the local cache is queried for the subdirs existence.
|
|
1713 |
+ #
|
|
1714 |
+ # Args:
|
|
1715 |
+ # subdir (str): Whether the pull has been invoked with a specific subdir set
|
|
1703 | 1716 |
#
|
1704 | 1717 |
# Returns:
|
1705 | 1718 |
# (bool): Whether a pull operation is pending
|
1706 | 1719 |
#
|
1707 |
- def _pull_pending(self):
|
|
1720 |
+ def _pull_pending(self, subdir=None):
|
|
1708 | 1721 |
if self._get_workspace():
|
1709 | 1722 |
# Workspace builds are never pushed to artifact servers
|
1710 | 1723 |
return False
|
1711 | 1724 |
|
1712 |
- if self.__strong_cached:
|
|
1713 |
- # Artifact already in local cache
|
|
1725 |
+ if self.__strong_cached and subdir:
|
|
1726 |
+ # If we've specified a subdir, check if the subdir is cached locally
|
|
1727 |
+ if self.__artifacts.contains_subdir_artifact(self, self.__strict_cache_key, subdir):
|
|
1728 |
+ return False
|
|
1729 |
+ elif self.__strong_cached:
|
|
1714 | 1730 |
return False
|
1715 | 1731 |
|
1716 | 1732 |
# Pull is pending if artifact remote server available
|
... | ... | @@ -1732,50 +1748,27 @@ class Element(Plugin): |
1732 | 1748 |
|
1733 | 1749 |
self._update_state()
|
1734 | 1750 |
|
1735 |
- def _pull_strong(self, *, progress=None):
|
|
1736 |
- weak_key = self._get_cache_key(strength=_KeyStrength.WEAK)
|
|
1737 |
- |
|
1738 |
- key = self.__strict_cache_key
|
|
1739 |
- if not self.__artifacts.pull(self, key, progress=progress):
|
|
1740 |
- return False
|
|
1741 |
- |
|
1742 |
- # update weak ref by pointing it to this newly fetched artifact
|
|
1743 |
- self.__artifacts.link_key(self, key, weak_key)
|
|
1744 |
- |
|
1745 |
- return True
|
|
1746 |
- |
|
1747 |
- def _pull_weak(self, *, progress=None):
|
|
1748 |
- weak_key = self._get_cache_key(strength=_KeyStrength.WEAK)
|
|
1749 |
- |
|
1750 |
- if not self.__artifacts.pull(self, weak_key, progress=progress):
|
|
1751 |
- return False
|
|
1752 |
- |
|
1753 |
- # extract strong cache key from this newly fetched artifact
|
|
1754 |
- self._pull_done()
|
|
1755 |
- |
|
1756 |
- # create tag for strong cache key
|
|
1757 |
- key = self._get_cache_key(strength=_KeyStrength.STRONG)
|
|
1758 |
- self.__artifacts.link_key(self, weak_key, key)
|
|
1759 |
- |
|
1760 |
- return True
|
|
1761 |
- |
|
1762 | 1751 |
# _pull():
|
1763 | 1752 |
#
|
1764 | 1753 |
# Pull artifact from remote artifact repository into local artifact cache.
|
1765 | 1754 |
#
|
1755 |
+ # Args:
|
|
1756 |
+ # subdir (str): The optional specific subdir to pull
|
|
1757 |
+ # excluded_subdirs (list): The optional list of subdirs to not pull
|
|
1758 |
+ #
|
|
1766 | 1759 |
# Returns: True if the artifact has been downloaded, False otherwise
|
1767 | 1760 |
#
|
1768 |
- def _pull(self):
|
|
1761 |
+ def _pull(self, subdir=None, excluded_subdirs=None):
|
|
1769 | 1762 |
context = self._get_context()
|
1770 | 1763 |
|
1771 | 1764 |
def progress(percent, message):
|
1772 | 1765 |
self.status(message)
|
1773 | 1766 |
|
1774 | 1767 |
# Attempt to pull artifact without knowing whether it's available
|
1775 |
- pulled = self._pull_strong(progress=progress)
|
|
1768 |
+ pulled = self.__pull_strong(progress=progress, subdir=subdir, excluded_subdirs=excluded_subdirs)
|
|
1776 | 1769 |
|
1777 | 1770 |
if not pulled and not self._cached() and not context.get_strict():
|
1778 |
- pulled = self._pull_weak(progress=progress)
|
|
1771 |
+ pulled = self.__pull_weak(progress=progress, subdir=subdir, excluded_subdirs=excluded_subdirs)
|
|
1779 | 1772 |
|
1780 | 1773 |
if not pulled:
|
1781 | 1774 |
return False
|
... | ... | @@ -1798,10 +1791,21 @@ class Element(Plugin): |
1798 | 1791 |
if not self._cached():
|
1799 | 1792 |
return True
|
1800 | 1793 |
|
1801 |
- # Do not push tained artifact
|
|
1794 |
+ # Do not push tainted artifact
|
|
1802 | 1795 |
if self.__get_tainted():
|
1803 | 1796 |
return True
|
1804 | 1797 |
|
1798 |
+ # strict_cache_key can't be relied on to be available when running in non strict mode
|
|
1799 |
+ context = self._get_context()
|
|
1800 |
+ |
|
1801 |
+ # Do not push elements that have a dangling buildtree artifact unless element type is
|
|
1802 |
+ # expected to have an empty buildtree directory
|
|
1803 |
+ if context.get_strict():
|
|
1804 |
+ if not self.__artifacts.contains_subdir_artifact(self, self.__strict_cache_key, 'buildtree'):
|
|
1805 |
+ return True
|
|
1806 |
+ elif not self.__artifacts.contains_subdir_artifact(self, self.__weak_cache_key, 'buildtree'):
|
|
1807 |
+ return True
|
|
1808 |
+ |
|
1805 | 1809 |
return False
|
1806 | 1810 |
|
1807 | 1811 |
# _push():
|
... | ... | @@ -2057,7 +2061,7 @@ class Element(Plugin): |
2057 | 2061 |
'sources': [s._get_unique_key(workspace is None) for s in self.__sources],
|
2058 | 2062 |
'workspace': '' if workspace is None else workspace.get_key(self._get_project()),
|
2059 | 2063 |
'public': self.__public,
|
2060 |
- 'cache': type(self.__artifacts).__name__
|
|
2064 |
+ 'cache': 'CASCache'
|
|
2061 | 2065 |
}
|
2062 | 2066 |
|
2063 | 2067 |
self.__cache_key_dict['fatal-warnings'] = sorted(project._fatal_warnings)
|
... | ... | @@ -2682,6 +2686,59 @@ class Element(Plugin): |
2682 | 2686 |
|
2683 | 2687 |
return utils._deduplicate(keys)
|
2684 | 2688 |
|
2689 |
+ # __pull_strong():
|
|
2690 |
+ #
|
|
2691 |
+ # Attempt pulling given element from configured artifact caches with
|
|
2692 |
+ # the strict cache key
|
|
2693 |
+ #
|
|
2694 |
+ # Args:
|
|
2695 |
+ # progress (callable): The progress callback, if any
|
|
2696 |
+ # subdir (str): The optional specific subdir to pull
|
|
2697 |
+ # excluded_subdirs (list): The optional list of subdirs to not pull
|
|
2698 |
+ #
|
|
2699 |
+ # Returns:
|
|
2700 |
+ # (bool): Whether or not the pull was successful
|
|
2701 |
+ #
|
|
2702 |
+ def __pull_strong(self, *, progress=None, subdir=None, excluded_subdirs=None):
|
|
2703 |
+ weak_key = self._get_cache_key(strength=_KeyStrength.WEAK)
|
|
2704 |
+ key = self.__strict_cache_key
|
|
2705 |
+ if not self.__artifacts.pull(self, key, progress=progress, subdir=subdir,
|
|
2706 |
+ excluded_subdirs=excluded_subdirs):
|
|
2707 |
+ return False
|
|
2708 |
+ |
|
2709 |
+ # update weak ref by pointing it to this newly fetched artifact
|
|
2710 |
+ self.__artifacts.link_key(self, key, weak_key)
|
|
2711 |
+ |
|
2712 |
+ return True
|
|
2713 |
+ |
|
2714 |
+ # __pull_weak():
|
|
2715 |
+ #
|
|
2716 |
+ # Attempt pulling given element from configured artifact caches with
|
|
2717 |
+ # the weak cache key
|
|
2718 |
+ #
|
|
2719 |
+ # Args:
|
|
2720 |
+ # progress (callable): The progress callback, if any
|
|
2721 |
+ # subdir (str): The optional specific subdir to pull
|
|
2722 |
+ # excluded_subdirs (list): The optional list of subdirs to not pull
|
|
2723 |
+ #
|
|
2724 |
+ # Returns:
|
|
2725 |
+ # (bool): Whether or not the pull was successful
|
|
2726 |
+ #
|
|
2727 |
+ def __pull_weak(self, *, progress=None, subdir=None, excluded_subdirs=None):
|
|
2728 |
+ weak_key = self._get_cache_key(strength=_KeyStrength.WEAK)
|
|
2729 |
+ if not self.__artifacts.pull(self, weak_key, progress=progress, subdir=subdir,
|
|
2730 |
+ excluded_subdirs=excluded_subdirs):
|
|
2731 |
+ return False
|
|
2732 |
+ |
|
2733 |
+ # extract strong cache key from this newly fetched artifact
|
|
2734 |
+ self._pull_done()
|
|
2735 |
+ |
|
2736 |
+ # create tag for strong cache key
|
|
2737 |
+ key = self._get_cache_key(strength=_KeyStrength.STRONG)
|
|
2738 |
+ self.__artifacts.link_key(self, weak_key, key)
|
|
2739 |
+ |
|
2740 |
+ return True
|
|
2741 |
+ |
|
2685 | 2742 |
|
2686 | 2743 |
def _overlap_error_detail(f, forbidden_overlap_elements, elements):
|
2687 | 2744 |
if forbidden_overlap_elements:
|
... | ... | @@ -79,7 +79,7 @@ class CasBasedDirectory(Directory): |
79 | 79 |
self.filename = filename
|
80 | 80 |
self.common_name = common_name
|
81 | 81 |
self.pb2_directory = remote_execution_pb2.Directory()
|
82 |
- self.cas_cache = context.artifactcache
|
|
82 |
+ self.cas_cache = context.artifactcache.cas
|
|
83 | 83 |
if ref:
|
84 | 84 |
with open(self.cas_cache.objpath(ref), 'rb') as f:
|
85 | 85 |
self.pb2_directory.ParseFromString(f.read())
|
1 | 1 |
coverage == 4.4.0
|
2 | 2 |
pep8
|
3 | 3 |
pylint == 2.1.1
|
4 |
-pytest >= 3.7
|
|
4 |
+pytest >= 3.8
|
|
5 | 5 |
pytest-cov >= 2.5.0
|
6 | 6 |
pytest-datafiles
|
7 | 7 |
pytest-env
|
... | ... | @@ -90,7 +90,7 @@ def test_pull(cli, tmpdir, datafiles): |
90 | 90 |
cas = context.artifactcache
|
91 | 91 |
|
92 | 92 |
# Assert that the element's artifact is **not** cached
|
93 |
- element = project.load_elements(['target.bst'], cas)[0]
|
|
93 |
+ element = project.load_elements(['target.bst'])[0]
|
|
94 | 94 |
element_key = cli.get_element_key(project_dir, 'target.bst')
|
95 | 95 |
assert not cas.contains(element, element_key)
|
96 | 96 |
|
... | ... | @@ -132,7 +132,7 @@ def _test_pull(user_config_file, project_dir, artifact_dir, |
132 | 132 |
cas = context.artifactcache
|
133 | 133 |
|
134 | 134 |
# Load the target element
|
135 |
- element = project.load_elements([element_name], cas)[0]
|
|
135 |
+ element = project.load_elements([element_name])[0]
|
|
136 | 136 |
|
137 | 137 |
# Manually setup the CAS remote
|
138 | 138 |
cas.setup_remotes(use_config=True)
|
... | ... | @@ -190,15 +190,16 @@ def test_pull_tree(cli, tmpdir, datafiles): |
190 | 190 |
# Load the project and CAS cache
|
191 | 191 |
project = Project(project_dir, context)
|
192 | 192 |
project.ensure_fully_loaded()
|
193 |
- cas = context.artifactcache
|
|
193 |
+ artifactcache = context.artifactcache
|
|
194 |
+ cas = artifactcache.cas
|
|
194 | 195 |
|
195 | 196 |
# Assert that the element's artifact is cached
|
196 |
- element = project.load_elements(['target.bst'], cas)[0]
|
|
197 |
+ element = project.load_elements(['target.bst'])[0]
|
|
197 | 198 |
element_key = cli.get_element_key(project_dir, 'target.bst')
|
198 |
- assert cas.contains(element, element_key)
|
|
199 |
+ assert artifactcache.contains(element, element_key)
|
|
199 | 200 |
|
200 | 201 |
# Retrieve the Directory object from the cached artifact
|
201 |
- artifact_ref = cas.get_artifact_fullname(element, element_key)
|
|
202 |
+ artifact_ref = artifactcache.get_artifact_fullname(element, element_key)
|
|
202 | 203 |
artifact_digest = cas.resolve_ref(artifact_ref)
|
203 | 204 |
|
204 | 205 |
queue = multiprocessing.Queue()
|
... | ... | @@ -268,12 +269,13 @@ def _test_push_tree(user_config_file, project_dir, artifact_dir, artifact_digest |
268 | 269 |
project.ensure_fully_loaded()
|
269 | 270 |
|
270 | 271 |
# Create a local CAS cache handle
|
271 |
- cas = context.artifactcache
|
|
272 |
+ artifactcache = context.artifactcache
|
|
273 |
+ cas = artifactcache.cas
|
|
272 | 274 |
|
273 | 275 |
# Manually setup the CAS remote
|
274 |
- cas.setup_remotes(use_config=True)
|
|
276 |
+ artifactcache.setup_remotes(use_config=True)
|
|
275 | 277 |
|
276 |
- if cas.has_push_remotes():
|
|
278 |
+ if artifactcache.has_push_remotes():
|
|
277 | 279 |
directory = remote_execution_pb2.Directory()
|
278 | 280 |
|
279 | 281 |
with open(cas.objpath(artifact_digest), 'rb') as f:
|
... | ... | @@ -284,7 +286,7 @@ def _test_push_tree(user_config_file, project_dir, artifact_dir, artifact_digest |
284 | 286 |
tree_maker(cas, tree, directory)
|
285 | 287 |
|
286 | 288 |
# Push the Tree as a regular message
|
287 |
- tree_digest = cas.push_message(project, tree)
|
|
289 |
+ tree_digest = artifactcache.push_message(project, tree)
|
|
288 | 290 |
|
289 | 291 |
queue.put((tree_digest.hash, tree_digest.size_bytes))
|
290 | 292 |
else:
|
... | ... | @@ -69,7 +69,7 @@ def test_push(cli, tmpdir, datafiles): |
69 | 69 |
cas = context.artifactcache
|
70 | 70 |
|
71 | 71 |
# Assert that the element's artifact is cached
|
72 |
- element = project.load_elements(['target.bst'], cas)[0]
|
|
72 |
+ element = project.load_elements(['target.bst'])[0]
|
|
73 | 73 |
element_key = cli.get_element_key(project_dir, 'target.bst')
|
74 | 74 |
assert cas.contains(element, element_key)
|
75 | 75 |
|
... | ... | @@ -111,7 +111,7 @@ def _test_push(user_config_file, project_dir, artifact_dir, |
111 | 111 |
cas = context.artifactcache
|
112 | 112 |
|
113 | 113 |
# Load the target element
|
114 |
- element = project.load_elements([element_name], cas)[0]
|
|
114 |
+ element = project.load_elements([element_name])[0]
|
|
115 | 115 |
|
116 | 116 |
# Manually setup the CAS remote
|
117 | 117 |
cas.setup_remotes(use_config=True)
|
... | ... | @@ -165,20 +165,21 @@ def test_push_directory(cli, tmpdir, datafiles): |
165 | 165 |
# Load the project and CAS cache
|
166 | 166 |
project = Project(project_dir, context)
|
167 | 167 |
project.ensure_fully_loaded()
|
168 |
- cas = context.artifactcache
|
|
168 |
+ artifactcache = context.artifactcache
|
|
169 |
+ cas = artifactcache.cas
|
|
169 | 170 |
|
170 | 171 |
# Assert that the element's artifact is cached
|
171 |
- element = project.load_elements(['target.bst'], cas)[0]
|
|
172 |
+ element = project.load_elements(['target.bst'])[0]
|
|
172 | 173 |
element_key = cli.get_element_key(project_dir, 'target.bst')
|
173 |
- assert cas.contains(element, element_key)
|
|
174 |
+ assert artifactcache.contains(element, element_key)
|
|
174 | 175 |
|
175 | 176 |
# Manually setup the CAS remote
|
176 |
- cas.setup_remotes(use_config=True)
|
|
177 |
- cas.initialize_remotes()
|
|
178 |
- assert cas.has_push_remotes(element=element)
|
|
177 |
+ artifactcache.setup_remotes(use_config=True)
|
|
178 |
+ artifactcache.initialize_remotes()
|
|
179 |
+ assert artifactcache.has_push_remotes(element=element)
|
|
179 | 180 |
|
180 | 181 |
# Recreate the CasBasedDirectory object from the cached artifact
|
181 |
- artifact_ref = cas.get_artifact_fullname(element, element_key)
|
|
182 |
+ artifact_ref = artifactcache.get_artifact_fullname(element, element_key)
|
|
182 | 183 |
artifact_digest = cas.resolve_ref(artifact_ref)
|
183 | 184 |
|
184 | 185 |
queue = multiprocessing.Queue()
|
... | ... | @@ -42,6 +42,7 @@ MAIN_OPTIONS = [ |
42 | 42 |
"-o ",
|
43 | 43 |
"--option ",
|
44 | 44 |
"--on-error ",
|
45 |
+ "--pull-build-trees ",
|
|
45 | 46 |
"--pushers ",
|
46 | 47 |
"--strict ",
|
47 | 48 |
"--verbose ",
|
... | ... | @@ -70,8 +70,8 @@ def test_buildtree_pulled(cli, tmpdir, datafiles): |
70 | 70 |
})
|
71 | 71 |
assert cli.get_element_state(project, element_name) != 'cached'
|
72 | 72 |
|
73 |
- # Pull from cache
|
|
74 |
- result = cli.run(project=project, args=['pull', '--deps', 'all', element_name])
|
|
73 |
+ # Pull from cache, ensuring cli options is set to pull the buildtree
|
|
74 |
+ result = cli.run(project=project, args=['--pull-build-trees', 'pull', '--deps', 'all', element_name])
|
|
75 | 75 |
result.assert_success()
|
76 | 76 |
|
77 | 77 |
# Check it's using the cached build tree
|
1 |
+import os
|
|
2 |
+import shutil
|
|
3 |
+import pytest
|
|
4 |
+ |
|
5 |
+from tests.testutils import cli_integration as cli, create_artifact_share
|
|
6 |
+from tests.testutils.integration import assert_contains
|
|
7 |
+ |
|
8 |
+ |
|
9 |
+DATA_DIR = os.path.join(
|
|
10 |
+ os.path.dirname(os.path.realpath(__file__)),
|
|
11 |
+ "project"
|
|
12 |
+)
|
|
13 |
+ |
|
14 |
+ |
|
15 |
+# Remove artifact cache & set cli.config value of pullbuildtrees
|
|
16 |
+# to false, which is the default user context. The cache has to be
|
|
17 |
+# cleared as just forcefully removing the refpath leaves dangling objects.
|
|
18 |
+def default_state(cli, tmpdir, share):
|
|
19 |
+ shutil.rmtree(os.path.join(str(tmpdir), 'artifacts'))
|
|
20 |
+ cli.configure({
|
|
21 |
+ 'pullbuildtrees': False,
|
|
22 |
+ 'artifacts': {'url': share.repo, 'push': False},
|
|
23 |
+ 'artifactdir': os.path.join(str(tmpdir), 'artifacts')
|
|
24 |
+ })
|
|
25 |
+ |
|
26 |
+ |
|
27 |
+# A test to capture the integration of the pullbuildtrees
|
|
28 |
+# behaviour, which by default is to not include the buildtree
|
|
29 |
+# directory of an element.
|
|
30 |
+@pytest.mark.integration
|
|
31 |
+@pytest.mark.datafiles(DATA_DIR)
|
|
32 |
+def test_pullbuildtrees(cli, tmpdir, datafiles, integration_cache):
|
|
33 |
+ |
|
34 |
+ project = os.path.join(datafiles.dirname, datafiles.basename)
|
|
35 |
+ element_name = 'autotools/amhello.bst'
|
|
36 |
+ |
|
37 |
+ # Create artifact shares for pull & push testing
|
|
38 |
+ with create_artifact_share(os.path.join(str(tmpdir), 'share1')) as share1,\
|
|
39 |
+ create_artifact_share(os.path.join(str(tmpdir), 'share2')) as share2:
|
|
40 |
+ cli.configure({
|
|
41 |
+ 'artifacts': {'url': share1.repo, 'push': True},
|
|
42 |
+ 'artifactdir': os.path.join(str(tmpdir), 'artifacts')
|
|
43 |
+ })
|
|
44 |
+ |
|
45 |
+ # Build autotools element, checked pushed, delete local
|
|
46 |
+ result = cli.run(project=project, args=['build', element_name])
|
|
47 |
+ assert result.exit_code == 0
|
|
48 |
+ assert cli.get_element_state(project, element_name) == 'cached'
|
|
49 |
+ assert share1.has_artifact('test', element_name, cli.get_element_key(project, element_name))
|
|
50 |
+ default_state(cli, tmpdir, share1)
|
|
51 |
+ |
|
52 |
+ # Pull artifact with default config, assert that pulling again
|
|
53 |
+ # doesn't create a pull job, then assert with buildtrees user
|
|
54 |
+ # config set creates a pull job.
|
|
55 |
+ result = cli.run(project=project, args=['pull', element_name])
|
|
56 |
+ assert element_name in result.get_pulled_elements()
|
|
57 |
+ result = cli.run(project=project, args=['pull', element_name])
|
|
58 |
+ assert element_name not in result.get_pulled_elements()
|
|
59 |
+ cli.configure({'pullbuildtrees': True})
|
|
60 |
+ result = cli.run(project=project, args=['pull', element_name])
|
|
61 |
+ assert element_name in result.get_pulled_elements()
|
|
62 |
+ default_state(cli, tmpdir, share1)
|
|
63 |
+ |
|
64 |
+ # Pull artifact with default config, then assert that pulling
|
|
65 |
+ # with buildtrees cli flag set creates a pull job.
|
|
66 |
+ # Also assert that the buildtree is added to the artifact's
|
|
67 |
+ # extract dir
|
|
68 |
+ result = cli.run(project=project, args=['pull', element_name])
|
|
69 |
+ assert element_name in result.get_pulled_elements()
|
|
70 |
+ elementdigest = share1.has_artifact('test', element_name, cli.get_element_key(project, element_name))
|
|
71 |
+ buildtreedir = os.path.join(str(tmpdir), 'artifacts', 'extract', 'test', 'autotools-amhello',
|
|
72 |
+ elementdigest.hash, 'buildtree')
|
|
73 |
+ assert not os.path.isdir(buildtreedir)
|
|
74 |
+ result = cli.run(project=project, args=['--pull-build-trees', 'pull', element_name])
|
|
75 |
+ assert element_name in result.get_pulled_elements()
|
|
76 |
+ assert os.path.isdir(buildtreedir)
|
|
77 |
+ default_state(cli, tmpdir, share1)
|
|
78 |
+ |
|
79 |
+ # Pull artifact with pullbuildtrees set in user config, then assert
|
|
80 |
+ # that pulling with the same user config doesn't creates a pull job,
|
|
81 |
+ # or when buildtrees cli flag is set.
|
|
82 |
+ cli.configure({'pullbuildtrees': True})
|
|
83 |
+ result = cli.run(project=project, args=['pull', element_name])
|
|
84 |
+ assert element_name in result.get_pulled_elements()
|
|
85 |
+ result = cli.run(project=project, args=['pull', element_name])
|
|
86 |
+ assert element_name not in result.get_pulled_elements()
|
|
87 |
+ result = cli.run(project=project, args=['--pull-build-trees', 'pull', element_name])
|
|
88 |
+ assert element_name not in result.get_pulled_elements()
|
|
89 |
+ default_state(cli, tmpdir, share1)
|
|
90 |
+ |
|
91 |
+ # Pull artifact with default config and buildtrees cli flag set, then assert
|
|
92 |
+ # that pulling with pullbuildtrees set in user config doesn't create a pull
|
|
93 |
+ # job.
|
|
94 |
+ result = cli.run(project=project, args=['--pull-build-trees', 'pull', element_name])
|
|
95 |
+ assert element_name in result.get_pulled_elements()
|
|
96 |
+ cli.configure({'pullbuildtrees': True})
|
|
97 |
+ result = cli.run(project=project, args=['pull', element_name])
|
|
98 |
+ assert element_name not in result.get_pulled_elements()
|
|
99 |
+ default_state(cli, tmpdir, share1)
|
|
100 |
+ |
|
101 |
+ # Assert that a partial build element (not containing a populated buildtree dir)
|
|
102 |
+ # can't be pushed to an artifact share, then assert that a complete build element
|
|
103 |
+ # can be. This will attempt a partial pull from share1 and then a partial push
|
|
104 |
+ # to share2
|
|
105 |
+ result = cli.run(project=project, args=['pull', element_name])
|
|
106 |
+ assert element_name in result.get_pulled_elements()
|
|
107 |
+ cli.configure({'artifacts': {'url': share2.repo, 'push': True}})
|
|
108 |
+ result = cli.run(project=project, args=['push', element_name])
|
|
109 |
+ assert element_name not in result.get_pushed_elements()
|
|
110 |
+ assert not share2.has_artifact('test', element_name, cli.get_element_key(project, element_name))
|
|
111 |
+ |
|
112 |
+ # Assert that after pulling the missing buildtree the element artifact can be
|
|
113 |
+ # successfully pushed to the remote. This will attempt to pull the buildtree
|
|
114 |
+ # from share1 and then a 'complete' push to share2
|
|
115 |
+ cli.configure({'artifacts': {'url': share1.repo, 'push': False}})
|
|
116 |
+ result = cli.run(project=project, args=['--pull-build-trees', 'pull', element_name])
|
|
117 |
+ assert element_name in result.get_pulled_elements()
|
|
118 |
+ cli.configure({'artifacts': {'url': share2.repo, 'push': True}})
|
|
119 |
+ result = cli.run(project=project, args=['push', element_name])
|
|
120 |
+ assert element_name in result.get_pushed_elements()
|
|
121 |
+ assert share2.has_artifact('test', element_name, cli.get_element_key(project, element_name))
|
|
122 |
+ default_state(cli, tmpdir, share1)
|
... | ... | @@ -13,7 +13,7 @@ import pytest_cov |
13 | 13 |
from buildstream import _yaml
|
14 | 14 |
from buildstream._artifactcache.casserver import create_server
|
15 | 15 |
from buildstream._context import Context
|
16 |
-from buildstream._exceptions import ArtifactError
|
|
16 |
+from buildstream._exceptions import CASError
|
|
17 | 17 |
from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
18 | 18 |
|
19 | 19 |
|
... | ... | @@ -48,7 +48,7 @@ class ArtifactShare(): |
48 | 48 |
context = Context()
|
49 | 49 |
context.artifactdir = self.repodir
|
50 | 50 |
|
51 |
- self.cas = context.artifactcache
|
|
51 |
+ self.cas = context.artifactcache.cas
|
|
52 | 52 |
|
53 | 53 |
self.total_space = total_space
|
54 | 54 |
self.free_space = free_space
|
... | ... | @@ -114,7 +114,7 @@ class ArtifactShare(): |
114 | 114 |
# cache_key (str): The cache key
|
115 | 115 |
#
|
116 | 116 |
# Returns:
|
117 |
- # (bool): True if the artifact exists in the share, otherwise false.
|
|
117 |
+ # (str): artifact digest if the artifact exists in the share, otherwise None.
|
|
118 | 118 |
def has_artifact(self, project_name, element_name, cache_key):
|
119 | 119 |
|
120 | 120 |
# NOTE: This should be kept in line with our
|
... | ... | @@ -134,9 +134,9 @@ class ArtifactShare(): |
134 | 134 |
|
135 | 135 |
try:
|
136 | 136 |
tree = self.cas.resolve_ref(artifact_key)
|
137 |
- return True
|
|
138 |
- except ArtifactError:
|
|
139 |
- return False
|
|
137 |
+ return tree
|
|
138 |
+ except CASError:
|
|
139 |
+ return None
|
|
140 | 140 |
|
141 | 141 |
# close():
|
142 | 142 |
#
|