[Notes] [Git][BuildStream/buildstream][jmac/remote_execution_client] 14 commits: Add "remote-execution" project configuration option.



Title: GitLab

Martin Blanchard pushed to branch jmac/remote_execution_client at BuildStream / buildstream

Commits:

21 changed files:

Changes:

  • buildstream/_project.py
    ... ... @@ -128,6 +128,7 @@ class Project():
    128 128
             self._shell_host_files = []   # A list of HostMount objects
    
    129 129
     
    
    130 130
             self.artifact_cache_specs = None
    
    131
    +        self.remote_execution_url = None
    
    131 132
             self._sandbox = None
    
    132 133
             self._splits = None
    
    133 134
     
    
    ... ... @@ -471,7 +472,7 @@ class Project():
    471 472
                 'aliases', 'name',
    
    472 473
                 'artifacts', 'options',
    
    473 474
                 'fail-on-overlap', 'shell', 'fatal-warnings',
    
    474
    -            'ref-storage', 'sandbox', 'mirrors'
    
    475
    +            'ref-storage', 'sandbox', 'mirrors', 'remote-execution'
    
    475 476
             ])
    
    476 477
     
    
    477 478
             #
    
    ... ... @@ -482,6 +483,11 @@ class Project():
    482 483
             # Load artifacts pull/push configuration for this project
    
    483 484
             self.artifact_cache_specs = ArtifactCache.specs_from_config_node(config, self.directory)
    
    484 485
     
    
    486
    +        # Load remote-execution configuration for this project
    
    487
    +        remote_execution = _yaml.node_get(config, Mapping, 'remote-execution')
    
    488
    +        _yaml.node_validate(remote_execution, ['url'])
    
    489
    +        self.remote_execution_url = _yaml.node_get(remote_execution, str, 'url')
    
    490
    +
    
    485 491
             # Load sandbox environment variables
    
    486 492
             self.base_environment = _yaml.node_get(config, Mapping, 'environment')
    
    487 493
             self.base_env_nocache = _yaml.node_get(config, list, 'environment-nocache')
    

  • buildstream/data/projectconfig.yaml
    ... ... @@ -204,3 +204,6 @@ shell:
    204 204
       # Command to run when `bst shell` does not provide a command
    
    205 205
       #
    
    206 206
       command: [ 'sh', '-i' ]
    
    207
    +
    
    208
    +remote-execution:
    
    209
    +  url: ""
    \ No newline at end of file

  • buildstream/element.py
    ... ... @@ -95,6 +95,7 @@ from . import _site
    95 95
     from ._platform import Platform
    
    96 96
     from .plugin import CoreWarnings
    
    97 97
     from .sandbox._config import SandboxConfig
    
    98
    +from .sandbox._sandboxremote import SandboxRemote
    
    98 99
     
    
    99 100
     from .storage.directory import Directory
    
    100 101
     from .storage._filebaseddirectory import FileBasedDirectory
    
    ... ... @@ -250,6 +251,12 @@ class Element(Plugin):
    250 251
             # Extract Sandbox config
    
    251 252
             self.__sandbox_config = self.__extract_sandbox_config(meta)
    
    252 253
     
    
    254
    +        # Extract remote execution URL
    
    255
    +        if not self.__is_junction:
    
    256
    +            self.__remote_execution_url = project.remote_execution_url
    
    257
    +        else:
    
    258
    +            self.__remote_execution_url = None
    
    259
    +
    
    253 260
         def __lt__(self, other):
    
    254 261
             return self.name < other.name
    
    255 262
     
    
    ... ... @@ -1570,6 +1577,8 @@ class Element(Plugin):
    1570 1577
                     finally:
    
    1571 1578
                         if collect is not None:
    
    1572 1579
                             try:
    
    1580
    +                            # Sandbox will probably have replaced its virtual directory, so get it again
    
    1581
    +                            sandbox_vroot = sandbox.get_virtual_directory()
    
    1573 1582
                                 collectvdir = sandbox_vroot.descend(collect.lstrip(os.sep).split(os.sep))
    
    1574 1583
                             except VirtualDirectoryError:
    
    1575 1584
                                 # No collect directory existed
    
    ... ... @@ -2146,7 +2155,32 @@ class Element(Plugin):
    2146 2155
             project = self._get_project()
    
    2147 2156
             platform = Platform.get_platform()
    
    2148 2157
     
    
    2149
    -        if directory is not None and os.path.exists(directory):
    
    2158
    +        if self.__remote_execution_url and self.BST_VIRTUAL_DIRECTORY:
    
    2159
    +            if not self.__artifacts.has_push_remotes(element=self):
    
    2160
    +                # Give an early warning if remote execution will not work
    
    2161
    +                raise ElementError("Artifact {} is configured to use remote execution but has no push remotes. "
    
    2162
    +                                   .format(self.name) +
    
    2163
    +                                   "The remote artifact server(s) may not be correctly configured or contactable.")
    
    2164
    +
    
    2165
    +            self.info("Using a remote sandbox for artifact {}".format(self.name))
    
    2166
    +
    
    2167
    +            sandbox = SandboxRemote(context, project,
    
    2168
    +                                    directory,
    
    2169
    +                                    stdout=stdout,
    
    2170
    +                                    stderr=stderr,
    
    2171
    +                                    config=config,
    
    2172
    +                                    server_url=self.__remote_execution_url,
    
    2173
    +                                    allow_real_directory=False)
    
    2174
    +            yield sandbox
    
    2175
    +
    
    2176
    +        elif directory is not None and os.path.exists(directory):
    
    2177
    +            if self.__remote_execution_url:
    
    2178
    +                self.warn("Artifact {} is configured to use remote execution but element plugin does not support it."
    
    2179
    +                          .format(self.name), detail="Element plugin '{kind}' does not support virtual directories."
    
    2180
    +                          .format(kind=self.get_kind()), warning_token="remote-failure")
    
    2181
    +
    
    2182
    +                self.info("Falling back to local sandbox for artifact {}".format(self.name))
    
    2183
    +
    
    2150 2184
                 sandbox = platform.create_sandbox(context, project,
    
    2151 2185
                                                   directory,
    
    2152 2186
                                                   stdout=stdout,
    

  • buildstream/plugins/elements/autotools.py
    ... ... @@ -57,7 +57,8 @@ from buildstream import BuildElement
    57 57
     
    
    58 58
     # Element implementation for the 'autotools' kind.
    
    59 59
     class AutotoolsElement(BuildElement):
    
    60
    -    pass
    
    60
    +    # Supports virtual directories (required for remote execution)
    
    61
    +    BST_VIRTUAL_DIRECTORY = True
    
    61 62
     
    
    62 63
     
    
    63 64
     # Plugin entry point
    

  • buildstream/plugins/elements/cmake.py
    ... ... @@ -56,7 +56,8 @@ from buildstream import BuildElement
    56 56
     
    
    57 57
     # Element implementation for the 'cmake' kind.
    
    58 58
     class CMakeElement(BuildElement):
    
    59
    -    pass
    
    59
    +    # Supports virtual directories (required for remote execution)
    
    60
    +    BST_VIRTUAL_DIRECTORY = True
    
    60 61
     
    
    61 62
     
    
    62 63
     # Plugin entry point
    

  • buildstream/plugins/elements/make.py
    ... ... @@ -38,7 +38,8 @@ from buildstream import BuildElement
    38 38
     
    
    39 39
     # Element implementation for the 'make' kind.
    
    40 40
     class MakeElement(BuildElement):
    
    41
    -    pass
    
    41
    +    # Supports virtual directories (required for remote execution)
    
    42
    +    BST_VIRTUAL_DIRECTORY = True
    
    42 43
     
    
    43 44
     
    
    44 45
     # Plugin entry point
    

  • buildstream/plugins/elements/meson.py
    ... ... @@ -53,7 +53,8 @@ from buildstream import BuildElement
    53 53
     
    
    54 54
     # Element implementation for the 'meson' kind.
    
    55 55
     class MesonElement(BuildElement):
    
    56
    -    pass
    
    56
    +    # Supports virtual directories (required for remote execution)
    
    57
    +    BST_VIRTUAL_DIRECTORY = True
    
    57 58
     
    
    58 59
     
    
    59 60
     # Plugin entry point
    

  • buildstream/plugins/elements/qmake.py
    ... ... @@ -33,7 +33,8 @@ from buildstream import BuildElement
    33 33
     
    
    34 34
     # Element implementation for the 'qmake' kind.
    
    35 35
     class QMakeElement(BuildElement):
    
    36
    -    pass
    
    36
    +    # Supports virtual directories (required for remote execution)
    
    37
    +    BST_VIRTUAL_DIRECTORY = True
    
    37 38
     
    
    38 39
     
    
    39 40
     # Plugin entry point
    

  • buildstream/sandbox/__init__.py
    ... ... @@ -20,3 +20,4 @@
    20 20
     from .sandbox import Sandbox, SandboxFlags
    
    21 21
     from ._sandboxchroot import SandboxChroot
    
    22 22
     from ._sandboxbwrap import SandboxBwrap
    
    23
    +from ._sandboxremote import SandboxRemote

  • buildstream/sandbox/_sandboxremote.py
    1
    +#!/usr/bin/env python3
    
    2
    +#
    
    3
    +#  Copyright (C) 2018 Bloomberg LP
    
    4
    +#
    
    5
    +#  This program is free software; you can redistribute it and/or
    
    6
    +#  modify it under the terms of the GNU Lesser General Public
    
    7
    +#  License as published by the Free Software Foundation; either
    
    8
    +#  version 2 of the License, or (at your option) any later version.
    
    9
    +#
    
    10
    +#  This library is distributed in the hope that it will be useful,
    
    11
    +#  but WITHOUT ANY WARRANTY; without even the implied warranty of
    
    12
    +#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.	 See the GNU
    
    13
    +#  Lesser General Public License for more details.
    
    14
    +#
    
    15
    +#  You should have received a copy of the GNU Lesser General Public
    
    16
    +#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
    
    17
    +#
    
    18
    +#  Authors:
    
    19
    +#        Jim MacArthur <jim macarthur codethink co uk>
    
    20
    +
    
    21
    +import os
    
    22
    +import re
    
    23
    +from urllib.parse import urlparse
    
    24
    +
    
    25
    +import grpc
    
    26
    +
    
    27
    +from . import Sandbox
    
    28
    +from ..storage._filebaseddirectory import FileBasedDirectory
    
    29
    +from ..storage._casbaseddirectory import CasBasedDirectory
    
    30
    +from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    
    31
    +from .._artifactcache.cascache import CASCache
    
    32
    +
    
    33
    +
    
    34
    +class SandboxError(Exception):
    
    35
    +    pass
    
    36
    +
    
    37
    +
    
    38
    +# SandboxRemote()
    
    39
    +#
    
    40
    +# This isn't really a sandbox, it's a stub which sends all the sources and build
    
    41
    +# commands to a remote server and retrieves the results from it.
    
    42
    +#
    
    43
    +class SandboxRemote(Sandbox):
    
    44
    +
    
    45
    +    def __init__(self, *args, **kwargs):
    
    46
    +        super().__init__(*args, **kwargs)
    
    47
    +        self.cascache = None
    
    48
    +
    
    49
    +        url = urlparse(kwargs['server_url'])
    
    50
    +        if not url.scheme or not url.hostname or not url.port:
    
    51
    +            raise SandboxError("Configured remote URL '{}' does not match the expected layout. "
    
    52
    +                               .format(self.server_url) +
    
    53
    +                               "It should be of the form <protocol>://<domain name>:<port>.")
    
    54
    +        elif url.scheme != 'http':
    
    55
    +            raise SandboxError("Configured remote '{}' uses an unsupported protocol. "
    
    56
    +                               "Only plain HTTP is currenlty supported (no HTTPS).")
    
    57
    +
    
    58
    +        self.server_url = '{}:{}'.format(url.hostname, url.port)
    
    59
    +
    
    60
    +    def _get_cascache(self):
    
    61
    +        if self.cascache is None:
    
    62
    +            self.cascache = CASCache(self._get_context())
    
    63
    +            self.cascache.setup_remotes(use_config=True)
    
    64
    +        return self.cascache
    
    65
    +
    
    66
    +    def run_remote_command(self, command, input_root_digest, working_directory, environment):
    
    67
    +        # Sends an execution request to the remote execution server.
    
    68
    +        #
    
    69
    +        # This function blocks until it gets a response from the server.
    
    70
    +        #
    
    71
    +        environment_variables = [remote_execution_pb2.Command.
    
    72
    +                                 EnvironmentVariable(name=k, value=v)
    
    73
    +                                 for (k, v) in environment.items()]
    
    74
    +
    
    75
    +        # Create and send the Command object.
    
    76
    +        remote_command = remote_execution_pb2.Command(arguments=command,
    
    77
    +                                                      working_directory=working_directory,
    
    78
    +                                                      environment_variables=environment_variables,
    
    79
    +                                                      output_files=[],
    
    80
    +                                                      output_directories=[self._output_directory],
    
    81
    +                                                      platform=None)
    
    82
    +
    
    83
    +        cascache = self._get_cascache()
    
    84
    +        # Upload the Command message to the remote CAS server
    
    85
    +        command_digest = cascache.push_message(self._get_project(), remote_command)
    
    86
    +        if not command_digest or not cascache.verify_digest_pushed(self._get_project(), command_digest):
    
    87
    +            # Command push failed
    
    88
    +            return None
    
    89
    +
    
    90
    +        # Create and send the action.
    
    91
    +        action = remote_execution_pb2.Action(command_digest=command_digest,
    
    92
    +                                             input_root_digest=input_root_digest,
    
    93
    +                                             timeout=None,
    
    94
    +                                             do_not_cache=False)
    
    95
    +
    
    96
    +        # Upload the Action message to the remote CAS server
    
    97
    +        action_digest = cascache.push_message(self._get_project(), action)
    
    98
    +        if not action_digest or not cascache.verify_digest_pushed(self._get_project(), action_digest):
    
    99
    +            # Action push failed
    
    100
    +            return None
    
    101
    +
    
    102
    +        # Next, try to create a communication channel to the BuildGrid server.
    
    103
    +        channel = grpc.insecure_channel(self.server_url)
    
    104
    +        stub = remote_execution_pb2_grpc.ExecutionStub(channel)
    
    105
    +        request = remote_execution_pb2.ExecuteRequest(action_digest=action_digest,
    
    106
    +                                                      skip_cache_lookup=False)
    
    107
    +        try:
    
    108
    +            operation_iterator = stub.Execute(request)
    
    109
    +        except grpc.RpcError:
    
    110
    +            return None
    
    111
    +
    
    112
    +        operation = None
    
    113
    +        with self._get_context().timed_activity("Waiting for the remote build to complete"):
    
    114
    +            # It is advantageous to check operation_iterator.code() is grpc.StatusCode.OK here,
    
    115
    +            # which will check the server is actually contactable. However, calling it when the
    
    116
    +            # server is available seems to cause .code() to hang forever.
    
    117
    +            for operation in operation_iterator:
    
    118
    +                if operation.done:
    
    119
    +                    break
    
    120
    +
    
    121
    +        return operation
    
    122
    +
    
    123
    +    def process_job_output(self, output_directories, output_files):
    
    124
    +        # Reads the remote execution server response to an execution request.
    
    125
    +        #
    
    126
    +        # output_directories is an array of OutputDirectory objects.
    
    127
    +        # output_files is an array of OutputFile objects.
    
    128
    +        #
    
    129
    +        # We only specify one output_directory, so it's an error
    
    130
    +        # for there to be any output files or more than one directory at the moment.
    
    131
    +        #
    
    132
    +        if output_files:
    
    133
    +            raise SandboxError("Output files were returned when we didn't request any.")
    
    134
    +        elif not output_directories:
    
    135
    +            error_text = "No output directory was returned from the build server."
    
    136
    +            raise SandboxError(error_text)
    
    137
    +        elif len(output_directories) > 1:
    
    138
    +            error_text = "More than one output directory was returned from the build server: {}."
    
    139
    +            raise SandboxError(error_text.format(output_directories))
    
    140
    +
    
    141
    +        tree_digest = output_directories[0].tree_digest
    
    142
    +        if tree_digest is None or not tree_digest.hash:
    
    143
    +            raise SandboxError("Output directory structure had no digest attached.")
    
    144
    +
    
    145
    +        cascache = self._get_cascache()
    
    146
    +        # Now do a pull to ensure we have the necessary parts.
    
    147
    +        dir_digest = cascache.pull_tree(self._get_project(), tree_digest)
    
    148
    +        if dir_digest is None or not dir_digest.hash or not dir_digest.size_bytes:
    
    149
    +            raise SandboxError("Output directory structure pulling from remote failed.")
    
    150
    +
    
    151
    +        path_components = os.path.split(self._output_directory)
    
    152
    +
    
    153
    +        # Now what we have is a digest for the output. Once we return, the calling process will
    
    154
    +        # attempt to descend into our directory and find that directory, so we need to overwrite
    
    155
    +        # that.
    
    156
    +
    
    157
    +        if not path_components:
    
    158
    +            # The artifact wants the whole directory; we could just return the returned hash in its
    
    159
    +            # place, but we don't have a means to do that yet.
    
    160
    +            raise SandboxError("Unimplemented: Output directory is empty or equal to the sandbox root.")
    
    161
    +
    
    162
    +        # At the moment, we will get the whole directory back in the first directory argument and we need
    
    163
    +        # to replace the sandbox's virtual directory with that. Creating a new virtual directory object
    
    164
    +        # from another hash will be interesting, though...
    
    165
    +
    
    166
    +        new_dir = CasBasedDirectory(self._get_context(), ref=dir_digest)
    
    167
    +        self._set_virtual_directory(new_dir)
    
    168
    +
    
    169
    +    def run(self, command, flags, *, cwd=None, env=None):
    
    170
    +        # Upload sources
    
    171
    +        upload_vdir = self.get_virtual_directory()
    
    172
    +
    
    173
    +        if isinstance(upload_vdir, FileBasedDirectory):
    
    174
    +            # Make a new temporary directory to put source in
    
    175
    +            upload_vdir = CasBasedDirectory(self._get_context(), ref=None)
    
    176
    +            upload_vdir.import_files(self.get_virtual_directory()._get_underlying_directory())
    
    177
    +
    
    178
    +        upload_vdir.recalculate_hash()
    
    179
    +
    
    180
    +        cascache = self._get_cascache()
    
    181
    +        # Now, push that key (without necessarily needing a ref) to the remote.
    
    182
    +        vdir_digest = cascache.push_directory(self._get_project(), upload_vdir)
    
    183
    +        if not vdir_digest or not cascache.verify_digest_pushed(self._get_project(), vdir_digest):
    
    184
    +            raise SandboxError("Failed to verify that source has been pushed to the remote artifact cache.")
    
    185
    +
    
    186
    +        # Set up environment and working directory
    
    187
    +        if cwd is None:
    
    188
    +            cwd = self._get_work_directory()
    
    189
    +
    
    190
    +        if cwd is None:
    
    191
    +            cwd = '/'
    
    192
    +
    
    193
    +        if env is None:
    
    194
    +            env = self._get_environment()
    
    195
    +
    
    196
    +        # We want command args as a list of strings
    
    197
    +        if isinstance(command, str):
    
    198
    +            command = [command]
    
    199
    +
    
    200
    +        # Now transmit the command to execute
    
    201
    +        operation = self.run_remote_command(command, upload_vdir.ref, cwd, env)
    
    202
    +
    
    203
    +        if operation is None:
    
    204
    +            # Failure of remote execution, usually due to an error in BuildStream
    
    205
    +            # NB This error could be raised in __run_remote_command
    
    206
    +            raise SandboxError("No response returned from server")
    
    207
    +
    
    208
    +        assert(not operation.HasField('error') and operation.HasField('response'))
    
    209
    +
    
    210
    +        execution_response = remote_execution_pb2.ExecuteResponse()
    
    211
    +        # The response is expected to be an ExecutionResponse message
    
    212
    +        assert(operation.response.Is(execution_response.DESCRIPTOR))
    
    213
    +
    
    214
    +        operation.response.Unpack(execution_response)
    
    215
    +
    
    216
    +        if execution_response.status.code != 0:
    
    217
    +            # A normal error during the build: the remote execution system
    
    218
    +            # has worked correctly but the command failed.
    
    219
    +            # execution_response.error also contains 'message' (str) and
    
    220
    +            # 'details' (iterator of Any) which we ignore at the moment.
    
    221
    +            return execution_response.status.code
    
    222
    +
    
    223
    +        action_result = execution_response.result
    
    224
    +
    
    225
    +        self.process_job_output(action_result.output_directories, action_result.output_files)
    
    226
    +
    
    227
    +        return 0

  • doc/source/format_project.rst
    ... ... @@ -204,6 +204,24 @@ with an artifact share.
    204 204
     You can also specify a list of caches here; earlier entries in the list
    
    205 205
     will have higher priority than later ones.
    
    206 206
     
    
    207
    +Remote execution
    
    208
    +~~~~~~~~~~~~~~~~
    
    209
    +BuildStream supports remote execution using the Google Remote Execution API
    
    210
    +(REAPI). A description of how remote execution works is beyond the scope
    
    211
    +of this document, but you can specify a remote server complying with the REAPI
    
    212
    +using the `remote-execution` option:
    
    213
    +
    
    214
    +.. code:: yaml
    
    215
    +
    
    216
    +  remote-execution:
    
    217
    +
    
    218
    +    # A url defining a remote execution server
    
    219
    +    url: http://buildserver.example.com:50051
    
    220
    +
    
    221
    +The url should contain a hostname and port separated by ':'. Only plain HTTP is
    
    222
    +currently suported (no HTTPS).
    
    223
    +
    
    224
    +The Remote Execution API can be found via https://github.com/bazelbuild/remote-apis.
    
    207 225
     
    
    208 226
     .. _project_essentials_mirrors:
    
    209 227
     
    

  • tests/artifactcache/project/elements/compose-all.bst
    1
    +kind: compose
    
    2
    +
    
    3
    +depends:
    
    4
    +- filename: import-bin.bst
    
    5
    +  type: build
    
    6
    +- filename: import-dev.bst
    
    7
    +  type: build
    
    8
    +
    
    9
    +config:
    
    10
    +  # Dont try running the sandbox, we dont have a
    
    11
    +  # runtime to run anything in this context.
    
    12
    +  integrate: False

  • tests/artifactcache/project/elements/import-bin.bst
    1
    +kind: import
    
    2
    +sources:
    
    3
    +- kind: local
    
    4
    +  path: files/bin-files

  • tests/artifactcache/project/elements/import-dev.bst
    1
    +kind: import
    
    2
    +sources:
    
    3
    +- kind: local
    
    4
    +  path: files/dev-files

  • tests/artifactcache/project/elements/target.bst
    1
    +kind: stack
    
    2
    +description: |
    
    3
    +
    
    4
    +  Main stack target for the bst build test
    
    5
    +
    
    6
    +depends:
    
    7
    +- import-bin.bst
    
    8
    +- import-dev.bst
    
    9
    +- compose-all.bst

  • tests/artifactcache/project/files/bin-files/usr/bin/hello
    1
    +#!/bin/bash
    
    2
    +
    
    3
    +echo "Hello !"

  • tests/artifactcache/project/files/dev-files/usr/include/pony.h
    1
    +#ifndef __PONY_H__
    
    2
    +#define __PONY_H__
    
    3
    +
    
    4
    +#define PONY_BEGIN "Once upon a time, there was a pony."
    
    5
    +#define PONY_END "And they lived happily ever after, the end."
    
    6
    +
    
    7
    +#define MAKE_PONY(story)  \
    
    8
    +  PONY_BEGIN \
    
    9
    +  story \
    
    10
    +  PONY_END
    
    11
    +
    
    12
    +#endif /* __PONY_H__ */

  • tests/artifactcache/project/project.conf
    1
    +# Project config for frontend build test
    
    2
    +name: test
    
    3
    +
    
    4
    +element-path: elements

  • tests/artifactcache/pull.py
    1
    +import hashlib
    
    2
    +import os
    
    3
    +import pytest
    
    4
    +
    
    5
    +from buildstream._artifactcache.artifactcache import ArtifactCacheSpec
    
    6
    +from buildstream._artifactcache.cascache import CASCache
    
    7
    +from buildstream._context import Context
    
    8
    +from buildstream._project import Project
    
    9
    +from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    10
    +
    
    11
    +from tests.testutils import cli, create_artifact_share
    
    12
    +
    
    13
    +
    
    14
    +# Project directory
    
    15
    +DATA_DIR = os.path.join(
    
    16
    +    os.path.dirname(os.path.realpath(__file__)),
    
    17
    +    "project",
    
    18
    +)
    
    19
    +
    
    20
    +
    
    21
    +# Handle messages from the pipeline
    
    22
    +def message_handler(message, context):
    
    23
    +    pass
    
    24
    +
    
    25
    +
    
    26
    +def tree_maker(cas, tree, directory):
    
    27
    +    if tree.root.ByteSize() == 0:
    
    28
    +        tree.root.CopyFrom(directory)
    
    29
    +
    
    30
    +    for directory_node in directory.directories:
    
    31
    +        child_directory = tree.children.add()
    
    32
    +
    
    33
    +        with open(cas.objpath(directory_node.digest), 'rb') as f:
    
    34
    +            child_directory.ParseFromString(f.read())
    
    35
    +
    
    36
    +        tree_maker(cas, tree, child_directory)
    
    37
    +
    
    38
    +
    
    39
    +@pytest.mark.datafiles(DATA_DIR)
    
    40
    +def test_pull(cli, tmpdir, datafiles):
    
    41
    +    project_dir = str(datafiles)
    
    42
    +
    
    43
    +    # Set up an artifact cache.
    
    44
    +    with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
    
    45
    +        # Configure artifact share
    
    46
    +        cli.configure({
    
    47
    +            'scheduler': {
    
    48
    +                'pushers': 1
    
    49
    +            },
    
    50
    +            'artifacts': {
    
    51
    +                'url': share.repo,
    
    52
    +                'push': True,
    
    53
    +            }
    
    54
    +        })
    
    55
    +
    
    56
    +        # First build the project with the artifact cache configured
    
    57
    +        result = cli.run(project=project_dir, args=['build', 'target.bst'])
    
    58
    +        result.assert_success()
    
    59
    +
    
    60
    +        # Assert that we are now cached locally
    
    61
    +        assert cli.get_element_state(project_dir, 'target.bst') == 'cached'
    
    62
    +        # Assert that we shared/pushed the cached artifact
    
    63
    +        element_key = cli.get_element_key(project_dir, 'target.bst')
    
    64
    +        assert share.has_artifact('test', 'target.bst', element_key)
    
    65
    +
    
    66
    +        # Delete the artifact locally
    
    67
    +        cli.remove_artifact_from_cache(project_dir, 'target.bst')
    
    68
    +
    
    69
    +        # Assert that we are not cached locally anymore
    
    70
    +        assert cli.get_element_state(project_dir, 'target.bst') != 'cached'
    
    71
    +
    
    72
    +        # Fake minimal context
    
    73
    +        context = Context()
    
    74
    +        context.set_message_handler(message_handler)
    
    75
    +        context.sched_pushers = 1
    
    76
    +        context.artifactdir = os.path.join(str(tmpdir), 'cache', 'artifacts')
    
    77
    +        context.artifact_cache_specs = [ArtifactCacheSpec(url=share.repo,
    
    78
    +                                                          push=True)]
    
    79
    +
    
    80
    +        # Load the project and CAS cache
    
    81
    +        project = Project(project_dir, context)
    
    82
    +        project.ensure_fully_loaded()
    
    83
    +        cas = CASCache(context)
    
    84
    +
    
    85
    +        # Assert that the element's artifact is **not** cached
    
    86
    +        element = project.load_elements(['target.bst'], cas)[0]
    
    87
    +        element_key = cli.get_element_key(project_dir, 'target.bst')
    
    88
    +        assert not cas.contains(element, element_key)
    
    89
    +
    
    90
    +        # Manually setup the CAS remote
    
    91
    +        cas.setup_remotes(use_config=True)
    
    92
    +        cas.initialize_remotes()
    
    93
    +        assert cas.has_push_remotes()
    
    94
    +
    
    95
    +        # Pull the artifact
    
    96
    +        pulled = cas.pull(element, element_key)
    
    97
    +        assert pulled is True
    
    98
    +        assert cas.contains(element, element_key)
    
    99
    +
    
    100
    +        # Finally, close the opened gRPC channels properly!
    
    101
    +        for remote in cas._remotes[project]:
    
    102
    +            if remote.channel:
    
    103
    +                remote.channel.close()
    
    104
    +
    
    105
    +
    
    106
    +@pytest.mark.datafiles(DATA_DIR)
    
    107
    +def test_pull_tree(cli, tmpdir, datafiles):
    
    108
    +    project_dir = str(datafiles)
    
    109
    +
    
    110
    +    # Set up an artifact cache.
    
    111
    +    with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
    
    112
    +        # Configure artifact share
    
    113
    +        cli.configure({
    
    114
    +            'scheduler': {
    
    115
    +                'pushers': 1
    
    116
    +            },
    
    117
    +            'artifacts': {
    
    118
    +                'url': share.repo,
    
    119
    +                'push': True,
    
    120
    +            }
    
    121
    +        })
    
    122
    +
    
    123
    +        # First build the project with the artifact cache configured
    
    124
    +        result = cli.run(project=project_dir, args=['build', 'target.bst'])
    
    125
    +        result.assert_success()
    
    126
    +
    
    127
    +        # Assert that we are now cached locally
    
    128
    +        assert cli.get_element_state(project_dir, 'target.bst') == 'cached'
    
    129
    +        # Assert that we shared/pushed the cached artifact
    
    130
    +        element_key = cli.get_element_key(project_dir, 'target.bst')
    
    131
    +        assert share.has_artifact('test', 'target.bst', element_key)
    
    132
    +
    
    133
    +        # Fake minimal context
    
    134
    +        context = Context()
    
    135
    +        context.set_message_handler(message_handler)
    
    136
    +        context.sched_pushers = 1
    
    137
    +        context.artifactdir = os.path.join(str(tmpdir), 'cache', 'artifacts')
    
    138
    +        context.artifact_cache_specs = [ArtifactCacheSpec(url=share.repo,
    
    139
    +                                                          push=True)]
    
    140
    +
    
    141
    +        # Load the project and CAS cache
    
    142
    +        project = Project(project_dir, context)
    
    143
    +        project.ensure_fully_loaded()
    
    144
    +        cas = CASCache(context)
    
    145
    +
    
    146
    +        # Assert that the element's artifact is cached
    
    147
    +        element = project.load_elements(['target.bst'], cas)[0]
    
    148
    +        element_key = cli.get_element_key(project_dir, 'target.bst')
    
    149
    +        assert cas.contains(element, element_key)
    
    150
    +
    
    151
    +        # Manually setup the CAS remote
    
    152
    +        cas.setup_remotes(use_config=True)
    
    153
    +        cas.initialize_remotes()
    
    154
    +        assert cas.has_push_remotes(element=element)
    
    155
    +
    
    156
    +        # Retrieve the Directory object from the cached artifact
    
    157
    +        artifact_ref = cas.get_artifact_fullname(element, element_key)
    
    158
    +        artifact_digest = cas.resolve_ref(artifact_ref)
    
    159
    +
    
    160
    +        directory = remote_execution_pb2.Directory()
    
    161
    +
    
    162
    +        with open(cas.objpath(artifact_digest), 'rb') as f:
    
    163
    +            directory.ParseFromString(f.read())
    
    164
    +
    
    165
    +        # Build the Tree object while we are still cached
    
    166
    +        tree = remote_execution_pb2.Tree()
    
    167
    +        tree_maker(cas, tree, directory)
    
    168
    +
    
    169
    +        # Push the Tree as a regular message
    
    170
    +        tree_digest = cas.push_message(project, tree)
    
    171
    +
    
    172
    +        # Now delete the artifact locally
    
    173
    +        cli.remove_artifact_from_cache(project_dir, 'target.bst')
    
    174
    +
    
    175
    +        # Assert that we are not cached locally anymore
    
    176
    +        assert cli.get_element_state(project_dir, 'target.bst') != 'cached'
    
    177
    +
    
    178
    +        # Pull the artifact using the Tree object
    
    179
    +        directory_digest = cas.pull_tree(project, tree_digest)
    
    180
    +        assert directory_digest == artifact_digest
    
    181
    +
    
    182
    +        # Ensure the entire Tree stucture has been pulled
    
    183
    +        assert os.path.exists(cas.objpath(directory_digest))
    
    184
    +        for child_directory in tree.children:
    
    185
    +            child_blob = child_directory.SerializeToString()
    
    186
    +
    
    187
    +            child_digest = remote_execution_pb2.Digest()
    
    188
    +            child_digest.hash = hashlib.sha256(child_blob).hexdigest()
    
    189
    +            child_digest.size_bytes = len(child_blob)
    
    190
    +
    
    191
    +            assert os.path.exists(cas.objpath(child_digest))
    
    192
    +
    
    193
    +        # Finally, close the opened gRPC channels properly!
    
    194
    +        for remote in cas._remotes[project]:
    
    195
    +            if remote.channel:
    
    196
    +                remote.channel.close()

  • tests/artifactcache/push.py
    1
    +import os
    
    2
    +import pytest
    
    3
    +
    
    4
    +from pluginbase import PluginBase
    
    5
    +from buildstream._artifactcache.artifactcache import ArtifactCacheSpec
    
    6
    +from buildstream._artifactcache.cascache import CASCache
    
    7
    +from buildstream._context import Context
    
    8
    +from buildstream._project import Project
    
    9
    +from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    10
    +from buildstream.storage._casbaseddirectory import CasBasedDirectory
    
    11
    +
    
    12
    +from tests.testutils import cli, create_artifact_share
    
    13
    +
    
    14
    +
    
    15
    +# Project directory
    
    16
    +DATA_DIR = os.path.join(
    
    17
    +    os.path.dirname(os.path.realpath(__file__)),
    
    18
    +    "project",
    
    19
    +)
    
    20
    +
    
    21
    +
    
    22
    +# Handle messages from the pipeline
    
    23
    +def message_handler(message, context):
    
    24
    +    pass
    
    25
    +
    
    26
    +
    
    27
    +@pytest.mark.datafiles(DATA_DIR)
    
    28
    +def test_push(cli, tmpdir, datafiles):
    
    29
    +    project_dir = str(datafiles)
    
    30
    +
    
    31
    +    # First build the project without the artifact cache configured
    
    32
    +    result = cli.run(project=project_dir, args=['build', 'target.bst'])
    
    33
    +    result.assert_success()
    
    34
    +
    
    35
    +    # Assert that we are now cached locally
    
    36
    +    assert cli.get_element_state(project_dir, 'target.bst') == 'cached'
    
    37
    +
    
    38
    +    # Set up an artifact cache.
    
    39
    +    with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
    
    40
    +        # Fake minimal context
    
    41
    +        context = Context()
    
    42
    +        context.set_message_handler(message_handler)
    
    43
    +        context.sched_pushers = 1
    
    44
    +        context.artifactdir = os.path.join(str(tmpdir), 'cache', 'artifacts')
    
    45
    +        context.artifact_cache_specs = [ArtifactCacheSpec(url=share.repo,
    
    46
    +                                                          push=True)]
    
    47
    +
    
    48
    +        # Load the project and CAS cache
    
    49
    +        project = Project(project_dir, context)
    
    50
    +        project.ensure_fully_loaded()
    
    51
    +        cas = CASCache(context)
    
    52
    +
    
    53
    +        # Assert that the element's artifact is cached
    
    54
    +        element = project.load_elements(['target.bst'], cas)[0]
    
    55
    +        element_key = cli.get_element_key(project_dir, 'target.bst')
    
    56
    +        assert cas.contains(element, element_key)
    
    57
    +
    
    58
    +        # Manually setup the CAS remote
    
    59
    +        cas.setup_remotes(use_config=True)
    
    60
    +        cas.initialize_remotes()
    
    61
    +        assert cas.has_push_remotes(element=element)
    
    62
    +
    
    63
    +        # Push the element's artifact
    
    64
    +        pushed = cas.push(element, [element_key])
    
    65
    +        assert pushed is True
    
    66
    +        assert share.has_artifact('test', 'target.bst', element_key)
    
    67
    +
    
    68
    +        # Finally, close the opened gRPC channels properly!
    
    69
    +        for remote in cas._remotes[project]:
    
    70
    +            if remote.channel:
    
    71
    +                remote.channel.close()
    
    72
    +
    
    73
    +
    
    74
    +@pytest.mark.datafiles(DATA_DIR)
    
    75
    +def test_push_directory(cli, tmpdir, datafiles):
    
    76
    +    project_dir = str(datafiles)
    
    77
    +
    
    78
    +    # First build the project without the artifact cache configured
    
    79
    +    result = cli.run(project=project_dir, args=['build', 'target.bst'])
    
    80
    +    result.assert_success()
    
    81
    +
    
    82
    +    # Assert that we are now cached locally
    
    83
    +    assert cli.get_element_state(project_dir, 'target.bst') == 'cached'
    
    84
    +
    
    85
    +    # Set up an artifact cache.
    
    86
    +    with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
    
    87
    +        # Fake minimal context
    
    88
    +        context = Context()
    
    89
    +        context.set_message_handler(message_handler)
    
    90
    +        context.sched_pushers = 1
    
    91
    +        context.artifactdir = os.path.join(str(tmpdir), 'cache', 'artifacts')
    
    92
    +        context.artifact_cache_specs = [ArtifactCacheSpec(url=share.repo,
    
    93
    +                                                          push=True)]
    
    94
    +
    
    95
    +        # Load the project and CAS cache
    
    96
    +        project = Project(project_dir, context)
    
    97
    +        project.ensure_fully_loaded()
    
    98
    +        cas = CASCache(context)
    
    99
    +
    
    100
    +        # Assert that the element's artifact is cached
    
    101
    +        element = project.load_elements(['target.bst'], cas)[0]
    
    102
    +        element_key = cli.get_element_key(project_dir, 'target.bst')
    
    103
    +        assert cas.contains(element, element_key)
    
    104
    +
    
    105
    +        # Manually setup the CAS remote
    
    106
    +        cas.setup_remotes(use_config=True)
    
    107
    +        cas.initialize_remotes()
    
    108
    +        assert cas.has_push_remotes(element=element)
    
    109
    +
    
    110
    +        # Recreate the CasBasedDirectory object from the cached artifact
    
    111
    +        artifact_ref = cas.get_artifact_fullname(element, element_key)
    
    112
    +        artifact_digest = cas.resolve_ref(artifact_ref)
    
    113
    +
    
    114
    +        directory = CasBasedDirectory(context, ref=artifact_digest)
    
    115
    +
    
    116
    +        # Push the CasBasedDirectory object
    
    117
    +        directory_digest = cas.push_directory(project, directory)
    
    118
    +        assert directory_digest == artifact_digest
    
    119
    +        assert share.has_object(directory_digest)
    
    120
    +
    
    121
    +        # Finally, close the opened gRPC channels properly!
    
    122
    +        for remote in cas._remotes[project]:
    
    123
    +            if remote.channel:
    
    124
    +                remote.channel.close()
    
    125
    +
    
    126
    +
    
    127
    +@pytest.mark.datafiles(DATA_DIR)
    
    128
    +def test_push_message(cli, tmpdir, datafiles):
    
    129
    +    project_dir = str(datafiles)
    
    130
    +
    
    131
    +    # Set up an artifact cache.
    
    132
    +    with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
    
    133
    +        # Fake minimal context
    
    134
    +        context = Context()
    
    135
    +        context.set_message_handler(message_handler)
    
    136
    +        context.sched_pushers = 1
    
    137
    +        context.artifactdir = os.path.join(str(tmpdir), 'cache', 'artifacts')
    
    138
    +        context.artifact_cache_specs = [ArtifactCacheSpec(url=share.repo,
    
    139
    +                                                          push=True)]
    
    140
    +
    
    141
    +        # Load the project and CAS cache
    
    142
    +        project = Project(project_dir, context)
    
    143
    +        project.ensure_fully_loaded()
    
    144
    +        cas = CASCache(context)
    
    145
    +
    
    146
    +        # Manually setup the CAS remote
    
    147
    +        cas.setup_remotes(use_config=True)
    
    148
    +        cas.initialize_remotes()
    
    149
    +        assert cas.has_push_remotes()
    
    150
    +
    
    151
    +        # Create an example message object
    
    152
    +        command = remote_execution_pb2.Command(arguments=['/usr/bin/gcc', '--help'],
    
    153
    +                                               working_directory='/buildstream-build',
    
    154
    +                                               output_directories=['/buildstream-install'])
    
    155
    +
    
    156
    +        # Push the message object
    
    157
    +        digest = cas.push_message(project, command)
    
    158
    +        assert digest
    
    159
    +        assert share.has_object(digest)
    
    160
    +
    
    161
    +        # Finally, close the opened gRPC channels properly!
    
    162
    +        for remote in cas._remotes[project]:
    
    163
    +            if remote.channel:
    
    164
    +                remote.channel.close()

  • tests/testutils/artifactshare.py
    ... ... @@ -15,6 +15,7 @@ from buildstream._artifactcache.cascache import CASCache
    15 15
     from buildstream._artifactcache.casserver import create_server
    
    16 16
     from buildstream._context import Context
    
    17 17
     from buildstream._exceptions import ArtifactError
    
    18
    +from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    18 19
     
    
    19 20
     
    
    20 21
     # ArtifactShare()
    
    ... ... @@ -87,6 +88,23 @@ class ArtifactShare():
    87 88
             # Sleep until termination by signal
    
    88 89
             signal.pause()
    
    89 90
     
    
    91
    +    # has_object():
    
    92
    +    #
    
    93
    +    # Checks whether the object is present in the share
    
    94
    +    #
    
    95
    +    # Args:
    
    96
    +    #    digest (str): The object's digest
    
    97
    +    #
    
    98
    +    # Returns:
    
    99
    +    #    (bool): True if the object exists in the share, otherwise false.
    
    100
    +    def has_object(self, digest):
    
    101
    +
    
    102
    +        assert isinstance(digest, remote_execution_pb2.Digest)
    
    103
    +
    
    104
    +        object_path = self.cas.objpath(digest)
    
    105
    +
    
    106
    +        return os.path.exists(object_path)
    
    107
    +
    
    90 108
         # has_artifact():
    
    91 109
         #
    
    92 110
         # Checks whether the artifact is present in the share
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]