[Notes] [Git][BuildStream/buildstream][juerg/cas] Split up artifact cache and CAS cache




Jürg Billeter pushed to branch juerg/cas at BuildStream / buildstream


9 changed files:

Changes:
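
In short, CASCache stops being an ArtifactCache subclass: it becomes a
standalone class that manages the CAS repository purely in terms of refs,
while ArtifactCache owns a CASCache instance and translates element/key
pairs into refs before delegating. A minimal sketch of the resulting
delegation pattern, using simplified, hypothetical names rather than the
verbatim BuildStream code:

    import os

    class CASCache():
        # Owns the CAS repository; deals only in refs, never in elements or keys.
        def __init__(self, path):
            self.casdir = os.path.join(path, 'cas')

        def contains(self, ref):
            # Refs live under refs/heads, mirroring the real repository layout
            return os.path.exists(os.path.join(self.casdir, 'refs', 'heads', ref))

    class ArtifactCache():
        # Owns artifact semantics; maps element/key pairs to refs, then delegates.
        def __init__(self, artifactdir):
            self.cas = CASCache(artifactdir)

        def get_artifact_fullname(self, project, element_name, key):
            # Hypothetical stand-in for the real helper of the same name
            return '{}/{}/{}'.format(project, element_name, key)

        def contains(self, project, element_name, key):
            ref = self.get_artifact_fullname(project, element_name, key)
            return self.cas.contains(ref)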

  • buildstream/_artifactcache/artifactcache.py
    @@ -17,17 +17,22 @@
     #  Authors:
     #        Tristan Maat <tristan maat codethink co uk>

    +import multiprocessing
     import os
    +import signal
     import string
     from collections import namedtuple
     from collections.abc import Mapping

     from ..types import _KeyStrength
    -from .._exceptions import ArtifactError, ImplError, LoadError, LoadErrorReason
    +from .._exceptions import ArtifactError, CASError, LoadError, LoadErrorReason
     from .._message import Message, MessageType
    +from .. import _signals
     from .. import utils
     from .. import _yaml

    +from .cascache import CASCache, CASRemote
    +

     CACHE_SIZE_FILE = "cache_size"


    @@ -93,7 +98,8 @@ class ArtifactCache():
         def __init__(self, context):
             self.context = context
             self.extractdir = os.path.join(context.artifactdir, 'extract')
    -        self.tmpdir = os.path.join(context.artifactdir, 'tmp')
    +
    +        self.cas = CASCache(context.artifactdir)

             self.global_remote_specs = []
             self.project_remote_specs = {}

    @@ -104,12 +110,15 @@ class ArtifactCache():
             self._cache_lower_threshold = None    # The target cache size for a cleanup
             self._remotes_setup = False           # Check to prevent double-setup of remotes

    +        # Per-project list of CASRemote instances.
    +        self._remotes = {}
    +
    +        self._has_fetch_remotes = False
    +        self._has_push_remotes = False
    +
             os.makedirs(self.extractdir, exist_ok=True)
    -        os.makedirs(self.tmpdir, exist_ok=True)

    -    ################################################
    -    #  Methods implemented on the abstract class   #
    -    ################################################
    +        self._calculate_cache_quota()

         # get_artifact_fullname()
         #

    @@ -240,8 +249,10 @@
                 for key in (strong_key, weak_key):
                     if key:
                         try:
    -                        self.update_mtime(element, key)
    -                    except ArtifactError:
    +                        ref = self.get_artifact_fullname(element, key)
    +
    +                        self.cas.update_mtime(ref)
    +                    except CASError:
                             pass

         # clean():

    @@ -252,7 +263,7 @@
         #    (int): The size of the cache after having cleaned up
         #
         def clean(self):
    -        artifacts = self.list_artifacts()  # pylint: disable=assignment-from-no-return
    +        artifacts = self.list_artifacts()

             # Build a set of the cache keys which are required
             # based on the required elements at cleanup time

    @@ -294,7 +305,7 @@
                 if key not in required_artifacts:

                     # Remove the actual artifact, if it's not required.
    -                size = self.remove(to_remove)  # pylint: disable=assignment-from-no-return
    +                size = self.remove(to_remove)

                     # Remove the size from the removed size
                     self.set_cache_size(self._cache_size - size)

    @@ -311,7 +322,7 @@
         #    (int): The size of the artifact cache.
         #
         def compute_cache_size(self):
    -        self._cache_size = self.calculate_cache_size()  # pylint: disable=assignment-from-no-return
    +        self._cache_size = self.cas.calculate_cache_size()

             return self._cache_size

    @@ -380,28 +391,12 @@
         def has_quota_exceeded(self):
             return self.get_cache_size() > self._cache_quota

    -    ################################################
    -    # Abstract methods for subclasses to implement #
    -    ################################################
    -
         # preflight():
         #
         # Preflight check.
         #
         def preflight(self):
    -        pass
    -
    -    # update_mtime()
    -    #
    -    # Update the mtime of an artifact.
    -    #
    -    # Args:
    -    #     element (Element): The Element to update
    -    #     key (str): The key of the artifact.
    -    #
    -    def update_mtime(self, element, key):
    -        raise ImplError("Cache '{kind}' does not implement update_mtime()"
    -                        .format(kind=type(self).__name__))
    +        self.cas.preflight()

         # initialize_remotes():
         #

    @@ -411,7 +406,59 @@
         #     on_failure (callable): Called if we fail to contact one of the caches.
         #
         def initialize_remotes(self, *, on_failure=None):
    -        pass
    +        remote_specs = self.global_remote_specs
    +
    +        for project in self.project_remote_specs:
    +            remote_specs += self.project_remote_specs[project]
    +
    +        remote_specs = list(utils._deduplicate(remote_specs))
    +
    +        remotes = {}
    +        q = multiprocessing.Queue()
    +        for remote_spec in remote_specs:
    +            # Use subprocess to avoid creation of gRPC threads in main BuildStream process
    +            # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
    +            p = multiprocessing.Process(target=self.cas.initialize_remote, args=(remote_spec, q))
    +
    +            try:
    +                # Keep SIGINT blocked in the child process
    +                with _signals.blocked([signal.SIGINT], ignore=False):
    +                    p.start()
    +
    +                error = q.get()
    +                p.join()
    +            except KeyboardInterrupt:
    +                utils._kill_process_tree(p.pid)
    +                raise
    +
    +            if error and on_failure:
    +                on_failure(remote_spec.url, error)
    +            elif error:
    +                raise ArtifactError(error)
    +            else:
    +                self._has_fetch_remotes = True
    +                if remote_spec.push:
    +                    self._has_push_remotes = True
    +
    +                remotes[remote_spec.url] = CASRemote(remote_spec)
    +
    +        for project in self.context.get_projects():
    +            remote_specs = self.global_remote_specs
    +            if project in self.project_remote_specs:
    +                remote_specs = list(utils._deduplicate(remote_specs + self.project_remote_specs[project]))
    +
    +            project_remotes = []
    +
    +            for remote_spec in remote_specs:
    +                # Errors are already handled in the loop above,
    +                # skip unreachable remotes here.
    +                if remote_spec.url not in remotes:
    +                    continue
    +
    +                remote = remotes[remote_spec.url]
    +                project_remotes.append(remote)
    +
    +            self._remotes[project] = project_remotes

         # contains():
         #

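
The subprocess dance above exists because gRPC spawns background threads
that do not survive fork(); probing each remote in a short-lived child
process keeps the main BuildStream process free of gRPC state before it
forks workers, with any error reported back over a Queue. A minimal sketch
of the pattern, with a stand-in check in place of a real gRPC call:

    import multiprocessing

    def probe_remote(url, q):
        # A real probe would open a gRPC channel here; errors are passed
        # back to the parent as strings rather than raised.
        try:
            if not url.startswith(('http://', 'https://')):
                raise ValueError('unsupported URL: {}'.format(url))
            q.put(None)                  # None signals success
        except Exception as e:           # report everything to the parent
            q.put(str(e))

    if __name__ == '__main__':
        q = multiprocessing.Queue()
        p = multiprocessing.Process(target=probe_remote, args=('https://cas.example.com', q))
        p.start()
        error = q.get()                  # blocks until the child reports
        p.join()
        print('remote ok' if error is None else 'remote failed: {}'.format(error))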
    @@ -425,8 +472,9 @@
         # Returns: True if the artifact is in the cache, False otherwise
         #
         def contains(self, element, key):
    -        raise ImplError("Cache '{kind}' does not implement contains()"
    -                        .format(kind=type(self).__name__))
    +        ref = self.get_artifact_fullname(element, key)
    +
    +        return self.cas.contains(ref)

         # list_artifacts():
         #

    @@ -437,8 +485,7 @@
         #               `ArtifactCache.get_artifact_fullname` in LRU order
         #
         def list_artifacts(self):
    -        raise ImplError("Cache '{kind}' does not implement list_artifacts()"
    -                        .format(kind=type(self).__name__))
    +        return self.cas.list_refs()

         # remove():
         #

    @@ -450,9 +497,31 @@
         #                          generated by
         #                          `ArtifactCache.get_artifact_fullname`)
         #
    -    def remove(self, artifact_name):
    -        raise ImplError("Cache '{kind}' does not implement remove()"
    -                        .format(kind=type(self).__name__))
    +    # Returns:
    +    #    (int|None) The amount of space pruned from the repository in
    +    #               Bytes, or None if defer_prune is True
    +    #
    +    def remove(self, ref):
    +
    +        # Remove extract if not used by other ref
    +        tree = self.cas.resolve_ref(ref)
    +        ref_name, ref_hash = os.path.split(ref)
    +        extract = os.path.join(self.extractdir, ref_name, tree.hash)
    +        keys_file = os.path.join(extract, 'meta', 'keys.yaml')
    +        if os.path.exists(keys_file):
    +            keys_meta = _yaml.load(keys_file)
    +            keys = [keys_meta['strong'], keys_meta['weak']]
    +            remove_extract = True
    +            for other_hash in keys:
    +                if other_hash == ref_hash:
    +                    continue
    +                remove_extract = False
    +                break
    +
    +            if remove_extract:
    +                utils._force_rmtree(extract)
    +
    +        return self.cas.remove(ref)

         # extract():
         #

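
The new remove() also tidies up the shared extract directory: the strong
and weak key of an artifact resolve to the same extracted tree, whose
meta/keys.yaml records both keys, so the extract is only deleted once no
other key still points at it. A small sketch of that decision, assuming
the keys.yaml layout implied above (a mapping with 'strong' and 'weak'
entries):

    def extract_still_needed(keys_meta, removed_hash):
        # The extract stays if any recorded key other than the one being
        # removed still points at it.
        keys = [keys_meta['strong'], keys_meta['weak']]
        return any(other != removed_hash for other in keys)

    assert extract_still_needed({'strong': 'aaa', 'weak': 'bbb'}, 'aaa')
    assert not extract_still_needed({'strong': 'aaa', 'weak': 'aaa'}, 'aaa')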
    @@ -472,8 +541,11 @@
         # Returns: path to extracted artifact
         #
         def extract(self, element, key):
    -        raise ImplError("Cache '{kind}' does not implement extract()"
    -                        .format(kind=type(self).__name__))
    +        ref = self.get_artifact_fullname(element, key)
    +
    +        path = os.path.join(self.extractdir, element._get_project().name, element.normal_name)
    +
    +        return self.cas.extract(ref, path)

         # commit():
         #

    @@ -485,8 +557,9 @@
         #     keys (list): The cache keys to use
         #
         def commit(self, element, content, keys):
    -        raise ImplError("Cache '{kind}' does not implement commit()"
    -                        .format(kind=type(self).__name__))
    +        refs = [self.get_artifact_fullname(element, key) for key in keys]
    +
    +        self.cas.commit(refs, content)

         # diff():
         #

    @@ -500,8 +573,10 @@
         #     subdir (str): A subdirectory to limit the comparison to
         #
         def diff(self, element, key_a, key_b, *, subdir=None):
    -        raise ImplError("Cache '{kind}' does not implement diff()"
    -                        .format(kind=type(self).__name__))
    +        ref_a = self.get_artifact_fullname(element, key_a)
    +        ref_b = self.get_artifact_fullname(element, key_b)
    +
    +        return self.cas.diff(ref_a, ref_b, subdir=subdir)

         # has_fetch_remotes():
         #

    @@ -513,7 +588,16 @@
         # Returns: True if any remote repositories are configured, False otherwise
         #
         def has_fetch_remotes(self, *, element=None):
    -        return False
    +        if not self._has_fetch_remotes:
    +            # No project has fetch remotes
    +            return False
    +        elif element is None:
    +            # At least one (sub)project has fetch remotes
    +            return True
    +        else:
    +            # Check whether the specified element's project has fetch remotes
    +            remotes_for_project = self._remotes[element._get_project()]
    +            return bool(remotes_for_project)

         # has_push_remotes():
         #

    @@ -525,7 +609,16 @@
         # Returns: True if any remote repository is configured, False otherwise
         #
         def has_push_remotes(self, *, element=None):
    -        return False
    +        if not self._has_push_remotes:
    +            # No project has push remotes
    +            return False
    +        elif element is None:
    +            # At least one (sub)project has push remotes
    +            return True
    +        else:
    +            # Check whether the specified element's project has push remotes
    +            remotes_for_project = self._remotes[element._get_project()]
    +            return any(remote.spec.push for remote in remotes_for_project)

         # push():
         #

    @@ -542,8 +635,28 @@
         #   (ArtifactError): if there was an error
         #
         def push(self, element, keys):
    -        raise ImplError("Cache '{kind}' does not implement push()"
    -                        .format(kind=type(self).__name__))
    +        refs = [self.get_artifact_fullname(element, key) for key in list(keys)]
    +
    +        project = element._get_project()
    +
    +        push_remotes = [r for r in self._remotes[project] if r.spec.push]
    +
    +        pushed = False
    +
    +        for remote in push_remotes:
    +            remote.init()
    +            display_key = element._get_brief_display_key()
    +            element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url))
    +
    +            if self.cas.push(refs, remote):
    +                element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
    +                pushed = True
    +            else:
    +                element.info("Remote ({}) already has {} cached".format(
    +                    remote.spec.url, element._get_brief_display_key()
    +                ))
    +
    +        return pushed

         # pull():
         #

    @@ -558,8 +671,130 @@
         #   (bool): True if pull was successful, False if artifact was not available
         #
         def pull(self, element, key, *, progress=None):
    -        raise ImplError("Cache '{kind}' does not implement pull()"
    -                        .format(kind=type(self).__name__))
    +        ref = self.get_artifact_fullname(element, key)
    +
    +        project = element._get_project()
    +
    +        for remote in self._remotes[project]:
    +            try:
    +                display_key = element._get_brief_display_key()
    +                element.status("Pulling artifact {} <- {}".format(display_key, remote.spec.url))
    +
    +                if self.cas.pull(ref, remote, progress=progress):
    +                    element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url))
    +                    # no need to pull from additional remotes
    +                    return True
    +                else:
    +                    element.info("Remote ({}) does not have {} cached".format(
    +                        remote.spec.url, element._get_brief_display_key()
    +                    ))
    +
    +            except CASError as e:
    +                raise ArtifactError("Failed to pull artifact {}: {}".format(
    +                    element._get_brief_display_key(), e)) from e
    +
    +        return False
    +
    +    # pull_tree():
    +    #
    +    # Pull a single Tree rather than an artifact.
    +    # Does not update local refs.
    +    #
    +    # Args:
    +    #     project (Project): The current project
    +    #     digest (Digest): The digest of the tree
    +    #
    +    def pull_tree(self, project, digest):
    +        for remote in self._remotes[project]:
    +            digest = self.cas.pull_tree(remote, digest)
    +
    +            if digest:
    +                # no need to pull from additional remotes
    +                return digest
    +
    +        return None
    +
    +    # push_directory():
    +    #
    +    # Push the given virtual directory to all remotes.
    +    #
    +    # Args:
    +    #     project (Project): The current project
    +    #     directory (Directory): A virtual directory object to push.
    +    #
    +    # Raises:
    +    #     (ArtifactError): if there was an error
    +    #
    +    def push_directory(self, project, directory):
    +        if self._has_push_remotes:
    +            push_remotes = [r for r in self._remotes[project] if r.spec.push]
    +        else:
    +            push_remotes = []
    +
    +        if not push_remotes:
    +            raise ArtifactError("push_directory was called, but no remote artifact " +
    +                                "servers are configured as push remotes.")
    +
    +        if directory.ref is None:
    +            return
    +
    +        for remote in push_remotes:
    +            self.cas.push_directory(remote, directory)
    +
    +    # push_message():
    +    #
    +    # Push the given protobuf message to all remotes.
    +    #
    +    # Args:
    +    #     project (Project): The current project
    +    #     message (Message): A protobuf message to push.
    +    #
    +    # Raises:
    +    #     (ArtifactError): if there was an error
    +    #
    +    def push_message(self, project, message):
    +
    +        if self._has_push_remotes:
    +            push_remotes = [r for r in self._remotes[project] if r.spec.push]
    +        else:
    +            push_remotes = []
    +
    +        if not push_remotes:
    +            raise ArtifactError("push_message was called, but no remote artifact " +
    +                                "servers are configured as push remotes.")
    +
    +        for remote in push_remotes:
    +            message_digest = self.cas.push_message(remote, message)
    +
    +        return message_digest
    +
    +    # verify_digest_pushed():
    +    #
    +    # Check whether the object is already on the server in which case
    +    # there is no need to upload it.
    +    #
    +    # Args:
    +    #     project (Project): The current project
    +    #     digest (Digest): The object digest.
    +    #
    +    def verify_digest_pushed(self, project, digest):
    +
    +        if self._has_push_remotes:
    +            push_remotes = [r for r in self._remotes[project] if r.spec.push]
    +        else:
    +            push_remotes = []
    +
    +        if not push_remotes:
    +            raise ArtifactError("verify_digest_pushed was called, but no remote artifact " +
    +                                "servers are configured as push remotes.")
    +
    +        pushed = False
    +
    +        for remote in push_remotes:
    +            if self.cas.verify_digest_on_remote(remote, digest):
    +                pushed = True
    +
    +        return pushed

         # link_key():
         #

    @@ -571,19 +806,10 @@
         #     newkey (str): A new cache key for the artifact
         #
         def link_key(self, element, oldkey, newkey):
    -        raise ImplError("Cache '{kind}' does not implement link_key()"
    -                        .format(kind=type(self).__name__))
    +        oldref = self.get_artifact_fullname(element, oldkey)
    +        newref = self.get_artifact_fullname(element, newkey)

    -    # calculate_cache_size()
    -    #
    -    # Return the real artifact cache size.
    -    #
    -    # Returns:
    -    #    (int): The size of the artifact cache.
    -    #
    -    def calculate_cache_size(self):
    -        raise ImplError("Cache '{kind}' does not implement calculate_cache_size()"
    -                        .format(kind=type(self).__name__))
    +        self.cas.link_ref(oldref, newref)

         ################################################
         #               Local Private Methods          #

  • buildstream/_artifactcache/cascache.py
    @@ -20,9 +20,7 @@
     import hashlib
     import itertools
     import io
    -import multiprocessing
     import os
    -import signal
     import stat
     import tempfile
     import uuid

    @@ -31,17 +29,13 @@ from urllib.parse import urlparse

     import grpc

    -from .. import _yaml
    -
     from .._protos.google.rpc import code_pb2
     from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
     from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
     from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc

    -from .. import _signals, utils
    -from .._exceptions import ArtifactError
    -
    -from . import ArtifactCache
    +from .. import utils
    +from .._exceptions import CASError


     # The default limit for gRPC messages is 4 MiB.

    @@ -49,62 +43,68 @@ from . import ArtifactCache
     _MAX_PAYLOAD_BYTES = 1024 * 1024


    -# A CASCache manages artifacts in a CAS repository as specified in the
    -# Remote Execution API.
    +# A CASCache manages a CAS repository as specified in the Remote Execution API.
     #
     # Args:
    -#     context (Context): The BuildStream context
    -#
    -# Pushing is explicitly disabled by the platform in some cases,
    -# like when we are falling back to functioning without using
    -# user namespaces.
    +#     path (str): The root directory for the CAS repository
     #
    -class CASCache(ArtifactCache):
    +class CASCache():

    -    def __init__(self, context):
    -        super().__init__(context)
    -
    -        self.casdir = os.path.join(context.artifactdir, 'cas')
    +    def __init__(self, path):
    +        self.casdir = os.path.join(path, 'cas')
    +        self.tmpdir = os.path.join(path, 'tmp')
             os.makedirs(os.path.join(self.casdir, 'refs', 'heads'), exist_ok=True)
             os.makedirs(os.path.join(self.casdir, 'objects'), exist_ok=True)
    +        os.makedirs(self.tmpdir, exist_ok=True)

    -        self._calculate_cache_quota()
    -
    -        # Per-project list of _CASRemote instances.
    -        self._remotes = {}
    -
    -        self._has_fetch_remotes = False
    -        self._has_push_remotes = False
    -
    -    ################################################
    -    #     Implementation of abstract methods       #
    -    ################################################
    -
    +    # preflight():
    +    #
    +    # Preflight check.
    +    #
         def preflight(self):
             headdir = os.path.join(self.casdir, 'refs', 'heads')
             objdir = os.path.join(self.casdir, 'objects')
             if not (os.path.isdir(headdir) and os.path.isdir(objdir)):
    -            raise ArtifactError("CAS repository check failed for '{}'"
    -                                .format(self.casdir))
    +            raise CASError("CAS repository check failed for '{}'".format(self.casdir))

    -    def contains(self, element, key):
    -        refpath = self._refpath(self.get_artifact_fullname(element, key))
    +    # contains():
    +    #
    +    # Check whether the specified ref is already available in the local CAS cache.
    +    #
    +    # Args:
    +    #     ref (str): The ref to check
    +    #
    +    # Returns: True if the ref is in the cache, False otherwise
    +    #
    +    def contains(self, ref):
    +        refpath = self._refpath(ref)

             # This assumes that the repository doesn't have any dangling pointers
             return os.path.exists(refpath)

    -    def extract(self, element, key):
    -        ref = self.get_artifact_fullname(element, key)
    -
    +    # extract():
    +    #
    +    # Extract cached directory for the specified ref if it hasn't
    +    # already been extracted.
    +    #
    +    # Args:
    +    #     ref (str): The ref whose directory to extract
    +    #     path (str): The destination path
    +    #
    +    # Raises:
    +    #     CASError: In case there was an OSError, or if the ref did not exist.
    +    #
    +    # Returns: path to extracted directory
    +    #
    +    def extract(self, ref, path):
             tree = self.resolve_ref(ref, update_mtime=True)

    -        dest = os.path.join(self.extractdir, element._get_project().name,
    -                            element.normal_name, tree.hash)
    +        dest = os.path.join(path, tree.hash)
             if os.path.isdir(dest):
    -            # artifact has already been extracted
    +            # directory has already been extracted
                 return dest

    -        with tempfile.TemporaryDirectory(prefix='tmp', dir=self.extractdir) as tmpdir:
    +        with tempfile.TemporaryDirectory(prefix='tmp', dir=self.tmpdir) as tmpdir:
                 checkoutdir = os.path.join(tmpdir, ref)
                 self._checkout(checkoutdir, tree)

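
With the context dependency gone, the class can now be driven directly
from a plain directory path. A hedged usage sketch of the standalone API
introduced above, assuming BuildStream at this branch is importable and
using 'my/ref' as an arbitrary example ref:

    import os
    import tempfile

    from buildstream._artifactcache.cascache import CASCache

    rootdir = tempfile.mkdtemp()
    cas = CASCache(rootdir)     # creates cas/refs/heads, cas/objects and tmp
    cas.preflight()             # verifies the repository layout

    content = os.path.join(rootdir, 'content')
    os.makedirs(content)
    with open(os.path.join(content, 'hello.txt'), 'w') as f:
        f.write('hello')

    cas.commit(['my/ref'], content)     # import the directory and set a ref
    assert cas.contains('my/ref')
    dest = cas.extract('my/ref', os.path.join(rootdir, 'extract'))
    print(dest)                         # .../extract/<tree hash>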
    @@ -118,23 +118,35 @@ class CASCache():
                     # If rename fails with these errors, another process beat
                     # us to it so just ignore.
                     if e.errno not in [errno.ENOTEMPTY, errno.EEXIST]:
    -                    raise ArtifactError("Failed to extract artifact for ref '{}': {}"
    -                                        .format(ref, e)) from e
    +                    raise CASError("Failed to extract directory for ref '{}': {}".format(ref, e)) from e

             return dest

    -    def commit(self, element, content, keys):
    -        refs = [self.get_artifact_fullname(element, key) for key in keys]
    -
    -        tree = self._commit_directory(content)
    +    # commit():
    +    #
    +    # Commit directory to cache.
    +    #
    +    # Args:
    +    #     refs (list): The refs to set
    +    #     path (str): The directory to import
    +    #
    +    def commit(self, refs, path):
    +        tree = self._commit_directory(path)

             for ref in refs:
                 self.set_ref(ref, tree)

    -    def diff(self, element, key_a, key_b, *, subdir=None):
    -        ref_a = self.get_artifact_fullname(element, key_a)
    -        ref_b = self.get_artifact_fullname(element, key_b)
    -
    +    # diff():
    +    #
    +    # Return a list of files that have been added or modified between
    +    # the refs described by ref_a and ref_b.
    +    #
    +    # Args:
    +    #     ref_a (str): The first ref
    +    #     ref_b (str): The second ref
    +    #     subdir (str): A subdirectory to limit the comparison to
    +    #
    +    def diff(self, ref_a, ref_b, *, subdir=None):
             tree_a = self.resolve_ref(ref_a)
             tree_b = self.resolve_ref(ref_b)

    @@ -150,158 +162,122 @@ class CASCache():

             return modified, removed, added

    -    def initialize_remotes(self, *, on_failure=None):
    -        remote_specs = self.global_remote_specs
    -
    -        for project in self.project_remote_specs:
    -            remote_specs += self.project_remote_specs[project]
    -
    -        remote_specs = list(utils._deduplicate(remote_specs))
    -
    -        remotes = {}
    -        q = multiprocessing.Queue()
    -        for remote_spec in remote_specs:
    -            # Use subprocess to avoid creation of gRPC threads in main BuildStream process
    -            # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
    -            p = multiprocessing.Process(target=self._initialize_remote, args=(remote_spec, q))
    +    def initialize_remote(self, remote_spec, q):
    +        try:
    +            remote = CASRemote(remote_spec)
    +            remote.init()

    -            try:
    -                # Keep SIGINT blocked in the child process
    -                with _signals.blocked([signal.SIGINT], ignore=False):
    -                    p.start()
    -
    -                error = q.get()
    -                p.join()
    -            except KeyboardInterrupt:
    -                utils._kill_process_tree(p.pid)
    -                raise
    +            request = buildstream_pb2.StatusRequest()
    +            response = remote.ref_storage.Status(request)

    -            if error and on_failure:
    -                on_failure(remote_spec.url, error)
    -            elif error:
    -                raise ArtifactError(error)
    +            if remote_spec.push and not response.allow_updates:
    +                q.put('CAS server does not allow push')
                 else:
    -                self._has_fetch_remotes = True
    -                if remote_spec.push:
    -                    self._has_push_remotes = True
    +                # No error
    +                q.put(None)

    -                remotes[remote_spec.url] = _CASRemote(remote_spec)
    +        except grpc.RpcError as e:
    +            # str(e) is too verbose for errors reported to the user
    +            q.put(e.details())

    -        for project in self.context.get_projects():
    -            remote_specs = self.global_remote_specs
    -            if project in self.project_remote_specs:
    -                remote_specs = list(utils._deduplicate(remote_specs + self.project_remote_specs[project]))
    +        except Exception as e:               # pylint: disable=broad-except
    +            # Whatever happens, we need to return it to the calling process
    +            #
    +            q.put(str(e))

    -            project_remotes = []
    +    # pull():
    +    #
    +    # Pull a ref from a remote repository.
    +    #
    +    # Args:
    +    #     ref (str): The ref to pull
    +    #     remote (CASRemote): The remote repository to pull from
    +    #     progress (callable): The progress callback, if any
    +    #
    +    # Returns:
    +    #   (bool): True if pull was successful, False if ref was not available
    +    #
    +    def pull(self, ref, remote, *, progress=None):
    +        try:
    +            remote.init()

    -            for remote_spec in remote_specs:
    -                # Errors are already handled in the loop above,
    -                # skip unreachable remotes here.
    -                if remote_spec.url not in remotes:
    -                    continue
    +            request = buildstream_pb2.GetReferenceRequest()
    +            request.key = ref
    +            response = remote.ref_storage.GetReference(request)

    -                remote = remotes[remote_spec.url]
    -                project_remotes.append(remote)
    +            tree = remote_execution_pb2.Digest()
    +            tree.hash = response.digest.hash
    +            tree.size_bytes = response.digest.size_bytes

    -            self._remotes[project] = project_remotes
    +            self._fetch_directory(remote, tree)

    -    def has_fetch_remotes(self, *, element=None):
    -        if not self._has_fetch_remotes:
    -            # No project has fetch remotes
    -            return False
    -        elif element is None:
    -            # At least one (sub)project has fetch remotes
    -            return True
    -        else:
    -            # Check whether the specified element's project has fetch remotes
    -            remotes_for_project = self._remotes[element._get_project()]
    -            return bool(remotes_for_project)
    +            self.set_ref(ref, tree)

    -    def has_push_remotes(self, *, element=None):
    -        if not self._has_push_remotes:
    -            # No project has push remotes
    -            return False
    -        elif element is None:
    -            # At least one (sub)project has push remotes
                 return True
    -        else:
    -            # Check whether the specified element's project has push remotes
    -            remotes_for_project = self._remotes[element._get_project()]
    -            return any(remote.spec.push for remote in remotes_for_project)
    -
    -    def pull(self, element, key, *, progress=None):
    -        ref = self.get_artifact_fullname(element, key)
    -
    -        project = element._get_project()
    -
    -        for remote in self._remotes[project]:
    -            try:
    -                remote.init()
    -                display_key = element._get_brief_display_key()
    -                element.status("Pulling artifact {} <- {}".format(display_key, remote.spec.url))
    -
    -                request = buildstream_pb2.GetReferenceRequest()
    -                request.key = ref
    -                response = remote.ref_storage.GetReference(request)
    -
    -                tree = remote_execution_pb2.Digest()
    -                tree.hash = response.digest.hash
    -                tree.size_bytes = response.digest.size_bytes
    -
    -                self._fetch_directory(remote, tree)
    -
    -                self.set_ref(ref, tree)
    -
    -                element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url))
    -                # no need to pull from additional remotes
    -                return True
    -
    -            except grpc.RpcError as e:
    -                if e.code() != grpc.StatusCode.NOT_FOUND:
    -                    raise ArtifactError("Failed to pull artifact {}: {}".format(
    -                        element._get_brief_display_key(), e)) from e
    -                else:
    -                    element.info("Remote ({}) does not have {} cached".format(
    -                        remote.spec.url, element._get_brief_display_key()
    -                    ))
    -
    -        return False
    -
    -    def pull_tree(self, project, digest):
    -        """ Pull a single Tree rather than an artifact.
    -        Does not update local refs. """
    +        except grpc.RpcError as e:
    +            if e.code() != grpc.StatusCode.NOT_FOUND:
    +                raise CASError("Failed to pull ref {}: {}".format(ref, e)) from e
    +            else:
    +                return False

    -        for remote in self._remotes[project]:
    -            try:
    -                remote.init()
    +    # pull_tree():
    +    #
    +    # Pull a single Tree rather than a ref.
    +    # Does not update local refs.
    +    #
    +    # Args:
    +    #     remote (CASRemote): The remote to pull from
    +    #     digest (Digest): The digest of the tree
    +    #
    +    def pull_tree(self, remote, digest):
    +        try:
    +            remote.init()

    -                digest = self._fetch_tree(remote, digest)
    +            digest = self._fetch_tree(remote, digest)

    -                # no need to pull from additional remotes
    -                return digest
    +            return digest

    -            except grpc.RpcError as e:
    -                if e.code() != grpc.StatusCode.NOT_FOUND:
    -                    raise
    +        except grpc.RpcError as e:
    +            if e.code() != grpc.StatusCode.NOT_FOUND:
    +                raise

             return None

    -    def link_key(self, element, oldkey, newkey):
    -        oldref = self.get_artifact_fullname(element, oldkey)
    -        newref = self.get_artifact_fullname(element, newkey)
    -
    +    # link_ref():
    +    #
    +    # Add an alias for an existing ref.
    +    #
    +    # Args:
    +    #     oldref (str): An existing ref
    +    #     newref (str): A new ref for the same directory
    +    #
    +    def link_ref(self, oldref, newref):
             tree = self.resolve_ref(oldref)

             self.set_ref(newref, tree)

    -    def _push_refs_to_remote(self, refs, remote):
    +    # push():
    +    #
    +    # Push committed refs to remote repository.
    +    #
    +    # Args:
    +    #     refs (list): The refs to push
    +    #     remote (CASRemote): The remote to push to
    +    #
    +    # Returns:
    +    #   (bool): True if any remote was updated, False if no pushes were required
    +    #
    +    # Raises:
    +    #   (CASError): if there was an error
    +    #
    +    def push(self, refs, remote):
             skipped_remote = True
             try:
                 for ref in refs:
                     tree = self.resolve_ref(ref)

                     # Check whether ref is already on the server in which case
    -                # there is no need to push the artifact
    +                # there is no need to push the ref
                     try:
                         request = buildstream_pb2.GetReferenceRequest()
                         request.key = ref

    @@ -327,65 +303,38 @@ class CASCache():
                     skipped_remote = False
             except grpc.RpcError as e:
                 if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
    -                raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
    +                raise CASError("Failed to push ref {}: {}".format(refs, e), temporary=True) from e

             return not skipped_remote

    -    def push(self, element, keys):
    -
    -        refs = [self.get_artifact_fullname(element, key) for key in list(keys)]
    -
    -        project = element._get_project()
    -
    -        push_remotes = [r for r in self._remotes[project] if r.spec.push]
    -
    -        pushed = False
    -
    -        for remote in push_remotes:
    -            remote.init()
    -            display_key = element._get_brief_display_key()
    -            element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url))
    -
    -            if self._push_refs_to_remote(refs, remote):
    -                element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
    -                pushed = True
    -            else:
    -                element.info("Remote ({}) already has {} cached".format(
    -                    remote.spec.url, element._get_brief_display_key()
    -                ))
    -
    -        return pushed
    -
    -    def push_directory(self, project, directory):
    -        """ Push the given virtual directory to all remotes.
    -
    -        Args:
    -            project (Project): The current project
    -            directory (Directory): A virtual directory object to push.
    -
    -        Raises: ArtifactError if no push remotes are configured.
    -        """
    -
    -        if self._has_push_remotes:
    -            push_remotes = [r for r in self._remotes[project] if r.spec.push]
    -        else:
    -            push_remotes = []
    -
    -        if not push_remotes:
    -            raise ArtifactError("CASCache: push_directory was called, but no remote artifact " +
    -                                "servers are configured as push remotes.")
    -
    -        if directory.ref is None:
    -            return
    -
    -        for remote in push_remotes:
    -            remote.init()
    -
    -            self._send_directory(remote, directory.ref)
    +    # push_directory():
    +    #
    +    # Push the given virtual directory to a remote.
    +    #
    +    # Args:
    +    #     remote (CASRemote): The remote to push to
    +    #     directory (Directory): A virtual directory object to push.
    +    #
    +    # Raises:
    +    #     (CASError): if there was an error
    +    #
    +    def push_directory(self, remote, directory):
    +        remote.init()

    -    def push_message(self, project, message):
    +        self._send_directory(remote, directory.ref)

    -        push_remotes = [r for r in self._remotes[project] if r.spec.push]
    +    # push_message():
    +    #
    +    # Push the given protobuf message to a remote.
    +    #
    +    # Args:
    +    #     remote (CASRemote): The remote to push to
    +    #     message (Message): A protobuf message to push.
    +    #
    +    # Raises:
    +    #     (CASError): if there was an error
    +    #
    +    def push_message(self, remote, message):

             message_buffer = message.SerializeToString()
             message_sha = hashlib.sha256(message_buffer)

    @@ -393,17 +342,25 @@ class CASCache():
             message_digest.hash = message_sha.hexdigest()
             message_digest.size_bytes = len(message_buffer)

    -        for remote in push_remotes:
    -            remote.init()
    +        remote.init()

    -            with io.BytesIO(message_buffer) as b:
    -                self._send_blob(remote, message_digest, b)
    +        with io.BytesIO(message_buffer) as b:
    +            self._send_blob(remote, message_digest, b)

             return message_digest

    -    def _verify_digest_on_remote(self, remote, digest):
    -        # Check whether ref is already on the server in which case
    -        # there is no need to push the artifact
    +    # verify_digest_on_remote():
    +    #
    +    # Check whether the object is already on the server in which case
    +    # there is no need to upload it.
    +    #
    +    # Args:
    +    #     remote (CASRemote): The remote to check
    +    #     digest (Digest): The object digest.
    +    #
    +    def verify_digest_on_remote(self, remote, digest):
    +        remote.init()
    +
             request = remote_execution_pb2.FindMissingBlobsRequest()
             request.blob_digests.extend([digest])

    @@ -413,24 +370,6 @@ class CASCache():

             return True

    -    def verify_digest_pushed(self, project, digest):
    -
    -        push_remotes = [r for r in self._remotes[project] if r.spec.push]
    -
    -        pushed = False
    -
    -        for remote in push_remotes:
    -            remote.init()
    -
    -            if self._verify_digest_on_remote(remote, digest):
    -                pushed = True
    -
    -        return pushed
    -
    -    ################################################
    -    #                API Private Methods           #
    -    ################################################
    -
         # objpath():
         #
         # Return the path of an object based on its digest.

    @@ -496,7 +435,7 @@ class CASCache():
                 pass

             except OSError as e:
    -            raise ArtifactError("Failed to hash object: {}".format(e)) from e
    +            raise CASError("Failed to hash object: {}".format(e)) from e

             return digest

    ... ... @@ -537,26 +476,39 @@ class CASCache(ArtifactCache):
    537 476
                     return digest
    
    538 477
     
    
    539 478
             except FileNotFoundError as e:
    
    540
    -            raise ArtifactError("Attempt to access unavailable artifact: {}".format(e)) from e
    
    479
    +            raise CASError("Attempt to access unavailable ref: {}".format(e)) from e
    
    541 480
     
    
    542
    -    def update_mtime(self, element, key):
    
    481
    +    # update_mtime()
    
    482
    +    #
    
    483
    +    # Update the mtime of a ref.
    
    484
    +    #
    
    485
    +    # Args:
    
    486
    +    #     ref (str): The ref to update
    
    487
    +    #
    
    488
    +    def update_mtime(self, ref):
    
    543 489
             try:
    
    544
    -            ref = self.get_artifact_fullname(element, key)
    
    545 490
                 os.utime(self._refpath(ref))
    
    546 491
             except FileNotFoundError as e:
    
    547
    -            raise ArtifactError("Attempt to access unavailable artifact: {}".format(e)) from e
    
    492
    +            raise CASError("Attempt to access unavailable ref: {}".format(e)) from e
    
    548 493
     
    
    494
    +    # calculate_cache_size()
    
    495
    +    #
    
    496
    +    # Return the real disk usage of the CAS cache.
    
    497
    +    #
    
    498
    +    # Returns:
    
    499
    +    #    (int): The size of the cache.
    
    500
    +    #
    
    549 501
         def calculate_cache_size(self):
    
    550 502
             return utils._get_dir_size(self.casdir)
    
    551 503
     
    
    552
    -    # list_artifacts():
    
    504
    +    # list_refs():
    
    553 505
         #
    
    554
    -    # List cached artifacts in Least Recently Modified (LRM) order.
    
    506
    +    # List refs in Least Recently Modified (LRM) order.
    
    555 507
         #
    
    556 508
         # Returns:
    
    557 509
         #     (list) - A list of refs in LRM order
    
    558 510
         #
    
    559
    -    def list_artifacts(self):
    
    511
    +    def list_refs(self):
    
    560 512
             # string of: /path/to/repo/refs/heads
    
    561 513
             ref_heads = os.path.join(self.casdir, 'refs', 'heads')
    
    562 514
     
    
    ... ... @@ -571,7 +523,7 @@ class CASCache(ArtifactCache):
    571 523
                     mtimes.append(os.path.getmtime(ref_path))
    
    572 524
     
    
    573 525
             # NOTE: Sorted will sort from earliest to latest, thus the
    
    574
    -        # first element of this list will be the file modified earliest.
    
    526
    +        # first ref of this list will be the file modified earliest.
    
    575 527
             return [ref for _, ref in sorted(zip(mtimes, refs))]
    
    576 528
     
    
    577 529
         # remove():
    
    ... ... @@ -590,28 +542,10 @@ class CASCache(ArtifactCache):
    590 542
         #
    
    591 543
         def remove(self, ref, *, defer_prune=False):
    
    592 544
     
    
    593
    -        # Remove extract if not used by other ref
    
    594
    -        tree = self.resolve_ref(ref)
    
    595
    -        ref_name, ref_hash = os.path.split(ref)
    
    596
    -        extract = os.path.join(self.extractdir, ref_name, tree.hash)
    
    597
    -        keys_file = os.path.join(extract, 'meta', 'keys.yaml')
    
    598
    -        if os.path.exists(keys_file):
    
    599
    -            keys_meta = _yaml.load(keys_file)
    
    600
    -            keys = [keys_meta['strong'], keys_meta['weak']]
    
    601
    -            remove_extract = True
    
    602
    -            for other_hash in keys:
    
    603
    -                if other_hash == ref_hash:
    
    604
    -                    continue
    
    605
    -                remove_extract = False
    
    606
    -                break
    
    607
    -
    
    608
    -            if remove_extract:
    
    609
    -                utils._force_rmtree(extract)
    
    610
    -
    
    611 545
             # Remove cache ref
    
    612 546
             refpath = self._refpath(ref)
    
    613 547
             if not os.path.exists(refpath):
    
    614
    -            raise ArtifactError("Could not find artifact for ref '{}'".format(ref))
    
    548
    +            raise CASError("Could not find ref '{}'".format(ref))
    
    615 549
     
    
    616 550
             os.unlink(refpath)
    
    617 551
     
    
    ... ... @@ -721,7 +655,7 @@ class CASCache(ArtifactCache):
    721 655
                     # The process serving the socket can't be cached anyway
    
    722 656
                     pass
    
    723 657
                 else:
    
    724
    -                raise ArtifactError("Unsupported file type for {}".format(full_path))
    
    658
    +                raise CASError("Unsupported file type for {}".format(full_path))
    
    725 659
     
    
    726 660
             return self.add_object(digest=dir_digest,
    
    727 661
                                    buffer=directory.SerializeToString())
    
    ... ... @@ -740,7 +674,7 @@ class CASCache(ArtifactCache):
    740 674
                 if dirnode.name == name:
    
    741 675
                     return dirnode.digest
    
    742 676
     
    
    743
    -        raise ArtifactError("Subdirectory {} not found".format(name))
    
    677
    +        raise CASError("Subdirectory {} not found".format(name))
    
    744 678
     
    
    745 679
         def _diff_trees(self, tree_a, tree_b, *, added, removed, modified, path=""):
    
    746 680
             dir_a = remote_execution_pb2.Directory()
    
    ... ... @@ -812,29 +746,6 @@ class CASCache(ArtifactCache):
    812 746
             for dirnode in directory.directories:
    
    813 747
                 self._reachable_refs_dir(reachable, dirnode.digest)
    
    814 748
     
    
    815
    -    def _initialize_remote(self, remote_spec, q):
    
    816
    -        try:
    
    817
    -            remote = _CASRemote(remote_spec)
    
    818
    -            remote.init()
    
    819
    -
    
    820
    -            request = buildstream_pb2.StatusRequest()
    
    821
    -            response = remote.ref_storage.Status(request)
    
    822
    -
    
    823
    -            if remote_spec.push and not response.allow_updates:
    
    824
    -                q.put('Artifact server does not allow push')
    
    825
    -            else:
    
    826
    -                # No error
    
    827
    -                q.put(None)
    
    828
    -
    
    829
    -        except grpc.RpcError as e:
    
    830
    -            # str(e) is too verbose for errors reported to the user
    
    831
    -            q.put(e.details())
    
    832
    -
    
    833
    -        except Exception as e:               # pylint: disable=broad-except
    
    834
    -            # Whatever happens, we need to return it to the calling process
    
    835
    -            #
    
    836
    -            q.put(str(e))
    
    837
    -
    
    838 749
         def _required_blobs(self, directory_digest):
    
    839 750
             # parse directory, and recursively add blobs
    
    840 751
             d = remote_execution_pb2.Digest()
    
    ... ... @@ -1080,7 +991,7 @@ class CASCache(ArtifactCache):
    1080 991
     
    
    1081 992
     # Represents a single remote CAS cache.
    
    1082 993
     #
    
    1083
    -class _CASRemote():
    
    994
    +class CASRemote():
    
    1084 995
         def __init__(self, spec):
    
    1085 996
             self.spec = spec
    
    1086 997
             self._initialized = False
    
    ... ... @@ -1125,7 +1036,7 @@ class _CASRemote():
    1125 1036
                                                                certificate_chain=client_cert_bytes)
    
    1126 1037
                     self.channel = grpc.secure_channel('{}:{}'.format(url.hostname, port), credentials)
    
    1127 1038
                 else:
    
    1128
    -                raise ArtifactError("Unsupported URL: {}".format(self.spec.url))
    
    1039
    +                raise CASError("Unsupported URL: {}".format(self.spec.url))
    
    1129 1040
     
    
    1130 1041
                 self.bytestream = bytestream_pb2_grpc.ByteStreamStub(self.channel)
    
    1131 1042
                 self.cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel)
    
    ... ... @@ -1203,10 +1114,10 @@ class _CASBatchRead():
    1203 1114
     
    
    1204 1115
             for response in batch_response.responses:
    
    1205 1116
                 if response.status.code != code_pb2.OK:
    
    1206
    -                raise ArtifactError("Failed to download blob {}: {}".format(
    
    1117
    +                raise CASError("Failed to download blob {}: {}".format(
    
    1207 1118
                         response.digest.hash, response.status.code))
    
    1208 1119
                 if response.digest.size_bytes != len(response.data):
    
    1209
    -                raise ArtifactError("Failed to download blob {}: expected {} bytes, received {} bytes".format(
    
    1120
    +                raise CASError("Failed to download blob {}: expected {} bytes, received {} bytes".format(
    
    1210 1121
                         response.digest.hash, response.digest.size_bytes, len(response.data)))
    
    1211 1122
     
    
    1212 1123
                 yield (response.digest, response.data)
    
    ... ... @@ -1248,7 +1159,7 @@ class _CASBatchUpdate():
    1248 1159
     
    
    1249 1160
             for response in batch_response.responses:
    
    1250 1161
                 if response.status.code != code_pb2.OK:
    
    1251
    -                raise ArtifactError("Failed to upload blob {}: {}".format(
    
    1162
    +                raise CASError("Failed to upload blob {}: {}".format(
    
    1252 1163
                         response.digest.hash, response.status.code))
    
    1253 1164
     
    
    1254 1165
     
    

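The net effect of the cascache.py hunks above is that CASCache methods now act on a single CASRemote, while the per-project fan-out over push remotes moves up into ArtifactCache. A minimal sketch of the caller side, assuming the ArtifactCache plumbing from this branch (the `self._remotes` dict and `self.cas` member set up earlier); the body is illustrative, not the verbatim patch:

    # Hedged sketch of the ArtifactCache-level wrapper around
    # CASCache.push_message(), which now takes one remote.
    def push_message(self, project, message):
        push_remotes = [r for r in self._remotes[project] if r.spec.push]

        message_digest = None
        for remote in push_remotes:
            # CASCache.push_message() calls remote.init() itself
            # before uploading the serialized message.
            message_digest = self.cas.push_message(remote, message)

        return message_digest
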
  • buildstream/_artifactcache/casserver.py
    ... ... @@ -32,8 +32,9 @@ from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remo
    32 32
     from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
    
    33 33
     from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc
    
    34 34
     
    
    35
    -from .._exceptions import ArtifactError
    
    36
    -from .._context import Context
    
    35
    +from .._exceptions import CASError
    
    36
    +
    
    37
    +from .cascache import CASCache
    
    37 38
     
    
    38 39
     
    
    39 40
     # The default limit for gRPC messages is 4 MiB.
    
    ... ... @@ -55,26 +56,23 @@ class ArtifactTooLargeException(Exception):
    55 56
     #     enable_push (bool): Whether to allow blob uploads and artifact updates
    
    56 57
     #
    
    57 58
     def create_server(repo, *, enable_push):
    
    58
    -    context = Context()
    
    59
    -    context.artifactdir = os.path.abspath(repo)
    
    60
    -
    
    61
    -    artifactcache = context.artifactcache
    
    59
    +    cas = CASCache(os.path.abspath(repo))
    
    62 60
     
    
    63 61
         # Use max_workers default from Python 3.5+
    
    64 62
         max_workers = (os.cpu_count() or 1) * 5
    
    65 63
         server = grpc.server(futures.ThreadPoolExecutor(max_workers))
    
    66 64
     
    
    67 65
         bytestream_pb2_grpc.add_ByteStreamServicer_to_server(
    
    68
    -        _ByteStreamServicer(artifactcache, enable_push=enable_push), server)
    
    66
    +        _ByteStreamServicer(cas, enable_push=enable_push), server)
    
    69 67
     
    
    70 68
         remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
    
    71
    -        _ContentAddressableStorageServicer(artifactcache, enable_push=enable_push), server)
    
    69
    +        _ContentAddressableStorageServicer(cas, enable_push=enable_push), server)
    
    72 70
     
    
    73 71
         remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
    
    74 72
             _CapabilitiesServicer(), server)
    
    75 73
     
    
    76 74
         buildstream_pb2_grpc.add_ReferenceStorageServicer_to_server(
    
    77
    -        _ReferenceStorageServicer(artifactcache, enable_push=enable_push), server)
    
    75
    +        _ReferenceStorageServicer(cas, enable_push=enable_push), server)
    
    78 76
     
    
    79 77
         return server
    
    80 78
     
    
    ... ... @@ -333,7 +331,7 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer):
    333 331
     
    
    334 332
                 response.digest.hash = tree.hash
    
    335 333
                 response.digest.size_bytes = tree.size_bytes
    
    336
    -        except ArtifactError:
    
    334
    +        except CASError:
    
    337 335
                 context.set_code(grpc.StatusCode.NOT_FOUND)
    
    338 336
     
    
    339 337
             return response
    
    ... ... @@ -437,7 +435,7 @@ def _clean_up_cache(cas, object_size):
    437 435
             return 0
    
    438 436
     
    
    439 437
         # obtain a list of LRP artifacts
    
    440
    -    LRP_artifacts = cas.list_artifacts()
    
    438
    +    LRP_artifacts = cas.list_refs()
    
    441 439
     
    
    442 440
         removed_size = 0  # in bytes
    
    443 441
         while object_size - removed_size > free_disk_space:
    

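With casserver.py constructing a CASCache directly, standing up an artifact server no longer requires a full Context. A small usage sketch, assuming the repo path is illustrative; create_server() is the function defined in this module and the port calls are standard gRPC server methods:

    from buildstream._artifactcache.casserver import create_server

    # Serve a bare CAS repository; enable_push allows uploads.
    server = create_server('/srv/cas-repo', enable_push=True)
    port = server.add_insecure_port('localhost:0')  # pick a free port
    server.start()
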
  • buildstream/_context.py
    ... ... @@ -31,7 +31,6 @@ from ._exceptions import LoadError, LoadErrorReason, BstError
    31 31
     from ._message import Message, MessageType
    
    32 32
     from ._profile import Topics, profile_start, profile_end
    
    33 33
     from ._artifactcache import ArtifactCache
    
    34
    -from ._artifactcache.cascache import CASCache
    
    35 34
     from ._workspaces import Workspaces
    
    36 35
     from .plugin import _plugin_lookup
    
    37 36
     
    
    ... ... @@ -233,7 +232,7 @@ class Context():
    233 232
         @property
    
    234 233
         def artifactcache(self):
    
    235 234
             if not self._artifactcache:
    
    236
    -            self._artifactcache = CASCache(self)
    
    235
    +            self._artifactcache = ArtifactCache(self)
    
    237 236
     
    
    238 237
             return self._artifactcache
    
    239 238
     
    

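Context.artifactcache accordingly hands back the ArtifactCache facade again, with the CAS store hanging off it. A short sketch of how calling code reaches each layer, mirroring the test fixtures later in this series (the artifactdir value is illustrative):

    context = Context()
    context.artifactdir = '/path/to/artifacts'  # illustrative

    artifacts = context.artifactcache   # ArtifactCache: element-level API
    cas = artifacts.cas                 # CASCache: ref/digest-level API
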
  • buildstream/_exceptions.py
    ... ... @@ -90,6 +90,7 @@ class ErrorDomain(Enum):
    90 90
         APP = 12
    
    91 91
         STREAM = 13
    
    92 92
         VIRTUAL_FS = 14
    
    93
    +    CAS = 15
    
    93 94
     
    
    94 95
     
    
    95 96
 # BstError is an internal base exception class for BuildStream
    
    ... ... @@ -274,6 +275,15 @@ class ArtifactError(BstError):
    274 275
             super().__init__(message, detail=detail, domain=ErrorDomain.ARTIFACT, reason=reason, temporary=True)
    
    275 276
     
    
    276 277
     
    
    278
    +# CASError
    
    279
    +#
    
    280
    +# Raised when errors are encountered in the CAS
    
    281
    +#
    
    282
    +class CASError(BstError):
    
    283
    +    def __init__(self, message, *, detail=None, reason=None, temporary=False):
    
    284
+        super().__init__(message, detail=detail, domain=ErrorDomain.CAS, reason=reason, temporary=temporary)
    
    285
    +
    
    286
    +
    
    277 287
     # PipelineError
    
    278 288
     #
    
    279 289
     # Raised from pipeline operations
    

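CASError mirrors ArtifactError but lives in its own error domain, so CAS-level callers can distinguish storage failures from artifact-level ones. A small sketch of the catch-site pattern used throughout this branch, as seen in the artifactshare test helper below:

    from buildstream._exceptions import CASError

    try:
        tree = cas.resolve_ref(ref)   # raises CASError if the ref is absent
    except CASError:
        tree = None                   # ref not in the local CAS
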
  • buildstream/storage/_casbaseddirectory.py
    ... ... @@ -79,7 +79,7 @@ class CasBasedDirectory(Directory):
    79 79
             self.filename = filename
    
    80 80
             self.common_name = common_name
    
    81 81
             self.pb2_directory = remote_execution_pb2.Directory()
    
    82
    -        self.cas_cache = context.artifactcache
    
    82
    +        self.cas_cache = context.artifactcache.cas
    
    83 83
             if ref:
    
    84 84
                 with open(self.cas_cache.objpath(ref), 'rb') as f:
    
    85 85
                     self.pb2_directory.ParseFromString(f.read())
    

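Storage code thus talks to the CASCache directly rather than to the artifact cache facade. A sketch of what the constructor wiring amounts to; the keyword-only use of `ref` and the `digest` value are illustrative assumptions, not taken verbatim from this patch:

    # Assuming 'context' is a fully loaded Context and 'digest' is a
    # remote_execution_pb2.Digest for a cached Directory object:
    directory = CasBasedDirectory(context, ref=digest)
    # Internally this sets self.cas_cache = context.artifactcache.cas,
    # then reads the serialized Directory via self.cas_cache.objpath(digest).
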
  • tests/artifactcache/pull.py
    ... ... @@ -190,15 +190,16 @@ def test_pull_tree(cli, tmpdir, datafiles):
    190 190
             # Load the project and CAS cache
    
    191 191
             project = Project(project_dir, context)
    
    192 192
             project.ensure_fully_loaded()
    
    193
    -        cas = context.artifactcache
    
    193
    +        artifactcache = context.artifactcache
    
    194
    +        cas = artifactcache.cas
    
    194 195
     
    
    195 196
             # Assert that the element's artifact is cached
    
    196 197
             element = project.load_elements(['target.bst'])[0]
    
    197 198
             element_key = cli.get_element_key(project_dir, 'target.bst')
    
    198
    -        assert cas.contains(element, element_key)
    
    199
    +        assert artifactcache.contains(element, element_key)
    
    199 200
     
    
    200 201
             # Retrieve the Directory object from the cached artifact
    
    201
    -        artifact_ref = cas.get_artifact_fullname(element, element_key)
    
    202
    +        artifact_ref = artifactcache.get_artifact_fullname(element, element_key)
    
    202 203
             artifact_digest = cas.resolve_ref(artifact_ref)
    
    203 204
     
    
    204 205
             queue = multiprocessing.Queue()
    
    ... ... @@ -268,12 +269,13 @@ def _test_push_tree(user_config_file, project_dir, artifact_dir, artifact_digest
    268 269
         project.ensure_fully_loaded()
    
    269 270
     
    
    270 271
         # Create a local CAS cache handle
    
    271
    -    cas = context.artifactcache
    
    272
    +    artifactcache = context.artifactcache
    
    273
    +    cas = artifactcache.cas
    
    272 274
     
    
    273 275
         # Manually setup the CAS remote
    
    274
    -    cas.setup_remotes(use_config=True)
    
    276
    +    artifactcache.setup_remotes(use_config=True)
    
    275 277
     
    
    276
    -    if cas.has_push_remotes():
    
    278
    +    if artifactcache.has_push_remotes():
    
    277 279
             directory = remote_execution_pb2.Directory()
    
    278 280
     
    
    279 281
             with open(cas.objpath(artifact_digest), 'rb') as f:
    
    ... ... @@ -284,7 +286,7 @@ def _test_push_tree(user_config_file, project_dir, artifact_dir, artifact_digest
    284 286
             tree_maker(cas, tree, directory)
    
    285 287
     
    
    286 288
             # Push the Tree as a regular message
    
    287
    -        tree_digest = cas.push_message(project, tree)
    
    289
    +        tree_digest = artifactcache.push_message(project, tree)
    
    288 290
     
    
    289 291
             queue.put((tree_digest.hash, tree_digest.size_bytes))
    
    290 292
         else:
    

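The test updates all follow one pattern: element-level operations stay on the ArtifactCache, while ref- and digest-level operations move to its `.cas` member. Condensed from the hunks above:

    artifactcache = context.artifactcache
    cas = artifactcache.cas

    assert artifactcache.contains(element, element_key)      # element-level
    artifact_ref = artifactcache.get_artifact_fullname(element, element_key)
    artifact_digest = cas.resolve_ref(artifact_ref)          # ref-level
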
  • tests/artifactcache/push.py
    ... ... @@ -165,20 +165,21 @@ def test_push_directory(cli, tmpdir, datafiles):
    165 165
             # Load the project and CAS cache
    
    166 166
             project = Project(project_dir, context)
    
    167 167
             project.ensure_fully_loaded()
    
    168
    -        cas = context.artifactcache
    
    168
    +        artifactcache = context.artifactcache
    
    169
    +        cas = artifactcache.cas
    
    169 170
     
    
    170 171
             # Assert that the element's artifact is cached
    
    171 172
             element = project.load_elements(['target.bst'])[0]
    
    172 173
             element_key = cli.get_element_key(project_dir, 'target.bst')
    
    173
    -        assert cas.contains(element, element_key)
    
    174
    +        assert artifactcache.contains(element, element_key)
    
    174 175
     
    
    175 176
             # Manually setup the CAS remote
    
    176
    -        cas.setup_remotes(use_config=True)
    
    177
    -        cas.initialize_remotes()
    
    178
    -        assert cas.has_push_remotes(element=element)
    
    177
    +        artifactcache.setup_remotes(use_config=True)
    
    178
    +        artifactcache.initialize_remotes()
    
    179
    +        assert artifactcache.has_push_remotes(element=element)
    
    179 180
     
    
    180 181
             # Recreate the CasBasedDirectory object from the cached artifact
    
    181
    -        artifact_ref = cas.get_artifact_fullname(element, element_key)
    
    182
    +        artifact_ref = artifactcache.get_artifact_fullname(element, element_key)
    
    182 183
             artifact_digest = cas.resolve_ref(artifact_ref)
    
    183 184
     
    
    184 185
             queue = multiprocessing.Queue()
    

  • tests/testutils/artifactshare.py
    ... ... @@ -13,7 +13,7 @@ import pytest_cov
    13 13
     from buildstream import _yaml
    
    14 14
     from buildstream._artifactcache.casserver import create_server
    
    15 15
     from buildstream._context import Context
    
    16
    -from buildstream._exceptions import ArtifactError
    
    16
    +from buildstream._exceptions import CASError
    
    17 17
     from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    18 18
     
    
    19 19
     
    
    ... ... @@ -48,7 +48,7 @@ class ArtifactShare():
    48 48
             context = Context()
    
    49 49
             context.artifactdir = self.repodir
    
    50 50
     
    
    51
    -        self.cas = context.artifactcache
    
    51
    +        self.cas = context.artifactcache.cas
    
    52 52
     
    
    53 53
             self.total_space = total_space
    
    54 54
             self.free_space = free_space
    
    ... ... @@ -135,7 +135,7 @@ class ArtifactShare():
    135 135
             try:
    
    136 136
                 tree = self.cas.resolve_ref(artifact_key)
    
    137 137
                 return True
    
    138
    -        except ArtifactError:
    
    138
    +        except CASError:
    
    139 139
                 return False
    
    140 140
     
    
    141 141
         # close():
    


