[Notes] [Git][BuildGrid/buildgrid][finn/cas-commands] 3 commits: Bot work is now sent on an executor to stop them blocking the session.



Title: GitLab

finnball pushed to branch finn/cas-commands at BuildGrid / buildgrid

Commits:

6 changed files:

Changes:

  • README.rst
    ... ... @@ -38,7 +38,7 @@ In one terminal, start a server::
    38 38
     
    
    39 39
     In another terminal, send a request for work::
    
    40 40
     
    
    41
    -  bgd execute request
    
    41
    +  bgd execute request-dummy
    
    42 42
     
    
    43 43
     The stage should show as `QUEUED` as it awaits a bot to pick up the work::
    
    44 44
     
    
    ... ... @@ -51,3 +51,35 @@ Create a bot session::
    51 51
     Show the work as completed::
    
    52 52
     
    
    53 53
       bgd execute list
    
    54
    +
    
    55
    +Instructions for a Simple Build
    
    56
    +-------------------------------
    
    57
    +
    
    58
    +This example covers a simple build. The user will upload a directory containing a C file and a command to the CAS. The bot will then fetch the uploaded directory and command which will then be run inside a temporary directory. The result will then be uploaded to the CAS and downloaded by the user. This is an early demo and still lacks a few features such as symlink support and checking to see if files exist in the CAS before executing a command.
    
    59
    +
    
    60
    +Create a new directory called `test-buildgrid/` and place the following C file in it called `hello.c`::
    
    61
    +
    
    62
    +  #include <stdio.h>
    
    63
    +  int main()
    
    64
    +  {
    
    65
    +   printf("Hello, World!");
    
    66
    +   return 0;
    
    67
    +  }
    
    68
    +
    
    69
    +Now start a BuildGrid server, passing it a directory it can write a CAS to::
    
    70
    +
    
    71
    +  bgd server start --cas disk --cas-cache disk --cas-disk-directory /path/to/empty/directory
    
    72
    +
    
    73
    +Start the following bot session::
    
    74
    +
    
    75
    +  bgd bot temp-directory
    
    76
    +
    
    77
    +Upload the directory containing the C file::
    
    78
    +
    
    79
    +  bgd cas upload-dir /path/to/test-buildgrid
    
    80
    +
    
    81
    +Now we send an execution request to the bot with the name of the expected `output-file`, a boolean describing if it is executable, the path to the directory we uploaded in order to calculate the digest and finally the command to run on the bot::
    
    82
    +
    
    83
    +  bgd execute command --output-file hello True /path/to/test-buildgrid -- gcc -Wall hello.c -o hello
    
    84
    +
    
    85
    +The resulting executable should appear in a new directory called `testing/`

  • buildgrid/bot/bot_session.py
    ... ... @@ -104,14 +104,15 @@ class BotSession:
    104 104
                 self._update_lease_from_server(lease)
    
    105 105
     
    
    106 106
         def update_bot_session(self):
    
    107
    +        self.logger.debug("Updating bot session: {}".format(self._bot_id))
    
    107 108
             session = self._interface.update_bot_session(self.get_pb2())
    
    108
    -        for lease in session.leases:
    
    109
    -            self._update_lease_from_server(lease)
    
    110
    -
    
    111
    -        for k, v in self._leases.items():
    
    109
    +        for k, v in list(self._leases.items()):
    
    112 110
                 if v.state == LeaseState.COMPLETED.value:
    
    113 111
                     del self._leases[k]
    
    114 112
     
    
    113
    +        for lease in session.leases:
    
    114
    +            self._update_lease_from_server(lease)
    
    115
    +
    
    115 116
         def get_pb2(self):
    
    116 117
             leases = list(self._leases.values())
    
    117 118
             if not leases:
    
    ... ... @@ -134,12 +135,16 @@ class BotSession:
    134 135
             # TODO: Compare with previous state of lease
    
    135 136
             if lease.state == LeaseState.PENDING.value:
    
    136 137
                 lease.state = LeaseState.ACTIVE.value
    
    137
    -            asyncio.ensure_future(self.create_work(lease))
    
    138 138
                 self._leases[lease.id] = lease
    
    139
    +            self.update_bot_session()
    
    140
    +            asyncio.ensure_future(self.create_work(lease))
    
    139 141
     
    
    140 142
         async def create_work(self, lease):
    
    141 143
             self.logger.debug("Work created: {}".format(lease.id))
    
    142
    -        lease = await self._work(self._context, lease)
    
    144
    +
    
    145
    +        loop = asyncio.get_event_loop()
    
    146
    +        lease = await loop.run_in_executor(None, self._work, self._context, lease)
    
    147
    +
    
    143 148
             self.logger.debug("Work complete: {}".format(lease.id))
    
    144 149
             self.lease_completed(lease)
    
    145 150
     
    

  • buildgrid/server/job.py
    ... ... @@ -21,26 +21,25 @@ from enum import Enum
    21 21
     
    
    22 22
     from google.protobuf import any_pb2
    
    23 23
     
    
    24
    -from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ExecuteOperationMetadata
    
    25
    -from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ExecuteResponse
    
    24
    +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    26 25
     from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
    
    27 26
     from buildgrid._protos.google.longrunning import operations_pb2
    
    28 27
     
    
    29 28
     
    
    30 29
     class ExecuteStage(Enum):
    
    31
    -    UNKNOWN = ExecuteOperationMetadata.Stage.Value('UNKNOWN')
    
    30
    +    UNKNOWN = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('UNKNOWN')
    
    32 31
     
    
    33 32
         # Checking the result against the cache.
    
    34
    -    CACHE_CHECK = ExecuteOperationMetadata.Stage.Value('CACHE_CHECK')
    
    33
    +    CACHE_CHECK = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('CACHE_CHECK')
    
    35 34
     
    
    36 35
         # Currently idle, awaiting a free machine to execute.
    
    37
    -    QUEUED = ExecuteOperationMetadata.Stage.Value('QUEUED')
    
    36
    +    QUEUED = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('QUEUED')
    
    38 37
     
    
    39 38
         # Currently being executed by a worker.
    
    40
    -    EXECUTING = ExecuteOperationMetadata.Stage.Value('EXECUTING')
    
    39
    +    EXECUTING = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('EXECUTING')
    
    41 40
     
    
    42 41
         # Finished execution.
    
    43
    -    COMPLETED = ExecuteOperationMetadata.Stage.Value('COMPLETED')
    
    42
    +    COMPLETED = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('COMPLETED')
    
    44 43
     
    
    45 44
     
    
    46 45
     class BotStatus(Enum):
    
    ... ... @@ -80,13 +79,13 @@ class Job:
    80 79
         def __init__(self, action_digest, do_not_cache=False, message_queue=None):
    
    81 80
             self.lease = None
    
    82 81
             self.logger = logging.getLogger(__name__)
    
    82
    +        self.n_tries = 0
    
    83 83
             self.result = None
    
    84 84
             self.result_cached = False
    
    85 85
     
    
    86 86
             self._action_digest = action_digest
    
    87 87
             self._do_not_cache = do_not_cache
    
    88 88
             self._execute_stage = ExecuteStage.UNKNOWN
    
    89
    -        self._n_tries = 0
    
    90 89
             self._name = str(uuid.uuid4())
    
    91 90
             self._operation = operations_pb2.Operation(name=self._name)
    
    92 91
             self._operation_update_queues = []
    
    ... ... @@ -122,15 +121,16 @@ class Job:
    122 121
             self._operation.metadata.CopyFrom(self._pack_any(self.get_operation_meta()))
    
    123 122
             if self.result is not None:
    
    124 123
                 self._operation.done = True
    
    125
    -            response = ExecuteResponse()
    
    126
    -            self.result.Unpack(response.result)
    
    127
    -            response.cached_result = self.result_cached
    
    124
    +            action_result = remote_execution_pb2.ActionResult()
    
    125
    +            self.result.Unpack(action_result)
    
    126
    +            response = remote_execution_pb2.ExecuteResponse(result=action_result,
    
    127
    +                                                            cached_result=self.result_cached)
    
    128 128
                 self._operation.response.CopyFrom(self._pack_any(response))
    
    129 129
     
    
    130 130
             return self._operation
    
    131 131
     
    
    132 132
         def get_operation_meta(self):
    
    133
    -        meta = ExecuteOperationMetadata()
    
    133
    +        meta = remote_execution_pb2.ExecuteOperationMetadata()
    
    134 134
             meta.stage = self._execute_stage.value
    
    135 135
             meta.action_digest.CopyFrom(self._action_digest)
    
    136 136
     
    

  • buildgrid/server/scheduler.py
    ... ... @@ -25,7 +25,6 @@ from collections import deque
    25 25
     
    
    26 26
     from google.protobuf import any_pb2
    
    27 27
     
    
    28
    -from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ActionResult
    
    29 28
     from buildgrid._protos.google.longrunning import operations_pb2
    
    30 29
     
    
    31 30
     from .job import ExecuteStage, LeaseState
    
    ... ... @@ -83,9 +82,7 @@ class Scheduler:
    83 82
             job.update_execute_stage(ExecuteStage.COMPLETED)
    
    84 83
             self.jobs[name] = job
    
    85 84
             if not job.do_not_cache and self.action_cache is not None:
    
    86
    -            action_result = ActionResult()
    
    87
    -            result.Unpack(action_result)
    
    88
    -            self.action_cache.put_action_result(job.action_digest, action_result)
    
    85
    +            self.action_cache.put_action_result(job.action_digest, result)
    
    89 86
     
    
    90 87
         def get_operations(self):
    
    91 88
             response = operations_pb2.ListOperationsResponse()
    
    ... ... @@ -94,7 +91,7 @@ class Scheduler:
    94 91
             return response
    
    95 92
     
    
    96 93
         def update_job_lease_state(self, name, state):
    
    97
    -        job = self.jobs.get(name)
    
    94
    +        job = self.jobs[name]
    
    98 95
             job.lease.state = state
    
    99 96
             self.jobs[name] = job
    
    100 97
     
    

  • buildgrid/utils.py
    1
    +# Copyright (C) 2018 Bloomberg LP
    
    2
    +#
    
    3
    +# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    +# you may not use this file except in compliance with the License.
    
    5
    +# You may obtain a copy of the License at
    
    6
    +#
    
    7
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    +#
    
    9
    +# Unless required by applicable law or agreed to in writing, software
    
    10
    +# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    +# See the License for the specific language governing permissions and
    
    13
    +# limitations under the License.
    
    14
    +
    
    15
    +import os
    
    16
    +
    
    17
    +from buildgrid.settings import HASH
    
    18
    +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    19
    +from buildgrid._protos.google.bytestream import bytestream_pb2
    
    20
    +
    
    21
    +
    
    22
    +def gen_fetch_blob(stub, digest, instance_name=""):
    
    23
    +    """ Generates byte stream from a fetch blob request
    
    24
    +    """
    
    25
    +
    
    26
    +    resource_name = os.path.join(instance_name, 'blobs', digest.hash, str(digest.size_bytes))
    
    27
    +    request = bytestream_pb2.ReadRequest(resource_name=resource_name,
    
    28
    +                                         read_offset=0)
    
    29
    +    for response in stub.Read(request):
    
    30
    +        yield response.data
    
    31
    +
    
    32
    +
    
    33
    +def write_fetch_directory(directory, stub, digest, instance_name=""):
    
    34
    +    """ Given a directory digest, fetches files and writes them to a directory
    
    35
    +    """
    
    36
    +    # TODO: Extend to symlinks and inner directories
    
    37
    +    # pathlib.Path('/my/directory').mkdir(parents=True, exist_ok=True)
    
    38
    +
    
    39
    +    directory_pb2 = remote_execution_pb2.Directory()
    
    40
    +    directory_pb2 = parse_to_pb2_from_fetch(directory_pb2, stub, digest, instance_name)
    
    41
    +
    
    42
    +    for file_node in directory_pb2.files:
    
    43
    +        path = os.path.join(directory, file_node.name)
    
    44
    +        with open(path, 'wb') as f:
    
    45
    +            write_fetch_blob(f, stub, file_node.digest, instance_name)
    
    46
    +
    
    47
    +
    
    48
    +def write_fetch_blob(out, stub, digest, instance_name=""):
    
    49
    +    """ Given an output buffer, fetches blob and writes to buffer
    
    50
    +    """
    
    51
    +
    
    52
    +    for stream in gen_fetch_blob(stub, digest, instance_name):
    
    53
    +        out.write(stream)
    
    54
    +
    
    55
    +    out.flush()
    
    56
    +    assert digest.size_bytes == os.fstat(out.fileno()).st_size
    
    57
    +
    
    58
    +
    
    59
    +def parse_to_pb2_from_fetch(pb2, stub, digest, instance_name=""):
    
    60
    +    """ Fetches stream and parses it into given pb2
    
    61
    +    """
    
    62
    +
    
    63
    +    stream_bytes = b''
    
    64
    +    for stream in gen_fetch_blob(stub, digest, instance_name):
    
    65
    +        stream_bytes += stream
    
    66
    +
    
    67
    +    pb2.ParseFromString(stream_bytes)
    
    68
    +    return pb2
    
    69
    +
    
    70
    +
    
    71
    +def create_digest(bytes_to_digest):
    
    72
    +    """ Creates a hash based on the hex digest and returns the digest
    
    73
    +    """
    
    74
    +    return remote_execution_pb2.Digest(hash=HASH(bytes_to_digest).hexdigest(),
    
    75
    +                                       size_bytes=len(bytes_to_digest))
    
    76
    +
    
    77
    +
    
    78
    +def merkle_maker(directory):
    
    79
    +    """ Walks through given directory, yielding the binary and digest
    
    80
    +    """
    
    81
    +    directory_pb2 = remote_execution_pb2.Directory()
    
    82
    +    for (dir_path, dir_names, file_names) in os.walk(directory):
    
    83
    +
    
    84
    +        for file_name in file_names:
    
    85
    +            file_path = os.path.join(dir_path, file_name)
    
    86
    +            chunk = read_file(file_path)
    
    87
    +            file_digest = create_digest(chunk)
    
    88
    +            directory_pb2.files.extend([file_maker(file_path, file_digest)])
    
    89
    +            yield chunk, file_digest
    
    90
    +
    
    91
    +        for inner_dir in dir_names:
    
    92
    +            inner_dir_path = os.path.join(dir_path, inner_dir)
    
    93
    +            yield from merkle_maker(inner_dir_path)
    
    94
    +
    
    95
    +    directory_string = directory_pb2.SerializeToString()
    
    96
    +
    
    97
    +    yield directory_string, create_digest(directory_string)
    
    98
    +
    
    99
    +
    
    100
    +def file_maker(file_path, file_digest):
    
    101
    +    """ Creates a File Node
    
    102
    +    """
    
    103
    +    _, file_name = os.path.split(file_path)
    
    104
    +    return remote_execution_pb2.FileNode(name=file_name,
    
    105
    +                                         digest=file_digest,
    
    106
    +                                         is_executable=os.access(file_path, os.X_OK))
    
    107
    +
    
    108
    +
    
    109
    +def read_file(file):
    
    110
    +    with open(file, 'rb') as f:
    
    111
    +        return f.read()

  • tests/integration/operations_service.py
    ... ... @@ -100,12 +100,14 @@ def test_list_operations_with_result(instance, execute_request, context):
    100 100
         action_result = remote_execution_pb2.ActionResult()
    
    101 101
         output_file = remote_execution_pb2.OutputFile(path='unicorn')
    
    102 102
         action_result.output_files.extend([output_file])
    
    103
    -    instance._instance._scheduler.jobs[response_execute.name].result = _pack_any(action_result)
    
    103
    +
    
    104
    +    instance._instance._scheduler.job_complete(response_execute.name, _pack_any(action_result))
    
    104 105
     
    
    105 106
         request = operations_pb2.ListOperationsRequest()
    
    106 107
         response = instance.ListOperations(request, context)
    
    107 108
     
    
    108 109
         assert response.operations[0].name == response_execute.name
    
    110
    +
    
    109 111
         execute_response = remote_execution_pb2.ExecuteResponse()
    
    110 112
         response.operations[0].response.Unpack(execute_response)
    
    111 113
         assert execute_response.result == action_result
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]