[Notes] [Git][BuildGrid/buildgrid][finn/cas-commands] 4 commits: New commands. Can upload files and commands to CAS.



Title: GitLab

finnball pushed to branch finn/cas-commands at BuildGrid / buildgrid

Commits:

13 changed files:

Changes:

  • README.rst
    ... ... @@ -38,7 +38,7 @@ In one terminal, start a server::
    38 38
     
    
    39 39
     In another terminal, send a request for work::
    
    40 40
     
    
    41
    -  bgd execute request
    
    41
    +  bgd execute request-dummy
    
    42 42
     
    
    43 43
     The stage should show as `QUEUED` as it awaits a bot to pick up the work::
    
    44 44
     
    
    ... ... @@ -51,3 +51,35 @@ Create a bot session::
    51 51
     Show the work as completed::
    
    52 52
     
    
    53 53
       bgd execute list
    
    54
    +
    
    55
    +Instructions for a Simple Build
    
    56
    +-------------------------------
    
    57
    +
    
    58
    +This example covers a simple build. The user will upload a directory containing a C file and a command to the CAS. The bot will then fetch the uploaded directory and command which will then be run inside a temporary directory. The result will then be uploaded to the CAS and downloaded by the user. This is an early demo and still lacks a few features such as symlink support and checking to see if files exist in the CAS before executing a command.
    
    59
    +
    
    60
    +Create a new directory called `test-buildgrid/` and place the following C file in it called `hello.c`::
    
    61
    +
    
    62
    +  #include <stdio.h>
    
    63
    +  int main()
    
    64
    +  {
    
    65
    +   printf("Hello, World!");
    
    66
    +   return 0;
    
    67
    +  }
    
    68
    +
    
    69
    +Now start a BuildGrid server, passing it a directory it can write a CAS to::
    
    70
    +
    
    71
    +  bgd server start --cas disk --cas-cache disk --cas-disk-directory /path/to/empty/directory
    
    72
    +
    
    73
    +Start the following bot session::
    
    74
    +
    
    75
    +  bgd bot temp-directory
    
    76
    +
    
    77
    +Upload the directory containing the C file::
    
    78
    +
    
    79
    +  bgd cas upload-dir /path/to/test-buildgrid
    
    80
    +
    
    81
    +Now we send an execution request to the bot with the name of the epxected `output-file`, a boolean describing if it is executeable, the path to the directory we uploaded in order to calculate the digest and finally the command to run on the bot::
    
    82
    +
    
    83
    +  bgd execute command --output-file hello True /path/to/test-buildgrid -- gcc -Wall hello.c -o hello
    
    84
    +
    
    85
    +The resulting executeable should have returned to a new directory called `testing/`

  • app/bots/__init__.py

  • app/bots/buildbox.py
    1
    +# Copyright (C) 2018 Bloomberg LP
    
    2
    +#
    
    3
    +# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    +# you may not use this file except in compliance with the License.
    
    5
    +# You may obtain a copy of the License at
    
    6
    +#
    
    7
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    +#
    
    9
    +# Unless required by applicable law or agreed to in writing, software
    
    10
    +# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    +# See the License for the specific language governing permissions and
    
    13
    +# limitations under the License.
    
    14
    +
    
    15
    +
    
    16
    +import os
    
    17
    +import subprocess
    
    18
    +import tempfile
    
    19
    +import grpc
    
    20
    +
    
    21
    +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    22
    +from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
    
    23
    +from buildgrid.utils import read_file
    
    24
    +from google.protobuf import any_pb2
    
    25
    +
    
    26
    +
    
    27
    +def work_buildbox(context, lease):
    
    28
    +    logger = context.logger
    
    29
    +
    
    30
    +    action_any = lease.payload
    
    31
    +    action = remote_execution_pb2.Action()
    
    32
    +    action_any.Unpack(action)
    
    33
    +
    
    34
    +    cert_server = read_file(context.server_cert)
    
    35
    +    cert_client = read_file(context.client_cert)
    
    36
    +    key_client = read_file(context.client_key)
    
    37
    +
    
    38
    +    # create server credentials
    
    39
    +    credentials = grpc.ssl_channel_credentials(root_certificates=cert_server,
    
    40
    +                                               private_key=key_client,
    
    41
    +                                               certificate_chain=cert_client)
    
    42
    +
    
    43
    +    channel = grpc.secure_channel('{}:{}'.format(context.remote, context.port), credentials)
    
    44
    +
    
    45
    +    stub = bytestream_pb2_grpc.ByteStreamStub(channel)
    
    46
    +
    
    47
    +    remote_command = _buildstream_fetch_command(context.local_cas, stub, action.command_digest)
    
    48
    +    environment = dict((x.name, x.value) for x in remote_command.environment_variables)
    
    49
    +    logger.debug("command hash: {}".format(action.command_digest.hash))
    
    50
    +    logger.debug("vdir hash: {}".format(action.input_root_digest.hash))
    
    51
    +    logger.debug("\n{}".format(' '.join(remote_command.arguments)))
    
    52
    +
    
    53
    +    command = ['buildbox',
    
    54
    +               '--remote={}'.format('https://{}:{}'.format(context.remote, context.port)),
    
    55
    +               '--server-cert={}'.format(context.server_cert),
    
    56
    +               '--client-key={}'.format(context.client_key),
    
    57
    +               '--client-cert={}'.format(context.client_cert),
    
    58
    +               '--local={}'.format(context.local_cas),
    
    59
    +               '--chdir={}'.format(environment['PWD']),
    
    60
    +               context.fuse_dir]
    
    61
    +
    
    62
    +    command.extend(remote_command.arguments)
    
    63
    +
    
    64
    +    logger.debug(' '.join(command))
    
    65
    +    logger.debug("Input root digest:\n{}".format(action.input_root_digest))
    
    66
    +    logger.info("Launching process")
    
    67
    +
    
    68
    +    proc = subprocess.Popen(command,
    
    69
    +                            stdin=subprocess.PIPE,
    
    70
    +                            stdout=subprocess.PIPE)
    
    71
    +    std_send = action.input_root_digest.SerializeToString()
    
    72
    +    std_out, _ = proc.communicate(std_send)
    
    73
    +
    
    74
    +    output_root_digest = remote_execution_pb2.Digest()
    
    75
    +    output_root_digest.ParseFromString(std_out)
    
    76
    +    logger.debug("Output root digest: {}".format(output_root_digest))
    
    77
    +
    
    78
    +    output_file = remote_execution_pb2.OutputDirectory(tree_digest=output_root_digest)
    
    79
    +
    
    80
    +    action_result = remote_execution_pb2.ActionResult()
    
    81
    +    action_result.output_directories.extend([output_file])
    
    82
    +
    
    83
    +    action_result_any = any_pb2.Any()
    
    84
    +    action_result_any.Pack(action_result)
    
    85
    +
    
    86
    +    lease.result.CopyFrom(action_result_any)
    
    87
    +
    
    88
    +    return lease
    
    89
    +
    
    90
    +
    
    91
    +def _buildstream_fetch_blob(remote, digest, out):
    
    92
    +    resource_name = os.path.join(digest.hash, str(digest.size_bytes))
    
    93
    +    request = bytestream_pb2.ReadRequest()
    
    94
    +    request.resource_name = resource_name
    
    95
    +    request.read_offset = 0
    
    96
    +    for response in remote.Read(request):
    
    97
    +        out.write(response.data)
    
    98
    +
    
    99
    +    out.flush()
    
    100
    +    assert digest.size_bytes == os.fstat(out.fileno()).st_size
    
    101
    +
    
    102
    +
    
    103
    +def _buildstream_fetch_command(casdir, remote, digest):
    
    104
    +    with tempfile.NamedTemporaryFile(dir=os.path.join(casdir, 'tmp')) as out:
    
    105
    +        _buildstream_fetch_blob(remote, digest, out)
    
    106
    +        remote_command = remote_execution_pb2.Command()
    
    107
    +        with open(out.name, 'rb') as f:
    
    108
    +            remote_command.ParseFromString(f.read())
    
    109
    +        return remote_command
    
    110
    +
    
    111
    +
    
    112
    +def _buildstream_fetch_action(casdir, remote, digest):
    
    113
    +    with tempfile.NamedTemporaryFile(dir=os.path.join(casdir, 'tmp')) as out:
    
    114
    +        _buildstream_fetch_blob(remote, digest, out)
    
    115
    +        remote_action = remote_execution_pb2.Action()
    
    116
    +        with open(out.name, 'rb') as f:
    
    117
    +            remote_action.ParseFromString(f.read())
    
    118
    +        return remote_action

  • app/bots/dummy.py
    1
    +# Copyright (C) 2018 Bloomberg LP
    
    2
    +#
    
    3
    +# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    +# you may not use this file except in compliance with the License.
    
    5
    +# You may obtain a copy of the License at
    
    6
    +#
    
    7
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    +#
    
    9
    +# Unless required by applicable law or agreed to in writing, software
    
    10
    +# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    +# See the License for the specific language governing permissions and
    
    13
    +# limitations under the License.
    
    14
    +
    
    15
    +
    
    16
    +import random
    
    17
    +import time
    
    18
    +
    
    19
    +
    
    20
    +def work_dummy(context, lease):
    
    21
    +    """ Just returns lease after some random time
    
    22
    +    """
    
    23
    +    time.sleep(random.randint(1, 5))
    
    24
    +    return lease

  • app/bots/temp_directory.py
    1
    +# Copyright (C) 2018 Bloomberg LP
    
    2
    +#
    
    3
    +# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    +# you may not use this file except in compliance with the License.
    
    5
    +# You may obtain a copy of the License at
    
    6
    +#
    
    7
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    +#
    
    9
    +# Unless required by applicable law or agreed to in writing, software
    
    10
    +# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    +# See the License for the specific language governing permissions and
    
    13
    +# limitations under the License.
    
    14
    +
    
    15
    +
    
    16
    +import os
    
    17
    +import subprocess
    
    18
    +import tempfile
    
    19
    +
    
    20
    +from buildgrid.utils import read_file, create_digest, write_fetch_directory, parse_to_pb2_from_fetch
    
    21
    +
    
    22
    +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    
    23
    +from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
    
    24
    +from google.protobuf import any_pb2
    
    25
    +
    
    26
    +
    
    27
    +def work_temp_directory(context, lease):
    
    28
    +    """ Bot downloads directories and files into a temp directory,
    
    29
    +    then uploads results back to CAS
    
    30
    +    """
    
    31
    +
    
    32
    +    instance_name = context.instance_name
    
    33
    +    stub_bytestream = bytestream_pb2_grpc.ByteStreamStub(context.channel)
    
    34
    +
    
    35
    +    action_digest = remote_execution_pb2.Digest()
    
    36
    +    lease.payload.Unpack(action_digest)
    
    37
    +
    
    38
    +    action = remote_execution_pb2.Action()
    
    39
    +
    
    40
    +    action = parse_to_pb2_from_fetch(action, stub_bytestream, action_digest, instance_name)
    
    41
    +
    
    42
    +    with tempfile.TemporaryDirectory() as temp_dir:
    
    43
    +
    
    44
    +        command = remote_execution_pb2.Command()
    
    45
    +        command = parse_to_pb2_from_fetch(command, stub_bytestream, action.command_digest, instance_name)
    
    46
    +
    
    47
    +        arguments = "cd {} &&".format(temp_dir)
    
    48
    +
    
    49
    +        for argument in command.arguments:
    
    50
    +            arguments += " {}".format(argument)
    
    51
    +
    
    52
    +        context.logger.info(arguments)
    
    53
    +
    
    54
    +        write_fetch_directory(temp_dir, stub_bytestream, action.input_root_digest, instance_name)
    
    55
    +
    
    56
    +        proc = subprocess.Popen(arguments,
    
    57
    +                         shell=True,
    
    58
    +                         stdin=subprocess.PIPE,
    
    59
    +                         stdout=subprocess.PIPE)
    
    60
    +
    
    61
    +        # TODO: Should return the std_out to the user
    
    62
    +        proc.communicate()
    
    63
    +
    
    64
    +        result = remote_execution_pb2.ActionResult()
    
    65
    +        requests = []
    
    66
    +        for output_file in command.output_files:
    
    67
    +            path = os.path.join(temp_dir, output_file)
    
    68
    +            chunk = read_file(path)
    
    69
    +
    
    70
    +            digest = create_digest(chunk)
    
    71
    +
    
    72
    +            result.output_files.extend([remote_execution_pb2.OutputFile(path=output_file,
    
    73
    +                                                                        digest=digest)])
    
    74
    +
    
    75
    +            requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
    
    76
    +                digest=digest, data=chunk))
    
    77
    +
    
    78
    +        request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=instance_name,
    
    79
    +                                                               requests=requests)
    
    80
    +
    
    81
    +        stub_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(context.channel)
    
    82
    +        stub_cas.BatchUpdateBlobs(request)
    
    83
    +
    
    84
    +        result_any = any_pb2.Any()
    
    85
    +        result_any.Pack(result)
    
    86
    +
    
    87
    +        lease.result.CopyFrom(result_any)
    
    88
    +
    
    89
    +    return lease

  • app/commands/cmd_bot.py
    ... ... @@ -22,23 +22,17 @@ Bot command
    22 22
     Create a bot interface and request work
    
    23 23
     """
    
    24 24
     
    
    25
    -import asyncio
    
    26 25
     import logging
    
    27
    -import os
    
    28
    -import random
    
    29
    -import subprocess
    
    30
    -import tempfile
    
    26
    +
    
    31 27
     from pathlib import Path, PurePath
    
    32 28
     
    
    33 29
     import click
    
    34 30
     import grpc
    
    35
    -from google.protobuf import any_pb2
    
    36 31
     
    
    37 32
     from buildgrid.bot import bot, bot_interface
    
    38 33
     from buildgrid.bot.bot_session import BotSession, Device, Worker
    
    39
    -from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
    
    40
    -from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    41 34
     
    
    35
    +from ..bots import buildbox, dummy, temp_directory
    
    42 36
     from ..cli import pass_context
    
    43 37
     
    
    44 38
     
    
    ... ... @@ -53,6 +47,7 @@ def cli(context, host, port, parent):
    53 47
     
    
    54 48
         context.logger = logging.getLogger(__name__)
    
    55 49
         context.logger.info("Starting on port {}".format(port))
    
    50
    +    context.channel = channel
    
    56 51
     
    
    57 52
         worker = Worker()
    
    58 53
         worker.add_device(Device())
    
    ... ... @@ -63,16 +58,33 @@ def cli(context, host, port, parent):
    63 58
         context.bot_session = bot_session
    
    64 59
     
    
    65 60
     
    
    66
    -@cli.command('dummy', short_help="Create a dummy bot session")
    
    61
    +@cli.command('dummy', short_help='Create a dummy bot session which just returns lease')
    
    67 62
     @pass_context
    
    68
    -def dummy(context):
    
    63
    +def run_dummy(context):
    
    69 64
         """
    
    70 65
         Simple dummy client. Creates a session, accepts leases, does fake work and
    
    71 66
         updates the server.
    
    72 67
         """
    
    73 68
         try:
    
    74 69
             b = bot.Bot(context.bot_session)
    
    75
    -        b.session(_work_dummy,
    
    70
    +        b.session(dummy.work_dummy,
    
    71
    +                  context)
    
    72
    +
    
    73
    +    except KeyboardInterrupt:
    
    74
    +        pass
    
    75
    +
    
    76
    +
    
    77
    +@cli.command('temp-directory', short_help='Runs commands in temp directory and uploads results')
    
    78
    +@click.option('--instance-name', default='testing')
    
    79
    +@pass_context
    
    80
    +def run_temp_directory(context, instance_name):
    
    81
    +    """ Downloads files and command from CAS and runs
    
    82
    +    in a temp directory, uploading result back to CAS
    
    83
    +    """
    
    84
    +    context.instance_name = instance_name
    
    85
    +    try:
    
    86
    +        b = bot.Bot(context.bot_session)
    
    87
    +        b.session(temp_directory.work_temp_directory,
    
    76 88
                       context)
    
    77 89
     
    
    78 90
         except KeyboardInterrupt:
    
    ... ... @@ -88,7 +100,7 @@ def dummy(context):
    88 100
     @click.option('--port', show_default=True, default=11001)
    
    89 101
     @click.option('--remote', show_default=True, default='localhost')
    
    90 102
     @pass_context
    
    91
    -def work_buildbox(context, remote, port, server_cert, client_key, client_cert, local_cas, fuse_dir):
    
    103
    +def run_buildbox(context, remote, port, server_cert, client_key, client_cert, local_cas, fuse_dir):
    
    92 104
         """
    
    93 105
         Uses BuildBox to run commands.
    
    94 106
         """
    
    ... ... @@ -104,104 +116,9 @@ def work_buildbox(context, remote, port, server_cert, client_key, client_cert, l
    104 116
         context.fuse_dir = fuse_dir
    
    105 117
     
    
    106 118
         try:
    
    107
    -        b = bot.Bot(bot_session=context.bot_session)
    
    108 119
     
    
    109
    -        b.session(work=_work_buildbox,
    
    120
    +        b = bot.Bot(context.bot_session)
    
    121
    +        b.session(work=buildbox.work_buildbox,
    
    110 122
                       context=context)
    
    111 123
         except KeyboardInterrupt:
    
    112 124
             pass
    113
    -
    
    114
    -
    
    115
    -async def _work_dummy(context, lease):
    
    116
    -    await asyncio.sleep(random.randint(1, 5))
    
    117
    -    return lease
    
    118
    -
    
    119
    -
    
    120
    -async def _work_buildbox(context, lease):
    
    121
    -    logger = context.logger
    
    122
    -
    
    123
    -    action_any = lease.payload
    
    124
    -    action = remote_execution_pb2.Action()
    
    125
    -    action_any.Unpack(action)
    
    126
    -
    
    127
    -    cert_server = _file_read(context.server_cert)
    
    128
    -    cert_client = _file_read(context.client_cert)
    
    129
    -    key_client = _file_read(context.client_key)
    
    130
    -
    
    131
    -    # create server credentials
    
    132
    -    credentials = grpc.ssl_channel_credentials(root_certificates=cert_server,
    
    133
    -                                               private_key=key_client,
    
    134
    -                                               certificate_chain=cert_client)
    
    135
    -
    
    136
    -    channel = grpc.secure_channel('{}:{}'.format(context.remote, context.port), credentials)
    
    137
    -
    
    138
    -    stub = bytestream_pb2_grpc.ByteStreamStub(channel)
    
    139
    -
    
    140
    -    remote_command = _fetch_command(context.local_cas, stub, action.command_digest)
    
    141
    -    environment = dict((x.name, x.value) for x in remote_command.environment_variables)
    
    142
    -    logger.debug("command hash: {}".format(action.command_digest.hash))
    
    143
    -    logger.debug("vdir hash: {}".format(action.input_root_digest.hash))
    
    144
    -    logger.debug("\n{}".format(' '.join(remote_command.arguments)))
    
    145
    -
    
    146
    -    command = ['buildbox',
    
    147
    -               '--remote={}'.format('https://{}:{}'.format(context.remote, context.port)),
    
    148
    -               '--server-cert={}'.format(context.server_cert),
    
    149
    -               '--client-key={}'.format(context.client_key),
    
    150
    -               '--client-cert={}'.format(context.client_cert),
    
    151
    -               '--local={}'.format(context.local_cas),
    
    152
    -               '--chdir={}'.format(environment['PWD']),
    
    153
    -               context.fuse_dir]
    
    154
    -
    
    155
    -    command.extend(remote_command.arguments)
    
    156
    -
    
    157
    -    logger.debug(' '.join(command))
    
    158
    -    logger.debug("Input root digest:\n{}".format(action.input_root_digest))
    
    159
    -    logger.info("Launching process")
    
    160
    -
    
    161
    -    proc = subprocess.Popen(command,
    
    162
    -                            stdin=subprocess.PIPE,
    
    163
    -                            stdout=subprocess.PIPE)
    
    164
    -    std_send = action.input_root_digest.SerializeToString()
    
    165
    -    std_out, _ = proc.communicate(std_send)
    
    166
    -
    
    167
    -    output_root_digest = remote_execution_pb2.Digest()
    
    168
    -    output_root_digest.ParseFromString(std_out)
    
    169
    -    logger.debug("Output root digest: {}".format(output_root_digest))
    
    170
    -
    
    171
    -    output_file = remote_execution_pb2.OutputDirectory(tree_digest=output_root_digest)
    
    172
    -
    
    173
    -    action_result = remote_execution_pb2.ActionResult()
    
    174
    -    action_result.output_directories.extend([output_file])
    
    175
    -
    
    176
    -    action_result_any = any_pb2.Any()
    
    177
    -    action_result_any.Pack(action_result)
    
    178
    -
    
    179
    -    lease.result.CopyFrom(action_result_any)
    
    180
    -
    
    181
    -    return lease
    
    182
    -
    
    183
    -
    
    184
    -def _fetch_blob(remote, digest, out):
    
    185
    -    resource_name = os.path.join(digest.hash, str(digest.size_bytes))
    
    186
    -    request = bytestream_pb2.ReadRequest()
    
    187
    -    request.resource_name = resource_name
    
    188
    -    request.read_offset = 0
    
    189
    -    for response in remote.Read(request):
    
    190
    -        out.write(response.data)
    
    191
    -
    
    192
    -    out.flush()
    
    193
    -    assert digest.size_bytes == os.fstat(out.fileno()).st_size
    
    194
    -
    
    195
    -
    
    196
    -def _fetch_command(casdir, remote, digest):
    
    197
    -    with tempfile.NamedTemporaryFile(dir=os.path.join(casdir, 'tmp')) as out:
    
    198
    -        _fetch_blob(remote, digest, out)
    
    199
    -        remote_command = remote_execution_pb2.Command()
    
    200
    -        with open(out.name, 'rb') as f:
    
    201
    -            remote_command.ParseFromString(f.read())
    
    202
    -        return remote_command
    
    203
    -
    
    204
    -
    
    205
    -def _file_read(file_path):
    
    206
    -    with open(file_path, 'rb') as f:
    
    207
    -        return f.read()

  • app/commands/cmd_cas.py
    1
    +# Copyright (C) 2018 Bloomberg LP
    
    2
    +#
    
    3
    +# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    +# you may not use this file except in compliance with the License.
    
    5
    +# You may obtain a copy of the License at
    
    6
    +#
    
    7
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    +#
    
    9
    +# Unless required by applicable law or agreed to in writing, software
    
    10
    +# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    +# See the License for the specific language governing permissions and
    
    13
    +# limitations under the License.
    
    14
    +
    
    15
    +
    
    16
    +"""
    
    17
    +Execute command
    
    18
    +=================
    
    19
    +
    
    20
    +Request work to be executed and monitor status of jobs.
    
    21
    +"""
    
    22
    +
    
    23
    +import logging
    
    24
    +import click
    
    25
    +import grpc
    
    26
    +
    
    27
    +from buildgrid.utils import merkle_maker, create_digest
    
    28
    +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    
    29
    +
    
    30
    +from ..cli import pass_context
    
    31
    +
    
    32
    +
    
    33
    +@click.group(short_help='Interact with the CAS')
    
    34
    +@click.option('--port', default='50051')
    
    35
    +@click.option('--host', default='localhost')
    
    36
    +@pass_context
    
    37
    +def cli(context, host, port):
    
    38
    +    context.logger = logging.getLogger(__name__)
    
    39
    +    context.logger.info("Starting on port {}".format(port))
    
    40
    +
    
    41
    +    context.channel = grpc.insecure_channel('{}:{}'.format(host, port))
    
    42
    +    context.port = port
    
    43
    +
    
    44
    +
    
    45
    +@cli.command('upload-files', short_help='Upload files')
    
    46
    +@click.argument('files', nargs=-1, type=click.File('rb'))
    
    47
    +@click.option('--instance-name', default='testing')
    
    48
    +@pass_context
    
    49
    +def upload_files(context, files, instance_name):
    
    50
    +    stub = remote_execution_pb2_grpc.ContentAddressableStorageStub(context.channel)
    
    51
    +
    
    52
    +    requests = []
    
    53
    +    for file in files:
    
    54
    +        chunk = file.read()
    
    55
    +        requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
    
    56
    +            digest=create_digest(chunk), data=chunk))
    
    57
    +
    
    58
    +    request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=instance_name,
    
    59
    +                                                           requests=requests)
    
    60
    +
    
    61
    +    context.logger.info("Sending: {}".format(request))
    
    62
    +    response = stub.BatchUpdateBlobs(request)
    
    63
    +    context.logger.info("Response: {}".format(response))
    
    64
    +
    
    65
    +
    
    66
    +@cli.command('upload-dir', short_help='Upload files')
    
    67
    +@click.argument('directory')
    
    68
    +@click.option('--instance-name', default='testing')
    
    69
    +@pass_context
    
    70
    +def upload_dir(context, directory, instance_name):
    
    71
    +    context.logger.info("Uploading directory to cas")
    
    72
    +    stub = remote_execution_pb2_grpc.ContentAddressableStorageStub(context.channel)
    
    73
    +
    
    74
    +    requests = []
    
    75
    +
    
    76
    +    for chunk, file_digest in merkle_maker(directory):
    
    77
    +        requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
    
    78
    +            digest=file_digest, data=chunk))
    
    79
    +
    
    80
    +    request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=instance_name,
    
    81
    +                                                           requests=requests)
    
    82
    +
    
    83
    +    context.logger.info("Request:\n{}".format(request))
    
    84
    +    response = stub.BatchUpdateBlobs(request)
    
    85
    +    context.logger.info("Response:\n{}".format(response))

  • app/commands/cmd_execute.py
    ... ... @@ -22,18 +22,22 @@ Execute command
    22 22
     Request work to be executed and monitor status of jobs.
    
    23 23
     """
    
    24 24
     
    
    25
    +import errno
    
    25 26
     import logging
    
    26
    -
    
    27
    +import stat
    
    28
    +import os
    
    27 29
     import click
    
    28 30
     import grpc
    
    29 31
     
    
    32
    +from buildgrid.utils import merkle_maker, create_digest, write_fetch_blob
    
    30 33
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    
    34
    +from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
    
    31 35
     from buildgrid._protos.google.longrunning import operations_pb2, operations_pb2_grpc
    
    32 36
     
    
    33 37
     from ..cli import pass_context
    
    34 38
     
    
    35 39
     
    
    36
    -@click.group(short_help="Simple execute client")
    
    40
    +@click.group(short_help='Simple execute client')
    
    37 41
     @click.option('--port', default='50051')
    
    38 42
     @click.option('--host', default='localhost')
    
    39 43
     @pass_context
    
    ... ... @@ -45,12 +49,12 @@ def cli(context, host, port):
    45 49
         context.port = port
    
    46 50
     
    
    47 51
     
    
    48
    -@cli.command('request', short_help="Send a dummy action")
    
    52
    +@cli.command('request-dummy', short_help='Send a dummy action')
    
    49 53
     @click.option('--number', default=1)
    
    50 54
     @click.option('--instance-name', default='testing')
    
    51
    -@click.option('--wait-for-completion', is_flag=True)
    
    55
    +@click.option('--wait-for-completion', is_flag=True, help='Stream updates until jobs are completed')
    
    52 56
     @pass_context
    
    53
    -def request(context, number, instance_name, wait_for_completion):
    
    57
    +def request_dummy(context, number, instance_name, wait_for_completion):
    
    54 58
         action_digest = remote_execution_pb2.Digest()
    
    55 59
     
    
    56 60
         context.logger.info("Sending execution request...\n")
    
    ... ... @@ -72,7 +76,7 @@ def request(context, number, instance_name, wait_for_completion):
    72 76
                 context.logger.info(next(response))
    
    73 77
     
    
    74 78
     
    
    75
    -@cli.command('status', short_help="Get the status of an operation")
    
    79
    +@cli.command('status', short_help='Get the status of an operation')
    
    76 80
     @click.argument('operation-name')
    
    77 81
     @pass_context
    
    78 82
     def operation_status(context, operation_name):
    
    ... ... @@ -85,7 +89,7 @@ def operation_status(context, operation_name):
    85 89
         context.logger.info(response)
    
    86 90
     
    
    87 91
     
    
    88
    -@cli.command('list', short_help="List operations")
    
    92
    +@cli.command('list', short_help='List operations')
    
    89 93
     @pass_context
    
    90 94
     def list_operations(context):
    
    91 95
         context.logger.info("Getting list of operations")
    
    ... ... @@ -103,7 +107,7 @@ def list_operations(context):
    103 107
             context.logger.info(op)
    
    104 108
     
    
    105 109
     
    
    106
    -@cli.command('wait', short_help="Streams an operation until it is complete")
    
    110
    +@cli.command('wait', short_help='Streams an operation until it is complete')
    
    107 111
     @click.argument('operation-name')
    
    108 112
     @pass_context
    
    109 113
     def wait_execution(context, operation_name):
    
    ... ... @@ -114,3 +118,86 @@ def wait_execution(context, operation_name):
    114 118
     
    
    115 119
         for stream in response:
    
    116 120
             context.logger.info(stream)
    
    121
    +
    
    122
    +
    
    123
    +@cli.command('command', short_help='Send a command to be executed')
    
    124
    +@click.argument('input-root')
    
    125
    +@click.argument('commands', nargs=-1)
    
    126
    +@click.option('--output-file', nargs=2, type=(str, bool), multiple=True,
    
    127
    +              help='{Expected output file, is_executeable flag}')
    
    128
    +@click.option('--output-directory', default='testing', help='Output directory for output files')
    
    129
    +@click.option('--instance-name', default='testing')
    
    130
    +@pass_context
    
    131
    +def command(context, input_root, commands, output_file, output_directory, instance_name):
    
    132
    +    stub = remote_execution_pb2_grpc.ExecutionStub(context.channel)
    
    133
    +
    
    134
    +    execute_command = remote_execution_pb2.Command()
    
    135
    +
    
    136
    +    for arg in commands:
    
    137
    +        execute_command.arguments.extend([arg])
    
    138
    +
    
    139
    +    output_executeables = []
    
    140
    +    for file, is_executeable in output_file:
    
    141
    +        execute_command.output_files.extend([file])
    
    142
    +        if is_executeable:
    
    143
    +            output_executeables.append(file)
    
    144
    +
    
    145
    +    command_digest = create_digest(execute_command.SerializeToString())
    
    146
    +    context.logger.info(command_digest)
    
    147
    +
    
    148
    +    # TODO: Check for missing blobs
    
    149
    +    digest = None
    
    150
    +    for _, digest in merkle_maker(input_root):
    
    151
    +        pass
    
    152
    +
    
    153
    +    action = remote_execution_pb2.Action(command_digest=command_digest,
    
    154
    +                                         input_root_digest=digest,
    
    155
    +                                         do_not_cache=True)
    
    156
    +
    
    157
    +    action_digest = create_digest(action.SerializeToString())
    
    158
    +
    
    159
    +    context.logger.info("Sending execution request...\n")
    
    160
    +
    
    161
    +    requests = []
    
    162
    +    requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
    
    163
    +        digest=command_digest, data=execute_command.SerializeToString()))
    
    164
    +
    
    165
    +    requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
    
    166
    +        digest=action_digest, data=action.SerializeToString()))
    
    167
    +
    
    168
    +    request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=instance_name,
    
    169
    +                                                           requests=requests)
    
    170
    +    remote_execution_pb2_grpc.ContentAddressableStorageStub(context.channel).BatchUpdateBlobs(request)
    
    171
    +
    
    172
    +    request = remote_execution_pb2.ExecuteRequest(instance_name=instance_name,
    
    173
    +                                                  action_digest=action_digest,
    
    174
    +                                                  skip_cache_lookup=True)
    
    175
    +    response = stub.Execute(request)
    
    176
    +
    
    177
    +    stub = bytestream_pb2_grpc.ByteStreamStub(context.channel)
    
    178
    +
    
    179
    +    stream = None
    
    180
    +    for stream in response:
    
    181
    +        context.logger.info(stream)
    
    182
    +
    
    183
    +    execute_response = remote_execution_pb2.ExecuteResponse()
    
    184
    +    stream.response.Unpack(execute_response)
    
    185
    +
    
    186
    +    for output_file_response in execute_response.result.output_files:
    
    187
    +        path = os.path.join(output_directory, output_file_response.path)
    
    188
    +
    
    189
    +        if not os.path.exists(os.path.dirname(path)):
    
    190
    +
    
    191
    +            try:
    
    192
    +                os.makedirs(os.path.dirname(path))
    
    193
    +
    
    194
    +            except OSError as exc:
    
    195
    +                if exc.errno != errno.EEXIST:
    
    196
    +                    raise
    
    197
    +
    
    198
    +        with open(path, 'wb+') as f:
    
    199
    +            write_fetch_blob(f, stub, output_file_response.digest, instance_name)
    
    200
    +
    
    201
    +        if output_file_response.path in output_executeables:
    
    202
    +            st = os.stat(path)
    
    203
    +            os.chmod(path, st.st_mode | stat.S_IXUSR)

  • buildgrid/bot/bot_session.py
    ... ... @@ -104,14 +104,15 @@ class BotSession:
    104 104
                 self._update_lease_from_server(lease)
    
    105 105
     
    
    106 106
         def update_bot_session(self):
    
    107
    +        self.logger.debug("Updating bot session: {}".format(self._bot_id))
    
    107 108
             session = self._interface.update_bot_session(self.get_pb2())
    
    108
    -        for lease in session.leases:
    
    109
    -            self._update_lease_from_server(lease)
    
    110
    -
    
    111
    -        for k, v in self._leases.items():
    
    109
    +        for k, v in list(self._leases.items()):
    
    112 110
                 if v.state == LeaseState.COMPLETED.value:
    
    113 111
                     del self._leases[k]
    
    114 112
     
    
    113
    +        for lease in session.leases:
    
    114
    +            self._update_lease_from_server(lease)
    
    115
    +
    
    115 116
         def get_pb2(self):
    
    116 117
             leases = list(self._leases.values())
    
    117 118
             if not leases:
    
    ... ... @@ -134,12 +135,16 @@ class BotSession:
    134 135
             # TODO: Compare with previous state of lease
    
    135 136
             if lease.state == LeaseState.PENDING.value:
    
    136 137
                 lease.state = LeaseState.ACTIVE.value
    
    137
    -            asyncio.ensure_future(self.create_work(lease))
    
    138 138
                 self._leases[lease.id] = lease
    
    139
    +            self.update_bot_session()
    
    140
    +            asyncio.ensure_future(self.create_work(lease))
    
    139 141
     
    
    140 142
         async def create_work(self, lease):
    
    141 143
             self.logger.debug("Work created: {}".format(lease.id))
    
    142
    -        lease = await self._work(self._context, lease)
    
    144
    +
    
    145
    +        loop = asyncio.get_event_loop()
    
    146
    +        lease = await loop.run_in_executor(None, self._work, self._context, lease)
    
    147
    +
    
    143 148
             self.logger.debug("Work complete: {}".format(lease.id))
    
    144 149
             self.lease_completed(lease)
    
    145 150
     
    

  • buildgrid/server/job.py
    ... ... @@ -21,26 +21,25 @@ from enum import Enum
    21 21
     
    
    22 22
     from google.protobuf import any_pb2
    
    23 23
     
    
    24
    -from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ExecuteOperationMetadata
    
    25
    -from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ExecuteResponse
    
    24
    +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    26 25
     from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
    
    27 26
     from buildgrid._protos.google.longrunning import operations_pb2
    
    28 27
     
    
    29 28
     
    
    30 29
     class ExecuteStage(Enum):
    
    31
    -    UNKNOWN = ExecuteOperationMetadata.Stage.Value('UNKNOWN')
    
    30
    +    UNKNOWN = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('UNKNOWN')
    
    32 31
     
    
    33 32
         # Checking the result against the cache.
    
    34
    -    CACHE_CHECK = ExecuteOperationMetadata.Stage.Value('CACHE_CHECK')
    
    33
    +    CACHE_CHECK = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('CACHE_CHECK')
    
    35 34
     
    
    36 35
         # Currently idle, awaiting a free machine to execute.
    
    37
    -    QUEUED = ExecuteOperationMetadata.Stage.Value('QUEUED')
    
    36
    +    QUEUED = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('QUEUED')
    
    38 37
     
    
    39 38
         # Currently being executed by a worker.
    
    40
    -    EXECUTING = ExecuteOperationMetadata.Stage.Value('EXECUTING')
    
    39
    +    EXECUTING = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('EXECUTING')
    
    41 40
     
    
    42 41
         # Finished execution.
    
    43
    -    COMPLETED = ExecuteOperationMetadata.Stage.Value('COMPLETED')
    
    42
    +    COMPLETED = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('COMPLETED')
    
    44 43
     
    
    45 44
     
    
    46 45
     class BotStatus(Enum):
    
    ... ... @@ -80,13 +79,13 @@ class Job:
    80 79
         def __init__(self, action_digest, do_not_cache=False, message_queue=None):
    
    81 80
             self.lease = None
    
    82 81
             self.logger = logging.getLogger(__name__)
    
    82
    +        self.n_tries = 0
    
    83 83
             self.result = None
    
    84 84
             self.result_cached = False
    
    85 85
     
    
    86 86
             self._action_digest = action_digest
    
    87 87
             self._do_not_cache = do_not_cache
    
    88 88
             self._execute_stage = ExecuteStage.UNKNOWN
    
    89
    -        self._n_tries = 0
    
    90 89
             self._name = str(uuid.uuid4())
    
    91 90
             self._operation = operations_pb2.Operation(name=self._name)
    
    92 91
             self._operation_update_queues = []
    
    ... ... @@ -122,15 +121,16 @@ class Job:
    122 121
             self._operation.metadata.CopyFrom(self._pack_any(self.get_operation_meta()))
    
    123 122
             if self.result is not None:
    
    124 123
                 self._operation.done = True
    
    125
    -            response = ExecuteResponse()
    
    126
    -            self.result.Unpack(response.result)
    
    127
    -            response.cached_result = self.result_cached
    
    124
    +            action_result = remote_execution_pb2.ActionResult()
    
    125
    +            self.result.Unpack(action_result)
    
    126
    +            response = remote_execution_pb2.ExecuteResponse(result=action_result,
    
    127
    +                                                            cached_result=self.result_cached)
    
    128 128
                 self._operation.response.CopyFrom(self._pack_any(response))
    
    129 129
     
    
    130 130
             return self._operation
    
    131 131
     
    
    132 132
         def get_operation_meta(self):
    
    133
    -        meta = ExecuteOperationMetadata()
    
    133
    +        meta = remote_execution_pb2.ExecuteOperationMetadata()
    
    134 134
             meta.stage = self._execute_stage.value
    
    135 135
             meta.action_digest.CopyFrom(self._action_digest)
    
    136 136
     
    

  • buildgrid/server/scheduler.py
    ... ... @@ -25,7 +25,6 @@ from collections import deque
    25 25
     
    
    26 26
     from google.protobuf import any_pb2
    
    27 27
     
    
    28
    -from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ActionResult
    
    29 28
     from buildgrid._protos.google.longrunning import operations_pb2
    
    30 29
     
    
    31 30
     from .job import ExecuteStage, LeaseState
    
    ... ... @@ -83,9 +82,7 @@ class Scheduler:
    83 82
             job.update_execute_stage(ExecuteStage.COMPLETED)
    
    84 83
             self.jobs[name] = job
    
    85 84
             if not job.do_not_cache and self.action_cache is not None:
    
    86
    -            action_result = ActionResult()
    
    87
    -            result.Unpack(action_result)
    
    88
    -            self.action_cache.put_action_result(job.action_digest, action_result)
    
    85
    +            self.action_cache.put_action_result(job.action_digest, result)
    
    89 86
     
    
    90 87
         def get_operations(self):
    
    91 88
             response = operations_pb2.ListOperationsResponse()
    
    ... ... @@ -94,7 +91,7 @@ class Scheduler:
    94 91
             return response
    
    95 92
     
    
    96 93
         def update_job_lease_state(self, name, state):
    
    97
    -        job = self.jobs.get(name)
    
    94
    +        job = self.jobs[name]
    
    98 95
             job.lease.state = state
    
    99 96
             self.jobs[name] = job
    
    100 97
     
    

  • buildgrid/utils.py
    1
    +# Copyright (C) 2018 Bloomberg LP
    
    2
    +#
    
    3
    +# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    +# you may not use this file except in compliance with the License.
    
    5
    +# You may obtain a copy of the License at
    
    6
    +#
    
    7
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    +#
    
    9
    +# Unless required by applicable law or agreed to in writing, software
    
    10
    +# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    +# See the License for the specific language governing permissions and
    
    13
    +# limitations under the License.
    
    14
    +
    
    15
    +import os
    
    16
    +
    
    17
    +from buildgrid.settings import HASH
    
    18
    +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    19
    +from buildgrid._protos.google.bytestream import bytestream_pb2
    
    20
    +
    
    21
    +
    
    22
    +def gen_fetch_blob(stub, digest, instance_name=""):
    
    23
    +    """ Generates byte stream from a fetch blob request
    
    24
    +    """
    
    25
    +
    
    26
    +    resource_name = os.path.join(instance_name, 'blobs', digest.hash, str(digest.size_bytes))
    
    27
    +    request = bytestream_pb2.ReadRequest(resource_name=resource_name,
    
    28
    +                                         read_offset=0)
    
    29
    +    for response in stub.Read(request):
    
    30
    +        yield response.data
    
    31
    +
    
    32
    +
    
    33
    +def write_fetch_directory(directory, stub, digest, instance_name=""):
    
    34
    +    """ Given a directory digest, fetches files and writes them to a directory
    
    35
    +    """
    
    36
    +    # TODO: Extend to symlinks and inner directories
    
    37
    +    # pathlib.Path('/my/directory').mkdir(parents=True, exist_ok=True)
    
    38
    +
    
    39
    +    directory_pb2 = remote_execution_pb2.Directory()
    
    40
    +    directory_pb2 = parse_to_pb2_from_fetch(directory_pb2, stub, digest, instance_name)
    
    41
    +
    
    42
    +    for file_node in directory_pb2.files:
    
    43
    +        path = os.path.join(directory, file_node.name)
    
    44
    +        with open(path, 'wb') as f:
    
    45
    +            write_fetch_blob(f, stub, file_node.digest, instance_name)
    
    46
    +
    
    47
    +
    
    48
    +def write_fetch_blob(out, stub, digest, instance_name=""):
    
    49
    +    """ Given an output buffer, fetches blob and writes to buffer
    
    50
    +    """
    
    51
    +
    
    52
    +    for stream in gen_fetch_blob(stub, digest, instance_name):
    
    53
    +        out.write(stream)
    
    54
    +
    
    55
    +    out.flush()
    
    56
    +    assert digest.size_bytes == os.fstat(out.fileno()).st_size
    
    57
    +
    
    58
    +
    
    59
    +def parse_to_pb2_from_fetch(pb2, stub, digest, instance_name=""):
    
    60
    +    """ Fetches stream and parses it into given pb2
    
    61
    +    """
    
    62
    +
    
    63
    +    stream_bytes = b''
    
    64
    +    for stream in gen_fetch_blob(stub, digest, instance_name):
    
    65
    +        stream_bytes += stream
    
    66
    +
    
    67
    +    pb2.ParseFromString(stream_bytes)
    
    68
    +    return pb2
    
    69
    +
    
    70
    +
    
    71
    +def create_digest(bytes_to_digest):
    
    72
    +    """ Creates a hash based on the hex digest and returns the digest
    
    73
    +    """
    
    74
    +    return remote_execution_pb2.Digest(hash=HASH(bytes_to_digest).hexdigest(),
    
    75
    +                                       size_bytes=len(bytes_to_digest))
    
    76
    +
    
    77
    +
    
    78
    +def merkle_maker(directory):
    
    79
    +    """ Walks thorugh given directory, yielding the binary and digest
    
    80
    +    """
    
    81
    +    directory_pb2 = remote_execution_pb2.Directory()
    
    82
    +    for (dir_path, dir_names, file_names) in os.walk(directory):
    
    83
    +
    
    84
    +        for file_name in file_names:
    
    85
    +            file_path = os.path.join(dir_path, file_name)
    
    86
    +            chunk = read_file(file_path)
    
    87
    +            file_digest = create_digest(chunk)
    
    88
    +            directory_pb2.files.extend([file_maker(file_path, file_digest)])
    
    89
    +            yield chunk, file_digest
    
    90
    +
    
    91
    +        for inner_dir in dir_names:
    
    92
    +            inner_dir_path = os.path.join(dir_path, inner_dir)
    
    93
    +            yield from merkle_maker(inner_dir_path)
    
    94
    +
    
    95
    +    directory_string = directory_pb2.SerializeToString()
    
    96
    +
    
    97
    +    yield directory_string, create_digest(directory_string)
    
    98
    +
    
    99
    +
    
    100
    +def file_maker(file_path, file_digest):
    
    101
    +    """ Creates a File Node
    
    102
    +    """
    
    103
    +    _, file_name = os.path.split(file_path)
    
    104
    +    return remote_execution_pb2.FileNode(name=file_name,
    
    105
    +                                         digest=file_digest,
    
    106
    +                                         is_executable=os.access(file_path, os.X_OK))
    
    107
    +
    
    108
    +
    
    109
    +def read_file(read):
    
    110
    +    with open(read, 'rb') as f:
    
    111
    +        return f.read()

  • tests/integration/operations_service.py
    ... ... @@ -100,12 +100,14 @@ def test_list_operations_with_result(instance, execute_request, context):
    100 100
         action_result = remote_execution_pb2.ActionResult()
    
    101 101
         output_file = remote_execution_pb2.OutputFile(path='unicorn')
    
    102 102
         action_result.output_files.extend([output_file])
    
    103
    -    instance._instance._scheduler.jobs[response_execute.name].result = _pack_any(action_result)
    
    103
    +
    
    104
    +    instance._instance._scheduler.job_complete(response_execute.name, _pack_any(action_result))
    
    104 105
     
    
    105 106
         request = operations_pb2.ListOperationsRequest()
    
    106 107
         response = instance.ListOperations(request, context)
    
    107 108
     
    
    108 109
         assert response.operations[0].name == response_execute.name
    
    110
    +
    
    109 111
         execute_response = remote_execution_pb2.ExecuteResponse()
    
    110 112
         response.operations[0].response.Unpack(execute_response)
    
    111 113
         assert execute_response.result == action_result
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]