[Notes] [Git][BuildStream/buildstream][raoul/802-refactor-artifactcache] 16 commits: Rename "tools" directory to "requirements"



Raoul Hidalgo Charman pushed to branch raoul/802-refactor-artifactcache at BuildStream / buildstream

Commits:

30 changed files:

Changes:

  • .gitlab-ci.yml

    @@ -276,7 +276,7 @@ coverage:
       coverage: '/TOTAL +\d+ +\d+ +(\d+\.\d+)%/'
       script:
         - cd dist && ./unpack.sh && cd buildstream
    -    - pip3 install -r tools/requirements.txt -r tools/dev-requirements.txt
    +    - pip3 install -r requirements/requirements.txt -r requirements/dev-requirements.txt
         - pip3 install --no-index .
         - mkdir report
         - cd report

  • CONTRIBUTING.rst

    @@ -1736,10 +1736,8 @@ obtain profiles::
           ForceCommand BST_PROFILE=artifact-receive cd /tmp && bst-artifact-receive --pull-url https://example.com/ /home/artifacts/artifacts


    -The MANIFEST.in and setup.py
    -----------------------------
    -When adding a dependency to BuildStream, it's important to update the setup.py accordingly.
    -
    +Managing data files
    +-------------------
     When adding data files which need to be discovered at runtime by BuildStream, update setup.py accordingly.

     When adding data files for the purpose of docs or tests, or anything that is not covered by

    @@ -1749,3 +1747,23 @@ At any time, running the following command to create a source distribution shoul
     creating a tarball which contains everything we want it to include::

       ./setup.py sdist
    +
    +
    +Updating BuildStream's Python dependencies
    +------------------------------------------
    +BuildStream's Python dependencies are listed in multiple
    +`requirements files <https://pip.readthedocs.io/en/latest/reference/pip_install/#requirements-file-format>`
    +present in the ``requirements`` directory.
    +
    +All ``.txt`` files in this directory are generated from the corresponding
    +``.in`` file, and each ``.in`` file represents a set of dependencies. For
    +example, ``requirements.in`` contains all runtime dependencies of BuildStream.
    +``requirements.txt`` is generated from it, and contains pinned versions of all
    +runtime dependencies (including transitive dependencies) of BuildStream.
    +
    +When adding a new dependency to BuildStream, or updating existing dependencies,
    +it is important to update the appropriate requirements file accordingly. After
    +changing the ``.in`` file, run the following to update the matching ``.txt``
    +file::
    +
    +   make -C requirements
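
    The new CONTRIBUTING.rst text above describes the ``.in``/``.txt`` convention in the
    ``requirements`` directory. As a rough illustration of that invariant (not part of this
    push), here is a minimal Python sketch that checks every ``.in`` file has a matching
    generated ``.txt`` file whose entries are pinned; the exact-pin ("==") check and the
    script itself are assumptions for illustration, not BuildStream tooling:

        #!/usr/bin/env python3
        # Hypothetical sanity check: every requirements/*.in should have a matching,
        # fully pinned requirements/*.txt (assumption: pins use "==", comments allowed).
        import pathlib
        import sys

        def check_requirements(directory="requirements"):
            problems = []
            for in_file in pathlib.Path(directory).glob("*.in"):
                txt_file = in_file.with_suffix(".txt")
                if not txt_file.exists():
                    problems.append("{} has no generated {}".format(in_file, txt_file.name))
                    continue
                for line in txt_file.read_text().splitlines():
                    line = line.strip()
                    if not line or line.startswith("#"):
                        continue  # skip blanks and "# via ..." annotations
                    if "==" not in line:
                        problems.append("{}: '{}' is not pinned".format(txt_file, line))
            return problems

        if __name__ == "__main__":
            issues = check_requirements()
            for issue in issues:
                print(issue)
            sys.exit(1 if issues else 0)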

  • MANIFEST.in

    @@ -32,12 +32,12 @@ include .pylintrc
     recursive-include buildstream/_protos *.proto

     # Requirements files
    -include tools/requirements.in
    -include tools/requirements.txt
    -include tools/dev-requirements.in
    -include tools/dev-requirements.txt
    -include tools/plugin-requirements.in
    -include tools/plugin-requirements.txt
    +include requirements/requirements.in
    +include requirements/requirements.txt
    +include requirements/dev-requirements.in
    +include requirements/dev-requirements.txt
    +include requirements/plugin-requirements.in
    +include requirements/plugin-requirements.txt

     # Versioneer
     include versioneer.py

  • buildstream/_artifactcache/artifactcache.py → buildstream/_artifactcache.py
    ... ... @@ -19,18 +19,17 @@
    19 19
     
    
    20 20
     import multiprocessing
    
    21 21
     import os
    
    22
    -import signal
    
    23 22
     import string
    
    24 23
     from collections.abc import Mapping
    
    24
    +import grpc
    
    25 25
     
    
    26
    -from ..types import _KeyStrength
    
    27
    -from .._exceptions import ArtifactError, CASError, LoadError, LoadErrorReason
    
    28
    -from .._message import Message, MessageType
    
    29
    -from .. import _signals
    
    30
    -from .. import utils
    
    31
    -from .. import _yaml
    
    26
    +from .types import _KeyStrength
    
    27
    +from ._exceptions import ArtifactError, CASError, LoadError, LoadErrorReason
    
    28
    +from ._message import Message, MessageType
    
    29
    +from . import utils
    
    30
    +from . import _yaml
    
    32 31
     
    
    33
    -from .cascache import CASRemote, CASRemoteSpec
    
    32
    +from ._cas.casremote import BlobNotFound, CASRemote, CASRemoteSpec
    
    34 33
     
    
    35 34
     
    
    36 35
     CACHE_SIZE_FILE = "cache_size"
    
    ... ... @@ -375,20 +374,8 @@ class ArtifactCache():
    375 374
             remotes = {}
    
    376 375
             q = multiprocessing.Queue()
    
    377 376
             for remote_spec in remote_specs:
    
    378
    -            # Use subprocess to avoid creation of gRPC threads in main BuildStream process
    
    379
    -            # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
    
    380
    -            p = multiprocessing.Process(target=self.cas.initialize_remote, args=(remote_spec, q))
    
    381 377
     
    
    382
    -            try:
    
    383
    -                # Keep SIGINT blocked in the child process
    
    384
    -                with _signals.blocked([signal.SIGINT], ignore=False):
    
    385
    -                    p.start()
    
    386
    -
    
    387
    -                error = q.get()
    
    388
    -                p.join()
    
    389
    -            except KeyboardInterrupt:
    
    390
    -                utils._kill_process_tree(p.pid)
    
    391
    -                raise
    
    378
    +            error = CASRemote.check_remote(remote_spec, self.context.tmpdir, q)
    
    392 379
     
    
    393 380
                 if error and on_failure:
    
    394 381
                     on_failure(remote_spec.url, error)
    
    ... ... @@ -399,7 +386,7 @@ class ArtifactCache():
    399 386
                     if remote_spec.push:
    
    400 387
                         self._has_push_remotes = True
    
    401 388
     
    
    402
    -                remotes[remote_spec.url] = CASRemote(remote_spec)
    
    389
    +                remotes[remote_spec.url] = CASRemote(remote_spec, self.context.tmpdir)
    
    403 390
     
    
    404 391
             for project in self.context.get_projects():
    
    405 392
                 remote_specs = self.global_remote_specs
    
    ... ... @@ -621,16 +608,37 @@ class ArtifactCache():
    621 608
     
    
    622 609
             for remote in push_remotes:
    
    623 610
                 remote.init()
    
    611
    +            pushed_remote = False
    
    624 612
                 display_key = element._get_brief_display_key()
    
    625 613
                 element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url))
    
    626 614
     
    
    627
    -            if self.cas.push(refs, remote):
    
    628
    -                element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
    
    615
    +            try:
    
    616
    +                for ref in refs:
    
    617
    +                    # Check whether ref is already on the server in which case
    
    618
    +                    # there is no need to push the ref
    
    619
    +                    root_digest = self.cas.resolve_ref(ref)
    
    620
    +                    response = remote.get_reference(ref)
    
    621
    +                    if (response is not None and
    
    622
    +                            response.hash == root_digest.hash and
    
    623
    +                            response.size_bytes == root_digest.size_bytes):
    
    624
    +                        element.info("Remote ({}) already has {} cached".format(
    
    625
    +                            remote.spec.url, element._get_brief_display_key()))
    
    626
    +                        continue
    
    627
    +
    
    628
    +                    # upload blobs
    
    629
    +                    self._send_directory(root_digest, remote)
    
    630
    +
    
    631
    +                    remote.update_reference(ref, root_digest)
    
    632
    +                    pushed_remote = True
    
    633
    +
    
    634
    +            except grpc.RpcError as e:
    
    635
    +                if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
    
    636
    +                    raise CASError("Failed to push ref {}: {}"
    
    637
    +                                   .format(refs, e), temporary=True) from e
    
    638
    +
    
    639
    +            if pushed_remote is True:
    
    629 640
                     pushed = True
    
    630
    -            else:
    
    631
    -                element.info("Remote ({}) already has {} cached".format(
    
    632
    -                    remote.spec.url, element._get_brief_display_key()
    
    633
    -                ))
    
    641
    +                element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
    
    634 642
     
    
    635 643
             return pushed
    
    636 644
     
    
    ... ... @@ -658,13 +666,36 @@ class ArtifactCache():
    658 666
                     display_key = element._get_brief_display_key()
    
    659 667
                     element.status("Pulling artifact {} <- {}".format(display_key, remote.spec.url))
    
    660 668
     
    
    661
    -                if self.cas.pull(ref, remote, progress=progress, subdir=subdir, excluded_subdirs=excluded_subdirs):
    
    662
    -                    element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url))
    
    669
    +                root_digest = remote.get_reference(ref)
    
    670
    +
    
    671
    +                if root_digest:
    
    672
    +                    try:
    
    673
    +                        for blob_digest in remote.yield_directory_digests(
    
    674
    +                                root_digest, progress=progress, subdir=subdir,
    
    675
    +                                excluded_subdirs=excluded_subdirs):
    
    676
    +                            if self.cas.check_blob(blob_digest):
    
    677
    +                                continue
    
    678
    +                            remote.request_blob(blob_digest)
    
    679
    +                            for blob_file in remote.get_blobs():
    
    680
    +                                self.cas.add_object(path=blob_file.name, link_directly=True)
    
    681
    +
    
    682
    +                        # request the final CAS batch
    
    683
    +                        for blob_file in remote.get_blobs(complete_batch=True):
    
    684
    +                            self.cas.add_object(path=blob_file.name, link_directly=True)
    
    685
    +
    
    686
    +                        self.cas.set_ref(ref, root_digest)
    
    687
    +                    except BlobNotFound:
    
    688
    +                        element.info("Remote ({}) is missing blobs for {}".format(
    
    689
    +                            remote.spec.url, element._get_brief_display_key()))
    
    690
    +                        continue
    
    691
    +
    
    663 692
                         if subdir:
    
    664 693
                             # Attempt to extract subdir into artifact extract dir if it already exists
    
    665 694
                             # without containing the subdir. If the respective artifact extract dir does not
    
    666 695
                             # exist a complete extraction will complete.
    
    667 696
                             self.extract(element, key, subdir)
    
    697
    +
    
    698
    +                    element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url))
    
    668 699
                         # no need to pull from additional remotes
    
    669 700
                         return True
    
    670 701
                     else:
    
    ... ... @@ -685,15 +716,26 @@ class ArtifactCache():
    685 716
         #
    
    686 717
         # Args:
    
    687 718
         #     project (Project): The current project
    
    688
    -    #     digest (Digest): The digest of the tree
    
    719
    +    #     tree_digest (Digest): The digest of the tree
    
    689 720
         #
    
    690
    -    def pull_tree(self, project, digest):
    
    721
    +    def pull_tree(self, project, tree_digest):
    
    691 722
             for remote in self._remotes[project]:
    
    692
    -            digest = self.cas.pull_tree(remote, digest)
    
    693
    -
    
    694
    -            if digest:
    
    695
    -                # no need to pull from additional remotes
    
    696
    -                return digest
    
    723
    +            try:
    
    724
    +                for blob_digest in remote.yield_tree_digests(tree_digest):
    
    725
    +                    if self.cas.check_blob(blob_digest):
    
    726
    +                        continue
    
    727
    +                    remote.request_blob(blob_digest)
    
    728
    +                    for blob_file in remote.get_blobs():
    
    729
    +                        self.cas.add_object(path=blob_file.name, link_directly=True)
    
    730
    +
    
    731
    +                # Get the last batch
    
    732
    +                for blob_file in remote.get_blobs(complete_batch=True):
    
    733
    +                    self.cas.add_object(path=blob_file.name, link_directly=True)
    
    734
    +
    
    735
    +            except BlobNotFound:
    
    736
    +                continue
    
    737
    +            else:
    
    738
    +                return tree_digest
    
    697 739
     
    
    698 740
             return None
    
    699 741
     
    
    ... ... @@ -722,7 +764,7 @@ class ArtifactCache():
    722 764
                 return
    
    723 765
     
    
    724 766
             for remote in push_remotes:
    
    725
    -            self.cas.push_directory(remote, directory)
    
    767
    +            self._send_directory(directory.ref, remote)
    
    726 768
     
    
    727 769
         # push_message():
    
    728 770
         #
    
    ... ... @@ -747,7 +789,7 @@ class ArtifactCache():
    747 789
                                     "servers are configured as push remotes.")
    
    748 790
     
    
    749 791
             for remote in push_remotes:
    
    750
    -            message_digest = self.cas.push_message(remote, message)
    
    792
    +            message_digest = remote.push_message(message)
    
    751 793
     
    
    752 794
             return message_digest
    
    753 795
     
    
    ... ... @@ -807,6 +849,14 @@ class ArtifactCache():
    807 849
             with self.context.timed_activity("Initializing remote caches", silent_nested=True):
    
    808 850
                 self.initialize_remotes(on_failure=remote_failed)
    
    809 851
     
    
    852
    +    def _send_directory(self, root_digest, remote):
    
    853
    +        required_blobs = self.cas.yield_directory_digests(root_digest)
    
    854
    +        missing_blobs = remote.find_missing_blobs(required_blobs)
    
    855
    +        for blob in missing_blobs.values():
    
    856
    +            blob_file = self.cas.objpath(blob)
    
    857
    +            remote.upload_blob(blob, blob_file, final=True)
    
    858
    +        remote.send_update_batch()
    
    859
    +
    
    810 860
         # _write_cache_size()
    
    811 861
         #
    
    812 862
         # Writes the given size of the artifact to the cache's size file
    

  • buildstream/_artifactcache/__init__.py deleted

    -#
    -#  Copyright (C) 2017-2018 Codethink Limited
    -#
    -#  This program is free software; you can redistribute it and/or
    -#  modify it under the terms of the GNU Lesser General Public
    -#  License as published by the Free Software Foundation; either
    -#  version 2 of the License, or (at your option) any later version.
    -#
    -#  This library is distributed in the hope that it will be useful,
    -#  but WITHOUT ANY WARRANTY; without even the implied warranty of
    -#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
    -#  Lesser General Public License for more details.
    -#
    -#  You should have received a copy of the GNU Lesser General Public
    -#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
    -#
    -#  Authors:
    -#        Tristan Van Berkom <tristan vanberkom codethink co uk>
    -
    -from .artifactcache import ArtifactCache, ArtifactCacheSpec, CACHE_SIZE_FILE

  • buildstream/_cas/__init__.py
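
    Since buildstream/_artifactcache/ stops being a package (its __init__.py is deleted above)
    and the CAS code moves under buildstream/_cas/, import sites elsewhere in the tree change
    shape. A hedged before/after illustration, with module paths taken from the renames in this
    push (the exact set of names re-exported by _cas/__init__.py is not shown in this diff):

        # Before this branch: CAS classes lived inside the _artifactcache package, e.g.
        # from buildstream._artifactcache.cascache import CASCache, CASRemote, CASRemoteSpec

        # After this branch: the cache frontend is a plain module and CAS code has its own package.
        from buildstream._artifactcache import ArtifactCache, ArtifactCacheSpec, CACHE_SIZE_FILE
        from buildstream._cas.cascache import CASCache
        from buildstream._cas.casremote import BlobNotFound, CASRemote, CASRemoteSpec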

  • buildstream/_artifactcache/cascache.py → buildstream/_cas/cascache.py
    ... ... @@ -17,85 +17,16 @@
    17 17
     #  Authors:
    
    18 18
     #        Jürg Billeter <juerg billeter codethink co uk>
    
    19 19
     
    
    20
    -from collections import namedtuple
    
    21 20
     import hashlib
    
    22
    -import itertools
    
    23
    -import io
    
    24 21
     import os
    
    25 22
     import stat
    
    26 23
     import tempfile
    
    27
    -import uuid
    
    28 24
     import contextlib
    
    29
    -from urllib.parse import urlparse
    
    30 25
     
    
    31
    -import grpc
    
    32
    -
    
    33
    -from .._protos.google.rpc import code_pb2
    
    34
    -from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
    
    35
    -from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    
    36
    -from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc
    
    26
    +from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    37 27
     
    
    38 28
     from .. import utils
    
    39
    -from .._exceptions import CASError, LoadError, LoadErrorReason
    
    40
    -from .. import _yaml
    
    41
    -
    
    42
    -
    
    43
    -# The default limit for gRPC messages is 4 MiB.
    
    44
    -# Limit payload to 1 MiB to leave sufficient headroom for metadata.
    
    45
    -_MAX_PAYLOAD_BYTES = 1024 * 1024
    
    46
    -
    
    47
    -
    
    48
    -class CASRemoteSpec(namedtuple('CASRemoteSpec', 'url push server_cert client_key client_cert instance_name')):
    
    49
    -
    
    50
    -    # _new_from_config_node
    
    51
    -    #
    
    52
    -    # Creates an CASRemoteSpec() from a YAML loaded node
    
    53
    -    #
    
    54
    -    @staticmethod
    
    55
    -    def _new_from_config_node(spec_node, basedir=None):
    
    56
    -        _yaml.node_validate(spec_node, ['url', 'push', 'server-cert', 'client-key', 'client-cert', 'instance_name'])
    
    57
    -        url = _yaml.node_get(spec_node, str, 'url')
    
    58
    -        push = _yaml.node_get(spec_node, bool, 'push', default_value=False)
    
    59
    -        if not url:
    
    60
    -            provenance = _yaml.node_get_provenance(spec_node, 'url')
    
    61
    -            raise LoadError(LoadErrorReason.INVALID_DATA,
    
    62
    -                            "{}: empty artifact cache URL".format(provenance))
    
    63
    -
    
    64
    -        instance_name = _yaml.node_get(spec_node, str, 'instance_name', default_value=None)
    
    65
    -
    
    66
    -        server_cert = _yaml.node_get(spec_node, str, 'server-cert', default_value=None)
    
    67
    -        if server_cert and basedir:
    
    68
    -            server_cert = os.path.join(basedir, server_cert)
    
    69
    -
    
    70
    -        client_key = _yaml.node_get(spec_node, str, 'client-key', default_value=None)
    
    71
    -        if client_key and basedir:
    
    72
    -            client_key = os.path.join(basedir, client_key)
    
    73
    -
    
    74
    -        client_cert = _yaml.node_get(spec_node, str, 'client-cert', default_value=None)
    
    75
    -        if client_cert and basedir:
    
    76
    -            client_cert = os.path.join(basedir, client_cert)
    
    77
    -
    
    78
    -        if client_key and not client_cert:
    
    79
    -            provenance = _yaml.node_get_provenance(spec_node, 'client-key')
    
    80
    -            raise LoadError(LoadErrorReason.INVALID_DATA,
    
    81
    -                            "{}: 'client-key' was specified without 'client-cert'".format(provenance))
    
    82
    -
    
    83
    -        if client_cert and not client_key:
    
    84
    -            provenance = _yaml.node_get_provenance(spec_node, 'client-cert')
    
    85
    -            raise LoadError(LoadErrorReason.INVALID_DATA,
    
    86
    -                            "{}: 'client-cert' was specified without 'client-key'".format(provenance))
    
    87
    -
    
    88
    -        return CASRemoteSpec(url, push, server_cert, client_key, client_cert, instance_name)
    
    89
    -
    
    90
    -
    
    91
    -CASRemoteSpec.__new__.__defaults__ = (None, None, None, None)
    
    92
    -
    
    93
    -
    
    94
    -class BlobNotFound(CASError):
    
    95
    -
    
    96
    -    def __init__(self, blob, msg):
    
    97
    -        self.blob = blob
    
    98
    -        super().__init__(msg)
    
    29
    +from .._exceptions import CASError
    
    99 30
     
    
    100 31
     
    
    101 32
     # A CASCache manages a CAS repository as specified in the Remote Execution API.
    
    ... ... @@ -245,96 +176,6 @@ class CASCache():
    245 176
     
    
    246 177
             return modified, removed, added
    
    247 178
     
    
    248
    -    def initialize_remote(self, remote_spec, q):
    
    249
    -        try:
    
    250
    -            remote = CASRemote(remote_spec)
    
    251
    -            remote.init()
    
    252
    -
    
    253
    -            request = buildstream_pb2.StatusRequest(instance_name=remote_spec.instance_name)
    
    254
    -            response = remote.ref_storage.Status(request)
    
    255
    -
    
    256
    -            if remote_spec.push and not response.allow_updates:
    
    257
    -                q.put('CAS server does not allow push')
    
    258
    -            else:
    
    259
    -                # No error
    
    260
    -                q.put(None)
    
    261
    -
    
    262
    -        except grpc.RpcError as e:
    
    263
    -            # str(e) is too verbose for errors reported to the user
    
    264
    -            q.put(e.details())
    
    265
    -
    
    266
    -        except Exception as e:               # pylint: disable=broad-except
    
    267
    -            # Whatever happens, we need to return it to the calling process
    
    268
    -            #
    
    269
    -            q.put(str(e))
    
    270
    -
    
    271
    -    # pull():
    
    272
    -    #
    
    273
    -    # Pull a ref from a remote repository.
    
    274
    -    #
    
    275
    -    # Args:
    
    276
    -    #     ref (str): The ref to pull
    
    277
    -    #     remote (CASRemote): The remote repository to pull from
    
    278
    -    #     progress (callable): The progress callback, if any
    
    279
    -    #     subdir (str): The optional specific subdir to pull
    
    280
    -    #     excluded_subdirs (list): The optional list of subdirs to not pull
    
    281
    -    #
    
    282
    -    # Returns:
    
    283
    -    #   (bool): True if pull was successful, False if ref was not available
    
    284
    -    #
    
    285
    -    def pull(self, ref, remote, *, progress=None, subdir=None, excluded_subdirs=None):
    
    286
    -        try:
    
    287
    -            remote.init()
    
    288
    -
    
    289
    -            request = buildstream_pb2.GetReferenceRequest(instance_name=remote.spec.instance_name)
    
    290
    -            request.key = ref
    
    291
    -            response = remote.ref_storage.GetReference(request)
    
    292
    -
    
    293
    -            tree = remote_execution_pb2.Digest()
    
    294
    -            tree.hash = response.digest.hash
    
    295
    -            tree.size_bytes = response.digest.size_bytes
    
    296
    -
    
    297
    -            # Check if the element artifact is present, if so just fetch the subdir.
    
    298
    -            if subdir and os.path.exists(self.objpath(tree)):
    
    299
    -                self._fetch_subdir(remote, tree, subdir)
    
    300
    -            else:
    
    301
    -                # Fetch artifact, excluded_subdirs determined in pullqueue
    
    302
    -                self._fetch_directory(remote, tree, excluded_subdirs=excluded_subdirs)
    
    303
    -
    
    304
    -            self.set_ref(ref, tree)
    
    305
    -
    
    306
    -            return True
    
    307
    -        except grpc.RpcError as e:
    
    308
    -            if e.code() != grpc.StatusCode.NOT_FOUND:
    
    309
    -                raise CASError("Failed to pull ref {}: {}".format(ref, e)) from e
    
    310
    -            else:
    
    311
    -                return False
    
    312
    -        except BlobNotFound as e:
    
    313
    -            return False
    
    314
    -
    
    315
    -    # pull_tree():
    
    316
    -    #
    
    317
    -    # Pull a single Tree rather than a ref.
    
    318
    -    # Does not update local refs.
    
    319
    -    #
    
    320
    -    # Args:
    
    321
    -    #     remote (CASRemote): The remote to pull from
    
    322
    -    #     digest (Digest): The digest of the tree
    
    323
    -    #
    
    324
    -    def pull_tree(self, remote, digest):
    
    325
    -        try:
    
    326
    -            remote.init()
    
    327
    -
    
    328
    -            digest = self._fetch_tree(remote, digest)
    
    329
    -
    
    330
    -            return digest
    
    331
    -
    
    332
    -        except grpc.RpcError as e:
    
    333
    -            if e.code() != grpc.StatusCode.NOT_FOUND:
    
    334
    -                raise
    
    335
    -
    
    336
    -        return None
    
    337
    -
    
    338 179
         # link_ref():
    
    339 180
         #
    
    340 181
         # Add an alias for an existing ref.
    
    ... ... @@ -348,117 +189,6 @@ class CASCache():
    348 189
     
    
    349 190
             self.set_ref(newref, tree)
    
    350 191
     
    
    351
    -    # push():
    
    352
    -    #
    
    353
    -    # Push committed refs to remote repository.
    
    354
    -    #
    
    355
    -    # Args:
    
    356
    -    #     refs (list): The refs to push
    
    357
    -    #     remote (CASRemote): The remote to push to
    
    358
    -    #
    
    359
    -    # Returns:
    
    360
    -    #   (bool): True if any remote was updated, False if no pushes were required
    
    361
    -    #
    
    362
    -    # Raises:
    
    363
    -    #   (CASError): if there was an error
    
    364
    -    #
    
    365
    -    def push(self, refs, remote):
    
    366
    -        skipped_remote = True
    
    367
    -        try:
    
    368
    -            for ref in refs:
    
    369
    -                tree = self.resolve_ref(ref)
    
    370
    -
    
    371
    -                # Check whether ref is already on the server in which case
    
    372
    -                # there is no need to push the ref
    
    373
    -                try:
    
    374
    -                    request = buildstream_pb2.GetReferenceRequest(instance_name=remote.spec.instance_name)
    
    375
    -                    request.key = ref
    
    376
    -                    response = remote.ref_storage.GetReference(request)
    
    377
    -
    
    378
    -                    if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
    
    379
    -                        # ref is already on the server with the same tree
    
    380
    -                        continue
    
    381
    -
    
    382
    -                except grpc.RpcError as e:
    
    383
    -                    if e.code() != grpc.StatusCode.NOT_FOUND:
    
    384
    -                        # Intentionally re-raise RpcError for outer except block.
    
    385
    -                        raise
    
    386
    -
    
    387
    -                self._send_directory(remote, tree)
    
    388
    -
    
    389
    -                request = buildstream_pb2.UpdateReferenceRequest(instance_name=remote.spec.instance_name)
    
    390
    -                request.keys.append(ref)
    
    391
    -                request.digest.hash = tree.hash
    
    392
    -                request.digest.size_bytes = tree.size_bytes
    
    393
    -                remote.ref_storage.UpdateReference(request)
    
    394
    -
    
    395
    -                skipped_remote = False
    
    396
    -        except grpc.RpcError as e:
    
    397
    -            if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
    
    398
    -                raise CASError("Failed to push ref {}: {}".format(refs, e), temporary=True) from e
    
    399
    -
    
    400
    -        return not skipped_remote
    
    401
    -
    
    402
    -    # push_directory():
    
    403
    -    #
    
    404
    -    # Push the given virtual directory to a remote.
    
    405
    -    #
    
    406
    -    # Args:
    
    407
    -    #     remote (CASRemote): The remote to push to
    
    408
    -    #     directory (Directory): A virtual directory object to push.
    
    409
    -    #
    
    410
    -    # Raises:
    
    411
    -    #     (CASError): if there was an error
    
    412
    -    #
    
    413
    -    def push_directory(self, remote, directory):
    
    414
    -        remote.init()
    
    415
    -
    
    416
    -        self._send_directory(remote, directory.ref)
    
    417
    -
    
    418
    -    # push_message():
    
    419
    -    #
    
    420
    -    # Push the given protobuf message to a remote.
    
    421
    -    #
    
    422
    -    # Args:
    
    423
    -    #     remote (CASRemote): The remote to push to
    
    424
    -    #     message (Message): A protobuf message to push.
    
    425
    -    #
    
    426
    -    # Raises:
    
    427
    -    #     (CASError): if there was an error
    
    428
    -    #
    
    429
    -    def push_message(self, remote, message):
    
    430
    -
    
    431
    -        message_buffer = message.SerializeToString()
    
    432
    -        message_digest = utils._message_digest(message_buffer)
    
    433
    -
    
    434
    -        remote.init()
    
    435
    -
    
    436
    -        with io.BytesIO(message_buffer) as b:
    
    437
    -            self._send_blob(remote, message_digest, b)
    
    438
    -
    
    439
    -        return message_digest
    
    440
    -
    
    441
    -    # verify_digest_on_remote():
    
    442
    -    #
    
    443
    -    # Check whether the object is already on the server in which case
    
    444
    -    # there is no need to upload it.
    
    445
    -    #
    
    446
    -    # Args:
    
    447
    -    #     remote (CASRemote): The remote to check
    
    448
    -    #     digest (Digest): The object digest.
    
    449
    -    #
    
    450
    -    def verify_digest_on_remote(self, remote, digest):
    
    451
    -        remote.init()
    
    452
    -
    
    453
    -        request = remote_execution_pb2.FindMissingBlobsRequest(instance_name=remote.spec.instance_name)
    
    454
    -        request.blob_digests.extend([digest])
    
    455
    -
    
    456
    -        response = remote.cas.FindMissingBlobs(request)
    
    457
    -        if digest in response.missing_blob_digests:
    
    458
    -            return False
    
    459
    -
    
    460
    -        return True
    
    461
    -
    
    462 192
         # objpath():
    
    463 193
         #
    
    464 194
         # Return the path of an object based on its digest.
    
    ... ... @@ -720,6 +450,37 @@ class CASCache():
    720 450
             reachable = set()
    
    721 451
             self._reachable_refs_dir(reachable, tree, update_mtime=True)
    
    722 452
     
    
    453
    +    # Check to see if a blob is in the local CAS
    
    454
    +    # return None if not
    
    455
    +    def check_blob(self, digest):
    
    456
    +        objpath = self.objpath(digest)
    
    457
    +        if os.path.exists(objpath):
    
    458
    +            # already in local repository
    
    459
    +            return objpath
    
    460
    +        else:
    
    461
    +            return None
    
    462
    +
    
    463
    +    def yield_directory_digests(self, directory_digest):
    
    464
    +        # parse directory, and recursively add blobs
    
    465
    +        d = remote_execution_pb2.Digest()
    
    466
    +        d.hash = directory_digest.hash
    
    467
    +        d.size_bytes = directory_digest.size_bytes
    
    468
    +        yield d
    
    469
    +
    
    470
    +        directory = remote_execution_pb2.Directory()
    
    471
    +
    
    472
    +        with open(self.objpath(directory_digest), 'rb') as f:
    
    473
    +            directory.ParseFromString(f.read())
    
    474
    +
    
    475
    +        for filenode in directory.files:
    
    476
    +            d = remote_execution_pb2.Digest()
    
    477
    +            d.hash = filenode.digest.hash
    
    478
    +            d.size_bytes = filenode.digest.size_bytes
    
    479
    +            yield d
    
    480
    +
    
    481
    +        for dirnode in directory.directories:
    
    482
    +            yield from self.yield_directory_digests(dirnode.digest)
    
    483
    +
    
    723 484
         ################################################
    
    724 485
         #             Local Private Methods            #
    
    725 486
         ################################################
    
    ... ... @@ -908,429 +669,3 @@ class CASCache():
    908 669
     
    
    909 670
             for dirnode in directory.directories:
    
    910 671
                 yield from self._required_blobs(dirnode.digest)
    911
    -
    
    912
    -    def _fetch_blob(self, remote, digest, stream):
    
    913
    -        resource_name_components = ['blobs', digest.hash, str(digest.size_bytes)]
    
    914
    -
    
    915
    -        if remote.spec.instance_name:
    
    916
    -            resource_name_components.insert(0, remote.spec.instance_name)
    
    917
    -
    
    918
    -        resource_name = '/'.join(resource_name_components)
    
    919
    -
    
    920
    -        request = bytestream_pb2.ReadRequest()
    
    921
    -        request.resource_name = resource_name
    
    922
    -        request.read_offset = 0
    
    923
    -        for response in remote.bytestream.Read(request):
    
    924
    -            stream.write(response.data)
    
    925
    -        stream.flush()
    
    926
    -
    
    927
    -        assert digest.size_bytes == os.fstat(stream.fileno()).st_size
    
    928
    -
    
    929
    -    # _ensure_blob():
    
    930
    -    #
    
    931
    -    # Fetch and add blob if it's not already local.
    
    932
    -    #
    
    933
    -    # Args:
    
    934
    -    #     remote (Remote): The remote to use.
    
    935
    -    #     digest (Digest): Digest object for the blob to fetch.
    
    936
    -    #
    
    937
    -    # Returns:
    
    938
    -    #     (str): The path of the object
    
    939
    -    #
    
    940
    -    def _ensure_blob(self, remote, digest):
    
    941
    -        objpath = self.objpath(digest)
    
    942
    -        if os.path.exists(objpath):
    
    943
    -            # already in local repository
    
    944
    -            return objpath
    
    945
    -
    
    946
    -        with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
    
    947
    -            self._fetch_blob(remote, digest, f)
    
    948
    -
    
    949
    -            added_digest = self.add_object(path=f.name, link_directly=True)
    
    950
    -            assert added_digest.hash == digest.hash
    
    951
    -
    
    952
    -        return objpath
    
    953
    -
    
    954
    -    def _batch_download_complete(self, batch):
    
    955
    -        for digest, data in batch.send():
    
    956
    -            with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
    
    957
    -                f.write(data)
    
    958
    -                f.flush()
    
    959
    -
    
    960
    -                added_digest = self.add_object(path=f.name, link_directly=True)
    
    961
    -                assert added_digest.hash == digest.hash
    
    962
    -
    
    963
    -    # Helper function for _fetch_directory().
    
    964
    -    def _fetch_directory_batch(self, remote, batch, fetch_queue, fetch_next_queue):
    
    965
    -        self._batch_download_complete(batch)
    
    966
    -
    
    967
    -        # All previously scheduled directories are now locally available,
    
    968
    -        # move them to the processing queue.
    
    969
    -        fetch_queue.extend(fetch_next_queue)
    
    970
    -        fetch_next_queue.clear()
    
    971
    -        return _CASBatchRead(remote)
    
    972
    -
    
    973
    -    # Helper function for _fetch_directory().
    
    974
    -    def _fetch_directory_node(self, remote, digest, batch, fetch_queue, fetch_next_queue, *, recursive=False):
    
    975
    -        in_local_cache = os.path.exists(self.objpath(digest))
    
    976
    -
    
    977
    -        if in_local_cache:
    
    978
    -            # Skip download, already in local cache.
    
    979
    -            pass
    
    980
    -        elif (digest.size_bytes >= remote.max_batch_total_size_bytes or
    
    981
    -              not remote.batch_read_supported):
    
    982
    -            # Too large for batch request, download in independent request.
    
    983
    -            self._ensure_blob(remote, digest)
    
    984
    -            in_local_cache = True
    
    985
    -        else:
    
    986
    -            if not batch.add(digest):
    
    987
    -                # Not enough space left in batch request.
    
    988
    -                # Complete pending batch first.
    
    989
    -                batch = self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
    
    990
    -                batch.add(digest)
    
    991
    -
    
    992
    -        if recursive:
    
    993
    -            if in_local_cache:
    
    994
    -                # Add directory to processing queue.
    
    995
    -                fetch_queue.append(digest)
    
    996
    -            else:
    
    997
    -                # Directory will be available after completing pending batch.
    
    998
    -                # Add directory to deferred processing queue.
    
    999
    -                fetch_next_queue.append(digest)
    
    1000
    -
    
    1001
    -        return batch
    
    1002
    -
    
    1003
    -    # _fetch_directory():
    
    1004
    -    #
    
    1005
    -    # Fetches remote directory and adds it to content addressable store.
    
    1006
    -    #
    
    1007
    -    # Fetches files, symbolic links and recursively other directories in
    
    1008
    -    # the remote directory and adds them to the content addressable
    
    1009
    -    # store.
    
    1010
    -    #
    
    1011
    -    # Args:
    
    1012
    -    #     remote (Remote): The remote to use.
    
    1013
    -    #     dir_digest (Digest): Digest object for the directory to fetch.
    
    1014
    -    #     excluded_subdirs (list): The optional list of subdirs to not fetch
    
    1015
    -    #
    
    1016
    -    def _fetch_directory(self, remote, dir_digest, *, excluded_subdirs=None):
    
    1017
    -        fetch_queue = [dir_digest]
    
    1018
    -        fetch_next_queue = []
    
    1019
    -        batch = _CASBatchRead(remote)
    
    1020
    -        if not excluded_subdirs:
    
    1021
    -            excluded_subdirs = []
    
    1022
    -
    
    1023
    -        while len(fetch_queue) + len(fetch_next_queue) > 0:
    
    1024
    -            if not fetch_queue:
    
    1025
    -                batch = self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
    
    1026
    -
    
    1027
    -            dir_digest = fetch_queue.pop(0)
    
    1028
    -
    
    1029
    -            objpath = self._ensure_blob(remote, dir_digest)
    
    1030
    -
    
    1031
    -            directory = remote_execution_pb2.Directory()
    
    1032
    -            with open(objpath, 'rb') as f:
    
    1033
    -                directory.ParseFromString(f.read())
    
    1034
    -
    
    1035
    -            for dirnode in directory.directories:
    
    1036
    -                if dirnode.name not in excluded_subdirs:
    
    1037
    -                    batch = self._fetch_directory_node(remote, dirnode.digest, batch,
    
    1038
    -                                                       fetch_queue, fetch_next_queue, recursive=True)
    
    1039
    -
    
    1040
    -            for filenode in directory.files:
    
    1041
    -                batch = self._fetch_directory_node(remote, filenode.digest, batch,
    
    1042
    -                                                   fetch_queue, fetch_next_queue)
    
    1043
    -
    
    1044
    -        # Fetch final batch
    
    1045
    -        self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
    
    1046
    -
    
    1047
    -    def _fetch_subdir(self, remote, tree, subdir):
    
    1048
    -        subdirdigest = self._get_subdir(tree, subdir)
    
    1049
    -        self._fetch_directory(remote, subdirdigest)
    
    1050
    -
    
    1051
    -    def _fetch_tree(self, remote, digest):
    
    1052
    -        # download but do not store the Tree object
    
    1053
    -        with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
    
    1054
    -            self._fetch_blob(remote, digest, out)
    
    1055
    -
    
    1056
    -            tree = remote_execution_pb2.Tree()
    
    1057
    -
    
    1058
    -            with open(out.name, 'rb') as f:
    
    1059
    -                tree.ParseFromString(f.read())
    
    1060
    -
    
    1061
    -            tree.children.extend([tree.root])
    
    1062
    -            for directory in tree.children:
    
    1063
    -                for filenode in directory.files:
    
    1064
    -                    self._ensure_blob(remote, filenode.digest)
    
    1065
    -
    
    1066
    -                # place directory blob only in final location when we've downloaded
    
    1067
    -                # all referenced blobs to avoid dangling references in the repository
    
    1068
    -                dirbuffer = directory.SerializeToString()
    
    1069
    -                dirdigest = self.add_object(buffer=dirbuffer)
    
    1070
    -                assert dirdigest.size_bytes == len(dirbuffer)
    
    1071
    -
    
    1072
    -        return dirdigest
    
    1073
    -
    
    1074
    -    def _send_blob(self, remote, digest, stream, u_uid=uuid.uuid4()):
    
    1075
    -        resource_name_components = ['uploads', str(u_uid), 'blobs',
    
    1076
    -                                    digest.hash, str(digest.size_bytes)]
    
    1077
    -
    
    1078
    -        if remote.spec.instance_name:
    
    1079
    -            resource_name_components.insert(0, remote.spec.instance_name)
    
    1080
    -
    
    1081
    -        resource_name = '/'.join(resource_name_components)
    
    1082
    -
    
    1083
    -        def request_stream(resname, instream):
    
    1084
    -            offset = 0
    
    1085
    -            finished = False
    
    1086
    -            remaining = digest.size_bytes
    
    1087
    -            while not finished:
    
    1088
    -                chunk_size = min(remaining, _MAX_PAYLOAD_BYTES)
    
    1089
    -                remaining -= chunk_size
    
    1090
    -
    
    1091
    -                request = bytestream_pb2.WriteRequest()
    
    1092
    -                request.write_offset = offset
    
    1093
    -                # max. _MAX_PAYLOAD_BYTES chunks
    
    1094
    -                request.data = instream.read(chunk_size)
    
    1095
    -                request.resource_name = resname
    
    1096
    -                request.finish_write = remaining <= 0
    
    1097
    -
    
    1098
    -                yield request
    
    1099
    -
    
    1100
    -                offset += chunk_size
    
    1101
    -                finished = request.finish_write
    
    1102
    -
    
    1103
    -        response = remote.bytestream.Write(request_stream(resource_name, stream))
    
    1104
    -
    
    1105
    -        assert response.committed_size == digest.size_bytes
    
    1106
    -
    
    1107
    -    def _send_directory(self, remote, digest, u_uid=uuid.uuid4()):
    
    1108
    -        required_blobs = self._required_blobs(digest)
    
    1109
    -
    
    1110
    -        missing_blobs = dict()
    
    1111
    -        # Limit size of FindMissingBlobs request
    
    1112
    -        for required_blobs_group in _grouper(required_blobs, 512):
    
    1113
    -            request = remote_execution_pb2.FindMissingBlobsRequest(instance_name=remote.spec.instance_name)
    
    1114
    -
    
    1115
    -            for required_digest in required_blobs_group:
    
    1116
    -                d = request.blob_digests.add()
    
    1117
    -                d.hash = required_digest.hash
    
    1118
    -                d.size_bytes = required_digest.size_bytes
    
    1119
    -
    
    1120
    -            response = remote.cas.FindMissingBlobs(request)
    
    1121
    -            for missing_digest in response.missing_blob_digests:
    
    1122
    -                d = remote_execution_pb2.Digest()
    
    1123
    -                d.hash = missing_digest.hash
    
    1124
    -                d.size_bytes = missing_digest.size_bytes
    
    1125
    -                missing_blobs[d.hash] = d
    
    1126
    -
    
    1127
    -        # Upload any blobs missing on the server
    
    1128
    -        self._send_blobs(remote, missing_blobs.values(), u_uid)
    
    1129
    -
    
    1130
    -    def _send_blobs(self, remote, digests, u_uid=uuid.uuid4()):
    
    1131
    -        batch = _CASBatchUpdate(remote)
    
    1132
    -
    
    1133
    -        for digest in digests:
    
    1134
    -            with open(self.objpath(digest), 'rb') as f:
    
    1135
    -                assert os.fstat(f.fileno()).st_size == digest.size_bytes
    
    1136
    -
    
    1137
    -                if (digest.size_bytes >= remote.max_batch_total_size_bytes or
    
    1138
    -                        not remote.batch_update_supported):
    
    1139
    -                    # Too large for batch request, upload in independent request.
    
    1140
    -                    self._send_blob(remote, digest, f, u_uid=u_uid)
    
    1141
    -                else:
    
    1142
    -                    if not batch.add(digest, f):
    
    1143
    -                        # Not enough space left in batch request.
    
    1144
    -                        # Complete pending batch first.
    
    1145
    -                        batch.send()
    
    1146
    -                        batch = _CASBatchUpdate(remote)
    
    1147
    -                        batch.add(digest, f)
    
    1148
    -
    
    1149
    -        # Send final batch
    
    1150
    -        batch.send()
    
    1151
    -
    
    1152
    -
    
    1153
    -# Represents a single remote CAS cache.
    
    1154
    -#
    
    1155
    -class CASRemote():
    
    1156
    -    def __init__(self, spec):
    
    1157
    -        self.spec = spec
    
    1158
    -        self._initialized = False
    
    1159
    -        self.channel = None
    
    1160
    -        self.bytestream = None
    
    1161
    -        self.cas = None
    
    1162
    -        self.ref_storage = None
    
    1163
    -        self.batch_update_supported = None
    
    1164
    -        self.batch_read_supported = None
    
    1165
    -        self.capabilities = None
    
    1166
    -        self.max_batch_total_size_bytes = None
    
    1167
    -
    
    1168
    -    def init(self):
    
    1169
    -        if not self._initialized:
    
    1170
    -            url = urlparse(self.spec.url)
    
    1171
    -            if url.scheme == 'http':
    
    1172
    -                port = url.port or 80
    
    1173
    -                self.channel = grpc.insecure_channel('{}:{}'.format(url.hostname, port))
    
    1174
    -            elif url.scheme == 'https':
    
    1175
    -                port = url.port or 443
    
    1176
    -
    
    1177
    -                if self.spec.server_cert:
    
    1178
    -                    with open(self.spec.server_cert, 'rb') as f:
    
    1179
    -                        server_cert_bytes = f.read()
    
    1180
    -                else:
    
    1181
    -                    server_cert_bytes = None
    
    1182
    -
    
    1183
    -                if self.spec.client_key:
    
    1184
    -                    with open(self.spec.client_key, 'rb') as f:
    
    1185
    -                        client_key_bytes = f.read()
    
    1186
    -                else:
    
    1187
    -                    client_key_bytes = None
    
    1188
    -
    
    1189
    -                if self.spec.client_cert:
    
    1190
    -                    with open(self.spec.client_cert, 'rb') as f:
    
    1191
    -                        client_cert_bytes = f.read()
    
    1192
    -                else:
    
    1193
    -                    client_cert_bytes = None
    
    1194
    -
    
    1195
    -                credentials = grpc.ssl_channel_credentials(root_certificates=server_cert_bytes,
    
    1196
    -                                                           private_key=client_key_bytes,
    
    1197
    -                                                           certificate_chain=client_cert_bytes)
    
    1198
    -                self.channel = grpc.secure_channel('{}:{}'.format(url.hostname, port), credentials)
    
    1199
    -            else:
    
    1200
    -                raise CASError("Unsupported URL: {}".format(self.spec.url))
    
    1201
    -
    
    1202
    -            self.bytestream = bytestream_pb2_grpc.ByteStreamStub(self.channel)
    
    1203
    -            self.cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel)
    
    1204
    -            self.capabilities = remote_execution_pb2_grpc.CapabilitiesStub(self.channel)
    
    1205
    -            self.ref_storage = buildstream_pb2_grpc.ReferenceStorageStub(self.channel)
    
    1206
    -
    
    1207
    -            self.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES
    
    1208
    -            try:
    
    1209
    -                request = remote_execution_pb2.GetCapabilitiesRequest(instance_name=self.spec.instance_name)
    
    1210
    -                response = self.capabilities.GetCapabilities(request)
    
    1211
    -                server_max_batch_total_size_bytes = response.cache_capabilities.max_batch_total_size_bytes
    
    1212
    -                if 0 < server_max_batch_total_size_bytes < self.max_batch_total_size_bytes:
    
    1213
    -                    self.max_batch_total_size_bytes = server_max_batch_total_size_bytes
    
    1214
    -            except grpc.RpcError as e:
    
    1215
    -                # Simply use the defaults for servers that don't implement GetCapabilities()
    
    1216
    -                if e.code() != grpc.StatusCode.UNIMPLEMENTED:
    
    1217
    -                    raise
    
    1218
    -
    
    1219
    -            # Check whether the server supports BatchReadBlobs()
    
    1220
    -            self.batch_read_supported = False
    
    1221
    -            try:
    
    1222
    -                request = remote_execution_pb2.BatchReadBlobsRequest(instance_name=self.spec.instance_name)
    
    1223
    -                response = self.cas.BatchReadBlobs(request)
    
    1224
    -                self.batch_read_supported = True
    
    1225
    -            except grpc.RpcError as e:
    
    1226
    -                if e.code() != grpc.StatusCode.UNIMPLEMENTED:
    
    1227
    -                    raise
    
    1228
    -
    
    1229
    -            # Check whether the server supports BatchUpdateBlobs()
    
    1230
    -            self.batch_update_supported = False
    
    1231
    -            try:
    
    1232
    -                request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=self.spec.instance_name)
    
    1233
    -                response = self.cas.BatchUpdateBlobs(request)
    
    1234
    -                self.batch_update_supported = True
    
    1235
    -            except grpc.RpcError as e:
    
    1236
    -                if (e.code() != grpc.StatusCode.UNIMPLEMENTED and
    
    1237
    -                        e.code() != grpc.StatusCode.PERMISSION_DENIED):
    
    1238
    -                    raise
    
    1239
    -
    
    1240
    -            self._initialized = True
    
    1241
    -
    
    1242
    -
    
    1243
    -# Represents a batch of blobs queued for fetching.
    
    1244
    -#
    
    1245
    -class _CASBatchRead():
    
    1246
    -    def __init__(self, remote):
    
    1247
    -        self._remote = remote
    
    1248
    -        self._max_total_size_bytes = remote.max_batch_total_size_bytes
    
    1249
    -        self._request = remote_execution_pb2.BatchReadBlobsRequest(instance_name=remote.spec.instance_name)
    
    1250
    -        self._size = 0
    
    1251
    -        self._sent = False
    
    1252
    -
    
    1253
    -    def add(self, digest):
    
    1254
    -        assert not self._sent
    
    1255
    -
    
    1256
    -        new_batch_size = self._size + digest.size_bytes
    
    1257
    -        if new_batch_size > self._max_total_size_bytes:
    
    1258
    -            # Not enough space left in current batch
    
    1259
    -            return False
    
    1260
    -
    
    1261
    -        request_digest = self._request.digests.add()
    
    1262
    -        request_digest.hash = digest.hash
    
    1263
    -        request_digest.size_bytes = digest.size_bytes
    
    1264
    -        self._size = new_batch_size
    
    1265
    -        return True
    
    1266
    -
    
    1267
    -    def send(self):
    
    1268
    -        assert not self._sent
    
    1269
    -        self._sent = True
    
    1270
    -
    
    1271
    -        if not self._request.digests:
    
    1272
    -            return
    
    1273
    -
    
    1274
    -        batch_response = self._remote.cas.BatchReadBlobs(self._request)
    
    1275
    -
    
    1276
    -        for response in batch_response.responses:
    
    1277
    -            if response.status.code == code_pb2.NOT_FOUND:
    
    1278
    -                raise BlobNotFound(response.digest.hash, "Failed to download blob {}: {}".format(
    
    1279
    -                    response.digest.hash, response.status.code))
    
    1280
    -            if response.status.code != code_pb2.OK:
    
    1281
    -                raise CASError("Failed to download blob {}: {}".format(
    
    1282
    -                    response.digest.hash, response.status.code))
    
    1283
    -            if response.digest.size_bytes != len(response.data):
    
    1284
    -                raise CASError("Failed to download blob {}: expected {} bytes, received {} bytes".format(
    
    1285
    -                    response.digest.hash, response.digest.size_bytes, len(response.data)))
    
    1286
    -
    
    1287
    -            yield (response.digest, response.data)
    
    1288
    -
    
    1289
    -
    
    1290
    -# Represents a batch of blobs queued for upload.
    
    1291
    -#
    
    1292
    -class _CASBatchUpdate():
    
    1293
    -    def __init__(self, remote):
    
    1294
    -        self._remote = remote
    
    1295
    -        self._max_total_size_bytes = remote.max_batch_total_size_bytes
    
    1296
    -        self._request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=remote.spec.instance_name)
    
    1297
    -        self._size = 0
    
    1298
    -        self._sent = False
    
    1299
    -
    
    1300
    -    def add(self, digest, stream):
    
    1301
    -        assert not self._sent
    
    1302
    -
    
    1303
    -        new_batch_size = self._size + digest.size_bytes
    
    1304
    -        if new_batch_size > self._max_total_size_bytes:
    
    1305
    -            # Not enough space left in current batch
    
    1306
    -            return False
    
    1307
    -
    
    1308
    -        blob_request = self._request.requests.add()
    
    1309
    -        blob_request.digest.hash = digest.hash
    
    1310
    -        blob_request.digest.size_bytes = digest.size_bytes
    
    1311
    -        blob_request.data = stream.read(digest.size_bytes)
    
    1312
    -        self._size = new_batch_size
    
    1313
    -        return True
    
    1314
    -
    
    1315
    -    def send(self):
    
    1316
    -        assert not self._sent
    
    1317
    -        self._sent = True
    
    1318
    -
    
    1319
    -        if not self._request.requests:
    
    1320
    -            return
    
    1321
    -
    
    1322
    -        batch_response = self._remote.cas.BatchUpdateBlobs(self._request)
    
    1323
    -
    
    1324
    -        for response in batch_response.responses:
    
    1325
    -            if response.status.code != code_pb2.OK:
    
    1326
    -                raise CASError("Failed to upload blob {}: {}".format(
    
    1327
    -                    response.digest.hash, response.status.code))
    
    1328
    -
    
    1329
    -
    
    1330
    -def _grouper(iterable, n):
    
    1331
    -    while True:
    
    1332
    -        try:
    
    1333
    -            current = next(iterable)
    
    1334
    -        except StopIteration:
    
    1335
    -            return
    
    1336
    -        yield itertools.chain([current], itertools.islice(iterable, n - 1))

  • buildstream/_cas/casremote.py
    1
    +from collections import namedtuple
    
    2
    +import io
    
    3
    +import itertools
    
    4
    +import os
    
    5
    +import multiprocessing
    
    6
    +import signal
    
    7
    +import tempfile
    
    8
    +from urllib.parse import urlparse
    
    9
    +import uuid
    
    10
    +
    
    11
    +import grpc
    
    12
    +
    
    13
    +from .. import _yaml
    
    14
    +from .._protos.google.rpc import code_pb2
    
    15
    +from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
    
    16
    +from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    
    17
    +from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc
    
    18
    +
    
    19
    +from .._exceptions import CASError, LoadError, LoadErrorReason
    
    20
    +from .. import _signals
    
    21
    +from .. import utils
    
    22
    +
    
    23
    +# The default limit for gRPC messages is 4 MiB.
    
    24
    +# Limit payload to 1 MiB to leave sufficient headroom for metadata.
    
    25
    +_MAX_PAYLOAD_BYTES = 1024 * 1024
    
    26
    +
    
    27
    +
    
    28
    +class CASRemoteSpec(namedtuple('CASRemoteSpec', 'url push server_cert client_key client_cert instance_name')):
    
    29
    +
    
    30
    +    # _new_from_config_node
    
    31
    +    #
    
    32
    +    # Creates a CASRemoteSpec() from a YAML-loaded node
    
    33
    +    #
    
    34
    +    @staticmethod
    
    35
    +    def _new_from_config_node(spec_node, basedir=None):
    
    36
    +        _yaml.node_validate(spec_node, ['url', 'push', 'server-cert', 'client-key', 'client-cert', 'instance_name'])
    
    37
    +        url = _yaml.node_get(spec_node, str, 'url')
    
    38
    +        push = _yaml.node_get(spec_node, bool, 'push', default_value=False)
    
    39
    +        if not url:
    
    40
    +            provenance = _yaml.node_get_provenance(spec_node, 'url')
    
    41
    +            raise LoadError(LoadErrorReason.INVALID_DATA,
    
    42
    +                            "{}: empty artifact cache URL".format(provenance))
    
    43
    +
    
    44
    +        instance_name = _yaml.node_get(spec_node, str, 'instance_name', default_value=None)
    
    45
    +
    
    46
    +        server_cert = _yaml.node_get(spec_node, str, 'server-cert', default_value=None)
    
    47
    +        if server_cert and basedir:
    
    48
    +            server_cert = os.path.join(basedir, server_cert)
    
    49
    +
    
    50
    +        client_key = _yaml.node_get(spec_node, str, 'client-key', default_value=None)
    
    51
    +        if client_key and basedir:
    
    52
    +            client_key = os.path.join(basedir, client_key)
    
    53
    +
    
    54
    +        client_cert = _yaml.node_get(spec_node, str, 'client-cert', default_value=None)
    
    55
    +        if client_cert and basedir:
    
    56
    +            client_cert = os.path.join(basedir, client_cert)
    
    57
    +
    
    58
    +        if client_key and not client_cert:
    
    59
    +            provenance = _yaml.node_get_provenance(spec_node, 'client-key')
    
    60
    +            raise LoadError(LoadErrorReason.INVALID_DATA,
    
    61
    +                            "{}: 'client-key' was specified without 'client-cert'".format(provenance))
    
    62
    +
    
    63
    +        if client_cert and not client_key:
    
    64
    +            provenance = _yaml.node_get_provenance(spec_node, 'client-cert')
    
    65
    +            raise LoadError(LoadErrorReason.INVALID_DATA,
    
    66
    +                            "{}: 'client-cert' was specified without 'client-key'".format(provenance))
    
    67
    +
    
    68
    +        return CASRemoteSpec(url, push, server_cert, client_key, client_cert, instance_name)
    
    69
    +
    
    70
    +
    
    71
    +CASRemoteSpec.__new__.__defaults__ = (None, None, None, None)
    
    72
    +
    
    73
    +
    
    74
    +class BlobNotFound(CASError):
    
    75
    +
    
    76
    +    def __init__(self, blob, msg):
    
    77
    +        self.blob = blob
    
    78
    +        super().__init__(msg)
    
    79
    +
    
    80
    +
    
    81
    +# Represents a single remote CAS cache.
    
    82
    +#
    
    83
    +class CASRemote():
    
    84
    +    def __init__(self, spec, tmpdir):
    
    85
    +        self.spec = spec
    
    86
    +        self._initialized = False
    
    87
    +        self.channel = None
    
    88
    +        self.bytestream = None
    
    89
    +        self.cas = None
    
    90
    +        self.ref_storage = None
    
    91
    +        self.batch_update_supported = None
    
    92
    +        self.batch_read_supported = None
    
    93
    +        self.capabilities = None
    
    94
    +        self.max_batch_total_size_bytes = None
    
    95
    +
    
    96
    +        # Need str because Python 3.5 and lower don't deal with path-like
    
    97
    +        # objects here.
    
    98
    +        self.tmpdir = str(tmpdir)
    
    99
    +        os.makedirs(self.tmpdir, exist_ok=True)
    
    100
    +
    
    101
    +        self.__tmp_downloads = []  # files in the tmpdir waiting to be added to local caches
    
    102
    +
    
    103
    +        self.__batch_read = None
    
    104
    +        self.__batch_update = None
    
    105
    +
    
    106
    +    def init(self):
    
    107
    +        if not self._initialized:
    
    108
    +            url = urlparse(self.spec.url)
    
    109
    +            if url.scheme == 'http':
    
    110
    +                port = url.port or 80
    
    111
    +                self.channel = grpc.insecure_channel('{}:{}'.format(url.hostname, port))
    
    112
    +            elif url.scheme == 'https':
    
    113
    +                port = url.port or 443
    
    114
    +
    
    115
    +                if self.spec.server_cert:
    
    116
    +                    with open(self.spec.server_cert, 'rb') as f:
    
    117
    +                        server_cert_bytes = f.read()
    
    118
    +                else:
    
    119
    +                    server_cert_bytes = None
    
    120
    +
    
    121
    +                if self.spec.client_key:
    
    122
    +                    with open(self.spec.client_key, 'rb') as f:
    
    123
    +                        client_key_bytes = f.read()
    
    124
    +                else:
    
    125
    +                    client_key_bytes = None
    
    126
    +
    
    127
    +                if self.spec.client_cert:
    
    128
    +                    with open(self.spec.client_cert, 'rb') as f:
    
    129
    +                        client_cert_bytes = f.read()
    
    130
    +                else:
    
    131
    +                    client_cert_bytes = None
    
    132
    +
    
    133
    +                credentials = grpc.ssl_channel_credentials(root_certificates=server_cert_bytes,
    
    134
    +                                                           private_key=client_key_bytes,
    
    135
    +                                                           certificate_chain=client_cert_bytes)
    
    136
    +                self.channel = grpc.secure_channel('{}:{}'.format(url.hostname, port), credentials)
    
    137
    +            else:
    
    138
    +                raise CASError("Unsupported URL: {}".format(self.spec.url))
    
    139
    +
    
    140
    +            self.bytestream = bytestream_pb2_grpc.ByteStreamStub(self.channel)
    
    141
    +            self.cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel)
    
    142
    +            self.capabilities = remote_execution_pb2_grpc.CapabilitiesStub(self.channel)
    
    143
    +            self.ref_storage = buildstream_pb2_grpc.ReferenceStorageStub(self.channel)
    
    144
    +
    
    145
    +            self.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES
    
    146
    +            try:
    
    147
    +                request = remote_execution_pb2.GetCapabilitiesRequest()
    
    148
    +                response = self.capabilities.GetCapabilities(request)
    
    149
    +                server_max_batch_total_size_bytes = response.cache_capabilities.max_batch_total_size_bytes
    
    150
    +                if 0 < server_max_batch_total_size_bytes < self.max_batch_total_size_bytes:
    
    151
    +                    self.max_batch_total_size_bytes = server_max_batch_total_size_bytes
    
    152
    +            except grpc.RpcError as e:
    
    153
    +                # Simply use the defaults for servers that don't implement GetCapabilities()
    
    154
    +                if e.code() != grpc.StatusCode.UNIMPLEMENTED:
    
    155
    +                    raise
    
    156
    +
    
    157
    +            # Check whether the server supports BatchReadBlobs()
    
    158
    +            self.batch_read_supported = False
    
    159
    +            try:
    
    160
    +                request = remote_execution_pb2.BatchReadBlobsRequest()
    
    161
    +                response = self.cas.BatchReadBlobs(request)
    
    162
    +                self.batch_read_supported = True
    
    163
    +                self.__batch_read = _CASBatchRead(self)
    
    164
    +            except grpc.RpcError as e:
    
    165
    +                if e.code() != grpc.StatusCode.UNIMPLEMENTED:
    
    166
    +                    raise
    
    167
    +
    
    168
    +            # Check whether the server supports BatchUpdateBlobs()
    
    169
    +            self.batch_update_supported = False
    
    170
    +            try:
    
    171
    +                request = remote_execution_pb2.BatchUpdateBlobsRequest()
    
    172
    +                response = self.cas.BatchUpdateBlobs(request)
    
    173
    +                self.batch_update_supported = True
    
    174
    +                self.__batch_update = _CASBatchUpdate(self)
    
    175
    +            except grpc.RpcError as e:
    
    176
    +                if (e.code() != grpc.StatusCode.UNIMPLEMENTED and
    
    177
    +                        e.code() != grpc.StatusCode.PERMISSION_DENIED):
    
    178
    +                    raise
    
    179
    +
    
    180
    +            self._initialized = True
    
    181
    +
    
    182
    +    # check_remote
    
    183
    +    #
    
    184
    +    # Used when checking whether remote_specs work in the buildstream main
    
    185
    +    # thread, runs this in a separate process to avoid creation of gRPC threads
    
    186
    +    # in the main BuildStream process
    
    187
    +    # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
    
    188
    +    @classmethod
    
    189
    +    def check_remote(cls, remote_spec, tmpdir, q):
    
    190
    +
    
    191
    +        def __check_remote():
    
    192
    +            try:
    
    193
    +                remote = cls(remote_spec, tmpdir)
    
    194
    +                remote.init()
    
    195
    +
    
    196
    +                request = buildstream_pb2.StatusRequest()
    
    197
    +                response = remote.ref_storage.Status(request)
    
    198
    +
    
    199
    +                if remote_spec.push and not response.allow_updates:
    
    200
    +                    q.put('CAS server does not allow push')
    
    201
    +                else:
    
    202
    +                    # No error
    
    203
    +                    q.put(None)
    
    204
    +
    
    205
    +            except grpc.RpcError as e:
    
    206
    +                # str(e) is too verbose for errors reported to the user
    
    207
    +                q.put(e.details())
    
    208
    +
    
    209
    +            except Exception as e:               # pylint: disable=broad-except
    
    210
    +                # Whatever happens, we need to return it to the calling process
    
    211
    +                #
    
    212
    +                q.put(str(e))
    
    213
    +
    
    214
    +        p = multiprocessing.Process(target=__check_remote)
    
    215
    +
    
    216
    +        try:
    
    217
    +            # Keep SIGINT blocked in the child process
    
    218
    +            with _signals.blocked([signal.SIGINT], ignore=False):
    
    219
    +                p.start()
    
    220
    +
    
    221
    +            error = q.get()
    
    222
    +            p.join()
    
    223
    +        except KeyboardInterrupt:
    
    224
    +            utils._kill_process_tree(p.pid)
    
    225
    +            raise
    
    226
    +
    
    227
    +        return error
    
    228
    +
    
    229
    +    # verify_digest_on_remote():
    
    230
    +    #
    
    231
    +    # Check whether the object is already on the server in which case
    
    232
    +    # there is no need to upload it.
    
    233
    +    #
    
    234
    +    # Args:
    
    235
    +    #     digest (Digest): The object digest.
    
    236
    +    #
    
    237
    +    def verify_digest_on_remote(self, digest):
    
    238
    +        self.init()
    
    239
    +
    
    240
    +        request = remote_execution_pb2.FindMissingBlobsRequest()
    
    241
    +        request.blob_digests.extend([digest])
    
    242
    +
    
    243
    +        response = self.cas.FindMissingBlobs(request)
    
    244
    +        if digest in response.missing_blob_digests:
    
    245
    +            return False
    
    246
    +
    
    247
    +        return True
    
    248
    +
    
    249
    +    # push_message():
    
    250
    +    #
    
    251
    +    # Push the given protobuf message to a remote.
    
    252
    +    #
    
    253
    +    # Args:
    
    254
    +    #     message (Message): A protobuf message to push.
    
    255
    +    #
    
    256
    +    # Raises:
    
    257
    +    #     (CASError): if there was an error
    
    258
    +    #
    
    259
    +    def push_message(self, message):
    
    260
    +
    
    261
    +        message_buffer = message.SerializeToString()
    
    262
    +        message_digest = utils._message_digest(message_buffer)
    
    263
    +
    
    264
    +        self.init()
    
    265
    +
    
    266
    +        with io.BytesIO(message_buffer) as b:
    
    267
    +            self._send_blob(message_digest, b)
    
    268
    +
    
    269
    +        return message_digest
    
    270
    +
    
    271
    +    # get_reference():
    
    272
    +    #
    
    273
    +    # Args:
    
    274
    +    #    ref (str): The ref to request
    
    275
    +    #
    
    276
    +    def get_reference(self, ref):
    
    277
    +        try:
    
    278
    +            self.init()
    
    279
    +
    
    280
    +            request = buildstream_pb2.GetReferenceRequest()
    
    281
    +            request.key = ref
    
    282
    +            return self.ref_storage.GetReference(request).digest
    
    283
    +        except grpc.RpcError as e:
    
    284
    +            if e.code() != grpc.StatusCode.NOT_FOUND:
    
    285
    +                raise CASError("Failed to find ref {}: {}".format(ref, e)) from e
    
    286
    +            else:
    
    287
    +                return None
    
    288
    +
    
    289
    +    # update_reference():
    
    290
    +    #
    
    291
    +    # Args:
    
    292
    +    #    ref (str): Reference to update
    
    293
    +    #    digest (Digest): New digest to update ref with
    
    294
    +    def update_reference(self, ref, digest):
    
    295
    +        request = buildstream_pb2.UpdateReferenceRequest()
    
    296
    +        request.keys.append(ref)
    
    297
    +        request.digest.hash = digest.hash
    
    298
    +        request.digest.size_bytes = digest.size_bytes
    
    299
    +        self.ref_storage.UpdateReference(request)
    
    300
    +
    
    301
    +    def get_tree_blob(self, tree_digest):
    
    302
    +        self.init()
    
    303
    +        f = tempfile.NamedTemporaryFile(dir=self.tmpdir)
    
    304
    +        self._fetch_blob(tree_digest, f)
    
    305
    +
    
    306
    +        tree = remote_execution_pb2.Tree()
    
    307
    +        with open(f.name, 'rb') as tmp:
    
    308
    +            tree.ParseFromString(tmp.read())
    
    309
    +
    
    310
    +        return tree
    
    311
    +
    
    312
    +    # yield_directory_digests():
    
    313
    +    #
    
    314
    +    # Iterate over blob digests starting from a root digest
    
    315
    +    #
    
    316
    +    # Args:
    
    317
    +    #     root_digest (digest): The root_digest to get a tree of
    
    318
    +    #     progress (callable): The progress callback, if any
    
    319
    +    #     subdir (str): The optional specific subdir to pull
    
    320
    +    #     excluded_subdirs (list): The optional list of subdirs to not pull
    
    321
    +    #
    
    322
    +    # Returns:
    
    323
    +    #     (iter digests): recursively iterates over digests contained in root directory
    
    324
    +    #
    
    325
    +    def yield_directory_digests(self, root_digest, *, progress=None,
    
    326
    +                                subdir=None, excluded_subdirs=None):
    
    327
    +        self.init()
    
    328
    +
    
    329
    +        # TODO add subdir and progress stuff?
    
    330
    +        # Fetch artifact, excluded_subdirs determined in pullqueue
    
    331
    +        yield from self._yield_directory_digests(root_digest, excluded_subdirs=excluded_subdirs)
    
    332
    +
    
    333
    +    # yield_tree_digests():
    
    334
    +    #
    
    335
    +    # Fetches a tree file from the given digest and then iterates over its child digests
    
    336
    +    #
    
    337
    +    # Args:
    
    338
    +    #     tree_digest (digest): tree digest
    
    339
    +    #
    
    340
    +    # Returns:
    
    341
    +    #     (iter digests): iterates over digests in tree message
    
    342
    +    def yield_tree_digests(self, tree_digest):
    
    343
    +        self.init()
    
    344
    +
    
    345
    +        # get tree file
    
    346
    +        f = tempfile.NamedTemporaryFile(dir=self.tmpdir)
    
    347
    +        self._fetch_blob(tree_digest, f)
    
    348
    +        tree = remote_execution_pb2.Tree()
    
    349
    +        tree.ParseFromString(f.read())
    
    350
    +
    
    351
    +        tree.children.extend([tree.root])
    
    352
    +        for directory in tree.children:
    
    353
    +            for filenode in directory.files:
    
    354
    +                yield filenode.digest
    
    355
    +
    
    356
    +            # add the directory to downloaded tmp files to be added
    
    357
    +            f2 = tempfile.NamedTemporaryFile(dir=self.tmpdir)
    
    358
    +            f2.write(directory.SerializeToString())
    
    359
    +            self.__tmp_downloads.append(f2)
    
    360
    +
    
    361
    +        # Add the tree directory to downloads right at the end
    
    362
    +        self.__tmp_downloads.append(f)
    
    363
    +
    
    364
    +    # request_blob():
    
    365
    +    #
    
    366
    +    # Request a blob, triggering download via bytestream or CAS
    
    367
    +    # BatchReadBlobs depending on size.
    
    368
    +    #
    
    369
    +    # Args:
    
    370
    +    #    digest (Digest): digest of the requested blob
    
    371
    +    #
    
    372
    +    def request_blob(self, digest):
    
    373
    +        if (not self.batch_read_supported or
    
    374
    +                digest.size_bytes > self.max_batch_total_size_bytes):
    
    375
    +            f = tempfile.NamedTemporaryFile(dir=self.tmpdir)
    
    376
    +            self._fetch_blob(digest, f)
    
    377
    +            self.__tmp_downloads.append(f)
    
    378
    +        elif self.__batch_read.add(digest) is False:
    
    379
    +            self._download_batch()
    
    380
    +            self.__batch_read.add(digest)
    
    381
    +
    
    382
    +    # get_blobs():
    
    383
    +    #
    
    384
    +    # Yield over downloaded blobs in the tmp file locations, causing the files
    
    385
    +    # to be deleted once they go out of scope.
    
    386
    +    #
    
    387
    +    # Args:
    
    388
    +    #    complete_batch (bool): download any outstanding batch read request
    
    389
    +    #
    
    390
    +    # Returns:
    
    391
    +    #    iterator over NamedTemporaryFile
    
    392
    +    def get_blobs(self, complete_batch=False):
    
    393
    +        # Send read batch request and download
    
    394
    +        if (complete_batch is True and
    
    395
    +                self.batch_read_supported is True):
    
    396
    +            self._download_batch()
    
    397
    +
    
    398
    +        while self.__tmp_downloads:
    
    399
    +            yield self.__tmp_downloads.pop()
    
    400
    +
    
    401
    +    # upload_blob():
    
    402
    +    #
    
    403
    +    # Push a single blob file to the remote, batching small blobs when supported
    
    404
    +    #
    
    405
    +    def upload_blob(self, digest, blob_file, u_uid=uuid.uuid4(), final=False):
    
    406
    +        with open(blob_file, 'rb') as f:
    
    407
    +            assert os.fstat(f.fileno()).st_size == digest.size_bytes
    
    408
    +
    
    409
    +            if (digest.size_bytes >= self.max_batch_total_size_bytes or
    
    410
    +                    not self.batch_update_supported):
    
    411
    +                # Too large for batch request, upload in independent request.
    
    412
    +                self._send_blob(digest, f, u_uid=u_uid)
    
    413
    +            else:
    
    414
    +                if self.__batch_update.add(digest, f) is False:
    
    415
    +                    self.__batch_update.send()
    
    416
    +                    self.__batch_update = _CASBatchUpdate(self)
    
    417
    +                    self.__batch_update.add(digest, f)
    
    418
    +
    
    419
    +    def send_update_batch(self):
    
    420
    +        # make sure everything is sent
    
    421
    +        self.__batch_update.send()
    
    422
    +        self.__batch_update = _CASBatchUpdate(self)
    
    423
    +
    
    424
    +    # find_missing_blobs()
    
    425
    +    #
    
    426
    +    # Does FindMissingBlobs request to remote
    
    427
    +    #
    
    428
    +    # Args:
    
    429
    +    #    required_blobs ([Digest]): list of blobs required
    
    430
    +    #    u_uid (str): uuid4
    
    431
    +    #
    
    432
    +    # Returns:
    
    433
    +    #    (dict): missing blobs, keyed by digest hash
    
    434
    +    def find_missing_blobs(self, required_blobs, u_uid=uuid.uuid4()):
    
    435
    +        self.init()
    
    436
    +        missing_blobs = dict()
    
    437
    +        # Limit size of FindMissingBlobs request
    
    438
    +        for required_blobs_group in _grouper(required_blobs, 512):
    
    439
    +            request = remote_execution_pb2.FindMissingBlobsRequest()
    
    440
    +
    
    441
    +            for required_digest in required_blobs_group:
    
    442
    +                d = request.blob_digests.add()
    
    443
    +                d.hash = required_digest.hash
    
    444
    +                d.size_bytes = required_digest.size_bytes
    
    445
    +
    
    446
    +            response = self.cas.FindMissingBlobs(request)
    
    447
    +            for missing_digest in response.missing_blob_digests:
    
    448
    +                d = remote_execution_pb2.Digest()
    
    449
    +                d.hash = missing_digest.hash
    
    450
    +                d.size_bytes = missing_digest.size_bytes
    
    451
    +                missing_blobs[d.hash] = d
    
    452
    +
    
    453
    +        return missing_blobs
    
    454
    +
    
    455
    +    ################################################
    
    456
    +    #             Local Private Methods            #
    
    457
    +    ################################################
    
    458
    +    def _fetch_blob(self, digest, stream):
    
    459
    +        resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)])
    
    460
    +        request = bytestream_pb2.ReadRequest()
    
    461
    +        request.resource_name = resource_name
    
    462
    +        request.read_offset = 0
    
    463
    +        for response in self.bytestream.Read(request):
    
    464
    +            stream.write(response.data)
    
    465
    +        stream.flush()
    
    466
    +
    
    467
    +        assert digest.size_bytes == os.fstat(stream.fileno()).st_size
    
    468
    +
    
    469
    +    # _yield_directory_digests():
    
    470
    +    #
    
    471
    +    # Fetches remote directory and adds it to content addressable store.
    
    472
    +    #
    
    473
    +    # Fetches files, symbolic links and recursively other directories in
    
    474
    +    # the remote directory and adds them to the content addressable
    
    475
    +    # store.
    
    476
    +    #
    
    477
    +    # Args:
    
    478
    +    #     dir_digest (Digest): Digest object for the directory to fetch.
    
    479
    +    #     excluded_subdirs (list): The optional list of subdirs to not fetch
    
    480
    +    #
    
    481
    +    def _yield_directory_digests(self, dir_digest, *, excluded_subdirs=None):
    
    482
    +
    
    483
    +        if excluded_subdirs is None:
    
    484
    +            excluded_subdirs = []
    
    485
    +
    
    486
    +        # get directory blob
    
    487
    +        f = tempfile.NamedTemporaryFile(dir=self.tmpdir)
    
    488
    +        self._fetch_blob(dir_digest, f)
    
    489
    +
    
    490
    +        # need to read in directory structure to iterate over it
    
    491
    +        directory = remote_execution_pb2.Directory()
    
    492
    +        with open(f.name, 'rb') as tmp:
    
    493
    +            directory.ParseFromString(tmp.read())
    
    494
    +
    
    495
    +        yield dir_digest
    
    496
    +        for filenode in directory.files:
    
    497
    +            yield filenode.digest
    
    498
    +        for dirnode in directory.directories:
    
    499
    +            if dirnode.name not in excluded_subdirs:
    
    500
    +                yield from self._yield_directory_digests(dirnode.digest)
    
    501
    +
    
    502
    +    def _send_blob(self, digest, stream, u_uid=uuid.uuid4()):
    
    503
    +        resource_name = '/'.join(['uploads', str(u_uid), 'blobs',
    
    504
    +                                  digest.hash, str(digest.size_bytes)])
    
    505
    +
    
    506
    +        def request_stream(resname, instream):
    
    507
    +            offset = 0
    
    508
    +            finished = False
    
    509
    +            remaining = digest.size_bytes
    
    510
    +            while not finished:
    
    511
    +                chunk_size = min(remaining, _MAX_PAYLOAD_BYTES)
    
    512
    +                remaining -= chunk_size
    
    513
    +
    
    514
    +                request = bytestream_pb2.WriteRequest()
    
    515
    +                request.write_offset = offset
    
    516
    +                # max. _MAX_PAYLOAD_BYTES chunks
    
    517
    +                request.data = instream.read(chunk_size)
    
    518
    +                request.resource_name = resname
    
    519
    +                request.finish_write = remaining <= 0
    
    520
    +
    
    521
    +                yield request
    
    522
    +
    
    523
    +                offset += chunk_size
    
    524
    +                finished = request.finish_write
    
    525
    +
    
    526
    +        response = self.bytestream.Write(request_stream(resource_name, stream))
    
    527
    +
    
    528
    +        assert response.committed_size == digest.size_bytes
    
    529
    +
    
    530
    +    def _download_batch(self):
    
    531
    +        for _, data in self.__batch_read.send():
    
    532
    +            f = tempfile.NamedTemporaryFile(dir=self.tmpdir)
    
    533
    +            f.write(data)
    
    534
    +            f.flush()
    
    535
    +            self.__tmp_downloads.append(f)
    
    536
    +
    
    537
    +        self.__batch_read = _CASBatchRead(self)
    
    538
    +
    
    539
    +
    
    540
    +def _grouper(iterable, n):
    
    541
    +    while True:
    
    542
    +        try:
    
    543
    +            current = next(iterable)
    
    544
    +        except StopIteration:
    
    545
    +            return
    
    546
    +        yield itertools.chain([current], itertools.islice(iterable, n - 1))
    
    547
    +
    
    548
    +
    
    549
    +# Represents a batch of blobs queued for fetching.
    
    550
    +#
    
    551
    +class _CASBatchRead():
    
    552
    +    def __init__(self, remote):
    
    553
    +        self._remote = remote
    
    554
    +        self._max_total_size_bytes = remote.max_batch_total_size_bytes
    
    555
    +        self._request = remote_execution_pb2.BatchReadBlobsRequest()
    
    556
    +        self._size = 0
    
    557
    +        self._sent = False
    
    558
    +
    
    559
    +    def add(self, digest):
    
    560
    +        assert not self._sent
    
    561
    +
    
    562
    +        new_batch_size = self._size + digest.size_bytes
    
    563
    +        if new_batch_size > self._max_total_size_bytes:
    
    564
    +            # Not enough space left in current batch
    
    565
    +            return False
    
    566
    +
    
    567
    +        request_digest = self._request.digests.add()
    
    568
    +        request_digest.hash = digest.hash
    
    569
    +        request_digest.size_bytes = digest.size_bytes
    
    570
    +        self._size = new_batch_size
    
    571
    +        return True
    
    572
    +
    
    573
    +    def send(self):
    
    574
    +        assert not self._sent
    
    575
    +        self._sent = True
    
    576
    +
    
    577
    +        if not self._request.digests:
    
    578
    +            return
    
    579
    +
    
    580
    +        batch_response = self._remote.cas.BatchReadBlobs(self._request)
    
    581
    +
    
    582
    +        for response in batch_response.responses:
    
    583
    +            if response.status.code == code_pb2.NOT_FOUND:
    
    584
    +                raise BlobNotFound(response.digest.hash, "Failed to download blob {}: {}".format(
    
    585
    +                    response.digest.hash, response.status.code))
    
    586
    +            if response.status.code != code_pb2.OK:
    
    587
    +                raise CASError("Failed to download blob {}: {}".format(
    
    588
    +                    response.digest.hash, response.status.code))
    
    589
    +            if response.digest.size_bytes != len(response.data):
    
    590
    +                raise CASError("Failed to download blob {}: expected {} bytes, received {} bytes".format(
    
    591
    +                    response.digest.hash, response.digest.size_bytes, len(response.data)))
    
    592
    +
    
    593
    +            yield (response.digest, response.data)
    
    594
    +
    
    595
    +
    
    596
    +# Represents a batch of blobs queued for upload.
    
    597
    +#
    
    598
    +class _CASBatchUpdate():
    
    599
    +    def __init__(self, remote):
    
    600
    +        self._remote = remote
    
    601
    +        self._max_total_size_bytes = remote.max_batch_total_size_bytes
    
    602
    +        self._request = remote_execution_pb2.BatchUpdateBlobsRequest()
    
    603
    +        self._size = 0
    
    604
    +        self._sent = False
    
    605
    +
    
    606
    +    def add(self, digest, stream):
    
    607
    +        assert not self._sent
    
    608
    +
    
    609
    +        new_batch_size = self._size + digest.size_bytes
    
    610
    +        if new_batch_size > self._max_total_size_bytes:
    
    611
    +            # Not enough space left in current batch
    
    612
    +            return False
    
    613
    +
    
    614
    +        blob_request = self._request.requests.add()
    
    615
    +        blob_request.digest.hash = digest.hash
    
    616
    +        blob_request.digest.size_bytes = digest.size_bytes
    
    617
    +        blob_request.data = stream.read(digest.size_bytes)
    
    618
    +        self._size = new_batch_size
    
    619
    +        return True
    
    620
    +
    
    621
    +    def send(self):
    
    622
    +        assert not self._sent
    
    623
    +        self._sent = True
    
    624
    +
    
    625
    +        if not self._request.requests:
    
    626
    +            return
    
    627
    +
    
    628
    +        batch_response = self._remote.cas.BatchUpdateBlobs(self._request)
    
    629
    +
    
    630
    +        for response in batch_response.responses:
    
    631
    +            if response.status.code != code_pb2.OK:
    
    632
    +                raise CASError("Failed to upload blob {}: {}".format(
    
    633
    +                    response.digest.hash, response.status.code))
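
  The new casremote.py above concentrates all per-remote gRPC traffic in
  CASRemote. A minimal sketch of one way a caller can drive the batched
  download path (queue digests with request_blob(), then drain them with
  get_blobs()); the endpoint, digest values and temporary directory are
  placeholders, not taken from this commit::

      from buildstream._cas.casremote import CASRemote, CASRemoteSpec
      from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2

      spec = CASRemoteSpec('https://cache.example.com', push=False,
                           server_cert=None, client_key=None, client_cert=None,
                           instance_name=None)
      remote = CASRemote(spec, '/tmp/buildstream-casremote')   # real callers pass the Context tmpdir
      remote.init()

      # Digest of a blob assumed to exist on the remote (placeholder values)
      digest = remote_execution_pb2.Digest(hash='0' * 64, size_bytes=123)

      remote.request_blob(digest)               # batched via BatchReadBlobs or streamed, based on size
      for tmpfile in remote.get_blobs(complete_batch=True):
          print('blob staged at', tmpfile.name)  # a real caller imports this into the local CAS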

  • buildstream/_artifactcache/casserver.py → buildstream/_cas/casserver.py

  • buildstream/_context.py
    ... ... @@ -31,7 +31,7 @@ from ._exceptions import LoadError, LoadErrorReason, BstError
    31 31
     from ._message import Message, MessageType
    
    32 32
     from ._profile import Topics, profile_start, profile_end
    
    33 33
     from ._artifactcache import ArtifactCache
    
    34
    -from ._artifactcache.cascache import CASCache
    
    34
    +from ._cas.cascache import CASCache
    
    35 35
     from ._workspaces import Workspaces, WorkspaceProjectCache, WORKSPACE_PROJECT_FILE
    
    36 36
     from .plugin import _plugin_lookup
    
    37 37
     
    
    ... ... @@ -187,10 +187,11 @@ class Context():
    187 187
             _yaml.node_validate(defaults, [
    
    188 188
                 'sourcedir', 'builddir', 'artifactdir', 'logdir',
    
    189 189
                 'scheduler', 'artifacts', 'logging', 'projects',
    
    190
    -            'cache', 'prompt', 'workspacedir',
    
    190
    +            'cache', 'prompt', 'workspacedir', 'tmpdir'
    
    191 191
             ])
    
    192 192
     
    
    193
    -        for directory in ['sourcedir', 'builddir', 'artifactdir', 'logdir', 'workspacedir']:
    
    193
    +        for directory in ['sourcedir', 'builddir', 'artifactdir', 'logdir',
    
    194
    +                          'tmpdir', 'workspacedir']:
    
    194 195
                 # Allow the ~ tilde expansion and any environment variables in
    
    195 196
                 # path specification in the config files.
    
    196 197
                 #
    

  • buildstream/data/userconfig.yaml
    ... ... @@ -19,6 +19,9 @@ builddir: ${XDG_CACHE_HOME}/buildstream/build
    19 19
     # Location to store local binary artifacts
    
    20 20
     artifactdir: ${XDG_CACHE_HOME}/buildstream/artifacts
    
    21 21
     
    
    22
    +# tmp directory, used by casremote
    
    23
    +tmpdir: ${XDG_CACHE_HOME}/buildstream/tmp
    
    24
    +
    
    22 25
     # Location to store build logs
    
    23 26
     logdir: ${XDG_CACHE_HOME}/buildstream/logs
    
    24 27
     
    

  • buildstream/sandbox/_sandboxremote.py
    ... ... @@ -38,7 +38,7 @@ from .._protos.google.rpc import code_pb2
    38 38
     from .._exceptions import SandboxError
    
    39 39
     from .. import _yaml
    
    40 40
     from .._protos.google.longrunning import operations_pb2, operations_pb2_grpc
    
    41
    -from .._artifactcache.cascache import CASRemote, CASRemoteSpec
    
    41
    +from .._cas.casremote import CASRemote, CASRemoteSpec
    
    42 42
     
    
    43 43
     
    
    44 44
     class RemoteExecutionSpec(namedtuple('RemoteExecutionSpec', 'exec_service storage_service action_service')):
    
    ... ... @@ -244,7 +244,7 @@ class SandboxRemote(Sandbox):
    244 244
     
    
    245 245
             context = self._get_context()
    
    246 246
             cascache = context.get_cascache()
    
    247
    -        casremote = CASRemote(self.storage_remote_spec)
    
    247
    +        casremote = CASRemote(self.storage_remote_spec, context.tmpdir)
    
    248 248
     
    
    249 249
             # Now do a pull to ensure we have the necessary parts.
    
    250 250
             dir_digest = cascache.pull_tree(casremote, tree_digest)
    
    ... ... @@ -303,7 +303,7 @@ class SandboxRemote(Sandbox):
    303 303
             action_result = self._check_action_cache(action_digest)
    
    304 304
     
    
    305 305
             if not action_result:
    
    306
    -            casremote = CASRemote(self.storage_remote_spec)
    
    306
    +            casremote = CASRemote(self.storage_remote_spec, self._get_context().tmpdir)
    
    307 307
     
    
    308 308
                 # Now, push that key (without necessarily needing a ref) to the remote.
    
    309 309
                 try:
    
    ... ... @@ -311,17 +311,17 @@ class SandboxRemote(Sandbox):
    311 311
                 except grpc.RpcError as e:
    
    312 312
                     raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e
    
    313 313
     
    
    314
    -            if not cascache.verify_digest_on_remote(casremote, upload_vdir.ref):
    
    314
    +            if not casremote.verify_digest_on_remote(upload_vdir.ref):
    
    315 315
                     raise SandboxError("Failed to verify that source has been pushed to the remote artifact cache.")
    
    316 316
     
    
    317 317
                 # Push command and action
    
    318 318
                 try:
    
    319
    -                cascache.push_message(casremote, command_proto)
    
    319
    +                casremote.push_message(command_proto)
    
    320 320
                 except grpc.RpcError as e:
    
    321 321
                     raise SandboxError("Failed to push command to remote: {}".format(e))
    
    322 322
     
    
    323 323
                 try:
    
    324
    -                cascache.push_message(casremote, action)
    
    324
    +                casremote.push_message(action)
    
    325 325
                 except grpc.RpcError as e:
    
    326 326
                     raise SandboxError("Failed to push action to remote: {}".format(e))
    
    327 327
     
    

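  Taken together, the sandbox changes above replace the old pattern of routing
  every remote operation through CASCache with direct calls on a CASRemote
  built from the context's new tmpdir. A hedged sketch of the resulting call
  shape, names as in the diff, error handling elided::

      casremote = CASRemote(self.storage_remote_spec, context.tmpdir)
      if not casremote.verify_digest_on_remote(upload_vdir.ref):
          raise SandboxError("Failed to verify that source has been pushed to the remote artifact cache.")
      casremote.push_message(command_proto)   # previously cascache.push_message(casremote, command_proto)
      casremote.push_message(action)
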
  • doc/source/using_configuring_artifact_server.rst
    ... ... @@ -94,7 +94,7 @@ requiring BuildStream's more exigent dependencies by setting the
    94 94
     Command reference
    
    95 95
     ~~~~~~~~~~~~~~~~~
    
    96 96
     
    
    97
    -.. click:: buildstream._artifactcache.casserver:server_main
    
    97
    +.. click:: buildstream._cas.casserver:server_main
    
    98 98
        :prog: bst-artifact-server
    
    99 99
     
    
    100 100
     
    

  • requirements/Makefile
    1
    +# Makefile for updating BuildStream's requirements files.
    
    2
    +#
    
    3
    +
    
    4
    +REQUIREMENTS_IN := $(wildcard *.in)
    
    5
    +REQUIREMENTS_TXT := $(REQUIREMENTS_IN:.in=.txt)
    
    6
    +PYTHON := python3
    
    7
    +VENV := $(PYTHON) -m venv
    
    8
    +
    
    9
    +VENV_PIP = $(VENVDIR)/bin/pip
    
    10
    +
    
    11
    +
    
    12
    +.PHONY: all
    
    13
    +
    
    14
    +all: $(REQUIREMENTS_TXT)
    
    15
    +
    
    16
    +%.txt: %.in
    
    17
    +	$(eval VENVDIR := $(shell mktemp -d $(CURDIR)/.bst-venv.XXXXXX))
    
    18
    +	$(VENV) $(VENVDIR)
    
    19
    +	$(VENV_PIP) install -r $^
    
    20
    +	$(VENV_PIP) freeze -r $^ > $@
    
    21
    +	rm -rf $(VENVDIR)
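
  The rule above pins dependencies by installing each .in file into a
  throwaway virtualenv and freezing the result into the matching .txt file.
  A rough Python equivalent of that flow, for illustration only (the helper
  name and file paths are not part of this commit)::

      import subprocess
      import tempfile
      import venv
      from pathlib import Path

      def pin_requirements(infile, outfile):
          # Install the loose requirements into a scratch venv, then freeze
          # the fully resolved (transitive) set into the pinned .txt file.
          with tempfile.TemporaryDirectory(prefix='.bst-venv.') as venvdir:
              venv.create(venvdir, with_pip=True)
              pip = str(Path(venvdir) / 'bin' / 'pip')
              subprocess.check_call([pip, 'install', '-r', infile])
              frozen = subprocess.check_output([pip, 'freeze', '-r', infile])
              Path(outfile).write_bytes(frozen)

      pin_requirements('requirements.in', 'requirements.txt')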

  • tools/dev-requirements.in → requirements/dev-requirements.in

  • tools/dev-requirements.txt → requirements/dev-requirements.txt

  • tools/plugin-requirements.in → requirements/plugin-requirements.in

  • tools/plugin-requirements.txt → requirements/plugin-requirements.txt

  • tools/requirements.in → requirements/requirements.in

  • tools/requirements.txt → requirements/requirements.txt

  • setup.py
    ... ... @@ -270,10 +270,10 @@ def get_cmdclass():
    270 270
     #####################################################
    
    271 271
     #               Gather requirements                 #
    
    272 272
     #####################################################
    
    273
    -with open('tools/dev-requirements.in') as dev_reqs:
    
    273
    +with open('requirements/dev-requirements.in') as dev_reqs:
    
    274 274
         dev_requires = dev_reqs.read().splitlines()
    
    275 275
     
    
    276
    -with open('tools/requirements.in') as install_reqs:
    
    276
    +with open('requirements/requirements.in') as install_reqs:
    
    277 277
         install_requires = install_reqs.read().splitlines()
    
    278 278
     
    
    279 279
     #####################################################
    

  • tests/artifactcache/config.py
    ... ... @@ -3,8 +3,7 @@ import pytest
    3 3
     import itertools
    
    4 4
     import os
    
    5 5
     
    
    6
    -from buildstream._artifactcache import ArtifactCacheSpec
    
    7
    -from buildstream._artifactcache.artifactcache import _configured_remote_artifact_cache_specs
    
    6
    +from buildstream._artifactcache import ArtifactCacheSpec, _configured_remote_artifact_cache_specs
    
    8 7
     from buildstream._context import Context
    
    9 8
     from buildstream._project import Project
    
    10 9
     from buildstream.utils import _deduplicate
    

  • tests/artifactcache/expiry.py
    ... ... @@ -342,13 +342,13 @@ def test_invalid_cache_quota(cli, datafiles, tmpdir, quota, success):
    342 342
             total_space = 10000
    
    343 343
     
    
    344 344
         volume_space_patch = mock.patch(
    
    345
    -        "buildstream._artifactcache.artifactcache.ArtifactCache._get_volume_space_info_for",
    
    345
    +        "buildstream._artifactcache.ArtifactCache._get_volume_space_info_for",
    
    346 346
             autospec=True,
    
    347 347
             return_value=(free_space, total_space),
    
    348 348
         )
    
    349 349
     
    
    350 350
         cache_size_patch = mock.patch(
    
    351
    -        "buildstream._artifactcache.artifactcache.ArtifactCache.get_cache_size",
    
    351
    +        "buildstream._artifactcache.ArtifactCache.get_cache_size",
    
    352 352
             autospec=True,
    
    353 353
             return_value=0,
    
    354 354
         )
    

  • tests/artifactcache/pull.py
    ... ... @@ -110,7 +110,7 @@ def test_pull(cli, tmpdir, datafiles):
    110 110
             # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
    
    111 111
             process = multiprocessing.Process(target=_queue_wrapper,
    
    112 112
                                               args=(_test_pull, queue, user_config_file, project_dir,
    
    113
    -                                                artifact_dir, 'target.bst', element_key))
    
    113
    +                                                artifact_dir, tmpdir, 'target.bst', element_key))
    
    114 114
     
    
    115 115
             try:
    
    116 116
                 # Keep SIGINT blocked in the child process
    
    ... ... @@ -127,13 +127,14 @@ def test_pull(cli, tmpdir, datafiles):
    127 127
             assert cas.contains(element, element_key)
    
    128 128
     
    
    129 129
     
    
    130
    -def _test_pull(user_config_file, project_dir, artifact_dir,
    
    130
    +def _test_pull(user_config_file, project_dir, artifact_dir, tmpdir,
    
    131 131
                    element_name, element_key, queue):
    
    132 132
         # Fake minimal context
    
    133 133
         context = Context()
    
    134 134
         context.load(config=user_config_file)
    
    135 135
         context.artifactdir = artifact_dir
    
    136 136
         context.set_message_handler(message_handler)
    
    137
    +    context.tmpdir = tmpdir
    
    137 138
     
    
    138 139
         # Load the project manually
    
    139 140
         project = Project(project_dir, context)
    
    ... ... @@ -218,7 +219,7 @@ def test_pull_tree(cli, tmpdir, datafiles):
    218 219
             # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
    
    219 220
             process = multiprocessing.Process(target=_queue_wrapper,
    
    220 221
                                               args=(_test_push_tree, queue, user_config_file, project_dir,
    
    221
    -                                                artifact_dir, artifact_digest))
    
    222
    +                                                artifact_dir, tmpdir, artifact_digest))
    
    222 223
     
    
    223 224
             try:
    
    224 225
                 # Keep SIGINT blocked in the child process
    
    ... ... @@ -246,7 +247,7 @@ def test_pull_tree(cli, tmpdir, datafiles):
    246 247
             # Use subprocess to avoid creation of gRPC threads in main BuildStream process
    
    247 248
             process = multiprocessing.Process(target=_queue_wrapper,
    
    248 249
                                               args=(_test_pull_tree, queue, user_config_file, project_dir,
    
    249
    -                                                artifact_dir, tree_digest))
    
    250
    +                                                artifact_dir, tmpdir, tree_digest))
    
    250 251
     
    
    251 252
             try:
    
    252 253
                 # Keep SIGINT blocked in the child process
    
    ... ... @@ -268,12 +269,14 @@ def test_pull_tree(cli, tmpdir, datafiles):
    268 269
             assert os.path.exists(cas.objpath(directory_digest))
    
    269 270
     
    
    270 271
     
    
    271
    -def _test_push_tree(user_config_file, project_dir, artifact_dir, artifact_digest, queue):
    
    272
    +def _test_push_tree(user_config_file, project_dir, artifact_dir, tmpdir,
    
    273
    +                    artifact_digest, queue):
    
    272 274
         # Fake minimal context
    
    273 275
         context = Context()
    
    274 276
         context.load(config=user_config_file)
    
    275 277
         context.artifactdir = artifact_dir
    
    276 278
         context.set_message_handler(message_handler)
    
    279
    +    context.tmpdir = tmpdir
    
    277 280
     
    
    278 281
         # Load the project manually
    
    279 282
         project = Project(project_dir, context)
    
    ... ... @@ -304,12 +307,14 @@ def _test_push_tree(user_config_file, project_dir, artifact_dir, artifact_digest
    304 307
             queue.put("No remote configured")
    
    305 308
     
    
    306 309
     
    
    307
    -def _test_pull_tree(user_config_file, project_dir, artifact_dir, artifact_digest, queue):
    
    310
    +def _test_pull_tree(user_config_file, project_dir, artifact_dir, tmpdir,
    
    311
    +                    artifact_digest, queue):
    
    308 312
         # Fake minimal context
    
    309 313
         context = Context()
    
    310 314
         context.load(config=user_config_file)
    
    311 315
         context.artifactdir = artifact_dir
    
    312 316
         context.set_message_handler(message_handler)
    
    317
    +    context.tmpdir = tmpdir
    
    313 318
     
    
    314 319
         # Load the project manually
    
    315 320
         project = Project(project_dir, context)
    

  • tests/sandboxes/storage-tests.py
    ... ... @@ -3,7 +3,7 @@ import pytest
    3 3
     
    
    4 4
     from buildstream._exceptions import ErrorDomain
    
    5 5
     
    
    6
    -from buildstream._artifactcache.cascache import CASCache
    
    6
    +from buildstream._cas.cascache import CASCache
    
    7 7
     from buildstream.storage._casbaseddirectory import CasBasedDirectory
    
    8 8
     from buildstream.storage._filebaseddirectory import FileBasedDirectory
    
    9 9
     
    

  • tests/storage/virtual_directory_import.py
    ... ... @@ -8,7 +8,7 @@ from tests.testutils import cli
    8 8
     from buildstream.storage._casbaseddirectory import CasBasedDirectory
    
    9 9
     from buildstream.storage._filebaseddirectory import FileBasedDirectory
    
    10 10
     from buildstream._artifactcache import ArtifactCache
    
    11
    -from buildstream._artifactcache.cascache import CASCache
    
    11
    +from buildstream._cas.cascache import CASCache
    
    12 12
     from buildstream import utils
    
    13 13
     
    
    14 14
     
    

  • tests/testutils/artifactshare.py
    ... ... @@ -11,8 +11,8 @@ from multiprocessing import Process, Queue
    11 11
     import pytest_cov
    
    12 12
     
    
    13 13
     from buildstream import _yaml
    
    14
    -from buildstream._artifactcache.cascache import CASCache
    
    15
    -from buildstream._artifactcache.casserver import create_server
    
    14
    +from buildstream._cas.cascache import CASCache
    
    15
    +from buildstream._cas.casserver import create_server
    
    16 16
     from buildstream._exceptions import CASError
    
    17 17
     from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    18 18
     
    

  • tests/testutils/runcli.py
    ... ... @@ -509,7 +509,8 @@ def cli_integration(tmpdir, integration_cache):
    509 509
         # to avoid downloading the huge base-sdk repeatedly
    
    510 510
         fixture.configure({
    
    511 511
             'sourcedir': os.path.join(integration_cache, 'sources'),
    
    512
    -        'artifactdir': os.path.join(integration_cache, 'artifacts')
    
    512
    +        'artifactdir': os.path.join(integration_cache, 'artifacts'),
    
    513
    +        'tmpdir': os.path.join(integration_cache, 'tmp')
    
    513 514
         })
    
    514 515
     
    
    515 516
         return fixture
    
    ... ... @@ -556,6 +557,8 @@ def configured(directory, config=None):
    556 557
             config['builddir'] = os.path.join(directory, 'build')
    
    557 558
         if not config.get('artifactdir', False):
    
    558 559
             config['artifactdir'] = os.path.join(directory, 'artifacts')
    
    560
    +    if not config.get('tmpdir', False):
    
    561
    +        config['tmpdir'] = os.path.join(directory, 'tmp')
    
    559 562
         if not config.get('logdir', False):
    
    560 563
             config['logdir'] = os.path.join(directory, 'logs')
    
    561 564
     
    

  • tests/utils/misc.py
    ... ... @@ -23,7 +23,7 @@ def test_parse_size_over_1024T(cli, tmpdir):
    23 23
         _yaml.dump({'name': 'main'}, str(project.join("project.conf")))
    
    24 24
     
    
    25 25
         volume_space_patch = mock.patch(
    
    26
    -        "buildstream._artifactcache.artifactcache.ArtifactCache._get_volume_space_info_for",
    
    26
    +        "buildstream._artifactcache.ArtifactCache._get_volume_space_info_for",
    
    27 27
             autospec=True,
    
    28 28
             return_value=(1025 * TiB, 1025 * TiB)
    
    29 29
         )
    

  • tox.ini
    ... ... @@ -5,9 +5,9 @@ skip_missing_interpreters = true
    5 5
     [testenv]
    
    6 6
     commands = pytest {posargs}
    
    7 7
     deps =
    
    8
    -    -rtools/requirements.txt
    
    9
    -    -rtools/dev-requirements.txt
    
    10
    -    -rtools/plugin-requirements.txt
    
    8
    +    -rrequirements/requirements.txt
    
    9
    +    -rrequirements/dev-requirements.txt
    
    10
    +    -rrequirements/plugin-requirements.txt
    
    11 11
     passenv =
    
    12 12
         BST_FORCE_BACKEND
    
    13 13
         GI_TYPELIB_PATH
    
    ... ... @@ -18,9 +18,9 @@ commands =
    18 18
         pycodestyle
    
    19 19
         pylint buildstream
    
    20 20
     deps =
    
    21
    -    -rtools/requirements.txt
    
    22
    -    -rtools/dev-requirements.txt
    
    23
    -    -rtools/plugin-requirements.txt
    
    21
    +    -rrequirements/requirements.txt
    
    22
    +    -rrequirements/dev-requirements.txt
    
    23
    +    -rrequirements/plugin-requirements.txt
    
    24 24
     
    
    25 25
     [testenv:docs]
    
    26 26
     commands =
    
    ... ... @@ -30,8 +30,8 @@ deps =
    30 30
         sphinx==1.7.9
    
    31 31
         sphinx-click
    
    32 32
         sphinx_rtd_theme
    
    33
    -    -rtools/requirements.txt
    
    34
    -    -rtools/plugin-requirements.txt
    
    33
    +    -rrequirements/requirements.txt
    
    34
    +    -rrequirements/plugin-requirements.txt
    
    35 35
     passenv =
    
    36 36
         BST_FORCE_SESSION_REBUILD
    
    37 37
         BST_SOURCE_CACHE
    


