[Notes] [Git][BuildStream/buildstream][tpollard/494] 12 commits: dev-requirements: Update pytest dependency to 3.8

Tom Pollard pushed to branch tpollard/494 at BuildStream / buildstream

Commits:

23 changed files:

Changes:

  • NEWS
    @@ -38,13 +38,23 @@ buildstream 1.3.1
         a bug fix to workspaces so they can be built in workspaces too.
    
       o Creating a build shell through the interactive mode or `bst shell --build`
    -    will now use the cached build tree. It is now easier to debug local build
    -    failures.
    +    will now use the cached buildtree if available locally. It is now easier to
    +    debug local build failures.
    
       o `bst shell --sysroot` now takes any directory that contains a sysroot,
         instead of just a specially-formatted build-root with a `root` and `scratch`
         subdirectory.
    
    +  o Because the element build tree is now cached in the respective artifact,
    +    artifact size has significantly increased in some cases. In *most* cases
    +    the build tree is not utilised when building targets, so by default
    +    `bst pull` and `bst build` will not fetch buildtrees from remotes. This
    +    behaviour can be overridden with the CLI main option '--pull-build-trees'
    +    or the user configuration option 'pullbuildtrees = True'. The override
    +    will also add the build tree to already cached artifacts. When populating
    +    an artifactcache server, only 'complete' elements can be pushed; if an
    +    element is expected to have a populated build tree it must be cached first.
    +
    
     =================
     buildstream 1.1.5
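
The practical effect of the new default is easiest to see against the
ArtifactCache.pull() signature introduced further down in this diff. A minimal
sketch, assuming the build tree lives in a 'buildtree' subdirectory of the
artifact (the subdir name and the pull_buildtrees flag are assumptions here;
only the option names above come from the NEWS text):

    # Sketch: how a front-end might map the new options onto pull()
    def pull_artifact(artifacts, element, key, pull_buildtrees=False):
        if pull_buildtrees:
            # Override: fetch the build tree too, even for cached artifacts
            return artifacts.pull(element, key, subdir='buildtree')
        # Default: skip build trees to keep downloads small
        return artifacts.pull(element, key, excluded_subdirs=['buildtree'])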
    

  • buildstream/_artifactcache/artifactcache.py
    @@ -17,17 +17,22 @@
     #  Authors:
     #        Tristan Maat <tristan maat codethink co uk>
    
    +import multiprocessing
     import os
    +import signal
     import string
     from collections import namedtuple
     from collections.abc import Mapping
    
     from ..types import _KeyStrength
    -from .._exceptions import ArtifactError, ImplError, LoadError, LoadErrorReason
    +from .._exceptions import ArtifactError, CASError, LoadError, LoadErrorReason
     from .._message import Message, MessageType
    +from .. import _signals
     from .. import utils
     from .. import _yaml
    
    +from .cascache import CASCache, CASRemote
    +
    
     CACHE_SIZE_FILE = "cache_size"
    
    @@ -93,7 +98,8 @@ class ArtifactCache():
         def __init__(self, context):
             self.context = context
             self.extractdir = os.path.join(context.artifactdir, 'extract')
    -        self.tmpdir = os.path.join(context.artifactdir, 'tmp')
    +
    +        self.cas = CASCache(context.artifactdir)
    
             self.global_remote_specs = []
             self.project_remote_specs = {}
    @@ -104,12 +110,15 @@ class ArtifactCache():
             self._cache_lower_threshold = None    # The target cache size for a cleanup
             self._remotes_setup = False           # Check to prevent double-setup of remotes
    
    +        # Per-project list of CASRemote instances.
    +        self._remotes = {}
    +
    +        self._has_fetch_remotes = False
    +        self._has_push_remotes = False
    +
             os.makedirs(self.extractdir, exist_ok=True)
    -        os.makedirs(self.tmpdir, exist_ok=True)
    
    -    ################################################
    -    #  Methods implemented on the abstract class   #
    -    ################################################
    +        self._calculate_cache_quota()
    
         # get_artifact_fullname()
         #
    @@ -240,8 +249,10 @@ class ArtifactCache():
             for key in (strong_key, weak_key):
                 if key:
                     try:
    -                        self.update_mtime(element, key)
    -                    except ArtifactError:
    +                        ref = self.get_artifact_fullname(element, key)
    +
    +                        self.cas.update_mtime(ref)
    +                    except CASError:
                             pass
    
         # clean():
    @@ -252,7 +263,7 @@ class ArtifactCache():
         #    (int): The size of the cache after having cleaned up
         #
         def clean(self):
    -        artifacts = self.list_artifacts()  # pylint: disable=assignment-from-no-return
    +        artifacts = self.list_artifacts()
    
             # Build a set of the cache keys which are required
             # based on the required elements at cleanup time
    @@ -294,7 +305,7 @@ class ArtifactCache():
                 if key not in required_artifacts:
    
                     # Remove the actual artifact, if it's not required.
    -                size = self.remove(to_remove)  # pylint: disable=assignment-from-no-return
    +                size = self.remove(to_remove)
    
                     # Remove the size from the removed size
                     self.set_cache_size(self._cache_size - size)
    @@ -311,7 +322,7 @@ class ArtifactCache():
         #    (int): The size of the artifact cache.
         #
         def compute_cache_size(self):
    -        self._cache_size = self.calculate_cache_size()  # pylint: disable=assignment-from-no-return
    +        self._cache_size = self.cas.calculate_cache_size()
    
             return self._cache_size
    
    @@ -380,28 +391,12 @@ class ArtifactCache():
         def has_quota_exceeded(self):
             return self.get_cache_size() > self._cache_quota
    
    -    ################################################
    -    # Abstract methods for subclasses to implement #
    -    ################################################
    -
         # preflight():
         #
         # Preflight check.
         #
         def preflight(self):
    -        pass
    -
    -    # update_mtime()
    -    #
    -    # Update the mtime of an artifact.
    -    #
    -    # Args:
    -    #     element (Element): The Element to update
    -    #     key (str): The key of the artifact.
    -    #
    -    def update_mtime(self, element, key):
    -        raise ImplError("Cache '{kind}' does not implement update_mtime()"
    -                        .format(kind=type(self).__name__))
    +        self.cas.preflight()
    
         # initialize_remotes():
         #
    @@ -411,7 +406,59 @@ class ArtifactCache():
         #     on_failure (callable): Called if we fail to contact one of the caches.
         #
         def initialize_remotes(self, *, on_failure=None):
    -        pass
    +        remote_specs = self.global_remote_specs
    +
    +        for project in self.project_remote_specs:
    +            remote_specs += self.project_remote_specs[project]
    +
    +        remote_specs = list(utils._deduplicate(remote_specs))
    +
    +        remotes = {}
    +        q = multiprocessing.Queue()
    +        for remote_spec in remote_specs:
    +            # Use a subprocess to avoid creating gRPC threads in the main BuildStream process
    +            # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
    +            p = multiprocessing.Process(target=self.cas.initialize_remote, args=(remote_spec, q))
    +
    +            try:
    +                # Keep SIGINT blocked in the child process
    +                with _signals.blocked([signal.SIGINT], ignore=False):
    +                    p.start()
    +
    +                error = q.get()
    +                p.join()
    +            except KeyboardInterrupt:
    +                utils._kill_process_tree(p.pid)
    +                raise
    +
    +            if error and on_failure:
    +                on_failure(remote_spec.url, error)
    +            elif error:
    +                raise ArtifactError(error)
    +            else:
    +                self._has_fetch_remotes = True
    +                if remote_spec.push:
    +                    self._has_push_remotes = True
    +
    +                remotes[remote_spec.url] = CASRemote(remote_spec)
    +
    +        for project in self.context.get_projects():
    +            remote_specs = self.global_remote_specs
    +            if project in self.project_remote_specs:
    +                remote_specs = list(utils._deduplicate(remote_specs + self.project_remote_specs[project]))
    +
    +            project_remotes = []
    +
    +            for remote_spec in remote_specs:
    +                # Errors are already handled in the loop above,
    +                # skip unreachable remotes here.
    +                if remote_spec.url not in remotes:
    +                    continue
    +
    +                remote = remotes[remote_spec.url]
    +                project_remotes.append(remote)
    +
    +            self._remotes[project] = project_remotes
    
         # contains():
         #
    @@ -425,8 +472,25 @@ class ArtifactCache():
         # Returns: True if the artifact is in the cache, False otherwise
         #
         def contains(self, element, key):
    -        raise ImplError("Cache '{kind}' does not implement contains()"
    -                        .format(kind=type(self).__name__))
    +        ref = self.get_artifact_fullname(element, key)
    +
    +        return self.cas.contains(ref)
    +
    +    # contains_subdir_artifact():
    +    #
    +    # Check whether an artifact element contains a digest for a subdir
    +    # which is populated in the cache, i.e. non-dangling.
    +    #
    +    # Args:
    +    #     element (Element): The Element to check
    +    #     key (str): The cache key to use
    +    #     subdir (str): The subdir to check
    +    #
    +    # Returns: True if the subdir exists & is populated in the cache, False otherwise
    +    #
    +    def contains_subdir_artifact(self, element, key, subdir):
    +        ref = self.get_artifact_fullname(element, key)
    +        return self.cas.contains_subdir_artifact(ref, subdir)
    
         # list_artifacts():
         #
    @@ -437,8 +501,7 @@ class ArtifactCache():
         #               `ArtifactCache.get_artifact_fullname` in LRU order
         #
         def list_artifacts(self):
    -        raise ImplError("Cache '{kind}' does not implement list_artifacts()"
    -                        .format(kind=type(self).__name__))
    +        return self.cas.list_refs()
    
         # remove():
         #
    @@ -450,9 +513,31 @@ class ArtifactCache():
         #                          generated by
         #                          `ArtifactCache.get_artifact_fullname`)
         #
    -    def remove(self, artifact_name):
    -        raise ImplError("Cache '{kind}' does not implement remove()"
    -                        .format(kind=type(self).__name__))
    +    # Returns:
    +    #    (int|None) The amount of space pruned from the repository in
    +    #               Bytes, or None if defer_prune is True
    +    #
    +    def remove(self, ref):
    +
    +        # Remove extract if not used by other ref
    +        tree = self.cas.resolve_ref(ref)
    +        ref_name, ref_hash = os.path.split(ref)
    +        extract = os.path.join(self.extractdir, ref_name, tree.hash)
    +        keys_file = os.path.join(extract, 'meta', 'keys.yaml')
    +        if os.path.exists(keys_file):
    +            keys_meta = _yaml.load(keys_file)
    +            keys = [keys_meta['strong'], keys_meta['weak']]
    +            remove_extract = True
    +            for other_hash in keys:
    +                if other_hash == ref_hash:
    +                    continue
    +                remove_extract = False
    +                break
    +
    +            if remove_extract:
    +                utils._force_rmtree(extract)
    +
    +        return self.cas.remove(ref)
    
         # extract():
         #
    @@ -464,6 +549,7 @@ class ArtifactCache():
         # Args:
         #     element (Element): The Element to extract
         #     key (str): The cache key to use
    +    #     subdir (str): Optional specific subdir to extract
         #
         # Raises:
         #     ArtifactError: In case there was an OSError, or if the artifact
    @@ -471,9 +557,12 @@ class ArtifactCache():
         #
         # Returns: path to extracted artifact
         #
    -    def extract(self, element, key):
    -        raise ImplError("Cache '{kind}' does not implement extract()"
    -                        .format(kind=type(self).__name__))
    +    def extract(self, element, key, subdir=None):
    +        ref = self.get_artifact_fullname(element, key)
    +
    +        path = os.path.join(self.extractdir, element._get_project().name, element.normal_name)
    +
    +        return self.cas.extract(ref, path, subdir=subdir)
    
         # commit():
         #
    @@ -485,8 +574,9 @@ class ArtifactCache():
         #     keys (list): The cache keys to use
         #
         def commit(self, element, content, keys):
    -        raise ImplError("Cache '{kind}' does not implement commit()"
    -                        .format(kind=type(self).__name__))
    +        refs = [self.get_artifact_fullname(element, key) for key in keys]
    +
    +        self.cas.commit(refs, content)
    
         # diff():
         #
    @@ -500,8 +590,10 @@ class ArtifactCache():
         #     subdir (str): A subdirectory to limit the comparison to
         #
         def diff(self, element, key_a, key_b, *, subdir=None):
    -        raise ImplError("Cache '{kind}' does not implement diff()"
    -                        .format(kind=type(self).__name__))
    +        ref_a = self.get_artifact_fullname(element, key_a)
    +        ref_b = self.get_artifact_fullname(element, key_b)
    +
    +        return self.cas.diff(ref_a, ref_b, subdir=subdir)
    
         # has_fetch_remotes():
         #
    @@ -513,7 +605,16 @@ class ArtifactCache():
         # Returns: True if any remote repositories are configured, False otherwise
         #
         def has_fetch_remotes(self, *, element=None):
    -        return False
    +        if not self._has_fetch_remotes:
    +            # No project has fetch remotes
    +            return False
    +        elif element is None:
    +            # At least one (sub)project has fetch remotes
    +            return True
    +        else:
    +            # Check whether the specified element's project has fetch remotes
    +            remotes_for_project = self._remotes[element._get_project()]
    +            return bool(remotes_for_project)
    
         # has_push_remotes():
         #
    @@ -525,7 +626,16 @@ class ArtifactCache():
         # Returns: True if any remote repository is configured, False otherwise
         #
         def has_push_remotes(self, *, element=None):
    -        return False
    +        if not self._has_push_remotes:
    +            # No project has push remotes
    +            return False
    +        elif element is None:
    +            # At least one (sub)project has push remotes
    +            return True
    +        else:
    +            # Check whether the specified element's project has push remotes
    +            remotes_for_project = self._remotes[element._get_project()]
    +            return any(remote.spec.push for remote in remotes_for_project)
    
         # push():
         #
    @@ -542,8 +652,28 @@ class ArtifactCache():
         #   (ArtifactError): if there was an error
         #
         def push(self, element, keys):
    -        raise ImplError("Cache '{kind}' does not implement push()"
    -                        .format(kind=type(self).__name__))
    +        refs = [self.get_artifact_fullname(element, key) for key in list(keys)]
    +
    +        project = element._get_project()
    +
    +        push_remotes = [r for r in self._remotes[project] if r.spec.push]
    +
    +        pushed = False
    +
    +        for remote in push_remotes:
    +            remote.init()
    +            display_key = element._get_brief_display_key()
    +            element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url))
    +
    +            if self.cas.push(refs, remote):
    +                element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
    +                pushed = True
    +            else:
    +                element.info("Remote ({}) already has {} cached".format(
    +                    remote.spec.url, element._get_brief_display_key()
    +                ))
    +
    +        return pushed
    
         # pull():
         #
    @@ -553,13 +683,142 @@ class ArtifactCache():
         #     element (Element): The Element whose artifact is to be fetched
         #     key (str): The cache key to use
         #     progress (callable): The progress callback, if any
    +    #     subdir (str): The optional specific subdir to pull
    +    #     excluded_subdirs (list): The optional list of subdirs to not pull
         #
         # Returns:
         #   (bool): True if pull was successful, False if artifact was not available
         #
    -    def pull(self, element, key, *, progress=None):
    -        raise ImplError("Cache '{kind}' does not implement pull()"
    -                        .format(kind=type(self).__name__))
    +    def pull(self, element, key, *, progress=None, subdir=None, excluded_subdirs=None):
    +        ref = self.get_artifact_fullname(element, key)
    +
    +        project = element._get_project()
    +
    +        for remote in self._remotes[project]:
    +            try:
    +                display_key = element._get_brief_display_key()
    +                element.status("Pulling artifact {} <- {}".format(display_key, remote.spec.url))
    +
    +                if self.cas.pull(ref, remote, progress=progress, subdir=subdir, excluded_subdirs=excluded_subdirs):
    +                    element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url))
    +                    if subdir:
    +                        # Attempt to extract the subdir into the artifact extract dir if the
    +                        # extract dir already exists without containing the subdir. If the
    +                        # respective artifact extract dir does not exist, a full extraction
    +                        # will be performed.
    +                        self.extract(element, key, subdir)
    +                    # no need to pull from additional remotes
    +                    return True
    +                else:
    +                    element.info("Remote ({}) does not have {} cached".format(
    +                        remote.spec.url, element._get_brief_display_key()
    +                    ))
    +
    +            except CASError as e:
    +                raise ArtifactError("Failed to pull artifact {}: {}".format(
    +                    element._get_brief_display_key(), e)) from e
    +
    +        return False
    +
    +    # pull_tree():
    +    #
    +    # Pull a single Tree rather than an artifact.
    +    # Does not update local refs.
    +    #
    +    # Args:
    +    #     project (Project): The current project
    +    #     digest (Digest): The digest of the tree
    +    #
    +    def pull_tree(self, project, digest):
    +        for remote in self._remotes[project]:
    +            digest = self.cas.pull_tree(remote, digest)
    +
    +            if digest:
    +                # no need to pull from additional remotes
    +                return digest
    +
    +        return None
    +
    +    # push_directory():
    +    #
    +    # Push the given virtual directory to all remotes.
    +    #
    +    # Args:
    +    #     project (Project): The current project
    +    #     directory (Directory): A virtual directory object to push.
    +    #
    +    # Raises:
    +    #     (ArtifactError): if there was an error
    +    #
    +    def push_directory(self, project, directory):
    +        if self._has_push_remotes:
    +            push_remotes = [r for r in self._remotes[project] if r.spec.push]
    +        else:
    +            push_remotes = []
    +
    +        if not push_remotes:
    +            raise ArtifactError("push_directory was called, but no remote artifact " +
    +                                "servers are configured as push remotes.")
    +
    +        if directory.ref is None:
    +            return
    +
    +        for remote in push_remotes:
    +            self.cas.push_directory(remote, directory)
    +
    +    # push_message():
    +    #
    +    # Push the given protobuf message to all remotes.
    +    #
    +    # Args:
    +    #     project (Project): The current project
    +    #     message (Message): A protobuf message to push.
    +    #
    +    # Raises:
    +    #     (ArtifactError): if there was an error
    +    #
    +    def push_message(self, project, message):
    +
    +        if self._has_push_remotes:
    +            push_remotes = [r for r in self._remotes[project] if r.spec.push]
    +        else:
    +            push_remotes = []
    +
    +        if not push_remotes:
    +            raise ArtifactError("push_message was called, but no remote artifact " +
    +                                "servers are configured as push remotes.")
    +
    +        for remote in push_remotes:
    +            message_digest = self.cas.push_message(remote, message)
    +
    +        return message_digest
    +
    +    # verify_digest_pushed():
    +    #
    +    # Check whether the object is already on the server, in which case
    +    # there is no need to upload it.
    +    #
    +    # Args:
    +    #     project (Project): The current project
    +    #     digest (Digest): The object digest.
    +    #
    +    def verify_digest_pushed(self, project, digest):
    +
    +        if self._has_push_remotes:
    +            push_remotes = [r for r in self._remotes[project] if r.spec.push]
    +        else:
    +            push_remotes = []
    +
    +        if not push_remotes:
    +            raise ArtifactError("verify_digest_pushed was called, but no remote artifact " +
    +                                "servers are configured as push remotes.")
    +
    +        pushed = False
    +
    +        for remote in push_remotes:
    +            if self.cas.verify_digest_on_remote(remote, digest):
    +                pushed = True
    +
    +        return pushed
    
         # link_key():
         #
    @@ -571,19 +830,10 @@ class ArtifactCache():
         #     newkey (str): A new cache key for the artifact
         #
         def link_key(self, element, oldkey, newkey):
    -        raise ImplError("Cache '{kind}' does not implement link_key()"
    -                        .format(kind=type(self).__name__))
    +        oldref = self.get_artifact_fullname(element, oldkey)
    +        newref = self.get_artifact_fullname(element, newkey)
    
    -    # calculate_cache_size()
    -    #
    -    # Return the real artifact cache size.
    -    #
    -    # Returns:
    -    #    (int): The size of the artifact cache.
    -    #
    -    def calculate_cache_size(self):
    -        raise ImplError("Cache '{kind}' does not implement calculate_cache_size()"
    -                        .format(kind=type(self).__name__))
    +        self.cas.link_ref(oldref, newref)
    
         ################################################
         #               Local Private Methods          #

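The initialize_remotes() body added above performs the first contact with each
remote in a child process: gRPC threads created before a fork make the parent
process unsafe to fork later, which is why the probe runs out-of-process and
reports back through a queue. Stripped of the BuildStream helpers
(_signals.blocked(), _kill_process_tree()), the pattern reduces to roughly the
following self-contained sketch, where probe() stands in for
CASCache.initialize_remote():

    import multiprocessing

    def probe(url, q):
        try:
            # ... open a gRPC channel and issue a status request here ...
            q.put(None)              # success: report "no error"
        except Exception as e:       # any failure travels back as a string
            q.put(str(e))

    def check_remotes(urls):
        q = multiprocessing.Queue()
        errors = {}
        for url in urls:
            p = multiprocessing.Process(target=probe, args=(url, q))
            p.start()                # gRPC state only ever exists in the child
            errors[url] = q.get()    # block until the child reports
            p.join()
        return errors
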
  • buildstream/_artifactcache/cascache.py
    @@ -20,9 +20,7 @@
     import hashlib
     import itertools
     import io
    -import multiprocessing
     import os
    -import signal
     import stat
     import tempfile
     import uuid
    @@ -31,17 +29,13 @@ from urllib.parse import urlparse
    
     import grpc
    
    -from .. import _yaml
    -
     from .._protos.google.rpc import code_pb2
     from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
     from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
     from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc
    
    -from .. import _signals, utils
    -from .._exceptions import ArtifactError
    -
    -from . import ArtifactCache
    +from .. import utils
    +from .._exceptions import CASError
    
    
     # The default limit for gRPC messages is 4 MiB.
    @@ -49,62 +43,101 @@ from . import ArtifactCache
     _MAX_PAYLOAD_BYTES = 1024 * 1024
    
    
    -# A CASCache manages artifacts in a CAS repository as specified in the
    -# Remote Execution API.
    +# A CASCache manages a CAS repository as specified in the Remote Execution API.
     #
     # Args:
    -#     context (Context): The BuildStream context
    +#     path (str): The root directory for the CAS repository
     #
    -# Pushing is explicitly disabled by the platform in some cases,
    -# like when we are falling back to functioning without using
    -# user namespaces.
    -#
    -class CASCache(ArtifactCache):
    -
    -    def __init__(self, context):
    -        super().__init__(context)
    +class CASCache():
    
    -        self.casdir = os.path.join(context.artifactdir, 'cas')
    +    def __init__(self, path):
    +        self.casdir = os.path.join(path, 'cas')
    +        self.tmpdir = os.path.join(path, 'tmp')
             os.makedirs(os.path.join(self.casdir, 'refs', 'heads'), exist_ok=True)
             os.makedirs(os.path.join(self.casdir, 'objects'), exist_ok=True)
    +        os.makedirs(self.tmpdir, exist_ok=True)
    
    -        self._calculate_cache_quota()
    -
    -        # Per-project list of _CASRemote instances.
    -        self._remotes = {}
    -
    -        self._has_fetch_remotes = False
    -        self._has_push_remotes = False
    -
    -    ################################################
    -    #     Implementation of abstract methods       #
    -    ################################################
    -
    +    # preflight():
    +    #
    +    # Preflight check.
    +    #
         def preflight(self):
             headdir = os.path.join(self.casdir, 'refs', 'heads')
             objdir = os.path.join(self.casdir, 'objects')
             if not (os.path.isdir(headdir) and os.path.isdir(objdir)):
    -            raise ArtifactError("CAS repository check failed for '{}'"
    -                                .format(self.casdir))
    +            raise CASError("CAS repository check failed for '{}'".format(self.casdir))
    
    -    def contains(self, element, key):
    -        refpath = self._refpath(self.get_artifact_fullname(element, key))
    +    # contains():
    +    #
    +    # Check whether the specified ref is already available in the local CAS cache.
    +    #
    +    # Args:
    +    #     ref (str): The ref to check
    +    #
    +    # Returns: True if the ref is in the cache, False otherwise
    +    #
    +    def contains(self, ref):
    +        refpath = self._refpath(ref)
    
             # This assumes that the repository doesn't have any dangling pointers
             return os.path.exists(refpath)
    
    -    def extract(self, element, key):
    -        ref = self.get_artifact_fullname(element, key)
    +    # contains_subdir_artifact():
    +    #
    +    # Check whether the specified artifact element tree has a digest for a subdir
    +    # which is populated in the cache, i.e. non-dangling.
    +    #
    +    # Args:
    +    #     ref (str): The ref to check
    +    #     subdir (str): The subdir to check
    +    #
    +    # Returns: True if the subdir exists & is populated in the cache, False otherwise
    +    #
    +    def contains_subdir_artifact(self, ref, subdir):
    +        tree = self.resolve_ref(ref)
    +
    +        # This assumes that the subdir digest is present in the element tree
    +        subdirdigest = self._get_subdir(tree, subdir)
    +        objpath = self.objpath(subdirdigest)
    +
    +        # True if subdir content is cached or if empty as expected
    +        return os.path.exists(objpath)
    
    +    # extract():
    +    #
    +    # Extract cached directory for the specified ref if it hasn't
    +    # already been extracted.
    +    #
    +    # Args:
    +    #     ref (str): The ref whose directory to extract
    +    #     path (str): The destination path
    +    #     subdir (str): Optional specific dir to extract
    +    #
    +    # Raises:
    +    #     CASError: In case there was an OSError, or if the ref did not exist.
    +    #
    +    # Returns: path to extracted directory
    +    #
    +    def extract(self, ref, path, subdir=None):
             tree = self.resolve_ref(ref, update_mtime=True)
    
    -        dest = os.path.join(self.extractdir, element._get_project().name,
    -                            element.normal_name, tree.hash)
    +        elementdest = dest = os.path.join(path, tree.hash)
    +
    +        # If the artifact is already extracted, check whether the optional subdir
    +        # has also been extracted. If the artifact has not been extracted yet,
    +        # a full extraction will include the optional subdir.
             if os.path.isdir(dest):
    -            # artifact has already been extracted
    -            return dest
    +            if subdir:
    +                if not os.path.isdir(os.path.join(dest, subdir)):
    +                    dest = os.path.join(dest, subdir)
    +                    tree = self._get_subdir(tree, subdir)
    +                else:
    +                    return dest
    +            else:
    +                return dest
    
    -        with tempfile.TemporaryDirectory(prefix='tmp', dir=self.extractdir) as tmpdir:
    +        with tempfile.TemporaryDirectory(prefix='tmp', dir=self.tmpdir) as tmpdir:
                 checkoutdir = os.path.join(tmpdir, ref)
                 self._checkout(checkoutdir, tree)
    
    @@ -118,23 +151,35 @@ class CASCache(ArtifactCache):
                     # If rename fails with these errors, another process beat
                     # us to it so just ignore.
                     if e.errno not in [errno.ENOTEMPTY, errno.EEXIST]:
    -                    raise ArtifactError("Failed to extract artifact for ref '{}': {}"
    -                                        .format(ref, e)) from e
    -
    -        return dest
    +                    raise CASError("Failed to extract directory for ref '{}': {}".format(ref, e)) from e
    
    -    def commit(self, element, content, keys):
    -        refs = [self.get_artifact_fullname(element, key) for key in keys]
    +        return elementdest
    
    -        tree = self._commit_directory(content)
    +    # commit():
    +    #
    +    # Commit directory to cache.
    +    #
    +    # Args:
    +    #     refs (list): The refs to set
    +    #     path (str): The directory to import
    +    #
    +    def commit(self, refs, path):
    +        tree = self._commit_directory(path)
    
             for ref in refs:
                 self.set_ref(ref, tree)
    
    -    def diff(self, element, key_a, key_b, *, subdir=None):
    -        ref_a = self.get_artifact_fullname(element, key_a)
    -        ref_b = self.get_artifact_fullname(element, key_b)
    -
    +    # diff():
    +    #
    +    # Return a list of files that have been added or modified between
    +    # the refs described by ref_a and ref_b.
    +    #
    +    # Args:
    +    #     ref_a (str): The first ref
    +    #     ref_b (str): The second ref
    +    #     subdir (str): A subdirectory to limit the comparison to
    +    #
    +    def diff(self, ref_a, ref_b, *, subdir=None):
             tree_a = self.resolve_ref(ref_a)
             tree_b = self.resolve_ref(ref_b)
    
    @@ -150,158 +195,129 @@ class CASCache(ArtifactCache):
    
             return modified, removed, added
    
    -    def initialize_remotes(self, *, on_failure=None):
    -        remote_specs = self.global_remote_specs
    -
    -        for project in self.project_remote_specs:
    -            remote_specs += self.project_remote_specs[project]
    -
    -        remote_specs = list(utils._deduplicate(remote_specs))
    -
    -        remotes = {}
    -        q = multiprocessing.Queue()
    -        for remote_spec in remote_specs:
    -            # Use subprocess to avoid creation of gRPC threads in main BuildStream process
    -            # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
    -            p = multiprocessing.Process(target=self._initialize_remote, args=(remote_spec, q))
    +    def initialize_remote(self, remote_spec, q):
    +        try:
    +            remote = CASRemote(remote_spec)
    +            remote.init()
    
    -            try:
    -                # Keep SIGINT blocked in the child process
    -                with _signals.blocked([signal.SIGINT], ignore=False):
    -                    p.start()
    -
    -                error = q.get()
    -                p.join()
    -            except KeyboardInterrupt:
    -                utils._kill_process_tree(p.pid)
    -                raise
    +            request = buildstream_pb2.StatusRequest()
    +            response = remote.ref_storage.Status(request)
    
    -            if error and on_failure:
    -                on_failure(remote_spec.url, error)
    -            elif error:
    -                raise ArtifactError(error)
    +            if remote_spec.push and not response.allow_updates:
    +                q.put('CAS server does not allow push')
                 else:
    -                self._has_fetch_remotes = True
    -                if remote_spec.push:
    -                    self._has_push_remotes = True
    +                # No error
    +                q.put(None)
    
    -                remotes[remote_spec.url] = _CASRemote(remote_spec)
    +        except grpc.RpcError as e:
    +            # str(e) is too verbose for errors reported to the user
    +            q.put(e.details())
    
    -        for project in self.context.get_projects():
    -            remote_specs = self.global_remote_specs
    -            if project in self.project_remote_specs:
    -                remote_specs = list(utils._deduplicate(remote_specs + self.project_remote_specs[project]))
    +        except Exception as e:               # pylint: disable=broad-except
    +            # Whatever happens, we need to return it to the calling process
    +            #
    +            q.put(str(e))
    
    -            project_remotes = []
    +    # pull():
    +    #
    +    # Pull a ref from a remote repository.
    +    #
    +    # Args:
    +    #     ref (str): The ref to pull
    +    #     remote (CASRemote): The remote repository to pull from
    +    #     progress (callable): The progress callback, if any
    +    #     subdir (str): The optional specific subdir to pull
    +    #     excluded_subdirs (list): The optional list of subdirs to not pull
    +    #
    +    # Returns:
    +    #   (bool): True if pull was successful, False if ref was not available
    +    #
    +    def pull(self, ref, remote, *, progress=None, subdir=None, excluded_subdirs=None):
    +        try:
    +            remote.init()
    
    -            for remote_spec in remote_specs:
    -                # Errors are already handled in the loop above,
    -                # skip unreachable remotes here.
    -                if remote_spec.url not in remotes:
    -                    continue
    +            request = buildstream_pb2.GetReferenceRequest()
    +            request.key = ref
    +            response = remote.ref_storage.GetReference(request)
    
    -                remote = remotes[remote_spec.url]
    -                project_remotes.append(remote)
    +            tree = remote_execution_pb2.Digest()
    +            tree.hash = response.digest.hash
    +            tree.size_bytes = response.digest.size_bytes
    
    -            self._remotes[project] = project_remotes
    +            # Check if the element artifact is present, if so just fetch the subdir.
    +            if subdir and os.path.exists(self.objpath(tree)):
    +                self._fetch_subdir(remote, tree, subdir)
    +            else:
    +                # Fetch artifact, excluded_subdirs determined in pullqueue
    +                self._fetch_directory(remote, tree, excluded_subdirs=excluded_subdirs)
    
    -    def has_fetch_remotes(self, *, element=None):
    -        if not self._has_fetch_remotes:
    -            # No project has fetch remotes
    -            return False
    -        elif element is None:
    -            # At least one (sub)project has fetch remotes
    -            return True
    -        else:
    -            # Check whether the specified element's project has fetch remotes
    -            remotes_for_project = self._remotes[element._get_project()]
    -            return bool(remotes_for_project)
    +            self.set_ref(ref, tree)
    
    -    def has_push_remotes(self, *, element=None):
    -        if not self._has_push_remotes:
    -            # No project has push remotes
    -            return False
    -        elif element is None:
    -            # At least one (sub)project has push remotes
                 return True
    -        else:
    -            # Check whether the specified element's project has push remotes
    -            remotes_for_project = self._remotes[element._get_project()]
    -            return any(remote.spec.push for remote in remotes_for_project)
    -
    -    def pull(self, element, key, *, progress=None):
    -        ref = self.get_artifact_fullname(element, key)
    -
    -        project = element._get_project()
    -
    -        for remote in self._remotes[project]:
    -            try:
    -                remote.init()
    -                display_key = element._get_brief_display_key()
    -                element.status("Pulling artifact {} <- {}".format(display_key, remote.spec.url))
    -
    -                request = buildstream_pb2.GetReferenceRequest()
    -                request.key = ref
    -                response = remote.ref_storage.GetReference(request)
    -
    -                tree = remote_execution_pb2.Digest()
    -                tree.hash = response.digest.hash
    -                tree.size_bytes = response.digest.size_bytes
    -
    -                self._fetch_directory(remote, tree)
    -
    -                self.set_ref(ref, tree)
    -
    -                element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url))
    -                # no need to pull from additional remotes
    -                return True
    -
    -            except grpc.RpcError as e:
    -                if e.code() != grpc.StatusCode.NOT_FOUND:
    -                    raise ArtifactError("Failed to pull artifact {}: {}".format(
    -                        element._get_brief_display_key(), e)) from e
    -                else:
    -                    element.info("Remote ({}) does not have {} cached".format(
    -                        remote.spec.url, element._get_brief_display_key()
    -                    ))
    -
    -        return False
    -
    -    def pull_tree(self, project, digest):
    -        """ Pull a single Tree rather than an artifact.
    -        Does not update local refs. """
    +        except grpc.RpcError as e:
    +            if e.code() != grpc.StatusCode.NOT_FOUND:
    +                raise CASError("Failed to pull ref {}: {}".format(ref, e)) from e
    +            else:
    +                return False
    
    -        for remote in self._remotes[project]:
    -            try:
    -                remote.init()
    +    # pull_tree():
    +    #
    +    # Pull a single Tree rather than a ref.
    +    # Does not update local refs.
    +    #
    +    # Args:
    +    #     remote (CASRemote): The remote to pull from
    +    #     digest (Digest): The digest of the tree
    +    #
    +    def pull_tree(self, remote, digest):
    +        try:
    +            remote.init()
    
    -                digest = self._fetch_tree(remote, digest)
    +            digest = self._fetch_tree(remote, digest)
    
    -                # no need to pull from additional remotes
    -                return digest
    +            return digest
    
    -            except grpc.RpcError as e:
    -                if e.code() != grpc.StatusCode.NOT_FOUND:
    -                    raise
    +        except grpc.RpcError as e:
    +            if e.code() != grpc.StatusCode.NOT_FOUND:
    +                raise
    
             return None
    
    -    def link_key(self, element, oldkey, newkey):
    -        oldref = self.get_artifact_fullname(element, oldkey)
    -        newref = self.get_artifact_fullname(element, newkey)
    -
    +    # link_ref():
    +    #
    +    # Add an alias for an existing ref.
    +    #
    +    # Args:
    +    #     oldref (str): An existing ref
    +    #     newref (str): A new ref for the same directory
    +    #
    +    def link_ref(self, oldref, newref):
             tree = self.resolve_ref(oldref)
    
             self.set_ref(newref, tree)
    
    -    def _push_refs_to_remote(self, refs, remote):
    +    # push():
    +    #
    +    # Push committed refs to remote repository.
    +    #
    +    # Args:
    +    #     refs (list): The refs to push
    +    #     remote (CASRemote): The remote to push to
    +    #
    +    # Returns:
    +    #   (bool): True if any remote was updated, False if no pushes were required
    +    #
    +    # Raises:
    +    #   (CASError): if there was an error
    +    #
    +    def push(self, refs, remote):
             skipped_remote = True
             try:
                 for ref in refs:
                     tree = self.resolve_ref(ref)
    
                     # Check whether ref is already on the server in which case
    -                # there is no need to push the artifact
    +                # there is no need to push the ref
                     try:
                         request = buildstream_pb2.GetReferenceRequest()
                         request.key = ref
    @@ -327,65 +343,38 @@ class CASCache(ArtifactCache):
                     skipped_remote = False
             except grpc.RpcError as e:
                 if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
    -                raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
    +                raise CASError("Failed to push ref {}: {}".format(refs, e), temporary=True) from e
    
             return not skipped_remote
    
    -    def push(self, element, keys):
    -
    -        refs = [self.get_artifact_fullname(element, key) for key in list(keys)]
    -
    -        project = element._get_project()
    -
    -        push_remotes = [r for r in self._remotes[project] if r.spec.push]
    -
    -        pushed = False
    -
    -        for remote in push_remotes:
    -            remote.init()
    -            display_key = element._get_brief_display_key()
    -            element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url))
    -
    -            if self._push_refs_to_remote(refs, remote):
    -                element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
    -                pushed = True
    -            else:
    -                element.info("Remote ({}) already has {} cached".format(
    -                    remote.spec.url, element._get_brief_display_key()
    -                ))
    -
    -        return pushed
    -
    -    def push_directory(self, project, directory):
    -        """ Push the given virtual directory to all remotes.
    -
    -        Args:
    -            project (Project): The current project
    -            directory (Directory): A virtual directory object to push.
    -
    -        Raises: ArtifactError if no push remotes are configured.
    -        """
    -
    -        if self._has_push_remotes:
    -            push_remotes = [r for r in self._remotes[project] if r.spec.push]
    -        else:
    -            push_remotes = []
    -
    -        if not push_remotes:
    -            raise ArtifactError("CASCache: push_directory was called, but no remote artifact " +
    -                                "servers are configured as push remotes.")
    -
    -        if directory.ref is None:
    -            return
    -
    -        for remote in push_remotes:
    -            remote.init()
    -
    -            self._send_directory(remote, directory.ref)
    +    # push_directory():
    +    #
    +    # Push the given virtual directory to a remote.
    +    #
    +    # Args:
    +    #     remote (CASRemote): The remote to push to
    +    #     directory (Directory): A virtual directory object to push.
    +    #
    +    # Raises:
    +    #     (CASError): if there was an error
    +    #
    +    def push_directory(self, remote, directory):
    +        remote.init()
    
    385 363
     
    
    386
    -    def push_message(self, project, message):
    
    364
    +        self._send_directory(remote, directory.ref)
    
    387 365
     
    
    388
    -        push_remotes = [r for r in self._remotes[project] if r.spec.push]
    
    366
    +    # push_message():
    
    367
    +    #
    
    368
    +    # Push the given protobuf message to a remote.
    
    369
    +    #
    
    370
    +    # Args:
    
    371
    +    #     remote (CASRemote): The remote to push to
    
    372
    +    #     message (Message): A protobuf message to push.
    
    373
    +    #
    
    374
    +    # Raises:
    
    375
    +    #     (CASError): if there was an error
    
    376
    +    #
    
    377
    +    def push_message(self, remote, message):
    
    389 378
     
    
    390 379
             message_buffer = message.SerializeToString()
    
    391 380
             message_sha = hashlib.sha256(message_buffer)
    
    ... ... @@ -393,17 +382,25 @@ class CASCache(ArtifactCache):
    393 382
             message_digest.hash = message_sha.hexdigest()
    
    394 383
             message_digest.size_bytes = len(message_buffer)
    
    395 384
     
    
    396
    -        for remote in push_remotes:
    
    397
    -            remote.init()
    
    385
    +        remote.init()
    
    398 386
     
    
    399
    -            with io.BytesIO(message_buffer) as b:
    
    400
    -                self._send_blob(remote, message_digest, b)
    
    387
    +        with io.BytesIO(message_buffer) as b:
    
    388
    +            self._send_blob(remote, message_digest, b)
    
    401 389
     
    
    402 390
             return message_digest
    
    403 391
     
    
    404
    -    def _verify_digest_on_remote(self, remote, digest):
    
    405
    -        # Check whether ref is already on the server in which case
    
    406
    -        # there is no need to push the artifact
    
    392
    +    # verify_digest_on_remote():
    
    393
    +    #
    
    394
    +    # Check whether the object is already on the server in which case
    
    395
    +    # there is no need to upload it.
    
    396
    +    #
    
    397
    +    # Args:
    
    398
    +    #     remote (CASRemote): The remote to check
    
    399
    +    #     digest (Digest): The object digest.
    
    400
    +    #
    
    401
    +    def verify_digest_on_remote(self, remote, digest):
    
    402
    +        remote.init()
    
    403
    +
    
    407 404
             request = remote_execution_pb2.FindMissingBlobsRequest()
    
    408 405
             request.blob_digests.extend([digest])
    
    409 406
     
    
    ... ... @@ -413,24 +410,6 @@ class CASCache(ArtifactCache):
    413 410
     
    
    414 411
             return True
    
    415 412
     
    
    416
    -    def verify_digest_pushed(self, project, digest):
    
    417
    -
    
    418
    -        push_remotes = [r for r in self._remotes[project] if r.spec.push]
    
    419
    -
    
    420
    -        pushed = False
    
    421
    -
    
    422
    -        for remote in push_remotes:
    
    423
    -            remote.init()
    
    424
    -
    
    425
    -            if self._verify_digest_on_remote(remote, digest):
    
    426
    -                pushed = True
    
    427
    -
    
    428
    -        return pushed
    
    429
    -
    
    430
    -    ################################################
    
    431
    -    #                API Private Methods           #
    
    432
    -    ################################################
    
    433
    -
    
    434 413
         # objpath():
    
    435 414
         #
    
    436 415
         # Return the path of an object based on its digest.
    
    ... ... @@ -496,7 +475,7 @@ class CASCache(ArtifactCache):
    496 475
                 pass
    
    497 476
     
    
    498 477
             except OSError as e:
    
    499
    -            raise ArtifactError("Failed to hash object: {}".format(e)) from e
    
    478
    +            raise CASError("Failed to hash object: {}".format(e)) from e
    
    500 479
     
    
    501 480
             return digest
    
    502 481
     
    
    ... ... @@ -537,26 +516,39 @@ class CASCache(ArtifactCache):
    537 516
                     return digest
    
    538 517
     
    
    539 518
             except FileNotFoundError as e:
    
    540
    -            raise ArtifactError("Attempt to access unavailable artifact: {}".format(e)) from e
    
    519
    +            raise CASError("Attempt to access unavailable ref: {}".format(e)) from e
    
    541 520
     
    
    542
    -    def update_mtime(self, element, key):
    
    521
    +    # update_mtime()
    
    522
    +    #
    
    523
    +    # Update the mtime of a ref.
    
    524
    +    #
    
    525
    +    # Args:
    
    526
    +    #     ref (str): The ref to update
    
    527
    +    #
    
    528
    +    def update_mtime(self, ref):
    
    543 529
             try:
    
    544
    -            ref = self.get_artifact_fullname(element, key)
    
    545 530
                 os.utime(self._refpath(ref))
    
    546 531
             except FileNotFoundError as e:
    
    547
    -            raise ArtifactError("Attempt to access unavailable artifact: {}".format(e)) from e
    
    532
    +            raise CASError("Attempt to access unavailable ref: {}".format(e)) from e
    
    548 533
     
    
    534
    +    # calculate_cache_size()
    
    535
    +    #
    
    536
    +    # Return the real disk usage of the CAS cache.
    
    537
    +    #
    
    538
    +    # Returns:
    
    539
    +    #    (int): The size of the cache.
    
    540
    +    #
    
    549 541
         def calculate_cache_size(self):
    
    550 542
             return utils._get_dir_size(self.casdir)
    
    551 543
     
    
    552
    -    # list_artifacts():
    
    544
    +    # list_refs():
    
    553 545
         #
    
    554
    -    # List cached artifacts in Least Recently Modified (LRM) order.
    
    546
    +    # List refs in Least Recently Modified (LRM) order.
    
    555 547
         #
    
    556 548
         # Returns:
    
    557 549
         #     (list) - A list of refs in LRM order
    
    558 550
         #
    
    559
    -    def list_artifacts(self):
    
    551
    +    def list_refs(self):
    
    560 552
             # string of: /path/to/repo/refs/heads
    
    561 553
             ref_heads = os.path.join(self.casdir, 'refs', 'heads')
    
    562 554
     
    
    ... ... @@ -571,7 +563,7 @@ class CASCache(ArtifactCache):
    571 563
                     mtimes.append(os.path.getmtime(ref_path))
    
    572 564
     
    
    573 565
             # NOTE: Sorted will sort from earliest to latest, thus the
    
    574
    -        # first element of this list will be the file modified earliest.
    
    566
    +        # first ref of this list will be the file modified earliest.
    
    575 567
             return [ref for _, ref in sorted(zip(mtimes, refs))]
    
    576 568
     
    
    577 569
         # remove():
    
    ... ... @@ -590,28 +582,10 @@ class CASCache(ArtifactCache):
    590 582
         #
    
    591 583
         def remove(self, ref, *, defer_prune=False):
    
    592 584
     
    
    593
    -        # Remove extract if not used by other ref
    
    594
    -        tree = self.resolve_ref(ref)
    
    595
    -        ref_name, ref_hash = os.path.split(ref)
    
    596
    -        extract = os.path.join(self.extractdir, ref_name, tree.hash)
    
    597
    -        keys_file = os.path.join(extract, 'meta', 'keys.yaml')
    
    598
    -        if os.path.exists(keys_file):
    
    599
    -            keys_meta = _yaml.load(keys_file)
    
    600
    -            keys = [keys_meta['strong'], keys_meta['weak']]
    
    601
    -            remove_extract = True
    
    602
    -            for other_hash in keys:
    
    603
    -                if other_hash == ref_hash:
    
    604
    -                    continue
    
    605
    -                remove_extract = False
    
    606
    -                break
    
    607
    -
    
    608
    -            if remove_extract:
    
    609
    -                utils._force_rmtree(extract)
    
    610
    -
    
    611 585
             # Remove cache ref
    
    612 586
             refpath = self._refpath(ref)
    
    613 587
             if not os.path.exists(refpath):
    
    614
    -            raise ArtifactError("Could not find artifact for ref '{}'".format(ref))
    
    588
    +            raise CASError("Could not find ref '{}'".format(ref))
    
    615 589
     
    
    616 590
             os.unlink(refpath)
    
    617 591
     
    
    ... ... @@ -673,8 +647,10 @@ class CASCache(ArtifactCache):
    673 647
                              stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH)
    
    674 648
     
    
    675 649
             for dirnode in directory.directories:
    
    676
    -            fullpath = os.path.join(dest, dirnode.name)
    
    677
    -            self._checkout(fullpath, dirnode.digest)
    
    650
    +            # Don't try to checkout a dangling ref
    
    651
    +            if os.path.exists(self.objpath(dirnode.digest)):
    
    652
    +                fullpath = os.path.join(dest, dirnode.name)
    
    653
    +                self._checkout(fullpath, dirnode.digest)
    
    678 654
     
    
    679 655
             for symlinknode in directory.symlinks:
    
    680 656
                 # symlink
    
    ... ... @@ -721,7 +697,7 @@ class CASCache(ArtifactCache):
    721 697
                     # The process serving the socket can't be cached anyway
    
    722 698
                     pass
    
    723 699
                 else:
    
    724
    -                raise ArtifactError("Unsupported file type for {}".format(full_path))
    
    700
    +                raise CASError("Unsupported file type for {}".format(full_path))
    
    725 701
     
    
    726 702
             return self.add_object(digest=dir_digest,
    
    727 703
                                    buffer=directory.SerializeToString())
    
    ... ... @@ -740,7 +716,7 @@ class CASCache(ArtifactCache):
    740 716
                 if dirnode.name == name:
    
    741 717
                     return dirnode.digest
    
    742 718
     
    
    743
    -        raise ArtifactError("Subdirectory {} not found".format(name))
    
    719
    +        raise CASError("Subdirectory {} not found".format(name))
    
    744 720
     
    
    745 721
         def _diff_trees(self, tree_a, tree_b, *, added, removed, modified, path=""):
    
    746 722
             dir_a = remote_execution_pb2.Directory()
    
    ... ... @@ -812,29 +788,6 @@ class CASCache(ArtifactCache):
    812 788
             for dirnode in directory.directories:
    
    813 789
                 self._reachable_refs_dir(reachable, dirnode.digest)
    
    814 790
     
    
    815
    -    def _initialize_remote(self, remote_spec, q):
    
    816
    -        try:
    
    817
    -            remote = _CASRemote(remote_spec)
    
    818
    -            remote.init()
    
    819
    -
    
    820
    -            request = buildstream_pb2.StatusRequest()
    
    821
    -            response = remote.ref_storage.Status(request)
    
    822
    -
    
    823
    -            if remote_spec.push and not response.allow_updates:
    
    824
    -                q.put('Artifact server does not allow push')
    
    825
    -            else:
    
    826
    -                # No error
    
    827
    -                q.put(None)
    
    828
    -
    
    829
    -        except grpc.RpcError as e:
    
    830
    -            # str(e) is too verbose for errors reported to the user
    
    831
    -            q.put(e.details())
    
    832
    -
    
    833
    -        except Exception as e:               # pylint: disable=broad-except
    
    834
    -            # Whatever happens, we need to return it to the calling process
    
    835
    -            #
    
    836
    -            q.put(str(e))
    
    837
    -
    
    838 791
         def _required_blobs(self, directory_digest):
    
    839 792
             # parse directory, and recursively add blobs
    
    840 793
             d = remote_execution_pb2.Digest()
    
    ... ... @@ -952,11 +905,14 @@ class CASCache(ArtifactCache):
    952 905
         # Args:
    
    953 906
         #     remote (Remote): The remote to use.
    
    954 907
         #     dir_digest (Digest): Digest object for the directory to fetch.
    
    908
    +    #     excluded_subdirs (list): The optional list of subdirs to not fetch
    
    955 909
         #
    
    956
    -    def _fetch_directory(self, remote, dir_digest):
    
    910
    +    def _fetch_directory(self, remote, dir_digest, *, excluded_subdirs=None):
    
    957 911
             fetch_queue = [dir_digest]
    
    958 912
             fetch_next_queue = []
    
    959 913
             batch = _CASBatchRead(remote)
    
    914
    +        if not excluded_subdirs:
    
    915
    +            excluded_subdirs = []
    
    960 916
     
    
    961 917
             while len(fetch_queue) + len(fetch_next_queue) > 0:
    
    962 918
                 if not fetch_queue:
    
    ... ... @@ -971,8 +927,9 @@ class CASCache(ArtifactCache):
    971 927
                     directory.ParseFromString(f.read())
    
    972 928
     
    
    973 929
                 for dirnode in directory.directories:
    
    974
    -                batch = self._fetch_directory_node(remote, dirnode.digest, batch,
    
    975
    -                                                   fetch_queue, fetch_next_queue, recursive=True)
    
    930
    +                if dirnode.name not in excluded_subdirs:
    
    931
    +                    batch = self._fetch_directory_node(remote, dirnode.digest, batch,
    
    932
    +                                                       fetch_queue, fetch_next_queue, recursive=True)
    
    976 933
     
    
    977 934
                 for filenode in directory.files:
    
    978 935
                     batch = self._fetch_directory_node(remote, filenode.digest, batch,
    
    ... ... @@ -981,6 +938,10 @@ class CASCache(ArtifactCache):
    981 938
             # Fetch final batch
    
    982 939
             self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
    
    983 940
     
    
    941
    +    def _fetch_subdir(self, remote, tree, subdir):
    
    942
    +        subdirdigest = self._get_subdir(tree, subdir)
    
    943
    +        self._fetch_directory(remote, subdirdigest)
    
    944
    +
    
    984 945
         def _fetch_tree(self, remote, digest):
    
    985 946
             # download but do not store the Tree object
    
    986 947
             with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
    
    ... ... @@ -1080,7 +1041,7 @@ class CASCache(ArtifactCache):
    1080 1041
     
    
    1081 1042
     # Represents a single remote CAS cache.
    
    1082 1043
     #
    
    1083
    -class _CASRemote():
    
    1044
    +class CASRemote():
    
    1084 1045
         def __init__(self, spec):
    
    1085 1046
             self.spec = spec
    
    1086 1047
             self._initialized = False
    
    ... ... @@ -1125,7 +1086,7 @@ class _CASRemote():
    1125 1086
                                                                certificate_chain=client_cert_bytes)
    
    1126 1087
                     self.channel = grpc.secure_channel('{}:{}'.format(url.hostname, port), credentials)
    
    1127 1088
                 else:
    
    1128
    -                raise ArtifactError("Unsupported URL: {}".format(self.spec.url))
    
    1089
    +                raise CASError("Unsupported URL: {}".format(self.spec.url))
    
    1129 1090
     
    
    1130 1091
                 self.bytestream = bytestream_pb2_grpc.ByteStreamStub(self.channel)
    
    1131 1092
                 self.cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel)
    
    ... ... @@ -1203,10 +1164,10 @@ class _CASBatchRead():
    1203 1164
     
    
    1204 1165
             for response in batch_response.responses:
    
    1205 1166
                 if response.status.code != code_pb2.OK:
    
    1206
    -                raise ArtifactError("Failed to download blob {}: {}".format(
    
    1167
    +                raise CASError("Failed to download blob {}: {}".format(
    
    1207 1168
                         response.digest.hash, response.status.code))
    
    1208 1169
                 if response.digest.size_bytes != len(response.data):
    
    1209
    -                raise ArtifactError("Failed to download blob {}: expected {} bytes, received {} bytes".format(
    
    1170
    +                raise CASError("Failed to download blob {}: expected {} bytes, received {} bytes".format(
    
    1210 1171
                         response.digest.hash, response.digest.size_bytes, len(response.data)))
    
    1211 1172
     
    
    1212 1173
                 yield (response.digest, response.data)
    
    ... ... @@ -1248,7 +1209,7 @@ class _CASBatchUpdate():
    1248 1209
     
    
    1249 1210
             for response in batch_response.responses:
    
    1250 1211
                 if response.status.code != code_pb2.OK:
    
    1251
    -                raise ArtifactError("Failed to upload blob {}: {}".format(
    
    1212
    +                raise CASError("Failed to upload blob {}: {}".format(
    
    1252 1213
                         response.digest.hash, response.status.code))
    
    1253 1214
     
    
    1254 1215
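The net effect of this file's changes: CASCache now speaks only in terms of refs, digests and a single explicit CASRemote, while the element- and project-aware logic stays in ArtifactCache. A minimal sketch of driving the reworked API directly; the paths and ref names below are illustrative, not from the commit, and `spec` is elided (it would be a push-enabled remote spec from user configuration):

    # Hedged sketch: exercising the new ref-based CASCache API.
    from buildstream._artifactcache.cascache import CASCache, CASRemote

    cas = CASCache('/path/to/artifactdir')   # hypothetical local repository path

    # Alias an existing ref under a second name without copying any objects
    cas.link_ref('proj/element/strong-key', 'proj/element/weak-key')

    # Pushing now takes explicit refs and exactly one remote; False means
    # the server already had everything.
    remote = CASRemote(spec)                 # 'spec' elided: from user configuration
    if cas.push(['proj/element/weak-key'], remote):
        print('remote updated')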
     
    

  • buildstream/_artifactcache/casserver.py
@@ -32,8 +32,9 @@ from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remo
 from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
 from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc

-from .._exceptions import ArtifactError
-from .._context import Context
+from .._exceptions import CASError
+
+from .cascache import CASCache


 # The default limit for gRPC messages is 4 MiB.
@@ -55,26 +56,23 @@ class ArtifactTooLargeException(Exception):
 #     enable_push (bool): Whether to allow blob uploads and artifact updates
 #
 def create_server(repo, *, enable_push):
-    context = Context()
-    context.artifactdir = os.path.abspath(repo)
-
-    artifactcache = context.artifactcache
+    cas = CASCache(os.path.abspath(repo))

     # Use max_workers default from Python 3.5+
     max_workers = (os.cpu_count() or 1) * 5
     server = grpc.server(futures.ThreadPoolExecutor(max_workers))

     bytestream_pb2_grpc.add_ByteStreamServicer_to_server(
-        _ByteStreamServicer(artifactcache, enable_push=enable_push), server)
+        _ByteStreamServicer(cas, enable_push=enable_push), server)

     remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
-        _ContentAddressableStorageServicer(artifactcache, enable_push=enable_push), server)
+        _ContentAddressableStorageServicer(cas, enable_push=enable_push), server)

     remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
         _CapabilitiesServicer(), server)

     buildstream_pb2_grpc.add_ReferenceStorageServicer_to_server(
-        _ReferenceStorageServicer(artifactcache, enable_push=enable_push), server)
+        _ReferenceStorageServicer(cas, enable_push=enable_push), server)

     return server

@@ -333,7 +331,7 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer):

             response.digest.hash = tree.hash
             response.digest.size_bytes = tree.size_bytes
-        except ArtifactError:
+        except CASError:
             context.set_code(grpc.StatusCode.NOT_FOUND)

         return response
@@ -437,7 +435,7 @@ def _clean_up_cache(cas, object_size):
         return 0

     # obtain a list of LRP artifacts
-    LRP_artifacts = cas.list_artifacts()
+    LRP_artifacts = cas.list_refs()

     removed_size = 0  # in bytes
     while object_size - removed_size > free_disk_space:
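With the server constructed on a bare CASCache, no Context has to be loaded just to serve a cache. A sketch of standing one up with this entry point (the repository path and address are illustrative; `add_insecure_port` and `start` are standard grpc.Server methods):

    # Hedged sketch: serving a CAS repository with the reworked create_server().
    from buildstream._artifactcache.casserver import create_server

    server = create_server('/srv/cas-repo', enable_push=True)
    server.add_insecure_port('localhost:50051')
    server.start()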
    

  • buildstream/_context.py
@@ -31,7 +31,6 @@ from ._exceptions import LoadError, LoadErrorReason, BstError
 from ._message import Message, MessageType
 from ._profile import Topics, profile_start, profile_end
 from ._artifactcache import ArtifactCache
-from ._artifactcache.cascache import CASCache
 from ._workspaces import Workspaces
 from .plugin import _plugin_lookup

@@ -105,6 +104,9 @@ class Context():
         # What to do when a build fails in non interactive mode
         self.sched_error_action = 'continue'

+        # Whether or not to attempt to pull build trees globally
+        self.pull_build_trees = False
+
         # Whether elements must be rebuilt when their dependencies have changed
         self._strict_build_plan = None

@@ -161,7 +163,7 @@ class Context():
         _yaml.node_validate(defaults, [
             'sourcedir', 'builddir', 'artifactdir', 'logdir',
             'scheduler', 'artifacts', 'logging', 'projects',
-            'cache'
+            'cache', 'pullbuildtrees'
         ])

         for directory in ['sourcedir', 'builddir', 'artifactdir', 'logdir']:
@@ -186,6 +188,9 @@ class Context():
         # Load artifact share configuration
         self.artifact_cache_specs = ArtifactCache.specs_from_config_node(defaults)

+        # Load pull build trees configuration
+        self.pull_build_trees = _yaml.node_get(defaults, bool, 'pullbuildtrees', default_value='False')
+
         # Load logging config
         logging = _yaml.node_get(defaults, Mapping, 'logging')
         _yaml.node_validate(logging, [
@@ -233,7 +238,7 @@ class Context():
     @property
     def artifactcache(self):
         if not self._artifactcache:
-            self._artifactcache = CASCache(self)
+            self._artifactcache = ArtifactCache(self)

         return self._artifactcache
    

  • buildstream/_elementfactory.py
@@ -47,7 +47,6 @@ class ElementFactory(PluginContext):
     # Args:
     #    context (object): The Context object for processing
     #    project (object): The project object
-    #    artifacts (ArtifactCache): The artifact cache
     #    meta (object): The loaded MetaElement
     #
     # Returns: A newly created Element object of the appropriate kind
@@ -56,9 +55,9 @@ class ElementFactory(PluginContext):
     #    PluginError (if the kind lookup failed)
     #    LoadError (if the element itself took issue with the config)
     #
-    def create(self, context, project, artifacts, meta):
+    def create(self, context, project, meta):
         element_type, default_config = self.lookup(meta.kind)
-        element = element_type(context, project, artifacts, meta, default_config)
+        element = element_type(context, project, meta, default_config)
         version = self._format_versions.get(meta.kind, 0)
         self._assert_plugin_format(element, version)
         return element

  • buildstream/_exceptions.py
@@ -90,6 +90,7 @@ class ErrorDomain(Enum):
     APP = 12
     STREAM = 13
     VIRTUAL_FS = 14
+    CAS = 15


 # BstError is an internal base exception class for BuildSream
@@ -274,6 +275,15 @@ class ArtifactError(BstError):
         super().__init__(message, detail=detail, domain=ErrorDomain.ARTIFACT, reason=reason, temporary=True)


+# CASError
+#
+# Raised when errors are encountered in the CAS
+#
+class CASError(BstError):
+    def __init__(self, message, *, detail=None, reason=None, temporary=False):
+        super().__init__(message, detail=detail, domain=ErrorDomain.CAS, reason=reason, temporary=True)
+
+
 # PipelineError
 #
 # Raised from pipeline operations
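Because CASError lives in its own error domain, callers that previously caught ArtifactError for low-level cache failures now need the new type. A small sketch:

    # Hedged sketch: the CAS layer now raises CASError rather than ArtifactError.
    from buildstream._exceptions import CASError

    def try_resolve(cas, ref):
        try:
            return cas.resolve_ref(ref)
        except CASError:
            # missing ref, corrupt object, unreachable remote, etc.
            return None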
    

  • buildstream/_frontend/app.py
@@ -182,7 +182,8 @@ class App():
             'fetchers': 'sched_fetchers',
             'builders': 'sched_builders',
             'pushers': 'sched_pushers',
-            'network_retries': 'sched_network_retries'
+            'network_retries': 'sched_network_retries',
+            'pull_build_trees': 'pull_build_trees'
         }
         for cli_option, context_attr in override_map.items():
             option_value = self._main_options.get(cli_option)
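The override map copies a CLI value onto the matching Context attribute, which is why the new flag defaults to None in cli.py below: an unset flag leaves the configured value alone. A simplified sketch of that precedence (the None check is an assumption about the loop body, which this hunk does not show):

    # Hedged sketch: CLI values override user configuration when present.
    def apply_overrides(main_options, context):
        override_map = {'pull_build_trees': 'pull_build_trees'}
        for cli_option, context_attr in override_map.items():
            option_value = main_options.get(cli_option)
            if option_value is not None:       # unset flag keeps the config value
                setattr(context, context_attr, option_value)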
    

  • buildstream/_frontend/cli.py
@@ -219,6 +219,8 @@ def print_version(ctx, param, value):
               help="Specify a project option")
 @click.option('--default-mirror', default=None,
               help="The mirror to fetch from first, before attempting other mirrors")
+@click.option('--pull-build-trees', is_flag=True, default=None,
+              help="Include an element's build trees when pulling remote element artifacts")
 @click.pass_context
 def cli(context, **kwargs):
     """Build and manipulate BuildStream projects
    

  • buildstream/_loader/loader.py
@@ -537,7 +537,7 @@ class Loader():
             raise LoadError(LoadErrorReason.INVALID_DATA,
                             "{}: Expected junction but element kind is {}".format(filename, meta_element.kind))

-        element = Element._new_from_meta(meta_element, self._context.artifactcache)
+        element = Element._new_from_meta(meta_element)
         element._preflight()

         sources = list(element.sources())
    

  • buildstream/_pipeline.py
@@ -106,7 +106,7 @@ class Pipeline():

         profile_start(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, '-') for t in targets))

-        elements = self._project.load_elements(targets, self._artifacts,
+        elements = self._project.load_elements(targets,
                                                rewritable=rewritable,
                                                fetch_subprojects=fetch_subprojects)

    

  • buildstream/_project.py
@@ -224,18 +224,17 @@ class Project():
     # Instantiate and return an element
     #
     # Args:
-    #    artifacts (ArtifactCache): The artifact cache
     #    meta (MetaElement): The loaded MetaElement
     #    first_pass (bool): Whether to use first pass configuration (for junctions)
     #
     # Returns:
     #    (Element): A newly created Element object of the appropriate kind
     #
-    def create_element(self, artifacts, meta, *, first_pass=False):
+    def create_element(self, meta, *, first_pass=False):
         if first_pass:
-            return self.first_pass_config.element_factory.create(self._context, self, artifacts, meta)
+            return self.first_pass_config.element_factory.create(self._context, self, meta)
         else:
-            return self.config.element_factory.create(self._context, self, artifacts, meta)
+            return self.config.element_factory.create(self._context, self, meta)

     # create_source()
     #
@@ -305,7 +304,6 @@ class Project():
     #
     # Args:
     #    targets (list): Target names
-    #    artifacts (ArtifactCache): Artifact cache
     #    rewritable (bool): Whether the loaded files should be rewritable
     #                       this is a bit more expensive due to deep copies
     #    fetch_subprojects (bool): Whether we should fetch subprojects as a part of the
@@ -314,7 +312,7 @@ class Project():
     # Returns:
     #    (list): A list of loaded Element
     #
-    def load_elements(self, targets, artifacts, *,
+    def load_elements(self, targets, *,
                       rewritable=False, fetch_subprojects=False):
         with self._context.timed_activity("Loading elements", silent_nested=True):
             meta_elements = self.loader.load(targets, rewritable=rewritable,
@@ -323,7 +321,7 @@ class Project():

         with self._context.timed_activity("Resolving elements"):
             elements = [
-                Element._new_from_meta(meta, artifacts)
+                Element._new_from_meta(meta)
                 for meta in meta_elements
             ]

    

  • buildstream/_scheduler/queues/pullqueue.py
@@ -32,9 +32,20 @@ class PullQueue(Queue):
     complete_name = "Pulled"
     resources = [ResourceType.DOWNLOAD, ResourceType.CACHE]

+    def __init__(self, scheduler):
+        super().__init__(scheduler)
+
+        # Current default exclusions on pull
+        self._excluded_subdirs = ["buildtree"]
+        self._subdir = None
+        # If build trees are to be pulled, remove the value from exclusion list
+        if self._scheduler.context.pull_build_trees:
+            self._subdir = "buildtree"
+            self._excluded_subdirs.remove(self._subdir)
+
     def process(self, element):
         # returns whether an artifact was downloaded or not
-        if not element._pull():
+        if not element._pull(subdir=self._subdir, excluded_subdirs=self._excluded_subdirs):
             raise SkipJob(self.action_name)

     def status(self, element):
@@ -49,7 +60,7 @@ class PullQueue(Queue):
         if not element._can_query_cache():
             return QueueStatus.WAIT

-        if element._pull_pending():
+        if element._pull_pending(subdir=self._subdir):
             return QueueStatus.READY
         else:
             return QueueStatus.SKIP
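The exclusion set is computed once per queue: `buildtree` is excluded by default and becomes the requested subdir when the context opts in. In isolation, that logic behaves as in this stand-alone sketch:

    # Hedged, stand-alone restatement of the PullQueue exclusion logic above.
    class StubContext:
        pull_build_trees = True

    def compute_pull_args(context):
        excluded_subdirs = ["buildtree"]   # default: build trees are not fetched
        subdir = None
        if context.pull_build_trees:       # opted in via cli flag or user config
            subdir = "buildtree"
            excluded_subdirs.remove(subdir)
        return subdir, excluded_subdirs

    assert compute_pull_args(StubContext()) == ("buildtree", [])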
    

  • buildstream/data/userconfig.yaml
@@ -97,3 +97,5 @@ logging:

     [%{elapsed}][%{key}][%{element}] %{action} %{message}

+# Whether to pull buildtrees when downloading element artifacts
+pullbuildtrees: False
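Setting `pullbuildtrees: True` here opts every invocation into buildtree downloads; the `--pull-build-trees` CLI option overrides the configured value for a single run, as wired up in app.py and cli.py above.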

  • buildstream/element.py
@@ -174,7 +174,7 @@ class Element(Plugin):
     *Since: 1.4*
     """

-    def __init__(self, context, project, artifacts, meta, plugin_conf):
+    def __init__(self, context, project, meta, plugin_conf):

         self.__cache_key_dict = None            # Dict for cache key calculation
         self.__cache_key = None                 # Our cached cache key
@@ -199,7 +199,7 @@ class Element(Plugin):
         self.__sources = []                     # List of Sources
         self.__weak_cache_key = None            # Our cached weak cache key
         self.__strict_cache_key = None          # Our cached cache key for strict builds
-        self.__artifacts = artifacts            # Artifact cache
+        self.__artifacts = context.artifactcache  # Artifact cache
         self.__consistency = Consistency.INCONSISTENT  # Cached overall consistency state
         self.__strong_cached = None             # Whether we have a cached artifact
         self.__weak_cached = None               # Whether we have a cached artifact
@@ -872,14 +872,13 @@ class Element(Plugin):
     # and its dependencies from a meta element.
     #
     # Args:
-    #    artifacts (ArtifactCache): The artifact cache
     #    meta (MetaElement): The meta element
     #
     # Returns:
     #    (Element): A newly created Element instance
     #
     @classmethod
-    def _new_from_meta(cls, meta, artifacts):
+    def _new_from_meta(cls, meta):

         if not meta.first_pass:
             meta.project.ensure_fully_loaded()
@@ -887,7 +886,7 @@
         if meta in cls.__instantiated_elements:
             return cls.__instantiated_elements[meta]

-        element = meta.project.create_element(artifacts, meta, first_pass=meta.first_pass)
+        element = meta.project.create_element(meta, first_pass=meta.first_pass)
         cls.__instantiated_elements[meta] = element

         # Instantiate sources
@@ -904,10 +903,10 @@

         # Instantiate dependencies
         for meta_dep in meta.dependencies:
-            dependency = Element._new_from_meta(meta_dep, artifacts)
+            dependency = Element._new_from_meta(meta_dep)
             element.__runtime_dependencies.append(dependency)
         for meta_dep in meta.build_dependencies:
-            dependency = Element._new_from_meta(meta_dep, artifacts)
+            dependency = Element._new_from_meta(meta_dep)
             element.__build_dependencies.append(dependency)

         return element
@@ -1399,9 +1398,18 @@
                                              .format(workspace.get_absolute_path())):
                             workspace.stage(temp_staging_directory)
                 elif self._cached():
-                    # We have a cached buildtree to use, instead
-                    artifact_base, _ = self.__extract()
-                    import_dir = os.path.join(artifact_base, 'buildtree')
+                    # Check if we have a cached buildtree to use
+                    context = self._get_context()
+                    if context.get_strict():
+                        if self.__artifacts.contains_subdir_artifact(self, self.__strict_cache_key, 'buildtree'):
+                            artifact_base, _ = self.__extract()
+                            import_dir = os.path.join(artifact_base, 'buildtree')
+                    elif self.__artifacts.contains_subdir_artifact(self, self.__weak_cache_key, 'buildtree'):
+                        artifact_base, _ = self.__extract()
+                        import_dir = os.path.join(artifact_base, 'buildtree')
+                    else:
+                        self.warn("{} is cached without a buildtree, the source will be staged instead"
+                                  .format(self.name))
                 else:
                     # No workspace, stage directly
                     for source in self.sources():
@@ -1699,18 +1707,26 @@

     # _pull_pending()
     #
-    # Check whether the artifact will be pulled.
+    # Check whether the artifact will be pulled. If the pull operation is to
+    # include a specific subdir of the element artifact (from cli or user conf)
+    # then the local cache is queried for the subdirs existence.
+    #
+    # Args:
+    #    subdir (str): Whether the pull has been invoked with a specific subdir set
     #
     # Returns:
     #   (bool): Whether a pull operation is pending
     #
-    def _pull_pending(self):
+    def _pull_pending(self, subdir=None):
         if self._get_workspace():
             # Workspace builds are never pushed to artifact servers
             return False

-        if self.__strong_cached:
-            # Artifact already in local cache
+        if self.__strong_cached and subdir:
+            # If we've specified a subdir, check if the subdir is cached locally
+            if self.__artifacts.contains_subdir_artifact(self, self.__strict_cache_key, subdir):
+                return False
+        elif self.__strong_cached:
             return False

         # Pull is pending if artifact remote server available
@@ -1732,50 +1748,27 @@

         self._update_state()

-    def _pull_strong(self, *, progress=None):
-        weak_key = self._get_cache_key(strength=_KeyStrength.WEAK)
-
-        key = self.__strict_cache_key
-        if not self.__artifacts.pull(self, key, progress=progress):
-            return False
-
-        # update weak ref by pointing it to this newly fetched artifact
-        self.__artifacts.link_key(self, key, weak_key)
-
-        return True
-
-    def _pull_weak(self, *, progress=None):
-        weak_key = self._get_cache_key(strength=_KeyStrength.WEAK)
-
-        if not self.__artifacts.pull(self, weak_key, progress=progress):
-            return False
-
-        # extract strong cache key from this newly fetched artifact
-        self._pull_done()
-
-        # create tag for strong cache key
-        key = self._get_cache_key(strength=_KeyStrength.STRONG)
-        self.__artifacts.link_key(self, weak_key, key)
-
-        return True
-
     # _pull():
     #
     # Pull artifact from remote artifact repository into local artifact cache.
     #
+    # Args:
+    #     subdir (str): The optional specific subdir to pull
+    #     excluded_subdirs (list): The optional list of subdirs to not pull
+    #
     # Returns: True if the artifact has been downloaded, False otherwise
     #
-    def _pull(self):
+    def _pull(self, subdir=None, excluded_subdirs=None):
         context = self._get_context()

         def progress(percent, message):
             self.status(message)

         # Attempt to pull artifact without knowing whether it's available
-        pulled = self._pull_strong(progress=progress)
+        pulled = self.__pull_strong(progress=progress, subdir=subdir, excluded_subdirs=excluded_subdirs)

         if not pulled and not self._cached() and not context.get_strict():
-            pulled = self._pull_weak(progress=progress)
+            pulled = self.__pull_weak(progress=progress, subdir=subdir, excluded_subdirs=excluded_subdirs)

         if not pulled:
             return False
@@ -1798,10 +1791,21 @@
         if not self._cached():
             return True

-        # Do not push tained artifact
+        # Do not push tainted artifact
         if self.__get_tainted():
             return True

+        # strict_cache_key can't be relied on to be available when running in non strict mode
+        context = self._get_context()
+
+        # Do not push elements that have a dangling buildtree artifact unless element type is
+        # expected to have an empty buildtree directory
+        if context.get_strict():
+            if not self.__artifacts.contains_subdir_artifact(self, self.__strict_cache_key, 'buildtree'):
+                return True
+        elif not self.__artifacts.contains_subdir_artifact(self, self.__weak_cache_key, 'buildtree'):
+            return True
+
         return False

     # _push():
@@ -2057,7 +2061,7 @@
                 'sources': [s._get_unique_key(workspace is None) for s in self.__sources],
                 'workspace': '' if workspace is None else workspace.get_key(self._get_project()),
                 'public': self.__public,
-                'cache': type(self.__artifacts).__name__
+                'cache': 'CASCache'
             }

             self.__cache_key_dict['fatal-warnings'] = sorted(project._fatal_warnings)
@@ -2682,6 +2686,59 @@

         return utils._deduplicate(keys)

+    # __pull_strong():
+    #
+    # Attempt pulling given element from configured artifact caches with
+    # the strict cache key
+    #
+    # Args:
+    #     progress (callable): The progress callback, if any
+    #     subdir (str): The optional specific subdir to pull
+    #     excluded_subdirs (list): The optional list of subdirs to not pull
+    #
+    # Returns:
+    #     (bool): Whether or not the pull was successful
+    #
+    def __pull_strong(self, *, progress=None, subdir=None, excluded_subdirs=None):
+        weak_key = self._get_cache_key(strength=_KeyStrength.WEAK)
+        key = self.__strict_cache_key
+        if not self.__artifacts.pull(self, key, progress=progress, subdir=subdir,
+                                     excluded_subdirs=excluded_subdirs):
+            return False
+
+        # update weak ref by pointing it to this newly fetched artifact
+        self.__artifacts.link_key(self, key, weak_key)
+
+        return True
+
+    # __pull_weak():
+    #
+    # Attempt pulling given element from configured artifact caches with
+    # the weak cache key
+    #
+    # Args:
+    #     progress (callable): The progress callback, if any
+    #     subdir (str): The optional specific subdir to pull
+    #     excluded_subdirs (list): The optional list of subdirs to not pull
+    #
+    # Returns:
+    #     (bool): Whether or not the pull was successful
+    #
+    def __pull_weak(self, *, progress=None, subdir=None, excluded_subdirs=None):
+        weak_key = self._get_cache_key(strength=_KeyStrength.WEAK)
+        if not self.__artifacts.pull(self, weak_key, progress=progress, subdir=subdir,
+                                     excluded_subdirs=excluded_subdirs):
+            return False
+
+        # extract strong cache key from this newly fetched artifact
+        self._pull_done()
+
+        # create tag for strong cache key
+        key = self._get_cache_key(strength=_KeyStrength.STRONG)
+        self.__artifacts.link_key(self, weak_key, key)
+
+        return True
+

 def _overlap_error_detail(f, forbidden_overlap_elements, elements):
     if forbidden_overlap_elements:
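The renamed private helpers preserve the strong-then-weak pull fallback, now threading the subdir filters through to ArtifactCache.pull(). Condensed to its control flow, with the cache stubbed and keys as plain strings, the behaviour is roughly:

    # Hedged sketch of the _pull() fallback; 'cache' stands in for ArtifactCache.
    def pull(cache, strict_key, weak_key, *, strict_mode, subdir=None, excluded_subdirs=None):
        # Try the strict cache key first.
        if cache.pull(strict_key, subdir=subdir, excluded_subdirs=excluded_subdirs):
            cache.link_key(strict_key, weak_key)   # alias the weak ref to this artifact
            return True
        # Fall back to the weak key, but only outside strict mode.
        if not strict_mode and cache.pull(weak_key, subdir=subdir, excluded_subdirs=excluded_subdirs):
            return True
        return False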
    

  • buildstream/storage/_casbaseddirectory.py
@@ -79,7 +79,7 @@ class CasBasedDirectory(Directory):
         self.filename = filename
         self.common_name = common_name
         self.pb2_directory = remote_execution_pb2.Directory()
-        self.cas_cache = context.artifactcache
+        self.cas_cache = context.artifactcache.cas
         if ref:
             with open(self.cas_cache.objpath(ref), 'rb') as f:
                 self.pb2_directory.ParseFromString(f.read())
    

  • dev-requirements.txt
 coverage == 4.4.0
 pep8
 pylint == 2.1.1
-pytest >= 3.7
+pytest >= 3.8
 pytest-cov >= 2.5.0
 pytest-datafiles
 pytest-env
    

  • tests/artifactcache/pull.py
    ... ... @@ -90,7 +90,7 @@ def test_pull(cli, tmpdir, datafiles):
    90 90
             cas = context.artifactcache
    
    91 91
     
    
    92 92
             # Assert that the element's artifact is **not** cached
    
    93
    -        element = project.load_elements(['target.bst'], cas)[0]
    
    93
    +        element = project.load_elements(['target.bst'])[0]
    
    94 94
             element_key = cli.get_element_key(project_dir, 'target.bst')
    
    95 95
             assert not cas.contains(element, element_key)
    
    96 96
     
    
    ... ... @@ -132,7 +132,7 @@ def _test_pull(user_config_file, project_dir, artifact_dir,
    132 132
         cas = context.artifactcache
    
    133 133
     
    
    134 134
         # Load the target element
    
    135
    -    element = project.load_elements([element_name], cas)[0]
    
    135
    +    element = project.load_elements([element_name])[0]
    
    136 136
     
    
    137 137
         # Manually setup the CAS remote
    
    138 138
         cas.setup_remotes(use_config=True)
    
    ... ... @@ -190,15 +190,16 @@ def test_pull_tree(cli, tmpdir, datafiles):
    190 190
             # Load the project and CAS cache
    
    191 191
             project = Project(project_dir, context)
    
    192 192
             project.ensure_fully_loaded()
    
    193
    -        cas = context.artifactcache
    
    193
    +        artifactcache = context.artifactcache
    
    194
    +        cas = artifactcache.cas
    
    194 195
     
    
    195 196
             # Assert that the element's artifact is cached
    
    196
    -        element = project.load_elements(['target.bst'], cas)[0]
    
    197
    +        element = project.load_elements(['target.bst'])[0]
    
    197 198
             element_key = cli.get_element_key(project_dir, 'target.bst')
    
    198
    -        assert cas.contains(element, element_key)
    
    199
    +        assert artifactcache.contains(element, element_key)
    
    199 200
     
    
    200 201
             # Retrieve the Directory object from the cached artifact
    
    201
    -        artifact_ref = cas.get_artifact_fullname(element, element_key)
    
    202
    +        artifact_ref = artifactcache.get_artifact_fullname(element, element_key)
    
    202 203
             artifact_digest = cas.resolve_ref(artifact_ref)
    
    203 204
     
    
    204 205
             queue = multiprocessing.Queue()
    
    ... ... @@ -268,12 +269,13 @@ def _test_push_tree(user_config_file, project_dir, artifact_dir, artifact_digest
    268 269
         project.ensure_fully_loaded()
    
    269 270
     
    
    270 271
         # Create a local CAS cache handle
    
    271
    -    cas = context.artifactcache
    
    272
    +    artifactcache = context.artifactcache
    
    273
    +    cas = artifactcache.cas
    
    272 274
     
    
    273 275
         # Manually setup the CAS remote
    
    274
    -    cas.setup_remotes(use_config=True)
    
    276
    +    artifactcache.setup_remotes(use_config=True)
    
    275 277
     
    
    276
    -    if cas.has_push_remotes():
    
    278
    +    if artifactcache.has_push_remotes():
    
    277 279
             directory = remote_execution_pb2.Directory()
    
    278 280
     
    
    279 281
             with open(cas.objpath(artifact_digest), 'rb') as f:
    
    ... ... @@ -284,7 +286,7 @@ def _test_push_tree(user_config_file, project_dir, artifact_dir, artifact_digest
    284 286
             tree_maker(cas, tree, directory)
    
    285 287
     
    
    286 288
             # Push the Tree as a regular message
    
    287
    -        tree_digest = cas.push_message(project, tree)
    
    289
    +        tree_digest = artifactcache.push_message(project, tree)
    
    288 290
     
    
    289 291
             queue.put((tree_digest.hash, tree_digest.size_bytes))
    
    290 292
         else:
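
These test updates follow the split introduced in artifactcache.py: ref and remote handling stays on `context.artifactcache`, while digest-level operations such as `resolve_ref()` and `objpath()` move to its `cas` member. A short sketch of the resulting lookup pattern; `get_artifact_digest` is a hypothetical wrapper around the calls shown in the diff, with `element` and `element_key` obtained as in the tests (via `project.load_elements()` and `cli.get_element_key()`):

    def get_artifact_digest(context, element, element_key):
        # Refs and remotes live on ArtifactCache ...
        artifactcache = context.artifactcache
        ref = artifactcache.get_artifact_fullname(element, element_key)
        # ... while digests and object storage live on the underlying CASCache
        return artifactcache.cas.resolve_ref(ref)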
    

  • tests/artifactcache/push.py
    ... ... @@ -69,7 +69,7 @@ def test_push(cli, tmpdir, datafiles):
    69 69
             cas = context.artifactcache
    
    70 70
     
    
    71 71
             # Assert that the element's artifact is cached
    
    72
    -        element = project.load_elements(['target.bst'], cas)[0]
    
    72
    +        element = project.load_elements(['target.bst'])[0]
    
    73 73
             element_key = cli.get_element_key(project_dir, 'target.bst')
    
    74 74
             assert cas.contains(element, element_key)
    
    75 75
     
    
    ... ... @@ -111,7 +111,7 @@ def _test_push(user_config_file, project_dir, artifact_dir,
    111 111
         cas = context.artifactcache
    
    112 112
     
    
    113 113
         # Load the target element
    
    114
    -    element = project.load_elements([element_name], cas)[0]
    
    114
    +    element = project.load_elements([element_name])[0]
    
    115 115
     
    
    116 116
         # Manually setup the CAS remote
    
    117 117
         cas.setup_remotes(use_config=True)
    
    ... ... @@ -165,20 +165,21 @@ def test_push_directory(cli, tmpdir, datafiles):
    165 165
             # Load the project and CAS cache
    
    166 166
             project = Project(project_dir, context)
    
    167 167
             project.ensure_fully_loaded()
    
    168
    -        cas = context.artifactcache
    
    168
    +        artifactcache = context.artifactcache
    
    169
    +        cas = artifactcache.cas
    
    169 170
     
    
    170 171
             # Assert that the element's artifact is cached
    
    171
    -        element = project.load_elements(['target.bst'], cas)[0]
    
    172
    +        element = project.load_elements(['target.bst'])[0]
    
    172 173
             element_key = cli.get_element_key(project_dir, 'target.bst')
    
    173
    -        assert cas.contains(element, element_key)
    
    174
    +        assert artifactcache.contains(element, element_key)
    
    174 175
     
    
    175 176
             # Manually setup the CAS remote
    
    176
    -        cas.setup_remotes(use_config=True)
    
    177
    -        cas.initialize_remotes()
    
    178
    -        assert cas.has_push_remotes(element=element)
    
    177
    +        artifactcache.setup_remotes(use_config=True)
    
    178
    +        artifactcache.initialize_remotes()
    
    179
    +        assert artifactcache.has_push_remotes(element=element)
    
    179 180
     
    
    180 181
             # Recreate the CasBasedDirectory object from the cached artifact
    
    181
    -        artifact_ref = cas.get_artifact_fullname(element, element_key)
    
    182
    +        artifact_ref = artifactcache.get_artifact_fullname(element, element_key)
    
    182 183
             artifact_digest = cas.resolve_ref(artifact_ref)
    
    183 184
     
    
    184 185
             queue = multiprocessing.Queue()
    

  • tests/completions/completions.py
    ... ... @@ -42,6 +42,7 @@ MAIN_OPTIONS = [
    42 42
         "-o ",
    
    43 43
         "--option ",
    
    44 44
         "--on-error ",
    
    45
    +    "--pull-build-trees ",
    
    45 46
         "--pushers ",
    
    46 47
         "--strict ",
    
    47 48
         "--verbose ",
    

  • tests/integration/build-tree.py
    ... ... @@ -70,8 +70,8 @@ def test_buildtree_pulled(cli, tmpdir, datafiles):
    70 70
             })
    
    71 71
             assert cli.get_element_state(project, element_name) != 'cached'
    
    72 72
     
    
    73
    -        # Pull from cache
    
    74
    -        result = cli.run(project=project, args=['pull', '--deps', 'all', element_name])
    
    73
    +        # Pull from cache, ensuring the cli option is set to pull the buildtree
    
    74
    +        result = cli.run(project=project, args=['--pull-build-trees', 'pull', '--deps', 'all', element_name])
    
    75 75
             result.assert_success()
    
    76 76
     
    
    77 77
             # Check it's using the cached build tree
    

  • tests/integration/pullbuildtrees.py
    1
    +import os
    
    2
    +import shutil
    
    3
    +import pytest
    
    4
    +
    
    5
    +from tests.testutils import cli_integration as cli, create_artifact_share
    
    6
    +from tests.testutils.integration import assert_contains
    
    7
    +
    
    8
    +
    
    9
    +DATA_DIR = os.path.join(
    
    10
    +    os.path.dirname(os.path.realpath(__file__)),
    
    11
    +    "project"
    
    12
    +)
    
    13
    +
    
    14
    +
    
    15
    +# Remove artifact cache & set cli.config value of pullbuildtrees
    
    16
    +# to false, which is the default user context. The cache has to be
    
    17
    +# cleared as just forcefully removing the refpath leaves dangling objects.
    
    18
    +def default_state(cli, tmpdir, share):
    
    19
    +    shutil.rmtree(os.path.join(str(tmpdir), 'artifacts'))
    
    20
    +    cli.configure({
    
    21
    +        'pullbuildtrees': False,
    
    22
    +        'artifacts': {'url': share.repo, 'push': False},
    
    23
    +        'artifactdir': os.path.join(str(tmpdir), 'artifacts')
    
    24
    +    })
    
    25
    +
    
    26
    +
    
    27
    +# A test to capture the integration of the pullbuildtrees
    
    28
    +# behaviour, which by default is to not include the buildtree
    
    29
    +# directory of an element.
    
    30
    +@pytest.mark.integration
    
    31
    +@pytest.mark.datafiles(DATA_DIR)
    
    32
    +def test_pullbuildtrees(cli, tmpdir, datafiles, integration_cache):
    
    33
    +
    
    34
    +    project = os.path.join(datafiles.dirname, datafiles.basename)
    
    35
    +    element_name = 'autotools/amhello.bst'
    
    36
    +
    
    37
    +    # Create artifact shares for pull & push testing
    
    38
    +    with create_artifact_share(os.path.join(str(tmpdir), 'share1')) as share1,\
    
    39
    +        create_artifact_share(os.path.join(str(tmpdir), 'share2')) as share2:
    
    40
    +        cli.configure({
    
    41
    +            'artifacts': {'url': share1.repo, 'push': True},
    
    42
    +            'artifactdir': os.path.join(str(tmpdir), 'artifacts')
    
    43
    +        })
    
    44
    +
    
    45
    +        # Build the autotools element, check it was pushed, then delete the local cache
    
    46
    +        result = cli.run(project=project, args=['build', element_name])
    
    47
    +        assert result.exit_code == 0
    
    48
    +        assert cli.get_element_state(project, element_name) == 'cached'
    
    49
    +        assert share1.has_artifact('test', element_name, cli.get_element_key(project, element_name))
    
    50
    +        default_state(cli, tmpdir, share1)
    
    51
    +
    
    52
    +        # Pull artifact with default config, assert that pulling again
    
    53
    +        # doesn't create a pull job, then assert that pulling with the
    
    54
    +        # buildtrees user config set creates a pull job.
    
    55
    +        result = cli.run(project=project, args=['pull', element_name])
    
    56
    +        assert element_name in result.get_pulled_elements()
    
    57
    +        result = cli.run(project=project, args=['pull', element_name])
    
    58
    +        assert element_name not in result.get_pulled_elements()
    
    59
    +        cli.configure({'pullbuildtrees': True})
    
    60
    +        result = cli.run(project=project, args=['pull', element_name])
    
    61
    +        assert element_name in result.get_pulled_elements()
    
    62
    +        default_state(cli, tmpdir, share1)
    
    63
    +
    
    64
    +        # Pull artifact with default config, then assert that pulling
    
    65
    +        # with buildtrees cli flag set creates a pull job.
    
    66
    +        # Also assert that the buildtree is added to the artifact's
    
    67
    +        # extract dir.
    
    68
    +        result = cli.run(project=project, args=['pull', element_name])
    
    69
    +        assert element_name in result.get_pulled_elements()
    
    70
    +        elementdigest = share1.has_artifact('test', element_name, cli.get_element_key(project, element_name))
    
    71
    +        buildtreedir = os.path.join(str(tmpdir), 'artifacts', 'extract', 'test', 'autotools-amhello',
    
    72
    +                                    elementdigest.hash, 'buildtree')
    
    73
    +        assert not os.path.isdir(buildtreedir)
    
    74
    +        result = cli.run(project=project, args=['--pull-build-trees', 'pull', element_name])
    
    75
    +        assert element_name in result.get_pulled_elements()
    
    76
    +        assert os.path.isdir(buildtreedir)
    
    77
    +        default_state(cli, tmpdir, share1)
    
    78
    +
    
    79
    +        # Pull artifact with pullbuildtrees set in user config, then assert
    
    80
    +        # that pulling again with the same user config, or with the buildtrees
    
    81
    +        # cli flag set, doesn't create a pull job.
    
    82
    +        cli.configure({'pullbuildtrees': True})
    
    83
    +        result = cli.run(project=project, args=['pull', element_name])
    
    84
    +        assert element_name in result.get_pulled_elements()
    
    85
    +        result = cli.run(project=project, args=['pull', element_name])
    
    86
    +        assert element_name not in result.get_pulled_elements()
    
    87
    +        result = cli.run(project=project, args=['--pull-build-trees', 'pull', element_name])
    
    88
    +        assert element_name not in result.get_pulled_elements()
    
    89
    +        default_state(cli, tmpdir, share1)
    
    90
    +
    
    91
    +        # Pull artifact with default config and buildtrees cli flag set, then assert
    
    92
    +        # that pulling with pullbuildtrees set in user config doesn't create a pull
    
    93
    +        # job.
    
    94
    +        result = cli.run(project=project, args=['--pull-build-trees', 'pull', element_name])
    
    95
    +        assert element_name in result.get_pulled_elements()
    
    96
    +        cli.configure({'pullbuildtrees': True})
    
    97
    +        result = cli.run(project=project, args=['pull', element_name])
    
    98
    +        assert element_name not in result.get_pulled_elements()
    
    99
    +        default_state(cli, tmpdir, share1)
    
    100
    +
    
    101
    +        # Assert that a partial build element (not containing a populated buildtree dir)
    
    102
    +        # can't be pushed to an artifact share, then assert that a complete build element
    
    103
    +        # can be. This will attempt a partial pull from share1 and then a partial push
    
    104
    +        # to share2.
    
    105
    +        result = cli.run(project=project, args=['pull', element_name])
    
    106
    +        assert element_name in result.get_pulled_elements()
    
    107
    +        cli.configure({'artifacts': {'url': share2.repo, 'push': True}})
    
    108
    +        result = cli.run(project=project, args=['push', element_name])
    
    109
    +        assert element_name not in result.get_pushed_elements()
    
    110
    +        assert not share2.has_artifact('test', element_name, cli.get_element_key(project, element_name))
    
    111
    +
    
    112
    +        # Assert that after pulling the missing buildtree the element artifact can be
    
    113
    +        # successfully pushed to the remote. This will attempt to pull the buildtree
    
    114
    +        # from share1 and then a 'complete' push to share2.
    
    115
    +        cli.configure({'artifacts': {'url': share1.repo, 'push': False}})
    
    116
    +        result = cli.run(project=project, args=['--pull-build-trees', 'pull', element_name])
    
    117
    +        assert element_name in result.get_pulled_elements()
    
    118
    +        cli.configure({'artifacts': {'url': share2.repo, 'push': True}})
    
    119
    +        result = cli.run(project=project, args=['push', element_name])
    
    120
    +        assert element_name in result.get_pushed_elements()
    
    121
    +        assert share2.has_artifact('test', element_name, cli.get_element_key(project, element_name))
    
    122
    +        default_state(cli, tmpdir, share1)
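
The scenarios above all hinge on one rule: the buildtree subdirectory is only fetched when the '--pull-build-trees' cli main option or the 'pullbuildtrees' user configuration enables it, and either one suffices. A hedged sketch of that resolution; `should_pull_buildtree` is a hypothetical helper illustrating the precedence, not BuildStream code:

    def should_pull_buildtree(cli_flag, user_config):
        # The cli main option overrides the user configuration
        # default of pullbuildtrees = False
        return bool(cli_flag) or bool(user_config.get('pullbuildtrees', False))


    assert should_pull_buildtree(False, {}) is False                       # default
    assert should_pull_buildtree(True, {}) is True                         # cli override
    assert should_pull_buildtree(False, {'pullbuildtrees': True}) is True  # user config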

  • tests/testutils/artifactshare.py
    ... ... @@ -13,7 +13,7 @@ import pytest_cov
    13 13
     from buildstream import _yaml
    
    14 14
     from buildstream._artifactcache.casserver import create_server
    
    15 15
     from buildstream._context import Context
    
    16
    -from buildstream._exceptions import ArtifactError
    
    16
    +from buildstream._exceptions import CASError
    
    17 17
     from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    18 18
     
    
    19 19
     
    
    ... ... @@ -48,7 +48,7 @@ class ArtifactShare():
    48 48
             context = Context()
    
    49 49
             context.artifactdir = self.repodir
    
    50 50
     
    
    51
    -        self.cas = context.artifactcache
    
    51
    +        self.cas = context.artifactcache.cas
    
    52 52
     
    
    53 53
             self.total_space = total_space
    
    54 54
             self.free_space = free_space
    
    ... ... @@ -114,7 +114,7 @@ class ArtifactShare():
    114 114
         #    cache_key (str): The cache key
    
    115 115
         #
    
    116 116
         # Returns:
    
    117
    -    #    (bool): True if the artifact exists in the share, otherwise false.
    
    117
    +    #    (Digest): artifact digest if the artifact exists in the share, otherwise None.
    
    118 118
         def has_artifact(self, project_name, element_name, cache_key):
    
    119 119
     
    
    120 120
             # NOTE: This should be kept in line with our
    
    ... ... @@ -134,9 +134,9 @@ class ArtifactShare():
    134 134
     
    
    135 135
             try:
    
    136 136
                 tree = self.cas.resolve_ref(artifact_key)
    
    137
    -            return True
    
    138
    -        except ArtifactError:
    
    139
    -            return False
    
    137
    +            return tree
    
    138
    +        except CASError:
    
    139
    +            return None
    
    140 140
     
    
    141 141
         # close():
    
    142 142
         #
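
With has_artifact() now returning the artifact digest rather than a bare bool, callers keep their truthiness asserts and can also derive paths from the digest, as test_pullbuildtrees does with `elementdigest.hash`. A sketch of that pattern; the helper and the element-name normalisation are inferred from the test above, not a published API:

    import os


    def buildtree_extract_dir(share, artifactdir, project_name, element_name, key):
        # has_artifact() returns a digest (or None), so one call both checks
        # existence and yields the hash used by the extract directory layout
        digest = share.has_artifact(project_name, element_name, key)
        if digest is None:
            return None
        # Inferred normalisation: 'autotools/amhello.bst' -> 'autotools-amhello'
        normalized = element_name.replace('.bst', '').replace('/', '-')
        return os.path.join(artifactdir, 'extract', project_name,
                            normalized, digest.hash, 'buildtree')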
    


