finnball pushed to branch finn/cas-commands at BuildGrid / buildgrid
Commits:

- b0066338 by finn at 2018-08-15T15:06:23Z
- d5c182ed by finn at 2018-08-15T15:06:26Z
- e32c8c8c by finn at 2018-08-15T15:06:26Z
- 31249375 by finn at 2018-08-15T15:06:26Z
13 changed files:
- README.rst
- + app/bots/__init__.py
- + app/bots/buildbox.py
- + app/bots/dummy.py
- + app/bots/temp_directory.py
- app/commands/cmd_bot.py
- + app/commands/cmd_cas.py
- app/commands/cmd_execute.py
- buildgrid/bot/bot_session.py
- buildgrid/server/job.py
- buildgrid/server/scheduler.py
- + buildgrid/utils.py
- tests/integration/operations_service.py
Changes:
README.rst:

@@ -38,7 +38,7 @@ In one terminal, start a server::
 
 In another terminal, send a request for work::
 
-    bgd execute request
+    bgd execute request-dummy
 
 The stage should show as `QUEUED` as it awaits a bot to pick up the work::
 
@@ -51,3 +51,35 @@ Create a bot session::
 Show the work as completed::
 
     bgd execute list
+
+Instructions for a Simple Build
+-------------------------------
+
+This example covers a simple build. The user uploads a directory containing a C file and a command to the CAS. The bot then fetches the uploaded directory and command, runs the command inside a temporary directory, and uploads the result to the CAS for the user to download. This is an early demo and still lacks a few features, such as symlink support and checking whether files already exist in the CAS before executing a command.
+
+Create a new directory called `test-buildgrid/` and place the following C file in it, called `hello.c`::
+
+    #include <stdio.h>
+    int main()
+    {
+        printf("Hello, World!");
+        return 0;
+    }
+
+Now start a BuildGrid server, passing it a directory it can write a CAS to::
+
+    bgd server start --cas disk --cas-cache disk --cas-disk-directory /path/to/empty/directory
+
+Start the following bot session::
+
+    bgd bot temp-directory
+
+Upload the directory containing the C file::
+
+    bgd cas upload-dir /path/to/test-buildgrid
+
+Now send an execution request to the bot with the name of the expected `output-file`, a boolean describing whether it is executable, the path to the uploaded directory (used to calculate the digest) and, finally, the command to run on the bot::
+
+    bgd execute command --output-file hello True /path/to/test-buildgrid -- gcc -Wall hello.c -o hello
+
+The resulting executable should appear in a new directory called `testing/`.
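For readers following along, a quick end-to-end sanity check (a sketch, assuming the demo's default `testing/` output directory and a successful compile) is to run the fetched binary::

    import subprocess

    # Hypothetical check: run the binary fetched into testing/ and confirm its output.
    result = subprocess.run(['./testing/hello'], stdout=subprocess.PIPE, check=True)
    assert result.stdout == b'Hello, World!'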
app/bots/buildbox.py (new file):

# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#  <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import os
import subprocess
import tempfile

import grpc
from google.protobuf import any_pb2

from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
from buildgrid.utils import read_file


def work_buildbox(context, lease):
    logger = context.logger

    action_any = lease.payload
    action = remote_execution_pb2.Action()
    action_any.Unpack(action)

    cert_server = read_file(context.server_cert)
    cert_client = read_file(context.client_cert)
    key_client = read_file(context.client_key)

    # Create client-side credentials for the secure gRPC channel.
    credentials = grpc.ssl_channel_credentials(root_certificates=cert_server,
                                               private_key=key_client,
                                               certificate_chain=cert_client)

    channel = grpc.secure_channel('{}:{}'.format(context.remote, context.port), credentials)

    stub = bytestream_pb2_grpc.ByteStreamStub(channel)

    remote_command = _buildstream_fetch_command(context.local_cas, stub, action.command_digest)
    environment = dict((x.name, x.value) for x in remote_command.environment_variables)
    logger.debug("command hash: {}".format(action.command_digest.hash))
    logger.debug("vdir hash: {}".format(action.input_root_digest.hash))
    logger.debug("\n{}".format(' '.join(remote_command.arguments)))

    command = ['buildbox',
               '--remote={}'.format('https://{}:{}'.format(context.remote, context.port)),
               '--server-cert={}'.format(context.server_cert),
               '--client-key={}'.format(context.client_key),
               '--client-cert={}'.format(context.client_cert),
               '--local={}'.format(context.local_cas),
               '--chdir={}'.format(environment['PWD']),
               context.fuse_dir]

    command.extend(remote_command.arguments)

    logger.debug(' '.join(command))
    logger.debug("Input root digest:\n{}".format(action.input_root_digest))
    logger.info("Launching process")

    # buildbox reads the serialized input root digest on stdin and writes the
    # serialized output root digest to stdout.
    proc = subprocess.Popen(command,
                            stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE)
    std_send = action.input_root_digest.SerializeToString()
    std_out, _ = proc.communicate(std_send)

    output_root_digest = remote_execution_pb2.Digest()
    output_root_digest.ParseFromString(std_out)
    logger.debug("Output root digest: {}".format(output_root_digest))

    output_file = remote_execution_pb2.OutputDirectory(tree_digest=output_root_digest)

    action_result = remote_execution_pb2.ActionResult()
    action_result.output_directories.extend([output_file])

    action_result_any = any_pb2.Any()
    action_result_any.Pack(action_result)

    lease.result.CopyFrom(action_result_any)

    return lease


def _buildstream_fetch_blob(remote, digest, out):
    resource_name = os.path.join(digest.hash, str(digest.size_bytes))
    request = bytestream_pb2.ReadRequest()
    request.resource_name = resource_name
    request.read_offset = 0
    for response in remote.Read(request):
        out.write(response.data)

    out.flush()
    assert digest.size_bytes == os.fstat(out.fileno()).st_size


def _buildstream_fetch_command(casdir, remote, digest):
    with tempfile.NamedTemporaryFile(dir=os.path.join(casdir, 'tmp')) as out:
        _buildstream_fetch_blob(remote, digest, out)
        remote_command = remote_execution_pb2.Command()
        with open(out.name, 'rb') as f:
            remote_command.ParseFromString(f.read())
        return remote_command


def _buildstream_fetch_action(casdir, remote, digest):
    with tempfile.NamedTemporaryFile(dir=os.path.join(casdir, 'tmp')) as out:
        _buildstream_fetch_blob(remote, digest, out)
        remote_action = remote_execution_pb2.Action()
        with open(out.name, 'rb') as f:
            remote_action.ParseFromString(f.read())
        return remote_action
app/bots/dummy.py (new file):

# Copyright (C) 2018 Bloomberg LP
# (Apache License 2.0 header, identical to app/bots/buildbox.py above)


import random
import time


def work_dummy(context, lease):
    """ Just returns the lease after a random delay.
    """
    time.sleep(random.randint(1, 5))
    return lease
app/bots/temp_directory.py (new file):

# Copyright (C) 2018 Bloomberg LP
# (Apache License 2.0 header, identical to app/bots/buildbox.py above)


import os
import subprocess
import tempfile

from google.protobuf import any_pb2

from buildgrid.utils import read_file, create_digest, write_fetch_directory, parse_to_pb2_from_fetch
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
from buildgrid._protos.google.bytestream import bytestream_pb2_grpc


def work_temp_directory(context, lease):
    """ The bot downloads directories and files into a temporary directory,
    runs the command there, then uploads the results back to the CAS.
    """

    instance_name = context.instance_name
    stub_bytestream = bytestream_pb2_grpc.ByteStreamStub(context.channel)

    # The lease payload carries the digest of the Action to run.
    action_digest = remote_execution_pb2.Digest()
    lease.payload.Unpack(action_digest)

    action = remote_execution_pb2.Action()
    action = parse_to_pb2_from_fetch(action, stub_bytestream, action_digest, instance_name)

    with tempfile.TemporaryDirectory() as temp_dir:

        command = remote_execution_pb2.Command()
        command = parse_to_pb2_from_fetch(command, stub_bytestream, action.command_digest, instance_name)

        arguments = "cd {} &&".format(temp_dir)

        for argument in command.arguments:
            arguments += " {}".format(argument)

        context.logger.info(arguments)

        write_fetch_directory(temp_dir, stub_bytestream, action.input_root_digest, instance_name)

        proc = subprocess.Popen(arguments,
                                shell=True,
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE)

        # TODO: Should return the stdout to the user
        proc.communicate()

        result = remote_execution_pb2.ActionResult()
        requests = []
        for output_file in command.output_files:
            path = os.path.join(temp_dir, output_file)
            chunk = read_file(path)

            digest = create_digest(chunk)

            result.output_files.extend([remote_execution_pb2.OutputFile(path=output_file,
                                                                        digest=digest)])

            requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
                digest=digest, data=chunk))

        request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=instance_name,
                                                               requests=requests)

        stub_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(context.channel)
        stub_cas.BatchUpdateBlobs(request)

        result_any = any_pb2.Any()
        result_any.Pack(result)

        lease.result.CopyFrom(result_any)

    return lease
app/commands/cmd_bot.py:

@@ -22,23 +22,17 @@ Bot command
 Create a bot interface and request work
 """
 
-import asyncio
 import logging
-import os
-import random
-import subprocess
-import tempfile
+
 from pathlib import Path, PurePath
 
 import click
 import grpc
-from google.protobuf import any_pb2
 
 from buildgrid.bot import bot, bot_interface
 from buildgrid.bot.bot_session import BotSession, Device, Worker
-from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
-from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
 
+from ..bots import buildbox, dummy, temp_directory
 from ..cli import pass_context
 
 
@@ -53,6 +47,7 @@ def cli(context, host, port, parent):
 
     context.logger = logging.getLogger(__name__)
     context.logger.info("Starting on port {}".format(port))
+    context.channel = channel
 
     worker = Worker()
     worker.add_device(Device())
@@ -63,16 +58,33 @@ def cli(context, host, port, parent):
     context.bot_session = bot_session
 
 
-@cli.command('dummy', short_help="Create a dummy bot session")
+@cli.command('dummy', short_help='Create a dummy bot session which just returns the lease')
 @pass_context
-def dummy(context):
+def run_dummy(context):
     """
     Simple dummy client. Creates a session, accepts leases, does fake work and
    updates the server.
     """
     try:
         b = bot.Bot(context.bot_session)
-        b.session(_work_dummy,
+        b.session(dummy.work_dummy,
+                  context)
+
+    except KeyboardInterrupt:
+        pass
+
+
+@cli.command('temp-directory', short_help='Runs commands in a temp directory and uploads results')
+@click.option('--instance-name', default='testing')
+@pass_context
+def run_temp_directory(context, instance_name):
+    """ Downloads files and command from the CAS, runs the command in a temp
+    directory and uploads the result back to the CAS.
+    """
+    context.instance_name = instance_name
+    try:
+        b = bot.Bot(context.bot_session)
+        b.session(temp_directory.work_temp_directory,
                   context)
 
     except KeyboardInterrupt:
@@ -88,7 +100,7 @@ def dummy(context):
 @click.option('--port', show_default=True, default=11001)
 @click.option('--remote', show_default=True, default='localhost')
 @pass_context
-def work_buildbox(context, remote, port, server_cert, client_key, client_cert, local_cas, fuse_dir):
+def run_buildbox(context, remote, port, server_cert, client_key, client_cert, local_cas, fuse_dir):
     """
     Uses BuildBox to run commands.
     """
@@ -104,104 +116,9 @@ def work_buildbox(context, remote, port, server_cert, client_key, client_cert, local_cas, fuse_dir):
     context.fuse_dir = fuse_dir
 
     try:
-        b = bot.Bot(bot_session=context.bot_session)
 
-        b.session(work=_work_buildbox,
+        b = bot.Bot(context.bot_session)
+        b.session(work=buildbox.work_buildbox,
                   context=context)
     except KeyboardInterrupt:
         pass
-
-
-async def _work_dummy(context, lease):
-    await asyncio.sleep(random.randint(1, 5))
-    return lease
-
-
-async def _work_buildbox(context, lease):
-    logger = context.logger
-
-    action_any = lease.payload
-    action = remote_execution_pb2.Action()
-    action_any.Unpack(action)
-
-    cert_server = _file_read(context.server_cert)
-    cert_client = _file_read(context.client_cert)
-    key_client = _file_read(context.client_key)
-
-    # create server credentials
-    credentials = grpc.ssl_channel_credentials(root_certificates=cert_server,
-                                               private_key=key_client,
-                                               certificate_chain=cert_client)
-
-    channel = grpc.secure_channel('{}:{}'.format(context.remote, context.port), credentials)
-
-    stub = bytestream_pb2_grpc.ByteStreamStub(channel)
-
-    remote_command = _fetch_command(context.local_cas, stub, action.command_digest)
-    environment = dict((x.name, x.value) for x in remote_command.environment_variables)
-    logger.debug("command hash: {}".format(action.command_digest.hash))
-    logger.debug("vdir hash: {}".format(action.input_root_digest.hash))
-    logger.debug("\n{}".format(' '.join(remote_command.arguments)))
-
-    command = ['buildbox',
-               '--remote={}'.format('https://{}:{}'.format(context.remote, context.port)),
-               '--server-cert={}'.format(context.server_cert),
-               '--client-key={}'.format(context.client_key),
-               '--client-cert={}'.format(context.client_cert),
-               '--local={}'.format(context.local_cas),
-               '--chdir={}'.format(environment['PWD']),
-               context.fuse_dir]
-
-    command.extend(remote_command.arguments)
-
-    logger.debug(' '.join(command))
-    logger.debug("Input root digest:\n{}".format(action.input_root_digest))
-    logger.info("Launching process")
-
-    proc = subprocess.Popen(command,
-                            stdin=subprocess.PIPE,
-                            stdout=subprocess.PIPE)
-    std_send = action.input_root_digest.SerializeToString()
-    std_out, _ = proc.communicate(std_send)
-
-    output_root_digest = remote_execution_pb2.Digest()
-    output_root_digest.ParseFromString(std_out)
-    logger.debug("Output root digest: {}".format(output_root_digest))
-
-    output_file = remote_execution_pb2.OutputDirectory(tree_digest=output_root_digest)
-
-    action_result = remote_execution_pb2.ActionResult()
-    action_result.output_directories.extend([output_file])
-
-    action_result_any = any_pb2.Any()
-    action_result_any.Pack(action_result)
-
-    lease.result.CopyFrom(action_result_any)
-
-    return lease
-
-
-def _fetch_blob(remote, digest, out):
-    resource_name = os.path.join(digest.hash, str(digest.size_bytes))
-    request = bytestream_pb2.ReadRequest()
-    request.resource_name = resource_name
-    request.read_offset = 0
-    for response in remote.Read(request):
-        out.write(response.data)
-
-    out.flush()
-    assert digest.size_bytes == os.fstat(out.fileno()).st_size
-
-
-def _fetch_command(casdir, remote, digest):
-    with tempfile.NamedTemporaryFile(dir=os.path.join(casdir, 'tmp')) as out:
-        _fetch_blob(remote, digest, out)
-        remote_command = remote_execution_pb2.Command()
-        with open(out.name, 'rb') as f:
-            remote_command.ParseFromString(f.read())
-        return remote_command
-
-
-def _file_read(file_path):
-    with open(file_path, 'rb') as f:
-        return f.read()
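With this refactor, each bot type lives in its own module under `app/bots/` and exposes a synchronous work callable of the shape `work(context, lease) -> lease`; `cmd_bot.py` only wires the callable into a session. A hypothetical sketch of registering another bot type (the `echo` name and module are illustrative, not part of this change)::

    # app/bots/echo.py (hypothetical)
    def work_echo(context, lease):
        """ Logs the lease and returns it unchanged. """
        context.logger.info("Echoing lease: {}".format(lease.id))
        return lease


    # Hypothetical registration in app/commands/cmd_bot.py:
    #
    # @cli.command('echo', short_help='Log leases and return them unchanged')
    # @pass_context
    # def run_echo(context):
    #     try:
    #         b = bot.Bot(context.bot_session)
    #         b.session(echo.work_echo, context)
    #     except KeyboardInterrupt:
    #         pass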
app/commands/cmd_cas.py (new file):

# Copyright (C) 2018 Bloomberg LP
# (Apache License 2.0 header, identical to app/bots/buildbox.py above)


"""
CAS command
===========

Upload files and directories to the Content Addressable Storage (CAS).
"""

import logging

import click
import grpc

from buildgrid.utils import merkle_maker, create_digest
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc

from ..cli import pass_context


@click.group(short_help='Interact with the CAS')
@click.option('--port', default='50051')
@click.option('--host', default='localhost')
@pass_context
def cli(context, host, port):
    context.logger = logging.getLogger(__name__)
    context.logger.info("Starting on port {}".format(port))

    context.channel = grpc.insecure_channel('{}:{}'.format(host, port))
    context.port = port


@cli.command('upload-files', short_help='Upload files')
@click.argument('files', nargs=-1, type=click.File('rb'))
@click.option('--instance-name', default='testing')
@pass_context
def upload_files(context, files, instance_name):
    stub = remote_execution_pb2_grpc.ContentAddressableStorageStub(context.channel)

    requests = []
    for file in files:
        chunk = file.read()
        requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
            digest=create_digest(chunk), data=chunk))

    request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=instance_name,
                                                           requests=requests)

    context.logger.info("Sending: {}".format(request))
    response = stub.BatchUpdateBlobs(request)
    context.logger.info("Response: {}".format(response))


@cli.command('upload-dir', short_help='Upload a directory')
@click.argument('directory')
@click.option('--instance-name', default='testing')
@pass_context
def upload_dir(context, directory, instance_name):
    context.logger.info("Uploading directory to CAS")
    stub = remote_execution_pb2_grpc.ContentAddressableStorageStub(context.channel)

    requests = []

    for chunk, file_digest in merkle_maker(directory):
        requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
            digest=file_digest, data=chunk))

    request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=instance_name,
                                                           requests=requests)

    context.logger.info("Request:\n{}".format(request))
    response = stub.BatchUpdateBlobs(request)
    context.logger.info("Response:\n{}".format(response))
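Both commands follow the same upload pattern: compute a digest for each blob, wrap blob and digest in a `BatchUpdateBlobsRequest.Request`, and send everything in a single `BatchUpdateBlobs` call. A minimal standalone sketch of that pattern (the address and instance name are just the demo defaults)::

    import grpc

    from buildgrid.utils import create_digest
    from buildgrid._protos.build.bazel.remote.execution.v2 import (
        remote_execution_pb2, remote_execution_pb2_grpc)

    channel = grpc.insecure_channel('localhost:50051')
    stub = remote_execution_pb2_grpc.ContentAddressableStorageStub(channel)

    blob = b'hello world'
    request = remote_execution_pb2.BatchUpdateBlobsRequest(
        instance_name='testing',
        requests=[remote_execution_pb2.BatchUpdateBlobsRequest.Request(
            digest=create_digest(blob), data=blob)])
    response = stub.BatchUpdateBlobs(request)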
app/commands/cmd_execute.py:

@@ -22,18 +22,22 @@ Execute command
 Request work to be executed and monitor status of jobs.
 """
 
+import errno
 import logging
-
+import stat
+import os
 import click
 import grpc
 
+from buildgrid.utils import merkle_maker, create_digest, write_fetch_blob
 from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
+from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
 from buildgrid._protos.google.longrunning import operations_pb2, operations_pb2_grpc
 
 from ..cli import pass_context
 
 
-@click.group(short_help="Simple execute client")
+@click.group(short_help='Simple execute client')
 @click.option('--port', default='50051')
 @click.option('--host', default='localhost')
 @pass_context
@@ -45,12 +49,12 @@ def cli(context, host, port):
     context.port = port
 
 
-@cli.command('request', short_help="Send a dummy action")
+@cli.command('request-dummy', short_help='Send a dummy action')
 @click.option('--number', default=1)
 @click.option('--instance-name', default='testing')
-@click.option('--wait-for-completion', is_flag=True)
+@click.option('--wait-for-completion', is_flag=True, help='Stream updates until jobs are completed')
 @pass_context
-def request(context, number, instance_name, wait_for_completion):
+def request_dummy(context, number, instance_name, wait_for_completion):
     action_digest = remote_execution_pb2.Digest()
 
     context.logger.info("Sending execution request...\n")
@@ -72,7 +76,7 @@ def request(context, number, instance_name, wait_for_completion):
         context.logger.info(next(response))
 
 
-@cli.command('status', short_help="Get the status of an operation")
+@cli.command('status', short_help='Get the status of an operation')
 @click.argument('operation-name')
 @pass_context
 def operation_status(context, operation_name):
@@ -85,7 +89,7 @@ def operation_status(context, operation_name):
     context.logger.info(response)
 
 
-@cli.command('list', short_help="List operations")
+@cli.command('list', short_help='List operations')
 @pass_context
 def list_operations(context):
     context.logger.info("Getting list of operations")
@@ -103,7 +107,7 @@ def list_operations(context):
     context.logger.info(op)
 
 
-@cli.command('wait', short_help="Streams an operation until it is complete")
+@cli.command('wait', short_help='Streams an operation until it is complete')
 @click.argument('operation-name')
 @pass_context
 def wait_execution(context, operation_name):
@@ -114,3 +118,86 @@ def wait_execution(context, operation_name):
 
     for stream in response:
         context.logger.info(stream)
+
+
+@cli.command('command', short_help='Send a command to be executed')
+@click.argument('input-root')
+@click.argument('commands', nargs=-1)
+@click.option('--output-file', nargs=2, type=(str, bool), multiple=True,
+              help='(expected output file, is_executable flag)')
+@click.option('--output-directory', default='testing', help='Output directory for output files')
+@click.option('--instance-name', default='testing')
+@pass_context
+def command(context, input_root, commands, output_file, output_directory, instance_name):
+    stub = remote_execution_pb2_grpc.ExecutionStub(context.channel)
+
+    execute_command = remote_execution_pb2.Command()
+
+    for arg in commands:
+        execute_command.arguments.extend([arg])
+
+    output_executeables = []
+    for file, is_executeable in output_file:
+        execute_command.output_files.extend([file])
+        if is_executeable:
+            output_executeables.append(file)
+
+    command_digest = create_digest(execute_command.SerializeToString())
+    context.logger.info(command_digest)
+
+    # TODO: Check for missing blobs
+    digest = None
+    for _, digest in merkle_maker(input_root):
+        pass
+
+    action = remote_execution_pb2.Action(command_digest=command_digest,
+                                         input_root_digest=digest,
+                                         do_not_cache=True)
+
+    action_digest = create_digest(action.SerializeToString())
+
+    context.logger.info("Sending execution request...\n")
+
+    requests = []
+    requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
+        digest=command_digest, data=execute_command.SerializeToString()))
+
+    requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
+        digest=action_digest, data=action.SerializeToString()))
+
+    request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=instance_name,
+                                                           requests=requests)
+    remote_execution_pb2_grpc.ContentAddressableStorageStub(context.channel).BatchUpdateBlobs(request)
+
+    request = remote_execution_pb2.ExecuteRequest(instance_name=instance_name,
+                                                  action_digest=action_digest,
+                                                  skip_cache_lookup=True)
+    response = stub.Execute(request)
+
+    stub = bytestream_pb2_grpc.ByteStreamStub(context.channel)
+
+    stream = None
+    for stream in response:
+        context.logger.info(stream)
+
+    execute_response = remote_execution_pb2.ExecuteResponse()
+    stream.response.Unpack(execute_response)
+
+    for output_file_response in execute_response.result.output_files:
+        path = os.path.join(output_directory, output_file_response.path)
+
+        if not os.path.exists(os.path.dirname(path)):
+            try:
+                os.makedirs(os.path.dirname(path))
+            except OSError as exc:
+                if exc.errno != errno.EEXIST:
+                    raise
+
+        with open(path, 'wb+') as f:
+            write_fetch_blob(f, stub, output_file_response.digest, instance_name)
+
+        if output_file_response.path in output_executeables:
+            st = os.stat(path)
+            os.chmod(path, st.st_mode | stat.S_IXUSR)
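The output-fetching step at the end of `command` uses the ByteStream read convention also implemented in `buildgrid/utils.py` below: a blob is addressed as `{instance}/blobs/{hash}/{size}`. A small sketch of fetching a single blob over an existing channel (a standalone variant of `write_fetch_blob`)::

    import os

    from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc


    def fetch_blob(channel, digest, out, instance_name='testing'):
        # Mirrors gen_fetch_blob()/write_fetch_blob() from buildgrid/utils.py:
        # read the blob chunk by chunk and write it to the given output buffer.
        stub = bytestream_pb2_grpc.ByteStreamStub(channel)
        resource_name = os.path.join(instance_name, 'blobs', digest.hash, str(digest.size_bytes))
        request = bytestream_pb2.ReadRequest(resource_name=resource_name, read_offset=0)
        for response in stub.Read(request):
            out.write(response.data)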
buildgrid/bot/bot_session.py:

@@ -104,14 +104,15 @@ class BotSession:
         self._update_lease_from_server(lease)
 
     def update_bot_session(self):
+        self.logger.debug("Updating bot session: {}".format(self._bot_id))
         session = self._interface.update_bot_session(self.get_pb2())
-        for lease in session.leases:
-            self._update_lease_from_server(lease)
-
-        for k, v in self._leases.items():
+        for k, v in list(self._leases.items()):
             if v.state == LeaseState.COMPLETED.value:
                 del self._leases[k]
 
+        for lease in session.leases:
+            self._update_lease_from_server(lease)
+
     def get_pb2(self):
         leases = list(self._leases.values())
         if not leases:
@@ -134,12 +135,16 @@ class BotSession:
         # TODO: Compare with previous state of lease
         if lease.state == LeaseState.PENDING.value:
             lease.state = LeaseState.ACTIVE.value
-            asyncio.ensure_future(self.create_work(lease))
             self._leases[lease.id] = lease
+            self.update_bot_session()
+            asyncio.ensure_future(self.create_work(lease))
 
     async def create_work(self, lease):
         self.logger.debug("Work created: {}".format(lease.id))
-        lease = await self._work(self._context, lease)
+
+        loop = asyncio.get_event_loop()
+        lease = await loop.run_in_executor(None, self._work, self._context, lease)
+
         self.logger.debug("Work complete: {}".format(lease.id))
         self.lease_completed(lease)
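The `create_work` change is what lets the bots in `app/bots/` be plain synchronous functions: the blocking work callable now runs on the default thread-pool executor, so the asyncio event loop stays responsive while work is in progress. A self-contained sketch of the pattern (the names are illustrative)::

    import asyncio
    import time


    def blocking_work(lease):
        # Stands in for a synchronous work function such as work_dummy().
        time.sleep(2)
        return lease


    async def create_work(lease):
        loop = asyncio.get_event_loop()
        # Other coroutines (e.g. session updates) keep running while the
        # blocking callable executes in the default executor.
        return await loop.run_in_executor(None, blocking_work, lease)


    asyncio.get_event_loop().run_until_complete(create_work('lease-1'))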
buildgrid/server/job.py:

@@ -21,26 +21,25 @@ from enum import Enum
 
 from google.protobuf import any_pb2
 
-from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ExecuteOperationMetadata
-from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ExecuteResponse
+from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
 from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
 from buildgrid._protos.google.longrunning import operations_pb2
 
 
 class ExecuteStage(Enum):
-    UNKNOWN = ExecuteOperationMetadata.Stage.Value('UNKNOWN')
+    UNKNOWN = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('UNKNOWN')
 
     # Checking the result against the cache.
-    CACHE_CHECK = ExecuteOperationMetadata.Stage.Value('CACHE_CHECK')
+    CACHE_CHECK = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('CACHE_CHECK')
 
     # Currently idle, awaiting a free machine to execute.
-    QUEUED = ExecuteOperationMetadata.Stage.Value('QUEUED')
+    QUEUED = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('QUEUED')
 
     # Currently being executed by a worker.
-    EXECUTING = ExecuteOperationMetadata.Stage.Value('EXECUTING')
+    EXECUTING = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('EXECUTING')
 
     # Finished execution.
-    COMPLETED = ExecuteOperationMetadata.Stage.Value('COMPLETED')
+    COMPLETED = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('COMPLETED')
 
 
 class BotStatus(Enum):
@@ -80,13 +79,13 @@ class Job:
     def __init__(self, action_digest, do_not_cache=False, message_queue=None):
         self.lease = None
         self.logger = logging.getLogger(__name__)
+        self.n_tries = 0
         self.result = None
         self.result_cached = False
 
         self._action_digest = action_digest
         self._do_not_cache = do_not_cache
         self._execute_stage = ExecuteStage.UNKNOWN
-        self._n_tries = 0
         self._name = str(uuid.uuid4())
         self._operation = operations_pb2.Operation(name=self._name)
         self._operation_update_queues = []
@@ -122,15 +121,16 @@ class Job:
         self._operation.metadata.CopyFrom(self._pack_any(self.get_operation_meta()))
         if self.result is not None:
             self._operation.done = True
-            response = ExecuteResponse()
-            self.result.Unpack(response.result)
-            response.cached_result = self.result_cached
+            action_result = remote_execution_pb2.ActionResult()
+            self.result.Unpack(action_result)
+            response = remote_execution_pb2.ExecuteResponse(result=action_result,
+                                                            cached_result=self.result_cached)
             self._operation.response.CopyFrom(self._pack_any(response))
 
         return self._operation
 
     def get_operation_meta(self):
-        meta = ExecuteOperationMetadata()
+        meta = remote_execution_pb2.ExecuteOperationMetadata()
         meta.stage = self._execute_stage.value
         meta.action_digest.CopyFrom(self._action_digest)
 
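The `get_operation` change unpacks the stored `protobuf.Any` into a standalone `ActionResult` and then builds the `ExecuteResponse` from it, rather than unpacking into a field of a fresh response. A short sketch of the pack/unpack round-trip involved, using only the protos this file already imports::

    from google.protobuf import any_pb2

    from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2

    # Pack an ActionResult into an Any, as the scheduler stores job results ...
    action_result = remote_execution_pb2.ActionResult()
    action_result.output_files.extend([remote_execution_pb2.OutputFile(path='hello')])
    result_any = any_pb2.Any()
    result_any.Pack(action_result)

    # ... then unpack it and wrap it in an ExecuteResponse, as get_operation() now does.
    unpacked = remote_execution_pb2.ActionResult()
    result_any.Unpack(unpacked)
    response = remote_execution_pb2.ExecuteResponse(result=unpacked, cached_result=False)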
buildgrid/server/scheduler.py:

@@ -25,7 +25,6 @@ from collections import deque
 
 from google.protobuf import any_pb2
 
-from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ActionResult
 from buildgrid._protos.google.longrunning import operations_pb2
 
 from .job import ExecuteStage, LeaseState
@@ -83,9 +82,7 @@ class Scheduler:
         job.update_execute_stage(ExecuteStage.COMPLETED)
         self.jobs[name] = job
         if not job.do_not_cache and self.action_cache is not None:
-            action_result = ActionResult()
-            result.Unpack(action_result)
-            self.action_cache.put_action_result(job.action_digest, action_result)
+            self.action_cache.put_action_result(job.action_digest, result)
 
     def get_operations(self):
         response = operations_pb2.ListOperationsResponse()
@@ -94,7 +91,7 @@ class Scheduler:
         return response
 
     def update_job_lease_state(self, name, state):
-        job = self.jobs.get(name)
+        job = self.jobs[name]
         job.lease.state = state
         self.jobs[name] = job
 
buildgrid/utils.py (new file):

# Copyright (C) 2018 Bloomberg LP
# (Apache License 2.0 header, identical to app/bots/buildbox.py above)

import os

from buildgrid.settings import HASH
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from buildgrid._protos.google.bytestream import bytestream_pb2


def gen_fetch_blob(stub, digest, instance_name=""):
    """ Generates a byte stream from a fetch blob request
    """

    resource_name = os.path.join(instance_name, 'blobs', digest.hash, str(digest.size_bytes))
    request = bytestream_pb2.ReadRequest(resource_name=resource_name,
                                         read_offset=0)
    for response in stub.Read(request):
        yield response.data


def write_fetch_directory(directory, stub, digest, instance_name=""):
    """ Given a directory digest, fetches the files and writes them to a directory
    """
    # TODO: Extend to symlinks and inner directories
    # pathlib.Path('/my/directory').mkdir(parents=True, exist_ok=True)

    directory_pb2 = remote_execution_pb2.Directory()
    directory_pb2 = parse_to_pb2_from_fetch(directory_pb2, stub, digest, instance_name)

    for file_node in directory_pb2.files:
        path = os.path.join(directory, file_node.name)
        with open(path, 'wb') as f:
            write_fetch_blob(f, stub, file_node.digest, instance_name)


def write_fetch_blob(out, stub, digest, instance_name=""):
    """ Given an output buffer, fetches the blob and writes it to the buffer
    """

    for stream in gen_fetch_blob(stub, digest, instance_name):
        out.write(stream)

    out.flush()
    assert digest.size_bytes == os.fstat(out.fileno()).st_size


def parse_to_pb2_from_fetch(pb2, stub, digest, instance_name=""):
    """ Fetches a stream and parses it into the given pb2 message
    """

    stream_bytes = b''
    for stream in gen_fetch_blob(stub, digest, instance_name):
        stream_bytes += stream

    pb2.ParseFromString(stream_bytes)
    return pb2


def create_digest(bytes_to_digest):
    """ Computes the hash of the given bytes and returns a Digest carrying
    the hex digest and the byte count
    """
    return remote_execution_pb2.Digest(hash=HASH(bytes_to_digest).hexdigest(),
                                       size_bytes=len(bytes_to_digest))


def merkle_maker(directory):
    """ Walks through the given directory, yielding each blob and its digest
    """
    directory_pb2 = remote_execution_pb2.Directory()
    for (dir_path, dir_names, file_names) in os.walk(directory):

        for file_name in file_names:
            file_path = os.path.join(dir_path, file_name)
            chunk = read_file(file_path)
            file_digest = create_digest(chunk)
            directory_pb2.files.extend([file_maker(file_path, file_digest)])
            yield chunk, file_digest

        for inner_dir in dir_names:
            inner_dir_path = os.path.join(dir_path, inner_dir)
            yield from merkle_maker(inner_dir_path)

    directory_string = directory_pb2.SerializeToString()

    yield directory_string, create_digest(directory_string)


def file_maker(file_path, file_digest):
    """ Creates a FileNode
    """
    _, file_name = os.path.split(file_path)
    return remote_execution_pb2.FileNode(name=file_name,
                                         digest=file_digest,
                                         is_executable=os.access(file_path, os.X_OK))


def read_file(read):
    with open(read, 'rb') as f:
        return f.read()
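For reference, a `Digest` here is just the hex digest of the blob's hash plus its size in bytes (`HASH` comes from `buildgrid.settings`; SHA-256 is assumed in this sketch). Typical use of `create_digest` and `merkle_maker`, following the pattern in `cmd_execute.py` above::

    from buildgrid.utils import create_digest, merkle_maker

    digest = create_digest(b'Hello, World!')
    print(digest.hash, digest.size_bytes)  # hex digest and byte count

    # merkle_maker() yields a (blob, digest) pair for each file under the
    # directory, ending with the serialized Directory message itself; that
    # final digest is what gets used as an Action's input_root_digest.
    last_digest = None
    for blob, last_digest in merkle_maker('/path/to/test-buildgrid'):
        pass
    print(last_digest)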
tests/integration/operations_service.py:

@@ -100,12 +100,14 @@ def test_list_operations_with_result(instance, execute_request, context):
     action_result = remote_execution_pb2.ActionResult()
     output_file = remote_execution_pb2.OutputFile(path='unicorn')
     action_result.output_files.extend([output_file])
-    instance._instance._scheduler.jobs[response_execute.name].result = _pack_any(action_result)
+
+    instance._instance._scheduler.job_complete(response_execute.name, _pack_any(action_result))
 
     request = operations_pb2.ListOperationsRequest()
     response = instance.ListOperations(request, context)
 
     assert response.operations[0].name == response_execute.name
+
     execute_response = remote_execution_pb2.ExecuteResponse()
     response.operations[0].response.Unpack(execute_response)
     assert execute_response.result == action_result