finnball pushed to branch finn/cas-commands at BuildGrid / buildgrid
Commits:

- e0d4b9da by finn at 2018-08-15T09:55:10Z
- 45817c86 by finn at 2018-08-15T09:55:14Z
- 8d0b8159 by finn at 2018-08-15T09:55:14Z
6 changed files:
- README.rst
- buildgrid/bot/bot_session.py
- buildgrid/server/job.py
- buildgrid/server/scheduler.py
- buildgrid/utils.py (new file)
- tests/integration/operations_service.py
Changes:
README.rst:

@@ -38,7 +38,7 @@ In one terminal, start a server::
 
 In another terminal, send a request for work::
 
-   bgd execute request
+   bgd execute request-dummy
 
 The stage should show as `QUEUED` as it awaits a bot to pick up the work::
 
@@ -51,3 +51,35 @@ Create a bot session::
 Show the work as completed::
 
    bgd execute list
+
+Instructions for a Simple Build
+-------------------------------
+
+This example covers a simple build. The user uploads a directory containing a C file and a command to the CAS. The bot then fetches the uploaded directory and command, and runs the command inside a temporary directory. The result is then uploaded to the CAS and downloaded by the user. This is an early demo and still lacks a few features, such as symlink support and checking whether files already exist in the CAS before executing a command.
+
+Create a new directory called `test-buildgrid/` and place in it the following C file, called `hello.c`::
+
+   #include <stdio.h>
+   int main()
+   {
+     printf("Hello, World!");
+     return 0;
+   }
+
+Now start a BuildGrid server, passing it a directory it can write a CAS to::
+
+   bgd server start --cas disk --cas-cache disk --cas-disk-directory /path/to/empty/directory
+
+Start the following bot session::
+
+   bgd bot temp-directory
+
+Upload the directory containing the C file::
+
+   bgd cas upload-dir /path/to/test-buildgrid
+
+Now we send an execution request to the bot with the name of the expected `output-file`, a boolean describing whether it is executable, the path to the uploaded directory (used to calculate the digest) and, finally, the command to run on the bot::
+
+   bgd execute command --output-file hello True /path/to/test-buildgrid -- gcc -Wall hello.c -o hello
+
+The resulting executable should have been returned to a new directory called `testing/`.
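The digest used by the execution request is derived from the uploaded file contents. As a minimal sketch of the digest computation (assuming `HASH` in `buildgrid.settings` is `hashlib.sha256`; this snippet is illustrative, not part of the commit)::

   import hashlib

   from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2

   def create_digest(blob):
       # A Digest pairs the SHA-256 hex string with the blob's size in bytes.
       return remote_execution_pb2.Digest(hash=hashlib.sha256(blob).hexdigest(),
                                          size_bytes=len(blob))

   digest = create_digest(open('test-buildgrid/hello.c', 'rb').read())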
buildgrid/bot/bot_session.py:

@@ -104,14 +104,15 @@ class BotSession:
         self._update_lease_from_server(lease)
 
     def update_bot_session(self):
+        self.logger.debug("Updating bot session: {}".format(self._bot_id))
         session = self._interface.update_bot_session(self.get_pb2())
-        for lease in session.leases:
-            self._update_lease_from_server(lease)
-
-        for k, v in self._leases.items():
+        for k, v in list(self._leases.items()):
            if v.state == LeaseState.COMPLETED.value:
                del self._leases[k]
 
+        for lease in session.leases:
+            self._update_lease_from_server(lease)
+
     def get_pb2(self):
         leases = list(self._leases.values())
         if not leases:

@@ -134,12 +135,16 @@ class BotSession:
        # TODO: Compare with previous state of lease
        if lease.state == LeaseState.PENDING.value:
            lease.state = LeaseState.ACTIVE.value
-           asyncio.ensure_future(self.create_work(lease))
            self._leases[lease.id] = lease
+           self.update_bot_session()
+           asyncio.ensure_future(self.create_work(lease))
 
     async def create_work(self, lease):
         self.logger.debug("Work created: {}".format(lease.id))
-        lease = await self._work(self._context, lease)
+
+        loop = asyncio.get_event_loop()
+        lease = await loop.run_in_executor(None, self._work, self._context, lease)
+
         self.logger.debug("Work complete: {}".format(lease.id))
         self.lease_completed(lease)
 
buildgrid/server/job.py:

@@ -21,26 +21,25 @@ from enum import Enum
 
 from google.protobuf import any_pb2
 
-from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ExecuteOperationMetadata
-from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ExecuteResponse
+from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
 from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
 from buildgrid._protos.google.longrunning import operations_pb2
 
 
 class ExecuteStage(Enum):
-    UNKNOWN = ExecuteOperationMetadata.Stage.Value('UNKNOWN')
+    UNKNOWN = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('UNKNOWN')
 
     # Checking the result against the cache.
-    CACHE_CHECK = ExecuteOperationMetadata.Stage.Value('CACHE_CHECK')
+    CACHE_CHECK = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('CACHE_CHECK')
 
     # Currently idle, awaiting a free machine to execute.
-    QUEUED = ExecuteOperationMetadata.Stage.Value('QUEUED')
+    QUEUED = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('QUEUED')
 
     # Currently being executed by a worker.
-    EXECUTING = ExecuteOperationMetadata.Stage.Value('EXECUTING')
+    EXECUTING = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('EXECUTING')
 
     # Finished execution.
-    COMPLETED = ExecuteOperationMetadata.Stage.Value('COMPLETED')
+    COMPLETED = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('COMPLETED')
 
 
 class BotStatus(Enum):

@@ -80,13 +79,13 @@ class Job:
     def __init__(self, action_digest, do_not_cache=False, message_queue=None):
         self.lease = None
         self.logger = logging.getLogger(__name__)
+        self.n_tries = 0
         self.result = None
         self.result_cached = False
 
         self._action_digest = action_digest
         self._do_not_cache = do_not_cache
         self._execute_stage = ExecuteStage.UNKNOWN
-        self._n_tries = 0
         self._name = str(uuid.uuid4())
         self._operation = operations_pb2.Operation(name=self._name)
         self._operation_update_queues = []

@@ -122,15 +121,16 @@ class Job:
         self._operation.metadata.CopyFrom(self._pack_any(self.get_operation_meta()))
         if self.result is not None:
             self._operation.done = True
-            response = ExecuteResponse()
-            self.result.Unpack(response.result)
-            response.cached_result = self.result_cached
+            action_result = remote_execution_pb2.ActionResult()
+            self.result.Unpack(action_result)
+            response = remote_execution_pb2.ExecuteResponse(result=action_result,
+                                                            cached_result=self.result_cached)
             self._operation.response.CopyFrom(self._pack_any(response))
 
         return self._operation
 
     def get_operation_meta(self):
-        meta = ExecuteOperationMetadata()
+        meta = remote_execution_pb2.ExecuteOperationMetadata()
         meta.stage = self._execute_stage.value
         meta.action_digest.CopyFrom(self._action_digest)
 
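`Job.get_operation` now unpacks the stored `google.protobuf.Any` into an `ActionResult` before constructing the `ExecuteResponse`, rather than unpacking into the response's `result` field in place. A self-contained sketch of the `Any` pack/unpack round-trip (using `StringValue` as a stand-in for `ActionResult`)::

   from google.protobuf import any_pb2, wrappers_pb2

   # Stand-in message; any generated protobuf message behaves the same.
   result = wrappers_pb2.StringValue(value='unicorn')

   packed = any_pb2.Any()
   packed.Pack(result)             # what the scheduler stores on job.result

   unpacked = wrappers_pb2.StringValue()
   assert packed.Unpack(unpacked)  # Unpack returns False on a type mismatch
   assert unpacked == result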
buildgrid/server/scheduler.py:

@@ -25,7 +25,6 @@ from collections import deque
 
 from google.protobuf import any_pb2
 
-from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ActionResult
 from buildgrid._protos.google.longrunning import operations_pb2
 
 from .job import ExecuteStage, LeaseState

@@ -83,9 +82,7 @@ class Scheduler:
         job.update_execute_stage(ExecuteStage.COMPLETED)
         self.jobs[name] = job
         if not job.do_not_cache and self.action_cache is not None:
-            action_result = ActionResult()
-            result.Unpack(action_result)
-            self.action_cache.put_action_result(job.action_digest, action_result)
+            self.action_cache.put_action_result(job.action_digest, result)
 
     def get_operations(self):
         response = operations_pb2.ListOperationsResponse()

@@ -94,7 +91,7 @@ class Scheduler:
         return response
 
     def update_job_lease_state(self, name, state):
-        job = self.jobs.get(name)
+        job = self.jobs[name]
         job.lease.state = state
         self.jobs[name] = job
 
buildgrid/utils.py (new file):

@@ -0,0 +1,111 @@
+# Copyright (C) 2018 Bloomberg LP
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#  <http://www.apache.org/licenses/LICENSE-2.0>
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+
+from buildgrid.settings import HASH
+from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
+from buildgrid._protos.google.bytestream import bytestream_pb2
+
+
+def gen_fetch_blob(stub, digest, instance_name=""):
+    """ Generates a byte stream from a fetch blob request
+    """
+
+    resource_name = os.path.join(instance_name, 'blobs', digest.hash, str(digest.size_bytes))
+    request = bytestream_pb2.ReadRequest(resource_name=resource_name,
+                                         read_offset=0)
+    for response in stub.Read(request):
+        yield response.data
+
+
+def write_fetch_directory(directory, stub, digest, instance_name=""):
+    """ Given a directory digest, fetches files and writes them to a directory
+    """
+    # TODO: Extend to symlinks and inner directories
+    # pathlib.Path('/my/directory').mkdir(parents=True, exist_ok=True)
+
+    directory_pb2 = remote_execution_pb2.Directory()
+    directory_pb2 = parse_to_pb2_from_fetch(directory_pb2, stub, digest, instance_name)
+
+    for file_node in directory_pb2.files:
+        path = os.path.join(directory, file_node.name)
+        with open(path, 'wb') as f:
+            write_fetch_blob(f, stub, file_node.digest, instance_name)
+
+
+def write_fetch_blob(out, stub, digest, instance_name=""):
+    """ Given an output buffer, fetches a blob and writes it to the buffer
+    """
+
+    for stream in gen_fetch_blob(stub, digest, instance_name):
+        out.write(stream)
+
+    out.flush()
+    assert digest.size_bytes == os.fstat(out.fileno()).st_size
+
+
+def parse_to_pb2_from_fetch(pb2, stub, digest, instance_name=""):
+    """ Fetches a stream and parses it into the given pb2
+    """
+
+    stream_bytes = b''
+    for stream in gen_fetch_blob(stub, digest, instance_name):
+        stream_bytes += stream
+
+    pb2.ParseFromString(stream_bytes)
+    return pb2
+
+
+def create_digest(bytes_to_digest):
+    """ Creates a Digest (hash and size) for the given bytes
+    """
+    return remote_execution_pb2.Digest(hash=HASH(bytes_to_digest).hexdigest(),
+                                       size_bytes=len(bytes_to_digest))
+
+
+def merkle_maker(directory):
+    """ Walks through the given directory, yielding the binary and digest
+    """
+    directory_pb2 = remote_execution_pb2.Directory()
+    for (dir_path, dir_names, file_names) in os.walk(directory):
+
+        for file_name in file_names:
+            file_path = os.path.join(dir_path, file_name)
+            chunk = read_file(file_path)
+            file_digest = create_digest(chunk)
+            directory_pb2.files.extend([file_maker(file_path, file_digest)])
+            yield chunk, file_digest
+
+        for inner_dir in dir_names:
+            inner_dir_path = os.path.join(dir_path, inner_dir)
+            yield from merkle_maker(inner_dir_path)
+
+    directory_string = directory_pb2.SerializeToString()
+
+    yield directory_string, create_digest(directory_string)
+
+
+def file_maker(file_path, file_digest):
+    """ Creates a FileNode
+    """
+    _, file_name = os.path.split(file_path)
+    return remote_execution_pb2.FileNode(name=file_name,
+                                         digest=file_digest,
+                                         is_executable=os.access(file_path, os.X_OK))
+
+
+def read_file(file):
+    with open(file, 'rb') as f:
+        return f.read()
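A rough usage sketch for the new helpers, assuming a ByteStream stub on an open gRPC channel (the server address, the generated `bytestream_pb2_grpc` module path, and the upload step are assumptions, not shown in this commit)::

   import grpc

   from buildgrid.utils import merkle_maker, parse_to_pb2_from_fetch
   from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
   from buildgrid._protos.google.bytestream import bytestream_pb2_grpc

   channel = grpc.insecure_channel('localhost:50051')  # assumed endpoint
   stub = bytestream_pb2_grpc.ByteStreamStub(channel)

   # Walk a source tree; the final item yielded is the serialized root Directory.
   for blob, digest in merkle_maker('test-buildgrid'):
       last_digest = digest  # each blob would be uploaded to the CAS here

   # Fetch the root Directory message back out of the CAS by its digest.
   directory = parse_to_pb2_from_fetch(remote_execution_pb2.Directory(),
                                       stub, last_digest)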
tests/integration/operations_service.py:

@@ -100,12 +100,14 @@ def test_list_operations_with_result(instance, execute_request, context):
     action_result = remote_execution_pb2.ActionResult()
     output_file = remote_execution_pb2.OutputFile(path='unicorn')
     action_result.output_files.extend([output_file])
-    instance._instance._scheduler.jobs[response_execute.name].result = _pack_any(action_result)
+
+    instance._instance._scheduler.job_complete(response_execute.name, _pack_any(action_result))
 
     request = operations_pb2.ListOperationsRequest()
     response = instance.ListOperations(request, context)
 
     assert response.operations[0].name == response_execute.name
+
     execute_response = remote_execution_pb2.ExecuteResponse()
     response.operations[0].response.Unpack(execute_response)
     assert execute_response.result == action_result