[Notes] [Git][BuildStream/buildstream][jmac/remote_execution_client] 10 commits: _casbaseddirectory.py: add _save() function.



Title: GitLab

Jim MacArthur pushed to branch jmac/remote_execution_client at BuildStream / buildstream

Commits:

10 changed files:

Changes:

  • buildstream/_project.py
    ... ... @@ -129,6 +129,7 @@ class Project():
    129 129
     
    
    130 130
             self.artifact_cache_specs = None
    
    131 131
             self._sandbox = None
    
    132
    +        self._remote_execution = None
    
    132 133
             self._splits = None
    
    133 134
     
    
    134 135
             self._context.add_project(self)
    
    ... ... @@ -460,7 +461,7 @@ class Project():
    460 461
                 'aliases', 'name',
    
    461 462
                 'artifacts', 'options',
    
    462 463
                 'fail-on-overlap', 'shell', 'fatal-warnings',
    
    463
    -            'ref-storage', 'sandbox', 'mirrors'
    
    464
    +            'ref-storage', 'sandbox', 'mirrors', 'remote-execution'
    
    464 465
             ])
    
    465 466
     
    
    466 467
             #
    
    ... ... @@ -478,6 +479,9 @@ class Project():
    478 479
             # Load sandbox configuration
    
    479 480
             self._sandbox = _yaml.node_get(config, Mapping, 'sandbox')
    
    480 481
     
    
    482
    +        # Load remote execution configuration
    
    483
    +        self._remote_execution = _yaml.node_get(config, Mapping, 'remote-execution')
    
    484
    +
    
    481 485
             # Load project split rules
    
    482 486
             self._splits = _yaml.node_get(config, Mapping, 'split-rules')
    
    483 487
     
    

  • buildstream/buildelement.py
    ... ... @@ -155,6 +155,9 @@ class BuildElement(Element):
    155 155
                 command_dir = build_root
    
    156 156
             sandbox.set_work_directory(command_dir)
    
    157 157
     
    
    158
    +        # Tell sandbox which directory is preserved in the finished artifact
    
    159
    +        sandbox.set_output_directory(install_root)
    
    160
    +
    
    158 161
             # Setup environment
    
    159 162
             sandbox.set_environment(self.get_environment())
    
    160 163
     
    

  • buildstream/data/projectconfig.yaml
    ... ... @@ -204,3 +204,6 @@ shell:
    204 204
       # Command to run when `bst shell` does not provide a command
    
    205 205
       #
    
    206 206
       command: [ 'sh', '-i' ]
    
    207
    +
    
    208
    +remote-execution:
    
    209
    +  url: ""
    \ No newline at end of file

  • buildstream/element.py
    ... ... @@ -95,6 +95,7 @@ from . import _site
    95 95
     from ._platform import Platform
    
    96 96
     from .plugin import CoreWarnings
    
    97 97
     from .sandbox._config import SandboxConfig
    
    98
    +from .sandbox._sandboxremote import SandboxRemote
    
    98 99
     
    
    99 100
     from .storage.directory import Directory
    
    100 101
     from .storage._filebaseddirectory import FileBasedDirectory
    
    ... ... @@ -250,6 +251,9 @@ class Element(Plugin):
    250 251
             # Extract Sandbox config
    
    251 252
             self.__sandbox_config = self.__extract_sandbox_config(meta)
    
    252 253
     
    
    254
    +        # Extract remote execution URL
    
    255
    +        self.__remote_execution_url = self.__extract_remote_execution_config(meta)
    
    256
    +
    
    253 257
         def __lt__(self, other):
    
    254 258
             return self.name < other.name
    
    255 259
     
    
    ... ... @@ -1545,6 +1549,8 @@ class Element(Plugin):
    1545 1549
                     finally:
    
    1546 1550
                         if collect is not None:
    
    1547 1551
                             try:
    
    1552
    +                            # Sandbox will probably have replaced its virtual directory, so get it again
    
    1553
    +                            sandbox_vroot = sandbox.get_virtual_directory()
    
    1548 1554
                                 collectvdir = sandbox_vroot.descend(collect.lstrip(os.sep).split(os.sep))
    
    1549 1555
                             except VirtualDirectoryError:
    
    1550 1556
                                 # No collect directory existed
    
    ... ... @@ -2117,7 +2123,24 @@ class Element(Plugin):
    2117 2123
             project = self._get_project()
    
    2118 2124
             platform = Platform.get_platform()
    
    2119 2125
     
    
    2120
    -        if directory is not None and os.path.exists(directory):
    
    2126
    +        if self.__remote_execution_url and self.BST_VIRTUAL_DIRECTORY:
    
    2127
    +            if not self.__artifacts.has_push_remotes(element=self):
    
    2128
    +                # Give an early warning if remote execution will not work
    
    2129
    +                raise ElementError("Artifact {} is configured to use remote execution but has no push remotes. "
    
    2130
    +                                   .format(self.name) +
    
    2131
    +                                   "The remote artifact server(s) may not be correctly configured or contactable.")
    
    2132
    +
    
    2133
    +            self.info("Using a remote 'sandbox' for artifact {}".format(self.name))
    
    2134
    +            sandbox = SandboxRemote(context, project,
    
    2135
    +                                    directory,
    
    2136
    +                                    stdout=stdout,
    
    2137
    +                                    stderr=stderr,
    
    2138
    +                                    config=config,
    
    2139
    +                                    server_url=self.__remote_execution_url,
    
    2140
    +                                    allow_real_directory=False)
    
    2141
    +            yield sandbox
    
    2142
    +        elif directory is not None and os.path.exists(directory):
    
    2143
    +            self.info("Using a local sandbox for artifact {}".format(self.name))
    
    2121 2144
                 sandbox = platform.create_sandbox(context, project,
    
    2122 2145
                                                   directory,
    
    2123 2146
                                                   stdout=stdout,
    
    ... ... @@ -2289,6 +2312,18 @@ class Element(Plugin):
    2289 2312
             return SandboxConfig(self.node_get_member(sandbox_config, int, 'build-uid'),
    
    2290 2313
                                  self.node_get_member(sandbox_config, int, 'build-gid'))
    
    2291 2314
     
    
    2315
    +    def __extract_remote_execution_config(self, meta):
    
    2316
    +        if self.__is_junction:
    
    2317
    +            return ''
    
    2318
    +        else:
    
    2319
    +            project = self._get_project()
    
    2320
    +            project.ensure_fully_loaded()
    
    2321
    +            if project._remote_execution:
    
    2322
    +                rexec_config = _yaml.node_chain_copy(project._remote_execution)
    
    2323
    +                return self.node_get_member(rexec_config, str, 'url')
    
    2324
    +            else:
    
    2325
    +                return ''
    
    2326
    +
    
    2292 2327
         # This makes a special exception for the split rules, which
    
    2293 2328
         # elements may extend but whose defaults are defined in the project.
    
    2294 2329
         #
    

  • buildstream/plugins/elements/autotools.py
    ... ... @@ -57,7 +57,7 @@ from buildstream import BuildElement
    57 57
     
    
    58 58
     # Element implementation for the 'autotools' kind.
    
    59 59
     class AutotoolsElement(BuildElement):
    
    60
    -    pass
    
    60
    +    BST_VIRTUAL_DIRECTORY = True
    
    61 61
     
    
    62 62
     
    
    63 63
     # Plugin entry point
    

  • buildstream/sandbox/__init__.py
    ... ... @@ -20,3 +20,4 @@
    20 20
     from .sandbox import Sandbox, SandboxFlags
    
    21 21
     from ._sandboxchroot import SandboxChroot
    
    22 22
     from ._sandboxbwrap import SandboxBwrap
    
    23
    +from ._sandboxremote import SandboxRemote

  • buildstream/sandbox/_sandboxremote.py
    1
    +#!/usr/bin/env python3
    
    2
    +#
    
    3
    +#  Copyright (C) 2018 Bloomberg LP
    
    4
    +#
    
    5
    +#  This program is free software; you can redistribute it and/or
    
    6
    +#  modify it under the terms of the GNU Lesser General Public
    
    7
    +#  License as published by the Free Software Foundation; either
    
    8
    +#  version 2 of the License, or (at your option) any later version.
    
    9
    +#
    
    10
    +#  This library is distributed in the hope that it will be useful,
    
    11
    +#  but WITHOUT ANY WARRANTY; without even the implied warranty of
    
    12
    +#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.	 See the GNU
    
    13
    +#  Lesser General Public License for more details.
    
    14
    +#
    
    15
    +#  You should have received a copy of the GNU Lesser General Public
    
    16
    +#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
    
    17
    +#
    
    18
    +#  Authors:
    
    19
    +#        Jim MacArthur <jim macarthur codethink co uk>
    
    20
    +
    
    21
    +import os
    
    22
    +import re
    
    23
    +
    
    24
    +import grpc
    
    25
    +
    
    26
    +from . import Sandbox
    
    27
    +from ..storage._filebaseddirectory import FileBasedDirectory
    
    28
    +from ..storage._casbaseddirectory import CasBasedDirectory
    
    29
    +from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    
    30
    +from .._artifactcache.cascache import CASCache
    
    31
    +
    
    32
    +
    
    33
    +class SandboxError(Exception):
    
    34
    +    pass
    
    35
    +
    
    36
    +
    
    37
    +# SandboxRemote()
    
    38
    +#
    
    39
    +# This isn't really a sandbox, it's a stub which sends all the source
    
    40
    +# to a remote server and retrieves the results from it.
    
    41
    +#
    
    42
    +class SandboxRemote(Sandbox):
    
    43
    +
    
    44
    +    def __init__(self, *args, **kwargs):
    
    45
    +        super().__init__(*args, **kwargs)
    
    46
    +        self.cascache = None
    
    47
    +        self.server_url = kwargs['server_url']
    
    48
    +        # Check the format of the url ourselves to save the user from
    
    49
    +        # whatever error messages grpc will produce
    
    50
    +        m = re.match(r'^(.+):(\d+)$', self.server_url)
    
    51
    +        if m is None:
    
    52
    +            raise SandboxError("Configured remote URL '{}' does not match the expected layout. "
    
    53
    +                               .format(self.server_url) +
    
    54
    +                               "It should be of the form <protocol>://<domain name>:<port>.")
    
    55
    +
    
    56
    +    def _get_cascache(self):
    
    57
    +        if self.cascache is None:
    
    58
    +            self.cascache = CASCache(self._get_context())
    
    59
    +            self.cascache.setup_remotes(use_config=True)
    
    60
    +        return self.cascache
    
    61
    +
    
    62
    +    def __run_remote_command(self, cascache, command, input_root_digest, environment):
    
    63
    +
    
    64
    +        environment_variables = [remote_execution_pb2.Command.
    
    65
    +                                 EnvironmentVariable(name=k, value=v)
    
    66
    +                                 for (k, v) in environment.items()]
    
    67
    +
    
    68
    +        # Create and send the Command object.
    
    69
    +        remote_command = remote_execution_pb2.Command(arguments=command, environment_variables=environment_variables,
    
    70
    +                                                      output_files=[],
    
    71
    +                                                      output_directories=[self._output_directory],
    
    72
    +                                                      platform=None)
    
    73
    +        command_digest = cascache.add_object(buffer=remote_command.SerializeToString())
    
    74
    +        command_ref = 'worker-command/{}'.format(command_digest.hash)
    
    75
    +        cascache.set_ref(command_ref, command_digest)
    
    76
    +
    
    77
    +        command_push_successful = cascache.push_refs([command_ref], self._get_project(), may_have_dependencies=False)
    
    78
    +        if not command_push_successful and not cascache.verify_key_pushed(command_ref, self._get_project()):
    
    79
    +            # Command push failed
    
    80
    +            return None
    
    81
    +
    
    82
    +        # Create and send the action.
    
    83
    +
    
    84
    +        action = remote_execution_pb2.Action(command_digest=command_digest,
    
    85
    +                                             input_root_digest=input_root_digest,
    
    86
    +                                             timeout=None,
    
    87
    +                                             do_not_cache=True)
    
    88
    +
    
    89
    +        action_digest = cascache.add_object(buffer=action.SerializeToString())
    
    90
    +        action_ref = 'worker-action/{}'.format(command_digest.hash)
    
    91
    +        cascache.set_ref(action_ref, action_digest)
    
    92
    +        action_push_successful = cascache.push_refs([action_ref], self._get_project(), may_have_dependencies=False)
    
    93
    +
    
    94
    +        if not action_push_successful and not cascache.verify_key_pushed(action_ref, self._get_project()):
    
    95
    +            # Action push failed
    
    96
    +            return None
    
    97
    +
    
    98
    +        # Next, try to create a communication channel to the BuildGrid server.
    
    99
    +
    
    100
    +        channel = grpc.insecure_channel(self.server_url)
    
    101
    +        stub = remote_execution_pb2_grpc.ExecutionStub(channel)
    
    102
    +        request = remote_execution_pb2.ExecuteRequest(instance_name='default',
    
    103
    +                                                      action_digest=action_digest,
    
    104
    +                                                      skip_cache_lookup=True)
    
    105
    +
    
    106
    +        operation_iterator = stub.Execute(request)
    
    107
    +        operation = None
    
    108
    +        with self._get_context().timed_activity("Waiting for the remote build to complete"):
    
    109
    +            # It is advantageous to check operation_iterator.code() is grpc.StatusCode.OK here,
    
    110
    +            # which will check the server is actually contactable. However, calling it when the
    
    111
    +            # server is available seems to cause .code() to hang forever.
    
    112
    +            for operation in operation_iterator:
    
    113
    +                if operation.done:
    
    114
    +                    break
    
    115
    +        return operation
    
    116
    +
    
    117
    +    def process_job_output(self, output_directories, output_files):
    
    118
    +        # output_directories is an array of OutputDirectory objects.
    
    119
    +        # output_files is an array of OutputFile objects.
    
    120
    +        #
    
    121
    +        # We only specify one output_directory, so it's an error
    
    122
    +        # for there to be any output files or more than one directory at the moment.
    
    123
    +
    
    124
    +        if output_files:
    
    125
    +            raise SandboxError("Output files were returned when we didn't request any.")
    
    126
    +        elif len(output_directories) > 1:
    
    127
    +            error_text = "More than one output directory was returned from the build server: {}"
    
    128
    +            raise SandboxError(error_text.format(output_directories))
    
    129
    +        elif len(output_directories) < 1:  # pylint: disable=len-as-condition
    
    130
    +            error_text = "No output directory was returned from the build server."
    
    131
    +            raise SandboxError(error_text)
    
    132
    +
    
    133
    +        digest = output_directories[0].tree_digest
    
    134
    +        if digest is None or digest.hash is None or digest.hash == "":
    
    135
    +            raise SandboxError("Output directory structure had no digest attached.")
    
    136
    +
    
    137
    +        # Now do a pull to ensure we have the necessary parts.
    
    138
    +        cascache = self._get_cascache()
    
    139
    +        cascache.pull_key(digest.hash, digest.size_bytes, self._get_project())
    
    140
    +        path_components = os.path.split(self._output_directory)
    
    141
    +
    
    142
    +        # Now what we have is a digest for the output. Once we return, the calling process will
    
    143
    +        # attempt to descend into our directory and find that directory, so we need to overwrite
    
    144
    +        # that.
    
    145
    +
    
    146
    +        if not path_components:
    
    147
    +            # The artifact wants the whole directory; we could just return the returned hash in its
    
    148
    +            # place, but we don't have a means to do that yet.
    
    149
    +            raise SandboxError("Unimplemented: Output directory is empty or equal to the sandbox root.")
    
    150
    +
    
    151
    +        # At the moment, we will get the whole directory back in the first directory argument and we need
    
    152
    +        # to replace the sandbox's virtual directory with that. Creating a new virtual directory object
    
    153
    +        # from another hash will be interesting, though...
    
    154
    +
    
    155
    +        new_dir = CasBasedDirectory(self._get_context(), ref=digest)
    
    156
    +        self._set_virtual_directory(new_dir)
    
    157
    +
    
    158
    +    def run(self, command, flags, *, cwd=None, env=None):
    
    159
    +        # Upload sources
    
    160
    +        upload_vdir = self.get_virtual_directory()
    
    161
    +
    
    162
    +        if isinstance(upload_vdir, FileBasedDirectory):
    
    163
    +            # Make a new temporary directory to put source in
    
    164
    +            upload_vdir = CasBasedDirectory(self._get_context(), ref=None)
    
    165
    +            upload_vdir.import_files(self.get_virtual_directory()._get_underlying_directory())
    
    166
    +
    
    167
    +        # Now, push that key (without necessarily needing a ref) to the remote.
    
    168
    +        cascache = self._get_cascache()
    
    169
    +
    
    170
    +        ref = 'worker-source/{}'.format(upload_vdir.ref.hash)
    
    171
    +        upload_vdir._save(ref)
    
    172
    +        source_push_successful = cascache.push_refs([ref], self._get_project())
    
    173
    +
    
    174
    +        # Set up environment and PWD
    
    175
    +        if env is None:
    
    176
    +            env = self._get_environment()
    
    177
    +        if 'PWD' not in env:
    
    178
    +            env['PWD'] = self._get_work_directory()
    
    179
    +
    
    180
    +        # We want command args as a list of strings
    
    181
    +        if isinstance(command, str):
    
    182
    +            command = [command]
    
    183
    +
    
    184
    +        # Now transmit the command to execute
    
    185
    +        if source_push_successful or cascache.verify_key_pushed(ref, self._get_project()):
    
    186
    +            response = self.__run_remote_command(cascache, command, upload_vdir.ref, env)
    
    187
    +
    
    188
    +            if response is None:
    
    189
    +                # Failure of remote execution, usually due to an error in BuildStream
    
    190
    +                # NB This error could be raised in __run_remote_command
    
    191
    +                raise SandboxError("No response returned from server")
    
    192
    +
    
    193
    +            assert(response.HasField("error") or response.HasField("response"))
    
    194
    +
    
    195
    +            if response.HasField("error"):
    
    196
    +                # A normal error during the build: the remote execution system
    
    197
    +                # has worked correctly but the command failed.
    
    198
    +                # response.error also contains 'message' (str) and 'details'
    
    199
    +                # (iterator of Any) which we ignore at the moment.
    
    200
    +                return response.error.code
    
    201
    +            else:
    
    202
    +
    
    203
    +                # At the moment, response can either be an
    
    204
    +                # ExecuteResponse containing an ActionResult, or an
    
    205
    +                # ActionResult directly.
    
    206
    +                executeResponse = remote_execution_pb2.ExecuteResponse()
    
    207
    +                if response.response.Is(executeResponse.DESCRIPTOR):
    
    208
    +                    # Unpack ExecuteResponse and set response to its response
    
    209
    +                    response.response.Unpack(executeResponse)
    
    210
    +                    response = executeResponse
    
    211
    +
    
    212
    +                actionResult = remote_execution_pb2.ActionResult()
    
    213
    +                if response.response.Is(actionResult.DESCRIPTOR):
    
    214
    +                    response.response.Unpack(actionResult)
    
    215
    +                    self.process_job_output(actionResult.output_directories, actionResult.output_files)
    
    216
    +                else:
    
    217
    +                    raise SandboxError("Received unknown message from server (expected ExecutionResponse).")
    
    218
    +        else:
    
    219
    +            raise SandboxError("Failed to verify that source has been pushed to the remote artifact cache.")
    
    220
    +        return 0

  • buildstream/sandbox/sandbox.py
    ... ... @@ -99,9 +99,11 @@ class Sandbox():
    99 99
             self.__stdout = kwargs['stdout']
    
    100 100
             self.__stderr = kwargs['stderr']
    
    101 101
     
    
    102
    -        # Setup the directories. Root should be available to subclasses, hence
    
    103
    -        # being single-underscore. The others are private to this class.
    
    102
    +        # Setup the directories. Root and output_directory should be
    
    103
    +        # available to subclasses, hence being single-underscore. The
    
    104
    +        # others are private to this class.
    
    104 105
             self._root = os.path.join(directory, 'root')
    
    106
    +        self._output_directory = None
    
    105 107
             self.__directory = directory
    
    106 108
             self.__scratch = os.path.join(self.__directory, 'scratch')
    
    107 109
             for directory_ in [self._root, self.__scratch]:
    
    ... ... @@ -144,11 +146,17 @@ class Sandbox():
    144 146
                     self._vdir = FileBasedDirectory(self._root)
    
    145 147
             return self._vdir
    
    146 148
     
    
    149
    +    def _set_virtual_directory(self, vdir):
    
    150
    +        """ Sets virtual directory. Useful after remote execution
    
    151
    +        has rewritten the working directory.
    
    152
    +        """
    
    153
    +        self._vdir = vdir
    
    154
    +
    
    147 155
         def set_environment(self, environment):
    
    148 156
             """Sets the environment variables for the sandbox
    
    149 157
     
    
    150 158
             Args:
    
    151
    -           directory (dict): The environment variables to use in the sandbox
    
    159
    +           environment (dict): The environment variables to use in the sandbox
    
    152 160
             """
    
    153 161
             self.__env = environment
    
    154 162
     
    
    ... ... @@ -160,6 +168,15 @@ class Sandbox():
    160 168
             """
    
    161 169
             self.__cwd = directory
    
    162 170
     
    
    171
    +    def set_output_directory(self, directory):
    
    172
    +        """Sets the output directory - the directory which is preserved
    
    173
    +        as an artifact after assembly.
    
    174
    +
    
    175
    +        Args:
    
    176
    +           directory (str): An absolute path within the sandbox
    
    177
    +        """
    
    178
    +        self._output_directory = directory
    
    179
    +
    
    163 180
         def mark_directory(self, directory, *, artifact=False):
    
    164 181
             """Marks a sandbox directory and ensures it will exist
    
    165 182
     
    

  • buildstream/storage/_casbaseddirectory.py
    ... ... @@ -561,3 +561,20 @@ class CasBasedDirectory(Directory):
    561 561
             throw an exception. """
    
    562 562
             raise VirtualDirectoryError("_get_underlying_directory was called on a CAS-backed directory," +
    
    563 563
                                         " which has no underlying directory.")
    
    564
    +
    
    565
    +    def _save(self, name):
    
    566
    +        """Saves this directory into the content cache as a named ref. Used
    
    567
    +        by remote execution to make references for source directories so they
    
    568
    +        can be pushed to a remote artifact server.
    
    569
    +
    
    570
    +        """
    
    571
    +        self._recalculate_recursing_up()
    
    572
    +        self._recalculate_recursing_down()
    
    573
    +        (rel_refpath, refname) = os.path.split(name)
    
    574
    +        refdir = os.path.join(self.cas_directory, 'refs', 'heads', rel_refpath)
    
    575
    +        refname = os.path.join(refdir, refname)
    
    576
    +
    
    577
    +        if not os.path.exists(refdir):
    
    578
    +            os.makedirs(refdir)
    
    579
    +        with open(refname, "wb") as f:
    
    580
    +            f.write(self.ref.SerializeToString())

  • doc/source/format_project.rst
    ... ... @@ -204,6 +204,23 @@ with an artifact share.
    204 204
     You can also specify a list of caches here; earlier entries in the list
    
    205 205
     will have higher priority than later ones.
    
    206 206
     
    
    207
    +Remote execution
    
    208
    +~~~~~~~~~~~~~~~~
    
    209
    +BuildStream supports remote execution using the Google Remote Execution API
    
    210
    +(REAPI). A description of how remote execution works is beyond the scope
    
    211
    +of this document, but you can specify a remote server complying with the REAPI
    
    212
    +using the `remote-execution` option:
    
    213
    +
    
    214
    +.. code:: yaml
    
    215
    +
    
    216
    +  remote-execution:
    
    217
    +
    
    218
    +    # A url defining a remote execution server
    
    219
    +    url: buildserver.example.com:50051
    
    220
    +
    
    221
    +The URL should be a hostname and port separated by ':'. Do not include a protocol prefix (such as ``http://``).
    
    222
    +
    
    223
    +The Remote Execution API can be found via https://github.com/bazelbuild/remote-apis.
    
    207 224
     
    
    208 225
     .. _project_essentials_mirrors:
    
    209 226
     
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]