finnball pushed to branch finn/cas-commands at BuildGrid / buildgrid
Commits:
- 
b0066338
by finn at 2018-08-15T15:06:23Z
- 
d5c182ed
by finn at 2018-08-15T15:06:26Z
- 
e32c8c8c
by finn at 2018-08-15T15:06:26Z
- 
31249375
by finn at 2018-08-15T15:06:26Z
13 changed files:
- README.rst
- + app/bots/__init__.py
- + app/bots/buildbox.py
- + app/bots/dummy.py
- + app/bots/temp_directory.py
- app/commands/cmd_bot.py
- + app/commands/cmd_cas.py
- app/commands/cmd_execute.py
- buildgrid/bot/bot_session.py
- buildgrid/server/job.py
- buildgrid/server/scheduler.py
- + buildgrid/utils.py
- tests/integration/operations_service.py
Changes:
| ... | ... | @@ -38,7 +38,7 @@ In one terminal, start a server:: | 
| 38 | 38 |  | 
| 39 | 39 |  In another terminal, send a request for work::
 | 
| 40 | 40 |  | 
| 41 | -  bgd execute request
 | |
| 41 | +  bgd execute request-dummy
 | |
| 42 | 42 |  | 
| 43 | 43 |  The stage should show as `QUEUED` as it awaits a bot to pick up the work::
 | 
| 44 | 44 |  | 
| ... | ... | @@ -51,3 +51,35 @@ Create a bot session:: | 
| 51 | 51 |  Show the work as completed::
 | 
| 52 | 52 |  | 
| 53 | 53 |    bgd execute list
 | 
| 54 | + | |
| 55 | +Instructions for a Simple Build
 | |
| 56 | +-------------------------------
 | |
| 57 | + | |
| 58 | +This example covers a simple build. The user will upload a directory containing a C file and a command to the CAS. The bot will then fetch the uploaded directory and command which will then be run inside a temporary directory. The result will then be uploaded to the CAS and downloaded by the user. This is an early demo and still lacks a few features such as symlink support and checking to see if files exist in the CAS before executing a command.
 | |
| 59 | + | |
| 60 | +Create a new directory called `test-buildgrid/` and place the following C file, called `hello.c`, in it::
 | |
| 61 | + | |
| 62 | +  #include <stdio.h>
 | |
| 63 | +  int main()
 | |
| 64 | +  {
 | |
| 65 | +   printf("Hello, World!");
 | |
| 66 | +   return 0;
 | |
| 67 | +  }
 | |
| 68 | + | |
| 69 | +Now start a BuildGrid server, passing it a directory it can write a CAS to::
 | |
| 70 | + | |
| 71 | +  bgd server start --cas disk --cas-cache disk --cas-disk-directory /path/to/empty/directory
 | |
| 72 | + | |
| 73 | +Start the following bot session::
 | |
| 74 | + | |
| 75 | +  bgd bot temp-directory
 | |
| 76 | + | |
| 77 | +Upload the directory containing the C file::
 | |
| 78 | + | |
| 79 | +  bgd cas upload-dir /path/to/test-buildgrid
 | |
| 80 | + | |
| 81 | +Now we send an execution request to the bot with the name of the expected `output-file`, a boolean describing whether it is executable, the path to the directory we uploaded (in order to calculate the digest) and finally the command to run on the bot::
 | |
| 82 | + | |
| 83 | +  bgd execute command --output-file hello True /path/to/test-buildgrid -- gcc -Wall hello.c -o hello
 | |
| 84 | + | |
| 85 | +The resulting executable should have been returned to a new directory called `testing/` | 
| 1 | +# Copyright (C) 2018 Bloomberg LP
 | |
| 2 | +#
 | |
| 3 | +# Licensed under the Apache License, Version 2.0 (the "License");
 | |
| 4 | +# you may not use this file except in compliance with the License.
 | |
| 5 | +# You may obtain a copy of the License at
 | |
| 6 | +#
 | |
| 7 | +#  <http://www.apache.org/licenses/LICENSE-2.0>
 | |
| 8 | +#
 | |
| 9 | +# Unless required by applicable law or agreed to in writing, software
 | |
| 10 | +# distributed under the License is distributed on an "AS IS" BASIS,
 | |
| 11 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| 12 | +# See the License for the specific language governing permissions and
 | |
| 13 | +# limitations under the License.
 | |
| 14 | + | |
| 15 | + | |
| 16 | +import os
 | |
| 17 | +import subprocess
 | |
| 18 | +import tempfile
 | |
| 19 | +import grpc
 | |
| 20 | + | |
| 21 | +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
 | |
| 22 | +from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
 | |
| 23 | +from buildgrid.utils import read_file
 | |
| 24 | +from google.protobuf import any_pb2
 | |
| 25 | + | |
| 26 | + | |
| 27 | +def work_buildbox(context, lease):
 | |
| 28 | +    logger = context.logger
 | |
| 29 | + | |
| 30 | +    action_any = lease.payload
 | |
| 31 | +    action = remote_execution_pb2.Action()
 | |
| 32 | +    action_any.Unpack(action)
 | |
| 33 | + | |
| 34 | +    cert_server = read_file(context.server_cert)
 | |
| 35 | +    cert_client = read_file(context.client_cert)
 | |
| 36 | +    key_client = read_file(context.client_key)
 | |
| 37 | + | |
| 38 | +    # create server credentials
 | |
| 39 | +    credentials = grpc.ssl_channel_credentials(root_certificates=cert_server,
 | |
| 40 | +                                               private_key=key_client,
 | |
| 41 | +                                               certificate_chain=cert_client)
 | |
| 42 | + | |
| 43 | +    channel = grpc.secure_channel('{}:{}'.format(context.remote, context.port), credentials)
 | |
| 44 | + | |
| 45 | +    stub = bytestream_pb2_grpc.ByteStreamStub(channel)
 | |
| 46 | + | |
| 47 | +    remote_command = _buildstream_fetch_command(context.local_cas, stub, action.command_digest)
 | |
| 48 | +    environment = dict((x.name, x.value) for x in remote_command.environment_variables)
 | |
| 49 | +    logger.debug("command hash: {}".format(action.command_digest.hash))
 | |
| 50 | +    logger.debug("vdir hash: {}".format(action.input_root_digest.hash))
 | |
| 51 | +    logger.debug("\n{}".format(' '.join(remote_command.arguments)))
 | |
| 52 | + | |
| 53 | +    command = ['buildbox',
 | |
| 54 | +               '--remote={}'.format('https://{}:{}'.format(context.remote, context.port)),
 | |
| 55 | +               '--server-cert={}'.format(context.server_cert),
 | |
| 56 | +               '--client-key={}'.format(context.client_key),
 | |
| 57 | +               '--client-cert={}'.format(context.client_cert),
 | |
| 58 | +               '--local={}'.format(context.local_cas),
 | |
| 59 | +               '--chdir={}'.format(environment['PWD']),
 | |
| 60 | +               context.fuse_dir]
 | |
| 61 | + | |
| 62 | +    command.extend(remote_command.arguments)
 | |
| 63 | + | |
| 64 | +    logger.debug(' '.join(command))
 | |
| 65 | +    logger.debug("Input root digest:\n{}".format(action.input_root_digest))
 | |
| 66 | +    logger.info("Launching process")
 | |
| 67 | + | |
| 68 | +    proc = subprocess.Popen(command,
 | |
| 69 | +                            stdin=subprocess.PIPE,
 | |
| 70 | +                            stdout=subprocess.PIPE)
 | |
| 71 | +    std_send = action.input_root_digest.SerializeToString()
 | |
| 72 | +    std_out, _ = proc.communicate(std_send)
 | |
| 73 | + | |
| 74 | +    output_root_digest = remote_execution_pb2.Digest()
 | |
| 75 | +    output_root_digest.ParseFromString(std_out)
 | |
| 76 | +    logger.debug("Output root digest: {}".format(output_root_digest))
 | |
| 77 | + | |
| 78 | +    output_file = remote_execution_pb2.OutputDirectory(tree_digest=output_root_digest)
 | |
| 79 | + | |
| 80 | +    action_result = remote_execution_pb2.ActionResult()
 | |
| 81 | +    action_result.output_directories.extend([output_file])
 | |
| 82 | + | |
| 83 | +    action_result_any = any_pb2.Any()
 | |
| 84 | +    action_result_any.Pack(action_result)
 | |
| 85 | + | |
| 86 | +    lease.result.CopyFrom(action_result_any)
 | |
| 87 | + | |
| 88 | +    return lease
 | |
| 89 | + | |
| 90 | + | |
| 91 | +def _buildstream_fetch_blob(remote, digest, out):
 | |
| 92 | +    resource_name = os.path.join(digest.hash, str(digest.size_bytes))
 | |
| 93 | +    request = bytestream_pb2.ReadRequest()
 | |
| 94 | +    request.resource_name = resource_name
 | |
| 95 | +    request.read_offset = 0
 | |
| 96 | +    for response in remote.Read(request):
 | |
| 97 | +        out.write(response.data)
 | |
| 98 | + | |
| 99 | +    out.flush()
 | |
| 100 | +    assert digest.size_bytes == os.fstat(out.fileno()).st_size
 | |
| 101 | + | |
| 102 | + | |
| 103 | +def _buildstream_fetch_command(casdir, remote, digest):
 | |
| 104 | +    with tempfile.NamedTemporaryFile(dir=os.path.join(casdir, 'tmp')) as out:
 | |
| 105 | +        _buildstream_fetch_blob(remote, digest, out)
 | |
| 106 | +        remote_command = remote_execution_pb2.Command()
 | |
| 107 | +        with open(out.name, 'rb') as f:
 | |
| 108 | +            remote_command.ParseFromString(f.read())
 | |
| 109 | +        return remote_command
 | |
| 110 | + | |
| 111 | + | |
| 112 | +def _buildstream_fetch_action(casdir, remote, digest):
 | |
| 113 | +    with tempfile.NamedTemporaryFile(dir=os.path.join(casdir, 'tmp')) as out:
 | |
| 114 | +        _buildstream_fetch_blob(remote, digest, out)
 | |
| 115 | +        remote_action = remote_execution_pb2.Action()
 | |
| 116 | +        with open(out.name, 'rb') as f:
 | |
| 117 | +            remote_action.ParseFromString(f.read())
 | |
| 118 | +        return remote_action | 
| 1 | +# Copyright (C) 2018 Bloomberg LP
 | |
| 2 | +#
 | |
| 3 | +# Licensed under the Apache License, Version 2.0 (the "License");
 | |
| 4 | +# you may not use this file except in compliance with the License.
 | |
| 5 | +# You may obtain a copy of the License at
 | |
| 6 | +#
 | |
| 7 | +#  <http://www.apache.org/licenses/LICENSE-2.0>
 | |
| 8 | +#
 | |
| 9 | +# Unless required by applicable law or agreed to in writing, software
 | |
| 10 | +# distributed under the License is distributed on an "AS IS" BASIS,
 | |
| 11 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| 12 | +# See the License for the specific language governing permissions and
 | |
| 13 | +# limitations under the License.
 | |
| 14 | + | |
| 15 | + | |
| 16 | +import random
 | |
| 17 | +import time
 | |
| 18 | + | |
| 19 | + | |
| 20 | +def work_dummy(context, lease):
 | |
| 21 | +    """ Just returns lease after some random time
 | |
| 22 | +    """
 | |
| 23 | +    time.sleep(random.randint(1, 5))
 | |
| 24 | +    return lease | 
| 1 | +# Copyright (C) 2018 Bloomberg LP
 | |
| 2 | +#
 | |
| 3 | +# Licensed under the Apache License, Version 2.0 (the "License");
 | |
| 4 | +# you may not use this file except in compliance with the License.
 | |
| 5 | +# You may obtain a copy of the License at
 | |
| 6 | +#
 | |
| 7 | +#  <http://www.apache.org/licenses/LICENSE-2.0>
 | |
| 8 | +#
 | |
| 9 | +# Unless required by applicable law or agreed to in writing, software
 | |
| 10 | +# distributed under the License is distributed on an "AS IS" BASIS,
 | |
| 11 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| 12 | +# See the License for the specific language governing permissions and
 | |
| 13 | +# limitations under the License.
 | |
| 14 | + | |
| 15 | + | |
| 16 | +import os
 | |
| 17 | +import subprocess
 | |
| 18 | +import tempfile
 | |
| 19 | + | |
| 20 | +from buildgrid.utils import read_file, create_digest, write_fetch_directory, parse_to_pb2_from_fetch
 | |
| 21 | + | |
| 22 | +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
 | |
| 23 | +from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
 | |
| 24 | +from google.protobuf import any_pb2
 | |
| 25 | + | |
| 26 | + | |
| 27 | +def work_temp_directory(context, lease):
 | |
| 28 | +    """ Bot downloads directories and files into a temp directory,
 | |
| 29 | +    then uploads results back to CAS
 | |
| 30 | +    """
 | |
| 31 | + | |
| 32 | +    instance_name = context.instance_name
 | |
| 33 | +    stub_bytestream = bytestream_pb2_grpc.ByteStreamStub(context.channel)
 | |
| 34 | + | |
| 35 | +    action_digest = remote_execution_pb2.Digest()
 | |
| 36 | +    lease.payload.Unpack(action_digest)
 | |
| 37 | + | |
| 38 | +    action = remote_execution_pb2.Action()
 | |
| 39 | + | |
| 40 | +    action = parse_to_pb2_from_fetch(action, stub_bytestream, action_digest, instance_name)
 | |
| 41 | + | |
| 42 | +    with tempfile.TemporaryDirectory() as temp_dir:
 | |
| 43 | + | |
| 44 | +        command = remote_execution_pb2.Command()
 | |
| 45 | +        command = parse_to_pb2_from_fetch(command, stub_bytestream, action.command_digest, instance_name)
 | |
| 46 | + | |
| 47 | +        arguments = "cd {} &&".format(temp_dir)
 | |
| 48 | + | |
| 49 | +        for argument in command.arguments:
 | |
| 50 | +            arguments += " {}".format(argument)
 | |
| 51 | + | |
| 52 | +        context.logger.info(arguments)
 | |
| 53 | + | |
| 54 | +        write_fetch_directory(temp_dir, stub_bytestream, action.input_root_digest, instance_name)
 | |
| 55 | + | |
| 56 | +        proc = subprocess.Popen(arguments,
 | |
| 57 | +                         shell=True,
 | |
| 58 | +                         stdin=subprocess.PIPE,
 | |
| 59 | +                         stdout=subprocess.PIPE)
 | |
| 60 | + | |
| 61 | +        # TODO: Should return the std_out to the user
 | |
| 62 | +        proc.communicate()
 | |
| 63 | + | |
| 64 | +        result = remote_execution_pb2.ActionResult()
 | |
| 65 | +        requests = []
 | |
| 66 | +        for output_file in command.output_files:
 | |
| 67 | +            path = os.path.join(temp_dir, output_file)
 | |
| 68 | +            chunk = read_file(path)
 | |
| 69 | + | |
| 70 | +            digest = create_digest(chunk)
 | |
| 71 | + | |
| 72 | +            result.output_files.extend([remote_execution_pb2.OutputFile(path=output_file,
 | |
| 73 | +                                                                        digest=digest)])
 | |
| 74 | + | |
| 75 | +            requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
 | |
| 76 | +                digest=digest, data=chunk))
 | |
| 77 | + | |
| 78 | +        request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=instance_name,
 | |
| 79 | +                                                               requests=requests)
 | |
| 80 | + | |
| 81 | +        stub_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(context.channel)
 | |
| 82 | +        stub_cas.BatchUpdateBlobs(request)
 | |
| 83 | + | |
| 84 | +        result_any = any_pb2.Any()
 | |
| 85 | +        result_any.Pack(result)
 | |
| 86 | + | |
| 87 | +        lease.result.CopyFrom(result_any)
 | |
| 88 | + | |
| 89 | +    return lease | 
| ... | ... | @@ -22,23 +22,17 @@ Bot command | 
| 22 | 22 |  Create a bot interface and request work
 | 
| 23 | 23 |  """
 | 
| 24 | 24 |  | 
| 25 | -import asyncio
 | |
| 26 | 25 |  import logging
 | 
| 27 | -import os
 | |
| 28 | -import random
 | |
| 29 | -import subprocess
 | |
| 30 | -import tempfile
 | |
| 26 | + | |
| 31 | 27 |  from pathlib import Path, PurePath
 | 
| 32 | 28 |  | 
| 33 | 29 |  import click
 | 
| 34 | 30 |  import grpc
 | 
| 35 | -from google.protobuf import any_pb2
 | |
| 36 | 31 |  | 
| 37 | 32 |  from buildgrid.bot import bot, bot_interface
 | 
| 38 | 33 |  from buildgrid.bot.bot_session import BotSession, Device, Worker
 | 
| 39 | -from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
 | |
| 40 | -from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
 | |
| 41 | 34 |  | 
| 35 | +from ..bots import buildbox, dummy, temp_directory
 | |
| 42 | 36 |  from ..cli import pass_context
 | 
| 43 | 37 |  | 
| 44 | 38 |  | 
| ... | ... | @@ -53,6 +47,7 @@ def cli(context, host, port, parent): | 
| 53 | 47 |  | 
| 54 | 48 |      context.logger = logging.getLogger(__name__)
 | 
| 55 | 49 |      context.logger.info("Starting on port {}".format(port))
 | 
| 50 | +    context.channel = channel
 | |
| 56 | 51 |  | 
| 57 | 52 |      worker = Worker()
 | 
| 58 | 53 |      worker.add_device(Device())
 | 
| ... | ... | @@ -63,16 +58,33 @@ def cli(context, host, port, parent): | 
| 63 | 58 |      context.bot_session = bot_session
 | 
| 64 | 59 |  | 
| 65 | 60 |  | 
| 66 | -@cli.command('dummy', short_help="Create a dummy bot session")
 | |
| 61 | +@cli.command('dummy', short_help='Create a dummy bot session which just returns lease')
 | |
| 67 | 62 |  @pass_context
 | 
| 68 | -def dummy(context):
 | |
| 63 | +def run_dummy(context):
 | |
| 69 | 64 |      """
 | 
| 70 | 65 |      Simple dummy client. Creates a session, accepts leases, does fake work and
 | 
| 71 | 66 |      updates the server.
 | 
| 72 | 67 |      """
 | 
| 73 | 68 |      try:
 | 
| 74 | 69 |          b = bot.Bot(context.bot_session)
 | 
| 75 | -        b.session(_work_dummy,
 | |
| 70 | +        b.session(dummy.work_dummy,
 | |
| 71 | +                  context)
 | |
| 72 | + | |
| 73 | +    except KeyboardInterrupt:
 | |
| 74 | +        pass
 | |
| 75 | + | |
| 76 | + | |
| 77 | +@cli.command('temp-directory', short_help='Runs commands in temp directory and uploads results')
 | |
| 78 | +@click.option('--instance-name', default='testing')
 | |
| 79 | +@pass_context
 | |
| 80 | +def run_temp_directory(context, instance_name):
 | |
| 81 | +    """ Downloads files and command from CAS and runs
 | |
| 82 | +    in a temp directory, uploading result back to CAS
 | |
| 83 | +    """
 | |
| 84 | +    context.instance_name = instance_name
 | |
| 85 | +    try:
 | |
| 86 | +        b = bot.Bot(context.bot_session)
 | |
| 87 | +        b.session(temp_directory.work_temp_directory,
 | |
| 76 | 88 |                    context)
 | 
| 77 | 89 |  | 
| 78 | 90 |      except KeyboardInterrupt:
 | 
| ... | ... | @@ -88,7 +100,7 @@ def dummy(context): | 
| 88 | 100 |  @click.option('--port', show_default=True, default=11001)
 | 
| 89 | 101 |  @click.option('--remote', show_default=True, default='localhost')
 | 
| 90 | 102 |  @pass_context
 | 
| 91 | -def work_buildbox(context, remote, port, server_cert, client_key, client_cert, local_cas, fuse_dir):
 | |
| 103 | +def run_buildbox(context, remote, port, server_cert, client_key, client_cert, local_cas, fuse_dir):
 | |
| 92 | 104 |      """
 | 
| 93 | 105 |      Uses BuildBox to run commands.
 | 
| 94 | 106 |      """
 | 
| ... | ... | @@ -104,104 +116,9 @@ def work_buildbox(context, remote, port, server_cert, client_key, client_cert, l | 
| 104 | 116 |      context.fuse_dir = fuse_dir
 | 
| 105 | 117 |  | 
| 106 | 118 |      try:
 | 
| 107 | -        b = bot.Bot(bot_session=context.bot_session)
 | |
| 108 | 119 |  | 
| 109 | -        b.session(work=_work_buildbox,
 | |
| 120 | +        b = bot.Bot(context.bot_session)
 | |
| 121 | +        b.session(work=buildbox.work_buildbox,
 | |
| 110 | 122 |                    context=context)
 | 
| 111 | 123 |      except KeyboardInterrupt:
 | 
| 112 | 124 |          pass | 
| 113 | - | |
| 114 | - | |
| 115 | -async def _work_dummy(context, lease):
 | |
| 116 | -    await asyncio.sleep(random.randint(1, 5))
 | |
| 117 | -    return lease
 | |
| 118 | - | |
| 119 | - | |
| 120 | -async def _work_buildbox(context, lease):
 | |
| 121 | -    logger = context.logger
 | |
| 122 | - | |
| 123 | -    action_any = lease.payload
 | |
| 124 | -    action = remote_execution_pb2.Action()
 | |
| 125 | -    action_any.Unpack(action)
 | |
| 126 | - | |
| 127 | -    cert_server = _file_read(context.server_cert)
 | |
| 128 | -    cert_client = _file_read(context.client_cert)
 | |
| 129 | -    key_client = _file_read(context.client_key)
 | |
| 130 | - | |
| 131 | -    # create server credentials
 | |
| 132 | -    credentials = grpc.ssl_channel_credentials(root_certificates=cert_server,
 | |
| 133 | -                                               private_key=key_client,
 | |
| 134 | -                                               certificate_chain=cert_client)
 | |
| 135 | - | |
| 136 | -    channel = grpc.secure_channel('{}:{}'.format(context.remote, context.port), credentials)
 | |
| 137 | - | |
| 138 | -    stub = bytestream_pb2_grpc.ByteStreamStub(channel)
 | |
| 139 | - | |
| 140 | -    remote_command = _fetch_command(context.local_cas, stub, action.command_digest)
 | |
| 141 | -    environment = dict((x.name, x.value) for x in remote_command.environment_variables)
 | |
| 142 | -    logger.debug("command hash: {}".format(action.command_digest.hash))
 | |
| 143 | -    logger.debug("vdir hash: {}".format(action.input_root_digest.hash))
 | |
| 144 | -    logger.debug("\n{}".format(' '.join(remote_command.arguments)))
 | |
| 145 | - | |
| 146 | -    command = ['buildbox',
 | |
| 147 | -               '--remote={}'.format('https://{}:{}'.format(context.remote, context.port)),
 | |
| 148 | -               '--server-cert={}'.format(context.server_cert),
 | |
| 149 | -               '--client-key={}'.format(context.client_key),
 | |
| 150 | -               '--client-cert={}'.format(context.client_cert),
 | |
| 151 | -               '--local={}'.format(context.local_cas),
 | |
| 152 | -               '--chdir={}'.format(environment['PWD']),
 | |
| 153 | -               context.fuse_dir]
 | |
| 154 | - | |
| 155 | -    command.extend(remote_command.arguments)
 | |
| 156 | - | |
| 157 | -    logger.debug(' '.join(command))
 | |
| 158 | -    logger.debug("Input root digest:\n{}".format(action.input_root_digest))
 | |
| 159 | -    logger.info("Launching process")
 | |
| 160 | - | |
| 161 | -    proc = subprocess.Popen(command,
 | |
| 162 | -                            stdin=subprocess.PIPE,
 | |
| 163 | -                            stdout=subprocess.PIPE)
 | |
| 164 | -    std_send = action.input_root_digest.SerializeToString()
 | |
| 165 | -    std_out, _ = proc.communicate(std_send)
 | |
| 166 | - | |
| 167 | -    output_root_digest = remote_execution_pb2.Digest()
 | |
| 168 | -    output_root_digest.ParseFromString(std_out)
 | |
| 169 | -    logger.debug("Output root digest: {}".format(output_root_digest))
 | |
| 170 | - | |
| 171 | -    output_file = remote_execution_pb2.OutputDirectory(tree_digest=output_root_digest)
 | |
| 172 | - | |
| 173 | -    action_result = remote_execution_pb2.ActionResult()
 | |
| 174 | -    action_result.output_directories.extend([output_file])
 | |
| 175 | - | |
| 176 | -    action_result_any = any_pb2.Any()
 | |
| 177 | -    action_result_any.Pack(action_result)
 | |
| 178 | - | |
| 179 | -    lease.result.CopyFrom(action_result_any)
 | |
| 180 | - | |
| 181 | -    return lease
 | |
| 182 | - | |
| 183 | - | |
| 184 | -def _fetch_blob(remote, digest, out):
 | |
| 185 | -    resource_name = os.path.join(digest.hash, str(digest.size_bytes))
 | |
| 186 | -    request = bytestream_pb2.ReadRequest()
 | |
| 187 | -    request.resource_name = resource_name
 | |
| 188 | -    request.read_offset = 0
 | |
| 189 | -    for response in remote.Read(request):
 | |
| 190 | -        out.write(response.data)
 | |
| 191 | - | |
| 192 | -    out.flush()
 | |
| 193 | -    assert digest.size_bytes == os.fstat(out.fileno()).st_size
 | |
| 194 | - | |
| 195 | - | |
| 196 | -def _fetch_command(casdir, remote, digest):
 | |
| 197 | -    with tempfile.NamedTemporaryFile(dir=os.path.join(casdir, 'tmp')) as out:
 | |
| 198 | -        _fetch_blob(remote, digest, out)
 | |
| 199 | -        remote_command = remote_execution_pb2.Command()
 | |
| 200 | -        with open(out.name, 'rb') as f:
 | |
| 201 | -            remote_command.ParseFromString(f.read())
 | |
| 202 | -        return remote_command
 | |
| 203 | - | |
| 204 | - | |
| 205 | -def _file_read(file_path):
 | |
| 206 | -    with open(file_path, 'rb') as f:
 | |
| 207 | -        return f.read() | 
| 1 | +# Copyright (C) 2018 Bloomberg LP
 | |
| 2 | +#
 | |
| 3 | +# Licensed under the Apache License, Version 2.0 (the "License");
 | |
| 4 | +# you may not use this file except in compliance with the License.
 | |
| 5 | +# You may obtain a copy of the License at
 | |
| 6 | +#
 | |
| 7 | +#  <http://www.apache.org/licenses/LICENSE-2.0>
 | |
| 8 | +#
 | |
| 9 | +# Unless required by applicable law or agreed to in writing, software
 | |
| 10 | +# distributed under the License is distributed on an "AS IS" BASIS,
 | |
| 11 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| 12 | +# See the License for the specific language governing permissions and
 | |
| 13 | +# limitations under the License.
 | |
| 14 | + | |
| 15 | + | |
| 16 | +"""
 | |
| 17 | +CAS command
 | |
| 18 | +===========
 | |
| 19 | + | |
| 20 | +Upload files and directories to the Content Addressable Storage (CAS).
 | |
| 21 | +"""
 | |
| 22 | + | |
| 23 | +import logging
 | |
| 24 | +import click
 | |
| 25 | +import grpc
 | |
| 26 | + | |
| 27 | +from buildgrid.utils import merkle_maker, create_digest
 | |
| 28 | +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
 | |
| 29 | + | |
| 30 | +from ..cli import pass_context
 | |
| 31 | + | |
| 32 | + | |
| 33 | +@click.group(short_help='Interact with the CAS')
 | |
| 34 | +@click.option('--port', default='50051')
 | |
| 35 | +@click.option('--host', default='localhost')
 | |
| 36 | +@pass_context
 | |
| 37 | +def cli(context, host, port):
 | |
| 38 | +    context.logger = logging.getLogger(__name__)
 | |
| 39 | +    context.logger.info("Starting on port {}".format(port))
 | |
| 40 | + | |
| 41 | +    context.channel = grpc.insecure_channel('{}:{}'.format(host, port))
 | |
| 42 | +    context.port = port
 | |
| 43 | + | |
| 44 | + | |
| 45 | +@cli.command('upload-files', short_help='Upload files')
 | |
| 46 | +@click.argument('files', nargs=-1, type=click.File('rb'))
 | |
| 47 | +@click.option('--instance-name', default='testing')
 | |
| 48 | +@pass_context
 | |
| 49 | +def upload_files(context, files, instance_name):
 | |
| 50 | +    stub = remote_execution_pb2_grpc.ContentAddressableStorageStub(context.channel)
 | |
| 51 | + | |
| 52 | +    requests = []
 | |
| 53 | +    for file in files:
 | |
| 54 | +        chunk = file.read()
 | |
| 55 | +        requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
 | |
| 56 | +            digest=create_digest(chunk), data=chunk))
 | |
| 57 | + | |
| 58 | +    request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=instance_name,
 | |
| 59 | +                                                           requests=requests)
 | |
| 60 | + | |
| 61 | +    context.logger.info("Sending: {}".format(request))
 | |
| 62 | +    response = stub.BatchUpdateBlobs(request)
 | |
| 63 | +    context.logger.info("Response: {}".format(response))
 | |
| 64 | + | |
| 65 | + | |
| 66 | +@cli.command('upload-dir', short_help='Upload files')
 | |
| 67 | +@click.argument('directory')
 | |
| 68 | +@click.option('--instance-name', default='testing')
 | |
| 69 | +@pass_context
 | |
| 70 | +def upload_dir(context, directory, instance_name):
 | |
| 71 | +    context.logger.info("Uploading directory to cas")
 | |
| 72 | +    stub = remote_execution_pb2_grpc.ContentAddressableStorageStub(context.channel)
 | |
| 73 | + | |
| 74 | +    requests = []
 | |
| 75 | + | |
| 76 | +    for chunk, file_digest in merkle_maker(directory):
 | |
| 77 | +        requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
 | |
| 78 | +            digest=file_digest, data=chunk))
 | |
| 79 | + | |
| 80 | +    request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=instance_name,
 | |
| 81 | +                                                           requests=requests)
 | |
| 82 | + | |
| 83 | +    context.logger.info("Request:\n{}".format(request))
 | |
| 84 | +    response = stub.BatchUpdateBlobs(request)
 | |
| 85 | +    context.logger.info("Response:\n{}".format(response)) | 
| ... | ... | @@ -22,18 +22,22 @@ Execute command | 
| 22 | 22 |  Request work to be executed and monitor status of jobs.
 | 
| 23 | 23 |  """
 | 
| 24 | 24 |  | 
| 25 | +import errno
 | |
| 25 | 26 |  import logging
 | 
| 26 | - | |
| 27 | +import stat
 | |
| 28 | +import os
 | |
| 27 | 29 |  import click
 | 
| 28 | 30 |  import grpc
 | 
| 29 | 31 |  | 
| 32 | +from buildgrid.utils import merkle_maker, create_digest, write_fetch_blob
 | |
| 30 | 33 |  from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
 | 
| 34 | +from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
 | |
| 31 | 35 |  from buildgrid._protos.google.longrunning import operations_pb2, operations_pb2_grpc
 | 
| 32 | 36 |  | 
| 33 | 37 |  from ..cli import pass_context
 | 
| 34 | 38 |  | 
| 35 | 39 |  | 
| 36 | -@click.group(short_help="Simple execute client")
 | |
| 40 | +@click.group(short_help='Simple execute client')
 | |
| 37 | 41 |  @click.option('--port', default='50051')
 | 
| 38 | 42 |  @click.option('--host', default='localhost')
 | 
| 39 | 43 |  @pass_context
 | 
| ... | ... | @@ -45,12 +49,12 @@ def cli(context, host, port): | 
| 45 | 49 |      context.port = port
 | 
| 46 | 50 |  | 
| 47 | 51 |  | 
| 48 | -@cli.command('request', short_help="Send a dummy action")
 | |
| 52 | +@cli.command('request-dummy', short_help='Send a dummy action')
 | |
| 49 | 53 |  @click.option('--number', default=1)
 | 
| 50 | 54 |  @click.option('--instance-name', default='testing')
 | 
| 51 | -@click.option('--wait-for-completion', is_flag=True)
 | |
| 55 | +@click.option('--wait-for-completion', is_flag=True, help='Stream updates until jobs are completed')
 | |
| 52 | 56 |  @pass_context
 | 
| 53 | -def request(context, number, instance_name, wait_for_completion):
 | |
| 57 | +def request_dummy(context, number, instance_name, wait_for_completion):
 | |
| 54 | 58 |      action_digest = remote_execution_pb2.Digest()
 | 
| 55 | 59 |  | 
| 56 | 60 |      context.logger.info("Sending execution request...\n")
 | 
| ... | ... | @@ -72,7 +76,7 @@ def request(context, number, instance_name, wait_for_completion): | 
| 72 | 76 |              context.logger.info(next(response))
 | 
| 73 | 77 |  | 
| 74 | 78 |  | 
| 75 | -@cli.command('status', short_help="Get the status of an operation")
 | |
| 79 | +@cli.command('status', short_help='Get the status of an operation')
 | |
| 76 | 80 |  @click.argument('operation-name')
 | 
| 77 | 81 |  @pass_context
 | 
| 78 | 82 |  def operation_status(context, operation_name):
 | 
| ... | ... | @@ -85,7 +89,7 @@ def operation_status(context, operation_name): | 
| 85 | 89 |      context.logger.info(response)
 | 
| 86 | 90 |  | 
| 87 | 91 |  | 
| 88 | -@cli.command('list', short_help="List operations")
 | |
| 92 | +@cli.command('list', short_help='List operations')
 | |
| 89 | 93 |  @pass_context
 | 
| 90 | 94 |  def list_operations(context):
 | 
| 91 | 95 |      context.logger.info("Getting list of operations")
 | 
| ... | ... | @@ -103,7 +107,7 @@ def list_operations(context): | 
| 103 | 107 |          context.logger.info(op)
 | 
| 104 | 108 |  | 
| 105 | 109 |  | 
| 106 | -@cli.command('wait', short_help="Streams an operation until it is complete")
 | |
| 110 | +@cli.command('wait', short_help='Streams an operation until it is complete')
 | |
| 107 | 111 |  @click.argument('operation-name')
 | 
| 108 | 112 |  @pass_context
 | 
| 109 | 113 |  def wait_execution(context, operation_name):
 | 
| ... | ... | @@ -114,3 +118,86 @@ def wait_execution(context, operation_name): | 
| 114 | 118 |  | 
| 115 | 119 |      for stream in response:
 | 
| 116 | 120 |          context.logger.info(stream)
 | 
| 121 | + | |
| 122 | + | |
| 123 | +@cli.command('command', short_help='Send a command to be executed')
 | |
| 124 | +@click.argument('input-root')
 | |
| 125 | +@click.argument('commands', nargs=-1)
 | |
| 126 | +@click.option('--output-file', nargs=2, type=(str, bool), multiple=True,
 | |
| 127 | +              help='{Expected output file, is_executeable flag}')
 | |
| 128 | +@click.option('--output-directory', default='testing', help='Output directory for output files')
 | |
| 129 | +@click.option('--instance-name', default='testing')
 | |
| 130 | +@pass_context
 | |
| 131 | +def command(context, input_root, commands, output_file, output_directory, instance_name):
 | |
| 132 | +    stub = remote_execution_pb2_grpc.ExecutionStub(context.channel)
 | |
| 133 | + | |
| 134 | +    execute_command = remote_execution_pb2.Command()
 | |
| 135 | + | |
| 136 | +    for arg in commands:
 | |
| 137 | +        execute_command.arguments.extend([arg])
 | |
| 138 | + | |
| 139 | +    output_executeables = []
 | |
| 140 | +    for file, is_executeable in output_file:
 | |
| 141 | +        execute_command.output_files.extend([file])
 | |
| 142 | +        if is_executeable:
 | |
| 143 | +            output_executeables.append(file)
 | |
| 144 | + | |
| 145 | +    command_digest = create_digest(execute_command.SerializeToString())
 | |
| 146 | +    context.logger.info(command_digest)
 | |
| 147 | + | |
| 148 | +    # TODO: Check for missing blobs
 | |
| 149 | +    digest = None
 | |
| 150 | +    for _, digest in merkle_maker(input_root):
 | |
| 151 | +        pass
 | |
| 152 | + | |
| 153 | +    action = remote_execution_pb2.Action(command_digest=command_digest,
 | |
| 154 | +                                         input_root_digest=digest,
 | |
| 155 | +                                         do_not_cache=True)
 | |
| 156 | + | |
| 157 | +    action_digest = create_digest(action.SerializeToString())
 | |
| 158 | + | |
| 159 | +    context.logger.info("Sending execution request...\n")
 | |
| 160 | + | |
| 161 | +    requests = []
 | |
| 162 | +    requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
 | |
| 163 | +        digest=command_digest, data=execute_command.SerializeToString()))
 | |
| 164 | + | |
| 165 | +    requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
 | |
| 166 | +        digest=action_digest, data=action.SerializeToString()))
 | |
| 167 | + | |
| 168 | +    request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=instance_name,
 | |
| 169 | +                                                           requests=requests)
 | |
| 170 | +    remote_execution_pb2_grpc.ContentAddressableStorageStub(context.channel).BatchUpdateBlobs(request)
 | |
| 171 | + | |
| 172 | +    request = remote_execution_pb2.ExecuteRequest(instance_name=instance_name,
 | |
| 173 | +                                                  action_digest=action_digest,
 | |
| 174 | +                                                  skip_cache_lookup=True)
 | |
| 175 | +    response = stub.Execute(request)
 | |
| 176 | + | |
| 177 | +    stub = bytestream_pb2_grpc.ByteStreamStub(context.channel)
 | |
| 178 | + | |
| 179 | +    stream = None
 | |
| 180 | +    for stream in response:
 | |
| 181 | +        context.logger.info(stream)
 | |
| 182 | + | |
| 183 | +    execute_response = remote_execution_pb2.ExecuteResponse()
 | |
| 184 | +    stream.response.Unpack(execute_response)
 | |
| 185 | + | |
| 186 | +    for output_file_response in execute_response.result.output_files:
 | |
| 187 | +        path = os.path.join(output_directory, output_file_response.path)
 | |
| 188 | + | |
| 189 | +        if not os.path.exists(os.path.dirname(path)):
 | |
| 190 | + | |
| 191 | +            try:
 | |
| 192 | +                os.makedirs(os.path.dirname(path))
 | |
| 193 | + | |
| 194 | +            except OSError as exc:
 | |
| 195 | +                if exc.errno != errno.EEXIST:
 | |
| 196 | +                    raise
 | |
| 197 | + | |
| 198 | +        with open(path, 'wb+') as f:
 | |
| 199 | +            write_fetch_blob(f, stub, output_file_response.digest, instance_name)
 | |
| 200 | + | |
| 201 | +        if output_file_response.path in output_executeables:
 | |
| 202 | +            st = os.stat(path)
 | |
| 203 | +            os.chmod(path, st.st_mode | stat.S_IXUSR) | 
| ... | ... | @@ -104,14 +104,15 @@ class BotSession: | 
| 104 | 104 |              self._update_lease_from_server(lease)
 | 
| 105 | 105 |  | 
| 106 | 106 |      def update_bot_session(self):
 | 
| 107 | +        self.logger.debug("Updating bot session: {}".format(self._bot_id))
 | |
| 107 | 108 |          session = self._interface.update_bot_session(self.get_pb2())
 | 
| 108 | -        for lease in session.leases:
 | |
| 109 | -            self._update_lease_from_server(lease)
 | |
| 110 | - | |
| 111 | -        for k, v in self._leases.items():
 | |
| 109 | +        for k, v in list(self._leases.items()):
 | |
| 112 | 110 |              if v.state == LeaseState.COMPLETED.value:
 | 
| 113 | 111 |                  del self._leases[k]
 | 
| 114 | 112 |  | 
| 113 | +        for lease in session.leases:
 | |
| 114 | +            self._update_lease_from_server(lease)
 | |
| 115 | + | |
| 115 | 116 |      def get_pb2(self):
 | 
| 116 | 117 |          leases = list(self._leases.values())
 | 
| 117 | 118 |          if not leases:
 | 
| ... | ... | @@ -134,12 +135,16 @@ class BotSession: | 
| 134 | 135 |          # TODO: Compare with previous state of lease
 | 
| 135 | 136 |          if lease.state == LeaseState.PENDING.value:
 | 
| 136 | 137 |              lease.state = LeaseState.ACTIVE.value
 | 
| 137 | -            asyncio.ensure_future(self.create_work(lease))
 | |
| 138 | 138 |              self._leases[lease.id] = lease
 | 
| 139 | +            self.update_bot_session()
 | |
| 140 | +            asyncio.ensure_future(self.create_work(lease))
 | |
| 139 | 141 |  | 
| 140 | 142 |      async def create_work(self, lease):
 | 
| 141 | 143 |          self.logger.debug("Work created: {}".format(lease.id))
 | 
| 142 | -        lease = await self._work(self._context, lease)
 | |
| 144 | + | |
| 145 | +        loop = asyncio.get_event_loop()
 | |
| 146 | +        lease = await loop.run_in_executor(None, self._work, self._context, lease)
 | |
| 147 | + | |
| 143 | 148 |          self.logger.debug("Work complete: {}".format(lease.id))
 | 
| 144 | 149 |          self.lease_completed(lease)
 | 
| 145 | 150 |  | 
| ... | ... | @@ -21,26 +21,25 @@ from enum import Enum | 
| 21 | 21 |  | 
| 22 | 22 |  from google.protobuf import any_pb2
 | 
| 23 | 23 |  | 
| 24 | -from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ExecuteOperationMetadata
 | |
| 25 | -from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ExecuteResponse
 | |
| 24 | +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
 | |
| 26 | 25 |  from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
 | 
| 27 | 26 |  from buildgrid._protos.google.longrunning import operations_pb2
 | 
| 28 | 27 |  | 
| 29 | 28 |  | 
| 30 | 29 |  class ExecuteStage(Enum):
 | 
| 31 | -    UNKNOWN = ExecuteOperationMetadata.Stage.Value('UNKNOWN')
 | |
| 30 | +    UNKNOWN = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('UNKNOWN')
 | |
| 32 | 31 |  | 
| 33 | 32 |      # Checking the result against the cache.
 | 
| 34 | -    CACHE_CHECK = ExecuteOperationMetadata.Stage.Value('CACHE_CHECK')
 | |
| 33 | +    CACHE_CHECK = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('CACHE_CHECK')
 | |
| 35 | 34 |  | 
| 36 | 35 |      # Currently idle, awaiting a free machine to execute.
 | 
| 37 | -    QUEUED = ExecuteOperationMetadata.Stage.Value('QUEUED')
 | |
| 36 | +    QUEUED = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('QUEUED')
 | |
| 38 | 37 |  | 
| 39 | 38 |      # Currently being executed by a worker.
 | 
| 40 | -    EXECUTING = ExecuteOperationMetadata.Stage.Value('EXECUTING')
 | |
| 39 | +    EXECUTING = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('EXECUTING')
 | |
| 41 | 40 |  | 
| 42 | 41 |      # Finished execution.
 | 
| 43 | -    COMPLETED = ExecuteOperationMetadata.Stage.Value('COMPLETED')
 | |
| 42 | +    COMPLETED = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('COMPLETED')
 | |
| 44 | 43 |  | 
| 45 | 44 |  | 
| 46 | 45 |  class BotStatus(Enum):
 | 
| ... | ... | @@ -80,13 +79,13 @@ class Job: | 
| 80 | 79 |      def __init__(self, action_digest, do_not_cache=False, message_queue=None):
 | 
| 81 | 80 |          self.lease = None
 | 
| 82 | 81 |          self.logger = logging.getLogger(__name__)
 | 
| 82 | +        self.n_tries = 0
 | |
| 83 | 83 |          self.result = None
 | 
| 84 | 84 |          self.result_cached = False
 | 
| 85 | 85 |  | 
| 86 | 86 |          self._action_digest = action_digest
 | 
| 87 | 87 |          self._do_not_cache = do_not_cache
 | 
| 88 | 88 |          self._execute_stage = ExecuteStage.UNKNOWN
 | 
| 89 | -        self._n_tries = 0
 | |
| 90 | 89 |          self._name = str(uuid.uuid4())
 | 
| 91 | 90 |          self._operation = operations_pb2.Operation(name=self._name)
 | 
| 92 | 91 |          self._operation_update_queues = []
 | 
| ... | ... | @@ -122,15 +121,16 @@ class Job: | 
| 122 | 121 |          self._operation.metadata.CopyFrom(self._pack_any(self.get_operation_meta()))
 | 
| 123 | 122 |          if self.result is not None:
 | 
| 124 | 123 |              self._operation.done = True
 | 
| 125 | -            response = ExecuteResponse()
 | |
| 126 | -            self.result.Unpack(response.result)
 | |
| 127 | -            response.cached_result = self.result_cached
 | |
| 124 | +            action_result = remote_execution_pb2.ActionResult()
 | |
| 125 | +            self.result.Unpack(action_result)
 | |
| 126 | +            response = remote_execution_pb2.ExecuteResponse(result=action_result,
 | |
| 127 | +                                                            cached_result=self.result_cached)
 | |
| 128 | 128 |              self._operation.response.CopyFrom(self._pack_any(response))
 | 
| 129 | 129 |  | 
| 130 | 130 |          return self._operation
 | 
| 131 | 131 |  | 
| 132 | 132 |      def get_operation_meta(self):
 | 
| 133 | -        meta = ExecuteOperationMetadata()
 | |
| 133 | +        meta = remote_execution_pb2.ExecuteOperationMetadata()
 | |
| 134 | 134 |          meta.stage = self._execute_stage.value
 | 
| 135 | 135 |          meta.action_digest.CopyFrom(self._action_digest)
 | 
| 136 | 136 |  | 
| ... | ... | @@ -25,7 +25,6 @@ from collections import deque | 
| 25 | 25 |  | 
| 26 | 26 |  from google.protobuf import any_pb2
 | 
| 27 | 27 |  | 
| 28 | -from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ActionResult
 | |
| 29 | 28 |  from buildgrid._protos.google.longrunning import operations_pb2
 | 
| 30 | 29 |  | 
| 31 | 30 |  from .job import ExecuteStage, LeaseState
 | 
| ... | ... | @@ -83,9 +82,7 @@ class Scheduler: | 
| 83 | 82 |          job.update_execute_stage(ExecuteStage.COMPLETED)
 | 
| 84 | 83 |          self.jobs[name] = job
 | 
| 85 | 84 |          if not job.do_not_cache and self.action_cache is not None:
 | 
| 86 | -            action_result = ActionResult()
 | |
| 87 | -            result.Unpack(action_result)
 | |
| 88 | -            self.action_cache.put_action_result(job.action_digest, action_result)
 | |
| 85 | +            self.action_cache.put_action_result(job.action_digest, result)
 | |
| 89 | 86 |  | 
| 90 | 87 |      def get_operations(self):
 | 
| 91 | 88 |          response = operations_pb2.ListOperationsResponse()
 | 
| ... | ... | @@ -94,7 +91,7 @@ class Scheduler: | 
| 94 | 91 |          return response
 | 
| 95 | 92 |  | 
| 96 | 93 |      def update_job_lease_state(self, name, state):
 | 
| 97 | -        job = self.jobs.get(name)
 | |
| 94 | +        job = self.jobs[name]
 | |
| 98 | 95 |          job.lease.state = state
 | 
| 99 | 96 |          self.jobs[name] = job
 | 
| 100 | 97 |  | 
| 1 | +# Copyright (C) 2018 Bloomberg LP
 | |
| 2 | +#
 | |
| 3 | +# Licensed under the Apache License, Version 2.0 (the "License");
 | |
| 4 | +# you may not use this file except in compliance with the License.
 | |
| 5 | +# You may obtain a copy of the License at
 | |
| 6 | +#
 | |
| 7 | +#  <http://www.apache.org/licenses/LICENSE-2.0>
 | |
| 8 | +#
 | |
| 9 | +# Unless required by applicable law or agreed to in writing, software
 | |
| 10 | +# distributed under the License is distributed on an "AS IS" BASIS,
 | |
| 11 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| 12 | +# See the License for the specific language governing permissions and
 | |
| 13 | +# limitations under the License.
 | |
| 14 | + | |
| 15 | +import os
 | |
| 16 | + | |
| 17 | +from buildgrid.settings import HASH
 | |
| 18 | +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
 | |
| 19 | +from buildgrid._protos.google.bytestream import bytestream_pb2
 | |
| 20 | + | |
| 21 | + | |
| 22 | +def gen_fetch_blob(stub, digest, instance_name=""):
 | |
| 23 | +    """ Generates byte stream from a fetch blob request
 | |
| 24 | +    """
 | |
| 25 | + | |
| 26 | +    resource_name = os.path.join(instance_name, 'blobs', digest.hash, str(digest.size_bytes))
 | |
| 27 | +    request = bytestream_pb2.ReadRequest(resource_name=resource_name,
 | |
| 28 | +                                         read_offset=0)
 | |
| 29 | +    for response in stub.Read(request):
 | |
| 30 | +        yield response.data
 | |
| 31 | + | |
| 32 | + | |
| 33 | +def write_fetch_directory(directory, stub, digest, instance_name=""):
 | |
| 34 | +    """ Given a directory digest, fetches files and writes them to a directory
 | |
| 35 | +    """
 | |
| 36 | +    # TODO: Extend to symlinks and inner directories
 | |
| 37 | +    # pathlib.Path('/my/directory').mkdir(parents=True, exist_ok=True)
 | |
| 38 | + | |
| 39 | +    directory_pb2 = remote_execution_pb2.Directory()
 | |
| 40 | +    directory_pb2 = parse_to_pb2_from_fetch(directory_pb2, stub, digest, instance_name)
 | |
| 41 | + | |
| 42 | +    for file_node in directory_pb2.files:
 | |
| 43 | +        path = os.path.join(directory, file_node.name)
 | |
| 44 | +        with open(path, 'wb') as f:
 | |
| 45 | +            write_fetch_blob(f, stub, file_node.digest, instance_name)
 | |
| 46 | + | |
| 47 | + | |
| 48 | +def write_fetch_blob(out, stub, digest, instance_name=""):
 | |
| 49 | +    """ Given an output buffer, fetches blob and writes to buffer
 | |
| 50 | +    """
 | |
| 51 | + | |
| 52 | +    for stream in gen_fetch_blob(stub, digest, instance_name):
 | |
| 53 | +        out.write(stream)
 | |
| 54 | + | |
| 55 | +    out.flush()
 | |
| 56 | +    assert digest.size_bytes == os.fstat(out.fileno()).st_size
 | |
| 57 | + | |
| 58 | + | |
| 59 | +def parse_to_pb2_from_fetch(pb2, stub, digest, instance_name=""):
 | |
| 60 | +    """ Fetches stream and parses it into given pb2
 | |
| 61 | +    """
 | |
| 62 | + | |
| 63 | +    stream_bytes = b''
 | |
| 64 | +    for stream in gen_fetch_blob(stub, digest, instance_name):
 | |
| 65 | +        stream_bytes += stream
 | |
| 66 | + | |
| 67 | +    pb2.ParseFromString(stream_bytes)
 | |
| 68 | +    return pb2
 | |
| 69 | + | |
| 70 | + | |
| 71 | +def create_digest(bytes_to_digest):
 | |
| 72 | +    """ Creates a hash based on the hex digest and returns the digest
 | |
| 73 | +    """
 | |
| 74 | +    return remote_execution_pb2.Digest(hash=HASH(bytes_to_digest).hexdigest(),
 | |
| 75 | +                                       size_bytes=len(bytes_to_digest))
 | |
| 76 | + | |
| 77 | + | |
| 78 | +def merkle_maker(directory):
 | |
| 79 | +    """ Walks through given directory, yielding the binary and digest
 | |
| 80 | +    """
 | |
| 81 | +    directory_pb2 = remote_execution_pb2.Directory()
 | |
| 82 | +    for (dir_path, dir_names, file_names) in os.walk(directory):
 | |
| 83 | + | |
| 84 | +        for file_name in file_names:
 | |
| 85 | +            file_path = os.path.join(dir_path, file_name)
 | |
| 86 | +            chunk = read_file(file_path)
 | |
| 87 | +            file_digest = create_digest(chunk)
 | |
| 88 | +            directory_pb2.files.extend([file_maker(file_path, file_digest)])
 | |
| 89 | +            yield chunk, file_digest
 | |
| 90 | + | |
| 91 | +        for inner_dir in dir_names:
 | |
| 92 | +            inner_dir_path = os.path.join(dir_path, inner_dir)
 | |
| 93 | +            yield from merkle_maker(inner_dir_path)
 | |
| 94 | + | |
| 95 | +    directory_string = directory_pb2.SerializeToString()
 | |
| 96 | + | |
| 97 | +    yield directory_string, create_digest(directory_string)
 | |
| 98 | + | |
| 99 | + | |
| 100 | +def file_maker(file_path, file_digest):
 | |
| 101 | +    """ Creates a File Node
 | |
| 102 | +    """
 | |
| 103 | +    _, file_name = os.path.split(file_path)
 | |
| 104 | +    return remote_execution_pb2.FileNode(name=file_name,
 | |
| 105 | +                                         digest=file_digest,
 | |
| 106 | +                                         is_executable=os.access(file_path, os.X_OK))
 | |
| 107 | + | |
| 108 | + | |
| 109 | +def read_file(read):
 | |
| 110 | +    with open(read, 'rb') as f:
 | |
| 111 | +        return f.read() | 
| ... | ... | @@ -100,12 +100,14 @@ def test_list_operations_with_result(instance, execute_request, context): | 
| 100 | 100 |      action_result = remote_execution_pb2.ActionResult()
 | 
| 101 | 101 |      output_file = remote_execution_pb2.OutputFile(path='unicorn')
 | 
| 102 | 102 |      action_result.output_files.extend([output_file])
 | 
| 103 | -    instance._instance._scheduler.jobs[response_execute.name].result = _pack_any(action_result)
 | |
| 103 | + | |
| 104 | +    instance._instance._scheduler.job_complete(response_execute.name, _pack_any(action_result))
 | |
| 104 | 105 |  | 
| 105 | 106 |      request = operations_pb2.ListOperationsRequest()
 | 
| 106 | 107 |      response = instance.ListOperations(request, context)
 | 
| 107 | 108 |  | 
| 108 | 109 |      assert response.operations[0].name == response_execute.name
 | 
| 110 | + | |
| 109 | 111 |      execute_response = remote_execution_pb2.ExecuteResponse()
 | 
| 110 | 112 |      response.operations[0].response.Unpack(execute_response)
 | 
| 111 | 113 |      assert execute_response.result == action_result
 | 
