[Notes] [Git][BuildGrid/buildgrid][finn/cas-commands] Added new cas command to the app and new bot which builds local commands



Title: GitLab

finnball pushed to branch finn/cas-commands at BuildGrid / buildgrid

Commits:

14 changed files:

Changes:

  • .gitlab-ci.yml
    ... ... @@ -31,7 +31,7 @@ before_script:
    31 31
         - ${BGD} server start &
    
    32 32
         - sleep 1 # Allow server to boot
    
    33 33
         - ${BGD} bot --host=0.0.0.0 dummy &
    
    34
    -    - ${BGD} execute --host=0.0.0.0 request --wait-for-completion
    
    34
    +    - ${BGD} execute --host=0.0.0.0 request-dummy --wait-for-completion
    
    35 35
     
    
    36 36
     tests-debian-stretch:
    
    37 37
       <<: *linux-tests
    

  • README.rst
    ... ... @@ -51,3 +51,35 @@ Create a bot session::
    51 51
     Show the work as completed::
    
    52 52
     
    
    53 53
       bgd execute list
    
    54
    +
    
    55
    +Instructions for a Simple Build
    
    56
    +-------------------------------
    
    57
    +
    
    58
    +This example covers a simple build. The user will upload a directory containing a C file and a command to the CAS. The bot will then create a temporary directory, fetch the directory and run the command defined by the user. This is an early demo and still lacks a few features such as symlink support and checking to see if files exist in the CAS before executing a command.
    
    59
    +
    
    60
    +Create a new directory called `test-buildgrid/` and place the following C file, named `hello.c`, in it::
    
    61
    +
    
    62
    +  #include <stdio.h>
    
    63
    +  int main()
    
    64
    +  {
    
    65
    +   printf("Hello, World!");
    
    66
    +   return 0;
    
    67
    +  }
    
    68
    +
    
    69
    +Now start a BuildGrid server, passing it a directory it can write a CAS to::
    
    70
    +
    
    71
    +  bgd server start --cas disk --cas-cache disk --cas-disk-directory /path/to/empty/directory
    
    72
    +
    
    73
    +Start the following bot session::
    
    74
    +
    
    75
    +  bgd bot temp-directory
    
    76
    +
    
    77
    +Upload the directory containing the C file::
    
    78
    +
    
    79
    +  bgd cas upload-dir /path/to/test-buildgrid
    
    80
    +
    
    81
    +Now we send an execution request to the bot with the name of the expected `output-file`, a boolean describing whether it is executable, the path to the directory we uploaded (used to calculate the digest), and finally the command to run on the bot::
    
    82
    +
    
    83
    +  bgd execute command --output-file hello True /path/to/test-buildgrid -- gcc -Wall hello.c -o hello
    
    84
    +
    
    85
    +The resulting executable should have been returned to a new directory called `testing/`.

  • app/bots/__init__.py

  • app/bots/buildbox.py
    1
    +# Copyright (C) 2018 Bloomberg LP
    
    2
    +#
    
    3
    +# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    +# you may not use this file except in compliance with the License.
    
    5
    +# You may obtain a copy of the License at
    
    6
    +#
    
    7
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    +#
    
    9
    +# Unless required by applicable law or agreed to in writing, software
    
    10
    +# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    +# See the License for the specific language governing permissions and
    
    13
    +# limitations under the License.
    
    14
    +
    
    15
    +
    
    16
    +import asyncio
    
    17
    +import grpc
    
    18
    +import os
    
    19
    +import subprocess
    
    20
    +
    
    21
    +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    22
    +from buildgrid._protos.google.bytestream import bytestream_pb2
    
    23
    +from google.protobuf import any_pb2
    
    24
    +
    
    25
    +def work_buildbox(context, lease):
    
    26
    +    logger = context.logger
    
    27
    +
    
    28
    +    action_any = lease.payload
    
    29
    +    action = remote_execution_pb2.Action()
    
    30
    +    action_any.Unpack(action)
    
    31
    +
    
    32
    +    cert_server = read_file(context.server_cert)
    
    33
    +    cert_client = read_file(context.client_cert)
    
    34
    +    key_client = read_file(context.client_key)
    
    35
    +
    
    36
    +    # create server credentials
    
    37
    +    credentials = grpc.ssl_channel_credentials(root_certificates=cert_server,
    
    38
    +                                               private_key=key_client,
    
    39
    +                                               certificate_chain=cert_client)
    
    40
    +
    
    41
    +    channel = grpc.secure_channel('{}:{}'.format(context.remote, context.port), credentials)
    
    42
    +
    
    43
    +    stub = bytestream_pb2_grpc.ByteStreamStub(channel)
    
    44
    +
    
    45
    +    remote_command = _buildstream_fetch_command(context.local_cas, stub, action.command_digest)
    
    46
    +    environment = dict((x.name, x.value) for x in remote_command.environment_variables)
    
    47
    +    logger.debug("command hash: {}".format(action.command_digest.hash))
    
    48
    +    logger.debug("vdir hash: {}".format(action.input_root_digest.hash))
    
    49
    +    logger.debug("\n{}".format(' '.join(remote_command.arguments)))
    
    50
    +
    
    51
    +    command = ['buildbox',
    
    52
    +               '--remote={}'.format('https://{}:{}'.format(context.remote, context.port)),
    
    53
    +               '--server-cert={}'.format(context.server_cert),
    
    54
    +               '--client-key={}'.format(context.client_key),
    
    55
    +               '--client-cert={}'.format(context.client_cert),
    
    56
    +               '--local={}'.format(context.local_cas),
    
    57
    +               '--chdir={}'.format(environment['PWD']),
    
    58
    +               context.fuse_dir]
    
    59
    +
    
    60
    +    command.extend(remote_command.arguments)
    
    61
    +
    
    62
    +    logger.debug(' '.join(command))
    
    63
    +    logger.debug("Input root digest:\n{}".format(action.input_root_digest))
    
    64
    +    logger.info("Launching process")
    
    65
    +
    
    66
    +    proc = subprocess.Popen(command,
    
    67
    +                            stdin=subprocess.PIPE,
    
    68
    +                            stdout=subprocess.PIPE)
    
    69
    +    std_send = action.input_root_digest.SerializeToString()
    
    70
    +    std_out, std_error = proc.communicate(std_send)
    
    71
    +
    
    72
    +    output_root_digest = remote_execution_pb2.Digest()
    
    73
    +    output_root_digest.ParseFromString(std_out)
    
    74
    +    logger.debug("Output root digest: {}".format(output_root_digest))
    
    75
    +
    
    76
    +    output_file = remote_execution_pb2.OutputDirectory(tree_digest = output_root_digest)
    
    77
    +
    
    78
    +    action_result = remote_execution_pb2.ActionResult()
    
    79
    +    action_result.output_directories.extend([output_file])
    
    80
    +
    
    81
    +    action_result_any = any_pb2.Any()
    
    82
    +    action_result_any.Pack(action_result)
    
    83
    +
    
    84
    +    lease.result.CopyFrom(action_result_any)
    
    85
    +
    
    86
    +    return lease
    
    87
    +
    
    88
    +## ########################################################################## ##
    
    89
    +## Below is for BuildStream's current incorrect implementation for ByteStream ##
    
    90
    +## ########################################################################## ##
    
    91
    +
    
    92
    +def _buildstream_fetch_blob(remote, digest, out):
    
    93
    +    resource_name = os.path.join(digest.hash, str(digest.size_bytes))
    
    94
    +    request = bytestream_pb2.ReadRequest()
    
    95
    +    request.resource_name = resource_name
    
    96
    +    request.read_offset = 0
    
    97
    +    for response in remote.Read(request):
    
    98
    +        out.write(response.data)
    
    99
    +
    
    100
    +    out.flush()
    
    101
    +    assert digest.size_bytes == os.fstat(out.fileno()).st_size
    
    102
    +
    
    103
    +def _buildstream_fetch_command(casdir, remote, digest):
    
    104
    +    with tempfile.NamedTemporaryFile(dir=os.path.join(casdir, 'tmp')) as out:
    
    105
    +        _buildstream_fetch_blob(remote, digest, out)
    
    106
    +        remote_command = remote_execution_pb2.Command()
    
    107
    +        with open(out.name, 'rb') as f:
    
    108
    +            remote_command.ParseFromString(f.read())
    
    109
    +        return remote_command
    
    110
    +
    
    111
    +def _buildstream_fetch_action(casdir, remote, digest):
    
    112
    +    with tempfile.NamedTemporaryFile(dir=os.path.join(casdir, 'tmp')) as out:
    
    113
    +        _buildstream_fetch_blob(remote, digest, out)
    
    114
    +        remote_action = remote_execution_pb2.Action()
    
    115
    +        with open(out.name, 'rb') as f:
    
    116
    +            remote_action.ParseFromString(f.read())
    
    117
    +        return remote_action

  • app/bots/dummy.py
    1
    +# Copyright (C) 2018 Bloomberg LP
    
    2
    +#
    
    3
    +# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    +# you may not use this file except in compliance with the License.
    
    5
    +# You may obtain a copy of the License at
    
    6
    +#
    
    7
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    +#
    
    9
    +# Unless required by applicable law or agreed to in writing, software
    
    10
    +# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    +# See the License for the specific language governing permissions and
    
    13
    +# limitations under the License.
    
    14
    +
    
    15
    +
    
    16
    +import random
    
    17
    +import time
    
    18
    +
    
    19
    +def work_dummy(context, lease):
    
    20
    +    """ Just returns lease after some random time
    
    21
    +    """
    
    22
    +    time.sleep(random.randint(1,5))
    
    23
    +    return lease

  • app/bots/temp_directory.py
    1
    +# Copyright (C) 2018 Bloomberg LP
    
    2
    +#
    
    3
    +# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    +# you may not use this file except in compliance with the License.
    
    5
    +# You may obtain a copy of the License at
    
    6
    +#
    
    7
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    +#
    
    9
    +# Unless required by applicable law or agreed to in writing, software
    
    10
    +# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    +# See the License for the specific language governing permissions and
    
    13
    +# limitations under the License.
    
    14
    +
    
    15
    +
    
    16
    +import asyncio
    
    17
    +import os
    
    18
    +import subprocess
    
    19
    +import tempfile
    
    20
    +
    
    21
    +from buildgrid.utils import read_file, create_digest, write_fetch_blob, write_fetch_directory, parse_to_pb2_from_fetch
    
    22
    +
    
    23
    +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    
    24
    +from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
    
    25
    +from google.protobuf import any_pb2
    
    26
    +
    
    27
    +def work_temp_directory(context, lease):
    
    28
    +    """ Bot downloads directories and files into a temp directory,
    
    29
    +    then uploads results back to CAS
    
    30
    +    """
    
    31
    +
    
    32
    +    instance_name = context.instance_name
    
    33
    +    stub_bytestream = bytestream_pb2_grpc.ByteStreamStub(context.channel)
    
    34
    +
    
    35
    +    action_digest = remote_execution_pb2.Digest()
    
    36
    +    lease.payload.Unpack(action_digest)
    
    37
    +
    
    38
    +    action = remote_execution_pb2.Action()
    
    39
    +
    
    40
    +    action = parse_to_pb2_from_fetch(action, stub_bytestream, action_digest, instance_name)
    
    41
    +
    
    42
    +    with tempfile.TemporaryDirectory() as temp_dir:
    
    43
    +
    
    44
    +        command = remote_execution_pb2.Command()
    
    45
    +        command = parse_to_pb2_from_fetch(command, stub_bytestream, action.command_digest, instance_name)
    
    46
    +
    
    47
    +        arguments = "cd {} &&".format(temp_dir)
    
    48
    +
    
    49
    +        for argument in command.arguments:
    
    50
    +            arguments += " {}".format(argument)
    
    51
    +
    
    52
    +        context.logger.info(arguments)
    
    53
    +
    
    54
    +        write_fetch_directory(temp_dir, stub_bytestream, action.input_root_digest, instance_name)
    
    55
    +
    
    56
    +        proc = subprocess.Popen(arguments,
    
    57
    +                                shell=True,
    
    58
    +                                stdin=subprocess.PIPE,
    
    59
    +                                stdout=subprocess.PIPE)
    
    60
    +        std_out, std_error = proc.communicate()
    
    61
    +
    
    62
    +        result = remote_execution_pb2.ActionResult()
    
    63
    +        requests = []
    
    64
    +        for output_file in command.output_files:
    
    65
    +            path = os.path.join(temp_dir, output_file)
    
    66
    +
    
    67
    +            chunk = read_file(path)
    
    68
    +            digest = create_digest(chunk)
    
    69
    +
    
    70
    +            result.output_files.extend([remote_execution_pb2.OutputFile(path=output_file,
    
    71
    +                                                                        digest=digest)])
    
    72
    +
    
    73
    +            requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
    
    74
    +                digest=digest, data=chunk))
    
    75
    +
    
    76
    +        request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=instance_name,
    
    77
    +                                                           requests=requests)
    
    78
    +
    
    79
    +        stub_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(context.channel)
    
    80
    +        stub_cas.BatchUpdateBlobs(request)
    
    81
    +
    
    82
    +        result_any = any_pb2.Any()
    
    83
    +        result_any.Pack(result)
    
    84
    +
    
    85
    +        lease.result.CopyFrom(result_any)
    
    86
    +
    
    87
    +    return lease

  • app/commands/cmd_bot.py
    ... ... @@ -26,22 +26,17 @@ import asyncio
    26 26
     import click
    
    27 27
     import grpc
    
    28 28
     import logging
    
    29
    -import os
    
    30
    -import random
    
    31
    -import subprocess
    
    32
    -import tempfile
    
    33 29
     
    
    34 30
     from pathlib import Path, PurePath
    
    35 31
     
    
    36 32
     from buildgrid.bot import bot, bot_interface
    
    37 33
     from buildgrid.bot.bot_session import BotSession, Device, Worker
    
    38
    -from buildgrid._exceptions import BotError
    
    39 34
     
    
    35
    +from ..bots import buildbox, dummy, temp_directory
    
    40 36
     from ..cli import pass_context
    
    41 37
     
    
    42
    -from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
    
    43 38
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    
    44
    -from google.protobuf import any_pb2
    
    39
    +
    
    45 40
     
    
    46 41
     @click.group(short_help = 'Create a bot client')
    
    47 42
     @click.option('--parent', default='bgd_test')
    
    ... ... @@ -54,6 +49,7 @@ def cli(context, host, port, parent):
    54 49
     
    
    55 50
         context.logger = logging.getLogger(__name__)
    
    56 51
         context.logger.info("Starting on port {}".format(port))
    
    52
    +    context.channel = channel
    
    57 53
     
    
    58 54
         worker = Worker()
    
    59 55
         worker.add_device(Device())
    
    ... ... @@ -63,22 +59,41 @@ def cli(context, host, port, parent):
    63 59
     
    
    64 60
         context.bot_session = bot_session
    
    65 61
     
    
    66
    -@cli.command('dummy', short_help='Create a dummy bot session')
    
    62
    +
    
    63
    +@cli.command('dummy', short_help='Create a dummy bot session which just returns lease')
    
    67 64
     @pass_context
    
    68
    -def dummy(context):
    
    65
    +def run_dummy(context):
    
    69 66
         """
    
    70 67
         Simple dummy client. Creates a session, accepts leases, does fake work and
    
    71 68
         updates the server.
    
    72 69
         """
    
    73 70
         try:
    
    74 71
             b = bot.Bot(context.bot_session)
    
    75
    -        b.session(_work_dummy,
    
    72
    +        b.session(dummy.work_dummy,
    
    73
    +                  context)
    
    74
    +
    
    75
    +    except KeyboardInterrupt:
    
    76
    +        pass
    
    77
    +
    
    78
    +
    
    79
    +@cli.command('temp-directory', short_help='Runs commands in temp directory and uploads results')
    
    80
    +@click.option('--instance-name', default='testing')
    
    81
    +@pass_context
    
    82
    +def run_temp_directory(context, instance_name):
    
    83
    +    """ Downloads files and command from CAS and runs
    
    84
    +    in a temp directory, uploading result back to CAS
    
    85
    +    """
    
    86
    +    context.instance_name = instance_name
    
    87
    +    try:
    
    88
    +        b = bot.Bot(context.bot_session)
    
    89
    +        b.session(temp_directory.work_temp_directory,
    
    76 90
                       context)
    
    77 91
     
    
    78 92
         except KeyboardInterrupt:
    
    79 93
             pass
    
    80 94
     
    
    81
    -@cli.command('buildbox', short_help='Create a bot session with busybox')
    
    95
    +
    
    96
    +@cli.command('buildbox', short_help='Create a bot session with buildbox')
    
    82 97
     @click.option('--fuse-dir', show_default = True, default=str(PurePath(Path.home(), 'fuse')))
    
    83 98
     @click.option('--local-cas', show_default = True, default=str(PurePath(Path.home(), 'cas')))
    
    84 99
     @click.option('--client-cert', show_default = True, default=str(PurePath(Path.home(), 'client.crt')))
    
    ... ... @@ -87,7 +102,7 @@ def dummy(context):
    87 102
     @click.option('--port', show_default = True, default=11001)
    
    88 103
     @click.option('--remote', show_default = True, default='localhost')
    
    89 104
     @pass_context
    
    90
    -def work_buildbox(context, remote, port, server_cert, client_key, client_cert, local_cas, fuse_dir):
    
    105
    +def run_buildbox(context, remote, port, server_cert, client_key, client_cert, local_cas, fuse_dir):
    
    91 106
         """
    
    92 107
         Uses BuildBox to run commands.
    
    93 108
         """
    
    ... ... @@ -103,104 +118,10 @@ def work_buildbox(context, remote, port, server_cert, client_key, client_cert, l
    103 118
         context.fuse_dir = fuse_dir
    
    104 119
     
    
    105 120
         try:
    
    106
    -        b = bot.Bot(work=_work_buildbox,
    
    107
    -                    bot_session=context.bot_session,
    
    108
    -                    channel=context.channel,
    
    109
    -                    parent=context.parent)
    
    110 121
     
    
    111
    -        b.session(context.parent,
    
    112
    -                  _work_buildbox,
    
    122
    +        b = bot.Bot(context.bot_session)
    
    123
    +        b.session(buildbox.work_buildbox,
    
    113 124
                       context)
    
    114 125
     
    
    115 126
         except KeyboardInterrupt:
    
    116 127
             pass
    117
    -
    
    118
    -async def _work_dummy(context, lease):
    
    119
    -    await asyncio.sleep(random.randint(1,5))
    
    120
    -    return lease
    
    121
    -
    
    122
    -async def _work_buildbox(context, lease):
    
    123
    -    logger = context.logger
    
    124
    -
    
    125
    -    action_any = lease.payload
    
    126
    -    action = remote_execution_pb2.Action()
    
    127
    -    action_any.Unpack(action)
    
    128
    -
    
    129
    -    cert_server = _file_read(context.server_cert)
    
    130
    -    cert_client = _file_read(context.client_cert)
    
    131
    -    key_client = _file_read(context.client_key)
    
    132
    -
    
    133
    -    # create server credentials
    
    134
    -    credentials = grpc.ssl_channel_credentials(root_certificates=cert_server,
    
    135
    -                                               private_key=key_client,
    
    136
    -                                               certificate_chain=cert_client)
    
    137
    -
    
    138
    -    channel = grpc.secure_channel('{}:{}'.format(context.remote, context.port), credentials)
    
    139
    -
    
    140
    -    stub = bytestream_pb2_grpc.ByteStreamStub(channel)
    
    141
    -
    
    142
    -    remote_command = _fetch_command(context.local_cas, stub, action.command_digest)
    
    143
    -    environment = dict((x.name, x.value) for x in remote_command.environment_variables)
    
    144
    -    logger.debug("command hash: {}".format(action.command_digest.hash))
    
    145
    -    logger.debug("vdir hash: {}".format(action.input_root_digest.hash))
    
    146
    -    logger.debug("\n{}".format(' '.join(remote_command.arguments)))
    
    147
    -
    
    148
    -    command = ['buildbox',
    
    149
    -               '--remote={}'.format('https://{}:{}'.format(context.remote, context.port)),
    
    150
    -               '--server-cert={}'.format(context.server_cert),
    
    151
    -               '--client-key={}'.format(context.client_key),
    
    152
    -               '--client-cert={}'.format(context.client_cert),
    
    153
    -               '--local={}'.format(context.local_cas),
    
    154
    -               '--chdir={}'.format(environment['PWD']),
    
    155
    -               context.fuse_dir]
    
    156
    -
    
    157
    -    command.extend(remote_command.arguments)
    
    158
    -
    
    159
    -    logger.debug(' '.join(command))
    
    160
    -    logger.debug("Input root digest:\n{}".format(action.input_root_digest))
    
    161
    -    logger.info("Launching process")
    
    162
    -
    
    163
    -    proc = subprocess.Popen(command,
    
    164
    -                            stdin=subprocess.PIPE,
    
    165
    -                            stdout=subprocess.PIPE)
    
    166
    -    std_send = action.input_root_digest.SerializeToString()
    
    167
    -    std_out, std_error = proc.communicate(std_send)
    
    168
    -
    
    169
    -    output_root_digest = remote_execution_pb2.Digest()
    
    170
    -    output_root_digest.ParseFromString(std_out)
    
    171
    -    logger.debug("Output root digest: {}".format(output_root_digest))
    
    172
    -
    
    173
    -    output_file = remote_execution_pb2.OutputDirectory(tree_digest = output_root_digest)
    
    174
    -
    
    175
    -    action_result = remote_execution_pb2.ActionResult()
    
    176
    -    action_result.output_directories.extend([output_file])
    
    177
    -
    
    178
    -    action_result_any = any_pb2.Any()
    
    179
    -    action_result_any.Pack(action_result)
    
    180
    -
    
    181
    -    lease.result.CopyFrom(action_result_any)
    
    182
    -
    
    183
    -    return lease
    
    184
    -
    
    185
    -def _fetch_blob(remote, digest, out):
    
    186
    -    resource_name = os.path.join(digest.hash, str(digest.size_bytes))
    
    187
    -    request = bytestream_pb2.ReadRequest()
    
    188
    -    request.resource_name = resource_name
    
    189
    -    request.read_offset = 0
    
    190
    -    for response in remote.Read(request):
    
    191
    -        out.write(response.data)
    
    192
    -
    
    193
    -    out.flush()
    
    194
    -    assert digest.size_bytes == os.fstat(out.fileno()).st_size
    
    195
    -
    
    196
    -def _fetch_command(casdir, remote, digest):
    
    197
    -    with tempfile.NamedTemporaryFile(dir=os.path.join(casdir, 'tmp')) as out:
    
    198
    -        _fetch_blob(remote, digest, out)
    
    199
    -        remote_command = remote_execution_pb2.Command()
    
    200
    -        with open(out.name, 'rb') as f:
    
    201
    -            remote_command.ParseFromString(f.read())
    
    202
    -        return remote_command
    
    203
    -
    
    204
    -def _file_read(file_path):
    
    205
    -    with open(file_path, 'rb') as f:
    
    206
    -        return f.read()

  • app/commands/cmd_cas.py
    1
    +# Copyright (C) 2018 Bloomberg LP
    
    2
    +#
    
    3
    +# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    +# you may not use this file except in compliance with the License.
    
    5
    +# You may obtain a copy of the License at
    
    6
    +#
    
    7
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    +#
    
    9
    +# Unless required by applicable law or agreed to in writing, software
    
    10
    +# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    +# See the License for the specific language governing permissions and
    
    13
    +# limitations under the License.
    
    14
    +
    
    15
    +
    
    16
    +"""
    
    17
    +CAS command
    
    18
    +=================
    
    19
    +
    
    20
    +Upload files and directories to the CAS.
    
    21
    +"""
    
    22
    +
    
    23
    +import click
    
    24
    +import grpc
    
    25
    +import logging
    
    26
    +import os
    
    27
    +import sys
    
    28
    +
    
    29
    +from buildgrid.utils import merkle_maker, read_file, create_digest
    
    30
    +from ..cli import pass_context
    
    31
    +
    
    32
    +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    
    33
    +
    
    34
    +@click.group(short_help = "Interact with the CAS")
    
    35
    +@click.option('--port', default='50051')
    
    36
    +@click.option('--host', default='localhost')
    
    37
    +@pass_context
    
    38
    +def cli(context, host, port):
    
    39
    +    context.logger = logging.getLogger(__name__)
    
    40
    +    context.logger.info("Starting on port {}".format(port))
    
    41
    +
    
    42
    +    context.channel = grpc.insecure_channel('{}:{}'.format(host, port))
    
    43
    +    context.port = port
    
    44
    +
    
    45
    +@cli.command('upload-files', short_help='Upload files')
    
    46
    +@click.argument('files', nargs=-1, type=click.File('rb'))
    
    47
    +@click.option('--instance-name', default='testing')
    
    48
    +@pass_context
    
    49
    +def upload_files(context, files, instance_name):
    
    50
    +    stub = remote_execution_pb2_grpc.ContentAddressableStorageStub(context.channel)
    
    51
    +
    
    52
    +    requests = []
    
    53
    +    for file in files:
    
    54
    +        chunk = file.read()
    
    55
    +        requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
    
    56
    +            digest=create_digest(chunk), data=chunk))
    
    57
    +
    
    58
    +    request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=instance_name,
    
    59
    +                                                           requests=requests)
    
    60
    +
    
    61
    +    context.logger.info("Sending: {}".format(request))
    
    62
    +    response = stub.BatchUpdateBlobs(request)
    
    63
    +    context.logger.info("Response: {}".format(response))
    
    64
    +
    
    65
    +@cli.command('upload-dir', short_help='Upload files')
    
    66
    +@click.argument('dir')
    
    67
    +@click.option('--instance-name', default='testing')
    
    68
    +@pass_context
    
    69
    +def upload_dir(context, dir, instance_name):
    
    70
    +    context.logger.info("Uploading directory to cas")
    
    71
    +    stub = remote_execution_pb2_grpc.ContentAddressableStorageStub(context.channel)
    
    72
    +
    
    73
    +    requests = []
    
    74
    +
    
    75
    +    for chunk, file_digest in merkle_maker(dir):
    
    76
    +        requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
    
    77
    +            digest=file_digest, data=chunk))
    
    78
    +
    
    79
    +    request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=instance_name,
    
    80
    +                                                           requests=requests)
    
    81
    +
    
    82
    +    context.logger.info("Request:\n{}".format(request))
    
    83
    +    response = stub.BatchUpdateBlobs(request)
    
    84
    +    context.logger.info("Response:\n{}".format(response))

  • app/commands/cmd_execute.py
    ... ... @@ -25,16 +25,22 @@ Request work to be executed and monitor status of jobs.
    25 25
     import click
    
    26 26
     import grpc
    
    27 27
     import logging
    
    28
    -import sys
    
    29
    -import time
    
    28
    +import stat
    
    29
    +import os
    
    30 30
     
    
    31
    +from buildgrid.utils import merkle_maker, read_file, create_digest, write_fetch_blob
    
    31 32
     from ..cli import pass_context
    
    32 33
     
    
    34
    +from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
    
    35
    +
    
    36
    +from buildgrid.settings import HASH
    
    37
    +
    
    33 38
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    
    34 39
     from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ExecuteOperationMetadata
    
    35 40
     from buildgrid._protos.google.longrunning import operations_pb2, operations_pb2_grpc
    
    36 41
     from google.protobuf import any_pb2
    
    37 42
     
    
    43
    +
    
    38 44
     @click.group(short_help = "Simple execute client")
    
    39 45
     @click.option('--port', default='50051')
    
    40 46
     @click.option('--host', default='localhost')
    
    ... ... @@ -46,12 +52,13 @@ def cli(context, host, port):
    46 52
         context.channel = grpc.insecure_channel('{}:{}'.format(host, port))
    
    47 53
         context.port = port
    
    48 54
     
    
    49
    -@cli.command('request', short_help='Send a dummy action')
    
    55
    +
    
    56
    +@cli.command('request-dummy', short_help='Send a dummy action')
    
    50 57
     @click.option('--number', default=1)
    
    51 58
     @click.option('--instance-name', default='testing')
    
    52
    -@click.option('--wait-for-completion', is_flag=True)
    
    59
    +@click.option('--wait-for-completion', is_flag=True, help='Stream updates until jobs are completed')
    
    53 60
     @pass_context
    
    54
    -def request(context, number, instance_name, wait_for_completion):
    
    61
    +def request_dummy(context, number, instance_name, wait_for_completion):
    
    55 62
         action_digest = remote_execution_pb2.Digest()
    
    56 63
     
    
    57 64
         context.logger.info("Sending execution request...\n")
    
    ... ... @@ -71,6 +78,7 @@ def request(context, number, instance_name, wait_for_completion):
    71 78
             else:
    
    72 79
                 context.logger.info(next(response))
    
    73 80
     
    
    81
    +
    
    74 82
     @cli.command('status', short_help='Get the status of an operation')
    
    75 83
     @click.argument('operation-name')
    
    76 84
     @pass_context
    
    ... ... @@ -83,6 +91,7 @@ def operation_status(context, operation_name):
    83 91
         response = stub.GetOperation(request)
    
    84 92
         context.logger.info(response)
    
    85 93
     
    
    94
    +
    
    86 95
     @cli.command('list', short_help='List operations')
    
    87 96
     @pass_context
    
    88 97
     def list_operations(context):
    
    ... ... @@ -100,6 +109,7 @@ def list_operations(context):
    100 109
         for op in response.operations:
    
    101 110
             context.logger.info(op)
    
    102 111
     
    
    112
    +
    
    103 113
     @cli.command('wait', short_help='Streams an operation until it is complete')
    
    104 114
     @click.argument('operation-name')
    
    105 115
     @pass_context
    
    ... ... @@ -111,3 +121,81 @@ def wait_execution(context, operation_name):
    111 121
     
    
    112 122
         for stream in response:
    
    113 123
             context.logger.info(stream)
    
    124
    +
    
    125
    +@cli.command('command', short_help='Send a command to be executed')
    
    126
    +@click.argument('input-root')
    
    127
    +@click.argument('commands', nargs=-1)
    
    128
    +@click.option('--output-file', nargs=2, type=(str, bool), multiple=True, help='{Expected output file, is_executeable flag}')
    
    129
    +@click.option('--output-directory', default='testing', help='Output directory for output files')
    
    130
    +@click.option('--instance-name', default='testing')
    
    131
    +@pass_context
    
    132
    +def command(context, input_root, commands, output_file, output_directory,  instance_name):
    
    133
    +    stub = remote_execution_pb2_grpc.ExecutionStub(context.channel)
    
    134
    +
    
    135
    +    command = remote_execution_pb2.Command()
    
    136
    +
    
    137
    +    for arg in commands:
    
    138
    +        command.arguments.extend([arg])
    
    139
    +
    
    140
    +    output_executeables = []
    
    141
    +    for file, is_executeable in output_file:
    
    142
    +        command.output_files.extend([file])
    
    143
    +        output_executeables.append(file)
    
    144
    +
    
    145
    +    command_digest = create_digest(command.SerializeToString())
    
    146
    +    context.logger.info(command_digest)
    
    147
    +
    
    148
    +    # TODO: Check for missing blobs
    
    149
    +    digest = None
    
    150
    +    for _, digest in merkle_maker(input_root):
    
    151
    +        pass
    
    152
    +
    
    153
    +    action = remote_execution_pb2.Action(command_digest=command_digest,
    
    154
    +                                         input_root_digest=digest,
    
    155
    +                                         do_not_cache=True)
    
    156
    +
    
    157
    +    action_digest = create_digest(action.SerializeToString())
    
    158
    +
    
    159
    +    context.logger.info("Sending execution request...\n")
    
    160
    +
    
    161
    +    requests = []
    
    162
    +    requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
    
    163
    +        digest=command_digest, data=command.SerializeToString()))
    
    164
    +
    
    165
    +    requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
    
    166
    +        digest=action_digest, data=action.SerializeToString()))
    
    167
    +
    
    168
    +    request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=instance_name,
    
    169
    +                                                           requests=requests)
    
    170
    +    remote_execution_pb2_grpc.ContentAddressableStorageStub(context.channel).BatchUpdateBlobs(request)
    
    171
    +
    
    172
    +    request = remote_execution_pb2.ExecuteRequest(instance_name = instance_name,
    
    173
    +                                                  action_digest = action_digest,
    
    174
    +                                                  skip_cache_lookup = True)
    
    175
    +    response = stub.Execute(request)
    
    176
    +
    
    177
    +    stub = bytestream_pb2_grpc.ByteStreamStub(context.channel)
    
    178
    +
    
    179
    +    stream = None
    
    180
    +    for stream in response:
    
    181
    +        context.logger.info(stream)
    
    182
    +    execute_response = remote_execution_pb2.ExecuteResponse()
    
    183
    +    stream.response.Unpack(execute_response)
    
    184
    +    for output_file in execute_response.result.output_files:
    
    185
    +        path = os.path.join(output_directory, output_file.path)
    
    186
    +
    
    187
    +        if not os.path.exists(os.path.dirname(path)):
    
    188
    +
    
    189
    +            try:
    
    190
    +                os.makedirs(os.path.dirname(path))
    
    191
    +
    
    192
    +            except OSError as exc:
    
    193
    +                if exc.errno != errno.EEXIST:
    
    194
    +                    raise
    
    195
    +
    
    196
    +        with open(path, 'wb+') as f:
    
    197
    +            write_fetch_blob(f, stub, output_file.digest, instance_name)
    
    198
    +
    
    199
    +        if output_file.path in output_executeables:
    
    200
    +            st = os.stat(path)
    
    201
    +            os.chmod(path, st.st_mode | stat.S_IXUSR)

  • buildgrid/bot/bot_session.py
    ... ... @@ -83,14 +83,15 @@ class BotSession:
    83 83
                 self._update_lease_from_server(lease)
    
    84 84
     
    
    85 85
         def update_bot_session(self):
    
    86
    +        self.logger.debug("Updating bot session: {}".format(self._bot_id))
    
    86 87
             session = self._interface.update_bot_session(self.get_pb2())
    
    87
    -        for lease in session.leases:
    
    88
    -            self._update_lease_from_server(lease)
    
    89
    -
    
    90
    -        for k, v in self._leases.items():
    
    88
    +        for k, v in list(self._leases.items()):
    
    91 89
                 if v.state == LeaseState.COMPLETED.value:
    
    92 90
                     del self._leases[k]
    
    93 91
     
    
    92
    +        for lease in session.leases:
    
    93
    +            self._update_lease_from_server(lease)
    
    94
    +
    
    94 95
         def get_pb2(self):
    
    95 96
             leases = list(self._leases.values())
    
    96 97
             if not leases:
    
    ... ... @@ -114,12 +115,16 @@ class BotSession:
    114 115
             lease_bot = self._leases.get(lease.id)
    
    115 116
             if lease.state == LeaseState.PENDING.value:
    
    116 117
                 lease.state = LeaseState.ACTIVE.value
    
    117
    -            asyncio.ensure_future(self.create_work(lease))
    
    118 118
                 self._leases[lease.id] = lease
    
    119
    +            self.update_bot_session()
    
    120
    +            asyncio.ensure_future(self.create_work(lease))
    
    119 121
     
    
    120 122
         async def create_work(self, lease):
    
    121 123
             self.logger.debug("Work created: {}".format(lease.id))
    
    122
    -        lease = await self._work(self._context, lease)
    
    124
    +
    
    125
    +        loop = asyncio.get_event_loop()
    
    126
    +        lease = await loop.run_in_executor(None, self._work, self._context, lease)
    
    127
    +
    
    123 128
             self.logger.debug("Work complete: {}".format(lease.id))
    
    124 129
             self.lease_completed(lease)
    
    125 130
     
    

  • buildgrid/server/job.py
    ... ... @@ -20,7 +20,7 @@ import uuid
    20 20
     
    
    21 21
     from enum import Enum
    
    22 22
     
    
    23
    -from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ExecuteOperationMetadata, ExecuteResponse
    
    23
    +from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ActionResult, ExecuteOperationMetadata, ExecuteResponse
    
    24 24
     from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2, worker_pb2
    
    25 25
     from buildgrid._protos.google.longrunning import operations_pb2
    
    26 26
     from google.protobuf import any_pb2
    
    ... ... @@ -52,13 +52,13 @@ class Job():
    52 52
         def __init__(self, action_digest, do_not_cache=False, message_queue=None):
    
    53 53
             self.lease = None
    
    54 54
             self.logger = logging.getLogger(__name__)
    
    55
    +        self.n_tries = 0
    
    55 56
             self.result = None
    
    56 57
             self.result_cached = False
    
    57 58
     
    
    58 59
             self._action_digest = action_digest
    
    59 60
             self._do_not_cache = do_not_cache
    
    60 61
             self._execute_stage = ExecuteStage.UNKNOWN
    
    61
    -        self._n_tries = 0
    
    62 62
             self._name = str(uuid.uuid4())
    
    63 63
             self._operation = operations_pb2.Operation(name = self._name)
    
    64 64
             self._operation_update_queues = []
    
    ... ... @@ -94,9 +94,10 @@ class Job():
    94 94
             self._operation.metadata.CopyFrom(self._pack_any(self.get_operation_meta()))
    
    95 95
             if self.result is not None:
    
    96 96
                 self._operation.done = True
    
    97
    -            response = ExecuteResponse()
    
    98
    -            self.result.Unpack(response.result)
    
    99
    -            response.cached_result = self.result_cached
    
    97
    +            action_result = ActionResult()
    
    98
    +            self.result.Unpack(action_result)
    
    99
    +            response = ExecuteResponse(result=action_result,
    
    100
    +                                       cached_result = self.result_cached)
    
    100 101
                 self._operation.response.CopyFrom(self._pack_any(response))
    
    101 102
     
    
    102 103
             return self._operation
    

  • buildgrid/server/scheduler.py
    ... ... @@ -31,7 +31,7 @@ from .job import ExecuteStage, LeaseState
    31 31
     
    
    32 32
     class Scheduler():
    
    33 33
     
    
    34
    -    MAX_N_TRIES = 5
    
    34
    +    MAX_N_TRIES = 100
    
    35 35
     
    
    36 36
         def __init__(self, action_cache=None):
    
    37 37
             self.action_cache = action_cache
    
    ... ... @@ -62,7 +62,8 @@ class Scheduler():
    62 62
             job.update_execute_stage(ExecuteStage.QUEUED)
    
    63 63
     
    
    64 64
         def retry_job(self, name):
    
    65
    -        if job in self.jobs[name]:
    
    65
    +        job = self.jobs.get(name)
    
    66
    +        if job is not None:
    
    66 67
                 if job.n_tries >= self.MAX_N_TRIES:
    
    67 68
                     # TODO: Decide what to do with these jobs
    
    68 69
                     job.update_execute_stage(ExecuteStage.COMPLETED)
    
    ... ... @@ -80,9 +81,7 @@ class Scheduler():
    80 81
             job.update_execute_stage(ExecuteStage.COMPLETED)
    
    81 82
             self.jobs[name] = job
    
    82 83
             if not job.do_not_cache and self.action_cache is not None:
    
    83
    -            action_result = ActionResult()
    
    84
    -            result.Unpack(action_result)
    
    85
    -            self.action_cache.put_action_result(job.action_digest, action_result)
    
    84
    +            self.action_cache.put_action_result(job.action_digest, result)
    
    86 85
     
    
    87 86
         def get_operations(self):
    
    88 87
             response = operations_pb2.ListOperationsResponse()
    
    ... ... @@ -91,7 +90,7 @@ class Scheduler():
    91 90
             return response
    
    92 91
     
    
    93 92
         def update_job_lease_state(self, name, state):
    
    94
    -        job = self.jobs.get(name)
    
    93
    +        job = self.jobs[name]
    
    95 94
             job.lease.state = state
    
    96 95
             self.jobs[name] = job
    
    97 96
     
    

  • buildgrid/utils.py
    1
    +# Copyright (C) 2018 Bloomberg LP
    
    2
    +#
    
    3
    +# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    +# you may not use this file except in compliance with the License.
    
    5
    +# You may obtain a copy of the License at
    
    6
    +#
    
    7
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    +#
    
    9
    +# Unless required by applicable law or agreed to in writing, software
    
    10
    +# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    +# See the License for the specific language governing permissions and
    
    13
    +# limitations under the License.
    
    14
    +
    
    15
    +import os
    
    16
    +import pathlib
    
    17
    +
    
    18
    +from buildgrid.settings import HASH
    
    19
    +
    
    20
    +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    21
    +from buildgrid._protos.google.bytestream import bytestream_pb2
    
    22
    +
    
    23
    +def gen_fetch_blob(stub, digest, instance_name=""):
    
    24
    +    """ Generates byte stream from a fetch blob request
    
    25
    +    """
    
    26
    +
    
    27
    +    resource_name = os.path.join(instance_name,'blobs',digest.hash, str(digest.size_bytes))
    
    28
    +    request = bytestream_pb2.ReadRequest(resource_name=resource_name,
    
    29
    +                                         read_offset=0)
    
    30
    +    for response in stub.Read(request):
    
    31
    +        yield response.data
    
    32
    +
    
    33
    +def write_fetch_directory(dir, stub, digest, instance_name=""):
    
    34
    +    """ Given a directory digest, fetches files and writes them to a directory
    
    35
    +    """
    
    36
    +    ## TODO: Extend to symlinks and inner directories
    
    37
    +    ## pathlib.Path('/my/directory').mkdir(parents=True, exist_ok=True)
    
    38
    +
    
    39
    +    directory = remote_execution_pb2.Directory()
    
    40
    +    directory = parse_to_pb2_from_fetch(directory, stub, digest, instance_name)
    
    41
    +
    
    42
    +    for file_node in directory.files:
    
    43
    +        path = os.path.join(dir, file_node.name)
    
    44
    +        with open(path, 'wb') as f:
    
    45
    +            write_fetch_blob(f, stub, file_node.digest, instance_name)
    
    46
    +
    
    47
    +def write_fetch_blob(out, stub, digest, instance_name=""):
    
    48
    +    """ Given an output buffer, fetches blob and writes to buffer
    
    49
    +    """
    
    50
    +
    
    51
    +    for stream in gen_fetch_blob(stub, digest, instance_name):
    
    52
    +        out.write(stream)
    
    53
    +
    
    54
    +    out.flush()
    
    55
    +    assert digest.size_bytes == os.fstat(out.fileno()).st_size
    
    56
    +
    
    57
    +def parse_to_pb2_from_fetch(pb2, stub, digest, instance_name=""):
    
    58
    +    """ Fetches stream and parses it into given pb2
    
    59
    +    """
    
    60
    +
    
    61
    +    bytes = b''
    
    62
    +    for stream in gen_fetch_blob(stub, digest, instance_name):
    
    63
    +        bytes += stream
    
    64
    +
    
    65
    +    pb2.ParseFromString(bytes)
    
    66
    +    return pb2
    
    67
    +
    
    68
    +def create_digest(bytes):
    
    69
    +    """ Creates a hash based on the hex digest and returns the digest
    
    70
    +    """
    
    71
    +    hash = HASH(bytes)
    
    72
    +    return remote_execution_pb2.Digest(hash = hash.hexdigest(),
    
    73
    +                                       size_bytes=len(bytes))
    
    74
    +
    
    75
    +def merkle_maker(dir):
    
    76
    +    """ Walks through given directory, yielding the binary and digest
    
    77
    +    """
    
    78
    +    directory = remote_execution_pb2.Directory()
    
    79
    +    for (dir_path, dir_names, file_names) in os.walk(dir):
    
    80
    +
    
    81
    +        for file_name in file_names:
    
    82
    +            file_path = os.path.join(dir_path, file_name)
    
    83
    +            chunk = read_file(file_path)
    
    84
    +            file_digest = create_digest(chunk)
    
    85
    +            directory.files.extend([file_maker(file_path, file_digest)])
    
    86
    +            yield chunk, file_digest
    
    87
    +
    
    88
    +        for inner_dir in dir_names:
    
    89
    +            inner_dir_path = os.path.join(dir_path, inner_dir_path)
    
    90
    +            yield from _merkle_maker(inner_dir_path)
    
    91
    +
    
    92
    +    directory_string = directory.SerializeToString()
    
    93
    +
    
    94
    +    yield directory_string, create_digest(directory_string)
    
    95
    +
    
    96
    +def file_maker(file_path, file_digest):
    
    97
    +    """ Creates a File Node
    
    98
    +    """
    
    99
    +    _, file_name = os.path.split(file_path)
    
    100
    +    return remote_execution_pb2.FileNode(name=file_name,
    
    101
    +                                         digest=file_digest,
    
    102
    +                                         is_executable=os.access(file_path, os.X_OK))
    
    103
    +
    
    104
    +def read_file(file):
    
    105
    +    with open(file, 'rb') as f:
    
    106
    +        return f.read()

  • tests/integration/operations_service.py
    ... ... @@ -91,12 +91,14 @@ def test_list_operations_with_result(instance, execute_request, context):
    91 91
         action_result = remote_execution_pb2.ActionResult()
    
    92 92
         output_file = remote_execution_pb2.OutputFile(path = 'unicorn')
    
    93 93
         action_result.output_files.extend([output_file])
    
    94
    -    instance._instance._scheduler.jobs[response_execute.name].result = _pack_any(action_result)
    
    94
    +
    
    95
    +    instance._instance._scheduler.job_complete(response_execute.name, _pack_any(action_result))
    
    95 96
     
    
    96 97
         request = operations_pb2.ListOperationsRequest()
    
    97 98
         response = instance.ListOperations(request, context)
    
    98 99
     
    
    99 100
         assert response.operations[0].name == response_execute.name
    
    101
    +
    
    100 102
         execute_response = remote_execution_pb2.ExecuteResponse()
    
    101 103
         response.operations[0].response.Unpack(execute_response)
    
    102 104
         assert execute_response.result == action_result
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]