finnball pushed to branch finn/cas-commands at BuildGrid / buildgrid
Commits:
-
6f55ca7f
by finn at 2018-08-14T09:49:28Z
14 changed files:
- .gitlab-ci.yml
- README.rst
- + app/bots/__init__.py
- + app/bots/buildbox.py
- + app/bots/dummy.py
- + app/bots/temp_directory.py
- app/commands/cmd_bot.py
- + app/commands/cmd_cas.py
- app/commands/cmd_execute.py
- buildgrid/bot/bot_session.py
- buildgrid/server/job.py
- buildgrid/server/scheduler.py
- + buildgrid/utils.py
- tests/integration/operations_service.py
Changes:
... | ... | @@ -31,7 +31,7 @@ before_script: |
31 | 31 |
- ${BGD} server start &
|
32 | 32 |
- sleep 1 # Allow server to boot
|
33 | 33 |
- ${BGD} bot --host=0.0.0.0 dummy &
|
34 |
- - ${BGD} execute --host=0.0.0.0 request --wait-for-completion
|
|
34 |
+ - ${BGD} execute --host=0.0.0.0 request-dummy --wait-for-completion
|
|
35 | 35 |
|
36 | 36 |
tests-debian-stretch:
|
37 | 37 |
<<: *linux-tests
|
... | ... | @@ -51,3 +51,35 @@ Create a bot session:: |
51 | 51 |
Show the work as completed::
|
52 | 52 |
|
53 | 53 |
bgd execute list
|
54 |
+ |
|
55 |
+Instructions for a Simple Build
|
|
56 |
+-------------------------------
|
|
57 |
+ |
|
58 |
+This example covers a simple build. The user will upload a directory containing a C file and a command to the CAS. The bot will then create a temporary directory, fetch the directory and run the command defined by the user. This is an early demo and still lacks a few features such as symlink support and checking to see if files exist in the CAS before executing a command.
|
|
59 |
+ |
|
60 |
+Create a new directory called `test-buildgrid/` and place the following C file, named `hello.c`, in it::
|
|
61 |
+ |
|
62 |
+ #include <stdio.h>
|
|
63 |
+ int main()
|
|
64 |
+ {
|
|
65 |
+ printf("Hello, World!");
|
|
66 |
+ return 0;
|
|
67 |
+ }
|
|
68 |
+ |
|
69 |
+Now start a BuildGrid server, passing it a directory it can write a CAS to::
|
|
70 |
+ |
|
71 |
+ bgd server start --cas disk --cas-cache disk --cas-disk-directory /path/to/empty/directory
|
|
72 |
+ |
|
73 |
+Start the following bot session::
|
|
74 |
+ |
|
75 |
+ bgd bot temp-directory
|
|
76 |
+ |
|
77 |
+Upload the directory containing the C file::
|
|
78 |
+ |
|
79 |
+ bgd cas upload-dir /path/to/test-buildgrid
|
|
80 |
+ |
|
81 |
+Now we send an execution request to the bot with the name of the expected `output-file`, a boolean describing whether it is executable, the path to the directory we uploaded (used to calculate its digest) and finally the command to run on the bot::
|
|
82 |
+ |
|
83 |
+ bgd execute command --output-file hello True /path/to/test-buildgrid -- gcc -Wall hello.c -o hello
|
|
84 |
+ |
|
85 |
+The resulting executable should have been returned to a new directory called `testing/`.
|
1 |
+# Copyright (C) 2018 Bloomberg LP
|
|
2 |
+#
|
|
3 |
+# Licensed under the Apache License, Version 2.0 (the "License");
|
|
4 |
+# you may not use this file except in compliance with the License.
|
|
5 |
+# You may obtain a copy of the License at
|
|
6 |
+#
|
|
7 |
+# <http://www.apache.org/licenses/LICENSE-2.0>
|
|
8 |
+#
|
|
9 |
+# Unless required by applicable law or agreed to in writing, software
|
|
10 |
+# distributed under the License is distributed on an "AS IS" BASIS,
|
|
11 |
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
12 |
+# See the License for the specific language governing permissions and
|
|
13 |
+# limitations under the License.
|
|
14 |
+ |
|
15 |
+ |
|
16 |
+import asyncio
|
|
17 |
+import grpc
|
|
18 |
+import os
|
|
19 |
+import subprocess
|
|
20 |
+ |
|
21 |
+from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
|
22 |
+from buildgrid._protos.google.bytestream import bytestream_pb2
|
|
23 |
+from google.protobuf import any_pb2
|
|
24 |
+ |
|
25 |
+def work_buildbox(context, lease):
|
|
26 |
+ logger = context.logger
|
|
27 |
+ |
|
28 |
+ action_any = lease.payload
|
|
29 |
+ action = remote_execution_pb2.Action()
|
|
30 |
+ action_any.Unpack(action)
|
|
31 |
+ |
|
32 |
+ cert_server = read_file(context.server_cert)
|
|
33 |
+ cert_client = read_file(context.client_cert)
|
|
34 |
+ key_client = read_file(context.client_key)
|
|
35 |
+ |
|
36 |
+ # create server credentials
|
|
37 |
+ credentials = grpc.ssl_channel_credentials(root_certificates=cert_server,
|
|
38 |
+ private_key=key_client,
|
|
39 |
+ certificate_chain=cert_client)
|
|
40 |
+ |
|
41 |
+ channel = grpc.secure_channel('{}:{}'.format(context.remote, context.port), credentials)
|
|
42 |
+ |
|
43 |
+ stub = bytestream_pb2_grpc.ByteStreamStub(channel)
|
|
44 |
+ |
|
45 |
+ remote_command = _buildstream_fetch_command(context.local_cas, stub, action.command_digest)
|
|
46 |
+ environment = dict((x.name, x.value) for x in remote_command.environment_variables)
|
|
47 |
+ logger.debug("command hash: {}".format(action.command_digest.hash))
|
|
48 |
+ logger.debug("vdir hash: {}".format(action.input_root_digest.hash))
|
|
49 |
+ logger.debug("\n{}".format(' '.join(remote_command.arguments)))
|
|
50 |
+ |
|
51 |
+ command = ['buildbox',
|
|
52 |
+ '--remote={}'.format('https://{}:{}'.format(context.remote, context.port)),
|
|
53 |
+ '--server-cert={}'.format(context.server_cert),
|
|
54 |
+ '--client-key={}'.format(context.client_key),
|
|
55 |
+ '--client-cert={}'.format(context.client_cert),
|
|
56 |
+ '--local={}'.format(context.local_cas),
|
|
57 |
+ '--chdir={}'.format(environment['PWD']),
|
|
58 |
+ context.fuse_dir]
|
|
59 |
+ |
|
60 |
+ command.extend(remote_command.arguments)
|
|
61 |
+ |
|
62 |
+ logger.debug(' '.join(command))
|
|
63 |
+ logger.debug("Input root digest:\n{}".format(action.input_root_digest))
|
|
64 |
+ logger.info("Launching process")
|
|
65 |
+ |
|
66 |
+ proc = subprocess.Popen(command,
|
|
67 |
+ stdin=subprocess.PIPE,
|
|
68 |
+ stdout=subprocess.PIPE)
|
|
69 |
+ std_send = action.input_root_digest.SerializeToString()
|
|
70 |
+ std_out, std_error = proc.communicate(std_send)
|
|
71 |
+ |
|
72 |
+ output_root_digest = remote_execution_pb2.Digest()
|
|
73 |
+ output_root_digest.ParseFromString(std_out)
|
|
74 |
+ logger.debug("Output root digest: {}".format(output_root_digest))
|
|
75 |
+ |
|
76 |
+ output_file = remote_execution_pb2.OutputDirectory(tree_digest = output_root_digest)
|
|
77 |
+ |
|
78 |
+ action_result = remote_execution_pb2.ActionResult()
|
|
79 |
+ action_result.output_directories.extend([output_file])
|
|
80 |
+ |
|
81 |
+ action_result_any = any_pb2.Any()
|
|
82 |
+ action_result_any.Pack(action_result)
|
|
83 |
+ |
|
84 |
+ lease.result.CopyFrom(action_result_any)
|
|
85 |
+ |
|
86 |
+ return lease
|
|
87 |
+ |
|
88 |
+## ########################################################################## ##
|
|
89 |
+## Below is for BuildStream's current incorrect implementation for ByteStream ##
|
|
90 |
+## ########################################################################## ##
|
|
91 |
+ |
|
92 |
+def _buildstream_fetch_blob(remote, digest, out):
|
|
93 |
+ resource_name = os.path.join(digest.hash, str(digest.size_bytes))
|
|
94 |
+ request = bytestream_pb2.ReadRequest()
|
|
95 |
+ request.resource_name = resource_name
|
|
96 |
+ request.read_offset = 0
|
|
97 |
+ for response in remote.Read(request):
|
|
98 |
+ out.write(response.data)
|
|
99 |
+ |
|
100 |
+ out.flush()
|
|
101 |
+ assert digest.size_bytes == os.fstat(out.fileno()).st_size
|
|
102 |
+ |
|
103 |
+def _buildstream_fetch_command(casdir, remote, digest):
|
|
104 |
+ with tempfile.NamedTemporaryFile(dir=os.path.join(casdir, 'tmp')) as out:
|
|
105 |
+ _buildstream_fetch_blob(remote, digest, out)
|
|
106 |
+ remote_command = remote_execution_pb2.Command()
|
|
107 |
+ with open(out.name, 'rb') as f:
|
|
108 |
+ remote_command.ParseFromString(f.read())
|
|
109 |
+ return remote_command
|
|
110 |
+ |
|
111 |
+def _buildstream_fetch_action(casdir, remote, digest):
|
|
112 |
+ with tempfile.NamedTemporaryFile(dir=os.path.join(casdir, 'tmp')) as out:
|
|
113 |
+ _buildstream_fetch_blob(remote, digest, out)
|
|
114 |
+ remote_action = remote_execution_pb2.Action()
|
|
115 |
+ with open(out.name, 'rb') as f:
|
|
116 |
+ remote_action.ParseFromString(f.read())
|
|
117 |
+ return remote_action
|
1 |
+# Copyright (C) 2018 Bloomberg LP
|
|
2 |
+#
|
|
3 |
+# Licensed under the Apache License, Version 2.0 (the "License");
|
|
4 |
+# you may not use this file except in compliance with the License.
|
|
5 |
+# You may obtain a copy of the License at
|
|
6 |
+#
|
|
7 |
+# <http://www.apache.org/licenses/LICENSE-2.0>
|
|
8 |
+#
|
|
9 |
+# Unless required by applicable law or agreed to in writing, software
|
|
10 |
+# distributed under the License is distributed on an "AS IS" BASIS,
|
|
11 |
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
12 |
+# See the License for the specific language governing permissions and
|
|
13 |
+# limitations under the License.
|
|
14 |
+ |
|
15 |
+ |
|
16 |
+import random
|
|
17 |
+import time
|
|
18 |
+ |
|
19 |
+def work_dummy(context, lease):
|
|
20 |
+ """ Just returns lease after some random time
|
|
21 |
+ """
|
|
22 |
+ time.sleep(random.randint(1,5))
|
|
23 |
+ return lease
|
1 |
+# Copyright (C) 2018 Bloomberg LP
|
|
2 |
+#
|
|
3 |
+# Licensed under the Apache License, Version 2.0 (the "License");
|
|
4 |
+# you may not use this file except in compliance with the License.
|
|
5 |
+# You may obtain a copy of the License at
|
|
6 |
+#
|
|
7 |
+# <http://www.apache.org/licenses/LICENSE-2.0>
|
|
8 |
+#
|
|
9 |
+# Unless required by applicable law or agreed to in writing, software
|
|
10 |
+# distributed under the License is distributed on an "AS IS" BASIS,
|
|
11 |
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
12 |
+# See the License for the specific language governing permissions and
|
|
13 |
+# limitations under the License.
|
|
14 |
+ |
|
15 |
+ |
|
16 |
+import asyncio
|
|
17 |
+import os
|
|
18 |
+import subprocess
|
|
19 |
+import tempfile
|
|
20 |
+ |
|
21 |
+from buildgrid.utils import read_file, create_digest, write_fetch_blob, write_fetch_directory, parse_to_pb2_from_fetch
|
|
22 |
+ |
|
23 |
+from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
|
|
24 |
+from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
|
|
25 |
+from google.protobuf import any_pb2
|
|
26 |
+ |
|
27 |
+def work_temp_directory(context, lease):
|
|
28 |
+ """ Bot downloads directories and files into a temp directory,
|
|
29 |
+ then uploads results back to CAS
|
|
30 |
+ """
|
|
31 |
+ |
|
32 |
+ instance_name = context.instance_name
|
|
33 |
+ stub_bytestream = bytestream_pb2_grpc.ByteStreamStub(context.channel)
|
|
34 |
+ |
|
35 |
+ action_digest = remote_execution_pb2.Digest()
|
|
36 |
+ lease.payload.Unpack(action_digest)
|
|
37 |
+ |
|
38 |
+ action = remote_execution_pb2.Action()
|
|
39 |
+ |
|
40 |
+ action = parse_to_pb2_from_fetch(action, stub_bytestream, action_digest, instance_name)
|
|
41 |
+ |
|
42 |
+ with tempfile.TemporaryDirectory() as temp_dir:
|
|
43 |
+ |
|
44 |
+ command = remote_execution_pb2.Command()
|
|
45 |
+ command = parse_to_pb2_from_fetch(command, stub_bytestream, action.command_digest, instance_name)
|
|
46 |
+ |
|
47 |
+ arguments = "cd {} &&".format(temp_dir)
|
|
48 |
+ |
|
49 |
+ for argument in command.arguments:
|
|
50 |
+ arguments += " {}".format(argument)
|
|
51 |
+ |
|
52 |
+ context.logger.info(arguments)
|
|
53 |
+ |
|
54 |
+ write_fetch_directory(temp_dir, stub_bytestream, action.input_root_digest, instance_name)
|
|
55 |
+ |
|
56 |
+ proc = subprocess.Popen(arguments,
|
|
57 |
+ shell=True,
|
|
58 |
+ stdin=subprocess.PIPE,
|
|
59 |
+ stdout=subprocess.PIPE)
|
|
60 |
+ std_out, std_error = proc.communicate()
|
|
61 |
+ |
|
62 |
+ result = remote_execution_pb2.ActionResult()
|
|
63 |
+ requests = []
|
|
64 |
+ for output_file in command.output_files:
|
|
65 |
+ path = os.path.join(temp_dir, output_file)
|
|
66 |
+ |
|
67 |
+ chunk = read_file(path)
|
|
68 |
+ digest = create_digest(chunk)
|
|
69 |
+ |
|
70 |
+ result.output_files.extend([remote_execution_pb2.OutputFile(path=output_file,
|
|
71 |
+ digest=digest)])
|
|
72 |
+ |
|
73 |
+ requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
|
|
74 |
+ digest=digest, data=chunk))
|
|
75 |
+ |
|
76 |
+ request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=instance_name,
|
|
77 |
+ requests=requests)
|
|
78 |
+ |
|
79 |
+ stub_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(context.channel)
|
|
80 |
+ stub_cas.BatchUpdateBlobs(request)
|
|
81 |
+ |
|
82 |
+ result_any = any_pb2.Any()
|
|
83 |
+ result_any.Pack(result)
|
|
84 |
+ |
|
85 |
+ lease.result.CopyFrom(result_any)
|
|
86 |
+ |
|
87 |
+ return lease
|
... | ... | @@ -26,22 +26,17 @@ import asyncio |
26 | 26 |
import click
|
27 | 27 |
import grpc
|
28 | 28 |
import logging
|
29 |
-import os
|
|
30 |
-import random
|
|
31 |
-import subprocess
|
|
32 |
-import tempfile
|
|
33 | 29 |
|
34 | 30 |
from pathlib import Path, PurePath
|
35 | 31 |
|
36 | 32 |
from buildgrid.bot import bot, bot_interface
|
37 | 33 |
from buildgrid.bot.bot_session import BotSession, Device, Worker
|
38 |
-from buildgrid._exceptions import BotError
|
|
39 | 34 |
|
35 |
+from ..bots import buildbox, dummy, temp_directory
|
|
40 | 36 |
from ..cli import pass_context
|
41 | 37 |
|
42 |
-from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
|
|
43 | 38 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
|
44 |
-from google.protobuf import any_pb2
|
|
39 |
+ |
|
45 | 40 |
|
46 | 41 |
@click.group(short_help = 'Create a bot client')
|
47 | 42 |
@click.option('--parent', default='bgd_test')
|
... | ... | @@ -54,6 +49,7 @@ def cli(context, host, port, parent): |
54 | 49 |
|
55 | 50 |
context.logger = logging.getLogger(__name__)
|
56 | 51 |
context.logger.info("Starting on port {}".format(port))
|
52 |
+ context.channel = channel
|
|
57 | 53 |
|
58 | 54 |
worker = Worker()
|
59 | 55 |
worker.add_device(Device())
|
... | ... | @@ -63,22 +59,41 @@ def cli(context, host, port, parent): |
63 | 59 |
|
64 | 60 |
context.bot_session = bot_session
|
65 | 61 |
|
66 |
-@cli.command('dummy', short_help='Create a dummy bot session')
|
|
62 |
+ |
|
63 |
+@cli.command('dummy', short_help='Create a dummy bot session which just returns lease')
|
|
67 | 64 |
@pass_context
|
68 |
-def dummy(context):
|
|
65 |
+def run_dummy(context):
|
|
69 | 66 |
"""
|
70 | 67 |
Simple dummy client. Creates a session, accepts leases, does fake work and
|
71 | 68 |
updates the server.
|
72 | 69 |
"""
|
73 | 70 |
try:
|
74 | 71 |
b = bot.Bot(context.bot_session)
|
75 |
- b.session(_work_dummy,
|
|
72 |
+ b.session(dummy.work_dummy,
|
|
73 |
+ context)
|
|
74 |
+ |
|
75 |
+ except KeyboardInterrupt:
|
|
76 |
+ pass
|
|
77 |
+ |
|
78 |
+ |
|
79 |
+@cli.command('temp-directory', short_help='Runs commands in temp directory and uploads results')
|
|
80 |
+@click.option('--instance-name', default='testing')
|
|
81 |
+@pass_context
|
|
82 |
+def run_temp_directory(context, instance_name):
|
|
83 |
+ """ Downloads files and command from CAS and runs
|
|
84 |
+ in a temp directory, uploading result back to CAS
|
|
85 |
+ """
|
|
86 |
+ context.instance_name = instance_name
|
|
87 |
+ try:
|
|
88 |
+ b = bot.Bot(context.bot_session)
|
|
89 |
+ b.session(temp_directory.work_temp_directory,
|
|
76 | 90 |
context)
|
77 | 91 |
|
78 | 92 |
except KeyboardInterrupt:
|
79 | 93 |
pass
|
80 | 94 |
|
81 |
-@cli.command('buildbox', short_help='Create a bot session with busybox')
|
|
95 |
+ |
|
96 |
+@cli.command('buildbox', short_help='Create a bot session with buildbox')
|
|
82 | 97 |
@click.option('--fuse-dir', show_default = True, default=str(PurePath(Path.home(), 'fuse')))
|
83 | 98 |
@click.option('--local-cas', show_default = True, default=str(PurePath(Path.home(), 'cas')))
|
84 | 99 |
@click.option('--client-cert', show_default = True, default=str(PurePath(Path.home(), 'client.crt')))
|
... | ... | @@ -87,7 +102,7 @@ def dummy(context): |
87 | 102 |
@click.option('--port', show_default = True, default=11001)
|
88 | 103 |
@click.option('--remote', show_default = True, default='localhost')
|
89 | 104 |
@pass_context
|
90 |
-def work_buildbox(context, remote, port, server_cert, client_key, client_cert, local_cas, fuse_dir):
|
|
105 |
+def run_buildbox(context, remote, port, server_cert, client_key, client_cert, local_cas, fuse_dir):
|
|
91 | 106 |
"""
|
92 | 107 |
Uses BuildBox to run commands.
|
93 | 108 |
"""
|
... | ... | @@ -103,104 +118,10 @@ def work_buildbox(context, remote, port, server_cert, client_key, client_cert, l |
103 | 118 |
context.fuse_dir = fuse_dir
|
104 | 119 |
|
105 | 120 |
try:
|
106 |
- b = bot.Bot(work=_work_buildbox,
|
|
107 |
- bot_session=context.bot_session,
|
|
108 |
- channel=context.channel,
|
|
109 |
- parent=context.parent)
|
|
110 | 121 |
|
111 |
- b.session(context.parent,
|
|
112 |
- _work_buildbox,
|
|
122 |
+ b = bot.Bot(context.bot_session)
|
|
123 |
+ b.session(buildbox.work_buildbox,
|
|
113 | 124 |
context)
|
114 | 125 |
|
115 | 126 |
except KeyboardInterrupt:
|
116 | 127 |
pass
|
117 |
- |
|
118 |
-async def _work_dummy(context, lease):
|
|
119 |
- await asyncio.sleep(random.randint(1,5))
|
|
120 |
- return lease
|
|
121 |
- |
|
122 |
-async def _work_buildbox(context, lease):
|
|
123 |
- logger = context.logger
|
|
124 |
- |
|
125 |
- action_any = lease.payload
|
|
126 |
- action = remote_execution_pb2.Action()
|
|
127 |
- action_any.Unpack(action)
|
|
128 |
- |
|
129 |
- cert_server = _file_read(context.server_cert)
|
|
130 |
- cert_client = _file_read(context.client_cert)
|
|
131 |
- key_client = _file_read(context.client_key)
|
|
132 |
- |
|
133 |
- # create server credentials
|
|
134 |
- credentials = grpc.ssl_channel_credentials(root_certificates=cert_server,
|
|
135 |
- private_key=key_client,
|
|
136 |
- certificate_chain=cert_client)
|
|
137 |
- |
|
138 |
- channel = grpc.secure_channel('{}:{}'.format(context.remote, context.port), credentials)
|
|
139 |
- |
|
140 |
- stub = bytestream_pb2_grpc.ByteStreamStub(channel)
|
|
141 |
- |
|
142 |
- remote_command = _fetch_command(context.local_cas, stub, action.command_digest)
|
|
143 |
- environment = dict((x.name, x.value) for x in remote_command.environment_variables)
|
|
144 |
- logger.debug("command hash: {}".format(action.command_digest.hash))
|
|
145 |
- logger.debug("vdir hash: {}".format(action.input_root_digest.hash))
|
|
146 |
- logger.debug("\n{}".format(' '.join(remote_command.arguments)))
|
|
147 |
- |
|
148 |
- command = ['buildbox',
|
|
149 |
- '--remote={}'.format('https://{}:{}'.format(context.remote, context.port)),
|
|
150 |
- '--server-cert={}'.format(context.server_cert),
|
|
151 |
- '--client-key={}'.format(context.client_key),
|
|
152 |
- '--client-cert={}'.format(context.client_cert),
|
|
153 |
- '--local={}'.format(context.local_cas),
|
|
154 |
- '--chdir={}'.format(environment['PWD']),
|
|
155 |
- context.fuse_dir]
|
|
156 |
- |
|
157 |
- command.extend(remote_command.arguments)
|
|
158 |
- |
|
159 |
- logger.debug(' '.join(command))
|
|
160 |
- logger.debug("Input root digest:\n{}".format(action.input_root_digest))
|
|
161 |
- logger.info("Launching process")
|
|
162 |
- |
|
163 |
- proc = subprocess.Popen(command,
|
|
164 |
- stdin=subprocess.PIPE,
|
|
165 |
- stdout=subprocess.PIPE)
|
|
166 |
- std_send = action.input_root_digest.SerializeToString()
|
|
167 |
- std_out, std_error = proc.communicate(std_send)
|
|
168 |
- |
|
169 |
- output_root_digest = remote_execution_pb2.Digest()
|
|
170 |
- output_root_digest.ParseFromString(std_out)
|
|
171 |
- logger.debug("Output root digest: {}".format(output_root_digest))
|
|
172 |
- |
|
173 |
- output_file = remote_execution_pb2.OutputDirectory(tree_digest = output_root_digest)
|
|
174 |
- |
|
175 |
- action_result = remote_execution_pb2.ActionResult()
|
|
176 |
- action_result.output_directories.extend([output_file])
|
|
177 |
- |
|
178 |
- action_result_any = any_pb2.Any()
|
|
179 |
- action_result_any.Pack(action_result)
|
|
180 |
- |
|
181 |
- lease.result.CopyFrom(action_result_any)
|
|
182 |
- |
|
183 |
- return lease
|
|
184 |
- |
|
185 |
-def _fetch_blob(remote, digest, out):
|
|
186 |
- resource_name = os.path.join(digest.hash, str(digest.size_bytes))
|
|
187 |
- request = bytestream_pb2.ReadRequest()
|
|
188 |
- request.resource_name = resource_name
|
|
189 |
- request.read_offset = 0
|
|
190 |
- for response in remote.Read(request):
|
|
191 |
- out.write(response.data)
|
|
192 |
- |
|
193 |
- out.flush()
|
|
194 |
- assert digest.size_bytes == os.fstat(out.fileno()).st_size
|
|
195 |
- |
|
196 |
-def _fetch_command(casdir, remote, digest):
|
|
197 |
- with tempfile.NamedTemporaryFile(dir=os.path.join(casdir, 'tmp')) as out:
|
|
198 |
- _fetch_blob(remote, digest, out)
|
|
199 |
- remote_command = remote_execution_pb2.Command()
|
|
200 |
- with open(out.name, 'rb') as f:
|
|
201 |
- remote_command.ParseFromString(f.read())
|
|
202 |
- return remote_command
|
|
203 |
- |
|
204 |
-def _file_read(file_path):
|
|
205 |
- with open(file_path, 'rb') as f:
|
|
206 |
- return f.read()
|
1 |
+# Copyright (C) 2018 Bloomberg LP
|
|
2 |
+#
|
|
3 |
+# Licensed under the Apache License, Version 2.0 (the "License");
|
|
4 |
+# you may not use this file except in compliance with the License.
|
|
5 |
+# You may obtain a copy of the License at
|
|
6 |
+#
|
|
7 |
+# <http://www.apache.org/licenses/LICENSE-2.0>
|
|
8 |
+#
|
|
9 |
+# Unless required by applicable law or agreed to in writing, software
|
|
10 |
+# distributed under the License is distributed on an "AS IS" BASIS,
|
|
11 |
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
12 |
+# See the License for the specific language governing permissions and
|
|
13 |
+# limitations under the License.
|
|
14 |
+ |
|
15 |
+ |
|
16 |
+"""
|
|
17 |
+Execute command
|
|
18 |
+=================
|
|
19 |
+ |
|
20 |
+Request work to be executed and monitor status of jobs.
|
|
21 |
+"""
|
|
22 |
+ |
|
23 |
+import click
|
|
24 |
+import grpc
|
|
25 |
+import logging
|
|
26 |
+import os
|
|
27 |
+import sys
|
|
28 |
+ |
|
29 |
+from buildgrid.utils import merkle_maker, read_file, create_digest
|
|
30 |
+from ..cli import pass_context
|
|
31 |
+ |
|
32 |
+from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
|
|
33 |
+ |
|
34 |
+@click.group(short_help = "Interact with the CAS")
|
|
35 |
+@click.option('--port', default='50051')
|
|
36 |
+@click.option('--host', default='localhost')
|
|
37 |
+@pass_context
|
|
38 |
+def cli(context, host, port):
|
|
39 |
+ context.logger = logging.getLogger(__name__)
|
|
40 |
+ context.logger.info("Starting on port {}".format(port))
|
|
41 |
+ |
|
42 |
+ context.channel = grpc.insecure_channel('{}:{}'.format(host, port))
|
|
43 |
+ context.port = port
|
|
44 |
+ |
|
45 |
+@cli.command('upload-files', short_help='Upload files')
|
|
46 |
+@click.argument('files', nargs=-1, type=click.File('rb'))
|
|
47 |
+@click.option('--instance-name', default='testing')
|
|
48 |
+@pass_context
|
|
49 |
+def upload_files(context, files, instance_name):
|
|
50 |
+ stub = remote_execution_pb2_grpc.ContentAddressableStorageStub(context.channel)
|
|
51 |
+ |
|
52 |
+ requests = []
|
|
53 |
+ for file in files:
|
|
54 |
+ chunk = file.read()
|
|
55 |
+ requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
|
|
56 |
+ digest=create_digest(chunk), data=chunk))
|
|
57 |
+ |
|
58 |
+ request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=instance_name,
|
|
59 |
+ requests=requests)
|
|
60 |
+ |
|
61 |
+ context.logger.info("Sending: {}".format(request))
|
|
62 |
+ response = stub.BatchUpdateBlobs(request)
|
|
63 |
+ context.logger.info("Response: {}".format(response))
|
|
64 |
+ |
|
65 |
+@cli.command('upload-dir', short_help='Upload files')
|
|
66 |
+@click.argument('dir')
|
|
67 |
+@click.option('--instance-name', default='testing')
|
|
68 |
+@pass_context
|
|
69 |
+def upload_dir(context, dir, instance_name):
|
|
70 |
+ context.logger.info("Uploading directory to cas")
|
|
71 |
+ stub = remote_execution_pb2_grpc.ContentAddressableStorageStub(context.channel)
|
|
72 |
+ |
|
73 |
+ requests = []
|
|
74 |
+ |
|
75 |
+ for chunk, file_digest in merkle_maker(dir):
|
|
76 |
+ requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
|
|
77 |
+ digest=file_digest, data=chunk))
|
|
78 |
+ |
|
79 |
+ request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=instance_name,
|
|
80 |
+ requests=requests)
|
|
81 |
+ |
|
82 |
+ context.logger.info("Request:\n{}".format(request))
|
|
83 |
+ response = stub.BatchUpdateBlobs(request)
|
|
84 |
+ context.logger.info("Response:\n{}".format(response))
|
... | ... | @@ -25,16 +25,22 @@ Request work to be executed and monitor status of jobs. |
25 | 25 |
import click
|
26 | 26 |
import grpc
|
27 | 27 |
import logging
|
28 |
-import sys
|
|
29 |
-import time
|
|
28 |
+import stat
|
|
29 |
+import os
|
|
30 | 30 |
|
31 |
+from buildgrid.utils import merkle_maker, read_file, create_digest, write_fetch_blob
|
|
31 | 32 |
from ..cli import pass_context
|
32 | 33 |
|
34 |
+from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
|
|
35 |
+ |
|
36 |
+from buildgrid.settings import HASH
|
|
37 |
+ |
|
33 | 38 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
|
34 | 39 |
from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ExecuteOperationMetadata
|
35 | 40 |
from buildgrid._protos.google.longrunning import operations_pb2, operations_pb2_grpc
|
36 | 41 |
from google.protobuf import any_pb2
|
37 | 42 |
|
43 |
+ |
|
38 | 44 |
@click.group(short_help = "Simple execute client")
|
39 | 45 |
@click.option('--port', default='50051')
|
40 | 46 |
@click.option('--host', default='localhost')
|
... | ... | @@ -46,12 +52,13 @@ def cli(context, host, port): |
46 | 52 |
context.channel = grpc.insecure_channel('{}:{}'.format(host, port))
|
47 | 53 |
context.port = port
|
48 | 54 |
|
49 |
-@cli.command('request', short_help='Send a dummy action')
|
|
55 |
+ |
|
56 |
+@cli.command('request-dummy', short_help='Send a dummy action')
|
|
50 | 57 |
@click.option('--number', default=1)
|
51 | 58 |
@click.option('--instance-name', default='testing')
|
52 |
-@click.option('--wait-for-completion', is_flag=True)
|
|
59 |
+@click.option('--wait-for-completion', is_flag=True, help='Stream updates until jobs are completed')
|
|
53 | 60 |
@pass_context
|
54 |
-def request(context, number, instance_name, wait_for_completion):
|
|
61 |
+def request_dummy(context, number, instance_name, wait_for_completion):
|
|
55 | 62 |
action_digest = remote_execution_pb2.Digest()
|
56 | 63 |
|
57 | 64 |
context.logger.info("Sending execution request...\n")
|
... | ... | @@ -71,6 +78,7 @@ def request(context, number, instance_name, wait_for_completion): |
71 | 78 |
else:
|
72 | 79 |
context.logger.info(next(response))
|
73 | 80 |
|
81 |
+ |
|
74 | 82 |
@cli.command('status', short_help='Get the status of an operation')
|
75 | 83 |
@click.argument('operation-name')
|
76 | 84 |
@pass_context
|
... | ... | @@ -83,6 +91,7 @@ def operation_status(context, operation_name): |
83 | 91 |
response = stub.GetOperation(request)
|
84 | 92 |
context.logger.info(response)
|
85 | 93 |
|
94 |
+ |
|
86 | 95 |
@cli.command('list', short_help='List operations')
|
87 | 96 |
@pass_context
|
88 | 97 |
def list_operations(context):
|
... | ... | @@ -100,6 +109,7 @@ def list_operations(context): |
100 | 109 |
for op in response.operations:
|
101 | 110 |
context.logger.info(op)
|
102 | 111 |
|
112 |
+ |
|
103 | 113 |
@cli.command('wait', short_help='Streams an operation until it is complete')
|
104 | 114 |
@click.argument('operation-name')
|
105 | 115 |
@pass_context
|
... | ... | @@ -111,3 +121,81 @@ def wait_execution(context, operation_name): |
111 | 121 |
|
112 | 122 |
for stream in response:
|
113 | 123 |
context.logger.info(stream)
|
124 |
+ |
|
125 |
+@cli.command('command', short_help='Send a command to be executed')
|
|
126 |
+@click.argument('input-root')
|
|
127 |
+@click.argument('commands', nargs=-1)
|
|
128 |
+@click.option('--output-file', nargs=2, type=(str, bool), multiple=True, help='{Expected output file, is_executeable flag}')
|
|
129 |
+@click.option('--output-directory', default='testing', help='Output directory for output files')
|
|
130 |
+@click.option('--instance-name', default='testing')
|
|
131 |
+@pass_context
|
|
132 |
+def command(context, input_root, commands, output_file, output_directory, instance_name):
|
|
133 |
+ stub = remote_execution_pb2_grpc.ExecutionStub(context.channel)
|
|
134 |
+ |
|
135 |
+ command = remote_execution_pb2.Command()
|
|
136 |
+ |
|
137 |
+ for arg in commands:
|
|
138 |
+ command.arguments.extend([arg])
|
|
139 |
+ |
|
140 |
+ output_executeables = []
|
|
141 |
+ for file, is_executeable in output_file:
|
|
142 |
+ command.output_files.extend([file])
|
|
143 |
+ output_executeables.append(file)
|
|
144 |
+ |
|
145 |
+ command_digest = create_digest(command.SerializeToString())
|
|
146 |
+ context.logger.info(command_digest)
|
|
147 |
+ |
|
148 |
+ # TODO: Check for missing blobs
|
|
149 |
+ digest = None
|
|
150 |
+ for _, digest in merkle_maker(input_root):
|
|
151 |
+ pass
|
|
152 |
+ |
|
153 |
+ action = remote_execution_pb2.Action(command_digest=command_digest,
|
|
154 |
+ input_root_digest=digest,
|
|
155 |
+ do_not_cache=True)
|
|
156 |
+ |
|
157 |
+ action_digest = create_digest(action.SerializeToString())
|
|
158 |
+ |
|
159 |
+ context.logger.info("Sending execution request...\n")
|
|
160 |
+ |
|
161 |
+ requests = []
|
|
162 |
+ requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
|
|
163 |
+ digest=command_digest, data=command.SerializeToString()))
|
|
164 |
+ |
|
165 |
+ requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
|
|
166 |
+ digest=action_digest, data=action.SerializeToString()))
|
|
167 |
+ |
|
168 |
+ request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=instance_name,
|
|
169 |
+ requests=requests)
|
|
170 |
+ remote_execution_pb2_grpc.ContentAddressableStorageStub(context.channel).BatchUpdateBlobs(request)
|
|
171 |
+ |
|
172 |
+ request = remote_execution_pb2.ExecuteRequest(instance_name = instance_name,
|
|
173 |
+ action_digest = action_digest,
|
|
174 |
+ skip_cache_lookup = True)
|
|
175 |
+ response = stub.Execute(request)
|
|
176 |
+ |
|
177 |
+ stub = bytestream_pb2_grpc.ByteStreamStub(context.channel)
|
|
178 |
+ |
|
179 |
+ stream = None
|
|
180 |
+ for stream in response:
|
|
181 |
+ context.logger.info(stream)
|
|
182 |
+ execute_response = remote_execution_pb2.ExecuteResponse()
|
|
183 |
+ stream.response.Unpack(execute_response)
|
|
184 |
+ for output_file in execute_response.result.output_files:
|
|
185 |
+ path = os.path.join(output_directory, output_file.path)
|
|
186 |
+ |
|
187 |
+ if not os.path.exists(os.path.dirname(path)):
|
|
188 |
+ |
|
189 |
+ try:
|
|
190 |
+ os.makedirs(os.path.dirname(path))
|
|
191 |
+ |
|
192 |
+ except OSError as exc:
|
|
193 |
+ if exc.errno != errno.EEXIST:
|
|
194 |
+ raise
|
|
195 |
+ |
|
196 |
+ with open(path, 'wb+') as f:
|
|
197 |
+ write_fetch_blob(f, stub, output_file.digest, instance_name)
|
|
198 |
+ |
|
199 |
+ if output_file.path in output_executeables:
|
|
200 |
+ st = os.stat(path)
|
|
201 |
+ os.chmod(path, st.st_mode | stat.S_IXUSR)
|
... | ... | @@ -83,14 +83,15 @@ class BotSession: |
83 | 83 |
self._update_lease_from_server(lease)
|
84 | 84 |
|
85 | 85 |
def update_bot_session(self):
|
86 |
+ self.logger.debug("Updating bot session: {}".format(self._bot_id))
|
|
86 | 87 |
session = self._interface.update_bot_session(self.get_pb2())
|
87 |
- for lease in session.leases:
|
|
88 |
- self._update_lease_from_server(lease)
|
|
89 |
- |
|
90 |
- for k, v in self._leases.items():
|
|
88 |
+ for k, v in list(self._leases.items()):
|
|
91 | 89 |
if v.state == LeaseState.COMPLETED.value:
|
92 | 90 |
del self._leases[k]
|
93 | 91 |
|
92 |
+ for lease in session.leases:
|
|
93 |
+ self._update_lease_from_server(lease)
|
|
94 |
+ |
|
94 | 95 |
def get_pb2(self):
|
95 | 96 |
leases = list(self._leases.values())
|
96 | 97 |
if not leases:
|
... | ... | @@ -114,12 +115,16 @@ class BotSession: |
114 | 115 |
lease_bot = self._leases.get(lease.id)
|
115 | 116 |
if lease.state == LeaseState.PENDING.value:
|
116 | 117 |
lease.state = LeaseState.ACTIVE.value
|
117 |
- asyncio.ensure_future(self.create_work(lease))
|
|
118 | 118 |
self._leases[lease.id] = lease
|
119 |
+ self.update_bot_session()
|
|
120 |
+ asyncio.ensure_future(self.create_work(lease))
|
|
119 | 121 |
|
120 | 122 |
async def create_work(self, lease):
|
121 | 123 |
self.logger.debug("Work created: {}".format(lease.id))
|
122 |
- lease = await self._work(self._context, lease)
|
|
124 |
+ |
|
125 |
+ loop = asyncio.get_event_loop()
|
|
126 |
+ lease = await loop.run_in_executor(None, self._work, self._context, lease)
|
|
127 |
+ |
|
123 | 128 |
self.logger.debug("Work complete: {}".format(lease.id))
|
124 | 129 |
self.lease_completed(lease)
|
125 | 130 |
|
... | ... | @@ -20,7 +20,7 @@ import uuid |
20 | 20 |
|
21 | 21 |
from enum import Enum
|
22 | 22 |
|
23 |
-from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ExecuteOperationMetadata, ExecuteResponse
|
|
23 |
+from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ActionResult, ExecuteOperationMetadata, ExecuteResponse
|
|
24 | 24 |
from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2, worker_pb2
|
25 | 25 |
from buildgrid._protos.google.longrunning import operations_pb2
|
26 | 26 |
from google.protobuf import any_pb2
|
... | ... | @@ -52,13 +52,13 @@ class Job(): |
52 | 52 |
def __init__(self, action_digest, do_not_cache=False, message_queue=None):
|
53 | 53 |
self.lease = None
|
54 | 54 |
self.logger = logging.getLogger(__name__)
|
55 |
+ self.n_tries = 0
|
|
55 | 56 |
self.result = None
|
56 | 57 |
self.result_cached = False
|
57 | 58 |
|
58 | 59 |
self._action_digest = action_digest
|
59 | 60 |
self._do_not_cache = do_not_cache
|
60 | 61 |
self._execute_stage = ExecuteStage.UNKNOWN
|
61 |
- self._n_tries = 0
|
|
62 | 62 |
self._name = str(uuid.uuid4())
|
63 | 63 |
self._operation = operations_pb2.Operation(name = self._name)
|
64 | 64 |
self._operation_update_queues = []
|
... | ... | @@ -94,9 +94,10 @@ class Job(): |
94 | 94 |
self._operation.metadata.CopyFrom(self._pack_any(self.get_operation_meta()))
|
95 | 95 |
if self.result is not None:
|
96 | 96 |
self._operation.done = True
|
97 |
- response = ExecuteResponse()
|
|
98 |
- self.result.Unpack(response.result)
|
|
99 |
- response.cached_result = self.result_cached
|
|
97 |
+ action_result = ActionResult()
|
|
98 |
+ self.result.Unpack(action_result)
|
|
99 |
+ response = ExecuteResponse(result=action_result,
|
|
100 |
+ cached_result = self.result_cached)
|
|
100 | 101 |
self._operation.response.CopyFrom(self._pack_any(response))
|
101 | 102 |
|
102 | 103 |
return self._operation
|
... | ... | @@ -31,7 +31,7 @@ from .job import ExecuteStage, LeaseState |
31 | 31 |
|
32 | 32 |
class Scheduler():
|
33 | 33 |
|
34 |
- MAX_N_TRIES = 5
|
|
34 |
+ MAX_N_TRIES = 100
|
|
35 | 35 |
|
36 | 36 |
def __init__(self, action_cache=None):
|
37 | 37 |
self.action_cache = action_cache
|
... | ... | @@ -62,7 +62,8 @@ class Scheduler(): |
62 | 62 |
job.update_execute_stage(ExecuteStage.QUEUED)
|
63 | 63 |
|
64 | 64 |
def retry_job(self, name):
|
65 |
- if job in self.jobs[name]:
|
|
65 |
+ job = self.jobs.get(name)
|
|
66 |
+ if job is not None:
|
|
66 | 67 |
if job.n_tries >= self.MAX_N_TRIES:
|
67 | 68 |
# TODO: Decide what to do with these jobs
|
68 | 69 |
job.update_execute_stage(ExecuteStage.COMPLETED)
|
... | ... | @@ -80,9 +81,7 @@ class Scheduler(): |
80 | 81 |
job.update_execute_stage(ExecuteStage.COMPLETED)
|
81 | 82 |
self.jobs[name] = job
|
82 | 83 |
if not job.do_not_cache and self.action_cache is not None:
|
83 |
- action_result = ActionResult()
|
|
84 |
- result.Unpack(action_result)
|
|
85 |
- self.action_cache.put_action_result(job.action_digest, action_result)
|
|
84 |
+ self.action_cache.put_action_result(job.action_digest, result)
|
|
86 | 85 |
|
87 | 86 |
def get_operations(self):
|
88 | 87 |
response = operations_pb2.ListOperationsResponse()
|
... | ... | @@ -91,7 +90,7 @@ class Scheduler(): |
91 | 90 |
return response
|
92 | 91 |
|
93 | 92 |
def update_job_lease_state(self, name, state):
|
94 |
- job = self.jobs.get(name)
|
|
93 |
+ job = self.jobs[name]
|
|
95 | 94 |
job.lease.state = state
|
96 | 95 |
self.jobs[name] = job
|
97 | 96 |
|
1 |
+# Copyright (C) 2018 Bloomberg LP
|
|
2 |
+#
|
|
3 |
+# Licensed under the Apache License, Version 2.0 (the "License");
|
|
4 |
+# you may not use this file except in compliance with the License.
|
|
5 |
+# You may obtain a copy of the License at
|
|
6 |
+#
|
|
7 |
+# <http://www.apache.org/licenses/LICENSE-2.0>
|
|
8 |
+#
|
|
9 |
+# Unless required by applicable law or agreed to in writing, software
|
|
10 |
+# distributed under the License is distributed on an "AS IS" BASIS,
|
|
11 |
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
12 |
+# See the License for the specific language governing permissions and
|
|
13 |
+# limitations under the License.
|
|
14 |
+ |
|
15 |
+import os
|
|
16 |
+import pathlib
|
|
17 |
+ |
|
18 |
+from buildgrid.settings import HASH
|
|
19 |
+ |
|
20 |
+from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
|
21 |
+from buildgrid._protos.google.bytestream import bytestream_pb2
|
|
22 |
+ |
|
23 |
+def gen_fetch_blob(stub, digest, instance_name=""):
|
|
24 |
+ """ Generates byte stream from a fetch blob request
|
|
25 |
+ """
|
|
26 |
+ |
|
27 |
+ resource_name = os.path.join(instance_name,'blobs',digest.hash, str(digest.size_bytes))
|
|
28 |
+ request = bytestream_pb2.ReadRequest(resource_name=resource_name,
|
|
29 |
+ read_offset=0)
|
|
30 |
+ for response in stub.Read(request):
|
|
31 |
+ yield response.data
|
|
32 |
+ |
|
33 |
+def write_fetch_directory(dir, stub, digest, instance_name=""):
|
|
34 |
+ """ Given a directory digest, fetches files and writes them to a directory
|
|
35 |
+ """
|
|
36 |
+ ## TODO: Extend to symlinks and inner directories
|
|
37 |
+ ## pathlib.Path('/my/directory').mkdir(parents=True, exist_ok=True)
|
|
38 |
+ |
|
39 |
+ directory = remote_execution_pb2.Directory()
|
|
40 |
+ directory = parse_to_pb2_from_fetch(directory, stub, digest, instance_name)
|
|
41 |
+ |
|
42 |
+ for file_node in directory.files:
|
|
43 |
+ path = os.path.join(dir, file_node.name)
|
|
44 |
+ with open(path, 'wb') as f:
|
|
45 |
+ write_fetch_blob(f, stub, file_node.digest, instance_name)
|
|
46 |
+ |
|
47 |
+def write_fetch_blob(out, stub, digest, instance_name=""):
|
|
48 |
+ """ Given an output buffer, fetches blob and writes to buffer
|
|
49 |
+ """
|
|
50 |
+ |
|
51 |
+ for stream in gen_fetch_blob(stub, digest, instance_name):
|
|
52 |
+ out.write(stream)
|
|
53 |
+ |
|
54 |
+ out.flush()
|
|
55 |
+ assert digest.size_bytes == os.fstat(out.fileno()).st_size
|
|
56 |
+ |
|
57 |
+def parse_to_pb2_from_fetch(pb2, stub, digest, instance_name=""):
|
|
58 |
+ """ Fetches stream and parses it into given pb2
|
|
59 |
+ """
|
|
60 |
+ |
|
61 |
+ bytes = b''
|
|
62 |
+ for stream in gen_fetch_blob(stub, digest, instance_name):
|
|
63 |
+ bytes += stream
|
|
64 |
+ |
|
65 |
+ pb2.ParseFromString(bytes)
|
|
66 |
+ return pb2
|
|
67 |
+ |
|
68 |
+def create_digest(bytes):
|
|
69 |
+ """ Creates a hash based on the hex digest and returns the digest
|
|
70 |
+ """
|
|
71 |
+ hash = HASH(bytes)
|
|
72 |
+ return remote_execution_pb2.Digest(hash = hash.hexdigest(),
|
|
73 |
+ size_bytes=len(bytes))
|
|
74 |
+ |
|
75 |
+def merkle_maker(dir):
|
|
76 |
+ """ Walks through given directory, yielding the binary and digest
|
|
77 |
+ """
|
|
78 |
+ directory = remote_execution_pb2.Directory()
|
|
79 |
+ for (dir_path, dir_names, file_names) in os.walk(dir):
|
|
80 |
+ |
|
81 |
+ for file_name in file_names:
|
|
82 |
+ file_path = os.path.join(dir_path, file_name)
|
|
83 |
+ chunk = read_file(file_path)
|
|
84 |
+ file_digest = create_digest(chunk)
|
|
85 |
+ directory.files.extend([file_maker(file_path, file_digest)])
|
|
86 |
+ yield chunk, file_digest
|
|
87 |
+ |
|
88 |
+ for inner_dir in dir_names:
|
|
89 |
+ inner_dir_path = os.path.join(dir_path, inner_dir_path)
|
|
90 |
+ yield from _merkle_maker(inner_dir_path)
|
|
91 |
+ |
|
92 |
+ directory_string = directory.SerializeToString()
|
|
93 |
+ |
|
94 |
+ yield directory_string, create_digest(directory_string)
|
|
95 |
+ |
|
96 |
+def file_maker(file_path, file_digest):
|
|
97 |
+ """ Creates a File Node
|
|
98 |
+ """
|
|
99 |
+ _, file_name = os.path.split(file_path)
|
|
100 |
+ return remote_execution_pb2.FileNode(name=file_name,
|
|
101 |
+ digest=file_digest,
|
|
102 |
+ is_executable=os.access(file_path, os.X_OK))
|
|
103 |
+ |
|
104 |
+def read_file(file):
|
|
105 |
+ with open(file, 'rb') as f:
|
|
106 |
+ return f.read()
|
... | ... | @@ -91,12 +91,14 @@ def test_list_operations_with_result(instance, execute_request, context): |
91 | 91 |
action_result = remote_execution_pb2.ActionResult()
|
92 | 92 |
output_file = remote_execution_pb2.OutputFile(path = 'unicorn')
|
93 | 93 |
action_result.output_files.extend([output_file])
|
94 |
- instance._instance._scheduler.jobs[response_execute.name].result = _pack_any(action_result)
|
|
94 |
+ |
|
95 |
+ instance._instance._scheduler.job_complete(response_execute.name, _pack_any(action_result))
|
|
95 | 96 |
|
96 | 97 |
request = operations_pb2.ListOperationsRequest()
|
97 | 98 |
response = instance.ListOperations(request, context)
|
98 | 99 |
|
99 | 100 |
assert response.operations[0].name == response_execute.name
|
101 |
+ |
|
100 | 102 |
execute_response = remote_execution_pb2.ExecuteResponse()
|
101 | 103 |
response.operations[0].response.Unpack(execute_response)
|
102 | 104 |
assert execute_response.result == action_result
|