finn pushed to branch finn/separate-services at BuildGrid / buildgrid
Commits:
- 0e6cf60e by finn at 2018-09-11T07:53:18Z
- 5fe52c60 by finn at 2018-09-11T07:53:21Z
- 410d0a4b by finn at 2018-09-11T07:53:21Z
- a3c9a7e3 by Finn Ball at 2018-09-11T07:53:21Z
- 72351889 by Finn Ball at 2018-09-11T07:53:21Z
25 changed files:
- .gitlab-ci.yml
- buildgrid/_app/cli.py
- buildgrid/_app/commands/cmd_server.py
- + buildgrid/_app/server.py
- + buildgrid/_app/settings/default.yml
- + buildgrid/_app/settings/parser.py
- buildgrid/server/actioncache/service.py
- buildgrid/server/bots/service.py
- − buildgrid/server/buildgrid_server.py
- + buildgrid/server/cas/instance.py
- buildgrid/server/cas/service.py
- buildgrid/server/instance.py → buildgrid/server/controller.py
- buildgrid/server/execution/service.py
- buildgrid/server/operations/service.py
- buildgrid/server/referencestorage/service.py
- docs/source/using_dummy_build.rst
- docs/source/using_simple_build.rst
- setup.py
- tests/cas/test_services.py
- tests/cas/test_storage.py
- tests/integration/action_cache_service.py
- tests/integration/bots_service.py
- tests/integration/execution_service.py
- tests/integration/operations_service.py
- tests/integration/reference_storage_service.py
Changes:
... | ... | @@ -30,7 +30,7 @@ before_script: |
30 | 30 |
.run-dummy-job-template: &dummy-job
|
31 | 31 |
stage: test
|
32 | 32 |
script:
|
33 |
- - ${BGD} server start --allow-insecure &
|
|
33 |
+ - ${BGD} server start buildgrid/_app/settings/default.yml &
|
|
34 | 34 |
- sleep 1 # Allow server to boot
|
35 | 35 |
- ${BGD} bot dummy &
|
36 | 36 |
- ${BGD} execute request-dummy --wait-for-completion
|
... | ... | @@ -170,11 +170,8 @@ class BuildGridCLI(click.MultiCommand): |
170 | 170 |
return commands
|
171 | 171 |
|
172 | 172 |
def get_command(self, context, name):
|
173 |
- try:
|
|
174 |
- mod = __import__(name='buildgrid._app.commands.cmd_{}'.format(name),
|
|
175 |
- fromlist=['cli'])
|
|
176 |
- except ImportError:
|
|
177 |
- return None
|
|
173 |
+ mod = __import__(name='buildgrid._app.commands.cmd_{}'.format(name),
|
|
174 |
+ fromlist=['cli'])
|
|
178 | 175 |
return mod.cli
|
179 | 176 |
|
180 | 177 |
|
... | ... | @@ -22,20 +22,17 @@ Create a BuildGrid server. |
22 | 22 |
|
23 | 23 |
import asyncio
|
24 | 24 |
import logging
|
25 |
-import sys
|
|
26 | 25 |
|
27 | 26 |
import click
|
28 | 27 |
|
29 |
-from buildgrid.server import buildgrid_server
|
|
30 |
-from buildgrid.server.cas.storage.disk import DiskStorage
|
|
31 |
-from buildgrid.server.cas.storage.lru_memory_cache import LRUMemoryCache
|
|
32 |
-from buildgrid.server.cas.storage.s3 import S3Storage
|
|
33 |
-from buildgrid.server.cas.storage.with_cache import WithCacheStorage
|
|
28 |
+from buildgrid.server.controller import ExecutionController
|
|
34 | 29 |
from buildgrid.server.actioncache.storage import ActionCache
|
30 |
+from buildgrid.server.cas.instance import ByteStreamInstance, ContentAddressableStorageInstance
|
|
31 |
+from buildgrid.server.referencestorage.storage import ReferenceCache
|
|
35 | 32 |
|
36 | 33 |
from ..cli import pass_context
|
37 |
- |
|
38 |
-_SIZE_PREFIXES = {'k': 2 ** 10, 'm': 2 ** 20, 'g': 2 ** 30, 't': 2 ** 40}
|
|
34 |
+from ..settings import parser
|
|
35 |
+from ..server import BuildGridServer
|
|
39 | 36 |
|
40 | 37 |
|
41 | 38 |
@click.group(name='server', short_help="Start a local server instance.")
|
... | ... | @@ -45,71 +42,31 @@ def cli(context): |
45 | 42 |
|
46 | 43 |
|
47 | 44 |
@cli.command('start', short_help="Setup a new server instance.")
|
48 |
-@click.argument('instances', nargs=-1, type=click.STRING)
|
|
49 |
-@click.option('--port', type=click.INT, default='50051', show_default=True,
|
|
50 |
- help="The port number to be listened.")
|
|
51 |
-@click.option('--server-key', type=click.Path(exists=True, dir_okay=False), default=None,
|
|
52 |
- help="Private server key for TLS (PEM-encoded)")
|
|
53 |
-@click.option('--server-cert', type=click.Path(exists=True, dir_okay=False), default=None,
|
|
54 |
- help="Public server certificate for TLS (PEM-encoded)")
|
|
55 |
-@click.option('--client-certs', type=click.Path(exists=True, dir_okay=False), default=None,
|
|
56 |
- help="Public client certificates for TLS (PEM-encoded, one single file)")
|
|
57 |
-@click.option('--allow-insecure', type=click.BOOL, is_flag=True,
|
|
58 |
- help="Whether or not to allow unencrypted connections.")
|
|
59 |
-@click.option('--allow-update-action-result/--forbid-update-action-result',
|
|
60 |
- 'allow_uar', default=True, show_default=True,
|
|
61 |
- help="Whether or not to allow clients to manually edit the action cache.")
|
|
62 |
-@click.option('--max-cached-actions', type=click.INT, default=50, show_default=True,
|
|
63 |
- help="Maximum number of actions to keep in the ActionCache.")
|
|
64 |
-@click.option('--cas', type=click.Choice(('lru', 's3', 'disk', 'with-cache')),
|
|
65 |
- help="The CAS storage type to use.")
|
|
66 |
-@click.option('--cas-cache', type=click.Choice(('lru', 's3', 'disk')),
|
|
67 |
- help="For --cas=with-cache, the CAS storage to use as the cache.")
|
|
68 |
-@click.option('--cas-fallback', type=click.Choice(('lru', 's3', 'disk')),
|
|
69 |
- help="For --cas=with-cache, the CAS storage to use as the fallback.")
|
|
70 |
-@click.option('--cas-lru-size', type=click.STRING,
|
|
71 |
- help="For --cas=lru, the LRU cache's memory limit.")
|
|
72 |
-@click.option('--cas-s3-bucket', type=click.STRING,
|
|
73 |
- help="For --cas=s3, the bucket name.")
|
|
74 |
-@click.option('--cas-s3-endpoint', type=click.STRING,
|
|
75 |
- help="For --cas=s3, the endpoint URI.")
|
|
76 |
-@click.option('--cas-disk-directory', type=click.Path(file_okay=False, dir_okay=True, writable=True),
|
|
77 |
- help="For --cas=disk, the folder to store CAS blobs in.")
|
|
45 |
+@click.argument('CONFIG', type=click.Path(file_okay=True, dir_okay=False, writable=False))
|
|
78 | 46 |
@pass_context
|
79 |
-def start(context, port, allow_insecure, server_key, server_cert, client_certs,
|
|
80 |
- instances, max_cached_actions, allow_uar, cas, **cas_args):
|
|
81 |
- """Setups a new server instance."""
|
|
82 |
- credentials = None
|
|
83 |
- if not allow_insecure:
|
|
84 |
- credentials = context.load_server_credentials(server_key, server_cert, client_certs)
|
|
85 |
- if not credentials and not allow_insecure:
|
|
86 |
- click.echo("ERROR: no TLS keys were specified and no defaults could be found.\n" +
|
|
87 |
- "Use --allow-insecure in order to deactivate TLS encryption.\n", err=True)
|
|
88 |
- sys.exit(-1)
|
|
89 |
- |
|
90 |
- context.credentials = credentials
|
|
91 |
- context.port = port
|
|
92 |
- |
|
93 |
- context.logger.info("BuildGrid server booting up")
|
|
94 |
- context.logger.info("Starting on port {}".format(port))
|
|
95 |
- |
|
96 |
- cas_storage = _make_cas_storage(context, cas, cas_args)
|
|
97 |
- |
|
98 |
- if cas_storage is None:
|
|
99 |
- context.logger.info("Running without CAS - action cache will be unavailable")
|
|
100 |
- action_cache = None
|
|
101 |
- |
|
102 |
- else:
|
|
103 |
- action_cache = ActionCache(cas_storage, max_cached_actions, allow_uar)
|
|
104 |
- |
|
105 |
- if instances is None:
|
|
106 |
- instances = ['main']
|
|
107 |
- |
|
108 |
- server = buildgrid_server.BuildGridServer(port=context.port,
|
|
109 |
- credentials=context.credentials,
|
|
110 |
- instances=instances,
|
|
111 |
- cas_storage=cas_storage,
|
|
112 |
- action_cache=action_cache)
|
|
47 |
+def start(context, config):
|
|
48 |
+ with open(config) as f:
|
|
49 |
+ settings = parser.get_parser().safe_load(f)
|
|
50 |
+ |
|
51 |
+ server_settings = settings['server']
|
|
52 |
+ |
|
53 |
+ instances = settings['instances']
|
|
54 |
+ |
|
55 |
+ execution_controllers = _instance_maker(instances, ExecutionController)
|
|
56 |
+ reference_caches = _instance_maker(instances, ReferenceCache)
|
|
57 |
+ action_caches = _instance_maker(instances, ActionCache)
|
|
58 |
+ cas = _instance_maker(instances, ContentAddressableStorageInstance)
|
|
59 |
+ bytestreams = _instance_maker(instances, ByteStreamInstance)
|
|
60 |
+ |
|
61 |
+ port = server_settings['port']
|
|
62 |
+ server = BuildGridServer(port=port,
|
|
63 |
+ execution_controller=execution_controllers,
|
|
64 |
+ reference_storage_instances=reference_caches,
|
|
65 |
+ action_cache_instances=action_caches,
|
|
66 |
+ cas_instances=cas,
|
|
67 |
+ bytestream_instances=bytestreams)
|
|
68 |
+ |
|
69 |
+ context.logger.info("Starting server on port {}".format(port))
|
|
113 | 70 |
loop = asyncio.get_event_loop()
|
114 | 71 |
try:
|
115 | 72 |
server.start()
|
... | ... | @@ -119,57 +76,20 @@ def start(context, port, allow_insecure, server_key, server_cert, client_certs, |
119 | 76 |
pass
|
120 | 77 |
|
121 | 78 |
finally:
|
79 |
+ context.logger.info("Stopping server")
|
|
122 | 80 |
server.stop()
|
123 | 81 |
loop.close()
|
124 | 82 |
|
125 | 83 |
|
126 |
-def _make_cas_storage(context, cas_type, cas_args):
|
|
127 |
- """Returns the storage provider corresponding to the given `cas_type`,
|
|
128 |
- or None if the provider cannot be created.
|
|
129 |
- """
|
|
130 |
- if cas_type == "lru":
|
|
131 |
- if cas_args["cas_lru_size"] is None:
|
|
132 |
- context.logger.error("--cas-lru-size is required for LRU CAS")
|
|
133 |
- return None
|
|
134 |
- try:
|
|
135 |
- size = _parse_size(cas_args["cas_lru_size"])
|
|
136 |
- except ValueError:
|
|
137 |
- context.logger.error('Invalid LRU size "{0}"'.format(cas_args["cas_lru_size"]))
|
|
138 |
- return None
|
|
139 |
- return LRUMemoryCache(size)
|
|
140 |
- elif cas_type == "s3":
|
|
141 |
- if cas_args["cas_s3_bucket"] is None:
|
|
142 |
- context.logger.error("--cas-s3-bucket is required for S3 CAS")
|
|
143 |
- return None
|
|
144 |
- if cas_args["cas_s3_endpoint"] is not None:
|
|
145 |
- return S3Storage(cas_args["cas_s3_bucket"],
|
|
146 |
- endpoint_url=cas_args["cas_s3_endpoint"])
|
|
147 |
- return S3Storage(cas_args["cas_s3_bucket"])
|
|
148 |
- elif cas_type == "disk":
|
|
149 |
- if cas_args["cas_disk_directory"] is None:
|
|
150 |
- context.logger.error("--cas-disk-directory is required for disk CAS")
|
|
151 |
- return None
|
|
152 |
- return DiskStorage(cas_args["cas_disk_directory"])
|
|
153 |
- elif cas_type == "with-cache":
|
|
154 |
- cache = _make_cas_storage(context, cas_args["cas_cache"], cas_args)
|
|
155 |
- fallback = _make_cas_storage(context, cas_args["cas_fallback"], cas_args)
|
|
156 |
- if cache is None:
|
|
157 |
- context.logger.error("Missing cache provider for --cas=with-cache")
|
|
158 |
- return None
|
|
159 |
- elif fallback is None:
|
|
160 |
- context.logger.error("Missing fallback provider for --cas=with-cache")
|
|
161 |
- return None
|
|
162 |
- return WithCacheStorage(cache, fallback)
|
|
163 |
- elif cas_type is None:
|
|
164 |
- return None
|
|
165 |
- return None
|
|
166 |
- |
|
167 |
- |
|
168 |
-def _parse_size(size):
|
|
169 |
- """Convert a string containing a size in bytes (e.g. '2GB') to a number."""
|
|
170 |
- size = size.lower()
|
|
171 |
- if size[-1] == 'b':
|
|
172 |
- size = size[:-1]
|
|
173 |
- if size[-1] in _SIZE_PREFIXES:
|
|
174 |
- return int(size[:-1]) * _SIZE_PREFIXES[size[-1]]
|
|
175 |
- return int(size)
|
|
84 |
+# NOTE: quick-and-dirty helper until instance mapping moves into the parser
|
|
85 |
+def _instance_maker(instances, service_type):
|
|
86 |
+ # TODO: build this mapping in the settings parser instead
|
|
87 |
+ made = {}
|
|
88 |
+ |
|
89 |
+ for instance in instances:
|
|
90 |
+ services = instance['services']
|
|
91 |
+ instance_name = instance['name']
|
|
92 |
+ for service in services:
|
|
93 |
+ if isinstance(service, service_type):
|
|
94 |
+ made[instance_name] = service
|
|
95 |
+ return made
|
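
A note on the helper above: _instance_maker inverts the parsed configuration. The YAML yields a list of named instances, each owning a list of service objects, while the server wants one dictionary per service type, keyed by instance name. A minimal sketch of that contract, using stand-in classes rather than anything from this commit (note that if an instance lists two services of the same type, only the last one survives):

    class ActionCache: pass
    class ExecutionController: pass

    instances = [
        {'name': 'main', 'services': [ExecutionController(), ActionCache()]},
        {'name': 'dev', 'services': [ActionCache()]},
    ]

    # _instance_maker(instances, ActionCache) then produces:
    #     {'main': <ActionCache>, 'dev': <ActionCache>}
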
1 |
+# Copyright (C) 2018 Bloomberg LP
|
|
2 |
+#
|
|
3 |
+# Licensed under the Apache License, Version 2.0 (the "License");
|
|
4 |
+# you may not use this file except in compliance with the License.
|
|
5 |
+# You may obtain a copy of the License at
|
|
6 |
+#
|
|
7 |
+# <http://www.apache.org/licenses/LICENSE-2.0>
|
|
8 |
+#
|
|
9 |
+# Unless required by applicable law or agreed to in writing, software
|
|
10 |
+# distributed under the License is distributed on an "AS IS" BASIS,
|
|
11 |
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
12 |
+# See the License for the specific language governing permissions and
|
|
13 |
+# limitations under the License.
|
|
14 |
+ |
|
15 |
+ |
|
16 |
+"""
|
|
17 |
+BuildGridServer
|
|
18 |
+===============
|
|
19 |
+ |
|
20 |
+Creates a BuildGrid server, binding all the requisite service instances together.
|
|
21 |
+"""
|
|
22 |
+ |
|
23 |
+import logging
|
|
24 |
+from concurrent import futures
|
|
25 |
+ |
|
26 |
+import grpc
|
|
27 |
+ |
|
28 |
+from buildgrid.server.cas.service import ByteStreamService, ContentAddressableStorageService
|
|
29 |
+from buildgrid.server.actioncache.service import ActionCacheService
|
|
30 |
+from buildgrid.server.execution.service import ExecutionService
|
|
31 |
+from buildgrid.server.operations.service import OperationsService
|
|
32 |
+from buildgrid.server.bots.service import BotsService
|
|
33 |
+from buildgrid.server.referencestorage.service import ReferenceStorageService
|
|
34 |
+ |
|
35 |
+ |
|
36 |
+class BuildGridServer:
|
|
37 |
+ |
|
38 |
+ def __init__(self, port=50051, max_workers=10, credentials=None,
|
|
39 |
+ execution_controller=None, reference_storage_instances=None,
|
|
40 |
+ action_cache_instances=None, cas_instances=None, bytestream_instances=None):
|
|
41 |
+ |
|
42 |
+ self.logger = logging.getLogger(__name__)
|
|
43 |
+ address = '[::]:{0}'.format(port)
|
|
44 |
+ |
|
45 |
+ server = grpc.server(futures.ThreadPoolExecutor(max_workers))
|
|
46 |
+ |
|
47 |
+ if credentials is not None:
|
|
48 |
+ self.logger.info("Secure connection")
|
|
49 |
+ server.add_secure_port(address, credentials)
|
|
50 |
+ |
|
51 |
+ else:
|
|
52 |
+ self.logger.info("Insecure connection")
|
|
53 |
+ server.add_insecure_port(address)
|
|
54 |
+ |
|
55 |
+ if execution_controller:
|
|
56 |
+ self.logger.debug("Adding execution controllers {}".format(
|
|
57 |
+ execution_controller.keys()))
|
|
58 |
+ ExecutionService(server, execution_controller)
|
|
59 |
+ BotsService(server, execution_controller)
|
|
60 |
+ OperationsService(server, execution_controller)
|
|
61 |
+ |
|
62 |
+ if reference_storage_instances:
|
|
63 |
+ self.logger.debug("Adding reference storages {}".format(
|
|
64 |
+ reference_storage_instances.keys()))
|
|
65 |
+ ReferenceStorageService(server, reference_storage_instances)
|
|
66 |
+ |
|
67 |
+ if action_cache_instances:
|
|
68 |
+ self.logger.debug("Adding action cache instances {}".format(
|
|
69 |
+ action_cache_instances.keys()))
|
|
70 |
+ ActionCacheService(server, action_cache_instances)
|
|
71 |
+ |
|
72 |
+ if cas_instances:
|
|
73 |
+ self.logger.debug("Adding cas instances {}".format(
|
|
74 |
+ cas_instances.keys()))
|
|
75 |
+ ContentAddressableStorageService(server, cas_instances)
|
|
76 |
+ |
|
77 |
+ if bytestream_instances:
|
|
78 |
+ self.logger.debug("Adding bytestream instances {}".format(
|
|
79 |
+ bytestream_instances.keys()))
|
|
80 |
+ ByteStreamService(server, bytestream_instances)
|
|
81 |
+ |
|
82 |
+ self._server = server
|
|
83 |
+ |
|
84 |
+ def start(self):
|
|
85 |
+ self._server.start()
|
|
86 |
+ |
|
87 |
+ def stop(self):
|
|
88 |
+ self._server.stop(grace=0)
|
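
The class above is now the single place where servicers meet the gRPC server: each servicer registers itself in its own constructor, start() is non-blocking (grpc.server dispatches from its thread pool), and stop(grace=0) terminates in-flight RPCs immediately. Condensed, the boot sequence from cmd_server.py above looks like this:

    import asyncio

    server = BuildGridServer(port=50051)

    loop = asyncio.get_event_loop()
    try:
        server.start()      # returns immediately; RPCs run on the thread pool
        loop.run_forever()  # keep the main thread alive
    except KeyboardInterrupt:
        pass
    finally:
        server.stop()       # grace=0: cancel outstanding RPCs at once
        loop.close()
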
1 |
+server:
|
|
2 |
+ port: 50051
|
|
3 |
+ tls-server-key: null
|
|
4 |
+ tls-server-cert: null
|
|
5 |
+ tls-client-certs: null
|
|
6 |
+ insecure-mode: true
|
|
7 |
+ |
|
8 |
+description: |
|
|
9 |
+ A single default instance
|
|
10 |
+ |
|
11 |
+instances:
|
|
12 |
+ - name: main
|
|
13 |
+ description: |
|
|
14 |
+ The main server
|
|
15 |
+ |
|
16 |
+ storages:
|
|
17 |
+ - !disk-storage &main-storage
|
|
18 |
+ path: ~/cas/
|
|
19 |
+ |
|
20 |
+ services:
|
|
21 |
+ - !action-cache &main-action
|
|
22 |
+ storage: *main-storage
|
|
23 |
+ max_cached_refs: 256
|
|
24 |
+ allow_updates: true
|
|
25 |
+ |
|
26 |
+ - !execution
|
|
27 |
+ storage: *main-storage
|
|
28 |
+ action_cache: *main-action
|
|
29 |
+ |
|
30 |
+ - !cas
|
|
31 |
+ storage: *main-storage
|
|
32 |
+ |
|
33 |
+ - !bytestream
|
|
34 |
+ storage: *main-storage
|
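
Two bits of YAML machinery do the heavy lifting in this default configuration: the custom !tags, which the parser in the next file resolves into live service objects, and the &anchor/*alias pairs, which make every service share the one DiskStorage rather than each getting a private copy. The sharing is standard PyYAML behaviour, easy to verify in isolation:

    import yaml

    doc = """
    storages:
      - &store {path: ~/cas/}
    services:
      - cache: *store
      - exec: *store
    """

    data = yaml.safe_load(doc)
    # An alias resolves to the same object, not to a copy:
    assert data['services'][0]['cache'] is data['services'][1]['exec']
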
1 |
+# Copyright (C) 2018 Bloomberg LP
|
|
2 |
+#
|
|
3 |
+# Licensed under the Apache License, Version 2.0 (the "License");
|
|
4 |
+# you may not use this file except in compliance with the License.
|
|
5 |
+# You may obtain a copy of the License at
|
|
6 |
+#
|
|
7 |
+# <http://www.apache.org/licenses/LICENSE-2.0>
|
|
8 |
+#
|
|
9 |
+# Unless required by applicable law or agreed to in writing, software
|
|
10 |
+# distributed under the License is distributed on an "AS IS" BASIS,
|
|
11 |
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
12 |
+# See the License for the specific language governing permissions and
|
|
13 |
+# limitations under the License.
|
|
14 |
+ |
|
15 |
+ |
|
16 |
+import yaml
|
|
17 |
+ |
|
18 |
+from buildgrid.server.controller import ExecutionController
|
|
19 |
+from buildgrid.server.actioncache.storage import ActionCache
|
|
20 |
+from buildgrid.server.cas.instance import ByteStreamInstance, ContentAddressableStorageInstance
|
|
21 |
+from buildgrid.server.cas.storage.disk import DiskStorage
|
|
22 |
+from buildgrid.server.cas.storage.lru_memory_cache import LRUMemoryCache
|
|
23 |
+ |
|
24 |
+ |
|
25 |
+class YamlFactory(yaml.YAMLObject):
|
|
26 |
+ @classmethod
|
|
27 |
+ def from_yaml(cls, loader, node):
|
|
28 |
+ values = loader.construct_mapping(node, deep=True)
|
|
29 |
+ return cls(**values)
|
|
30 |
+ |
|
31 |
+ |
|
32 |
+class Disk(YamlFactory):
|
|
33 |
+ |
|
34 |
+ yaml_tag = u'!disk-storage'
|
|
35 |
+ |
|
36 |
+ def __new__(cls, path):
|
|
37 |
+ return DiskStorage(path)
|
|
38 |
+ |
|
39 |
+ |
|
40 |
+class LRU(YamlFactory):
|
|
41 |
+ |
|
42 |
+ yaml_tag = u'!lru-storage'
|
|
43 |
+ |
|
44 |
+ def __new__(cls, size):
|
|
45 |
+ return LRUMemoryCache(_parse_size(size))
|
|
46 |
+ |
|
47 |
+ |
|
48 |
+class Execution(YamlFactory):
|
|
49 |
+ |
|
50 |
+ yaml_tag = u'!execution'
|
|
51 |
+ |
|
52 |
+ def __new__(cls, storage, action_cache=None):
|
|
53 |
+ return ExecutionController(action_cache, storage)
|
|
54 |
+ |
|
55 |
+ |
|
56 |
+class Action(YamlFactory):
|
|
57 |
+ |
|
58 |
+ yaml_tag = u'!action-cache'
|
|
59 |
+ |
|
60 |
+ def __new__(cls, storage, max_cached_refs=0, allow_updates=True):
|
|
61 |
+ return ActionCache(storage, max_cached_refs, allow_updates)
|
|
62 |
+ |
|
63 |
+ |
|
64 |
+class CAS(YamlFactory):
|
|
65 |
+ |
|
66 |
+ yaml_tag = u'!cas'
|
|
67 |
+ |
|
68 |
+ def __new__(cls, storage):
|
|
69 |
+ return ContentAddressableStorageInstance(storage)
|
|
70 |
+ |
|
71 |
+ |
|
72 |
+class ByteStream(YamlFactory):
|
|
73 |
+ |
|
74 |
+ yaml_tag = u'!bytestream'
|
|
75 |
+ |
|
76 |
+ def __new__(cls, storage):
|
|
77 |
+ return ByteStreamInstance(storage)
|
|
78 |
+ |
|
79 |
+ |
|
80 |
+def _parse_size(size):
|
|
81 |
+ """Convert a string containing a size in bytes (e.g. '2GB') to a number."""
|
|
82 |
+ _size_prefixes = {'k': 2 ** 10, 'm': 2 ** 20, 'g': 2 ** 30, 't': 2 ** 40}
|
|
83 |
+ size = size.lower()
|
|
84 |
+ |
|
85 |
+ if size[-1] == 'b':
|
|
86 |
+ size = size[:-1]
|
|
87 |
+ if size[-1] in _size_prefixes:
|
|
88 |
+ return int(size[:-1]) * _size_prefixes[size[-1]]
|
|
89 |
+ return int(size)
|
|
90 |
+ |
|
91 |
+ |
|
92 |
+def get_parser():
|
|
93 |
+ |
|
94 |
+ yaml.SafeLoader.add_constructor(Execution.yaml_tag, Execution.from_yaml)
|
|
95 |
+ yaml.SafeLoader.add_constructor(Execution.yaml_tag, Execution.from_yaml)
|
|
96 |
+ yaml.SafeLoader.add_constructor(Action.yaml_tag, Action.from_yaml)
|
|
97 |
+ yaml.SafeLoader.add_constructor(Disk.yaml_tag, Disk.from_yaml)
|
|
98 |
+ yaml.SafeLoader.add_constructor(LRU.yaml_tag, LRU.from_yaml)
|
|
99 |
+ yaml.SafeLoader.add_constructor(CAS.yaml_tag, CAS.from_yaml)
|
|
100 |
+ yaml.SafeLoader.add_constructor(ByteStream.yaml_tag, ByteStream.from_yaml)
|
|
101 |
+ |
|
102 |
+ return yaml
|
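
The factories above rely on a PyYAML idiom worth spelling out: from_yaml builds the node's mapping and calls cls(**values), and each factory's __new__ returns an instance of a different class altogether (DiskStorage, ActionCache, and so on), so loading the document yields ready-built service objects. get_parser() then returns the yaml module itself, which is why the CLI calls get_parser().safe_load(f). A self-contained sketch of the idiom, with illustrative names:

    import yaml

    class Point:
        def __init__(self, x, y):
            self.x, self.y = x, y

    class PointFactory(yaml.YAMLObject):
        yaml_tag = u'!point'

        @classmethod
        def from_yaml(cls, loader, node):
            values = loader.construct_mapping(node, deep=True)
            return cls(**values)

        def __new__(cls, x, y):
            # Returning an object that is not a PointFactory means Python
            # skips PointFactory.__init__; the Point comes back fully built.
            return Point(x, y)

    yaml.SafeLoader.add_constructor(PointFactory.yaml_tag, PointFactory.from_yaml)

    point = yaml.safe_load('!point {x: 1, y: 2}')
    assert isinstance(point, Point) and point.x == 1
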
... | ... | @@ -27,18 +27,27 @@ import grpc |
27 | 27 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
28 | 28 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
|
29 | 29 |
|
30 |
-from .._exceptions import NotFoundError
|
|
30 |
+from .._exceptions import InvalidArgumentError, NotFoundError
|
|
31 | 31 |
|
32 | 32 |
|
33 | 33 |
class ActionCacheService(remote_execution_pb2_grpc.ActionCacheServicer):
|
34 | 34 |
|
35 |
- def __init__(self, action_cache):
|
|
36 |
- self._action_cache = action_cache
|
|
35 |
+ def __init__(self, server, instances):
|
|
36 |
+ self._instances = instances
|
|
37 |
+ |
|
37 | 38 |
self.logger = logging.getLogger(__name__)
|
38 | 39 |
|
40 |
+ remote_execution_pb2_grpc.add_ActionCacheServicer_to_server(self, server)
|
|
41 |
+ |
|
39 | 42 |
def GetActionResult(self, request, context):
|
40 | 43 |
try:
|
41 |
- return self._action_cache.get_action_result(request.action_digest)
|
|
44 |
+ instance = self._get_instance(request.instance_name)
|
|
45 |
+ return instance.get_action_result(request.action_digest)
|
|
46 |
+ |
|
47 |
+ except InvalidArgumentError as e:
|
|
48 |
+ self.logger.error(e)
|
|
49 |
+ context.set_details(str(e))
|
|
50 |
+ context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
|
42 | 51 |
|
43 | 52 |
except NotFoundError as e:
|
44 | 53 |
self.logger.error(e)
|
... | ... | @@ -48,11 +57,24 @@ class ActionCacheService(remote_execution_pb2_grpc.ActionCacheServicer): |
48 | 57 |
|
49 | 58 |
def UpdateActionResult(self, request, context):
|
50 | 59 |
try:
|
51 |
- self._action_cache.update_action_result(request.action_digest, request.action_result)
|
|
60 |
+ instance = self._get_instance(request.instance_name)
|
|
61 |
+ instance.update_action_result(request.action_digest, request.action_result)
|
|
52 | 62 |
return request.action_result
|
53 | 63 |
|
64 |
+ except InvalidArgumentError as e:
|
|
65 |
+ self.logger.error(e)
|
|
66 |
+ context.set_details(str(e))
|
|
67 |
+ context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
|
68 |
+ |
|
54 | 69 |
except NotImplementedError as e:
|
55 | 70 |
self.logger.error(e)
|
56 | 71 |
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
|
57 | 72 |
|
58 | 73 |
return remote_execution_pb2.ActionResult()
|
74 |
+ |
|
75 |
+ def _get_instance(self, instance_name):
|
|
76 |
+ try:
|
|
77 |
+ return self._instances[instance_name]
|
|
78 |
+ |
|
79 |
+ except KeyError:
|
|
80 |
+ raise InvalidArgumentError("Invalid instance name: {}".format(instance_name))
|
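
The constructor change above is the template every service in this push follows: the servicer registers itself with the passed-in server, holds a dictionary of named instances, and routes each request through _get_instance, so an unknown instance_name surfaces to the client as INVALID_ARGUMENT. From the client side that looks roughly like this (a hedged sketch; the digest values are placeholders):

    import grpc

    from buildgrid._protos.build.bazel.remote.execution.v2 import (
        remote_execution_pb2, remote_execution_pb2_grpc)

    channel = grpc.insecure_channel('localhost:50051')
    stub = remote_execution_pb2_grpc.ActionCacheStub(channel)

    request = remote_execution_pb2.GetActionResultRequest(
        instance_name='main',  # must name an instance from the configuration
        action_digest=remote_execution_pb2.Digest(hash='...', size_bytes=0))

    # INVALID_ARGUMENT for a bad instance name, NOT_FOUND for a missing result.
    result = stub.GetActionResult(request)
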
... | ... | @@ -33,10 +33,12 @@ from .._exceptions import InvalidArgumentError, OutofSyncError |
33 | 33 |
|
34 | 34 |
class BotsService(bots_pb2_grpc.BotsServicer):
|
35 | 35 |
|
36 |
- def __init__(self, instances):
|
|
36 |
+ def __init__(self, server, instances):
|
|
37 | 37 |
self._instances = instances
|
38 | 38 |
self.logger = logging.getLogger(__name__)
|
39 | 39 |
|
40 |
+ bots_pb2_grpc.add_BotsServicer_to_server(self, server)
|
|
41 |
+ |
|
40 | 42 |
def CreateBotSession(self, request, context):
|
41 | 43 |
try:
|
42 | 44 |
parent = request.parent
|
1 |
-# Copyright (C) 2018 Bloomberg LP
|
|
2 |
-#
|
|
3 |
-# Licensed under the Apache License, Version 2.0 (the "License");
|
|
4 |
-# you may not use this file except in compliance with the License.
|
|
5 |
-# You may obtain a copy of the License at
|
|
6 |
-#
|
|
7 |
-# <http://www.apache.org/licenses/LICENSE-2.0>
|
|
8 |
-#
|
|
9 |
-# Unless required by applicable law or agreed to in writing, software
|
|
10 |
-# distributed under the License is distributed on an "AS IS" BASIS,
|
|
11 |
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
12 |
-# See the License for the specific language governing permissions and
|
|
13 |
-# limitations under the License.
|
|
14 |
- |
|
15 |
- |
|
16 |
-"""
|
|
17 |
-BuildGridServer
|
|
18 |
-==============
|
|
19 |
- |
|
20 |
-Creates the user a local server BuildGrid server.
|
|
21 |
-"""
|
|
22 |
- |
|
23 |
-from concurrent import futures
|
|
24 |
- |
|
25 |
-import grpc
|
|
26 |
- |
|
27 |
-from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
|
|
28 |
-from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
|
|
29 |
-from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2_grpc
|
|
30 |
-from buildgrid._protos.google.longrunning import operations_pb2_grpc
|
|
31 |
- |
|
32 |
-from .instance import BuildGridInstance
|
|
33 |
-from .cas.service import ByteStreamService, ContentAddressableStorageService
|
|
34 |
-from .actioncache.service import ActionCacheService
|
|
35 |
-from .execution.service import ExecutionService
|
|
36 |
-from .operations.service import OperationsService
|
|
37 |
-from .bots.service import BotsService
|
|
38 |
- |
|
39 |
- |
|
40 |
-class BuildGridServer:
|
|
41 |
- |
|
42 |
- def __init__(self, port=50051, credentials=None, instances=None,
|
|
43 |
- max_workers=10, action_cache=None, cas_storage=None):
|
|
44 |
- address = '[::]:{0}'.format(port)
|
|
45 |
- |
|
46 |
- self._server = grpc.server(futures.ThreadPoolExecutor(max_workers))
|
|
47 |
- |
|
48 |
- if credentials is not None:
|
|
49 |
- self._server.add_secure_port(address, credentials)
|
|
50 |
- else:
|
|
51 |
- self._server.add_insecure_port(address)
|
|
52 |
- |
|
53 |
- if cas_storage is not None:
|
|
54 |
- cas_service = ContentAddressableStorageService(cas_storage)
|
|
55 |
- remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(cas_service,
|
|
56 |
- self._server)
|
|
57 |
- bytestream_pb2_grpc.add_ByteStreamServicer_to_server(ByteStreamService(cas_storage),
|
|
58 |
- self._server)
|
|
59 |
- if action_cache is not None:
|
|
60 |
- action_cache_service = ActionCacheService(action_cache)
|
|
61 |
- remote_execution_pb2_grpc.add_ActionCacheServicer_to_server(action_cache_service,
|
|
62 |
- self._server)
|
|
63 |
- |
|
64 |
- buildgrid_instances = {}
|
|
65 |
- if not instances:
|
|
66 |
- buildgrid_instances["main"] = BuildGridInstance(action_cache, cas_storage)
|
|
67 |
- else:
|
|
68 |
- for name in instances:
|
|
69 |
- buildgrid_instances[name] = BuildGridInstance(action_cache, cas_storage)
|
|
70 |
- |
|
71 |
- bots_pb2_grpc.add_BotsServicer_to_server(BotsService(buildgrid_instances),
|
|
72 |
- self._server)
|
|
73 |
- remote_execution_pb2_grpc.add_ExecutionServicer_to_server(ExecutionService(buildgrid_instances),
|
|
74 |
- self._server)
|
|
75 |
- operations_pb2_grpc.add_OperationsServicer_to_server(OperationsService(buildgrid_instances),
|
|
76 |
- self._server)
|
|
77 |
- |
|
78 |
- def start(self):
|
|
79 |
- self._server.start()
|
|
80 |
- |
|
81 |
- def stop(self):
|
|
82 |
- self._server.stop(0)
|
1 |
+# Copyright (C) 2018 Bloomberg LP
|
|
2 |
+#
|
|
3 |
+# Licensed under the Apache License, Version 2.0 (the "License");
|
|
4 |
+# you may not use this file except in compliance with the License.
|
|
5 |
+# You may obtain a copy of the License at
|
|
6 |
+#
|
|
7 |
+# <http://www.apache.org/licenses/LICENSE-2.0>
|
|
8 |
+#
|
|
9 |
+# Unless required by applicable law or agreed to in writing, software
|
|
10 |
+# distributed under the License is distributed on an "AS IS" BASIS,
|
|
11 |
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
12 |
+# See the License for the specific language governing permissions and
|
|
13 |
+# limitations under the License.
|
|
14 |
+ |
|
15 |
+ |
|
16 |
+"""
|
|
17 |
+Storage Instances
|
|
18 |
+=================
|
|
19 |
+Instances of CAS and ByteStream
|
|
20 |
+"""
|
|
21 |
+ |
|
22 |
+from buildgrid._protos.google.bytestream import bytestream_pb2
|
|
23 |
+from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 as re_pb2
|
|
24 |
+ |
|
25 |
+from .._exceptions import InvalidArgumentError, NotFoundError, OutOfRangeError
|
|
26 |
+from ...settings import HASH
|
|
27 |
+ |
|
28 |
+ |
|
29 |
+class ContentAddressableStorageInstance:
|
|
30 |
+ |
|
31 |
+ def __init__(self, storage):
|
|
32 |
+ self._storage = storage
|
|
33 |
+ |
|
34 |
+ def find_missing_blobs(self, blob_digests):
|
|
35 |
+ storage = self._storage
|
|
36 |
+ return re_pb2.FindMissingBlobsResponse(
|
|
37 |
+ missing_blob_digests=storage.missing_blobs(blob_digests))
|
|
38 |
+ |
|
39 |
+ def batch_update_blobs(self, requests):
|
|
40 |
+ storage = self._storage
|
|
41 |
+ store = []
|
|
42 |
+ for request_proto in requests:
|
|
43 |
+ store.append((request_proto.digest, request_proto.data))
|
|
44 |
+ |
|
45 |
+ response = re_pb2.BatchUpdateBlobsResponse()
|
|
46 |
+ statuses = storage.bulk_update_blobs(store)
|
|
47 |
+ |
|
48 |
+ for (digest, _), status in zip(store, statuses):
|
|
49 |
+ response_proto = response.responses.add()
|
|
50 |
+ response_proto.digest.CopyFrom(digest)
|
|
51 |
+ response_proto.status.CopyFrom(status)
|
|
52 |
+ |
|
53 |
+ return response
|
|
54 |
+ |
|
55 |
+ |
|
56 |
+class ByteStreamInstance:
|
|
57 |
+ |
|
58 |
+ BLOCK_SIZE = 1 * 1024 * 1024 # 1 MB block size
|
|
59 |
+ |
|
60 |
+ def __init__(self, storage):
|
|
61 |
+ self._storage = storage
|
|
62 |
+ |
|
63 |
+ def read(self, path, read_offset, read_limit):
|
|
64 |
+ storage = self._storage
|
|
65 |
+ |
|
66 |
+ if path[0] == "blobs":
|
|
67 |
+ path = [""] + path
|
|
68 |
+ |
|
69 |
+ # Parse/verify resource name.
|
|
70 |
+ # Read resource names look like "[instance/]blobs/abc123hash/99".
|
|
71 |
+ digest = re_pb2.Digest(hash=path[2], size_bytes=int(path[3]))
|
|
72 |
+ |
|
73 |
+ # Check the given read offset and limit.
|
|
74 |
+ if read_offset < 0 or read_offset > digest.size_bytes:
|
|
75 |
+ raise OutOfRangeError("Read offset out of range")
|
|
76 |
+ |
|
77 |
+ elif read_limit == 0:
|
|
78 |
+ bytes_remaining = digest.size_bytes - read_offset
|
|
79 |
+ |
|
80 |
+ elif read_limit > 0:
|
|
81 |
+ bytes_remaining = read_limit
|
|
82 |
+ |
|
83 |
+ else:
|
|
84 |
+ raise InvalidArgumentError("Negative read_limit is invalid")
|
|
85 |
+ |
|
86 |
+ # Read the blob from storage and send its contents to the client.
|
|
87 |
+ result = storage.get_blob(digest)
|
|
88 |
+ if result is None:
|
|
89 |
+ raise NotFoundError("Blob not found")
|
|
90 |
+ |
|
91 |
+ elif result.seekable():
|
|
92 |
+ result.seek(read_offset)
|
|
93 |
+ |
|
94 |
+ else:
|
|
95 |
+ result.read(read_offset)
|
|
96 |
+ |
|
97 |
+ while bytes_remaining > 0:
|
|
98 |
+ yield bytestream_pb2.ReadResponse(
|
|
99 |
+ data=result.read(min(self.BLOCK_SIZE, bytes_remaining)))
|
|
100 |
+ bytes_remaining -= self.BLOCK_SIZE
|
|
101 |
+ |
|
102 |
+ def write(self, requests):
|
|
103 |
+ storage = self._storage
|
|
104 |
+ |
|
105 |
+ first_request = next(requests)
|
|
106 |
+ path = first_request.resource_name.split("/")
|
|
107 |
+ |
|
108 |
+ if path[0] == "uploads":
|
|
109 |
+ path = [""] + path
|
|
110 |
+ |
|
111 |
+ digest = re_pb2.Digest(hash=path[4], size_bytes=int(path[5]))
|
|
112 |
+ write_session = storage.begin_write(digest)
|
|
113 |
+ |
|
114 |
+ # Start the write session and write the first request's data.
|
|
115 |
+ write_session.write(first_request.data)
|
|
116 |
+ hash_ = HASH(first_request.data)
|
|
117 |
+ bytes_written = len(first_request.data)
|
|
118 |
+ finished = first_request.finish_write
|
|
119 |
+ |
|
120 |
+ # Handle subsequent write requests.
|
|
121 |
+ while not finished:
|
|
122 |
+ |
|
123 |
+ for request in requests:
|
|
124 |
+ if finished:
|
|
125 |
+ raise InvalidArgumentError("Write request sent after write finished")
|
|
126 |
+ |
|
127 |
+ elif request.write_offset != bytes_written:
|
|
128 |
+ raise InvalidArgumentError("Invalid write offset")
|
|
129 |
+ |
|
130 |
+ elif request.resource_name and request.resource_name != first_request.resource_name:
|
|
131 |
+ raise InvalidArgumentError("Resource name changed mid-write")
|
|
132 |
+ |
|
133 |
+ finished = request.finish_write
|
|
134 |
+ bytes_written += len(request.data)
|
|
135 |
+ if bytes_written > digest.size_bytes:
|
|
136 |
+ raise InvalidArgumentError("Wrote too much data to blob")
|
|
137 |
+ |
|
138 |
+ write_session.write(request.data)
|
|
139 |
+ hash_.update(request.data)
|
|
140 |
+ |
|
141 |
+ # Check that the data matches the provided digest.
|
|
142 |
+ if bytes_written != digest.size_bytes or not finished:
|
|
143 |
+ raise NotImplementedError("Cannot close stream before finishing write")
|
|
144 |
+ |
|
145 |
+ elif hash_.hexdigest() != digest.hash:
|
|
146 |
+ raise InvalidArgumentError("Data does not match hash")
|
|
147 |
+ |
|
148 |
+ storage.commit_write(digest, write_session)
|
|
149 |
+ return bytestream_pb2.WriteResponse(committed_size=bytes_written)
|
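
Both instances parse ByteStream resource names of the shape described in the comments above: reads use "[instance_name/]blobs/<hash>/<size>" and writes use "[instance_name/]uploads/<uuid>/blobs/<hash>/<size>". Purely as an illustration (this helper is not part of the commit, and the SHA-256 choice is an assumption about buildgrid.settings.HASH), a client could build a write resource name like so:

    import hashlib
    import uuid

    def make_write_resource_name(data, instance_name=''):
        # Hypothetical helper: digest the payload and assemble the path.
        digest = hashlib.sha256(data).hexdigest()
        name = 'uploads/{}/blobs/{}/{}'.format(uuid.uuid4(), digest, len(data))
        return '{}/{}'.format(instance_name, name) if instance_name else name
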
... | ... | @@ -21,131 +21,152 @@ Implements the Content Addressable Storage API and ByteStream API. |
21 | 21 |
"""
|
22 | 22 |
|
23 | 23 |
|
24 |
+from itertools import tee
|
|
25 |
+import logging
|
|
26 |
+ |
|
24 | 27 |
import grpc
|
25 | 28 |
|
26 | 29 |
from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
|
27 |
-from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 as re_pb2
|
|
28 |
-from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc as re_pb2_grpc
|
|
30 |
+from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
|
31 |
+from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
|
|
32 |
+ |
|
33 |
+from .._exceptions import InvalidArgumentError, NotFoundError, OutOfRangeError
|
|
29 | 34 |
|
30 |
-from ...settings import HASH
|
|
31 | 35 |
|
36 |
+class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
|
|
32 | 37 |
|
33 |
-class ContentAddressableStorageService(re_pb2_grpc.ContentAddressableStorageServicer):
|
|
38 |
+ def __init__(self, server, instances):
|
|
39 |
+ self.logger = logging.getLogger(__name__)
|
|
40 |
+ self._instances = instances
|
|
34 | 41 |
|
35 |
- def __init__(self, storage):
|
|
36 |
- self._storage = storage
|
|
42 |
+ remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(self, server)
|
|
37 | 43 |
|
38 | 44 |
def FindMissingBlobs(self, request, context):
|
39 |
- # Only one instance for now.
|
|
40 |
- storage = self._storage
|
|
41 |
- return re_pb2.FindMissingBlobsResponse(
|
|
42 |
- missing_blob_digests=storage.missing_blobs(request.blob_digests))
|
|
45 |
+ try:
|
|
46 |
+ instance = self._get_instance(request.instance_name)
|
|
47 |
+ return instance.find_missing_blobs(request.blob_digests)
|
|
48 |
+ |
|
49 |
+ except InvalidArgumentError as e:
|
|
50 |
+ self.logger.error(e)
|
|
51 |
+ context.set_details(str(e))
|
|
52 |
+ context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
|
53 |
+ |
|
54 |
+ return remote_execution_pb2.FindMissingBlobsResponse()
|
|
43 | 55 |
|
44 | 56 |
def BatchUpdateBlobs(self, request, context):
|
45 |
- # Only one instance for now.
|
|
46 |
- storage = self._storage
|
|
47 |
- requests = []
|
|
48 |
- for request_proto in request.requests:
|
|
49 |
- requests.append((request_proto.digest, request_proto.data))
|
|
50 |
- response = re_pb2.BatchUpdateBlobsResponse()
|
|
51 |
- for (digest, _), status in zip(requests, storage.bulk_update_blobs(requests)):
|
|
52 |
- response_proto = response.responses.add()
|
|
53 |
- response_proto.digest.CopyFrom(digest)
|
|
54 |
- response_proto.status.CopyFrom(status)
|
|
55 |
- return response
|
|
57 |
+ try:
|
|
58 |
+ instance = self._get_instance(request.instance_name)
|
|
59 |
+ return instance.batch_update_blobs(request.requests)
|
|
60 |
+ |
|
61 |
+ except InvalidArgumentError as e:
|
|
62 |
+ self.logger.error(e)
|
|
63 |
+ context.set_details(str(e))
|
|
64 |
+ context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
|
65 |
+ |
|
66 |
+ return remote_execution_pb2.BatchReadBlobsResponse()
|
|
67 |
+ |
|
68 |
+ def _get_instance(self, instance_name):
|
|
69 |
+ try:
|
|
70 |
+ return self._instances[instance_name]
|
|
71 |
+ |
|
72 |
+ except KeyError:
|
|
73 |
+ raise InvalidArgumentError("Invalid instance name: {}".format(instance_name))
|
|
56 | 74 |
|
57 | 75 |
|
58 | 76 |
class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer):
|
59 | 77 |
|
60 |
- BLOCK_SIZE = 1 * 1024 * 1024 # 1 MB block size
|
|
78 |
+ def __init__(self, server, instances):
|
|
79 |
+ self.logger = logging.getLogger(__name__)
|
|
80 |
+ self._instances = instances
|
|
61 | 81 |
|
62 |
- def __init__(self, storage):
|
|
63 |
- self._storage = storage
|
|
82 |
+ bytestream_pb2_grpc.add_ByteStreamServicer_to_server(self, server)
|
|
64 | 83 |
|
65 | 84 |
def Read(self, request, context):
|
66 |
- # Only one instance for now.
|
|
67 |
- storage = self._storage
|
|
68 |
- |
|
69 |
- # Parse/verify resource name.
|
|
70 |
- # Read resource names look like "[instance/]blobs/abc123hash/99".
|
|
71 |
- path = request.resource_name.split("/")
|
|
72 |
- if len(path) == 3:
|
|
73 |
- path = [""] + path
|
|
74 |
- if len(path) != 4 or path[1] != "blobs" or not path[3].isdigit():
|
|
75 |
- context.abort(grpc.StatusCode.NOT_FOUND, "Invalid resource name")
|
|
76 |
- # instance_name = path[0]
|
|
77 |
- digest = re_pb2.Digest(hash=path[2], size_bytes=int(path[3]))
|
|
78 |
- |
|
79 |
- # Check the given read offset and limit.
|
|
80 |
- if request.read_offset < 0 or request.read_offset > digest.size_bytes:
|
|
81 |
- context.abort(grpc.StatusCode.OUT_OF_RANGE, "Read offset out of range")
|
|
82 |
- elif request.read_limit == 0:
|
|
83 |
- bytes_remaining = digest.size_bytes - request.read_offset
|
|
84 |
- elif request.read_limit > 0:
|
|
85 |
- bytes_remaining = request.read_limit
|
|
86 |
- else:
|
|
87 |
- context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Negative read_limit is invalid")
|
|
88 |
- |
|
89 |
- # Read the blob from storage and send its contents to the client.
|
|
90 |
- result = storage.get_blob(digest)
|
|
91 |
- if result is None:
|
|
92 |
- context.abort(grpc.StatusCode.NOT_FOUND, "Blob not found")
|
|
93 |
- elif result.seekable():
|
|
94 |
- result.seek(request.read_offset)
|
|
95 |
- else:
|
|
96 |
- result.read(request.read_offset)
|
|
97 |
- while bytes_remaining > 0:
|
|
98 |
- yield bytestream_pb2.ReadResponse(
|
|
99 |
- data=result.read(min(self.BLOCK_SIZE, bytes_remaining)))
|
|
100 |
- bytes_remaining -= self.BLOCK_SIZE
|
|
101 |
- |
|
102 |
- def Write(self, request_iterator, context):
|
|
103 |
- # Only one instance for now.
|
|
104 |
- storage = self._storage
|
|
105 |
- |
|
106 |
- requests = iter(request_iterator)
|
|
107 |
- first_request = next(requests)
|
|
108 |
- if first_request.write_offset != 0:
|
|
109 |
- context.abort(grpc.StatusCode.UNIMPLEMENTED, "Nonzero write offset is unsupported")
|
|
110 |
- |
|
111 |
- # Parse/verify resource name.
|
|
112 |
- # Write resource names look like "[instance/]uploads/SOME-GUID/blobs/abc123hash/99".
|
|
113 |
- path = first_request.resource_name.split("/")
|
|
114 |
- if path[0] == "uploads":
|
|
115 |
- path = [""] + path
|
|
116 |
- if len(path) < 6 or path[1] != "uploads" or path[3] != "blobs" or not path[5].isdigit():
|
|
117 |
- context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Invalid resource name")
|
|
118 |
- # instance_name = path[0]
|
|
119 |
- digest = re_pb2.Digest(hash=path[4], size_bytes=int(path[5]))
|
|
120 |
- |
|
121 |
- # Start the write session and write the first request's data.
|
|
122 |
- write_session = storage.begin_write(digest)
|
|
123 |
- write_session.write(first_request.data)
|
|
124 |
- hash_ = HASH(first_request.data)
|
|
125 |
- bytes_written = len(first_request.data)
|
|
126 |
- done = first_request.finish_write
|
|
127 |
- |
|
128 |
- # Handle subsequent write requests.
|
|
129 |
- for request in requests:
|
|
130 |
- if done:
|
|
131 |
- context.abort(grpc.StatusCode.INVALID_ARGUMENT,
|
|
132 |
- "Write request sent after write finished")
|
|
133 |
- elif request.write_offset != bytes_written:
|
|
134 |
- context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Invalid write offset")
|
|
135 |
- elif request.resource_name and request.resource_name != first_request.resource_name:
|
|
136 |
- context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Resource name changed mid-write")
|
|
137 |
- done = request.finish_write
|
|
138 |
- bytes_written += len(request.data)
|
|
139 |
- if bytes_written > digest.size_bytes:
|
|
140 |
- context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Wrote too much data to blob")
|
|
141 |
- write_session.write(request.data)
|
|
142 |
- hash_.update(request.data)
|
|
143 |
- |
|
144 |
- # Check that the data matches the provided digest.
|
|
145 |
- if bytes_written != digest.size_bytes or not done:
|
|
146 |
- context.abort(grpc.StatusCode.UNIMPLEMENTED,
|
|
147 |
- "Cannot close stream before finishing write")
|
|
148 |
- elif hash_.hexdigest() != digest.hash:
|
|
149 |
- context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Data does not match hash")
|
|
150 |
- storage.commit_write(digest, write_session)
|
|
151 |
- return bytestream_pb2.WriteResponse(committed_size=bytes_written)
|
|
85 |
+ try:
|
|
86 |
+ path = request.resource_name.split("/")
|
|
87 |
+ instance_name = path[0]
|
|
88 |
+ |
|
89 |
+ # TODO: Decide on default instance name
|
|
90 |
+ if path[0] == "blobs":
|
|
91 |
+ if len(path) < 3 or not path[2].isdigit():
|
|
92 |
+ raise InvalidArgumentError("Invalid resource name: {}".format(context.resource_name))
|
|
93 |
+ instance_name = ""
|
|
94 |
+ |
|
95 |
+ elif path[1] == "blobs":
|
|
96 |
+ if len(path) < 4 or not path[3].isdigit():
|
|
97 |
+ raise InvalidArgumentError("Invalid resource name: {}".format(context.resource_name))
|
|
98 |
+ |
|
99 |
+ else:
|
|
100 |
+ raise InvalidArgumentError("Invalid resource name: {}".format(context.resource_name))
|
|
101 |
+ |
|
102 |
+ instance = self._get_instance(instance_name)
|
|
103 |
+ yield from instance.read(path,
|
|
104 |
+ request.read_offset,
|
|
105 |
+ request.read_limit)
|
|
106 |
+ |
|
107 |
+ except InvalidArgumentError as e:
|
|
108 |
+ self.logger.error(e)
|
|
109 |
+ context.set_details(str(e))
|
|
110 |
+ context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
|
111 |
+ yield bytestream_pb2.ReadResponse()
|
|
112 |
+ |
|
113 |
+ except NotFoundError as e:
|
|
114 |
+ self.logger.error(e)
|
|
115 |
+ context.set_details(str(e))
|
|
116 |
+ context.set_code(grpc.StatusCode.NOT_FOUND)
|
|
117 |
+ yield bytestream_pb2.ReadResponse()
|
|
118 |
+ |
|
119 |
+ except OutOfRangeError as e:
|
|
120 |
+ self.logger.error(e)
|
|
121 |
+ context.set_details(str(e))
|
|
122 |
+ context.set_code(grpc.StatusCode.OUT_OF_RANGE)
|
|
123 |
+ yield bytestream_pb2.ReadResponse()
|
|
124 |
+ |
|
125 |
+ def Write(self, requests, context):
|
|
126 |
+ try:
|
|
127 |
+ requests, request_probe = tee(requests, 2)
|
|
128 |
+ first_request = next(request_probe)
|
|
129 |
+ |
|
130 |
+ path = first_request.resource_name.split("/")
|
|
131 |
+ |
|
132 |
+ instance_name = path[0]
|
|
133 |
+ |
|
134 |
+ # TODO: Sort out no instance name
|
|
135 |
+ if path[0] == "uploads":
|
|
136 |
+ if len(path) < 5 or path[2] != "blobs" or not path[4].isdigit():
|
|
137 |
+ raise InvalidArgumentError("Invalid resource name: {}".format(context.resource_name))
|
|
138 |
+ instance_name = ""
|
|
139 |
+ |
|
140 |
+ elif path[1] == "uploads":
|
|
141 |
+ if len(path) < 6 or path[3] != "blobs" or not path[5].isdigit():
|
|
142 |
+ raise InvalidArgumentError("Invalid resource name: {}".format(context.resource_name))
|
|
143 |
+ |
|
144 |
+ else:
|
|
145 |
+ raise InvalidArgumentError("Invalid resource name: {}".format(context.resource_name))
|
|
146 |
+ |
|
147 |
+ instance = self._get_instance(instance_name)
|
|
148 |
+ return instance.write(requests)
|
|
149 |
+ |
|
150 |
+ except NotImplementedError as e:
|
|
151 |
+ self.logger.error(e)
|
|
152 |
+ context.set_details(str(e))
|
|
153 |
+ context.set_code(grpc.StatusCode.UNIMPLEMENTED)
|
|
154 |
+ |
|
155 |
+ except InvalidArgumentError as e:
|
|
156 |
+ self.logger.error(e)
|
|
157 |
+ context.set_details(str(e))
|
|
158 |
+ context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
|
159 |
+ |
|
160 |
+ except NotFoundError as e:
|
|
161 |
+ self.logger.error(e)
|
|
162 |
+ context.set_details(str(e))
|
|
163 |
+ context.set_code(grpc.StatusCode.NOT_FOUND)
|
|
164 |
+ |
|
165 |
+ return bytestream_pb2.WriteResponse()
|
|
166 |
+ |
|
167 |
+ def _get_instance(self, instance_name):
|
|
168 |
+ try:
|
|
169 |
+ return self._instances[instance_name]
|
|
170 |
+ |
|
171 |
+ except KeyError:
|
|
172 |
+ raise InvalidArgumentError("Invalid instance name: {}".format(instance_name))
|
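
One detail in Write above deserves a note: the request stream is duplicated with itertools.tee so the servicer can inspect the first message, which carries the resource name, while still handing the instance an unconsumed iterator. The idiom in isolation:

    from itertools import tee

    def peek_first(stream):
        # Both copies replay the underlying iterator independently.
        stream, probe = tee(stream, 2)
        return next(probe), stream

    first, stream = peek_first(iter(['first', 'second', 'third']))
    assert first == 'first'
    assert list(stream) == ['first', 'second', 'third']
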
... | ... | @@ -14,31 +14,36 @@ |
14 | 14 |
|
15 | 15 |
|
16 | 16 |
"""
|
17 |
-BuildGrid Instance
|
|
17 |
+Execution Controller
|
|
18 | 18 |
==================
|
19 | 19 |
|
20 |
-An instance of the BuildGrid server.
|
|
20 |
+An instance of the Execution controller.
|
|
21 | 21 |
|
22 |
-Contains scheduler, execution instance and an interface to the bots.
|
|
22 |
+Everything needed to make the execution service work.
|
|
23 |
+ |
|
24 |
+Contains scheduler, execution instance, an interface to the bots
|
|
25 |
+and an operations instance.
|
|
23 | 26 |
"""
|
24 | 27 |
|
25 | 28 |
|
26 | 29 |
import logging
|
27 | 30 |
|
28 |
-from .execution.instance import ExecutionInstance
|
|
29 | 31 |
from .scheduler import Scheduler
|
30 | 32 |
from .bots.instance import BotsInterface
|
33 |
+from .execution.instance import ExecutionInstance
|
|
34 |
+from .operations.instance import OperationsInstance
|
|
31 | 35 |
|
32 | 36 |
|
33 |
-class BuildGridInstance(ExecutionInstance, BotsInterface):
|
|
37 |
+class ExecutionController(ExecutionInstance, BotsInterface, OperationsInstance):
|
|
34 | 38 |
|
35 |
- def __init__(self, action_cache=None, cas_storage=None):
|
|
39 |
+ def __init__(self, action_cache=None, storage=None):
|
|
36 | 40 |
scheduler = Scheduler(action_cache)
|
37 | 41 |
|
38 | 42 |
self.logger = logging.getLogger(__name__)
|
39 | 43 |
|
40 |
- ExecutionInstance.__init__(self, scheduler, cas_storage)
|
|
44 |
+ ExecutionInstance.__init__(self, scheduler, storage)
|
|
41 | 45 |
BotsInterface.__init__(self, scheduler)
|
46 |
+ OperationsInstance.__init__(self, scheduler)
|
|
42 | 47 |
|
43 | 48 |
def stream_operation_updates(self, message_queue, operation_name):
|
44 | 49 |
operation = message_queue.get()
|
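
The renamed ExecutionController is one object wearing three hats: it inherits ExecutionInstance, BotsInterface and OperationsInstance, so a single controller can back the Execution, Bots and Operations services at once while all three share one Scheduler. The shape in miniature (class bodies are stand-ins, not the real interfaces):

    class ExecutionInstance:
        def __init__(self, scheduler, storage):
            self._scheduler = scheduler

    class BotsInterface:
        def __init__(self, scheduler):
            self._scheduler = scheduler

    class OperationsInstance:
        def __init__(self, scheduler):
            self._scheduler = scheduler

    class Controller(ExecutionInstance, BotsInterface, OperationsInstance):
        def __init__(self, scheduler, storage=None):
            # Explicit base-class __init__ calls, as in the diff, because
            # the bases take different signatures.
            ExecutionInstance.__init__(self, scheduler, storage)
            BotsInterface.__init__(self, scheduler)
            OperationsInstance.__init__(self, scheduler)
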
... | ... | @@ -35,10 +35,12 @@ from .._exceptions import InvalidArgumentError |
35 | 35 |
|
36 | 36 |
class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
|
37 | 37 |
|
38 |
- def __init__(self, instances):
|
|
38 |
+ def __init__(self, server, instances):
|
|
39 | 39 |
self.logger = logging.getLogger(__name__)
|
40 | 40 |
self._instances = instances
|
41 | 41 |
|
42 |
+ remote_execution_pb2_grpc.add_ExecutionServicer_to_server(self, server)
|
|
43 |
+ |
|
42 | 44 |
def Execute(self, request, context):
|
43 | 45 |
try:
|
44 | 46 |
message_queue = queue.Queue()
|
... | ... | @@ -32,10 +32,12 @@ from .._exceptions import InvalidArgumentError |
32 | 32 |
|
33 | 33 |
class OperationsService(operations_pb2_grpc.OperationsServicer):
|
34 | 34 |
|
35 |
- def __init__(self, instances):
|
|
35 |
+ def __init__(self, server, instances):
|
|
36 | 36 |
self._instances = instances
|
37 | 37 |
self.logger = logging.getLogger(__name__)
|
38 | 38 |
|
39 |
+ operations_pb2_grpc.add_OperationsServicer_to_server(self, server)
|
|
40 |
+ |
|
39 | 41 |
def GetOperation(self, request, context):
|
40 | 42 |
try:
|
41 | 43 |
name = request.name
|
... | ... | @@ -20,34 +20,70 @@ import grpc |
20 | 20 |
from buildgrid._protos.buildstream.v2 import buildstream_pb2
|
21 | 21 |
from buildgrid._protos.buildstream.v2 import buildstream_pb2_grpc
|
22 | 22 |
|
23 |
-from .._exceptions import NotFoundError
|
|
23 |
+from .._exceptions import InvalidArgumentError, NotFoundError
|
|
24 | 24 |
|
25 | 25 |
|
26 | 26 |
class ReferenceStorageService(buildstream_pb2_grpc.ReferenceStorageServicer):
|
27 | 27 |
|
28 |
- def __init__(self, reference_cache):
|
|
29 |
- self._reference_cache = reference_cache
|
|
28 |
+ def __init__(self, server, instances):
|
|
30 | 29 |
self.logger = logging.getLogger(__name__)
|
31 | 30 |
|
31 |
+ self._instances = instances
|
|
32 |
+ |
|
33 |
+ buildstream_pb2_grpc.add_ReferenceStorageServicer_to_server(self, server)
|
|
34 |
+ |
|
32 | 35 |
def GetReference(self, request, context):
|
33 | 36 |
try:
|
37 |
+ instance = self._get_instance(request.instance_name)
|
|
38 |
+ digest = instance.get_digest_reference(request.key)
|
|
34 | 39 |
response = buildstream_pb2.GetReferenceResponse()
|
35 |
- response.digest.CopyFrom(self._reference_cache.get_digest_reference(request.key))
|
|
40 |
+ response.digest.CopyFrom(digest)
|
|
36 | 41 |
return response
|
37 | 42 |
|
43 |
+ except InvalidArgumentError as e:
|
|
44 |
+ self.logger.error(e)
|
|
45 |
+ context.set_details(str(e))
|
|
46 |
+ context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
|
47 |
+ |
|
38 | 48 |
except NotFoundError:
|
39 | 49 |
context.set_code(grpc.StatusCode.NOT_FOUND)
|
40 | 50 |
|
51 |
+ return buildstream_pb2.GetReferenceResponse()
|
|
52 |
+ |
|
41 | 53 |
def UpdateReference(self, request, context):
|
42 | 54 |
try:
|
55 |
+ instance = self._get_instance(request.instance_name)
|
|
56 |
+ digest = request.digest
|
|
57 |
+ |
|
43 | 58 |
for key in request.keys:
|
44 |
- self._reference_cache.update_reference(key, request.digest)
|
|
59 |
+ instance.update_reference(key, digest)
|
|
45 | 60 |
|
46 |
- return buildstream_pb2.UpdateReferenceResponse()
|
|
61 |
+ except InvalidArgumentError as e:
|
|
62 |
+ self.logger.error(e)
|
|
63 |
+ context.set_details(str(e))
|
|
64 |
+ context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
|
47 | 65 |
|
48 | 66 |
except NotImplementedError:
|
49 | 67 |
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
|
50 | 68 |
|
69 |
+ return buildstream_pb2.UpdateReferenceResponse()
|
|
70 |
+ |
|
51 | 71 |
def Status(self, request, context):
|
52 |
- allow_updates = self._reference_cache.allow_updates
|
|
53 |
- return buildstream_pb2.StatusResponse(allow_updates=allow_updates)
|
|
72 |
+ try:
|
|
73 |
+ instance = self._get_instance(request.instance_name)
|
|
74 |
+ allow_updates = instance.allow_updates
|
|
75 |
+ return buildstream_pb2.StatusResponse(allow_updates=allow_updates)
|
|
76 |
+ |
|
77 |
+ except InvalidArgumentError as e:
|
|
78 |
+ self.logger.error(e)
|
|
79 |
+ context.set_details(str(e))
|
|
80 |
+ context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
|
81 |
+ |
|
82 |
+ return buildstream_pb2.StatusResponse()
|
|
83 |
+ |
|
84 |
+ def _get_instance(self, instance_name):
|
|
85 |
+ try:
|
|
86 |
+ return self._instances[instance_name]
|
|
87 |
+ |
|
88 |
+ except KeyError:
|
|
89 |
+ raise InvalidArgumentError("Invalid instance name: {}".format(instance_name))
|
1 |
- |
|
2 | 1 |
.. _dummy-build:
|
3 | 2 |
|
4 | 3 |
Dummy build
|
... | ... | @@ -8,7 +7,7 @@ In one terminal, start a server: |
8 | 7 |
|
9 | 8 |
.. code-block:: sh
|
10 | 9 |
|
11 |
- bgd server start --allow-insecure
|
|
10 |
+ bgd server start buildgrid/_app/settings/default.yml
|
|
12 | 11 |
|
13 | 12 |
In another terminal, send a request for work:
|
14 | 13 |
|
1 |
- |
|
2 | 1 |
.. _simple-build:
|
3 | 2 |
|
4 | 3 |
Simple build
|
... | ... | @@ -27,7 +26,7 @@ Now start a BuildGrid server, passing it a directory it can write a CAS to: |
27 | 26 |
|
28 | 27 |
.. code-block:: sh
|
29 | 28 |
|
30 |
- bgd server start --allow-insecure --cas disk --cas-cache disk --cas-disk-directory /path/to/empty/directory
|
|
29 |
+ bgd server start buildgrid/_app/settings/default.yml
|
|
31 | 30 |
|
32 | 31 |
Start the following bot session:
|
33 | 32 |
|
... | ... | @@ -114,6 +114,7 @@ setup( |
114 | 114 |
'protobuf',
|
115 | 115 |
'grpcio',
|
116 | 116 |
'Click',
|
117 |
+ 'pyaml',
|
|
117 | 118 |
'boto3 < 1.8.0',
|
118 | 119 |
'botocore < 1.11.0',
|
119 | 120 |
'xdg',
|
... | ... | @@ -18,17 +18,25 @@ |
18 | 18 |
# pylint: disable=redefined-outer-name
|
19 | 19 |
|
20 | 20 |
import io
|
21 |
+from unittest import mock
|
|
21 | 22 |
|
23 |
+import grpc
|
|
24 |
+from grpc._server import _Context
|
|
22 | 25 |
import pytest
|
23 | 26 |
|
24 | 27 |
from buildgrid._protos.google.bytestream import bytestream_pb2
|
25 | 28 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 as re_pb2
|
26 | 29 |
from buildgrid.server.cas.storage.storage_abc import StorageABC
|
27 |
-from buildgrid.server.cas.service import ByteStreamService
|
|
28 |
-from buildgrid.server.cas.service import ContentAddressableStorageService
|
|
30 |
+from buildgrid.server.cas.instance import ByteStreamInstance, ContentAddressableStorageInstance
|
|
31 |
+from buildgrid.server.cas import service
|
|
32 |
+from buildgrid.server.cas.service import ByteStreamService, ContentAddressableStorageService
|
|
29 | 33 |
from buildgrid.settings import HASH
|
30 | 34 |
|
31 | 35 |
|
36 |
+context = mock.create_autospec(_Context)
|
|
37 |
+server = mock.create_autospec(grpc.server)
|
|
38 |
+ |
|
39 |
+ |
|
32 | 40 |
class SimpleStorage(StorageABC):
|
33 | 41 |
"""Storage provider wrapper around a dictionary.
|
34 | 42 |
|
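
The test changes replace the hand-rolled MockObject/raise_mock_exception pattern: mock.create_autospec(_Context) yields a context whose status-setting calls can be asserted on directly, and patching the *_pb2_grpc modules lets the servicers' new self-registering constructors run without a real gRPC server. The assertion style, in brief:

    from unittest import mock

    import grpc
    from grpc._server import _Context

    context = mock.create_autospec(_Context)
    # ... invoke a servicer method with this context, then:
    context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
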
... | ... | @@ -61,28 +69,18 @@ class SimpleStorage(StorageABC): |
61 | 69 |
self.data[(digest.hash, digest.size_bytes)] = data
|
62 | 70 |
|
63 | 71 |
|
64 |
-class MockObject:
|
|
65 |
- def __init__(self):
|
|
66 |
- self.abort = None
|
|
67 |
- |
|
68 |
-
|
69 |
-class MockException(Exception):
|
70 |
-    pass
|
71 |
-
|
72 |
-
|
73 |
-def raise_mock_exception(*args, **kwargs):
|
74 |
-    raise MockException()
|
75 |
-
|
76 |
-
|
77 | 72 |
test_strings = [b"", b"hij"]
78 | 73 |
instances = ["", "test_inst"]
79 | 74 |

80 | 75 |

81 | 76 |
@pytest.mark.parametrize("data_to_read", test_strings)
82 | 77 |
@pytest.mark.parametrize("instance", instances)
83 |
-def test_bytestream_read(data_to_read, instance):
|
78 |
+@mock.patch.object(service, 'bytestream_pb2_grpc', autospec=True)
|
79 |
+def test_bytestream_read(mocked, data_to_read, instance):
|
84 | 80 |
    storage = SimpleStorage([b"abc", b"defg", data_to_read])
85 |
-    servicer = ByteStreamService(storage)
|
81 |
+
|
82 |
+    bs_instance = ByteStreamInstance(storage)
|
83 |
+    servicer = ByteStreamService(server, {instance: bs_instance})
|
86 | 84 |

87 | 85 |
    request = bytestream_pb2.ReadRequest()
88 | 86 |
    if instance != "":
... | ... | @@ -96,11 +94,13 @@ def test_bytestream_read(data_to_read, instance):
96 | 94 |

97 | 95 |

98 | 96 |
@pytest.mark.parametrize("instance", instances)
99 |
-def test_bytestream_read_many(instance):
|
97 |
+@mock.patch.object(service, 'bytestream_pb2_grpc', autospec=True)
|
98 |
+def test_bytestream_read_many(mocked, instance):
|
100 | 99 |
    data_to_read = b"testing" * 10000
101 | 100 |

102 | 101 |
    storage = SimpleStorage([b"abc", b"defg", data_to_read])
103 |
-    servicer = ByteStreamService(storage)
|
102 |
+    bs_instance = ByteStreamInstance(storage)
|
103 |
+    servicer = ByteStreamService(server, {instance: bs_instance})
|
104 | 104 |

105 | 105 |
    request = bytestream_pb2.ReadRequest()
106 | 106 |
    if instance != "":
... | ... | @@ -115,9 +115,11 @@ def test_bytestream_read_many(instance):
115 | 115 |

116 | 116 |
@pytest.mark.parametrize("instance", instances)
117 | 117 |
@pytest.mark.parametrize("extra_data", ["", "/", "/extra/data"])
118 |
-def test_bytestream_write(instance, extra_data):
|
118 |
+@mock.patch.object(service, 'bytestream_pb2_grpc', autospec=True)
|
119 |
+def test_bytestream_write(mocked, instance, extra_data):
|
119 | 120 |
    storage = SimpleStorage()
120 |
-    servicer = ByteStreamService(storage)
|
121 |
+    bs_instance = ByteStreamInstance(storage)
|
122 |
+    servicer = ByteStreamService(server, {instance: bs_instance})
|
121 | 123 |

122 | 124 |
    resource_name = ""
123 | 125 |
    if instance != "":
... | ... | @@ -137,9 +139,11 @@ def test_bytestream_write(instance, extra_data):
137 | 139 |
    assert storage.data[(hash_, 6)] == b'abcdef'
138 | 140 |

139 | 141 |

140 |
-def test_bytestream_write_rejects_wrong_hash():
|
142 |
+@mock.patch.object(service, 'bytestream_pb2_grpc', autospec=True)
|
143 |
+def test_bytestream_write_rejects_wrong_hash(mocked):
|
141 | 144 |
    storage = SimpleStorage()
142 |
-    servicer = ByteStreamService(storage)
|
145 |
+    bs_instance = ByteStreamInstance(storage)
|
146 |
+    servicer = ByteStreamService(server, {"": bs_instance})
|
143 | 147 |

144 | 148 |
    data = b'some data'
145 | 149 |
    wrong_hash = HASH(b'incorrect').hexdigest()
... | ... | @@ -148,18 +152,18 @@ def test_bytestream_write_rejects_wrong_hash():
148 | 152 |
        bytestream_pb2.WriteRequest(resource_name=resource_name, data=data, finish_write=True)
149 | 153 |
    ]
150 | 154 |

151 |
-    context = MockObject()
|
152 |
-    context.abort = raise_mock_exception
|
153 |
-    with pytest.raises(MockException):
|
154 |
-        servicer.Write(requests, context)
|
155 |
+    servicer.Write(requests, context)
|
156 |
+    context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
|
155 | 157 |

156 | 158 |
    assert len(storage.data) == 0
157 | 159 |

158 | 160 |

159 | 161 |
@pytest.mark.parametrize("instance", instances)
160 |
-def test_cas_find_missing_blobs(instance):
|
162 |
+@mock.patch.object(service, 'remote_execution_pb2_grpc', autospec=True)
|
163 |
+def test_cas_find_missing_blobs(mocked, instance):
|
161 | 164 |
    storage = SimpleStorage([b'abc', b'def'])
162 |
-    servicer = ContentAddressableStorageService(storage)
|
165 |
+    cas_instance = ContentAddressableStorageInstance(storage)
|
166 |
+    servicer = ContentAddressableStorageService(server, {instance: cas_instance})
|
163 | 167 |
    digests = [
164 | 168 |
        re_pb2.Digest(hash=HASH(b'def').hexdigest(), size_bytes=3),
165 | 169 |
        re_pb2.Digest(hash=HASH(b'ghij').hexdigest(), size_bytes=4)
... | ... | @@ -171,9 +175,12 @@ def test_cas_find_missing_blobs(instance):
171 | 175 |

172 | 176 |

173 | 177 |
@pytest.mark.parametrize("instance", instances)
174 |
-def test_cas_batch_update_blobs(instance):
|
178 |
+@mock.patch.object(service, 'remote_execution_pb2_grpc', autospec=True)
|
179 |
+def test_cas_batch_update_blobs(mocked, instance):
|
175 | 180 |
    storage = SimpleStorage()
176 |
-    servicer = ContentAddressableStorageService(storage)
|
181 |
+    cas_instance = ContentAddressableStorageInstance(storage)
|
182 |
+    servicer = ContentAddressableStorageService(server, {instance: cas_instance})
|
183 |
+
|
177 | 184 |
    update_requests = [
178 | 185 |
        re_pb2.BatchUpdateBlobsRequest.Request(
179 | 186 |
            digest=re_pb2.Digest(hash=HASH(b'abc').hexdigest(), size_bytes=3), data=b'abc'),
... | ... | @@ -181,16 +188,21 @@
181 | 188 |
            digest=re_pb2.Digest(hash="invalid digest!", size_bytes=1000),
182 | 189 |
            data=b'wrong data')
183 | 190 |
    ]
191 |
+
|
184 | 192 |
    request = re_pb2.BatchUpdateBlobsRequest(instance_name=instance, requests=update_requests)
185 | 193 |
    response = servicer.BatchUpdateBlobs(request, None)
186 | 194 |
    assert len(response.responses) == 2
195 |
+
|
187 | 196 |
    for blob_response in response.responses:
188 | 197 |
        if blob_response.digest == update_requests[0].digest:
189 | 198 |
            assert blob_response.status.code == 0
199 |
+
|
190 | 200 |
        elif blob_response.digest == update_requests[1].digest:
191 | 201 |
            assert blob_response.status.code != 0
202 |
+
|
192 | 203 |
        else:
193 | 204 |
            raise Exception("Unexpected blob response")
205 |
+
|
194 | 206 |
    assert len(storage.data) == 1
195 | 207 |
    assert (update_requests[0].digest.hash, 3) in storage.data
196 | 208 |
    assert storage.data[(update_requests[0].digest.hash, 3)] == b'abc'
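
Note on the pattern above: the CAS services are now constructed from a gRPC server plus a map of instance names to instances, and error paths are observed through context.set_code rather than a raising context.abort, which is why the MockException helper could be dropped. A minimal standalone sketch of the setup (using LRUMemoryCache in place of this module's SimpleStorage helper so the snippet is self-contained; the patch is assumed to keep servicer registration against the server inert):

    from unittest import mock

    import grpc
    from grpc._server import _Context

    from buildgrid.server.cas import service
    from buildgrid.server.cas.instance import ByteStreamInstance
    from buildgrid.server.cas.service import ByteStreamService
    from buildgrid.server.cas.storage.lru_memory_cache import LRUMemoryCache

    # Autospec'd stand-ins for the real gRPC machinery.
    server = mock.create_autospec(grpc.server)
    context = mock.create_autospec(_Context)

    # Patch the generated module so the servicer can be built without a real server.
    with mock.patch.object(service, 'bytestream_pb2_grpc', autospec=True):
        bs_instance = ByteStreamInstance(LRUMemoryCache(256))
        servicer = ByteStreamService(server, {"": bs_instance})

    # Failing calls now record a status code on the context instead of raising,
    # e.g.: context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
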
... | ... | @@ -19,18 +19,28 @@
19 | 19 |

20 | 20 |
import tempfile
21 | 21 |

22 |
+from unittest import mock
|
23 |
+
|
22 | 24 |
import boto3
25 |
+import grpc
|
26 |
+from grpc._server import _Context
|
23 | 27 |
import pytest
24 |
-
|
25 | 28 |
from moto import mock_s3
26 | 29 |

27 | 30 |
from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
31 |
+from buildgrid.server.cas import service
|
32 |
+from buildgrid.server.cas.instance import ByteStreamInstance, ContentAddressableStorageInstance
|
33 |
+from buildgrid.server.cas.storage import remote
|
28 | 34 |
from buildgrid.server.cas.storage.lru_memory_cache import LRUMemoryCache
29 | 35 |
from buildgrid.server.cas.storage.disk import DiskStorage
30 | 36 |
from buildgrid.server.cas.storage.s3 import S3Storage
31 | 37 |
from buildgrid.server.cas.storage.with_cache import WithCacheStorage
32 | 38 |
from buildgrid.settings import HASH
33 | 39 |

40 |
+
|
41 |
+context = mock.create_autospec(_Context)
|
42 |
+server = mock.create_autospec(grpc.server)
|
43 |
+
|
34 | 44 |
abc = b"abc"
35 | 45 |
abc_digest = Digest(hash=HASH(abc).hexdigest(), size_bytes=3)
36 | 46 |
defg = b"defg"
... | ... | @@ -45,10 +55,64 @@ def write(storage, digest, blob):
45 | 55 |
    storage.commit_write(digest, session)
46 | 56 |

47 | 57 |

58 |
+class MockCASStorage(ByteStreamInstance, ContentAddressableStorageInstance):
|
59 |
+
|
60 |
+    def __init__(self):
|
61 |
+        storage = LRUMemoryCache(256)
|
62 |
+        super().__init__(storage)
|
63 |
+
|
64 |
+
|
65 |
+# Mock a CAS server with LRUStorage to return "calls" made to it
|
66 |
+class MockStubServer:
|
67 |
+
|
68 |
+    def __init__(self):
|
69 |
+        instances = {"": MockCASStorage(), "dna": MockCASStorage()}
|
70 |
+        self._requests = []
|
71 |
+        with mock.patch.object(service, 'bytestream_pb2_grpc'):
|
72 |
+            self._bs_service = service.ByteStreamService(server, instances)
|
73 |
+        with mock.patch.object(service, 'remote_execution_pb2_grpc'):
|
74 |
+            self._cas_service = service.ContentAddressableStorageService(server, instances)
|
75 |
+
|
76 |
+    def Read(self, request):
|
77 |
+        yield from self._bs_service.Read(request, context)
|
78 |
+
|
79 |
+    def Write(self, request):
|
80 |
+        self._requests.append(request)
|
81 |
+        if request.finish_write:
|
82 |
+            response = self._bs_service.Write(self._requests, context)
|
83 |
+            self._requests = []
|
84 |
+            return response
|
85 |
+
|
86 |
+        return None
|
87 |
+
|
88 |
+    def FindMissingBlobs(self, request):
|
89 |
+        return self._cas_service.FindMissingBlobs(request, context)
|
90 |
+
|
91 |
+    def BatchUpdateBlobs(self, request):
|
92 |
+        return self._cas_service.BatchUpdateBlobs(request, context)
|
93 |
+
|
94 |
+
|
95 |
+# Instances of MockCASStorage
|
96 |
+@pytest.fixture(params=["", "dna"])
|
97 |
+def instance(request):
|
98 |
+    return {request.param: MockCASStorage()}
|
99 |
+
|
100 |
+
|
101 |
+@pytest.fixture()
|
102 |
+@mock.patch.object(remote, 'bytestream_pb2_grpc')
|
103 |
+@mock.patch.object(remote, 'remote_execution_pb2_grpc')
|
104 |
+def remote_storage(mock_bs_grpc, mock_re_pb2_grpc):
|
105 |
+    mock_server = MockStubServer()
|
106 |
+    storage = remote.RemoteStorage(instance)
|
107 |
+    storage._stub_bs = mock_server
|
108 |
+    storage._stub_cas = mock_server
|
109 |
+    yield storage
|
110 |
+
|
111 |
+
|
48 | 112 |
# General tests for all storage providers
49 | 113 |

50 | 114 |

51 |
-@pytest.fixture(params=["lru", "disk", "s3", "lru_disk", "disk_s3"])
|
115 |
+@pytest.fixture(params=["lru", "disk", "s3", "lru_disk", "disk_s3", "remote"])
|
52 | 116 |
def any_storage(request):
53 | 117 |
    if request.param == "lru":
54 | 118 |
        yield LRUMemoryCache(256)
... | ... | @@ -70,6 +134,14 @@ def any_storage(request):
70 | 134 |
        with mock_s3():
71 | 135 |
            boto3.resource('s3').create_bucket(Bucket="testing")
72 | 136 |
            yield WithCacheStorage(DiskStorage(path), S3Storage("testing"))
137 |
+    elif request.param == "remote":
|
138 |
+        with mock.patch.object(remote, 'bytestream_pb2_grpc'):
|
139 |
+            with mock.patch.object(remote, 'remote_execution_pb2_grpc'):
|
140 |
+                mock_server = MockStubServer()
|
141 |
+                storage = remote.RemoteStorage(instance)
|
142 |
+                storage._stub_bs = mock_server
|
143 |
+                storage._stub_cas = mock_server
|
144 |
+                yield storage
|
73 | 145 |

74 | 146 |

75 | 147 |
def test_initially_empty(any_storage):
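
The parametrized fixtures above rely on pytest's fixture protocol: a fixture declared with params receives the special request object, and the active parameter is exposed as request.param. A tiny self-contained illustration of that idiom (the fixture and test names here are hypothetical):

    import pytest

    @pytest.fixture(params=["", "dna"])
    def instance_name(request):
        # pytest injects `request`; request.param carries the current parameter.
        return request.param

    def test_instance_name(instance_name):
        assert instance_name in ("", "dna")
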
... | ... | @@ -26,10 +26,14 @@ import pytest
26 | 26 |

27 | 27 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
28 | 28 |
from buildgrid.server.cas.storage import lru_memory_cache
29 |
+from buildgrid.server.actioncache import service
|
29 | 30 |
from buildgrid.server.actioncache.storage import ActionCache
30 | 31 |
from buildgrid.server.actioncache.service import ActionCacheService
31 | 32 |

32 | 33 |

34 |
+server = mock.create_autospec(grpc.server)
|
35 |
+
|
36 |
+
|
33 | 37 |
# Can mock this
34 | 38 |
@pytest.fixture
35 | 39 |
def context():
... | ... | @@ -42,36 +46,41 @@ def cas():
42 | 46 |

43 | 47 |

44 | 48 |
@pytest.fixture
45 |
-def cache(cas):
|
46 |
-    yield ActionCache(cas, 50)
|
49 |
+def cache_instances(cas):
|
50 |
+    yield {"": ActionCache(cas, 50)}
|
47 | 51 |

48 | 52 |

49 |
-def test_simple_action_result(cache, context):
|
50 |
-    service = ActionCacheService(cache)
|
53 |
+def test_simple_action_result(cache_instances, context):
|
54 |
+    with mock.patch.object(service, 'remote_execution_pb2_grpc'):
|
55 |
+        ac_service = ActionCacheService(server, cache_instances)
|
56 |
+
|
57 |
+    print(cache_instances)
|
51 | 58 |
    action_digest = remote_execution_pb2.Digest(hash='sample', size_bytes=4)
52 | 59 |

53 | 60 |
    # Check that before adding the ActionResult, attempting to fetch it fails
54 |
-    request = remote_execution_pb2.GetActionResultRequest(action_digest=action_digest)
|
55 |
-    service.GetActionResult(request, context)
|
61 |
+    request = remote_execution_pb2.GetActionResultRequest(instance_name="",
|
62 |
+                                                          action_digest=action_digest)
|
63 |
+    ac_service.GetActionResult(request, context)
|
56 | 64 |
    context.set_code.assert_called_once_with(grpc.StatusCode.NOT_FOUND)
57 | 65 |

58 | 66 |
    # Add an ActionResult to the cache
59 | 67 |
    action_result = remote_execution_pb2.ActionResult(stdout_raw=b'example output')
60 | 68 |
    request = remote_execution_pb2.UpdateActionResultRequest(action_digest=action_digest,
61 | 69 |
                                                             action_result=action_result)
62 |
-    service.UpdateActionResult(request, context)
|
70 |
+    ac_service.UpdateActionResult(request, context)
|
63 | 71 |

64 | 72 |
    # Check that fetching it now works
65 | 73 |
    request = remote_execution_pb2.GetActionResultRequest(action_digest=action_digest)
66 |
-    fetched_result = service.GetActionResult(request, context)
|
74 |
+    fetched_result = ac_service.GetActionResult(request, context)
|
67 | 75 |
    assert fetched_result.stdout_raw == action_result.stdout_raw
68 | 76 |

69 | 77 |

70 |
-def test_disabled_update_action_result(cache, context):
|
78 |
+def test_disabled_update_action_result(context):
|
71 | 79 |
    disabled_push = ActionCache(cas, 50, False)
72 |
-    service = ActionCacheService(disabled_push)
|
80 |
+    with mock.patch.object(service, 'remote_execution_pb2_grpc'):
|
81 |
+        ac_service = ActionCacheService(server, {"": disabled_push})
|
73 | 82 |

74 |
-    request = remote_execution_pb2.UpdateActionResultRequest()
|
75 |
-    service.UpdateActionResult(request, context)
|
83 |
+    request = remote_execution_pb2.UpdateActionResultRequest(instance_name='')
|
84 |
+    ac_service.UpdateActionResult(request, context)
|
76 | 85 |

77 | 86 |
    context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
|
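
As with the CAS services, the ActionCacheService is now keyed by instance name. A minimal sketch of the new construction, using only names that appear in this commit (patching the generated remote_execution_pb2_grpc module is assumed to keep servicer registration against the mocked server inert):

    from unittest import mock

    import grpc

    from buildgrid.server.actioncache import service
    from buildgrid.server.actioncache.service import ActionCacheService
    from buildgrid.server.actioncache.storage import ActionCache
    from buildgrid.server.cas.storage.lru_memory_cache import LRUMemoryCache

    server = mock.create_autospec(grpc.server)
    storage = LRUMemoryCache(1024 * 1024)

    with mock.patch.object(service, 'remote_execution_pb2_grpc'):
        # One ActionCache per instance name; "" is the default instance.
        ac_service = ActionCacheService(server, {"": ActionCache(storage, 50)})
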
... | ... | @@ -27,12 +27,15 @@ import pytest
27 | 27 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
28 | 28 |
from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
29 | 29 |
from buildgrid.server import job
30 |
-from buildgrid.server.instance import BuildGridInstance
|
30 |
+from buildgrid.server.controller import ExecutionController
|
31 | 31 |
from buildgrid.server.job import LeaseState
32 |
-from buildgrid.server.bots.instance import BotsInterface
|
32 |
+from buildgrid.server.bots import service
|
33 | 33 |
from buildgrid.server.bots.service import BotsService
34 | 34 |

35 | 35 |

36 |
+server = mock.create_autospec(grpc.server)
|
37 |
+
|
38 |
+
|
36 | 39 |
# GRPC context
37 | 40 |
@pytest.fixture
38 | 41 |
def context():
... | ... | @@ -55,19 +58,15 @@ def bot_session():
55 | 58 |

56 | 59 |
@pytest.fixture
57 | 60 |
def buildgrid():
58 |
-    yield BuildGridInstance()
|
59 |
-
|
60 |
-
|
61 |
-@pytest.fixture
|
62 |
-def bots(schedule):
|
63 |
-    yield BotsInterface(schedule)
|
61 |
+    yield ExecutionController()
|
64 | 62 |

65 | 63 |

66 | 64 |
# Instance to test
67 | 65 |
@pytest.fixture
68 | 66 |
def instance(buildgrid):
69 | 67 |
    instances = {"": buildgrid}
70 |
-    yield BotsService(instances)
|
68 |
+    with mock.patch.object(service, 'bots_pb2_grpc'):
|
69 |
+        yield BotsService(server, instances)
|
71 | 70 |

72 | 71 |

73 | 72 |
def test_create_bot_session(bot_session, context, instance):
|
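
The BotsInterface/scheduler pair disappears from these fixtures; a single ExecutionController now backs the BotsService. A sketch of the resulting wiring, under the same assumption as above that the patch makes registration against the mocked server a no-op:

    from unittest import mock

    import grpc

    from buildgrid.server.bots import service
    from buildgrid.server.bots.service import BotsService
    from buildgrid.server.controller import ExecutionController

    server = mock.create_autospec(grpc.server)

    with mock.patch.object(service, 'bots_pb2_grpc'):
        # The controller owns the scheduler that BotsInterface used to wrap.
        bots_service = BotsService(server, {"": ExecutionController()})
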
... | ... | @@ -29,12 +29,16 @@ from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_p
29 | 29 |
from buildgrid._protos.google.longrunning import operations_pb2
30 | 30 |

31 | 31 |
from buildgrid.server import job
32 |
-from buildgrid.server.instance import BuildGridInstance
|
32 |
+from buildgrid.server.controller import ExecutionController
|
33 | 33 |
from buildgrid.server.cas.storage import lru_memory_cache
34 | 34 |
from buildgrid.server.actioncache.storage import ActionCache
35 |
+from buildgrid.server.execution import service
|
35 | 36 |
from buildgrid.server.execution.service import ExecutionService
36 | 37 |

37 | 38 |

39 |
+server = mock.create_autospec(grpc.server)
|
40 |
+
|
41 |
+
|
38 | 42 |
@pytest.fixture
39 | 43 |
def context():
40 | 44 |
    cxt = mock.MagicMock(spec=_Context)
... | ... | @@ -46,17 +50,17 @@ def buildgrid(request):
46 | 50 |
    if request.param == "action-cache":
47 | 51 |
        storage = lru_memory_cache.LRUMemoryCache(1024 * 1024)
48 | 52 |
        cache = ActionCache(storage, 50)
49 |
-
|
50 |
-        return BuildGridInstance(action_cache=cache,
|
51 |
-                                 cas_storage=storage)
|
52 |
-    return BuildGridInstance()
|
53 |
+        yield ExecutionController(cache, storage)
|
54 |
+    else:
|
55 |
+        yield ExecutionController()
|
53 | 56 |

54 | 57 |

55 | 58 |
# Instance to test
56 | 59 |
@pytest.fixture
57 | 60 |
def instance(buildgrid):
58 | 61 |
    instances = {"": buildgrid}
59 |
-    yield ExecutionService(instances)
|
62 |
+    with mock.patch.object(service, 'remote_execution_pb2_grpc'):
|
63 |
+        yield ExecutionService(server, instances)
|
60 | 64 |

61 | 65 |

62 | 66 |
@pytest.mark.parametrize("skip_cache_lookup", [True, False])
|
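
Per the fixture above, an ExecutionController can be built bare or with an action cache plus CAS storage; the two positional arguments mirror this test, while the exact signature lives in buildgrid/_app/server.py's counterpart, buildgrid/server/controller.py, and is assumed here:

    from buildgrid.server.actioncache.storage import ActionCache
    from buildgrid.server.cas.storage.lru_memory_cache import LRUMemoryCache
    from buildgrid.server.controller import ExecutionController

    storage = LRUMemoryCache(1024 * 1024)
    cache = ActionCache(storage, 50)

    controller = ExecutionController(cache, storage)  # with an action cache
    plain_controller = ExecutionController()          # without one
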
... | ... | @@ -28,12 +28,14 @@ from google.protobuf import any_pb2
28 | 28 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
29 | 29 |
from buildgrid._protos.google.longrunning import operations_pb2
30 | 30 |

31 |
-from buildgrid.server.instance import BuildGridInstance
|
31 |
+from buildgrid.server.controller import ExecutionController
|
32 | 32 |
from buildgrid.server._exceptions import InvalidArgumentError
33 | 33 |

34 |
+from buildgrid.server.operations import service
|
34 | 35 |
from buildgrid.server.operations.service import OperationsService
35 | 36 |

36 | 37 |

38 |
+server = mock.create_autospec(grpc.server)
|
37 | 39 |
instance_name = "blade"
38 | 40 |

39 | 41 |

... | ... | @@ -56,14 +58,15 @@ def execute_request():
56 | 58 |

57 | 59 |
@pytest.fixture
58 | 60 |
def buildgrid():
59 |
-    yield BuildGridInstance()
|
61 |
+    yield ExecutionController()
|
60 | 62 |

61 | 63 |

62 | 64 |
# Instance to test
63 | 65 |
@pytest.fixture
64 | 66 |
def instance(buildgrid):
65 | 67 |
    instances = {instance_name: buildgrid}
66 |
-    yield OperationsService(instances)
|
68 |
+    with mock.patch.object(service, 'operations_pb2_grpc'):
|
69 |
+        yield OperationsService(server, instances)
|
67 | 70 |

68 | 71 |

69 | 72 |
# Queue an execution, get operation corresponding to that request
|
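
Because services now hold an {instance_name: instance} map, one servicer can front several named controllers; this test registers its controller under "blade" rather than the default empty name. A sketch reusing only names from the diff:

    from unittest import mock

    import grpc

    from buildgrid.server.controller import ExecutionController
    from buildgrid.server.operations import service
    from buildgrid.server.operations.service import OperationsService

    server = mock.create_autospec(grpc.server)

    with mock.patch.object(service, 'operations_pb2_grpc'):
        # Requests carrying instance_name="blade" are routed to this controller.
        ops_service = OperationsService(server, {"blade": ExecutionController()})
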
... | ... | @@ -25,10 +25,15 @@ from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_p
25 | 25 |
from buildgrid._protos.buildstream.v2 import buildstream_pb2
26 | 26 |

27 | 27 |
from buildgrid.server.cas.storage import lru_memory_cache
28 |
+from buildgrid.server.referencestorage import service
|
28 | 29 |
from buildgrid.server.referencestorage.service import ReferenceStorageService
29 | 30 |
from buildgrid.server.referencestorage.storage import ReferenceCache
30 | 31 |

31 | 32 |

33 |
+server = mock.create_autospec(grpc.server)
|
34 |
+instance_name = ''
|
35 |
+
|
36 |
+
|
32 | 37 |
# Can mock this
33 | 38 |
@pytest.fixture
34 | 39 |
def context():
... | ... | @@ -45,41 +50,49 @@ def cache(cas):
45 | 50 |
    yield ReferenceCache(cas, 50)
46 | 51 |

47 | 52 |

48 |
-def test_simple_result(cache, context):
|
53 |
+@pytest.fixture
|
54 |
+def instance(cache):
|
55 |
+    instances = {instance_name: cache}
|
56 |
+    with mock.patch.object(service, 'buildstream_pb2_grpc'):
|
57 |
+        yield ReferenceStorageService(server, instances)
|
58 |
+
|
59 |
+
|
60 |
+def test_simple_result(instance, context):
|
49 | 61 |
    keys = ["rick", "roy", "rach"]
50 |
-    service = ReferenceStorageService(cache)
|
51 | 62 |

52 | 63 |
    # Check that before adding the ReferenceResult, attempting to fetch it fails
53 | 64 |
    request = buildstream_pb2.GetReferenceRequest(key=keys[0])
54 |
-    service.GetReference(request, context)
|
65 |
+    instance.GetReference(request, context)
|
55 | 66 |
    context.set_code.assert_called_once_with(grpc.StatusCode.NOT_FOUND)
56 | 67 |

57 | 68 |
    # Add a ReferenceResult to the cache
58 | 69 |
    reference_result = remote_execution_pb2.Digest(hash='deckard')
59 | 70 |
    request = buildstream_pb2.UpdateReferenceRequest(keys=keys,
60 | 71 |
                                                     digest=reference_result)
61 |
-    service.UpdateReference(request, context)
|
72 |
+    instance.UpdateReference(request, context)
|
62 | 73 |

63 | 74 |
    # Check that fetching it now works
64 | 75 |
    for key in keys:
65 | 76 |
        request = buildstream_pb2.GetReferenceRequest(key=key)
66 |
-        fetched_result = service.GetReference(request, context)
|
77 |
+        fetched_result = instance.GetReference(request, context)
|
67 | 78 |
        assert fetched_result.digest == reference_result
68 | 79 |

69 | 80 |

70 |
-def test_disabled_update_result(cache, context):
|
81 |
+def test_disabled_update_result(context):
|
71 | 82 |
    disabled_push = ReferenceCache(cas, 50, False)
72 | 83 |
    keys = ["rick", "roy", "rach"]
73 |
-    service = ReferenceStorageService(disabled_push)
|
84 |
+
|
85 |
+    with mock.patch.object(service, 'buildstream_pb2_grpc'):
|
86 |
+        instance = ReferenceStorageService(server, {'': disabled_push})
|
74 | 87 |

75 | 88 |
# Add an ReferenceResult to the cache
|
76 | 89 |
    reference_result = remote_execution_pb2.Digest(hash='deckard')
77 | 90 |
    request = buildstream_pb2.UpdateReferenceRequest(keys=keys,
78 | 91 |
                                                     digest=reference_result)
79 |
-    service.UpdateReference(request, context)
|
92 |
+    instance.UpdateReference(request, context)
|
80 | 93 |

81 | 94 |
    request = buildstream_pb2.UpdateReferenceRequest()
82 |
-    service.UpdateReference(request, context)
|
95 |
+    instance.UpdateReference(request, context)
|
83 | 96 |

84 | 97 |
    context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
85 | 98 |

... | ... | @@ -87,9 +100,10 @@ def test_disabled_update_result(context):
87 | 100 |
@pytest.mark.parametrize("allow_updates", [True, False])
88 | 101 |
def test_status(allow_updates, context):
89 | 102 |
    cache = ReferenceCache(cas, 5, allow_updates)
90 |
-    service = ReferenceStorageService(cache)
|
103 |
+    with mock.patch.object(service, 'buildstream_pb2_grpc'):
|
104 |
+        instance = ReferenceStorageService(server, {'': cache})
|
91 | 105 |

92 | 106 |
    request = buildstream_pb2.StatusRequest()
93 |
-    response = service.Status(request, context)
|
107 |
+    response = instance.Status(request, context)
|
94 | 108 |

95 | 109 |
    assert response.allow_updates == allow_updates
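
The final test exercises the push toggle end to end: a ReferenceCache whose third argument (allow_updates, per the test's parametrization) is False reports that through Status and answers UpdateReference with UNIMPLEMENTED. A minimal construction sketch under that assumption:

    from buildgrid.server.cas.storage.lru_memory_cache import LRUMemoryCache
    from buildgrid.server.referencestorage.storage import ReferenceCache

    cas = LRUMemoryCache(1024 * 1024)
    read_only_cache = ReferenceCache(cas, 5, False)  # rejects UpdateReference
    writable_cache = ReferenceCache(cas, 5, True)    # presumably the default
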