Benjamin Schubert pushed to branch bschubert/tests-no-chroot-linux at BuildStream / buildstream
Commits:
-
3788e701
by Jürg Billeter at 2018-11-03T11:52:00Z
-
82e971ef
by Jürg Billeter at 2018-11-05T11:33:20Z
-
62942bfd
by Valentin David at 2018-11-05T12:14:20Z
-
442da2f9
by Javier Jardón at 2018-11-05T12:41:54Z
-
0993de53
by Richard Maw at 2018-11-05T16:41:05Z
-
be8f0a54
by richardmaw-codethink at 2018-11-05T17:08:43Z
-
bfb639bf
by Jürg Billeter at 2018-11-05T17:18:12Z
-
ca855f91
by Jürg Billeter at 2018-11-05T17:18:12Z
-
15fed21c
by Jürg Billeter at 2018-11-05T17:18:12Z
-
0085d2aa
by Jürg Billeter at 2018-11-05T17:18:12Z
-
f69b1117
by Jürg Billeter at 2018-11-05T17:18:12Z
-
e398f877
by Jürg Billeter at 2018-11-05T17:18:12Z
-
626d20ae
by Jürg Billeter at 2018-11-05T17:18:12Z
-
ec04446b
by Jürg Billeter at 2018-11-05T17:51:39Z
-
9a045080
by Josh Smith at 2018-11-06T13:23:19Z
-
d3a07e6b
by Josh Smith at 2018-11-06T13:23:19Z
-
0c09fb9c
by richardmaw-codethink at 2018-11-06T14:12:20Z
-
b4eec489
by Jim MacArthur at 2018-11-06T16:44:14Z
-
1f7acf74
by Jim MacArthur at 2018-11-06T17:16:31Z
-
aa738ea7
by Benjamin Schubert at 2018-11-07T09:40:06Z
-
1bf9a3a8
by Benjamin Schubert at 2018-11-07T09:40:18Z
23 changed files:
- .gitlab-ci.yml
- buildstream/_artifactcache/artifactcache.py
- buildstream/_artifactcache/cascache.py
- buildstream/_artifactcache/casserver.py
- buildstream/_context.py
- buildstream/_elementfactory.py
- buildstream/_exceptions.py
- buildstream/_loader/loader.py
- buildstream/_pipeline.py
- buildstream/_project.py
- buildstream/element.py
- buildstream/sandbox/_sandboxdummy.py
- buildstream/sandbox/_sandboxremote.py
- buildstream/storage/_casbaseddirectory.py
- buildstream/utils.py
- conftest.py
- dev-requirements.txt
- tests/artifactcache/pull.py
- tests/artifactcache/push.py
- + tests/integration/missing_dependencies.py
- tests/testutils/artifactshare.py
- + tests/testutils/mock_os.py
- + tests/utils/misc.py
Changes:
| ... | ... | @@ -166,6 +166,12 @@ docs: |
| 166 | 166 |
BST_EXT_REF: 1d6ab71151b93c8cbc0a91a36ffe9270f3b835f1 # 0.5.1
|
| 167 | 167 |
FD_SDK_REF: 88d7c22c2281b987faa02edd57df80d430eecf1f # 18.08.11-35-g88d7c22c
|
| 168 | 168 |
before_script:
|
| 169 |
+ - |
|
|
| 170 |
+ mkdir -p "${HOME}/.config"
|
|
| 171 |
+ cat <<EOF >"${HOME}/.config/buildstream.conf"
|
|
| 172 |
+ scheduler:
|
|
| 173 |
+ fetchers: 2
|
|
| 174 |
+ EOF
|
|
| 169 | 175 |
- (cd dist && ./unpack.sh && cd buildstream && pip3 install .)
|
| 170 | 176 |
- pip3 install --user -e ${BST_EXT_URL}@${BST_EXT_REF}#egg=bst_ext
|
| 171 | 177 |
- git clone https://gitlab.com/freedesktop-sdk/freedesktop-sdk.git
|
| ... | ... | @@ -17,17 +17,22 @@ |
| 17 | 17 |
# Authors:
|
| 18 | 18 |
# Tristan Maat <tristan maat codethink co uk>
|
| 19 | 19 |
|
| 20 |
+import multiprocessing
|
|
| 20 | 21 |
import os
|
| 22 |
+import signal
|
|
| 21 | 23 |
import string
|
| 22 | 24 |
from collections import namedtuple
|
| 23 | 25 |
from collections.abc import Mapping
|
| 24 | 26 |
|
| 25 | 27 |
from ..types import _KeyStrength
|
| 26 |
-from .._exceptions import ArtifactError, ImplError, LoadError, LoadErrorReason
|
|
| 28 |
+from .._exceptions import ArtifactError, CASError, LoadError, LoadErrorReason
|
|
| 27 | 29 |
from .._message import Message, MessageType
|
| 30 |
+from .. import _signals
|
|
| 28 | 31 |
from .. import utils
|
| 29 | 32 |
from .. import _yaml
|
| 30 | 33 |
|
| 34 |
+from .cascache import CASCache, CASRemote
|
|
| 35 |
+ |
|
| 31 | 36 |
|
| 32 | 37 |
CACHE_SIZE_FILE = "cache_size"
|
| 33 | 38 |
|
| ... | ... | @@ -93,7 +98,8 @@ class ArtifactCache(): |
| 93 | 98 |
def __init__(self, context):
|
| 94 | 99 |
self.context = context
|
| 95 | 100 |
self.extractdir = os.path.join(context.artifactdir, 'extract')
|
| 96 |
- self.tmpdir = os.path.join(context.artifactdir, 'tmp')
|
|
| 101 |
+ |
|
| 102 |
+ self.cas = CASCache(context.artifactdir)
|
|
| 97 | 103 |
|
| 98 | 104 |
self.global_remote_specs = []
|
| 99 | 105 |
self.project_remote_specs = {}
|
| ... | ... | @@ -104,12 +110,15 @@ class ArtifactCache(): |
| 104 | 110 |
self._cache_lower_threshold = None # The target cache size for a cleanup
|
| 105 | 111 |
self._remotes_setup = False # Check to prevent double-setup of remotes
|
| 106 | 112 |
|
| 113 |
+ # Per-project list of CASRemote instances.
|
|
| 114 |
+ self._remotes = {}
|
|
| 115 |
+ |
|
| 116 |
+ self._has_fetch_remotes = False
|
|
| 117 |
+ self._has_push_remotes = False
|
|
| 118 |
+ |
|
| 107 | 119 |
os.makedirs(self.extractdir, exist_ok=True)
|
| 108 |
- os.makedirs(self.tmpdir, exist_ok=True)
|
|
| 109 | 120 |
|
| 110 |
- ################################################
|
|
| 111 |
- # Methods implemented on the abstract class #
|
|
| 112 |
- ################################################
|
|
| 121 |
+ self._calculate_cache_quota()
|
|
| 113 | 122 |
|
| 114 | 123 |
# get_artifact_fullname()
|
| 115 | 124 |
#
|
| ... | ... | @@ -240,8 +249,10 @@ class ArtifactCache(): |
| 240 | 249 |
for key in (strong_key, weak_key):
|
| 241 | 250 |
if key:
|
| 242 | 251 |
try:
|
| 243 |
- self.update_mtime(element, key)
|
|
| 244 |
- except ArtifactError:
|
|
| 252 |
+ ref = self.get_artifact_fullname(element, key)
|
|
| 253 |
+ |
|
| 254 |
+ self.cas.update_mtime(ref)
|
|
| 255 |
+ except CASError:
|
|
| 245 | 256 |
pass
|
| 246 | 257 |
|
| 247 | 258 |
# clean():
|
| ... | ... | @@ -252,7 +263,7 @@ class ArtifactCache(): |
| 252 | 263 |
# (int): The size of the cache after having cleaned up
|
| 253 | 264 |
#
|
| 254 | 265 |
def clean(self):
|
| 255 |
- artifacts = self.list_artifacts() # pylint: disable=assignment-from-no-return
|
|
| 266 |
+ artifacts = self.list_artifacts()
|
|
| 256 | 267 |
|
| 257 | 268 |
# Build a set of the cache keys which are required
|
| 258 | 269 |
# based on the required elements at cleanup time
|
| ... | ... | @@ -294,7 +305,7 @@ class ArtifactCache(): |
| 294 | 305 |
if key not in required_artifacts:
|
| 295 | 306 |
|
| 296 | 307 |
# Remove the actual artifact, if it's not required.
|
| 297 |
- size = self.remove(to_remove) # pylint: disable=assignment-from-no-return
|
|
| 308 |
+ size = self.remove(to_remove)
|
|
| 298 | 309 |
|
| 299 | 310 |
# Remove the size from the removed size
|
| 300 | 311 |
self.set_cache_size(self._cache_size - size)
|
| ... | ... | @@ -311,7 +322,7 @@ class ArtifactCache(): |
| 311 | 322 |
# (int): The size of the artifact cache.
|
| 312 | 323 |
#
|
| 313 | 324 |
def compute_cache_size(self):
|
| 314 |
- self._cache_size = self.calculate_cache_size() # pylint: disable=assignment-from-no-return
|
|
| 325 |
+ self._cache_size = self.cas.calculate_cache_size()
|
|
| 315 | 326 |
|
| 316 | 327 |
return self._cache_size
|
| 317 | 328 |
|
| ... | ... | @@ -380,28 +391,12 @@ class ArtifactCache(): |
| 380 | 391 |
def has_quota_exceeded(self):
|
| 381 | 392 |
return self.get_cache_size() > self._cache_quota
|
| 382 | 393 |
|
| 383 |
- ################################################
|
|
| 384 |
- # Abstract methods for subclasses to implement #
|
|
| 385 |
- ################################################
|
|
| 386 |
- |
|
| 387 | 394 |
# preflight():
|
| 388 | 395 |
#
|
| 389 | 396 |
# Preflight check.
|
| 390 | 397 |
#
|
| 391 | 398 |
def preflight(self):
|
| 392 |
- pass
|
|
| 393 |
- |
|
| 394 |
- # update_mtime()
|
|
| 395 |
- #
|
|
| 396 |
- # Update the mtime of an artifact.
|
|
| 397 |
- #
|
|
| 398 |
- # Args:
|
|
| 399 |
- # element (Element): The Element to update
|
|
| 400 |
- # key (str): The key of the artifact.
|
|
| 401 |
- #
|
|
| 402 |
- def update_mtime(self, element, key):
|
|
| 403 |
- raise ImplError("Cache '{kind}' does not implement update_mtime()"
|
|
| 404 |
- .format(kind=type(self).__name__))
|
|
| 399 |
+ self.cas.preflight()
|
|
| 405 | 400 |
|
| 406 | 401 |
# initialize_remotes():
|
| 407 | 402 |
#
|
| ... | ... | @@ -411,7 +406,59 @@ class ArtifactCache(): |
| 411 | 406 |
# on_failure (callable): Called if we fail to contact one of the caches.
|
| 412 | 407 |
#
|
| 413 | 408 |
def initialize_remotes(self, *, on_failure=None):
|
| 414 |
- pass
|
|
| 409 |
+ remote_specs = self.global_remote_specs
|
|
| 410 |
+ |
|
| 411 |
+ for project in self.project_remote_specs:
|
|
| 412 |
+ remote_specs += self.project_remote_specs[project]
|
|
| 413 |
+ |
|
| 414 |
+ remote_specs = list(utils._deduplicate(remote_specs))
|
|
| 415 |
+ |
|
| 416 |
+ remotes = {}
|
|
| 417 |
+ q = multiprocessing.Queue()
|
|
| 418 |
+ for remote_spec in remote_specs:
|
|
| 419 |
+ # Use subprocess to avoid creation of gRPC threads in main BuildStream process
|
|
| 420 |
+ # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
|
|
| 421 |
+ p = multiprocessing.Process(target=self.cas.initialize_remote, args=(remote_spec, q))
|
|
| 422 |
+ |
|
| 423 |
+ try:
|
|
| 424 |
+ # Keep SIGINT blocked in the child process
|
|
| 425 |
+ with _signals.blocked([signal.SIGINT], ignore=False):
|
|
| 426 |
+ p.start()
|
|
| 427 |
+ |
|
| 428 |
+ error = q.get()
|
|
| 429 |
+ p.join()
|
|
| 430 |
+ except KeyboardInterrupt:
|
|
| 431 |
+ utils._kill_process_tree(p.pid)
|
|
| 432 |
+ raise
|
|
| 433 |
+ |
|
| 434 |
+ if error and on_failure:
|
|
| 435 |
+ on_failure(remote_spec.url, error)
|
|
| 436 |
+ elif error:
|
|
| 437 |
+ raise ArtifactError(error)
|
|
| 438 |
+ else:
|
|
| 439 |
+ self._has_fetch_remotes = True
|
|
| 440 |
+ if remote_spec.push:
|
|
| 441 |
+ self._has_push_remotes = True
|
|
| 442 |
+ |
|
| 443 |
+ remotes[remote_spec.url] = CASRemote(remote_spec)
|
|
| 444 |
+ |
|
| 445 |
+ for project in self.context.get_projects():
|
|
| 446 |
+ remote_specs = self.global_remote_specs
|
|
| 447 |
+ if project in self.project_remote_specs:
|
|
| 448 |
+ remote_specs = list(utils._deduplicate(remote_specs + self.project_remote_specs[project]))
|
|
| 449 |
+ |
|
| 450 |
+ project_remotes = []
|
|
| 451 |
+ |
|
| 452 |
+ for remote_spec in remote_specs:
|
|
| 453 |
+ # Errors are already handled in the loop above,
|
|
| 454 |
+ # skip unreachable remotes here.
|
|
| 455 |
+ if remote_spec.url not in remotes:
|
|
| 456 |
+ continue
|
|
| 457 |
+ |
|
| 458 |
+ remote = remotes[remote_spec.url]
|
|
| 459 |
+ project_remotes.append(remote)
|
|
| 460 |
+ |
|
| 461 |
+ self._remotes[project] = project_remotes
|
|
| 415 | 462 |
|
| 416 | 463 |
# contains():
|
| 417 | 464 |
#
|
| ... | ... | @@ -425,8 +472,9 @@ class ArtifactCache(): |
| 425 | 472 |
# Returns: True if the artifact is in the cache, False otherwise
|
| 426 | 473 |
#
|
| 427 | 474 |
def contains(self, element, key):
|
| 428 |
- raise ImplError("Cache '{kind}' does not implement contains()"
|
|
| 429 |
- .format(kind=type(self).__name__))
|
|
| 475 |
+ ref = self.get_artifact_fullname(element, key)
|
|
| 476 |
+ |
|
| 477 |
+ return self.cas.contains(ref)
|
|
| 430 | 478 |
|
| 431 | 479 |
# list_artifacts():
|
| 432 | 480 |
#
|
| ... | ... | @@ -437,8 +485,7 @@ class ArtifactCache(): |
| 437 | 485 |
# `ArtifactCache.get_artifact_fullname` in LRU order
|
| 438 | 486 |
#
|
| 439 | 487 |
def list_artifacts(self):
|
| 440 |
- raise ImplError("Cache '{kind}' does not implement list_artifacts()"
|
|
| 441 |
- .format(kind=type(self).__name__))
|
|
| 488 |
+ return self.cas.list_refs()
|
|
| 442 | 489 |
|
| 443 | 490 |
# remove():
|
| 444 | 491 |
#
|
| ... | ... | @@ -450,9 +497,31 @@ class ArtifactCache(): |
| 450 | 497 |
# generated by
|
| 451 | 498 |
# `ArtifactCache.get_artifact_fullname`)
|
| 452 | 499 |
#
|
| 453 |
- def remove(self, artifact_name):
|
|
| 454 |
- raise ImplError("Cache '{kind}' does not implement remove()"
|
|
| 455 |
- .format(kind=type(self).__name__))
|
|
| 500 |
+ # Returns:
|
|
| 501 |
+ # (int|None) The amount of space pruned from the repository in
|
|
| 502 |
+ # Bytes, or None if defer_prune is True
|
|
| 503 |
+ #
|
|
| 504 |
+ def remove(self, ref):
|
|
| 505 |
+ |
|
| 506 |
+ # Remove extract if not used by other ref
|
|
| 507 |
+ tree = self.cas.resolve_ref(ref)
|
|
| 508 |
+ ref_name, ref_hash = os.path.split(ref)
|
|
| 509 |
+ extract = os.path.join(self.extractdir, ref_name, tree.hash)
|
|
| 510 |
+ keys_file = os.path.join(extract, 'meta', 'keys.yaml')
|
|
| 511 |
+ if os.path.exists(keys_file):
|
|
| 512 |
+ keys_meta = _yaml.load(keys_file)
|
|
| 513 |
+ keys = [keys_meta['strong'], keys_meta['weak']]
|
|
| 514 |
+ remove_extract = True
|
|
| 515 |
+ for other_hash in keys:
|
|
| 516 |
+ if other_hash == ref_hash:
|
|
| 517 |
+ continue
|
|
| 518 |
+ remove_extract = False
|
|
| 519 |
+ break
|
|
| 520 |
+ |
|
| 521 |
+ if remove_extract:
|
|
| 522 |
+ utils._force_rmtree(extract)
|
|
| 523 |
+ |
|
| 524 |
+ return self.cas.remove(ref)
|
|
| 456 | 525 |
|
| 457 | 526 |
# extract():
|
| 458 | 527 |
#
|
| ... | ... | @@ -472,8 +541,11 @@ class ArtifactCache(): |
| 472 | 541 |
# Returns: path to extracted artifact
|
| 473 | 542 |
#
|
| 474 | 543 |
def extract(self, element, key):
|
| 475 |
- raise ImplError("Cache '{kind}' does not implement extract()"
|
|
| 476 |
- .format(kind=type(self).__name__))
|
|
| 544 |
+ ref = self.get_artifact_fullname(element, key)
|
|
| 545 |
+ |
|
| 546 |
+ path = os.path.join(self.extractdir, element._get_project().name, element.normal_name)
|
|
| 547 |
+ |
|
| 548 |
+ return self.cas.extract(ref, path)
|
|
| 477 | 549 |
|
| 478 | 550 |
# commit():
|
| 479 | 551 |
#
|
| ... | ... | @@ -485,8 +557,9 @@ class ArtifactCache(): |
| 485 | 557 |
# keys (list): The cache keys to use
|
| 486 | 558 |
#
|
| 487 | 559 |
def commit(self, element, content, keys):
|
| 488 |
- raise ImplError("Cache '{kind}' does not implement commit()"
|
|
| 489 |
- .format(kind=type(self).__name__))
|
|
| 560 |
+ refs = [self.get_artifact_fullname(element, key) for key in keys]
|
|
| 561 |
+ |
|
| 562 |
+ self.cas.commit(refs, content)
|
|
| 490 | 563 |
|
| 491 | 564 |
# diff():
|
| 492 | 565 |
#
|
| ... | ... | @@ -500,8 +573,10 @@ class ArtifactCache(): |
| 500 | 573 |
# subdir (str): A subdirectory to limit the comparison to
|
| 501 | 574 |
#
|
| 502 | 575 |
def diff(self, element, key_a, key_b, *, subdir=None):
|
| 503 |
- raise ImplError("Cache '{kind}' does not implement diff()"
|
|
| 504 |
- .format(kind=type(self).__name__))
|
|
| 576 |
+ ref_a = self.get_artifact_fullname(element, key_a)
|
|
| 577 |
+ ref_b = self.get_artifact_fullname(element, key_b)
|
|
| 578 |
+ |
|
| 579 |
+ return self.cas.diff(ref_a, ref_b, subdir=subdir)
|
|
| 505 | 580 |
|
| 506 | 581 |
# has_fetch_remotes():
|
| 507 | 582 |
#
|
| ... | ... | @@ -513,7 +588,16 @@ class ArtifactCache(): |
| 513 | 588 |
# Returns: True if any remote repositories are configured, False otherwise
|
| 514 | 589 |
#
|
| 515 | 590 |
def has_fetch_remotes(self, *, element=None):
|
| 516 |
- return False
|
|
| 591 |
+ if not self._has_fetch_remotes:
|
|
| 592 |
+ # No project has fetch remotes
|
|
| 593 |
+ return False
|
|
| 594 |
+ elif element is None:
|
|
| 595 |
+ # At least one (sub)project has fetch remotes
|
|
| 596 |
+ return True
|
|
| 597 |
+ else:
|
|
| 598 |
+ # Check whether the specified element's project has fetch remotes
|
|
| 599 |
+ remotes_for_project = self._remotes[element._get_project()]
|
|
| 600 |
+ return bool(remotes_for_project)
|
|
| 517 | 601 |
|
| 518 | 602 |
# has_push_remotes():
|
| 519 | 603 |
#
|
| ... | ... | @@ -525,7 +609,16 @@ class ArtifactCache(): |
| 525 | 609 |
# Returns: True if any remote repository is configured, False otherwise
|
| 526 | 610 |
#
|
| 527 | 611 |
def has_push_remotes(self, *, element=None):
|
| 528 |
- return False
|
|
| 612 |
+ if not self._has_push_remotes:
|
|
| 613 |
+ # No project has push remotes
|
|
| 614 |
+ return False
|
|
| 615 |
+ elif element is None:
|
|
| 616 |
+ # At least one (sub)project has push remotes
|
|
| 617 |
+ return True
|
|
| 618 |
+ else:
|
|
| 619 |
+ # Check whether the specified element's project has push remotes
|
|
| 620 |
+ remotes_for_project = self._remotes[element._get_project()]
|
|
| 621 |
+ return any(remote.spec.push for remote in remotes_for_project)
|
|
| 529 | 622 |
|
| 530 | 623 |
# push():
|
| 531 | 624 |
#
|
| ... | ... | @@ -542,8 +635,28 @@ class ArtifactCache(): |
| 542 | 635 |
# (ArtifactError): if there was an error
|
| 543 | 636 |
#
|
| 544 | 637 |
def push(self, element, keys):
|
| 545 |
- raise ImplError("Cache '{kind}' does not implement push()"
|
|
| 546 |
- .format(kind=type(self).__name__))
|
|
| 638 |
+ refs = [self.get_artifact_fullname(element, key) for key in list(keys)]
|
|
| 639 |
+ |
|
| 640 |
+ project = element._get_project()
|
|
| 641 |
+ |
|
| 642 |
+ push_remotes = [r for r in self._remotes[project] if r.spec.push]
|
|
| 643 |
+ |
|
| 644 |
+ pushed = False
|
|
| 645 |
+ |
|
| 646 |
+ for remote in push_remotes:
|
|
| 647 |
+ remote.init()
|
|
| 648 |
+ display_key = element._get_brief_display_key()
|
|
| 649 |
+ element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url))
|
|
| 650 |
+ |
|
| 651 |
+ if self.cas.push(refs, remote):
|
|
| 652 |
+ element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
|
|
| 653 |
+ pushed = True
|
|
| 654 |
+ else:
|
|
| 655 |
+ element.info("Remote ({}) already has {} cached".format(
|
|
| 656 |
+ remote.spec.url, element._get_brief_display_key()
|
|
| 657 |
+ ))
|
|
| 658 |
+ |
|
| 659 |
+ return pushed
|
|
| 547 | 660 |
|
| 548 | 661 |
# pull():
|
| 549 | 662 |
#
|
| ... | ... | @@ -558,8 +671,130 @@ class ArtifactCache(): |
| 558 | 671 |
# (bool): True if pull was successful, False if artifact was not available
|
| 559 | 672 |
#
|
| 560 | 673 |
def pull(self, element, key, *, progress=None):
|
| 561 |
- raise ImplError("Cache '{kind}' does not implement pull()"
|
|
| 562 |
- .format(kind=type(self).__name__))
|
|
| 674 |
+ ref = self.get_artifact_fullname(element, key)
|
|
| 675 |
+ |
|
| 676 |
+ project = element._get_project()
|
|
| 677 |
+ |
|
| 678 |
+ for remote in self._remotes[project]:
|
|
| 679 |
+ try:
|
|
| 680 |
+ display_key = element._get_brief_display_key()
|
|
| 681 |
+ element.status("Pulling artifact {} <- {}".format(display_key, remote.spec.url))
|
|
| 682 |
+ |
|
| 683 |
+ if self.cas.pull(ref, remote, progress=progress):
|
|
| 684 |
+ element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url))
|
|
| 685 |
+ # no need to pull from additional remotes
|
|
| 686 |
+ return True
|
|
| 687 |
+ else:
|
|
| 688 |
+ element.info("Remote ({}) does not have {} cached".format(
|
|
| 689 |
+ remote.spec.url, element._get_brief_display_key()
|
|
| 690 |
+ ))
|
|
| 691 |
+ |
|
| 692 |
+ except CASError as e:
|
|
| 693 |
+ raise ArtifactError("Failed to pull artifact {}: {}".format(
|
|
| 694 |
+ element._get_brief_display_key(), e)) from e
|
|
| 695 |
+ |
|
| 696 |
+ return False
|
|
| 697 |
+ |
|
| 698 |
+ # pull_tree():
|
|
| 699 |
+ #
|
|
| 700 |
+ # Pull a single Tree rather than an artifact.
|
|
| 701 |
+ # Does not update local refs.
|
|
| 702 |
+ #
|
|
| 703 |
+ # Args:
|
|
| 704 |
+ # project (Project): The current project
|
|
| 705 |
+ # digest (Digest): The digest of the tree
|
|
| 706 |
+ #
|
|
| 707 |
+ def pull_tree(self, project, digest):
|
|
| 708 |
+ for remote in self._remotes[project]:
|
|
| 709 |
+ digest = self.cas.pull_tree(remote, digest)
|
|
| 710 |
+ |
|
| 711 |
+ if digest:
|
|
| 712 |
+ # no need to pull from additional remotes
|
|
| 713 |
+ return digest
|
|
| 714 |
+ |
|
| 715 |
+ return None
|
|
| 716 |
+ |
|
| 717 |
+ # push_directory():
|
|
| 718 |
+ #
|
|
| 719 |
+ # Push the given virtual directory to all remotes.
|
|
| 720 |
+ #
|
|
| 721 |
+ # Args:
|
|
| 722 |
+ # project (Project): The current project
|
|
| 723 |
+ # directory (Directory): A virtual directory object to push.
|
|
| 724 |
+ #
|
|
| 725 |
+ # Raises:
|
|
| 726 |
+ # (ArtifactError): if there was an error
|
|
| 727 |
+ #
|
|
| 728 |
+ def push_directory(self, project, directory):
|
|
| 729 |
+ if self._has_push_remotes:
|
|
| 730 |
+ push_remotes = [r for r in self._remotes[project] if r.spec.push]
|
|
| 731 |
+ else:
|
|
| 732 |
+ push_remotes = []
|
|
| 733 |
+ |
|
| 734 |
+ if not push_remotes:
|
|
| 735 |
+ raise ArtifactError("push_directory was called, but no remote artifact " +
|
|
| 736 |
+ "servers are configured as push remotes.")
|
|
| 737 |
+ |
|
| 738 |
+ if directory.ref is None:
|
|
| 739 |
+ return
|
|
| 740 |
+ |
|
| 741 |
+ for remote in push_remotes:
|
|
| 742 |
+ self.cas.push_directory(remote, directory)
|
|
| 743 |
+ |
|
| 744 |
+ # push_message():
|
|
| 745 |
+ #
|
|
| 746 |
+ # Push the given protobuf message to all remotes.
|
|
| 747 |
+ #
|
|
| 748 |
+ # Args:
|
|
| 749 |
+ # project (Project): The current project
|
|
| 750 |
+ # message (Message): A protobuf message to push.
|
|
| 751 |
+ #
|
|
| 752 |
+ # Raises:
|
|
| 753 |
+ # (ArtifactError): if there was an error
|
|
| 754 |
+ #
|
|
| 755 |
+ def push_message(self, project, message):
|
|
| 756 |
+ |
|
| 757 |
+ if self._has_push_remotes:
|
|
| 758 |
+ push_remotes = [r for r in self._remotes[project] if r.spec.push]
|
|
| 759 |
+ else:
|
|
| 760 |
+ push_remotes = []
|
|
| 761 |
+ |
|
| 762 |
+ if not push_remotes:
|
|
| 763 |
+ raise ArtifactError("push_message was called, but no remote artifact " +
|
|
| 764 |
+ "servers are configured as push remotes.")
|
|
| 765 |
+ |
|
| 766 |
+ for remote in push_remotes:
|
|
| 767 |
+ message_digest = self.cas.push_message(remote, message)
|
|
| 768 |
+ |
|
| 769 |
+ return message_digest
|
|
| 770 |
+ |
|
| 771 |
+ # verify_digest_pushed():
|
|
| 772 |
+ #
|
|
| 773 |
+ # Check whether the object is already on the server in which case
|
|
| 774 |
+ # there is no need to upload it.
|
|
| 775 |
+ #
|
|
| 776 |
+ # Args:
|
|
| 777 |
+ # project (Project): The current project
|
|
| 778 |
+ # digest (Digest): The object digest.
|
|
| 779 |
+ #
|
|
| 780 |
+ def verify_digest_pushed(self, project, digest):
|
|
| 781 |
+ |
|
| 782 |
+ if self._has_push_remotes:
|
|
| 783 |
+ push_remotes = [r for r in self._remotes[project] if r.spec.push]
|
|
| 784 |
+ else:
|
|
| 785 |
+ push_remotes = []
|
|
| 786 |
+ |
|
| 787 |
+ if not push_remotes:
|
|
| 788 |
+ raise ArtifactError("verify_digest_pushed was called, but no remote artifact " +
|
|
| 789 |
+ "servers are configured as push remotes.")
|
|
| 790 |
+ |
|
| 791 |
+ pushed = False
|
|
| 792 |
+ |
|
| 793 |
+ for remote in push_remotes:
|
|
| 794 |
+ if self.cas.verify_digest_on_remote(remote, digest):
|
|
| 795 |
+ pushed = True
|
|
| 796 |
+ |
|
| 797 |
+ return pushed
|
|
| 563 | 798 |
|
| 564 | 799 |
# link_key():
|
| 565 | 800 |
#
|
| ... | ... | @@ -571,19 +806,10 @@ class ArtifactCache(): |
| 571 | 806 |
# newkey (str): A new cache key for the artifact
|
| 572 | 807 |
#
|
| 573 | 808 |
def link_key(self, element, oldkey, newkey):
|
| 574 |
- raise ImplError("Cache '{kind}' does not implement link_key()"
|
|
| 575 |
- .format(kind=type(self).__name__))
|
|
| 809 |
+ oldref = self.get_artifact_fullname(element, oldkey)
|
|
| 810 |
+ newref = self.get_artifact_fullname(element, newkey)
|
|
| 576 | 811 |
|
| 577 |
- # calculate_cache_size()
|
|
| 578 |
- #
|
|
| 579 |
- # Return the real artifact cache size.
|
|
| 580 |
- #
|
|
| 581 |
- # Returns:
|
|
| 582 |
- # (int): The size of the artifact cache.
|
|
| 583 |
- #
|
|
| 584 |
- def calculate_cache_size(self):
|
|
| 585 |
- raise ImplError("Cache '{kind}' does not implement calculate_cache_size()"
|
|
| 586 |
- .format(kind=type(self).__name__))
|
|
| 812 |
+ self.cas.link_ref(oldref, newref)
|
|
| 587 | 813 |
|
| 588 | 814 |
################################################
|
| 589 | 815 |
# Local Private Methods #
|
| ... | ... | @@ -20,9 +20,7 @@ |
| 20 | 20 |
import hashlib
|
| 21 | 21 |
import itertools
|
| 22 | 22 |
import io
|
| 23 |
-import multiprocessing
|
|
| 24 | 23 |
import os
|
| 25 |
-import signal
|
|
| 26 | 24 |
import stat
|
| 27 | 25 |
import tempfile
|
| 28 | 26 |
import uuid
|
| ... | ... | @@ -31,17 +29,13 @@ from urllib.parse import urlparse |
| 31 | 29 |
|
| 32 | 30 |
import grpc
|
| 33 | 31 |
|
| 34 |
-from .. import _yaml
|
|
| 35 |
- |
|
| 36 | 32 |
from .._protos.google.rpc import code_pb2
|
| 37 | 33 |
from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
|
| 38 | 34 |
from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
|
| 39 | 35 |
from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc
|
| 40 | 36 |
|
| 41 |
-from .. import _signals, utils
|
|
| 42 |
-from .._exceptions import ArtifactError
|
|
| 43 |
- |
|
| 44 |
-from . import ArtifactCache
|
|
| 37 |
+from .. import utils
|
|
| 38 |
+from .._exceptions import CASError
|
|
| 45 | 39 |
|
| 46 | 40 |
|
| 47 | 41 |
# The default limit for gRPC messages is 4 MiB.
|
| ... | ... | @@ -49,62 +43,68 @@ from . import ArtifactCache |
| 49 | 43 |
_MAX_PAYLOAD_BYTES = 1024 * 1024
|
| 50 | 44 |
|
| 51 | 45 |
|
| 52 |
-# A CASCache manages artifacts in a CAS repository as specified in the
|
|
| 53 |
-# Remote Execution API.
|
|
| 46 |
+# A CASCache manages a CAS repository as specified in the Remote Execution API.
|
|
| 54 | 47 |
#
|
| 55 | 48 |
# Args:
|
| 56 |
-# context (Context): The BuildStream context
|
|
| 57 |
-#
|
|
| 58 |
-# Pushing is explicitly disabled by the platform in some cases,
|
|
| 59 |
-# like when we are falling back to functioning without using
|
|
| 60 |
-# user namespaces.
|
|
| 49 |
+# path (str): The root directory for the CAS repository
|
|
| 61 | 50 |
#
|
| 62 |
-class CASCache(ArtifactCache):
|
|
| 51 |
+class CASCache():
|
|
| 63 | 52 |
|
| 64 |
- def __init__(self, context):
|
|
| 65 |
- super().__init__(context)
|
|
| 66 |
- |
|
| 67 |
- self.casdir = os.path.join(context.artifactdir, 'cas')
|
|
| 53 |
+ def __init__(self, path):
|
|
| 54 |
+ self.casdir = os.path.join(path, 'cas')
|
|
| 55 |
+ self.tmpdir = os.path.join(path, 'tmp')
|
|
| 68 | 56 |
os.makedirs(os.path.join(self.casdir, 'refs', 'heads'), exist_ok=True)
|
| 69 | 57 |
os.makedirs(os.path.join(self.casdir, 'objects'), exist_ok=True)
|
| 58 |
+ os.makedirs(self.tmpdir, exist_ok=True)
|
|
| 70 | 59 |
|
| 71 |
- self._calculate_cache_quota()
|
|
| 72 |
- |
|
| 73 |
- # Per-project list of _CASRemote instances.
|
|
| 74 |
- self._remotes = {}
|
|
| 75 |
- |
|
| 76 |
- self._has_fetch_remotes = False
|
|
| 77 |
- self._has_push_remotes = False
|
|
| 78 |
- |
|
| 79 |
- ################################################
|
|
| 80 |
- # Implementation of abstract methods #
|
|
| 81 |
- ################################################
|
|
| 82 |
- |
|
| 60 |
+ # preflight():
|
|
| 61 |
+ #
|
|
| 62 |
+ # Preflight check.
|
|
| 63 |
+ #
|
|
| 83 | 64 |
def preflight(self):
|
| 84 | 65 |
headdir = os.path.join(self.casdir, 'refs', 'heads')
|
| 85 | 66 |
objdir = os.path.join(self.casdir, 'objects')
|
| 86 | 67 |
if not (os.path.isdir(headdir) and os.path.isdir(objdir)):
|
| 87 |
- raise ArtifactError("CAS repository check failed for '{}'"
|
|
| 88 |
- .format(self.casdir))
|
|
| 68 |
+ raise CASError("CAS repository check failed for '{}'".format(self.casdir))
|
|
| 89 | 69 |
|
| 90 |
- def contains(self, element, key):
|
|
| 91 |
- refpath = self._refpath(self.get_artifact_fullname(element, key))
|
|
| 70 |
+ # contains():
|
|
| 71 |
+ #
|
|
| 72 |
+ # Check whether the specified ref is already available in the local CAS cache.
|
|
| 73 |
+ #
|
|
| 74 |
+ # Args:
|
|
| 75 |
+ # ref (str): The ref to check
|
|
| 76 |
+ #
|
|
| 77 |
+ # Returns: True if the ref is in the cache, False otherwise
|
|
| 78 |
+ #
|
|
| 79 |
+ def contains(self, ref):
|
|
| 80 |
+ refpath = self._refpath(ref)
|
|
| 92 | 81 |
|
| 93 | 82 |
# This assumes that the repository doesn't have any dangling pointers
|
| 94 | 83 |
return os.path.exists(refpath)
|
| 95 | 84 |
|
| 96 |
- def extract(self, element, key):
|
|
| 97 |
- ref = self.get_artifact_fullname(element, key)
|
|
| 98 |
- |
|
| 85 |
+ # extract():
|
|
| 86 |
+ #
|
|
| 87 |
+ # Extract cached directory for the specified ref if it hasn't
|
|
| 88 |
+ # already been extracted.
|
|
| 89 |
+ #
|
|
| 90 |
+ # Args:
|
|
| 91 |
+ # ref (str): The ref whose directory to extract
|
|
| 92 |
+ # path (str): The destination path
|
|
| 93 |
+ #
|
|
| 94 |
+ # Raises:
|
|
| 95 |
+ # CASError: If there was an OSError, or if the ref did not exist.
|
|
| 96 |
+ #
|
|
| 97 |
+ # Returns: path to extracted directory
|
|
| 98 |
+ #
|
|
| 99 |
+ def extract(self, ref, path):
|
|
| 99 | 100 |
tree = self.resolve_ref(ref, update_mtime=True)
|
| 100 | 101 |
|
| 101 |
- dest = os.path.join(self.extractdir, element._get_project().name,
|
|
| 102 |
- element.normal_name, tree.hash)
|
|
| 102 |
+ dest = os.path.join(path, tree.hash)
|
|
| 103 | 103 |
if os.path.isdir(dest):
|
| 104 |
- # artifact has already been extracted
|
|
| 104 |
+ # directory has already been extracted
|
|
| 105 | 105 |
return dest
|
| 106 | 106 |
|
| 107 |
- with tempfile.TemporaryDirectory(prefix='tmp', dir=self.extractdir) as tmpdir:
|
|
| 107 |
+ with tempfile.TemporaryDirectory(prefix='tmp', dir=self.tmpdir) as tmpdir:
|
|
| 108 | 108 |
checkoutdir = os.path.join(tmpdir, ref)
|
| 109 | 109 |
self._checkout(checkoutdir, tree)
|
| 110 | 110 |
|
| ... | ... | @@ -118,23 +118,35 @@ class CASCache(ArtifactCache): |
| 118 | 118 |
# If rename fails with these errors, another process beat
|
| 119 | 119 |
# us to it so just ignore.
|
| 120 | 120 |
if e.errno not in [errno.ENOTEMPTY, errno.EEXIST]:
|
| 121 |
- raise ArtifactError("Failed to extract artifact for ref '{}': {}"
|
|
| 122 |
- .format(ref, e)) from e
|
|
| 121 |
+ raise CASError("Failed to extract directory for ref '{}': {}".format(ref, e)) from e
|
|
| 123 | 122 |
|
| 124 | 123 |
return dest
|
| 125 | 124 |
|
| 126 |
- def commit(self, element, content, keys):
|
|
| 127 |
- refs = [self.get_artifact_fullname(element, key) for key in keys]
|
|
| 128 |
- |
|
| 129 |
- tree = self._commit_directory(content)
|
|
| 125 |
+ # commit():
|
|
| 126 |
+ #
|
|
| 127 |
+ # Commit directory to cache.
|
|
| 128 |
+ #
|
|
| 129 |
+ # Args:
|
|
| 130 |
+ # refs (list): The refs to set
|
|
| 131 |
+ # path (str): The directory to import
|
|
| 132 |
+ #
|
|
| 133 |
+ def commit(self, refs, path):
|
|
| 134 |
+ tree = self._commit_directory(path)
|
|
| 130 | 135 |
|
| 131 | 136 |
for ref in refs:
|
| 132 | 137 |
self.set_ref(ref, tree)
|
| 133 | 138 |
|
| 134 |
- def diff(self, element, key_a, key_b, *, subdir=None):
|
|
| 135 |
- ref_a = self.get_artifact_fullname(element, key_a)
|
|
| 136 |
- ref_b = self.get_artifact_fullname(element, key_b)
|
|
| 137 |
- |
|
| 139 |
+ # diff():
|
|
| 140 |
+ #
|
|
| 141 |
+ # Return a list of files that have been added or modified between
|
|
| 142 |
+ # the refs described by ref_a and ref_b.
|
|
| 143 |
+ #
|
|
| 144 |
+ # Args:
|
|
| 145 |
+ # ref_a (str): The first ref
|
|
| 146 |
+ # ref_b (str): The second ref
|
|
| 147 |
+ # subdir (str): A subdirectory to limit the comparison to
|
|
| 148 |
+ #
|
|
| 149 |
+ def diff(self, ref_a, ref_b, *, subdir=None):
|
|
| 138 | 150 |
tree_a = self.resolve_ref(ref_a)
|
| 139 | 151 |
tree_b = self.resolve_ref(ref_b)
|
| 140 | 152 |
|
| ... | ... | @@ -150,158 +162,122 @@ class CASCache(ArtifactCache): |
| 150 | 162 |
|
| 151 | 163 |
return modified, removed, added
|
| 152 | 164 |
|
| 153 |
- def initialize_remotes(self, *, on_failure=None):
|
|
| 154 |
- remote_specs = self.global_remote_specs
|
|
| 155 |
- |
|
| 156 |
- for project in self.project_remote_specs:
|
|
| 157 |
- remote_specs += self.project_remote_specs[project]
|
|
| 158 |
- |
|
| 159 |
- remote_specs = list(utils._deduplicate(remote_specs))
|
|
| 160 |
- |
|
| 161 |
- remotes = {}
|
|
| 162 |
- q = multiprocessing.Queue()
|
|
| 163 |
- for remote_spec in remote_specs:
|
|
| 164 |
- # Use subprocess to avoid creation of gRPC threads in main BuildStream process
|
|
| 165 |
- # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
|
|
| 166 |
- p = multiprocessing.Process(target=self._initialize_remote, args=(remote_spec, q))
|
|
| 165 |
+ def initialize_remote(self, remote_spec, q):
|
|
| 166 |
+ try:
|
|
| 167 |
+ remote = CASRemote(remote_spec)
|
|
| 168 |
+ remote.init()
|
|
| 167 | 169 |
|
| 168 |
- try:
|
|
| 169 |
- # Keep SIGINT blocked in the child process
|
|
| 170 |
- with _signals.blocked([signal.SIGINT], ignore=False):
|
|
| 171 |
- p.start()
|
|
| 172 |
- |
|
| 173 |
- error = q.get()
|
|
| 174 |
- p.join()
|
|
| 175 |
- except KeyboardInterrupt:
|
|
| 176 |
- utils._kill_process_tree(p.pid)
|
|
| 177 |
- raise
|
|
| 170 |
+ request = buildstream_pb2.StatusRequest()
|
|
| 171 |
+ response = remote.ref_storage.Status(request)
|
|
| 178 | 172 |
|
| 179 |
- if error and on_failure:
|
|
| 180 |
- on_failure(remote_spec.url, error)
|
|
| 181 |
- elif error:
|
|
| 182 |
- raise ArtifactError(error)
|
|
| 173 |
+ if remote_spec.push and not response.allow_updates:
|
|
| 174 |
+ q.put('CAS server does not allow push')
|
|
| 183 | 175 |
else:
|
| 184 |
- self._has_fetch_remotes = True
|
|
| 185 |
- if remote_spec.push:
|
|
| 186 |
- self._has_push_remotes = True
|
|
| 176 |
+ # No error
|
|
| 177 |
+ q.put(None)
|
|
| 187 | 178 |
|
| 188 |
- remotes[remote_spec.url] = _CASRemote(remote_spec)
|
|
| 179 |
+ except grpc.RpcError as e:
|
|
| 180 |
+ # str(e) is too verbose for errors reported to the user
|
|
| 181 |
+ q.put(e.details())
|
|
| 189 | 182 |
|
| 190 |
- for project in self.context.get_projects():
|
|
| 191 |
- remote_specs = self.global_remote_specs
|
|
| 192 |
- if project in self.project_remote_specs:
|
|
| 193 |
- remote_specs = list(utils._deduplicate(remote_specs + self.project_remote_specs[project]))
|
|
| 183 |
+ except Exception as e: # pylint: disable=broad-except
|
|
| 184 |
+ # Whatever happens, we need to return it to the calling process
|
|
| 185 |
+ #
|
|
| 186 |
+ q.put(str(e))
|
|
| 194 | 187 |
|
| 195 |
- project_remotes = []
|
|
| 188 |
+ # pull():
|
|
| 189 |
+ #
|
|
| 190 |
+ # Pull a ref from a remote repository.
|
|
| 191 |
+ #
|
|
| 192 |
+ # Args:
|
|
| 193 |
+ # ref (str): The ref to pull
|
|
| 194 |
+ # remote (CASRemote): The remote repository to pull from
|
|
| 195 |
+ # progress (callable): The progress callback, if any
|
|
| 196 |
+ #
|
|
| 197 |
+ # Returns:
|
|
| 198 |
+ # (bool): True if pull was successful, False if ref was not available
|
|
| 199 |
+ #
|
|
| 200 |
+ def pull(self, ref, remote, *, progress=None):
|
|
| 201 |
+ try:
|
|
| 202 |
+ remote.init()
|
|
| 196 | 203 |
|
| 197 |
- for remote_spec in remote_specs:
|
|
| 198 |
- # Errors are already handled in the loop above,
|
|
| 199 |
- # skip unreachable remotes here.
|
|
| 200 |
- if remote_spec.url not in remotes:
|
|
| 201 |
- continue
|
|
| 204 |
+ request = buildstream_pb2.GetReferenceRequest()
|
|
| 205 |
+ request.key = ref
|
|
| 206 |
+ response = remote.ref_storage.GetReference(request)
|
|
| 202 | 207 |
|
| 203 |
- remote = remotes[remote_spec.url]
|
|
| 204 |
- project_remotes.append(remote)
|
|
| 208 |
+ tree = remote_execution_pb2.Digest()
|
|
| 209 |
+ tree.hash = response.digest.hash
|
|
| 210 |
+ tree.size_bytes = response.digest.size_bytes
|
|
| 205 | 211 |
|
| 206 |
- self._remotes[project] = project_remotes
|
|
| 212 |
+ self._fetch_directory(remote, tree)
|
|
| 207 | 213 |
|
| 208 |
- def has_fetch_remotes(self, *, element=None):
|
|
| 209 |
- if not self._has_fetch_remotes:
|
|
| 210 |
- # No project has fetch remotes
|
|
| 211 |
- return False
|
|
| 212 |
- elif element is None:
|
|
| 213 |
- # At least one (sub)project has fetch remotes
|
|
| 214 |
- return True
|
|
| 215 |
- else:
|
|
| 216 |
- # Check whether the specified element's project has fetch remotes
|
|
| 217 |
- remotes_for_project = self._remotes[element._get_project()]
|
|
| 218 |
- return bool(remotes_for_project)
|
|
| 214 |
+ self.set_ref(ref, tree)
|
|
| 219 | 215 |
|
| 220 |
- def has_push_remotes(self, *, element=None):
|
|
| 221 |
- if not self._has_push_remotes:
|
|
| 222 |
- # No project has push remotes
|
|
| 223 |
- return False
|
|
| 224 |
- elif element is None:
|
|
| 225 |
- # At least one (sub)project has push remotes
|
|
| 226 | 216 |
return True
|
| 227 |
- else:
|
|
| 228 |
- # Check whether the specified element's project has push remotes
|
|
| 229 |
- remotes_for_project = self._remotes[element._get_project()]
|
|
| 230 |
- return any(remote.spec.push for remote in remotes_for_project)
|
|
| 231 |
- |
|
| 232 |
- def pull(self, element, key, *, progress=None):
|
|
| 233 |
- ref = self.get_artifact_fullname(element, key)
|
|
| 234 |
- |
|
| 235 |
- project = element._get_project()
|
|
| 236 |
- |
|
| 237 |
- for remote in self._remotes[project]:
|
|
| 238 |
- try:
|
|
| 239 |
- remote.init()
|
|
| 240 |
- display_key = element._get_brief_display_key()
|
|
| 241 |
- element.status("Pulling artifact {} <- {}".format(display_key, remote.spec.url))
|
|
| 242 |
- |
|
| 243 |
- request = buildstream_pb2.GetReferenceRequest()
|
|
| 244 |
- request.key = ref
|
|
| 245 |
- response = remote.ref_storage.GetReference(request)
|
|
| 246 |
- |
|
| 247 |
- tree = remote_execution_pb2.Digest()
|
|
| 248 |
- tree.hash = response.digest.hash
|
|
| 249 |
- tree.size_bytes = response.digest.size_bytes
|
|
| 250 |
- |
|
| 251 |
- self._fetch_directory(remote, tree)
|
|
| 252 |
- |
|
| 253 |
- self.set_ref(ref, tree)
|
|
| 254 |
- |
|
| 255 |
- element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url))
|
|
| 256 |
- # no need to pull from additional remotes
|
|
| 257 |
- return True
|
|
| 258 |
- |
|
| 259 |
- except grpc.RpcError as e:
|
|
| 260 |
- if e.code() != grpc.StatusCode.NOT_FOUND:
|
|
| 261 |
- raise ArtifactError("Failed to pull artifact {}: {}".format(
|
|
| 262 |
- element._get_brief_display_key(), e)) from e
|
|
| 263 |
- else:
|
|
| 264 |
- element.info("Remote ({}) does not have {} cached".format(
|
|
| 265 |
- remote.spec.url, element._get_brief_display_key()
|
|
| 266 |
- ))
|
|
| 267 |
- |
|
| 268 |
- return False
|
|
| 269 |
- |
|
| 270 |
- def pull_tree(self, project, digest):
|
|
| 271 |
- """ Pull a single Tree rather than an artifact.
|
|
| 272 |
- Does not update local refs. """
|
|
| 217 |
+ except grpc.RpcError as e:
|
|
| 218 |
+ if e.code() != grpc.StatusCode.NOT_FOUND:
|
|
| 219 |
+ raise CASError("Failed to pull ref {}: {}".format(ref, e)) from e
|
|
| 220 |
+ else:
|
|
| 221 |
+ return False
|
|
| 273 | 222 |
|
| 274 |
- for remote in self._remotes[project]:
|
|
| 275 |
- try:
|
|
| 276 |
- remote.init()
|
|
| 223 |
+ # pull_tree():
|
|
| 224 |
+ #
|
|
| 225 |
+ # Pull a single Tree rather than a ref.
|
|
| 226 |
+ # Does not update local refs.
|
|
| 227 |
+ #
|
|
| 228 |
+ # Args:
|
|
| 229 |
+ # remote (CASRemote): The remote to pull from
|
|
| 230 |
+ # digest (Digest): The digest of the tree
|
|
| 231 |
+ #
|
|
| 232 |
+ def pull_tree(self, remote, digest):
|
|
| 233 |
+ try:
|
|
| 234 |
+ remote.init()
|
|
| 277 | 235 |
|
| 278 |
- digest = self._fetch_tree(remote, digest)
|
|
| 236 |
+ digest = self._fetch_tree(remote, digest)
|
|
| 279 | 237 |
|
| 280 |
- # no need to pull from additional remotes
|
|
| 281 |
- return digest
|
|
| 238 |
+ return digest
|
|
| 282 | 239 |
|
| 283 |
- except grpc.RpcError as e:
|
|
| 284 |
- if e.code() != grpc.StatusCode.NOT_FOUND:
|
|
| 285 |
- raise
|
|
| 240 |
+ except grpc.RpcError as e:
|
|
| 241 |
+ if e.code() != grpc.StatusCode.NOT_FOUND:
|
|
| 242 |
+ raise
|
|
| 286 | 243 |
|
| 287 | 244 |
return None
|
| 288 | 245 |
|
| 289 |
- def link_key(self, element, oldkey, newkey):
|
|
| 290 |
- oldref = self.get_artifact_fullname(element, oldkey)
|
|
| 291 |
- newref = self.get_artifact_fullname(element, newkey)
|
|
| 292 |
- |
|
| 246 |
+ # link_ref():
|
|
| 247 |
+ #
|
|
| 248 |
+ # Add an alias for an existing ref.
|
|
| 249 |
+ #
|
|
| 250 |
+ # Args:
|
|
| 251 |
+ # oldref (str): An existing ref
|
|
| 252 |
+ # newref (str): A new ref for the same directory
|
|
| 253 |
+ #
|
|
| 254 |
+ def link_ref(self, oldref, newref):
|
|
| 293 | 255 |
tree = self.resolve_ref(oldref)
|
| 294 | 256 |
|
| 295 | 257 |
self.set_ref(newref, tree)
|
| 296 | 258 |
|
| 297 |
- def _push_refs_to_remote(self, refs, remote):
|
|
| 259 |
+ # push():
|
|
| 260 |
+ #
|
|
| 261 |
+ # Push committed refs to remote repository.
|
|
| 262 |
+ #
|
|
| 263 |
+ # Args:
|
|
| 264 |
+ # refs (list): The refs to push
|
|
| 265 |
+ # remote (CASRemote): The remote to push to
|
|
| 266 |
+ #
|
|
| 267 |
+ # Returns:
|
|
| 268 |
+ # (bool): True if the remote was updated, False if no pushes were required
|
|
| 269 |
+ #
|
|
| 270 |
+ # Raises:
|
|
| 271 |
+ # (CASError): if there was an error
|
|
| 272 |
+ #
|
|
| 273 |
+ def push(self, refs, remote):
|
|
| 298 | 274 |
skipped_remote = True
|
| 299 | 275 |
try:
|
| 300 | 276 |
for ref in refs:
|
| 301 | 277 |
tree = self.resolve_ref(ref)
|
| 302 | 278 |
|
| 303 | 279 |
# Check whether ref is already on the server in which case
|
| 304 |
- # there is no need to push the artifact
|
|
| 280 |
+ # there is no need to push the ref
|
|
| 305 | 281 |
try:
|
| 306 | 282 |
request = buildstream_pb2.GetReferenceRequest()
|
| 307 | 283 |
request.key = ref
|
| ... | ... | @@ -327,65 +303,38 @@ class CASCache(ArtifactCache): |
| 327 | 303 |
skipped_remote = False
|
| 328 | 304 |
except grpc.RpcError as e:
|
| 329 | 305 |
if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
|
| 330 |
- raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
|
|
| 306 |
+ raise CASError("Failed to push ref {}: {}".format(refs, e), temporary=True) from e
|
|
| 331 | 307 |
|
| 332 | 308 |
return not skipped_remote
|
| 333 | 309 |
|
| 334 |
- def push(self, element, keys):
|
|
| 335 |
- |
|
| 336 |
- refs = [self.get_artifact_fullname(element, key) for key in list(keys)]
|
|
| 337 |
- |
|
| 338 |
- project = element._get_project()
|
|
| 339 |
- |
|
| 340 |
- push_remotes = [r for r in self._remotes[project] if r.spec.push]
|
|
| 341 |
- |
|
| 342 |
- pushed = False
|
|
| 343 |
- |
|
| 344 |
- for remote in push_remotes:
|
|
| 345 |
- remote.init()
|
|
| 346 |
- display_key = element._get_brief_display_key()
|
|
| 347 |
- element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url))
|
|
| 348 |
- |
|
| 349 |
- if self._push_refs_to_remote(refs, remote):
|
|
| 350 |
- element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
|
|
| 351 |
- pushed = True
|
|
| 352 |
- else:
|
|
| 353 |
- element.info("Remote ({}) already has {} cached".format(
|
|
| 354 |
- remote.spec.url, element._get_brief_display_key()
|
|
| 355 |
- ))
|
|
| 356 |
- |
|
| 357 |
- return pushed
|
|
| 358 |
- |
|
| 359 |
- def push_directory(self, project, directory):
|
|
| 360 |
- """ Push the given virtual directory to all remotes.
|
|
| 361 |
- |
|
| 362 |
- Args:
|
|
| 363 |
- project (Project): The current project
|
|
| 364 |
- directory (Directory): A virtual directory object to push.
|
|
| 365 |
- |
|
| 366 |
- Raises: ArtifactError if no push remotes are configured.
|
|
| 367 |
- """
|
|
| 368 |
- |
|
| 369 |
- if self._has_push_remotes:
|
|
| 370 |
- push_remotes = [r for r in self._remotes[project] if r.spec.push]
|
|
| 371 |
- else:
|
|
| 372 |
- push_remotes = []
|
|
| 373 |
- |
|
| 374 |
- if not push_remotes:
|
|
| 375 |
- raise ArtifactError("CASCache: push_directory was called, but no remote artifact " +
|
|
| 376 |
- "servers are configured as push remotes.")
|
|
| 377 |
- |
|
| 378 |
- if directory.ref is None:
|
|
| 379 |
- return
|
|
| 380 |
- |
|
| 381 |
- for remote in push_remotes:
|
|
| 382 |
- remote.init()
|
|
| 383 |
- |
|
| 384 |
- self._send_directory(remote, directory.ref)
|
|
| 310 |
+ # push_directory():
|
|
| 311 |
+ #
|
|
| 312 |
+ # Push the given virtual directory to a remote.
|
|
| 313 |
+ #
|
|
| 314 |
+ # Args:
|
|
| 315 |
+ # remote (CASRemote): The remote to push to
|
|
| 316 |
+ # directory (Directory): A virtual directory object to push.
|
|
| 317 |
+ #
|
|
| 318 |
+ # Raises:
|
|
| 319 |
+ # (CASError): if there was an error
|
|
| 320 |
+ #
|
|
| 321 |
+ def push_directory(self, remote, directory):
|
|
| 322 |
+ remote.init()
|
|
| 385 | 323 |
|
| 386 |
- def push_message(self, project, message):
|
|
| 324 |
+ self._send_directory(remote, directory.ref)
|
|
| 387 | 325 |
|
| 388 |
- push_remotes = [r for r in self._remotes[project] if r.spec.push]
|
|
| 326 |
+ # push_message():
|
|
| 327 |
+ #
|
|
| 328 |
+ # Push the given protobuf message to a remote.
|
|
| 329 |
+ #
|
|
| 330 |
+ # Args:
|
|
| 331 |
+ # remote (CASRemote): The remote to push to
|
|
| 332 |
+ # message (Message): A protobuf message to push.
|
|
| 333 |
+ #
|
|
| 334 |
+ # Raises:
|
|
| 335 |
+ # (CASError): if there was an error
|
|
| 336 |
+ #
|
|
| 337 |
+ def push_message(self, remote, message):
|
|
| 389 | 338 |
|
| 390 | 339 |
message_buffer = message.SerializeToString()
|
| 391 | 340 |
message_sha = hashlib.sha256(message_buffer)
|
| ... | ... | @@ -393,17 +342,25 @@ class CASCache(ArtifactCache): |
| 393 | 342 |
message_digest.hash = message_sha.hexdigest()
|
| 394 | 343 |
message_digest.size_bytes = len(message_buffer)
|
| 395 | 344 |
|
| 396 |
- for remote in push_remotes:
|
|
| 397 |
- remote.init()
|
|
| 345 |
+ remote.init()
|
|
| 398 | 346 |
|
| 399 |
- with io.BytesIO(message_buffer) as b:
|
|
| 400 |
- self._send_blob(remote, message_digest, b)
|
|
| 347 |
+ with io.BytesIO(message_buffer) as b:
|
|
| 348 |
+ self._send_blob(remote, message_digest, b)
|
|
| 401 | 349 |
|
| 402 | 350 |
return message_digest
|
| 403 | 351 |
|
| 404 |
- def _verify_digest_on_remote(self, remote, digest):
|
|
| 405 |
- # Check whether ref is already on the server in which case
|
|
| 406 |
- # there is no need to push the artifact
|
|
| 352 |
+ # verify_digest_on_remote():
|
|
| 353 |
+ #
|
|
| 354 |
+ # Check whether the object is already on the server in which case
|
|
| 355 |
+ # there is no need to upload it.
|
|
| 356 |
+ #
|
|
| 357 |
+ # Args:
|
|
| 358 |
+ # remote (CASRemote): The remote to check
|
|
| 359 |
+ # digest (Digest): The object digest.
|
|
| 360 |
+ #
|
|
| 361 |
+ def verify_digest_on_remote(self, remote, digest):
|
|
| 362 |
+ remote.init()
|
|
| 363 |
+ |
|
| 407 | 364 |
request = remote_execution_pb2.FindMissingBlobsRequest()
|
| 408 | 365 |
request.blob_digests.extend([digest])
|
| 409 | 366 |
|
| ... | ... | @@ -413,24 +370,6 @@ class CASCache(ArtifactCache): |
| 413 | 370 |
|
| 414 | 371 |
return True
|
| 415 | 372 |
|
| 416 |
- def verify_digest_pushed(self, project, digest):
|
|
| 417 |
- |
|
| 418 |
- push_remotes = [r for r in self._remotes[project] if r.spec.push]
|
|
| 419 |
- |
|
| 420 |
- pushed = False
|
|
| 421 |
- |
|
| 422 |
- for remote in push_remotes:
|
|
| 423 |
- remote.init()
|
|
| 424 |
- |
|
| 425 |
- if self._verify_digest_on_remote(remote, digest):
|
|
| 426 |
- pushed = True
|
|
| 427 |
- |
|
| 428 |
- return pushed
|
|
| 429 |
- |
|
| 430 |
- ################################################
|
|
| 431 |
- # API Private Methods #
|
|
| 432 |
- ################################################
|
|
| 433 |
- |
|
| 434 | 373 |
# objpath():
|
| 435 | 374 |
#
|
| 436 | 375 |
# Return the path of an object based on its digest.
|
| ... | ... | @@ -496,7 +435,7 @@ class CASCache(ArtifactCache): |
| 496 | 435 |
pass
|
| 497 | 436 |
|
| 498 | 437 |
except OSError as e:
|
| 499 |
- raise ArtifactError("Failed to hash object: {}".format(e)) from e
|
|
| 438 |
+ raise CASError("Failed to hash object: {}".format(e)) from e
|
|
| 500 | 439 |
|
| 501 | 440 |
return digest
|
| 502 | 441 |
|
| ... | ... | @@ -537,26 +476,39 @@ class CASCache(ArtifactCache): |
| 537 | 476 |
return digest
|
| 538 | 477 |
|
| 539 | 478 |
except FileNotFoundError as e:
|
| 540 |
- raise ArtifactError("Attempt to access unavailable artifact: {}".format(e)) from e
|
|
| 479 |
+ raise CASError("Attempt to access unavailable ref: {}".format(e)) from e
|
|
| 541 | 480 |
|
| 542 |
- def update_mtime(self, element, key):
|
|
| 481 |
+ # update_mtime()
|
|
| 482 |
+ #
|
|
| 483 |
+ # Update the mtime of a ref.
|
|
| 484 |
+ #
|
|
| 485 |
+ # Args:
|
|
| 486 |
+ # ref (str): The ref to update
|
|
| 487 |
+ #
|
|
| 488 |
+ def update_mtime(self, ref):
|
|
| 543 | 489 |
try:
|
| 544 |
- ref = self.get_artifact_fullname(element, key)
|
|
| 545 | 490 |
os.utime(self._refpath(ref))
|
| 546 | 491 |
except FileNotFoundError as e:
|
| 547 |
- raise ArtifactError("Attempt to access unavailable artifact: {}".format(e)) from e
|
|
| 492 |
+ raise CASError("Attempt to access unavailable ref: {}".format(e)) from e
|
|
| 548 | 493 |
|
| 494 |
+ # calculate_cache_size()
|
|
| 495 |
+ #
|
|
| 496 |
+ # Return the real disk usage of the CAS cache.
|
|
| 497 |
+ #
|
|
| 498 |
+ # Returns:
|
|
| 499 |
+ # (int): The size of the cache.
|
|
| 500 |
+ #
|
|
| 549 | 501 |
def calculate_cache_size(self):
|
| 550 | 502 |
return utils._get_dir_size(self.casdir)
|
| 551 | 503 |
|
| 552 |
- # list_artifacts():
|
|
| 504 |
+ # list_refs():
|
|
| 553 | 505 |
#
|
| 554 |
- # List cached artifacts in Least Recently Modified (LRM) order.
|
|
| 506 |
+ # List refs in Least Recently Modified (LRM) order.
|
|
| 555 | 507 |
#
|
| 556 | 508 |
# Returns:
|
| 557 | 509 |
# (list) - A list of refs in LRM order
|
| 558 | 510 |
#
|
| 559 |
- def list_artifacts(self):
|
|
| 511 |
+ def list_refs(self):
|
|
| 560 | 512 |
# string of: /path/to/repo/refs/heads
|
| 561 | 513 |
ref_heads = os.path.join(self.casdir, 'refs', 'heads')
|
| 562 | 514 |
|
| ... | ... | @@ -571,7 +523,7 @@ class CASCache(ArtifactCache): |
| 571 | 523 |
mtimes.append(os.path.getmtime(ref_path))
|
| 572 | 524 |
|
| 573 | 525 |
# NOTE: Sorted will sort from earliest to latest, thus the
|
| 574 |
- # first element of this list will be the file modified earliest.
|
|
| 526 |
+ # first ref of this list will be the file modified earliest.
|
|
| 575 | 527 |
return [ref for _, ref in sorted(zip(mtimes, refs))]
|
| 576 | 528 |
|
| 577 | 529 |
# remove():
|
| ... | ... | @@ -590,28 +542,10 @@ class CASCache(ArtifactCache): |
| 590 | 542 |
#
|
| 591 | 543 |
def remove(self, ref, *, defer_prune=False):
|
| 592 | 544 |
|
| 593 |
- # Remove extract if not used by other ref
|
|
| 594 |
- tree = self.resolve_ref(ref)
|
|
| 595 |
- ref_name, ref_hash = os.path.split(ref)
|
|
| 596 |
- extract = os.path.join(self.extractdir, ref_name, tree.hash)
|
|
| 597 |
- keys_file = os.path.join(extract, 'meta', 'keys.yaml')
|
|
| 598 |
- if os.path.exists(keys_file):
|
|
| 599 |
- keys_meta = _yaml.load(keys_file)
|
|
| 600 |
- keys = [keys_meta['strong'], keys_meta['weak']]
|
|
| 601 |
- remove_extract = True
|
|
| 602 |
- for other_hash in keys:
|
|
| 603 |
- if other_hash == ref_hash:
|
|
| 604 |
- continue
|
|
| 605 |
- remove_extract = False
|
|
| 606 |
- break
|
|
| 607 |
- |
|
| 608 |
- if remove_extract:
|
|
| 609 |
- utils._force_rmtree(extract)
|
|
| 610 |
- |
|
| 611 | 545 |
# Remove cache ref
|
| 612 | 546 |
refpath = self._refpath(ref)
|
| 613 | 547 |
if not os.path.exists(refpath):
|
| 614 |
- raise ArtifactError("Could not find artifact for ref '{}'".format(ref))
|
|
| 548 |
+ raise CASError("Could not find ref '{}'".format(ref))
|
|
| 615 | 549 |
|
| 616 | 550 |
os.unlink(refpath)
|
| 617 | 551 |
|
| ... | ... | @@ -721,7 +655,7 @@ class CASCache(ArtifactCache): |
| 721 | 655 |
# The process serving the socket can't be cached anyway
|
| 722 | 656 |
pass
|
| 723 | 657 |
else:
|
| 724 |
- raise ArtifactError("Unsupported file type for {}".format(full_path))
|
|
| 658 |
+ raise CASError("Unsupported file type for {}".format(full_path))
|
|
| 725 | 659 |
|
| 726 | 660 |
return self.add_object(digest=dir_digest,
|
| 727 | 661 |
buffer=directory.SerializeToString())
|
| ... | ... | @@ -740,7 +674,7 @@ class CASCache(ArtifactCache): |
| 740 | 674 |
if dirnode.name == name:
|
| 741 | 675 |
return dirnode.digest
|
| 742 | 676 |
|
| 743 |
- raise ArtifactError("Subdirectory {} not found".format(name))
|
|
| 677 |
+ raise CASError("Subdirectory {} not found".format(name))
|
|
| 744 | 678 |
|
| 745 | 679 |
def _diff_trees(self, tree_a, tree_b, *, added, removed, modified, path=""):
|
| 746 | 680 |
dir_a = remote_execution_pb2.Directory()
|
| ... | ... | @@ -812,29 +746,6 @@ class CASCache(ArtifactCache): |
| 812 | 746 |
for dirnode in directory.directories:
|
| 813 | 747 |
self._reachable_refs_dir(reachable, dirnode.digest)
|
| 814 | 748 |
|
| 815 |
- def _initialize_remote(self, remote_spec, q):
|
|
| 816 |
- try:
|
|
| 817 |
- remote = _CASRemote(remote_spec)
|
|
| 818 |
- remote.init()
|
|
| 819 |
- |
|
| 820 |
- request = buildstream_pb2.StatusRequest()
|
|
| 821 |
- response = remote.ref_storage.Status(request)
|
|
| 822 |
- |
|
| 823 |
- if remote_spec.push and not response.allow_updates:
|
|
| 824 |
- q.put('Artifact server does not allow push')
|
|
| 825 |
- else:
|
|
| 826 |
- # No error
|
|
| 827 |
- q.put(None)
|
|
| 828 |
- |
|
| 829 |
- except grpc.RpcError as e:
|
|
| 830 |
- # str(e) is too verbose for errors reported to the user
|
|
| 831 |
- q.put(e.details())
|
|
| 832 |
- |
|
| 833 |
- except Exception as e: # pylint: disable=broad-except
|
|
| 834 |
- # Whatever happens, we need to return it to the calling process
|
|
| 835 |
- #
|
|
| 836 |
- q.put(str(e))
|
|
| 837 |
- |
|
| 838 | 749 |
def _required_blobs(self, directory_digest):
|
| 839 | 750 |
# parse directory, and recursively add blobs
|
| 840 | 751 |
d = remote_execution_pb2.Digest()
|
| ... | ... | @@ -1080,7 +991,7 @@ class CASCache(ArtifactCache): |
| 1080 | 991 |
|
| 1081 | 992 |
# Represents a single remote CAS cache.
|
| 1082 | 993 |
#
|
| 1083 |
-class _CASRemote():
|
|
| 994 |
+class CASRemote():
|
|
| 1084 | 995 |
def __init__(self, spec):
|
| 1085 | 996 |
self.spec = spec
|
| 1086 | 997 |
self._initialized = False
|
| ... | ... | @@ -1125,7 +1036,7 @@ class _CASRemote(): |
| 1125 | 1036 |
certificate_chain=client_cert_bytes)
|
| 1126 | 1037 |
self.channel = grpc.secure_channel('{}:{}'.format(url.hostname, port), credentials)
|
| 1127 | 1038 |
else:
|
| 1128 |
- raise ArtifactError("Unsupported URL: {}".format(self.spec.url))
|
|
| 1039 |
+ raise CASError("Unsupported URL: {}".format(self.spec.url))
|
|
| 1129 | 1040 |
|
| 1130 | 1041 |
self.bytestream = bytestream_pb2_grpc.ByteStreamStub(self.channel)
|
| 1131 | 1042 |
self.cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel)
|
| ... | ... | @@ -1203,10 +1114,10 @@ class _CASBatchRead(): |
| 1203 | 1114 |
|
| 1204 | 1115 |
for response in batch_response.responses:
|
| 1205 | 1116 |
if response.status.code != code_pb2.OK:
|
| 1206 |
- raise ArtifactError("Failed to download blob {}: {}".format(
|
|
| 1117 |
+ raise CASError("Failed to download blob {}: {}".format(
|
|
| 1207 | 1118 |
response.digest.hash, response.status.code))
|
| 1208 | 1119 |
if response.digest.size_bytes != len(response.data):
|
| 1209 |
- raise ArtifactError("Failed to download blob {}: expected {} bytes, received {} bytes".format(
|
|
| 1120 |
+ raise CASError("Failed to download blob {}: expected {} bytes, received {} bytes".format(
|
|
| 1210 | 1121 |
response.digest.hash, response.digest.size_bytes, len(response.data)))
|
| 1211 | 1122 |
|
| 1212 | 1123 |
yield (response.digest, response.data)
|
| ... | ... | @@ -1248,7 +1159,7 @@ class _CASBatchUpdate(): |
| 1248 | 1159 |
|
| 1249 | 1160 |
for response in batch_response.responses:
|
| 1250 | 1161 |
if response.status.code != code_pb2.OK:
|
| 1251 |
- raise ArtifactError("Failed to upload blob {}: {}".format(
|
|
| 1162 |
+ raise CASError("Failed to upload blob {}: {}".format(
|
|
| 1252 | 1163 |
response.digest.hash, response.status.code))
|
| 1253 | 1164 |
|
| 1254 | 1165 |
|
| ... | ... | @@ -32,8 +32,9 @@ from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remo |
| 32 | 32 |
from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
|
| 33 | 33 |
from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc
|
| 34 | 34 |
|
| 35 |
-from .._exceptions import ArtifactError
|
|
| 36 |
-from .._context import Context
|
|
| 35 |
+from .._exceptions import CASError
|
|
| 36 |
+ |
|
| 37 |
+from .cascache import CASCache
|
|
| 37 | 38 |
|
| 38 | 39 |
|
| 39 | 40 |
# The default limit for gRPC messages is 4 MiB.
|
| ... | ... | @@ -55,26 +56,23 @@ class ArtifactTooLargeException(Exception): |
| 55 | 56 |
# enable_push (bool): Whether to allow blob uploads and artifact updates
|
| 56 | 57 |
#
|
| 57 | 58 |
def create_server(repo, *, enable_push):
|
| 58 |
- context = Context()
|
|
| 59 |
- context.artifactdir = os.path.abspath(repo)
|
|
| 60 |
- |
|
| 61 |
- artifactcache = context.artifactcache
|
|
| 59 |
+ cas = CASCache(os.path.abspath(repo))
|
|
| 62 | 60 |
|
| 63 | 61 |
# Use max_workers default from Python 3.5+
|
| 64 | 62 |
max_workers = (os.cpu_count() or 1) * 5
|
| 65 | 63 |
server = grpc.server(futures.ThreadPoolExecutor(max_workers))
|
| 66 | 64 |
|
| 67 | 65 |
bytestream_pb2_grpc.add_ByteStreamServicer_to_server(
|
| 68 |
- _ByteStreamServicer(artifactcache, enable_push=enable_push), server)
|
|
| 66 |
+ _ByteStreamServicer(cas, enable_push=enable_push), server)
|
|
| 69 | 67 |
|
| 70 | 68 |
remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
|
| 71 |
- _ContentAddressableStorageServicer(artifactcache, enable_push=enable_push), server)
|
|
| 69 |
+ _ContentAddressableStorageServicer(cas, enable_push=enable_push), server)
|
|
| 72 | 70 |
|
| 73 | 71 |
remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
|
| 74 | 72 |
_CapabilitiesServicer(), server)
|
| 75 | 73 |
|
| 76 | 74 |
buildstream_pb2_grpc.add_ReferenceStorageServicer_to_server(
|
| 77 |
- _ReferenceStorageServicer(artifactcache, enable_push=enable_push), server)
|
|
| 75 |
+ _ReferenceStorageServicer(cas, enable_push=enable_push), server)
|
|
| 78 | 76 |
|
| 79 | 77 |
return server
|
| 80 | 78 |
|
| ... | ... | @@ -333,7 +331,7 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer): |
| 333 | 331 |
|
| 334 | 332 |
response.digest.hash = tree.hash
|
| 335 | 333 |
response.digest.size_bytes = tree.size_bytes
|
| 336 |
- except ArtifactError:
|
|
| 334 |
+ except CASError:
|
|
| 337 | 335 |
context.set_code(grpc.StatusCode.NOT_FOUND)
|
| 338 | 336 |
|
| 339 | 337 |
return response
|
| ... | ... | @@ -437,7 +435,7 @@ def _clean_up_cache(cas, object_size): |
| 437 | 435 |
return 0
|
| 438 | 436 |
|
| 439 | 437 |
# obtain a list of LRP artifacts
|
| 440 |
- LRP_artifacts = cas.list_artifacts()
|
|
| 438 |
+ LRP_artifacts = cas.list_refs()
|
|
| 441 | 439 |
|
| 442 | 440 |
removed_size = 0 # in bytes
|
| 443 | 441 |
while object_size - removed_size > free_disk_space:
|
| ... | ... | @@ -31,7 +31,6 @@ from ._exceptions import LoadError, LoadErrorReason, BstError |
| 31 | 31 |
from ._message import Message, MessageType
|
| 32 | 32 |
from ._profile import Topics, profile_start, profile_end
|
| 33 | 33 |
from ._artifactcache import ArtifactCache
|
| 34 |
-from ._artifactcache.cascache import CASCache
|
|
| 35 | 34 |
from ._workspaces import Workspaces
|
| 36 | 35 |
from .plugin import _plugin_lookup
|
| 37 | 36 |
|
| ... | ... | @@ -233,7 +232,7 @@ class Context(): |
| 233 | 232 |
@property
|
| 234 | 233 |
def artifactcache(self):
|
| 235 | 234 |
if not self._artifactcache:
|
| 236 |
- self._artifactcache = CASCache(self)
|
|
| 235 |
+ self._artifactcache = ArtifactCache(self)
|
|
| 237 | 236 |
|
| 238 | 237 |
return self._artifactcache
|
| 239 | 238 |
|
| ... | ... | @@ -47,7 +47,6 @@ class ElementFactory(PluginContext): |
| 47 | 47 |
# Args:
|
| 48 | 48 |
# context (object): The Context object for processing
|
| 49 | 49 |
# project (object): The project object
|
| 50 |
- # artifacts (ArtifactCache): The artifact cache
|
|
| 51 | 50 |
# meta (object): The loaded MetaElement
|
| 52 | 51 |
#
|
| 53 | 52 |
# Returns: A newly created Element object of the appropriate kind
|
| ... | ... | @@ -56,9 +55,9 @@ class ElementFactory(PluginContext): |
| 56 | 55 |
# PluginError (if the kind lookup failed)
|
| 57 | 56 |
# LoadError (if the element itself took issue with the config)
|
| 58 | 57 |
#
|
| 59 |
- def create(self, context, project, artifacts, meta):
|
|
| 58 |
+ def create(self, context, project, meta):
|
|
| 60 | 59 |
element_type, default_config = self.lookup(meta.kind)
|
| 61 |
- element = element_type(context, project, artifacts, meta, default_config)
|
|
| 60 |
+ element = element_type(context, project, meta, default_config)
|
|
| 62 | 61 |
version = self._format_versions.get(meta.kind, 0)
|
| 63 | 62 |
self._assert_plugin_format(element, version)
|
| 64 | 63 |
return element
|
| ... | ... | @@ -90,6 +90,7 @@ class ErrorDomain(Enum): |
| 90 | 90 |
APP = 12
|
| 91 | 91 |
STREAM = 13
|
| 92 | 92 |
VIRTUAL_FS = 14
|
| 93 |
+ CAS = 15
|
|
| 93 | 94 |
|
| 94 | 95 |
|
| 95 | 96 |
# BstError is an internal base exception class for BuildStream
|
| ... | ... | @@ -274,6 +275,15 @@ class ArtifactError(BstError): |
| 274 | 275 |
super().__init__(message, detail=detail, domain=ErrorDomain.ARTIFACT, reason=reason, temporary=True)
|
| 275 | 276 |
|
| 276 | 277 |
|
| 278 |
+# CASError
|
|
| 279 |
+#
|
|
| 280 |
+# Raised when errors are encountered in the CAS
|
|
| 281 |
+#
|
|
| 282 |
+class CASError(BstError):
|
|
| 283 |
+ def __init__(self, message, *, detail=None, reason=None, temporary=False):
|
|
| 284 |
+ super().__init__(message, detail=detail, domain=ErrorDomain.CAS, reason=reason, temporary=True)
|
|
| 285 |
+ |
|
| 286 |
+ |
|
| 277 | 287 |
# PipelineError
|
| 278 | 288 |
#
|
| 279 | 289 |
# Raised from pipeline operations
|
| ... | ... | @@ -537,7 +537,7 @@ class Loader(): |
| 537 | 537 |
raise LoadError(LoadErrorReason.INVALID_DATA,
|
| 538 | 538 |
"{}: Expected junction but element kind is {}".format(filename, meta_element.kind))
|
| 539 | 539 |
|
| 540 |
- element = Element._new_from_meta(meta_element, self._context.artifactcache)
|
|
| 540 |
+ element = Element._new_from_meta(meta_element)
|
|
| 541 | 541 |
element._preflight()
|
| 542 | 542 |
|
| 543 | 543 |
sources = list(element.sources())
|
| ... | ... | @@ -106,7 +106,7 @@ class Pipeline(): |
| 106 | 106 |
|
| 107 | 107 |
profile_start(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, '-') for t in targets))
|
| 108 | 108 |
|
| 109 |
- elements = self._project.load_elements(targets, self._artifacts,
|
|
| 109 |
+ elements = self._project.load_elements(targets,
|
|
| 110 | 110 |
rewritable=rewritable,
|
| 111 | 111 |
fetch_subprojects=fetch_subprojects)
|
| 112 | 112 |
|
| ... | ... | @@ -224,18 +224,17 @@ class Project(): |
| 224 | 224 |
# Instantiate and return an element
|
| 225 | 225 |
#
|
| 226 | 226 |
# Args:
|
| 227 |
- # artifacts (ArtifactCache): The artifact cache
|
|
| 228 | 227 |
# meta (MetaElement): The loaded MetaElement
|
| 229 | 228 |
# first_pass (bool): Whether to use first pass configuration (for junctions)
|
| 230 | 229 |
#
|
| 231 | 230 |
# Returns:
|
| 232 | 231 |
# (Element): A newly created Element object of the appropriate kind
|
| 233 | 232 |
#
|
| 234 |
- def create_element(self, artifacts, meta, *, first_pass=False):
|
|
| 233 |
+ def create_element(self, meta, *, first_pass=False):
|
|
| 235 | 234 |
if first_pass:
|
| 236 |
- return self.first_pass_config.element_factory.create(self._context, self, artifacts, meta)
|
|
| 235 |
+ return self.first_pass_config.element_factory.create(self._context, self, meta)
|
|
| 237 | 236 |
else:
|
| 238 |
- return self.config.element_factory.create(self._context, self, artifacts, meta)
|
|
| 237 |
+ return self.config.element_factory.create(self._context, self, meta)
|
|
| 239 | 238 |
|
| 240 | 239 |
# create_source()
|
| 241 | 240 |
#
|
| ... | ... | @@ -305,7 +304,6 @@ class Project(): |
| 305 | 304 |
#
|
| 306 | 305 |
# Args:
|
| 307 | 306 |
# targets (list): Target names
|
| 308 |
- # artifacts (ArtifactCache): Artifact cache
|
|
| 309 | 307 |
# rewritable (bool): Whether the loaded files should be rewritable
|
| 310 | 308 |
# this is a bit more expensive due to deep copies
|
| 311 | 309 |
# fetch_subprojects (bool): Whether we should fetch subprojects as a part of the
|
| ... | ... | @@ -314,7 +312,7 @@ class Project(): |
| 314 | 312 |
# Returns:
|
| 315 | 313 |
# (list): A list of loaded Element
|
| 316 | 314 |
#
|
| 317 |
- def load_elements(self, targets, artifacts, *,
|
|
| 315 |
+ def load_elements(self, targets, *,
|
|
| 318 | 316 |
rewritable=False, fetch_subprojects=False):
|
| 319 | 317 |
with self._context.timed_activity("Loading elements", silent_nested=True):
|
| 320 | 318 |
meta_elements = self.loader.load(targets, rewritable=rewritable,
|
| ... | ... | @@ -323,7 +321,7 @@ class Project(): |
| 323 | 321 |
|
| 324 | 322 |
with self._context.timed_activity("Resolving elements"):
|
| 325 | 323 |
elements = [
|
| 326 |
- Element._new_from_meta(meta, artifacts)
|
|
| 324 |
+ Element._new_from_meta(meta)
|
|
| 327 | 325 |
for meta in meta_elements
|
| 328 | 326 |
]
|
| 329 | 327 |
|
| ... | ... | @@ -174,7 +174,7 @@ class Element(Plugin): |
| 174 | 174 |
*Since: 1.4*
|
| 175 | 175 |
"""
|
| 176 | 176 |
|
| 177 |
- def __init__(self, context, project, artifacts, meta, plugin_conf):
|
|
| 177 |
+ def __init__(self, context, project, meta, plugin_conf):
|
|
| 178 | 178 |
|
| 179 | 179 |
self.__cache_key_dict = None # Dict for cache key calculation
|
| 180 | 180 |
self.__cache_key = None # Our cached cache key
|
| ... | ... | @@ -199,7 +199,7 @@ class Element(Plugin): |
| 199 | 199 |
self.__sources = [] # List of Sources
|
| 200 | 200 |
self.__weak_cache_key = None # Our cached weak cache key
|
| 201 | 201 |
self.__strict_cache_key = None # Our cached cache key for strict builds
|
| 202 |
- self.__artifacts = artifacts # Artifact cache
|
|
| 202 |
+ self.__artifacts = context.artifactcache # Artifact cache
|
|
| 203 | 203 |
self.__consistency = Consistency.INCONSISTENT # Cached overall consistency state
|
| 204 | 204 |
self.__strong_cached = None # Whether we have a cached artifact
|
| 205 | 205 |
self.__weak_cached = None # Whether we have a cached artifact
|
| ... | ... | @@ -872,14 +872,13 @@ class Element(Plugin): |
| 872 | 872 |
# and its dependencies from a meta element.
|
| 873 | 873 |
#
|
| 874 | 874 |
# Args:
|
| 875 |
- # artifacts (ArtifactCache): The artifact cache
|
|
| 876 | 875 |
# meta (MetaElement): The meta element
|
| 877 | 876 |
#
|
| 878 | 877 |
# Returns:
|
| 879 | 878 |
# (Element): A newly created Element instance
|
| 880 | 879 |
#
|
| 881 | 880 |
@classmethod
|
| 882 |
- def _new_from_meta(cls, meta, artifacts):
|
|
| 881 |
+ def _new_from_meta(cls, meta):
|
|
| 883 | 882 |
|
| 884 | 883 |
if not meta.first_pass:
|
| 885 | 884 |
meta.project.ensure_fully_loaded()
|
| ... | ... | @@ -887,7 +886,7 @@ class Element(Plugin): |
| 887 | 886 |
if meta in cls.__instantiated_elements:
|
| 888 | 887 |
return cls.__instantiated_elements[meta]
|
| 889 | 888 |
|
| 890 |
- element = meta.project.create_element(artifacts, meta, first_pass=meta.first_pass)
|
|
| 889 |
+ element = meta.project.create_element(meta, first_pass=meta.first_pass)
|
|
| 891 | 890 |
cls.__instantiated_elements[meta] = element
|
| 892 | 891 |
|
| 893 | 892 |
# Instantiate sources
|
| ... | ... | @@ -904,10 +903,10 @@ class Element(Plugin): |
| 904 | 903 |
|
| 905 | 904 |
# Instantiate dependencies
|
| 906 | 905 |
for meta_dep in meta.dependencies:
|
| 907 |
- dependency = Element._new_from_meta(meta_dep, artifacts)
|
|
| 906 |
+ dependency = Element._new_from_meta(meta_dep)
|
|
| 908 | 907 |
element.__runtime_dependencies.append(dependency)
|
| 909 | 908 |
for meta_dep in meta.build_dependencies:
|
| 910 |
- dependency = Element._new_from_meta(meta_dep, artifacts)
|
|
| 909 |
+ dependency = Element._new_from_meta(meta_dep)
|
|
| 911 | 910 |
element.__build_dependencies.append(dependency)
|
| 912 | 911 |
|
| 913 | 912 |
return element
|
| ... | ... | @@ -2057,7 +2056,7 @@ class Element(Plugin): |
| 2057 | 2056 |
'sources': [s._get_unique_key(workspace is None) for s in self.__sources],
|
| 2058 | 2057 |
'workspace': '' if workspace is None else workspace.get_key(self._get_project()),
|
| 2059 | 2058 |
'public': self.__public,
|
| 2060 |
- 'cache': type(self.__artifacts).__name__
|
|
| 2059 |
+ 'cache': 'CASCache'
|
|
| 2061 | 2060 |
}
|
| 2062 | 2061 |
|
| 2063 | 2062 |
self.__cache_key_dict['fatal-warnings'] = sorted(project._fatal_warnings)
|
| ... | ... | @@ -2180,6 +2179,7 @@ class Element(Plugin): |
| 2180 | 2179 |
stderr=stderr,
|
| 2181 | 2180 |
config=config,
|
| 2182 | 2181 |
server_url=self.__remote_execution_url,
|
| 2182 |
+ bare_directory=bare_directory,
|
|
| 2183 | 2183 |
allow_real_directory=False)
|
| 2184 | 2184 |
yield sandbox
|
| 2185 | 2185 |
|
| ... | ... | @@ -42,4 +42,5 @@ class SandboxDummy(Sandbox): |
| 42 | 42 |
"'{}'".format(command[0]),
|
| 43 | 43 |
reason='missing-command')
|
| 44 | 44 |
|
| 45 |
- raise SandboxError("This platform does not support local builds: {}".format(self._reason))
|
|
| 45 |
+ raise SandboxError("This platform does not support local builds: {}".format(self._reason),
|
|
| 46 |
+ reason="unavailable-local-sandbox")
|
| ... | ... | @@ -28,10 +28,7 @@ from ..storage._filebaseddirectory import FileBasedDirectory |
| 28 | 28 |
from ..storage._casbaseddirectory import CasBasedDirectory
|
| 29 | 29 |
from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
|
| 30 | 30 |
from .._protos.google.rpc import code_pb2
|
| 31 |
- |
|
| 32 |
- |
|
| 33 |
-class SandboxError(Exception):
|
|
| 34 |
- pass
|
|
| 31 |
+from .._exceptions import SandboxError
|
|
| 35 | 32 |
|
| 36 | 33 |
|
| 37 | 34 |
# SandboxRemote()
|
| ... | ... | @@ -79,7 +79,7 @@ class CasBasedDirectory(Directory): |
| 79 | 79 |
self.filename = filename
|
| 80 | 80 |
self.common_name = common_name
|
| 81 | 81 |
self.pb2_directory = remote_execution_pb2.Directory()
|
| 82 |
- self.cas_cache = context.artifactcache
|
|
| 82 |
+ self.cas_cache = context.artifactcache.cas
|
|
| 83 | 83 |
if ref:
|
| 84 | 84 |
with open(self.cas_cache.objpath(ref), 'rb') as f:
|
| 85 | 85 |
self.pb2_directory.ParseFromString(f.read())
|
| ... | ... | @@ -634,7 +634,7 @@ def _parse_size(size, volume): |
| 634 | 634 |
|
| 635 | 635 |
# _pretty_size()
|
| 636 | 636 |
#
|
| 637 |
-# Converts a number of bytes into a string representation in KB, MB, GB, TB
|
|
| 637 |
+# Converts a number of bytes into a string representation in KiB, MiB, GiB, TiB
|
|
| 638 | 638 |
# represented as K, M, G, T etc.
|
| 639 | 639 |
#
|
| 640 | 640 |
# Args:
|
| ... | ... | @@ -646,10 +646,11 @@ def _parse_size(size, volume): |
| 646 | 646 |
def _pretty_size(size, dec_places=0):
|
| 647 | 647 |
psize = size
|
| 648 | 648 |
unit = 'B'
|
| 649 |
- for unit in ('B', 'K', 'M', 'G', 'T'):
|
|
| 649 |
+ units = ('B', 'K', 'M', 'G', 'T')
|
|
| 650 |
+ for unit in units:
|
|
| 650 | 651 |
if psize < 1024:
|
| 651 | 652 |
break
|
| 652 |
- else:
|
|
| 653 |
+ elif unit != units[-1]:
|
|
| 653 | 654 |
psize /= 1024
|
| 654 | 655 |
return "{size:g}{unit}".format(size=round(psize, dec_places), unit=unit)
|
| 655 | 656 |
|
| ... | ... | @@ -23,6 +23,8 @@ import shutil |
| 23 | 23 |
|
| 24 | 24 |
import pytest
|
| 25 | 25 |
|
| 26 |
+from buildstream._platform.platform import Platform
|
|
| 27 |
+ |
|
| 26 | 28 |
|
| 27 | 29 |
def pytest_addoption(parser):
|
| 28 | 30 |
parser.addoption('--integration', action='store_true', default=False,
|
| ... | ... | @@ -52,3 +54,8 @@ def integration_cache(request): |
| 52 | 54 |
shutil.rmtree(os.path.join(cache_dir, 'artifacts'))
|
| 53 | 55 |
except FileNotFoundError:
|
| 54 | 56 |
pass
|
| 57 |
+ |
|
| 58 |
+ |
|
| 59 |
+@pytest.fixture(autouse=True)
|
|
| 60 |
+def clean_platform_cache():
|
|
| 61 |
+ Platform._instance = None
|
| 1 | 1 |
coverage == 4.4.0
|
| 2 | 2 |
pep8
|
| 3 | 3 |
pylint == 2.1.1
|
| 4 |
-pytest >= 3.7
|
|
| 4 |
+pytest >= 3.8
|
|
| 5 | 5 |
pytest-cov >= 2.5.0
|
| 6 | 6 |
pytest-datafiles
|
| 7 | 7 |
pytest-env
|
| ... | ... | @@ -90,7 +90,7 @@ def test_pull(cli, tmpdir, datafiles): |
| 90 | 90 |
cas = context.artifactcache
|
| 91 | 91 |
|
| 92 | 92 |
# Assert that the element's artifact is **not** cached
|
| 93 |
- element = project.load_elements(['target.bst'], cas)[0]
|
|
| 93 |
+ element = project.load_elements(['target.bst'])[0]
|
|
| 94 | 94 |
element_key = cli.get_element_key(project_dir, 'target.bst')
|
| 95 | 95 |
assert not cas.contains(element, element_key)
|
| 96 | 96 |
|
| ... | ... | @@ -132,7 +132,7 @@ def _test_pull(user_config_file, project_dir, artifact_dir, |
| 132 | 132 |
cas = context.artifactcache
|
| 133 | 133 |
|
| 134 | 134 |
# Load the target element
|
| 135 |
- element = project.load_elements([element_name], cas)[0]
|
|
| 135 |
+ element = project.load_elements([element_name])[0]
|
|
| 136 | 136 |
|
| 137 | 137 |
# Manually setup the CAS remote
|
| 138 | 138 |
cas.setup_remotes(use_config=True)
|
| ... | ... | @@ -190,15 +190,16 @@ def test_pull_tree(cli, tmpdir, datafiles): |
| 190 | 190 |
# Load the project and CAS cache
|
| 191 | 191 |
project = Project(project_dir, context)
|
| 192 | 192 |
project.ensure_fully_loaded()
|
| 193 |
- cas = context.artifactcache
|
|
| 193 |
+ artifactcache = context.artifactcache
|
|
| 194 |
+ cas = artifactcache.cas
|
|
| 194 | 195 |
|
| 195 | 196 |
# Assert that the element's artifact is cached
|
| 196 |
- element = project.load_elements(['target.bst'], cas)[0]
|
|
| 197 |
+ element = project.load_elements(['target.bst'])[0]
|
|
| 197 | 198 |
element_key = cli.get_element_key(project_dir, 'target.bst')
|
| 198 |
- assert cas.contains(element, element_key)
|
|
| 199 |
+ assert artifactcache.contains(element, element_key)
|
|
| 199 | 200 |
|
| 200 | 201 |
# Retrieve the Directory object from the cached artifact
|
| 201 |
- artifact_ref = cas.get_artifact_fullname(element, element_key)
|
|
| 202 |
+ artifact_ref = artifactcache.get_artifact_fullname(element, element_key)
|
|
| 202 | 203 |
artifact_digest = cas.resolve_ref(artifact_ref)
|
| 203 | 204 |
|
| 204 | 205 |
queue = multiprocessing.Queue()
|
| ... | ... | @@ -268,12 +269,13 @@ def _test_push_tree(user_config_file, project_dir, artifact_dir, artifact_digest |
| 268 | 269 |
project.ensure_fully_loaded()
|
| 269 | 270 |
|
| 270 | 271 |
# Create a local CAS cache handle
|
| 271 |
- cas = context.artifactcache
|
|
| 272 |
+ artifactcache = context.artifactcache
|
|
| 273 |
+ cas = artifactcache.cas
|
|
| 272 | 274 |
|
| 273 | 275 |
# Manually setup the CAS remote
|
| 274 |
- cas.setup_remotes(use_config=True)
|
|
| 276 |
+ artifactcache.setup_remotes(use_config=True)
|
|
| 275 | 277 |
|
| 276 |
- if cas.has_push_remotes():
|
|
| 278 |
+ if artifactcache.has_push_remotes():
|
|
| 277 | 279 |
directory = remote_execution_pb2.Directory()
|
| 278 | 280 |
|
| 279 | 281 |
with open(cas.objpath(artifact_digest), 'rb') as f:
|
| ... | ... | @@ -284,7 +286,7 @@ def _test_push_tree(user_config_file, project_dir, artifact_dir, artifact_digest |
| 284 | 286 |
tree_maker(cas, tree, directory)
|
| 285 | 287 |
|
| 286 | 288 |
# Push the Tree as a regular message
|
| 287 |
- tree_digest = cas.push_message(project, tree)
|
|
| 289 |
+ tree_digest = artifactcache.push_message(project, tree)
|
|
| 288 | 290 |
|
| 289 | 291 |
queue.put((tree_digest.hash, tree_digest.size_bytes))
|
| 290 | 292 |
else:
|
| ... | ... | @@ -69,7 +69,7 @@ def test_push(cli, tmpdir, datafiles): |
| 69 | 69 |
cas = context.artifactcache
|
| 70 | 70 |
|
| 71 | 71 |
# Assert that the element's artifact is cached
|
| 72 |
- element = project.load_elements(['target.bst'], cas)[0]
|
|
| 72 |
+ element = project.load_elements(['target.bst'])[0]
|
|
| 73 | 73 |
element_key = cli.get_element_key(project_dir, 'target.bst')
|
| 74 | 74 |
assert cas.contains(element, element_key)
|
| 75 | 75 |
|
| ... | ... | @@ -111,7 +111,7 @@ def _test_push(user_config_file, project_dir, artifact_dir, |
| 111 | 111 |
cas = context.artifactcache
|
| 112 | 112 |
|
| 113 | 113 |
# Load the target element
|
| 114 |
- element = project.load_elements([element_name], cas)[0]
|
|
| 114 |
+ element = project.load_elements([element_name])[0]
|
|
| 115 | 115 |
|
| 116 | 116 |
# Manually setup the CAS remote
|
| 117 | 117 |
cas.setup_remotes(use_config=True)
|
| ... | ... | @@ -165,20 +165,21 @@ def test_push_directory(cli, tmpdir, datafiles): |
| 165 | 165 |
# Load the project and CAS cache
|
| 166 | 166 |
project = Project(project_dir, context)
|
| 167 | 167 |
project.ensure_fully_loaded()
|
| 168 |
- cas = context.artifactcache
|
|
| 168 |
+ artifactcache = context.artifactcache
|
|
| 169 |
+ cas = artifactcache.cas
|
|
| 169 | 170 |
|
| 170 | 171 |
# Assert that the element's artifact is cached
|
| 171 |
- element = project.load_elements(['target.bst'], cas)[0]
|
|
| 172 |
+ element = project.load_elements(['target.bst'])[0]
|
|
| 172 | 173 |
element_key = cli.get_element_key(project_dir, 'target.bst')
|
| 173 |
- assert cas.contains(element, element_key)
|
|
| 174 |
+ assert artifactcache.contains(element, element_key)
|
|
| 174 | 175 |
|
| 175 | 176 |
# Manually setup the CAS remote
|
| 176 |
- cas.setup_remotes(use_config=True)
|
|
| 177 |
- cas.initialize_remotes()
|
|
| 178 |
- assert cas.has_push_remotes(element=element)
|
|
| 177 |
+ artifactcache.setup_remotes(use_config=True)
|
|
| 178 |
+ artifactcache.initialize_remotes()
|
|
| 179 |
+ assert artifactcache.has_push_remotes(element=element)
|
|
| 179 | 180 |
|
| 180 | 181 |
# Recreate the CasBasedDirectory object from the cached artifact
|
| 181 |
- artifact_ref = cas.get_artifact_fullname(element, element_key)
|
|
| 182 |
+ artifact_ref = artifactcache.get_artifact_fullname(element, element_key)
|
|
| 182 | 183 |
artifact_digest = cas.resolve_ref(artifact_ref)
|
| 183 | 184 |
|
| 184 | 185 |
queue = multiprocessing.Queue()
|
| 1 |
+import os
|
|
| 2 |
+import pytest
|
|
| 3 |
+from tests.testutils import cli
|
|
| 4 |
+from tests.testutils.site import IS_LINUX
|
|
| 5 |
+ |
|
| 6 |
+from buildstream import _site, _yaml
|
|
| 7 |
+from buildstream._exceptions import ErrorDomain
|
|
| 8 |
+ |
|
| 9 |
+ |
|
| 10 |
+# Project directory
|
|
| 11 |
+DATA_DIR = os.path.join(
|
|
| 12 |
+ os.path.dirname(os.path.realpath(__file__)),
|
|
| 13 |
+ 'project',
|
|
| 14 |
+)
|
|
| 15 |
+ |
|
| 16 |
+ |
|
| 17 |
+@pytest.fixture
|
|
| 18 |
+def clean_bwrap_cache():
|
|
| 19 |
+ _site._bwrap_major = None
|
|
| 20 |
+ _site._bwrap_minor = None
|
|
| 21 |
+ _site._bwrap_patch = None
|
|
| 22 |
+ |
|
| 23 |
+ |
|
| 24 |
+@pytest.mark.integration
|
|
| 25 |
+@pytest.mark.skipif(not IS_LINUX, reason='Only available on Linux')
|
|
| 26 |
+@pytest.mark.datafiles(DATA_DIR)
|
|
| 27 |
+@pytest.mark.usefixtures(clean_bwrap_cache)
|
|
| 28 |
+def test_missing_brwap_has_nice_error_message(cli, datafiles):
|
|
| 29 |
+ project = os.path.join(datafiles.dirname, datafiles.basename)
|
|
| 30 |
+ element_path = os.path.join(project, 'elements', 'element.bst')
|
|
| 31 |
+ |
|
| 32 |
+ # Write out our test target
|
|
| 33 |
+ element = {
|
|
| 34 |
+ 'kind': 'script',
|
|
| 35 |
+ 'depends': [
|
|
| 36 |
+ {
|
|
| 37 |
+ 'filename': 'base.bst',
|
|
| 38 |
+ 'type': 'build',
|
|
| 39 |
+ },
|
|
| 40 |
+ ],
|
|
| 41 |
+ 'config': {
|
|
| 42 |
+ 'commands': [
|
|
| 43 |
+ 'false',
|
|
| 44 |
+ ],
|
|
| 45 |
+ },
|
|
| 46 |
+ }
|
|
| 47 |
+ _yaml.dump(element, element_path)
|
|
| 48 |
+ |
|
| 49 |
+ # Build without access to host tools, this should fail with a nice error
|
|
| 50 |
+ result = cli.run(
|
|
| 51 |
+ project=project, args=['build', 'element.bst'], env={'PATH': ''})
|
|
| 52 |
+ result.assert_task_error(ErrorDomain.SANDBOX, 'unavailable-local-sandbox')
|
|
| 53 |
+ assert "not found" in result.stderr
|
|
| 54 |
+ |
|
| 55 |
+ |
|
| 56 |
+@pytest.mark.integration
|
|
| 57 |
+@pytest.mark.skipif(not IS_LINUX, reason='Only available on Linux')
|
|
| 58 |
+@pytest.mark.datafiles(DATA_DIR)
|
|
| 59 |
+@pytest.mark.usefixtures(clean_bwrap_cache)
|
|
| 60 |
+def test_old_brwap_has_nice_error_message(cli, datafiles, tmp_path):
|
|
| 61 |
+ import buildstream._platform
|
|
| 62 |
+ buildstream._platform.Platform._instance = None
|
|
| 63 |
+ |
|
| 64 |
+ bwrap = tmp_path.joinpath('bin/bwrap')
|
|
| 65 |
+ bwrap.parent.mkdir()
|
|
| 66 |
+ with bwrap.open('w') as fp:
|
|
| 67 |
+ fp.write('''
|
|
| 68 |
+ #!/bin/bash
|
|
| 69 |
+ echo bubblewrap 0.0.1
|
|
| 70 |
+ '''.strip())
|
|
| 71 |
+ |
|
| 72 |
+ bwrap.chmod(0o755)
|
|
| 73 |
+ |
|
| 74 |
+ project = os.path.join(datafiles.dirname, datafiles.basename)
|
|
| 75 |
+ element_path = os.path.join(project, 'elements', 'element3.bst')
|
|
| 76 |
+ |
|
| 77 |
+ # Write out our test target
|
|
| 78 |
+ element = {
|
|
| 79 |
+ 'kind': 'script',
|
|
| 80 |
+ 'depends': [
|
|
| 81 |
+ {
|
|
| 82 |
+ 'filename': 'base.bst',
|
|
| 83 |
+ 'type': 'build',
|
|
| 84 |
+ },
|
|
| 85 |
+ ],
|
|
| 86 |
+ 'config': {
|
|
| 87 |
+ 'commands': [
|
|
| 88 |
+ 'true && true && false',
|
|
| 89 |
+ ],
|
|
| 90 |
+ },
|
|
| 91 |
+ }
|
|
| 92 |
+ _yaml.dump(element, element_path)
|
|
| 93 |
+ |
|
| 94 |
+ # Build with only a fake, outdated bwrap on PATH, this should fail with a nice error
|
|
| 95 |
+ result = cli.run(
|
|
| 96 |
+ project=project,
|
|
| 97 |
+ args=['--debug', '--verbose', 'build', 'element3.bst'],
|
|
| 98 |
+ env={'PATH': str(tmp_path.joinpath('bin'))})
|
|
| 99 |
+ result.assert_task_error(ErrorDomain.SANDBOX, 'unavailable-local-sandbox')
|
|
| 100 |
+ assert "too old" in result.stderr
|
| ... | ... | @@ -13,7 +13,7 @@ import pytest_cov |
| 13 | 13 |
from buildstream import _yaml
|
| 14 | 14 |
from buildstream._artifactcache.casserver import create_server
|
| 15 | 15 |
from buildstream._context import Context
|
| 16 |
-from buildstream._exceptions import ArtifactError
|
|
| 16 |
+from buildstream._exceptions import CASError
|
|
| 17 | 17 |
from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
| 18 | 18 |
|
| 19 | 19 |
|
| ... | ... | @@ -48,7 +48,7 @@ class ArtifactShare(): |
| 48 | 48 |
context = Context()
|
| 49 | 49 |
context.artifactdir = self.repodir
|
| 50 | 50 |
|
| 51 |
- self.cas = context.artifactcache
|
|
| 51 |
+ self.cas = context.artifactcache.cas
|
|
| 52 | 52 |
|
| 53 | 53 |
self.total_space = total_space
|
| 54 | 54 |
self.free_space = free_space
|
| ... | ... | @@ -135,7 +135,7 @@ class ArtifactShare(): |
| 135 | 135 |
try:
|
| 136 | 136 |
tree = self.cas.resolve_ref(artifact_key)
|
| 137 | 137 |
return True
|
| 138 |
- except ArtifactError:
|
|
| 138 |
+ except CASError:
|
|
| 139 | 139 |
return False
|
| 140 | 140 |
|
| 141 | 141 |
# close():
|
| 1 |
+from contextlib import contextmanager
|
|
| 2 |
+import os
|
|
| 3 |
+ |
|
| 4 |
+ |
|
| 5 |
+# MockAttributeResult
|
|
| 6 |
+#
|
|
| 7 |
+# A class to take a dictionary of kwargs and make them accessible via
|
|
| 8 |
+# attributes of the object.
|
|
| 9 |
+#
|
|
| 10 |
+class MockAttributeResult(dict):
|
|
| 11 |
+ __getattr__ = dict.get
|
|
| 12 |
+ |
|
| 13 |
+ |
|
| 14 |
+# mock_statvfs():
|
|
| 15 |
+#
|
|
| 16 |
+# Gets a function which mocks statvfs and returns a statvfs result with the kwargs accessible.
|
|
| 17 |
+#
|
|
| 18 |
+# Returns:
|
|
| 19 |
+# func(path) -> object: object will have all the kwargs accessible via object.kwarg
|
|
| 20 |
+#
|
|
| 21 |
+# Example:
|
|
| 22 |
+# statvfs = mock_statvfs(f_blocks=10)
|
|
| 23 |
+# result = statvfs("regardless/of/path")
|
|
| 24 |
+# assert result.f_blocks == 10 # True
|
|
| 25 |
+def mock_statvfs(**kwargs):
|
|
| 26 |
+ def statvfs(path):
|
|
| 27 |
+ return MockAttributeResult(kwargs)
|
|
| 28 |
+ return statvfs
|
|
| 29 |
+ |
|
| 30 |
+ |
|
| 31 |
+# monkey_patch()
|
|
| 32 |
+#
|
|
| 33 |
+# with monkey_patch("statvfs", custom_statvfs):
|
|
| 34 |
+# assert os.statvfs == custom_statvfs # True
|
|
| 35 |
+# assert os.statvfs == custom_statvfs # False
|
|
| 36 |
+#
|
|
| 37 |
+@contextmanager
|
|
| 38 |
+def monkey_patch(to_patch, patched_func):
|
|
| 39 |
+ orig = getattr(os, to_patch)
|
|
| 40 |
+ setattr(os, to_patch, patched_func)
|
|
| 41 |
+ try:
|
|
| 42 |
+ yield
|
|
| 43 |
+ finally:
|
|
| 44 |
+ setattr(os, to_patch, orig)
|
| 1 |
+from buildstream import _yaml
|
|
| 2 |
+from ..testutils import mock_os
|
|
| 3 |
+from ..testutils.runcli import cli
|
|
| 4 |
+ |
|
| 5 |
+import os
|
|
| 6 |
+import pytest
|
|
| 7 |
+ |
|
| 8 |
+ |
|
| 9 |
+KiB = 1024
|
|
| 10 |
+MiB = (KiB * 1024)
|
|
| 11 |
+GiB = (MiB * 1024)
|
|
| 12 |
+TiB = (GiB * 1024)
|
|
| 13 |
+ |
|
| 14 |
+ |
|
| 15 |
+def test_parse_size_over_1024T(cli, tmpdir):
|
|
| 16 |
+ BLOCK_SIZE = 4096
|
|
| 17 |
+ cli.configure({
|
|
| 18 |
+ 'cache': {
|
|
| 19 |
+ 'quota': 2048 * TiB
|
|
| 20 |
+ }
|
|
| 21 |
+ })
|
|
| 22 |
+ project = tmpdir.join("main")
|
|
| 23 |
+ os.makedirs(str(project))
|
|
| 24 |
+ _yaml.dump({'name': 'main'}, str(project.join("project.conf")))
|
|
| 25 |
+ |
|
| 26 |
+ bavail = (1025 * TiB) / BLOCK_SIZE
|
|
| 27 |
+ patched_statvfs = mock_os.mock_statvfs(f_bavail=bavail, f_bsize=BLOCK_SIZE)
|
|
| 28 |
+ with mock_os.monkey_patch("statvfs", patched_statvfs):
|
|
| 29 |
+ result = cli.run(project, args=["build", "file.bst"])
|
|
| 30 |
+ assert "1025T of available system storage" in result.stderr
|
