Martin Blanchard pushed to branch mablanch/61-bazel-support at BuildGrid / buildgrid
Commits:
-
1489472d
by finn at 2018-08-24T10:05:46Z
-
5738650f
by finn at 2018-08-24T10:05:46Z
-
943ef5e2
by Jim MacArthur at 2018-08-24T16:28:23Z
-
8d65751d
by Jim MacArthur at 2018-08-24T16:29:08Z
-
b8ea3fed
by Jim MacArthur at 2018-08-24T16:29:24Z
-
b97017d3
by Martin Blanchard at 2018-08-28T10:18:49Z
-
6cf604e7
by Martin Blanchard at 2018-08-28T10:18:49Z
-
4485c7ea
by Martin Blanchard at 2018-08-28T10:18:49Z
-
45014701
by Martin Blanchard at 2018-08-28T10:38:17Z
-
3dde4398
by Martin Blanchard at 2018-08-28T10:38:19Z
-
771b3b83
by Martin Blanchard at 2018-08-28T10:38:19Z
-
2b888d44
by Martin Blanchard at 2018-08-28T10:38:19Z
18 changed files:
- buildgrid/_app/bots/buildbox.py
- buildgrid/_app/bots/temp_directory.py
- buildgrid/_app/commands/cmd_bot.py
- buildgrid/_app/commands/cmd_cas.py
- buildgrid/_app/commands/cmd_execute.py
- buildgrid/_app/commands/cmd_server.py
- + buildgrid/server/buildgrid_instance.py
- buildgrid/server/build_grid_server.py → buildgrid/server/buildgrid_server.py
- buildgrid/server/execution/execution_instance.py
- buildgrid/server/execution/execution_service.py
- buildgrid/server/execution/operations_service.py
- buildgrid/server/scheduler.py
- buildgrid/server/worker/bots_interface.py
- buildgrid/server/worker/bots_service.py
- buildgrid/utils.py
- tests/integration/bots_service.py
- tests/integration/execution_service.py
- tests/integration/operations_service.py
Changes:
... | ... | @@ -21,16 +21,16 @@ import grpc |
21 | 21 |
from google.protobuf import any_pb2
|
22 | 22 |
|
23 | 23 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
24 |
-from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
|
|
25 |
-from buildgrid.utils import read_file
|
|
24 |
+from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
|
|
25 |
+from buildgrid.utils import read_file, parse_to_pb2_from_fetch
|
|
26 | 26 |
|
27 | 27 |
|
28 | 28 |
def work_buildbox(context, lease):
|
29 | 29 |
logger = context.logger
|
30 | 30 |
|
31 |
- action_any = lease.payload
|
|
32 |
- action = remote_execution_pb2.Action()
|
|
33 |
- action_any.Unpack(action)
|
|
31 |
+ action_digest_any = lease.payload
|
|
32 |
+ action_digest = remote_execution_pb2.Digest()
|
|
33 |
+ action_digest_any.Unpack(action_digest)
|
|
34 | 34 |
|
35 | 35 |
cert_server = read_file(context.server_cert)
|
36 | 36 |
cert_client = read_file(context.client_cert)
|
... | ... | @@ -45,38 +45,57 @@ def work_buildbox(context, lease): |
45 | 45 |
|
46 | 46 |
stub = bytestream_pb2_grpc.ByteStreamStub(channel)
|
47 | 47 |
|
48 |
- remote_command = _buildstream_fetch_command(context.local_cas, stub, action.command_digest)
|
|
48 |
+ action = remote_execution_pb2.Action()
|
|
49 |
+ parse_to_pb2_from_fetch(action, stub, action_digest)
|
|
50 |
+ |
|
51 |
+ casdir = context.local_cas
|
|
52 |
+ remote_command = remote_execution_pb2.Command()
|
|
53 |
+ parse_to_pb2_from_fetch(remote_command, stub, action.command_digest)
|
|
54 |
+ |
|
49 | 55 |
environment = dict((x.name, x.value) for x in remote_command.environment_variables)
|
50 | 56 |
logger.debug("command hash: {}".format(action.command_digest.hash))
|
51 | 57 |
logger.debug("vdir hash: {}".format(action.input_root_digest.hash))
|
52 | 58 |
logger.debug("\n{}".format(' '.join(remote_command.arguments)))
|
53 | 59 |
|
54 |
- command = ['buildbox',
|
|
55 |
- '--remote={}'.format('https://{}:{}'.format(context.remote, context.port)),
|
|
56 |
- '--server-cert={}'.format(context.server_cert),
|
|
57 |
- '--client-key={}'.format(context.client_key),
|
|
58 |
- '--client-cert={}'.format(context.client_cert),
|
|
59 |
- '--local={}'.format(context.local_cas),
|
|
60 |
- '--chdir={}'.format(environment['PWD']),
|
|
61 |
- context.fuse_dir]
|
|
62 |
- |
|
63 |
- command.extend(remote_command.arguments)
|
|
64 |
- |
|
65 |
- logger.debug(' '.join(command))
|
|
66 |
- logger.debug("Input root digest:\n{}".format(action.input_root_digest))
|
|
67 |
- logger.info("Launching process")
|
|
68 |
- |
|
69 |
- proc = subprocess.Popen(command,
|
|
70 |
- stdin=subprocess.PIPE,
|
|
71 |
- stdout=subprocess.PIPE)
|
|
72 |
- std_send = action.input_root_digest.SerializeToString()
|
|
73 |
- std_out, _ = proc.communicate(std_send)
|
|
74 |
- |
|
75 |
- output_root_digest = remote_execution_pb2.Digest()
|
|
76 |
- output_root_digest.ParseFromString(std_out)
|
|
77 |
- logger.debug("Output root digest: {}".format(output_root_digest))
|
|
78 |
- |
|
79 |
- output_file = remote_execution_pb2.OutputDirectory(tree_digest=output_root_digest)
|
|
60 |
+ # Input hash must be written to disk for buildbox.
|
|
61 |
+ os.makedirs(os.path.join(casdir, 'tmp'), exist_ok=True)
|
|
62 |
+ with tempfile.NamedTemporaryFile(dir=os.path.join(casdir, 'tmp')) as input_digest_file:
|
|
63 |
+ with open(input_digest_file.name, 'wb') as f:
|
|
64 |
+ f.write(action.input_root_digest.SerializeToString())
|
|
65 |
+ f.flush()
|
|
66 |
+ |
|
67 |
+ with tempfile.NamedTemporaryFile(dir=os.path.join(casdir, 'tmp')) as output_digest_file:
|
|
68 |
+ command = ['buildbox',
|
|
69 |
+ '--remote={}'.format('https://{}:{}'.format(context.remote, context.port)),
|
|
70 |
+ '--server-cert={}'.format(context.server_cert),
|
|
71 |
+ '--client-key={}'.format(context.client_key),
|
|
72 |
+ '--client-cert={}'.format(context.client_cert),
|
|
73 |
+ '--input-digest={}'.format(input_digest_file.name),
|
|
74 |
+ '--output-digest={}'.format(output_digest_file.name),
|
|
75 |
+ '--local={}'.format(casdir)]
|
|
76 |
+ if 'PWD' in environment and environment['PWD']:
|
|
77 |
+ command.append('--chdir={}'.format(environment['PWD']))
|
|
78 |
+ |
|
79 |
+ command.append(context.fuse_dir)
|
|
80 |
+ command.extend(remote_command.arguments)
|
|
81 |
+ |
|
82 |
+ logger.debug(' '.join(command))
|
|
83 |
+ logger.debug("Input root digest:\n{}".format(action.input_root_digest))
|
|
84 |
+ logger.info("Launching process")
|
|
85 |
+ |
|
86 |
+ proc = subprocess.Popen(command,
|
|
87 |
+ stdin=subprocess.PIPE,
|
|
88 |
+ stdout=subprocess.PIPE)
|
|
89 |
+ proc.communicate()
|
|
90 |
+ |
|
91 |
+ output_root_digest = remote_execution_pb2.Digest()
|
|
92 |
+ with open(output_digest_file.name, 'rb') as f:
|
|
93 |
+ output_root_digest.ParseFromString(f.read())
|
|
94 |
+ logger.debug("Output root digest: {}".format(output_root_digest))
|
|
95 |
+ |
|
96 |
+ if len(output_root_digest.hash) < 64:
|
|
97 |
+ logger.warning("Buildbox command failed - no output root digest present.")
|
|
98 |
+ output_file = remote_execution_pb2.OutputDirectory(tree_digest=output_root_digest)
|
|
80 | 99 |
|
81 | 100 |
action_result = remote_execution_pb2.ActionResult()
|
82 | 101 |
action_result.output_directories.extend([output_file])
|
... | ... | @@ -87,33 +106,3 @@ def work_buildbox(context, lease): |
87 | 106 |
lease.result.CopyFrom(action_result_any)
|
88 | 107 |
|
89 | 108 |
return lease
|
90 |
- |
|
91 |
- |
|
92 |
-def _buildstream_fetch_blob(remote, digest, out):
|
|
93 |
- resource_name = os.path.join(digest.hash, str(digest.size_bytes))
|
|
94 |
- request = bytestream_pb2.ReadRequest()
|
|
95 |
- request.resource_name = resource_name
|
|
96 |
- request.read_offset = 0
|
|
97 |
- for response in remote.Read(request):
|
|
98 |
- out.write(response.data)
|
|
99 |
- |
|
100 |
- out.flush()
|
|
101 |
- assert digest.size_bytes == os.fstat(out.fileno()).st_size
|
|
102 |
- |
|
103 |
- |
|
104 |
-def _buildstream_fetch_command(casdir, remote, digest):
|
|
105 |
- with tempfile.NamedTemporaryFile(dir=os.path.join(casdir, 'tmp')) as out:
|
|
106 |
- _buildstream_fetch_blob(remote, digest, out)
|
|
107 |
- remote_command = remote_execution_pb2.Command()
|
|
108 |
- with open(out.name, 'rb') as f:
|
|
109 |
- remote_command.ParseFromString(f.read())
|
|
110 |
- return remote_command
|
|
111 |
- |
|
112 |
- |
|
113 |
-def _buildstream_fetch_action(casdir, remote, digest):
|
|
114 |
- with tempfile.NamedTemporaryFile(dir=os.path.join(casdir, 'tmp')) as out:
|
|
115 |
- _buildstream_fetch_blob(remote, digest, out)
|
|
116 |
- remote_action = remote_execution_pb2.Action()
|
|
117 |
- with open(out.name, 'rb') as f:
|
|
118 |
- remote_action.ParseFromString(f.read())
|
|
119 |
- return remote_action
|
... | ... | @@ -19,7 +19,7 @@ import tempfile |
19 | 19 |
|
20 | 20 |
from google.protobuf import any_pb2
|
21 | 21 |
|
22 |
-from buildgrid.utils import read_file, create_digest, write_fetch_directory, parse_to_pb2_from_fetch
|
|
22 |
+from buildgrid.utils import output_file_maker, write_fetch_directory, parse_to_pb2_from_fetch
|
|
23 | 23 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
|
24 | 24 |
from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
|
25 | 25 |
|
... | ... | @@ -29,60 +29,86 @@ def work_temp_directory(context, lease): |
29 | 29 |
then uploads results back to CAS
|
30 | 30 |
"""
|
31 | 31 |
|
32 |
- instance_name = context.instance_name
|
|
32 |
+ instance_name = context.parent
|
|
33 | 33 |
stub_bytestream = bytestream_pb2_grpc.ByteStreamStub(context.channel)
|
34 | 34 |
|
35 | 35 |
action_digest = remote_execution_pb2.Digest()
|
36 | 36 |
lease.payload.Unpack(action_digest)
|
37 | 37 |
|
38 |
- action = remote_execution_pb2.Action()
|
|
38 |
+ action = parse_to_pb2_from_fetch(remote_execution_pb2.Action(),
|
|
39 |
+ stub_bytestream, action_digest, instance_name)
|
|
39 | 40 |
|
40 |
- action = parse_to_pb2_from_fetch(action, stub_bytestream, action_digest, instance_name)
|
|
41 |
+ with tempfile.TemporaryDirectory() as temp_directory:
|
|
42 |
+ command = parse_to_pb2_from_fetch(remote_execution_pb2.Command(),
|
|
43 |
+ stub_bytestream, action.command_digest, instance_name)
|
|
41 | 44 |
|
42 |
- with tempfile.TemporaryDirectory() as temp_dir:
|
|
45 |
+ write_fetch_directory(temp_directory, stub_bytestream,
|
|
46 |
+ action.input_root_digest, instance_name)
|
|
43 | 47 |
|
44 |
- command = remote_execution_pb2.Command()
|
|
45 |
- command = parse_to_pb2_from_fetch(command, stub_bytestream, action.command_digest, instance_name)
|
|
46 |
- |
|
47 |
- arguments = "cd {} &&".format(temp_dir)
|
|
48 |
+ execution_envionment = os.environ.copy()
|
|
49 |
+ for variable in command.environment_variables:
|
|
50 |
+ if variable.name not in ['PATH', 'PWD']:
|
|
51 |
+ execution_envionment[variable.name] = variable.value
|
|
48 | 52 |
|
53 |
+ command_arguments = list()
|
|
49 | 54 |
for argument in command.arguments:
|
50 |
- arguments += " {}".format(argument)
|
|
51 |
- |
|
52 |
- context.logger.info(arguments)
|
|
53 |
- |
|
54 |
- write_fetch_directory(temp_dir, stub_bytestream, action.input_root_digest, instance_name)
|
|
55 |
- |
|
56 |
- proc = subprocess.Popen(arguments,
|
|
57 |
- shell=True,
|
|
58 |
- stdin=subprocess.PIPE,
|
|
59 |
- stdout=subprocess.PIPE)
|
|
60 |
- |
|
61 |
- # TODO: Should return the std_out to the user
|
|
62 |
- proc.communicate()
|
|
63 |
- |
|
64 |
- result = remote_execution_pb2.ActionResult()
|
|
65 |
- requests = []
|
|
66 |
- for output_file in command.output_files:
|
|
67 |
- path = os.path.join(temp_dir, output_file)
|
|
68 |
- chunk = read_file(path)
|
|
69 |
- |
|
70 |
- digest = create_digest(chunk)
|
|
71 |
- |
|
72 |
- result.output_files.extend([remote_execution_pb2.OutputFile(path=output_file,
|
|
73 |
- digest=digest)])
|
|
74 |
- |
|
75 |
- requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
|
|
76 |
- digest=digest, data=chunk))
|
|
77 |
- |
|
78 |
- request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=instance_name,
|
|
79 |
- requests=requests)
|
|
55 |
+ command_arguments.append(argument.strip())
|
|
56 |
+ |
|
57 |
+ working_directory = None
|
|
58 |
+ if command.working_directory:
|
|
59 |
+ working_directory = os.path.join(temp_directory,
|
|
60 |
+ command.working_directory)
|
|
61 |
+ os.makedirs(working_directory, exist_ok=True)
|
|
62 |
+ else:
|
|
63 |
+ working_directory = temp_directory
|
|
64 |
+ |
|
65 |
+ # Ensure that output files structure exists:
|
|
66 |
+ for output_path in command.output_files:
|
|
67 |
+ directory_path = os.path.join(working_directory,
|
|
68 |
+ os.path.dirname(output_path))
|
|
69 |
+ os.makedirs(directory_path, exist_ok=True)
|
|
70 |
+ |
|
71 |
+ process = subprocess.Popen(command_arguments,
|
|
72 |
+ cwd=working_directory,
|
|
73 |
+ universal_newlines=True,
|
|
74 |
+ env=execution_envionment,
|
|
75 |
+ stdin=subprocess.PIPE,
|
|
76 |
+ stdout=subprocess.PIPE)
|
|
77 |
+ # TODO: Should return the stdout and stderr to the user.
|
|
78 |
+ process.communicate()
|
|
79 |
+ |
|
80 |
+ update_requests = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=instance_name)
|
|
81 |
+ action_result = remote_execution_pb2.ActionResult()
|
|
82 |
+ |
|
83 |
+ for output_path in command.output_files:
|
|
84 |
+ file_path = os.path.join(working_directory, output_path)
|
|
85 |
+ # Missing outputs should simply be omitted in ActionResult:
|
|
86 |
+ if not os.path.isfile(file_path):
|
|
87 |
+ continue
|
|
88 |
+ |
|
89 |
+ # OutputFile.path should be relative to the working direcory:
|
|
90 |
+ output_file, update_request = output_file_maker(file_path, working_directory)
|
|
91 |
+ |
|
92 |
+ action_result.output_files.extend([output_file])
|
|
93 |
+ update_requests.requests.extend([update_request])
|
|
94 |
+ |
|
95 |
+ for output_path in command.output_directories:
|
|
96 |
+ directory_path = os.path.join(working_directory, output_path)
|
|
97 |
+ # Missing outputs should simply be omitted in ActionResult:
|
|
98 |
+ if not os.path.isdir(directory_path):
|
|
99 |
+ continue
|
|
100 |
+ |
|
101 |
+ # OutputDirectory.path should be relative to the working direcory:
|
|
102 |
+ output_directory, update_request = output_directory_maker(directory_path, working_directory)
|
|
103 |
+ |
|
104 |
+ action_result.output_directories.extend([output_directory])
|
|
105 |
+ update_requests.requests.extend(update_request)
|
|
80 | 106 |
|
81 | 107 |
stub_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(context.channel)
|
82 |
- stub_cas.BatchUpdateBlobs(request)
|
|
108 |
+ stub_cas.BatchUpdateBlobs(update_requests)
|
|
83 | 109 |
|
84 | 110 |
result_any = any_pb2.Any()
|
85 |
- result_any.Pack(result)
|
|
111 |
+ result_any.Pack(action_result)
|
|
86 | 112 |
|
87 | 113 |
lease.result.CopyFrom(result_any)
|
88 | 114 |
|
... | ... | @@ -35,7 +35,7 @@ from ..cli import pass_context |
35 | 35 |
|
36 | 36 |
|
37 | 37 |
@click.group(name='bot', short_help="Create and register bot clients.")
|
38 |
-@click.option('--parent', type=click.STRING, default='bgd_test', show_default=True,
|
|
38 |
+@click.option('--parent', type=click.STRING, default='main', show_default=True,
|
|
39 | 39 |
help="Targeted farm resource.")
|
40 | 40 |
@click.option('--port', type=click.INT, default='50051', show_default=True,
|
41 | 41 |
help="Remote server's port number.")
|
... | ... | @@ -49,6 +49,7 @@ def cli(context, host, port, parent): |
49 | 49 |
context.logger = logging.getLogger(__name__)
|
50 | 50 |
context.logger.info("Starting on port {}".format(port))
|
51 | 51 |
context.channel = channel
|
52 |
+ context.parent = parent
|
|
52 | 53 |
|
53 | 54 |
worker = Worker()
|
54 | 55 |
worker.add_device(Device())
|
... | ... | @@ -75,14 +76,11 @@ def run_dummy(context): |
75 | 76 |
|
76 | 77 |
|
77 | 78 |
@cli.command('temp-directory', short_help="Runs commands in temp directory and uploads results.")
|
78 |
-@click.option('--instance-name', type=click.STRING, default='testing', show_default=True,
|
|
79 |
- help="Targeted farm instance name.")
|
|
80 | 79 |
@pass_context
|
81 |
-def run_temp_directory(context, instance_name):
|
|
80 |
+def run_temp_directory(context):
|
|
82 | 81 |
""" Downloads files and command from CAS and runs
|
83 | 82 |
in a temp directory, uploading result back to CAS
|
84 | 83 |
"""
|
85 |
- context.instance_name = instance_name
|
|
86 | 84 |
try:
|
87 | 85 |
b = bot.Bot(context.bot_session)
|
88 | 86 |
b.session(temp_directory.work_temp_directory,
|
... | ... | @@ -31,25 +31,26 @@ from ..cli import pass_context |
31 | 31 |
|
32 | 32 |
|
33 | 33 |
@click.group(name='cas', short_help="Interact with the CAS server.")
|
34 |
+@click.option('--instance-name', type=click.STRING, default='main', show_default=True,
|
|
35 |
+ help="Targeted farm instance name.")
|
|
34 | 36 |
@click.option('--port', type=click.INT, default='50051', show_default=True,
|
35 | 37 |
help="Remote server's port number.")
|
36 | 38 |
@click.option('--host', type=click.STRING, default='localhost', show_default=True,
|
37 | 39 |
help="Remote server's hostname.")
|
38 | 40 |
@pass_context
|
39 |
-def cli(context, host, port):
|
|
41 |
+def cli(context, instance_name, host, port):
|
|
40 | 42 |
context.logger = logging.getLogger(__name__)
|
41 | 43 |
context.logger.info("Starting on port {}".format(port))
|
42 | 44 |
|
45 |
+ context.instance_name = instance_name
|
|
43 | 46 |
context.channel = grpc.insecure_channel('{}:{}'.format(host, port))
|
44 | 47 |
context.port = port
|
45 | 48 |
|
46 | 49 |
|
47 | 50 |
@cli.command('upload-files', short_help="Upload files to the CAS server.")
|
48 |
-@click.option('--instance-name', type=click.STRING, default='testing', show_default=True,
|
|
49 |
- help="Targeted farm instance name.")
|
|
50 | 51 |
@click.argument('files', nargs=-1, type=click.File('rb'), required=True)
|
51 | 52 |
@pass_context
|
52 |
-def upload_files(context, files, instance_name):
|
|
53 |
+def upload_files(context, files):
|
|
53 | 54 |
stub = remote_execution_pb2_grpc.ContentAddressableStorageStub(context.channel)
|
54 | 55 |
|
55 | 56 |
requests = []
|
... | ... | @@ -58,7 +59,7 @@ def upload_files(context, files, instance_name): |
58 | 59 |
requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
|
59 | 60 |
digest=create_digest(chunk), data=chunk))
|
60 | 61 |
|
61 |
- request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=instance_name,
|
|
62 |
+ request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=context.instance_name,
|
|
62 | 63 |
requests=requests)
|
63 | 64 |
|
64 | 65 |
context.logger.info("Sending: {}".format(request))
|
... | ... | @@ -67,11 +68,9 @@ def upload_files(context, files, instance_name): |
67 | 68 |
|
68 | 69 |
|
69 | 70 |
@cli.command('upload-dir', short_help="Upload a directory to the CAS server.")
|
70 |
-@click.option('--instance-name', type=click.STRING, default='testing', show_default=True,
|
|
71 |
- help="Targeted farm instance name.")
|
|
72 | 71 |
@click.argument('directory', nargs=1, type=click.Path(), required=True)
|
73 | 72 |
@pass_context
|
74 |
-def upload_dir(context, directory, instance_name):
|
|
73 |
+def upload_dir(context, directory):
|
|
75 | 74 |
context.logger.info("Uploading directory to cas")
|
76 | 75 |
stub = remote_execution_pb2_grpc.ContentAddressableStorageStub(context.channel)
|
77 | 76 |
|
... | ... | @@ -81,7 +80,7 @@ def upload_dir(context, directory, instance_name): |
81 | 80 |
requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
|
82 | 81 |
digest=file_digest, data=chunk))
|
83 | 82 |
|
84 |
- request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=instance_name,
|
|
83 |
+ request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=context.instance_name,
|
|
85 | 84 |
requests=requests)
|
86 | 85 |
|
87 | 86 |
context.logger.info("Request:\n{}".format(request))
|
... | ... | @@ -36,34 +36,35 @@ from ..cli import pass_context |
36 | 36 |
|
37 | 37 |
|
38 | 38 |
@click.group(name='execute', short_help="Execute simple operations.")
|
39 |
+@click.option('--instance-name', type=click.STRING, default='main',
|
|
40 |
+ show_default=True, help="Targeted farm instance name.")
|
|
39 | 41 |
@click.option('--port', type=click.INT, default='50051', show_default=True,
|
40 | 42 |
help="Remote server's port number.")
|
41 | 43 |
@click.option('--host', type=click.STRING, default='localhost', show_default=True,
|
42 | 44 |
help="Remote server's hostname.")
|
43 | 45 |
@pass_context
|
44 |
-def cli(context, host, port):
|
|
46 |
+def cli(context, instance_name, host, port):
|
|
45 | 47 |
context.logger = logging.getLogger(__name__)
|
46 | 48 |
context.logger.info("Starting on port {}".format(port))
|
47 | 49 |
|
50 |
+ context.instance_name = instance_name
|
|
48 | 51 |
context.channel = grpc.insecure_channel('{}:{}'.format(host, port))
|
49 | 52 |
context.port = port
|
50 | 53 |
|
51 | 54 |
|
52 | 55 |
@cli.command('request-dummy', short_help="Send a dummy action.")
|
53 |
-@click.option('--instance-name', type=click.STRING, default='testing', show_default=True,
|
|
54 |
- help="Targeted farm instance name.")
|
|
55 | 56 |
@click.option('--number', type=click.INT, default=1, show_default=True,
|
56 | 57 |
help="Number of request to send.")
|
57 | 58 |
@click.option('--wait-for-completion', is_flag=True,
|
58 | 59 |
help="Stream updates until jobs are completed.")
|
59 | 60 |
@pass_context
|
60 |
-def request_dummy(context, number, instance_name, wait_for_completion):
|
|
61 |
+def request_dummy(context, number, wait_for_completion):
|
|
61 | 62 |
action_digest = remote_execution_pb2.Digest()
|
62 | 63 |
|
63 | 64 |
context.logger.info("Sending execution request...")
|
64 | 65 |
stub = remote_execution_pb2_grpc.ExecutionStub(context.channel)
|
65 | 66 |
|
66 |
- request = remote_execution_pb2.ExecuteRequest(instance_name=instance_name,
|
|
67 |
+ request = remote_execution_pb2.ExecuteRequest(instance_name=context.instance_name,
|
|
67 | 68 |
action_digest=action_digest,
|
68 | 69 |
skip_cache_lookup=True)
|
69 | 70 |
|
... | ... | @@ -98,7 +99,7 @@ def list_operations(context): |
98 | 99 |
context.logger.info("Getting list of operations")
|
99 | 100 |
stub = operations_pb2_grpc.OperationsStub(context.channel)
|
100 | 101 |
|
101 |
- request = operations_pb2.ListOperationsRequest()
|
|
102 |
+ request = operations_pb2.ListOperationsRequest(name=context.instance_name)
|
|
102 | 103 |
|
103 | 104 |
response = stub.ListOperations(request)
|
104 | 105 |
|
... | ... | @@ -115,7 +116,8 @@ def list_operations(context): |
115 | 116 |
@pass_context
|
116 | 117 |
def wait_execution(context, operation_name):
|
117 | 118 |
stub = remote_execution_pb2_grpc.ExecutionStub(context.channel)
|
118 |
- request = remote_execution_pb2.WaitExecutionRequest(name=operation_name)
|
|
119 |
+ request = remote_execution_pb2.WaitExecutionRequest(instance_name=context.instance_name,
|
|
120 |
+ name=operation_name)
|
|
119 | 121 |
|
120 | 122 |
response = stub.WaitExecution(request)
|
121 | 123 |
|
... | ... | @@ -124,8 +126,6 @@ def wait_execution(context, operation_name): |
124 | 126 |
|
125 | 127 |
|
126 | 128 |
@cli.command('command', short_help="Send a command to be executed.")
|
127 |
-@click.option('--instance-name', type=click.STRING, default='testing', show_default=True,
|
|
128 |
- help="Targeted farm instance name.")
|
|
129 | 129 |
@click.option('--output-file', nargs=2, type=(click.STRING, click.BOOL), multiple=True,
|
130 | 130 |
help="Tuple of expected output file and is-executeable flag.")
|
131 | 131 |
@click.option('--output-directory', default='testing', show_default=True,
|
... | ... | @@ -133,7 +133,7 @@ def wait_execution(context, operation_name): |
133 | 133 |
@click.argument('input-root', nargs=1, type=click.Path(), required=True)
|
134 | 134 |
@click.argument('commands', nargs=-1, type=click.STRING, required=True)
|
135 | 135 |
@pass_context
|
136 |
-def command(context, input_root, commands, output_file, output_directory, instance_name):
|
|
136 |
+def command(context, input_root, commands, output_file, output_directory):
|
|
137 | 137 |
stub = remote_execution_pb2_grpc.ExecutionStub(context.channel)
|
138 | 138 |
|
139 | 139 |
execute_command = remote_execution_pb2.Command()
|
... | ... | @@ -170,11 +170,11 @@ def command(context, input_root, commands, output_file, output_directory, instan |
170 | 170 |
requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
|
171 | 171 |
digest=action_digest, data=action.SerializeToString()))
|
172 | 172 |
|
173 |
- request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=instance_name,
|
|
173 |
+ request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=context.instance_name,
|
|
174 | 174 |
requests=requests)
|
175 | 175 |
remote_execution_pb2_grpc.ContentAddressableStorageStub(context.channel).BatchUpdateBlobs(request)
|
176 | 176 |
|
177 |
- request = remote_execution_pb2.ExecuteRequest(instance_name=instance_name,
|
|
177 |
+ request = remote_execution_pb2.ExecuteRequest(instance_name=context.instance_name,
|
|
178 | 178 |
action_digest=action_digest,
|
179 | 179 |
skip_cache_lookup=True)
|
180 | 180 |
response = stub.Execute(request)
|
... | ... | @@ -201,7 +201,7 @@ def command(context, input_root, commands, output_file, output_directory, instan |
201 | 201 |
raise
|
202 | 202 |
|
203 | 203 |
with open(path, 'wb+') as f:
|
204 |
- write_fetch_blob(f, stub, output_file_response.digest, instance_name)
|
|
204 |
+ write_fetch_blob(f, stub, output_file_response.digest, context.instance_name)
|
|
205 | 205 |
|
206 | 206 |
if output_file_response.path in output_executeables:
|
207 | 207 |
st = os.stat(path)
|
... | ... | @@ -25,7 +25,7 @@ import logging |
25 | 25 |
|
26 | 26 |
import click
|
27 | 27 |
|
28 |
-from buildgrid.server import build_grid_server
|
|
28 |
+from buildgrid.server import buildgrid_server
|
|
29 | 29 |
from buildgrid.server.cas.storage.disk import DiskStorage
|
30 | 30 |
from buildgrid.server.cas.storage.lru_memory_cache import LRUMemoryCache
|
31 | 31 |
from buildgrid.server.cas.storage.s3 import S3Storage
|
... | ... | @@ -45,6 +45,7 @@ def cli(context): |
45 | 45 |
|
46 | 46 |
|
47 | 47 |
@cli.command('start', short_help="Setup a new server instance.")
|
48 |
+@click.argument('instances', nargs=-1, type=click.STRING)
|
|
48 | 49 |
@click.option('--port', type=click.INT, default='50051', show_default=True,
|
49 | 50 |
help="The port number to be listened.")
|
50 | 51 |
@click.option('--max-cached-actions', type=click.INT, default=50, show_default=True,
|
... | ... | @@ -67,7 +68,9 @@ def cli(context): |
67 | 68 |
@click.option('--cas-disk-directory', type=click.Path(file_okay=False, dir_okay=True, writable=True),
|
68 | 69 |
help="For --cas=disk, the folder to store CAS blobs in.")
|
69 | 70 |
@pass_context
|
70 |
-def start(context, port, max_cached_actions, allow_uar, cas, **cas_args):
|
|
71 |
+def start(context, instances, port, max_cached_actions, allow_uar, cas, **cas_args):
|
|
72 |
+ """ Starts a BuildGrid server.
|
|
73 |
+ """
|
|
71 | 74 |
context.logger.info("Starting on port {}".format(port))
|
72 | 75 |
|
73 | 76 |
cas_storage = _make_cas_storage(context, cas, cas_args)
|
... | ... | @@ -79,9 +82,13 @@ def start(context, port, max_cached_actions, allow_uar, cas, **cas_args): |
79 | 82 |
else:
|
80 | 83 |
action_cache = ActionCache(cas_storage, max_cached_actions, allow_uar)
|
81 | 84 |
|
82 |
- server = build_grid_server.BuildGridServer(port,
|
|
83 |
- cas_storage=cas_storage,
|
|
84 |
- action_cache=action_cache)
|
|
85 |
+ if instances is None:
|
|
86 |
+ instances = ['main']
|
|
87 |
+ |
|
88 |
+ server = buildgrid_server.BuildGridServer(port,
|
|
89 |
+ instances,
|
|
90 |
+ cas_storage=cas_storage,
|
|
91 |
+ action_cache=action_cache)
|
|
85 | 92 |
loop = asyncio.get_event_loop()
|
86 | 93 |
try:
|
87 | 94 |
server.start()
|
1 |
+# Copyright (C) 2018 Bloomberg LP
|
|
2 |
+#
|
|
3 |
+# Licensed under the Apache License, Version 2.0 (the "License");
|
|
4 |
+# you may not use this file except in compliance with the License.
|
|
5 |
+# You may obtain a copy of the License at
|
|
6 |
+#
|
|
7 |
+# <http://www.apache.org/licenses/LICENSE-2.0>
|
|
8 |
+#
|
|
9 |
+# Unless required by applicable law or agreed to in writing, software
|
|
10 |
+# distributed under the License is distributed on an "AS IS" BASIS,
|
|
11 |
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
12 |
+# See the License for the specific language governing permissions and
|
|
13 |
+# limitations under the License.
|
|
14 |
+ |
|
15 |
+ |
|
16 |
+"""
|
|
17 |
+BuildGrid Instance
|
|
18 |
+==================
|
|
19 |
+ |
|
20 |
+An instance of the BuildGrid server.
|
|
21 |
+ |
|
22 |
+Contains scheduler, execution instance and an interface to the bots.
|
|
23 |
+"""
|
|
24 |
+ |
|
25 |
+ |
|
26 |
+import logging
|
|
27 |
+ |
|
28 |
+from .execution.execution_instance import ExecutionInstance
|
|
29 |
+from .scheduler import Scheduler
|
|
30 |
+from .worker.bots_interface import BotsInterface
|
|
31 |
+ |
|
32 |
+ |
|
33 |
+class BuildGridInstance(ExecutionInstance, BotsInterface):
|
|
34 |
+ |
|
35 |
+ def __init__(self, action_cache=None, cas_storage=None):
|
|
36 |
+ scheduler = Scheduler(action_cache)
|
|
37 |
+ |
|
38 |
+ self.logger = logging.getLogger(__name__)
|
|
39 |
+ |
|
40 |
+ ExecutionInstance.__init__(self, scheduler, cas_storage)
|
|
41 |
+ BotsInterface.__init__(self, scheduler)
|
|
42 |
+ |
|
43 |
+ def stream_operation_updates(self, message_queue, operation_name):
|
|
44 |
+ operation = message_queue.get()
|
|
45 |
+ while not operation.done:
|
|
46 |
+ yield operation
|
|
47 |
+ operation = message_queue.get()
|
|
48 |
+ yield operation
|
|
49 |
+ |
|
50 |
+ def cancel_operation(self, name):
|
|
51 |
+ # TODO: Cancel leases
|
|
52 |
+ raise NotImplementedError("Cancelled operations not supported")
|
... | ... | @@ -29,35 +29,23 @@ from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_p |
29 | 29 |
from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2_grpc
|
30 | 30 |
from buildgrid._protos.google.longrunning import operations_pb2_grpc
|
31 | 31 |
|
32 |
+from .buildgrid_instance import BuildGridInstance
|
|
32 | 33 |
from .cas.bytestream_service import ByteStreamService
|
33 | 34 |
from .cas.content_addressable_storage_service import ContentAddressableStorageService
|
34 | 35 |
from .execution.action_cache_service import ActionCacheService
|
35 | 36 |
from .execution.execution_service import ExecutionService
|
36 | 37 |
from .execution.operations_service import OperationsService
|
37 |
-from .execution.execution_instance import ExecutionInstance
|
|
38 |
-from .scheduler import Scheduler
|
|
39 | 38 |
from .worker.bots_service import BotsService
|
40 |
-from .worker.bots_interface import BotsInterface
|
|
41 | 39 |
|
42 | 40 |
|
43 | 41 |
class BuildGridServer:
|
44 | 42 |
|
45 |
- def __init__(self, port='50051', max_workers=10, cas_storage=None, action_cache=None):
|
|
43 |
+ def __init__(self, port='50051', instances=None, max_workers=10, action_cache=None, cas_storage=None):
|
|
46 | 44 |
port = '[::]:{0}'.format(port)
|
47 |
- scheduler = Scheduler(action_cache)
|
|
48 |
- bots_interface = BotsInterface(scheduler)
|
|
49 |
- execution_instance = ExecutionInstance(scheduler, cas_storage)
|
|
50 | 45 |
|
51 | 46 |
self._server = grpc.server(futures.ThreadPoolExecutor(max_workers))
|
52 | 47 |
self._server.add_insecure_port(port)
|
53 | 48 |
|
54 |
- bots_pb2_grpc.add_BotsServicer_to_server(BotsService(bots_interface),
|
|
55 |
- self._server)
|
|
56 |
- remote_execution_pb2_grpc.add_ExecutionServicer_to_server(ExecutionService(execution_instance),
|
|
57 |
- self._server)
|
|
58 |
- operations_pb2_grpc.add_OperationsServicer_to_server(OperationsService(execution_instance),
|
|
59 |
- self._server)
|
|
60 |
- |
|
61 | 49 |
if cas_storage is not None:
|
62 | 50 |
cas_service = ContentAddressableStorageService(cas_storage)
|
63 | 51 |
remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(cas_service,
|
... | ... | @@ -69,6 +57,20 @@ class BuildGridServer: |
69 | 57 |
remote_execution_pb2_grpc.add_ActionCacheServicer_to_server(action_cache_service,
|
70 | 58 |
self._server)
|
71 | 59 |
|
60 |
+ buildgrid_instances = {}
|
|
61 |
+ if not instances:
|
|
62 |
+ buildgrid_instances["main"] = BuildGridInstance(action_cache, cas_storage)
|
|
63 |
+ else:
|
|
64 |
+ for name in instances:
|
|
65 |
+ buildgrid_instances[name] = BuildGridInstance(action_cache, cas_storage)
|
|
66 |
+ |
|
67 |
+ bots_pb2_grpc.add_BotsServicer_to_server(BotsService(buildgrid_instances),
|
|
68 |
+ self._server)
|
|
69 |
+ remote_execution_pb2_grpc.add_ExecutionServicer_to_server(ExecutionService(buildgrid_instances),
|
|
70 |
+ self._server)
|
|
71 |
+ operations_pb2_grpc.add_OperationsServicer_to_server(OperationsService(buildgrid_instances),
|
|
72 |
+ self._server)
|
|
73 |
+ |
|
72 | 74 |
def start(self):
|
73 | 75 |
self._server.start()
|
74 | 76 |
|
... | ... | @@ -56,12 +56,14 @@ class ExecutionInstance: |
56 | 56 |
|
57 | 57 |
def get_operation(self, name):
|
58 | 58 |
operation = self._scheduler.jobs.get(name)
|
59 |
+ |
|
59 | 60 |
if operation is None:
|
60 | 61 |
raise InvalidArgumentError("Operation name does not exist: {}".format(name))
|
62 |
+ |
|
61 | 63 |
else:
|
62 | 64 |
return operation.get_operation()
|
63 | 65 |
|
64 |
- def list_operations(self, name, list_filter, page_size, page_token):
|
|
66 |
+ def list_operations(self, list_filter, page_size, page_token):
|
|
65 | 67 |
# TODO: Pages
|
66 | 68 |
# Spec says number of pages and length of a page are optional
|
67 | 69 |
return self._scheduler.get_operations()
|
... | ... | @@ -72,10 +74,6 @@ class ExecutionInstance: |
72 | 74 |
except KeyError:
|
73 | 75 |
raise InvalidArgumentError("Operation name does not exist: {}".format(name))
|
74 | 76 |
|
75 |
- def cancel_operation(self, name):
|
|
76 |
- # TODO: Cancel leases
|
|
77 |
- raise NotImplementedError("Cancelled operations not supported")
|
|
78 |
- |
|
79 | 77 |
def register_message_client(self, name, queue):
|
80 | 78 |
try:
|
81 | 79 |
self._scheduler.register_client(name, queue)
|
... | ... | @@ -35,23 +35,23 @@ from .._exceptions import InvalidArgumentError |
35 | 35 |
|
36 | 36 |
class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
|
37 | 37 |
|
38 |
- def __init__(self, instance):
|
|
38 |
+ def __init__(self, instances):
|
|
39 | 39 |
self.logger = logging.getLogger(__name__)
|
40 |
- self._instance = instance
|
|
40 |
+ self._instances = instances
|
|
41 | 41 |
|
42 | 42 |
def Execute(self, request, context):
|
43 |
- # Ignore request.instance_name for now
|
|
44 |
- # Have only one instance
|
|
45 | 43 |
try:
|
46 | 44 |
message_queue = queue.Queue()
|
47 |
- operation = self._instance.execute(request.action_digest,
|
|
48 |
- request.skip_cache_lookup,
|
|
49 |
- message_queue)
|
|
45 |
+ instance = self._get_instance(request.instance_name)
|
|
46 |
+ operation = instance.execute(request.action_digest,
|
|
47 |
+ request.skip_cache_lookup,
|
|
48 |
+ message_queue)
|
|
50 | 49 |
|
51 |
- context.add_callback(partial(self._remove_client, operation.name, message_queue))
|
|
50 |
+ context.add_callback(partial(instance.unregister_message_client,
|
|
51 |
+ operation.name, message_queue))
|
|
52 | 52 |
|
53 |
- yield from self._stream_operation_updates(message_queue,
|
|
54 |
- operation.name)
|
|
53 |
+ yield from instance.stream_operation_updates(message_queue,
|
|
54 |
+ operation.name)
|
|
55 | 55 |
|
56 | 56 |
except InvalidArgumentError as e:
|
57 | 57 |
self.logger.error(e)
|
... | ... | @@ -59,23 +59,25 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): |
59 | 59 |
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
60 | 60 |
yield operations_pb2.Operation()
|
61 | 61 |
|
62 |
- except NotImplementedError as e:
|
|
63 |
- self.logger.error(e)
|
|
64 |
- context.set_details(str(e))
|
|
65 |
- context.set_code(grpc.StatusCode.UNIMPLEMENTED)
|
|
66 |
- yield operations_pb2.Operation()
|
|
67 |
- |
|
68 | 62 |
def WaitExecution(self, request, context):
|
69 | 63 |
try:
|
64 |
+ names = request.name.split("/")
|
|
65 |
+ |
|
66 |
+ # Operation name should be in format:
|
|
67 |
+ # {instance/name}/{operation_id}
|
|
68 |
+ instance_name = ''.join(names[0:-1])
|
|
69 |
+ |
|
70 | 70 |
message_queue = queue.Queue()
|
71 |
- operation_name = request.name
|
|
71 |
+ operation_name = names[-1]
|
|
72 |
+ instance = self._get_instance(instance_name)
|
|
72 | 73 |
|
73 |
- self._instance.register_message_client(operation_name, message_queue)
|
|
74 |
+ instance.register_message_client(operation_name, message_queue)
|
|
74 | 75 |
|
75 |
- context.add_callback(partial(self._remove_client, operation_name, message_queue))
|
|
76 |
+ context.add_callback(partial(instance.unregister_message_client,
|
|
77 |
+ operation_name, message_queue))
|
|
76 | 78 |
|
77 |
- yield from self._stream_operation_updates(message_queue,
|
|
78 |
- operation_name)
|
|
79 |
+ yield from instance.stream_operation_updates(message_queue,
|
|
80 |
+ operation_name)
|
|
79 | 81 |
|
80 | 82 |
except InvalidArgumentError as e:
|
81 | 83 |
self.logger.error(e)
|
... | ... | @@ -83,12 +85,14 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): |
83 | 85 |
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
84 | 86 |
yield operations_pb2.Operation()
|
85 | 87 |
|
86 |
- def _remove_client(self, operation_name, message_queue):
|
|
87 |
- self._instance.unregister_message_client(operation_name, message_queue)
|
|
88 |
+ def _get_instance(self, name):
|
|
89 |
+ # If client does not support multiple instances, it may omit the
|
|
90 |
+ # instance name request parameter, so better map our default:
|
|
91 |
+ if not name and len(self._instances) == 1:
|
|
92 |
+ name = 'main'
|
|
93 |
+ |
|
94 |
+ try:
|
|
95 |
+ return self._instances[name]
|
|
88 | 96 |
|
89 |
- def _stream_operation_updates(self, message_queue, operation_name):
|
|
90 |
- operation = message_queue.get()
|
|
91 |
- while not operation.done:
|
|
92 |
- yield operation
|
|
93 |
- operation = message_queue.get()
|
|
94 |
- yield operation
|
|
97 |
+ except KeyError:
|
|
98 |
+ raise InvalidArgumentError("Instance doesn't exist on server: {}".format(name))
|
... | ... | @@ -23,6 +23,8 @@ import logging |
23 | 23 |
|
24 | 24 |
import grpc
|
25 | 25 |
|
26 |
+from google.protobuf.empty_pb2 import Empty
|
|
27 |
+ |
|
26 | 28 |
from buildgrid._protos.google.longrunning import operations_pb2_grpc, operations_pb2
|
27 | 29 |
|
28 | 30 |
from .._exceptions import InvalidArgumentError
|
... | ... | @@ -30,42 +32,102 @@ from .._exceptions import InvalidArgumentError |
30 | 32 |
|
31 | 33 |
class OperationsService(operations_pb2_grpc.OperationsServicer):
|
32 | 34 |
|
33 |
- def __init__(self, instance):
|
|
34 |
- self._instance = instance
|
|
35 |
+ def __init__(self, instances):
|
|
36 |
+ self._instances = instances
|
|
35 | 37 |
self.logger = logging.getLogger(__name__)
|
36 | 38 |
|
37 | 39 |
def GetOperation(self, request, context):
|
38 | 40 |
try:
|
39 |
- return self._instance.get_operation(request.name)
|
|
41 |
+ name = request.name
|
|
42 |
+ operation_name = self._get_operation_name(name)
|
|
43 |
+ |
|
44 |
+ instance = self._get_instance(name)
|
|
45 |
+ |
|
46 |
+ operation = instance.get_operation(operation_name)
|
|
47 |
+ operation.name = name
|
|
48 |
+ return operation
|
|
40 | 49 |
|
41 | 50 |
except InvalidArgumentError as e:
|
42 | 51 |
self.logger.error(e)
|
43 | 52 |
context.set_details(str(e))
|
44 | 53 |
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
45 |
- return operations_pb2.Operation()
|
|
54 |
+ |
|
55 |
+ return operations_pb2.Operation()
|
|
46 | 56 |
|
47 | 57 |
def ListOperations(self, request, context):
|
48 |
- return self._instance.list_operations(request.name,
|
|
49 |
- request.filter,
|
|
58 |
+ try:
|
|
59 |
+ # Name should be the collection name
|
|
60 |
+ # Or in this case, the instance_name
|
|
61 |
+ name = request.name
|
|
62 |
+ instance = self._get_instance(name)
|
|
63 |
+ |
|
64 |
+ result = instance.list_operations(request.filter,
|
|
50 | 65 |
request.page_size,
|
51 | 66 |
request.page_token)
|
52 | 67 |
|
68 |
+ for operation in result.operations:
|
|
69 |
+ operation.name = "{}/{}".format(name, operation.name)
|
|
70 |
+ |
|
71 |
+ return result
|
|
72 |
+ |
|
73 |
+ except InvalidArgumentError as e:
|
|
74 |
+ self.logger.error(e)
|
|
75 |
+ context.set_details(str(e))
|
|
76 |
+ context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
|
77 |
+ |
|
78 |
+ return operations_pb2.ListOperationsResponse()
|
|
79 |
+ |
|
53 | 80 |
def DeleteOperation(self, request, context):
|
54 | 81 |
try:
|
55 |
- return self._instance.delete_operation(request.name)
|
|
82 |
+ name = request.name
|
|
83 |
+ operation_name = self._get_operation_name(name)
|
|
84 |
+ |
|
85 |
+ instance = self._get_instance(name)
|
|
86 |
+ |
|
87 |
+ instance.delete_operation(operation_name)
|
|
56 | 88 |
|
57 | 89 |
except InvalidArgumentError as e:
|
58 | 90 |
self.logger.error(e)
|
59 | 91 |
context.set_details(str(e))
|
60 | 92 |
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
61 |
- return operations_pb2.Operation()
|
|
93 |
+ |
|
94 |
+ return Empty()
|
|
62 | 95 |
|
63 | 96 |
def CancelOperation(self, request, context):
|
64 | 97 |
try:
|
65 |
- return self._instance.cancel_operation(request.name)
|
|
98 |
+ name = request.name
|
|
99 |
+ operation_name = self._get_operation_name(name)
|
|
100 |
+ |
|
101 |
+ instance = self._get_instance(name)
|
|
102 |
+ |
|
103 |
+ instance.cancel_operation(operation_name)
|
|
66 | 104 |
|
67 | 105 |
except NotImplementedError as e:
|
68 | 106 |
self.logger.error(e)
|
69 | 107 |
context.set_details(str(e))
|
70 | 108 |
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
|
71 |
- return operations_pb2.Operation()
|
|
109 |
+ |
|
110 |
+ except InvalidArgumentError as e:
|
|
111 |
+ self.logger.error(e)
|
|
112 |
+ context.set_details(str(e))
|
|
113 |
+ context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
|
114 |
+ |
|
115 |
+ return Empty()
|
|
116 |
+ |
|
117 |
+ def _get_operation_name(self, name):
|
|
118 |
+ return name.split("/")[-1]
|
|
119 |
+ |
|
120 |
+ def _get_instance(self, name):
|
|
121 |
+ try:
|
|
122 |
+ names = name.split("/")
|
|
123 |
+ |
|
124 |
+ # Operation name should be in format:
|
|
125 |
+ # {instance/name}/{operation_id}
|
|
126 |
+ instance_name = ''.join(names[0:-1])
|
|
127 |
+ if not instance_name:
|
|
128 |
+ return self._instances[name]
|
|
129 |
+ |
|
130 |
+ return self._instances[instance_name]
|
|
131 |
+ |
|
132 |
+ except KeyError:
|
|
133 |
+ raise InvalidArgumentError("Instance doesn't exist on server: {}".format(name))
|
... | ... | @@ -90,7 +90,7 @@ class Scheduler: |
90 | 90 |
job.update_execute_stage(ExecuteStage.COMPLETED)
|
91 | 91 |
self.jobs[name] = job
|
92 | 92 |
if not job.do_not_cache and self._action_cache is not None:
|
93 |
- self._action_cache.put_action_result(job.action_digest, result)
|
|
93 |
+ self._action_cache.update_action_result(job.action_digest, result)
|
|
94 | 94 |
|
95 | 95 |
def get_operations(self):
|
96 | 96 |
response = operations_pb2.ListOperationsResponse()
|
... | ... | @@ -54,7 +54,8 @@ class BotsInterface: |
54 | 54 |
pass
|
55 | 55 |
|
56 | 56 |
# Bot session name, selected by the server
|
57 |
- name = str(uuid.uuid4())
|
|
57 |
+ name = "{}/{}".format(parent, str(uuid.uuid4()))
|
|
58 |
+ |
|
58 | 59 |
bot_session.name = name
|
59 | 60 |
|
60 | 61 |
self._bot_ids[name] = bot_id
|
... | ... | @@ -33,14 +33,17 @@ from .._exceptions import InvalidArgumentError, OutofSyncError |
33 | 33 |
|
34 | 34 |
class BotsService(bots_pb2_grpc.BotsServicer):
|
35 | 35 |
|
36 |
- def __init__(self, instance):
|
|
37 |
- self._instance = instance
|
|
36 |
+ def __init__(self, instances):
|
|
37 |
+ self._instances = instances
|
|
38 | 38 |
self.logger = logging.getLogger(__name__)
|
39 | 39 |
|
40 | 40 |
def CreateBotSession(self, request, context):
|
41 | 41 |
try:
|
42 |
- return self._instance.create_bot_session(request.parent,
|
|
43 |
- request.bot_session)
|
|
42 |
+ parent = request.parent
|
|
43 |
+ instance = self._get_instance(request.parent)
|
|
44 |
+ return instance.create_bot_session(parent,
|
|
45 |
+ request.bot_session)
|
|
46 |
+ |
|
44 | 47 |
except InvalidArgumentError as e:
|
45 | 48 |
self.logger.error(e)
|
46 | 49 |
context.set_details(str(e))
|
... | ... | @@ -50,8 +53,15 @@ class BotsService(bots_pb2_grpc.BotsServicer): |
50 | 53 |
|
51 | 54 |
def UpdateBotSession(self, request, context):
|
52 | 55 |
try:
|
53 |
- return self._instance.update_bot_session(request.name,
|
|
54 |
- request.bot_session)
|
|
56 |
+ names = request.name.split("/")
|
|
57 |
+ # Operation name should be in format:
|
|
58 |
+ # {instance/name}/{uuid}
|
|
59 |
+ instance_name = ''.join(names[0:-1])
|
|
60 |
+ |
|
61 |
+ instance = self._get_instance(instance_name)
|
|
62 |
+ return instance.update_bot_session(request.name,
|
|
63 |
+ request.bot_session)
|
|
64 |
+ |
|
55 | 65 |
except InvalidArgumentError as e:
|
56 | 66 |
self.logger.error(e)
|
57 | 67 |
context.set_details(str(e))
|
... | ... | @@ -72,3 +82,10 @@ class BotsService(bots_pb2_grpc.BotsServicer): |
72 | 82 |
def PostBotEventTemp(self, request, context):
|
73 | 83 |
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
|
74 | 84 |
return Empty()
|
85 |
+ |
|
86 |
+ def _get_instance(self, name):
|
|
87 |
+ try:
|
|
88 |
+ return self._instances[name]
|
|
89 |
+ |
|
90 |
+ except KeyError:
|
|
91 |
+ raise InvalidArgumentError("Instance doesn't exist on server: {}".format(name))
|
... | ... | @@ -13,6 +13,7 @@ |
13 | 13 |
# limitations under the License.
|
14 | 14 |
|
15 | 15 |
|
16 |
+from operator import attrgetter
|
|
16 | 17 |
import os
|
17 | 18 |
|
18 | 19 |
from buildgrid.settings import HASH
|
... | ... | @@ -31,30 +32,59 @@ def gen_fetch_blob(stub, digest, instance_name=""): |
31 | 32 |
yield response.data
|
32 | 33 |
|
33 | 34 |
|
34 |
-def write_fetch_directory(directory, stub, digest, instance_name=""):
|
|
35 |
- """ Given a directory digest, fetches files and writes them to a directory
|
|
35 |
+def write_fetch_directory(root_directory, stub, digest, instance_name=None):
|
|
36 |
+ """Locally replicates a directory from CAS.
|
|
37 |
+ |
|
38 |
+ Args:
|
|
39 |
+ root_directory (str): local directory to populate.
|
|
40 |
+ stub (): gRPC stub for CAS communication.
|
|
41 |
+ digest (Digest): digest for the directory to fetch from CAS.
|
|
42 |
+ instance_name (str, optional): farm instance name to query data from.
|
|
36 | 43 |
"""
|
37 |
- # TODO: Extend to symlinks and inner directories
|
|
38 |
- # pathlib.Path('/my/directory').mkdir(parents=True, exist_ok=True)
|
|
44 |
+ if not os.path.isabs(root_directory):
|
|
45 |
+ root_directory = os.path.abspath(root_directory)
|
|
46 |
+ if not os.path.exists(root_directory):
|
|
47 |
+ os.makedirs(root_directory, exist_ok=True)
|
|
39 | 48 |
|
40 |
- directory_pb2 = remote_execution_pb2.Directory()
|
|
41 |
- directory_pb2 = parse_to_pb2_from_fetch(directory_pb2, stub, digest, instance_name)
|
|
49 |
+ directory = parse_to_pb2_from_fetch(remote_execution_pb2.Directory(),
|
|
50 |
+ stub, digest, instance_name)
|
|
51 |
+ |
|
52 |
+ for directory_node in directory.directories:
|
|
53 |
+ child_path = os.path.join(root_directory, directory_node.name)
|
|
54 |
+ |
|
55 |
+ write_fetch_directory(child_path, stub, directory_node.digest, instance_name)
|
|
56 |
+ |
|
57 |
+ for file_node in directory.files:
|
|
58 |
+ child_path = os.path.join(root_directory, file_node.name)
|
|
59 |
+ |
|
60 |
+ with open(child_path, 'wb') as child_file:
|
|
61 |
+ write_fetch_blob(child_file, stub, file_node.digest, instance_name)
|
|
62 |
+ |
|
63 |
+ for symlink_node in directory.symlinks:
|
|
64 |
+ child_path = os.path.join(root_directory, symlink_node.name)
|
|
65 |
+ |
|
66 |
+ if os.path.isabs(symlink_node.target):
|
|
67 |
+ continue # No out of temp-directory links for now.
|
|
68 |
+ target_path = os.path.join(root_directory, symlink_node.target)
|
|
69 |
+ |
|
70 |
+ os.symlink(child_path, target_path)
|
|
42 | 71 |
|
43 |
- for file_node in directory_pb2.files:
|
|
44 |
- path = os.path.join(directory, file_node.name)
|
|
45 |
- with open(path, 'wb') as f:
|
|
46 |
- write_fetch_blob(f, stub, file_node.digest, instance_name)
|
|
47 | 72 |
|
73 |
+def write_fetch_blob(target_file, stub, digest, instance_name=None):
|
|
74 |
+ """Extracts a blob from CAS into a local file.
|
|
48 | 75 |
|
49 |
-def write_fetch_blob(out, stub, digest, instance_name=""):
|
|
50 |
- """ Given an output buffer, fetches blob and writes to buffer
|
|
76 |
+ Args:
|
|
77 |
+ target_file (str): local file to write.
|
|
78 |
+ stub (): gRPC stub for CAS communication.
|
|
79 |
+ digest (Digest): digest for the blob to fetch from CAS.
|
|
80 |
+ instance_name (str, optional): farm instance name to query data from.
|
|
51 | 81 |
"""
|
52 | 82 |
|
53 | 83 |
for stream in gen_fetch_blob(stub, digest, instance_name):
|
54 |
- out.write(stream)
|
|
84 |
+ target_file.write(stream)
|
|
85 |
+ target_file.flush()
|
|
55 | 86 |
|
56 |
- out.flush()
|
|
57 |
- assert digest.size_bytes == os.fstat(out.fileno()).st_size
|
|
87 |
+ assert digest.size_bytes == os.fstat(target_file.fileno()).st_size
|
|
58 | 88 |
|
59 | 89 |
|
60 | 90 |
def parse_to_pb2_from_fetch(pb2, stub, digest, instance_name=""):
|
... | ... | @@ -70,7 +100,15 @@ def parse_to_pb2_from_fetch(pb2, stub, digest, instance_name=""): |
70 | 100 |
|
71 | 101 |
|
72 | 102 |
def create_digest(bytes_to_digest):
|
73 |
- """ Creates a hash based on the hex digest and returns the digest
|
|
103 |
+ """Computes the :obj:`Digest` of a piece of data.
|
|
104 |
+ |
|
105 |
+ The :obj:`Digest` of a data is a function of its hash **and** size.
|
|
106 |
+ |
|
107 |
+ Args:
|
|
108 |
+ bytes_to_digest (bytes): byte data to digest.
|
|
109 |
+ |
|
110 |
+ Returns:
|
|
111 |
+ :obj:`Digest`: The gRPC :obj:`Digest` for the given byte data.
|
|
74 | 112 |
"""
|
75 | 113 |
return remote_execution_pb2.Digest(hash=HASH(bytes_to_digest).hexdigest(),
|
76 | 114 |
size_bytes=len(bytes_to_digest))
|
... | ... | @@ -107,6 +145,183 @@ def file_maker(file_path, file_digest): |
107 | 145 |
is_executable=os.access(file_path, os.X_OK))
|
108 | 146 |
|
109 | 147 |
|
110 |
-def read_file(read):
|
|
111 |
- with open(read, 'rb') as f:
|
|
112 |
- return f.read()
|
|
148 |
+def directory_maker(directory_path):
|
|
149 |
+ """
|
|
150 |
+ """
|
|
151 |
+ if not os.path.isabs(directory_path):
|
|
152 |
+ directory_path = os.path.abspath(directory_path)
|
|
153 |
+ |
|
154 |
+ child_directories = list()
|
|
155 |
+ update_requests = list()
|
|
156 |
+ |
|
157 |
+ files, directories, symlinks = list(), list(), list()
|
|
158 |
+ for directory_entry in os.scandir(directory_path):
|
|
159 |
+ # Create a FileNode and corresponding BatchUpdateBlobsRequest:
|
|
160 |
+ if directory_entry.is_file(follow_symlinks=False):
|
|
161 |
+ node_blob = read_file(directory_entry.path)
|
|
162 |
+ node_digest = create_digest(node_blob)
|
|
163 |
+ |
|
164 |
+ node = remote_execution_pb2.FileNode()
|
|
165 |
+ node.name = directory_entry.name
|
|
166 |
+ node.digest = node_digest
|
|
167 |
+ node.is_executable = os.access(directory_entry.path, os.X_OK)
|
|
168 |
+ |
|
169 |
+ node_request = remote_execution_pb2.BatchUpdateBlobsRequest.Request(digest=node_digest)
|
|
170 |
+ node_request.data = node_blob
|
|
171 |
+ |
|
172 |
+ update_requests.append(node_request)
|
|
173 |
+ files.append(node)
|
|
174 |
+ |
|
175 |
+ # Create a DirectoryNode and corresponding BatchUpdateBlobsRequest:
|
|
176 |
+ elif directory_entry.is_dir(follow_symlinks=False):
|
|
177 |
+ node_directory, node_children, node_requests = directory_maker(directory_entry.path)
|
|
178 |
+ |
|
179 |
+ node = remote_execution_pb2.DirectoryNode()
|
|
180 |
+ node.name = directory_entry.name
|
|
181 |
+ node.digest = node_requests[-1].digest
|
|
182 |
+ |
|
183 |
+ child_directories.extend(node_children)
|
|
184 |
+ child_directories.append(node_directory)
|
|
185 |
+ update_requests.expend(node_requests)
|
|
186 |
+ directories.append(node)
|
|
187 |
+ |
|
188 |
+ # Create a SymlinkNode if necessary;
|
|
189 |
+ elif os.path.islink(directory_entry.path):
|
|
190 |
+ node_target = os.readlink(directory_entry.path)
|
|
191 |
+ |
|
192 |
+ node = remote_execution_pb2.SymlinkNode()
|
|
193 |
+ node.name = directory_entry.name
|
|
194 |
+ node.target = node_target
|
|
195 |
+ |
|
196 |
+ symlinks.append(node)
|
|
197 |
+ |
|
198 |
+ directory = remote_execution_pb2.Directory()
|
|
199 |
+ directory.files.extend(files.sort(key=attrgetter('name')))
|
|
200 |
+ directory.directories.extend(directories.sort(key=attrgetter('name')))
|
|
201 |
+ directory.symlinks.extend(symlinks.sort(key=attrgetter('name')))
|
|
202 |
+ |
|
203 |
+ directory_blob = directory.SerializeToString()
|
|
204 |
+ directory_digest = create_digest(directory_blob)
|
|
205 |
+ |
|
206 |
+ update_request = remote_execution_pb2.BatchUpdateBlobsRequest.Request(digest=directory_digest)
|
|
207 |
+ update_request.data = directory_blob
|
|
208 |
+ |
|
209 |
+ update_requests.append(update_request)
|
|
210 |
+ |
|
211 |
+ return directory, child_directories, update_requests
|
|
212 |
+ |
|
213 |
+ |
|
214 |
+def read_file(file_path):
|
|
215 |
+ """Loads raw file content in memory.
|
|
216 |
+ |
|
217 |
+ Returns:
|
|
218 |
+ bytes: Raw file's content until EOF.
|
|
219 |
+ |
|
220 |
+ Raises:
|
|
221 |
+ OSError: If `file_path` does not exist or is not readable.
|
|
222 |
+ """
|
|
223 |
+ with open(file_path, 'rb') as byte_file:
|
|
224 |
+ return byte_file.read()
|
|
225 |
+ |
|
226 |
+ |
|
227 |
+def output_file_maker(file_path, input_path):
|
|
228 |
+ """Creates an :obj:`OutputFile` from a local file.
|
|
229 |
+ |
|
230 |
+ `file_path` **must** point inside or be relative to `input_path`.
|
|
231 |
+ |
|
232 |
+ Args:
|
|
233 |
+ file_path (str): absolute or relative path to a local file.
|
|
234 |
+ input_path (str): absolute or relative path to the input root directory.
|
|
235 |
+ |
|
236 |
+ Returns:
|
|
237 |
+ :obj:`OutputFile`, :obj:`BatchUpdateBlobsRequest`: Tuple of a new gRPC
|
|
238 |
+ :obj:`OutputFile` object for the file pointed by `file_path` and the
|
|
239 |
+ corresponding :obj:`BatchUpdateBlobsRequest` for CAS upload.
|
|
240 |
+ """
|
|
241 |
+ if not os.path.isabs(file_path):
|
|
242 |
+ file_path = os.path.abspath(file_path)
|
|
243 |
+ if not os.path.isabs(input_path):
|
|
244 |
+ input_path = os.path.abspath(input_path)
|
|
245 |
+ |
|
246 |
+ file_blob = read_file(file_path)
|
|
247 |
+ file_digest = create_digest(file_blob)
|
|
248 |
+ |
|
249 |
+ output_file = remote_execution_pb2.OutputFile(digest=file_digest)
|
|
250 |
+ output_file.path = os.path.relpath(file_path, start=input_path)
|
|
251 |
+ output_file.is_executable = os.access(file_path, os.X_OK)
|
|
252 |
+ |
|
253 |
+ update_request = remote_execution_pb2.BatchUpdateBlobsRequest.Request(digest=file_digest)
|
|
254 |
+ update_request.data = file_blob
|
|
255 |
+ |
|
256 |
+ return output_file, update_request
|
|
257 |
+ |
|
258 |
+ |
|
259 |
+def output_directory_maker(directory_path, working_path):
|
|
260 |
+ """Creates a gRPC :obj:`OutputDirectory` from a local directory.
|
|
261 |
+ |
|
262 |
+ `directory_path` **must** point inside or be relative to `input_path`.
|
|
263 |
+ |
|
264 |
+ Args:
|
|
265 |
+ directory_path (str): absolute or relative path to a local directory.
|
|
266 |
+ working_path (str): absolute or relative path to the working directory.
|
|
267 |
+ |
|
268 |
+ Returns:
|
|
269 |
+ :obj:`OutputDirectory`, :obj:`BatchUpdateBlobsRequest`: Tuple of a new
|
|
270 |
+ gRPC :obj:`OutputDirectory` for the directory pointed by
|
|
271 |
+ `directory_path` and the corresponding list of
|
|
272 |
+ :obj:`BatchUpdateBlobsRequest` for CAS upload.
|
|
273 |
+ """
|
|
274 |
+ if not os.path.isabs(directory_path):
|
|
275 |
+ directory_path = os.path.abspath(directory_path)
|
|
276 |
+ if not os.path.isabs(working_path):
|
|
277 |
+ working_path = os.path.abspath(working_path)
|
|
278 |
+ |
|
279 |
+ tree, update_requests = tree_maker(directory_path)
|
|
280 |
+ |
|
281 |
+ output_directory = remote_execution_pb2.OutputDirectory()
|
|
282 |
+ output_directory.tree_digest = update_requests[-1].digest
|
|
283 |
+ output_directory.path = os.path.relpath(directory_path, start=working_path)
|
|
284 |
+ |
|
285 |
+ output_directory_blob = output_directory.SerializeToString()
|
|
286 |
+ output_directory_digest = create_digest(output_directory_blob)
|
|
287 |
+ |
|
288 |
+ update_request = remote_execution_pb2.BatchUpdateBlobsRequest.Request(digest=output_directory_digest)
|
|
289 |
+ update_request.data = output_directory_blob
|
|
290 |
+ |
|
291 |
+ update_requests.append(update_request)
|
|
292 |
+ |
|
293 |
+ return output_directory, update_requests
|
|
294 |
+ |
|
295 |
+ |
|
296 |
+def tree_maker(directory_path):
|
|
297 |
+ """Creates a gRPC :obj:`Tree` from a local directory.
|
|
298 |
+ |
|
299 |
+ Args:
|
|
300 |
+ directory_path (str): absolute or relative path to a local directory.
|
|
301 |
+ |
|
302 |
+ Returns:
|
|
303 |
+ :obj:`Tree`, :obj:`BatchUpdateBlobsRequest`: Tuple of a new
|
|
304 |
+ gRPC :obj:`Tree` for the directory pointed by `directory_path` and the
|
|
305 |
+ corresponding list of :obj:`BatchUpdateBlobsRequest` for CAS upload.
|
|
306 |
+ |
|
307 |
+ The :obj:`BatchUpdateBlobsRequest` list may come in any order. However,
|
|
308 |
+ its last element is guaranteed to be the :obj:`Tree`'s request.
|
|
309 |
+ """
|
|
310 |
+ if not os.path.isabs(directory_path):
|
|
311 |
+ directory_path = os.path.abspath(directory_path)
|
|
312 |
+ |
|
313 |
+ directory, child_directories, update_requests = directory_maker(directory_path)
|
|
314 |
+ |
|
315 |
+ tree = remote_execution_pb2.Tree()
|
|
316 |
+ tree.children.expend([child_directories])
|
|
317 |
+ tree.root = directory
|
|
318 |
+ |
|
319 |
+ tree_blob = tree.SerializeToString()
|
|
320 |
+ tree_digest = create_digest(file_blob)
|
|
321 |
+ |
|
322 |
+ update_request = remote_execution_pb2.BatchUpdateBlobsRequest.Request(digest=tree_digest)
|
|
323 |
+ update_request.data = tree_blob
|
|
324 |
+ |
|
325 |
+ update_requests.append(update_request)
|
|
326 |
+ |
|
327 |
+ return tree, update_requests
|
... | ... | @@ -18,7 +18,6 @@ |
18 | 18 |
# pylint: disable=redefined-outer-name
|
19 | 19 |
|
20 | 20 |
import copy
|
21 |
-import uuid
|
|
22 | 21 |
from unittest import mock
|
23 | 22 |
|
24 | 23 |
import grpc
|
... | ... | @@ -27,7 +26,7 @@ import pytest |
27 | 26 |
|
28 | 27 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
29 | 28 |
from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
|
30 |
-from buildgrid.server import scheduler, job
|
|
29 |
+from buildgrid.server import job, buildgrid_instance
|
|
31 | 30 |
from buildgrid.server.job import LeaseState
|
32 | 31 |
from buildgrid.server.worker import bots_interface, bots_service
|
33 | 32 |
|
... | ... | @@ -53,8 +52,8 @@ def bot_session(): |
53 | 52 |
|
54 | 53 |
|
55 | 54 |
@pytest.fixture
|
56 |
-def schedule():
|
|
57 |
- yield scheduler.Scheduler()
|
|
55 |
+def buildgrid():
|
|
56 |
+ yield buildgrid_instance.BuildGridInstance()
|
|
58 | 57 |
|
59 | 58 |
|
60 | 59 |
@pytest.fixture
|
... | ... | @@ -64,19 +63,17 @@ def bots(schedule): |
64 | 63 |
|
65 | 64 |
# Instance to test
|
66 | 65 |
@pytest.fixture
|
67 |
-def instance(bots):
|
|
68 |
- yield bots_service.BotsService(bots)
|
|
66 |
+def instance(buildgrid):
|
|
67 |
+ instances = {"": buildgrid}
|
|
68 |
+ yield bots_service.BotsService(instances)
|
|
69 | 69 |
|
70 | 70 |
|
71 | 71 |
def test_create_bot_session(bot_session, context, instance):
|
72 |
- parent = 'rach'
|
|
73 |
- request = bots_pb2.CreateBotSessionRequest(parent=parent,
|
|
74 |
- bot_session=bot_session)
|
|
72 |
+ request = bots_pb2.CreateBotSessionRequest(bot_session=bot_session)
|
|
75 | 73 |
|
76 | 74 |
response = instance.CreateBotSession(request, context)
|
77 | 75 |
|
78 | 76 |
assert isinstance(response, bots_pb2.BotSession)
|
79 |
- assert uuid.UUID(response.name, version=4)
|
|
80 | 77 |
assert bot_session.bot_id == response.bot_id
|
81 | 78 |
|
82 | 79 |
|
... | ... | @@ -92,8 +89,7 @@ def test_create_bot_session_bot_id_fail(context, instance): |
92 | 89 |
|
93 | 90 |
|
94 | 91 |
def test_update_bot_session(bot_session, context, instance):
|
95 |
- request = bots_pb2.CreateBotSessionRequest(parent='',
|
|
96 |
- bot_session=bot_session)
|
|
92 |
+ request = bots_pb2.CreateBotSessionRequest(bot_session=bot_session)
|
|
97 | 93 |
bot = instance.CreateBotSession(request, context)
|
98 | 94 |
|
99 | 95 |
request = bots_pb2.UpdateBotSessionRequest(name=bot.name,
|
... | ... | @@ -106,8 +102,7 @@ def test_update_bot_session(bot_session, context, instance): |
106 | 102 |
|
107 | 103 |
|
108 | 104 |
def test_update_bot_session_zombie(bot_session, context, instance):
|
109 |
- request = bots_pb2.CreateBotSessionRequest(parent='',
|
|
110 |
- bot_session=bot_session)
|
|
105 |
+ request = bots_pb2.CreateBotSessionRequest(bot_session=bot_session)
|
|
111 | 106 |
bot = instance.CreateBotSession(request, context)
|
112 | 107 |
# Update server with incorrect UUID by rotating it
|
113 | 108 |
bot.name = bot.name[len(bot.name): 0]
|
... | ... | @@ -121,8 +116,7 @@ def test_update_bot_session_zombie(bot_session, context, instance): |
121 | 116 |
|
122 | 117 |
|
123 | 118 |
def test_update_bot_session_bot_id_fail(bot_session, context, instance):
|
124 |
- request = bots_pb2.UpdateBotSessionRequest(name='ana',
|
|
125 |
- bot_session=bot_session)
|
|
119 |
+ request = bots_pb2.UpdateBotSessionRequest(bot_session=bot_session)
|
|
126 | 120 |
|
127 | 121 |
instance.UpdateBotSession(request, context)
|
128 | 122 |
|
... | ... | @@ -131,17 +125,15 @@ def test_update_bot_session_bot_id_fail(bot_session, context, instance): |
131 | 125 |
|
132 | 126 |
@pytest.mark.parametrize("number_of_jobs", [0, 1, 3, 500])
|
133 | 127 |
def test_number_of_leases(number_of_jobs, bot_session, context, instance):
|
134 |
- request = bots_pb2.CreateBotSessionRequest(parent='',
|
|
135 |
- bot_session=bot_session)
|
|
128 |
+ request = bots_pb2.CreateBotSessionRequest(bot_session=bot_session)
|
|
136 | 129 |
# Inject work
|
137 | 130 |
for _ in range(0, number_of_jobs):
|
138 | 131 |
action_digest = remote_execution_pb2.Digest()
|
139 |
- instance._instance._scheduler.append_job(job.Job(action_digest))
|
|
132 |
+ instance._instances[""].execute(action_digest, True)
|
|
140 | 133 |
|
141 | 134 |
response = instance.CreateBotSession(request, context)
|
142 | 135 |
|
143 | 136 |
assert len(response.leases) == number_of_jobs
|
144 |
- assert isinstance(response, bots_pb2.BotSession)
|
|
145 | 137 |
|
146 | 138 |
|
147 | 139 |
def test_update_leases_with_work(bot_session, context, instance):
|
... | ... | @@ -149,7 +141,7 @@ def test_update_leases_with_work(bot_session, context, instance): |
149 | 141 |
bot_session=bot_session)
|
150 | 142 |
# Inject work
|
151 | 143 |
action_digest = remote_execution_pb2.Digest(hash='gaff')
|
152 |
- instance._instance._scheduler.append_job(job.Job(action_digest))
|
|
144 |
+ instance._instances[""].execute(action_digest, True)
|
|
153 | 145 |
|
154 | 146 |
response = instance.CreateBotSession(request, context)
|
155 | 147 |
|
... | ... | @@ -159,7 +151,6 @@ def test_update_leases_with_work(bot_session, context, instance): |
159 | 151 |
|
160 | 152 |
assert isinstance(response, bots_pb2.BotSession)
|
161 | 153 |
assert response.leases[0].state == LeaseState.PENDING.value
|
162 |
- assert uuid.UUID(response.leases[0].id, version=4)
|
|
163 | 154 |
assert response_action == action_digest
|
164 | 155 |
|
165 | 156 |
|
... | ... | @@ -172,7 +163,7 @@ def test_update_leases_work_complete(bot_session, context, instance): |
172 | 163 |
|
173 | 164 |
# Inject work
|
174 | 165 |
action_digest = remote_execution_pb2.Digest(hash='gaff')
|
175 |
- instance._instance._scheduler.append_job(job.Job(action_digest))
|
|
166 |
+ instance._instances[""].execute(action_digest, True)
|
|
176 | 167 |
|
177 | 168 |
request = bots_pb2.UpdateBotSessionRequest(name=response.name,
|
178 | 169 |
bot_session=response)
|
... | ... | @@ -200,7 +191,7 @@ def test_work_rejected_by_bot(bot_session, context, instance): |
200 | 191 |
bot_session=bot_session)
|
201 | 192 |
# Inject work
|
202 | 193 |
action_digest = remote_execution_pb2.Digest(hash='gaff')
|
203 |
- instance._instance._scheduler.append_job(job.Job(action_digest))
|
|
194 |
+ instance._instances[""].execute(action_digest, True)
|
|
204 | 195 |
|
205 | 196 |
# Simulated the severed binding between client and server
|
206 | 197 |
response = copy.deepcopy(instance.CreateBotSession(request, context))
|
... | ... | @@ -222,7 +213,8 @@ def test_work_out_of_sync_from_pending(state, bot_session, context, instance): |
222 | 213 |
bot_session=bot_session)
|
223 | 214 |
# Inject work
|
224 | 215 |
action_digest = remote_execution_pb2.Digest(hash='gaff')
|
225 |
- instance._instance._scheduler.append_job(job.Job(action_digest))
|
|
216 |
+ instance._instances[""].execute(action_digest, True)
|
|
217 |
+ |
|
226 | 218 |
# Simulated the severed binding between client and server
|
227 | 219 |
response = copy.deepcopy(instance.CreateBotSession(request, context))
|
228 | 220 |
|
... | ... | @@ -242,7 +234,8 @@ def test_work_out_of_sync_from_active(state, bot_session, context, instance): |
242 | 234 |
bot_session=bot_session)
|
243 | 235 |
# Inject work
|
244 | 236 |
action_digest = remote_execution_pb2.Digest(hash='gaff')
|
245 |
- instance._instance._scheduler.append_job(job.Job(action_digest))
|
|
237 |
+ instance._instances[""].execute(action_digest, True)
|
|
238 |
+ |
|
246 | 239 |
# Simulated the severed binding between client and server
|
247 | 240 |
response = copy.deepcopy(instance.CreateBotSession(request, context))
|
248 | 241 |
|
... | ... | @@ -268,7 +261,8 @@ def test_work_active_to_active(bot_session, context, instance): |
268 | 261 |
bot_session=bot_session)
|
269 | 262 |
# Inject work
|
270 | 263 |
action_digest = remote_execution_pb2.Digest(hash='gaff')
|
271 |
- instance._instance._scheduler.append_job(job.Job(action_digest))
|
|
264 |
+ instance._instances[""].execute(action_digest, True)
|
|
265 |
+ |
|
272 | 266 |
# Simulated the severed binding between client and server
|
273 | 267 |
response = copy.deepcopy(instance.CreateBotSession(request, context))
|
274 | 268 |
|
... | ... | @@ -20,15 +20,17 @@ |
20 | 20 |
import uuid
|
21 | 21 |
from unittest import mock
|
22 | 22 |
|
23 |
+import grpc
|
|
23 | 24 |
from grpc._server import _Context
|
24 | 25 |
import pytest
|
26 |
+from google.protobuf import any_pb2
|
|
25 | 27 |
|
26 | 28 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
27 | 29 |
from buildgrid._protos.google.longrunning import operations_pb2
|
28 | 30 |
|
29 |
-from buildgrid.server import scheduler, job
|
|
31 |
+from buildgrid.server import job, buildgrid_instance
|
|
30 | 32 |
from buildgrid.server.cas.storage import lru_memory_cache
|
31 |
-from buildgrid.server.execution import action_cache, execution_instance, execution_service
|
|
33 |
+from buildgrid.server.execution import action_cache, execution_service
|
|
32 | 34 |
|
33 | 35 |
|
34 | 36 |
@pytest.fixture
|
... | ... | @@ -38,19 +40,21 @@ def context(): |
38 | 40 |
|
39 | 41 |
|
40 | 42 |
@pytest.fixture(params=["action-cache", "no-action-cache"])
|
41 |
-def execution(request):
|
|
43 |
+def buildgrid(request):
|
|
42 | 44 |
if request.param == "action-cache":
|
43 | 45 |
storage = lru_memory_cache.LRUMemoryCache(1024 * 1024)
|
44 | 46 |
cache = action_cache.ActionCache(storage, 50)
|
45 |
- schedule = scheduler.Scheduler(cache)
|
|
46 |
- return execution_instance.ExecutionInstance(schedule, storage)
|
|
47 |
- return execution_instance.ExecutionInstance(scheduler.Scheduler())
|
|
47 |
+ |
|
48 |
+ return buildgrid_instance.BuildGridInstance(action_cache=cache,
|
|
49 |
+ cas_storage=storage)
|
|
50 |
+ return buildgrid_instance.BuildGridInstance()
|
|
48 | 51 |
|
49 | 52 |
|
50 | 53 |
# Instance to test
|
51 | 54 |
@pytest.fixture
|
52 |
-def instance(execution):
|
|
53 |
- yield execution_service.ExecutionService(execution)
|
|
55 |
+def instance(buildgrid):
|
|
56 |
+ instances = {"": buildgrid}
|
|
57 |
+ yield execution_service.ExecutionService(instances)
|
|
54 | 58 |
|
55 | 59 |
|
56 | 60 |
@pytest.mark.parametrize("skip_cache_lookup", [True, False])
|
... | ... | @@ -72,23 +76,45 @@ def test_execute(skip_cache_lookup, instance, context): |
72 | 76 |
assert result.done is False
|
73 | 77 |
|
74 | 78 |
|
75 |
-# def test_wait_execution(instance, context):
|
|
76 |
- # TODO: Figure out why next(response) hangs on the .get()
|
|
77 |
- # method when running in pytest.
|
|
78 |
-# action_digest = remote_execution_pb2.Digest()
|
|
79 |
-# action_digest.hash = 'zhora'
|
|
79 |
+def test_wrong_execute_instance(instance, context):
|
|
80 |
+ request = remote_execution_pb2.ExecuteRequest(instance_name='blade')
|
|
81 |
+ response = instance.Execute(request, context)
|
|
82 |
+ |
|
83 |
+ next(response)
|
|
84 |
+ context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
|
|
85 |
+ |
|
86 |
+ |
|
87 |
+def test_wait_execution(instance, buildgrid, context):
|
|
88 |
+ action_digest = remote_execution_pb2.Digest()
|
|
89 |
+ action_digest.hash = 'zhora'
|
|
90 |
+ |
|
91 |
+ j = job.Job(action_digest, None)
|
|
92 |
+ j._operation.done = True
|
|
93 |
+ |
|
94 |
+ request = remote_execution_pb2.WaitExecutionRequest(name="{}/{}".format('', j.name))
|
|
80 | 95 |
|
81 |
-# j = job.Job(action_digest, None)
|
|
82 |
-# j._operation.done = True
|
|
96 |
+ buildgrid._scheduler.jobs[j.name] = j
|
|
83 | 97 |
|
84 |
-# request = remote_execution_pb2.WaitExecutionRequest(name=j.name)
|
|
98 |
+ action_result_any = any_pb2.Any()
|
|
99 |
+ action_result = remote_execution_pb2.ActionResult()
|
|
100 |
+ action_result_any.Pack(action_result)
|
|
85 | 101 |
|
86 |
-# instance._instance._scheduler.jobs[j.name] = j
|
|
102 |
+ j.update_execute_stage(job.ExecuteStage.COMPLETED)
|
|
103 |
+ |
|
104 |
+ response = instance.WaitExecution(request, context)
|
|
105 |
+ |
|
106 |
+ result = next(response)
|
|
107 |
+ |
|
108 |
+ assert isinstance(result, operations_pb2.Operation)
|
|
109 |
+ metadata = remote_execution_pb2.ExecuteOperationMetadata()
|
|
110 |
+ result.metadata.Unpack(metadata)
|
|
111 |
+ assert metadata.stage == job.ExecuteStage.COMPLETED.value
|
|
112 |
+ assert uuid.UUID(result.name, version=4)
|
|
113 |
+ assert result.done is True
|
|
87 | 114 |
|
88 |
-# action_result_any = any_pb2.Any()
|
|
89 |
-# action_result = remote_execution_pb2.ActionResult()
|
|
90 |
-# action_result_any.Pack(action_result)
|
|
91 | 115 |
|
92 |
-# instance._instance._scheduler._update_execute_stage(j, job.ExecuteStage.COMPLETED)
|
|
116 |
+def test_wrong_instance_wait_execution(instance, buildgrid, context):
|
|
117 |
+ request = remote_execution_pb2.WaitExecutionRequest(name="blade")
|
|
118 |
+ next(instance.WaitExecution(request, context))
|
|
93 | 119 |
|
94 |
-# response = instance.WaitExecution(request, context)
|
|
120 |
+ context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
|
... | ... | @@ -28,10 +28,13 @@ from google.protobuf import any_pb2 |
28 | 28 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
29 | 29 |
from buildgrid._protos.google.longrunning import operations_pb2
|
30 | 30 |
|
31 |
-from buildgrid.server import scheduler
|
|
31 |
+from buildgrid.server import buildgrid_instance
|
|
32 | 32 |
from buildgrid.server._exceptions import InvalidArgumentError
|
33 | 33 |
|
34 |
-from buildgrid.server.execution import execution_instance, operations_service
|
|
34 |
+from buildgrid.server.execution import operations_service
|
|
35 |
+ |
|
36 |
+ |
|
37 |
+instance_name = "blade"
|
|
35 | 38 |
|
36 | 39 |
|
37 | 40 |
# Can mock this
|
... | ... | @@ -52,65 +55,80 @@ def execute_request(): |
52 | 55 |
|
53 | 56 |
|
54 | 57 |
@pytest.fixture
|
55 |
-def schedule():
|
|
56 |
- yield scheduler.Scheduler()
|
|
57 |
- |
|
58 |
- |
|
59 |
-@pytest.fixture
|
|
60 |
-def execution(schedule):
|
|
61 |
- yield execution_instance.ExecutionInstance(schedule)
|
|
58 |
+def buildgrid():
|
|
59 |
+ yield buildgrid_instance.BuildGridInstance()
|
|
62 | 60 |
|
63 | 61 |
|
64 | 62 |
# Instance to test
|
65 | 63 |
@pytest.fixture
|
66 |
-def instance(execution):
|
|
67 |
- yield operations_service.OperationsService(execution)
|
|
64 |
+def instance(buildgrid):
|
|
65 |
+ instances = {instance_name: buildgrid}
|
|
66 |
+ yield operations_service.OperationsService(instances)
|
|
68 | 67 |
|
69 | 68 |
|
70 | 69 |
# Queue an execution, get operation corresponding to that request
|
71 |
-def test_get_operation(instance, execute_request, context):
|
|
72 |
- response_execute = instance._instance.execute(execute_request.action_digest,
|
|
73 |
- execute_request.skip_cache_lookup)
|
|
70 |
+def test_get_operation(instance, buildgrid, execute_request, context):
|
|
71 |
+ response_execute = buildgrid.execute(execute_request.action_digest,
|
|
72 |
+ execute_request.skip_cache_lookup)
|
|
74 | 73 |
|
75 | 74 |
request = operations_pb2.GetOperationRequest()
|
76 | 75 |
|
77 |
- request.name = response_execute.name
|
|
76 |
+ request.name = "{}/{}".format(instance_name, response_execute.name)
|
|
78 | 77 |
|
79 | 78 |
response = instance.GetOperation(request, context)
|
80 | 79 |
assert response is response_execute
|
81 | 80 |
|
82 | 81 |
|
83 | 82 |
def test_get_operation_fail(instance, context):
|
83 |
+ request = operations_pb2.GetOperationRequest()
|
|
84 |
+ request.name = "{}/{}".format(instance_name, "runner")
|
|
85 |
+ instance.GetOperation(request, context)
|
|
86 |
+ |
|
87 |
+ context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
|
|
88 |
+ |
|
89 |
+ |
|
90 |
+def test_get_operation_instance_fail(instance, context):
|
|
84 | 91 |
request = operations_pb2.GetOperationRequest()
|
85 | 92 |
instance.GetOperation(request, context)
|
86 | 93 |
|
87 | 94 |
context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
|
88 | 95 |
|
89 | 96 |
|
90 |
-def test_list_operations(instance, execute_request, context):
|
|
91 |
- response_execute = instance._instance.execute(execute_request.action_digest,
|
|
92 |
- execute_request.skip_cache_lookup)
|
|
97 |
+def test_list_operations(instance, buildgrid, execute_request, context):
|
|
98 |
+ response_execute = buildgrid.execute(execute_request.action_digest,
|
|
99 |
+ execute_request.skip_cache_lookup)
|
|
93 | 100 |
|
94 |
- request = operations_pb2.ListOperationsRequest()
|
|
101 |
+ request = operations_pb2.ListOperationsRequest(name=instance_name)
|
|
95 | 102 |
response = instance.ListOperations(request, context)
|
96 | 103 |
|
97 |
- assert response.operations[0].name == response_execute.name
|
|
104 |
+ assert response.operations[0].name.split('/')[-1] == response_execute.name
|
|
105 |
+ |
|
98 | 106 |
|
107 |
+def test_list_operations_instance_fail(instance, buildgrid, execute_request, context):
|
|
108 |
+ buildgrid.execute(execute_request.action_digest,
|
|
109 |
+ execute_request.skip_cache_lookup)
|
|
99 | 110 |
|
100 |
-def test_list_operations_with_result(instance, execute_request, context):
|
|
101 |
- response_execute = instance._instance.execute(execute_request.action_digest,
|
|
102 |
- execute_request.skip_cache_lookup)
|
|
111 |
+ request = operations_pb2.ListOperationsRequest()
|
|
112 |
+ instance.ListOperations(request, context)
|
|
113 |
+ |
|
114 |
+ context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
|
|
115 |
+ |
|
116 |
+ |
|
117 |
+def test_list_operations_with_result(instance, buildgrid, execute_request, context):
|
|
118 |
+ response_execute = buildgrid.execute(execute_request.action_digest,
|
|
119 |
+ execute_request.skip_cache_lookup)
|
|
103 | 120 |
|
104 | 121 |
action_result = remote_execution_pb2.ActionResult()
|
105 | 122 |
output_file = remote_execution_pb2.OutputFile(path='unicorn')
|
106 | 123 |
action_result.output_files.extend([output_file])
|
107 | 124 |
|
108 |
- instance._instance._scheduler.job_complete(response_execute.name, _pack_any(action_result))
|
|
125 |
+ buildgrid._scheduler.job_complete(response_execute.name,
|
|
126 |
+ _pack_any(action_result))
|
|
109 | 127 |
|
110 |
- request = operations_pb2.ListOperationsRequest()
|
|
128 |
+ request = operations_pb2.ListOperationsRequest(name=instance_name)
|
|
111 | 129 |
response = instance.ListOperations(request, context)
|
112 | 130 |
|
113 |
- assert response.operations[0].name == response_execute.name
|
|
131 |
+ assert response.operations[0].name.split('/')[-1] == response_execute.name
|
|
114 | 132 |
|
115 | 133 |
execute_response = remote_execution_pb2.ExecuteResponse()
|
116 | 134 |
response.operations[0].response.Unpack(execute_response)
|
... | ... | @@ -118,7 +136,7 @@ def test_list_operations_with_result(instance, execute_request, context): |
118 | 136 |
|
119 | 137 |
|
120 | 138 |
def test_list_operations_empty(instance, context):
|
121 |
- request = operations_pb2.ListOperationsRequest()
|
|
139 |
+ request = operations_pb2.ListOperationsRequest(name=instance_name)
|
|
122 | 140 |
|
123 | 141 |
response = instance.ListOperations(request, context)
|
124 | 142 |
|
... | ... | @@ -126,21 +144,23 @@ def test_list_operations_empty(instance, context): |
126 | 144 |
|
127 | 145 |
|
128 | 146 |
# Send execution off, delete, try to find operation should fail
|
129 |
-def test_delete_operation(instance, execute_request, context):
|
|
130 |
- response_execute = instance._instance.execute(execute_request.action_digest,
|
|
131 |
- execute_request.skip_cache_lookup)
|
|
147 |
+def test_delete_operation(instance, buildgrid, execute_request, context):
|
|
148 |
+ response_execute = buildgrid.execute(execute_request.action_digest,
|
|
149 |
+ execute_request.skip_cache_lookup)
|
|
132 | 150 |
request = operations_pb2.DeleteOperationRequest()
|
133 |
- request.name = response_execute.name
|
|
151 |
+ request.name = "{}/{}".format(instance_name, response_execute.name)
|
|
134 | 152 |
instance.DeleteOperation(request, context)
|
135 | 153 |
|
136 | 154 |
request = operations_pb2.GetOperationRequest()
|
137 |
- request.name = response_execute.name
|
|
155 |
+ request.name = "{}/{}".format(instance_name, response_execute.name)
|
|
156 |
+ |
|
138 | 157 |
with pytest.raises(InvalidArgumentError):
|
139 |
- instance._instance.get_operation(response_execute.name)
|
|
158 |
+ buildgrid.get_operation(response_execute.name)
|
|
140 | 159 |
|
141 | 160 |
|
142 |
-def test_delete_operation_fail(instance, execute_request, context):
|
|
161 |
+def test_delete_operation_fail(instance, context):
|
|
143 | 162 |
request = operations_pb2.DeleteOperationRequest()
|
163 |
+ request.name = "{}/{}".format(instance_name, "runner")
|
|
144 | 164 |
instance.DeleteOperation(request, context)
|
145 | 165 |
|
146 | 166 |
context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
|
... | ... | @@ -148,11 +168,19 @@ def test_delete_operation_fail(instance, execute_request, context): |
148 | 168 |
|
149 | 169 |
def test_cancel_operation(instance, context):
|
150 | 170 |
request = operations_pb2.CancelOperationRequest()
|
171 |
+ request.name = "{}/{}".format(instance_name, "runner")
|
|
151 | 172 |
instance.CancelOperation(request, context)
|
152 | 173 |
|
153 | 174 |
context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
|
154 | 175 |
|
155 | 176 |
|
177 |
+def test_cancel_operation_instance_fail(instance, context):
|
|
178 |
+ request = operations_pb2.CancelOperationRequest()
|
|
179 |
+ instance.CancelOperation(request, context)
|
|
180 |
+ |
|
181 |
+ context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
|
|
182 |
+ |
|
183 |
+ |
|
156 | 184 |
def _pack_any(pack):
|
157 | 185 |
some_any = any_pb2.Any()
|
158 | 186 |
some_any.Pack(pack)
|