Martin Blanchard pushed to branch mablanch/61-bazel-support at BuildGrid / buildgrid
Commits:
- 1763ad72 by Martin Blanchard at 2018-08-22T11:52:07Z
- 00a6f7f3 by Martin Blanchard at 2018-08-22T11:52:08Z
- 09f833b8 by Martin Blanchard at 2018-08-22T11:52:08Z
- 530dc02a by Martin Blanchard at 2018-08-22T11:52:08Z
4 changed files:
- buildgrid/_app/bots/temp_directory.py
- buildgrid/server/action_cache.py (deleted)
- buildgrid/server/scheduler.py
- buildgrid/utils.py
Changes:
buildgrid/_app/bots/temp_directory.py:
@@ -19,7 +19,7 @@ import tempfile
 
 from google.protobuf import any_pb2
 
-from buildgrid.utils import read_file, create_digest, write_fetch_directory, parse_to_pb2_from_fetch
+from buildgrid.utils import output_file_maker, write_fetch_directory, parse_to_pb2_from_fetch
 
 from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
 from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
 
@@ -35,54 +35,68 @@ def work_temp_directory(context, lease):
     action_digest = remote_execution_pb2.Digest()
     lease.payload.Unpack(action_digest)
 
-    action = remote_execution_pb2.Action()
+    action = parse_to_pb2_from_fetch(remote_execution_pb2.Action(),
+                                     stub_bytestream, action_digest, instance_name)
 
-    action = parse_to_pb2_from_fetch(action, stub_bytestream, action_digest, instance_name)
+    with tempfile.TemporaryDirectory() as temp_directory:
 
-    with tempfile.TemporaryDirectory() as temp_dir:
+        command = parse_to_pb2_from_fetch(remote_execution_pb2.Command(),
+                                          stub_bytestream, action.command_digest, instance_name)
 
-        command = remote_execution_pb2.Command()
-        command = parse_to_pb2_from_fetch(command, stub_bytestream, action.command_digest, instance_name)
+        write_fetch_directory(temp_directory, stub_bytestream,
+                              action.input_root_digest, instance_name)
 
-        arguments = "cd {} &&".format(temp_dir)
+        execution_envionment = os.environ.copy()
+        for variable in command.environment_variables:
+            if variable.name not in ['PATH', 'PWD']:
+                execution_envionment[variable.name] = variable.value
 
+        command_arguments = list()
         for argument in command.arguments:
-            arguments += " {}".format(argument)
-
-        context.logger.info(arguments)
-
-        write_fetch_directory(temp_dir, stub_bytestream, action.input_root_digest, instance_name)
-
-        proc = subprocess.Popen(arguments,
-                                shell=True,
-                                stdin=subprocess.PIPE,
-                                stdout=subprocess.PIPE)
-
-        # TODO: Should return the std_out to the user
-        proc.communicate()
-
-        result = remote_execution_pb2.ActionResult()
-        requests = []
-        for output_file in command.output_files:
-            path = os.path.join(temp_dir, output_file)
-            chunk = read_file(path)
-
-            digest = create_digest(chunk)
-
-            result.output_files.extend([remote_execution_pb2.OutputFile(path=output_file,
-                                                                        digest=digest)])
-
-            requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
-                digest=digest, data=chunk))
-
-        request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=instance_name,
-                                                               requests=requests)
+            command_arguments.append(argument.strip())
+
+        working_directory = None
+        if command.working_directory:
+            working_directory = os.path.join(temp_directory,
+                                             command.working_directory)
+            os.makedirs(working_directory, exist_ok=True)
+        else:
+            working_directory = temp_directory
+
+        # Ensure that output files structure exists:
+        for output_path in command.output_files:
+            directory_path = os.path.join(working_directory,
+                                          os.path.dirname(output_path))
+            os.makedirs(directory_path, exist_ok=True)
+
+        process = subprocess.Popen(command_arguments,
+                                   cwd=working_directory,
+                                   universal_newlines=True,
+                                   env=execution_envionment,
+                                   stdin=subprocess.PIPE,
+                                   stdout=subprocess.PIPE)
+        # TODO: Should return the stdout and stderr to the user.
+        process.communicate()
+
+        update_requests = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=instance_name)
+        action_result = remote_execution_pb2.ActionResult()
+
+        for output_path in command.output_files:
+            file_path = os.path.join(working_directory, output_path)
+            # Missing outputs should simply be omitted in ActionResult:
+            if not os.path.isfile(file_path):
+                continue
+
+            output_file, update_request = output_file_maker(file_path, working_directory)
+
+            action_result.output_files.extend([output_file])
+            update_requests.requests.extend([update_request])
 
         stub_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(context.channel)
-        stub_cas.BatchUpdateBlobs(request)
+        stub_cas.BatchUpdateBlobs(update_requests)
 
         result_any = any_pb2.Any()
-        result_any.Pack(result)
+        result_any.Pack(action_result)
 
         lease.result.CopyFrom(result_any)
 
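The TODO above is still open: the new worker code discards the child process output. A minimal sketch of how stdout and stderr could be captured and attached to the ActionResult, assuming byte-mode pipes (no universal_newlines) so the REAPI stdout_raw/stderr_raw bytes fields can be filled directly; this is a possible follow-up, not part of the commit:

    process = subprocess.Popen(command_arguments,
                               cwd=working_directory,
                               env=execution_envionment,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)
    stdout, stderr = process.communicate()

    action_result = remote_execution_pb2.ActionResult()
    action_result.exit_code = process.returncode
    # Small outputs may be inlined; larger ones belong in CAS via stdout_digest:
    action_result.stdout_raw = stdout
    action_result.stderr_raw = stderr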
buildgrid/server/action_cache.py (deleted):
@@ -1,126 +0,0 @@
-# Copyright (C) 2018 Bloomberg LP
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# <http://www.apache.org/licenses/LICENSE-2.0>
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-"""
-ActionCache
-===========
-
-Implements a simple in-memory action cache.
-
-The action cache maps Action to their corresponding ActionResult. An
-ActionResult may be found in cache, for any given Action, if that action has
-already been executed.
-
-Note:
-    Action and ActionResult are referenced by their Digest and mapping is stored
-    in-memory.
-"""
-
-import collections
-
-from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 as re_pb2
-
-
-class ActionCache:
-    """In-memory Action to ActionResult associative array.
-    """
-
-    def __init__(self, storage, max_cached_actions):
-        """Initialises a new ActionCache instance.
-
-        Args:
-            storage (StorageABC): storage backend instance to be used.
-            max_cached_actions (int): maximum number of entries to cache.
-        """
-        self._storage = storage
-        self._max_cached_actions = max_cached_actions
-        self._digest_map = collections.OrderedDict()
-
-    def get_action_result(self, action_digest):
-        """Retrieves the cached ActionResult for the given Action digest.
-
-        Args:
-            action_digest (Digest): digest of the Action to query.
-
-        Returns:
-            The cached ActionResult matching the given Action digest or None if
-            nothing has been cached yet for that Action.
-        """
-        key = (action_digest.hash, action_digest.size_bytes)
-        if key in self._digest_map:
-            action_result = self._storage.get_message(self._digest_map[key],
-                                                      re_pb2.ActionResult)
-            if action_result is not None:
-                if self._blobs_still_exist(action_result):
-                    self._digest_map.move_to_end(key)
-                    return action_result
-            del self._digest_map[key]
-        return None
-
-    def put_action_result(self, action_digest, action_result):
-        """Stores an ActionResult in cache for the given Action digest.
-
-        If the cache size limit has been reached, the oldest cache entries will
-        be dropped before insertion so that the cache size never exceeds the
-        maximum number of entries allowed.
-
-        Args:
-            action_digest (Digest): digest of the Action to select.
-            action_result (ActionResult): result object to store.
-        """
-        if self._max_cached_actions == 0:
-            return
-
-        while len(self._digest_map) >= self._max_cached_actions:
-            self._digest_map.popitem(last=False)
-
-        key = (action_digest.hash, action_digest.size_bytes)
-        action_result_digest = self._storage.put_message(action_result)
-        self._digest_map[key] = action_result_digest
-
-    def _blobs_still_exist(self, action_result):
-        """Checks CAS for ActionResult output blobs existence.
-
-        Args:
-            action_result (ActionResult): ActionResult to search referenced
-            output blobs for.
-
-        Returns:
-            True if all referenced blobs are present in CAS, False otherwise.
-        """
-        blobs_needed = []
-
-        for output_file in action_result.output_files:
-            blobs_needed.append(output_file.digest)
-
-        for output_directory in action_result.output_directories:
-            blobs_needed.append(output_directory.tree_digest)
-            tree = self._storage.get_message(output_directory.tree_digest,
-                                             re_pb2.Tree)
-            if tree is None:
-                return False
-            for file_node in tree.root.files:
-                blobs_needed.append(file_node.digest)
-            for child in tree.children:
-                for file_node in child.files:
-                    blobs_needed.append(file_node.digest)
-
-        if action_result.stdout_digest.hash and not action_result.stdout_raw:
-            blobs_needed.append(action_result.stdout_digest)
-        if action_result.stderr_digest.hash and not action_result.stderr_raw:
-            blobs_needed.append(action_result.stderr_digest)
-
-        missing = self._storage.missing_blobs(blobs_needed)
-        return len(missing) == 0
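For reference, the eviction logic the deleted ActionCache relied on is plain OrderedDict LRU bookkeeping. The same pattern in isolation, as a minimal self-contained sketch (the class name is hypothetical):

    import collections

    class LRUMap:
        # Minimal LRU bookkeeping, mirroring the removed ActionCache:
        # move_to_end() on every hit, popitem(last=False) to evict the oldest.

        def __init__(self, max_entries):
            self._max_entries = max_entries
            self._entries = collections.OrderedDict()

        def get(self, key):
            if key not in self._entries:
                return None
            self._entries.move_to_end(key)  # key is now most recently used
            return self._entries[key]

        def put(self, key, value):
            while len(self._entries) >= self._max_entries:
                self._entries.popitem(last=False)  # drop least recently used
            self._entries[key] = value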
buildgrid/server/scheduler.py:
@@ -90,7 +90,7 @@ class Scheduler:
         job.update_execute_stage(ExecuteStage.COMPLETED)
         self.jobs[name] = job
         if not job.do_not_cache and self._action_cache is not None:
-            self._action_cache.put_action_result(job.action_digest, result)
+            self._action_cache.update_action_result(job.action_digest, result)
 
     def get_operations(self):
         response = operations_pb2.ListOperationsResponse()
buildgrid/utils.py:
@@ -31,30 +31,59 @@ def gen_fetch_blob(stub, digest, instance_name=""):
         yield response.data
 
 
-def write_fetch_directory(directory, stub, digest, instance_name=""):
-    """ Given a directory digest, fetches files and writes them to a directory
+def write_fetch_directory(root_directory, stub, digest, instance_name=None):
+    """Locally replicates a directory from CAS.
+
+    Args:
+        root_directory (str): local directory to populate.
+        stub (): gRPC stub for CAS communication.
+        digest (Digest): digest for the directory to fetch from CAS.
+        instance_name (str, optional): farm instance name to query data from.
     """
-    # TODO: Extend to symlinks and inner directories
-    # pathlib.Path('/my/directory').mkdir(parents=True, exist_ok=True)
+    if not os.path.isabs(root_directory):
+        root_directory = os.path.abspath(root_directory)
+    if not os.path.exists(root_directory):
+        os.makedirs(root_directory, exist_ok=True)
 
-    directory_pb2 = remote_execution_pb2.Directory()
-    directory_pb2 = parse_to_pb2_from_fetch(directory_pb2, stub, digest, instance_name)
+    directory = parse_to_pb2_from_fetch(remote_execution_pb2.Directory(),
+                                        stub, digest, instance_name)
+
+    for directory_node in directory.directories:
+        child_path = os.path.join(root_directory, directory_node.name)
+
+        write_fetch_directory(child_path, stub, directory_node.digest, instance_name)
+
+    for file_node in directory.files:
+        child_path = os.path.join(root_directory, file_node.name)
+
+        with open(child_path, 'wb') as child_file:
+            write_fetch_blob(child_file, stub, file_node.digest, instance_name)
+
+    for symlink_node in directory.symlinks:
+        child_path = os.path.join(root_directory, symlink_node.name)
 
-    for file_node in directory_pb2.files:
-        path = os.path.join(directory, file_node.name)
-        with open(path, 'wb') as f:
-            write_fetch_blob(f, stub, file_node.digest, instance_name)
+        if os.path.isabs(symlink_node.target):
+            continue  # No out of temp-directory links for now.
+        target_path = os.path.join(root_directory, symlink_node.target)
 
+        os.symlink(child_path, target_path)
 
-def write_fetch_blob(out, stub, digest, instance_name=""):
-    """ Given an output buffer, fetches blob and writes to buffer
+
+def write_fetch_blob(target_file, stub, digest, instance_name=None):
+    """Extracts a blob from CAS into a local file.
+
+    Args:
+        target_file (str): local file to write.
+        stub (): gRPC stub for CAS communication.
+        digest (Digest): digest for the blob to fetch from CAS.
+        instance_name (str, optional): farm instance name to query data from.
     """
 
     for stream in gen_fetch_blob(stub, digest, instance_name):
-        out.write(stream)
+        target_file.write(stream)
+        target_file.flush()
 
-    out.flush()
-    assert digest.size_bytes == os.fstat(out.fileno()).st_size
+    assert digest.size_bytes == os.fstat(target_file.fileno()).st_size
 
 
 def parse_to_pb2_from_fetch(pb2, stub, digest, instance_name=""):
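A usage sketch for the reworked write_fetch_directory, assuming a reachable CAS endpoint and a valid input-root digest (the address, digest, and instance name below are all placeholders):

    import grpc
    import tempfile

    from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
    from buildgrid.utils import write_fetch_directory

    channel = grpc.insecure_channel('localhost:50051')  # placeholder address
    stub = bytestream_pb2_grpc.ByteStreamStub(channel)

    # Placeholder digest; a real one comes from an Action's input_root_digest:
    digest = remote_execution_pb2.Digest(hash='0' * 64, size_bytes=0)

    with tempfile.TemporaryDirectory() as root_directory:
        write_fetch_directory(root_directory, stub, digest, 'main')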
@@ -107,6 +136,49 @@ def file_maker(file_path, file_digest):
                                            is_executable=os.access(file_path, os.X_OK))
 
 
-def read_file(read):
-    with open(read, 'rb') as f:
-        return f.read()
+def read_file(file_path):
+    """Loads raw file content in memory.
+
+    Returns:
+        bytes: Raw file's content until EOF.
+
+    Raises:
+        OSError: If `file_path` does not exist or is not readable.
+    """
+    with open(file_path, 'rb') as byte_file:
+        return byte_file.read()
+
+
+def output_file_maker(file_path, input_path):
+    """Creates a gRPC :obj:`OutputFile` for a local file.
+
+    `file_path` **must** point inside or be relative to `input_path`.
+
+    Args:
+        file_path (str): absolute or relative path to a local file.
+        input_path (str): absolute or relative path to the input root directory.
+
+    Returns:
+        :obj:`OutputFile`, :obj:`BatchUpdateBlobsRequest`: Tuple of a new gRPC
+        :obj:`OutputFile` object for the file pointed by `file_path` and the
+        corresponding :obj:`BatchUpdateBlobsRequest` for CAS upload.
+    """
+    if not os.path.isabs(file_path):
+        file_path = os.path.abspath(file_path)
+    if not os.path.isabs(input_path):
+        input_path = os.path.abspath(input_path)
+
+    file_digest = remote_execution_pb2.Digest()
+
+    file_data = read_file(file_path)
+    file_digest.hash = HASH(file_data).hexdigest()
+    file_digest.size_bytes = len(file_data)
+
+    output_file = remote_execution_pb2.OutputFile(digest=file_digest)
+    output_file.path = os.path.relpath(file_path, start=input_path)
+    output_file.is_executable = os.access(file_path, os.X_OK)
+
+    update_request = remote_execution_pb2.BatchUpdateBlobsRequest.Request(digest=file_digest)
+    update_request.data = file_data
+
+    return output_file, update_request
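To close the loop, a sketch of how the new output_file_maker pairs with a CAS batch upload, mirroring the worker code in temp_directory.py above (file paths and instance name are placeholders):

    from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    from buildgrid.utils import output_file_maker

    update_requests = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name='main')

    # '/work/out/result.bin' stands in for a real output under input root '/work':
    output_file, update_request = output_file_maker('/work/out/result.bin', '/work')

    # output_file.path is now 'out/result.bin', relative to the input root, and
    # update_requests is ready for ContentAddressableStorageStub.BatchUpdateBlobs():
    update_requests.requests.extend([update_request])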