finn pushed to branch finn/separate-services at BuildGrid / buildgrid
Commits:
-
85b2dc07
by Finn Ball at 2018-09-11T12:49:12Z
-
28f13753
by Finn Ball at 2018-09-11T12:49:17Z
-
ce7090bf
by finnball at 2018-09-11T12:49:17Z
11 changed files:
- .gitlab-ci.yml
- + buildgrid/_app/commands/cmd_operation.py
- docs/source/using_dummy_build.rst
- docs/source/using_simple_build.rst
- tests/cas/test_services.py
- tests/cas/test_storage.py
- tests/integration/action_cache_service.py
- tests/integration/bots_service.py
- tests/integration/execution_service.py
- tests/integration/operations_service.py
- tests/integration/reference_storage_service.py
Changes:
... | ... | @@ -30,7 +30,7 @@ before_script: |
30 | 30 |
.run-dummy-job-template: &dummy-job
|
31 | 31 |
stage: test
|
32 | 32 |
script:
|
33 |
- - ${BGD} server start --allow-insecure &
|
|
33 |
+ - ${BGD} server start buildgrid/_app/settings/default.yml &
|
|
34 | 34 |
- sleep 1 # Allow server to boot
|
35 | 35 |
- ${BGD} bot dummy &
|
36 | 36 |
- ${BGD} execute request-dummy --wait-for-completion
|
1 |
+# Copyright (C) 2018 Bloomberg LP
|
|
2 |
+#
|
|
3 |
+# Licensed under the Apache License, Version 2.0 (the "License");
|
|
4 |
+# you may not use this file except in compliance with the License.
|
|
5 |
+# You may obtain a copy of the License at
|
|
6 |
+#
|
|
7 |
+# <http://www.apache.org/licenses/LICENSE-2.0>
|
|
8 |
+#
|
|
9 |
+# Unless required by applicable law or agreed to in writing, software
|
|
10 |
+# distributed under the License is distributed on an "AS IS" BASIS,
|
|
11 |
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
12 |
+# See the License for the specific language governing permissions and
|
|
13 |
+# limitations under the License.
|
|
14 |
+ |
|
15 |
+ |
|
16 |
+"""
|
|
17 |
+Operations command
|
|
18 |
+==================
|
|
19 |
+ |
|
20 |
+Check the status of operations
|
|
21 |
+"""
|
|
22 |
+ |
|
23 |
+import logging
|
|
24 |
+from urllib.parse import urlparse
|
|
25 |
+import sys
|
|
26 |
+ |
|
27 |
+import click
|
|
28 |
+import grpc
|
|
29 |
+ |
|
30 |
+from buildgrid._protos.google.longrunning import operations_pb2, operations_pb2_grpc
|
|
31 |
+ |
|
32 |
+from ..cli import pass_context
|
|
33 |
+ |
|
34 |
+ |
|
35 |
+@click.group(name='operation', short_help="Long running operations commands")
|
|
36 |
+@click.option('--remote', type=click.STRING, default='http://localhost:50051', show_default=True,
|
|
37 |
+ help="Remote execution server's URL (port defaults to 50051 if not specified).")
|
|
38 |
+@click.option('--client-key', type=click.Path(exists=True, dir_okay=False), default=None,
|
|
39 |
+ help="Private client key for TLS (PEM-encoded)")
|
|
40 |
+@click.option('--client-cert', type=click.Path(exists=True, dir_okay=False), default=None,
|
|
41 |
+ help="Public client certificate for TLS (PEM-encoded)")
|
|
42 |
+@click.option('--server-cert', type=click.Path(exists=True, dir_okay=False), default=None,
|
|
43 |
+ help="Public server certificate for TLS (PEM-encoded)")
|
|
44 |
+@click.option('--instance-name', type=click.STRING, default='main', show_default=True,
|
|
45 |
+ help="Targeted farm instance name.")
|
|
46 |
+@pass_context
|
|
47 |
+def cli(context, remote, instance_name, client_key, client_cert, server_cert):
|
|
48 |
+ url = urlparse(remote)
|
|
49 |
+ |
|
50 |
+ context.remote = '{}:{}'.format(url.hostname, url.port or 50051)
|
|
51 |
+ context.instance_name = instance_name
|
|
52 |
+ |
|
53 |
+ if url.scheme == 'http':
|
|
54 |
+ context.channel = grpc.insecure_channel(context.remote)
|
|
55 |
+ else:
|
|
56 |
+ credentials = context.load_client_credentials(client_key, client_cert, server_cert)
|
|
57 |
+ if not credentials:
|
|
58 |
+ click.echo("ERROR: no TLS keys were specified and no defaults could be found.\n" +
|
|
59 |
+ "Use `insecure-mode: false` in order to deactivate TLS encryption.\n", err=True)
|
|
60 |
+ sys.exit(-1)
|
|
61 |
+ |
|
62 |
+ context.channel = grpc.secure_channel(context.remote, credentials)
|
|
63 |
+ |
|
64 |
+ context.logger = logging.getLogger(__name__)
|
|
65 |
+ context.logger.debug("Starting for remote {}".format(context.remote))
|
|
66 |
+ |
|
67 |
+ |
|
68 |
+@cli.command('status', short_help="Get the status of an operation.")
|
|
69 |
+@click.argument('operation-name', nargs=1, type=click.STRING, required=True)
|
|
70 |
+@pass_context
|
|
71 |
+def status(context, operation_name):
|
|
72 |
+ context.logger.info("Getting operation status...")
|
|
73 |
+ stub = operations_pb2_grpc.OperationsStub(context.channel)
|
|
74 |
+ |
|
75 |
+ request = operations_pb2.GetOperationRequest(name=operation_name)
|
|
76 |
+ |
|
77 |
+ response = stub.GetOperation(request)
|
|
78 |
+ context.logger.info(response)
|
|
79 |
+ |
|
80 |
+ |
|
81 |
+@cli.command('list', short_help="List operations.")
|
|
82 |
+@pass_context
|
|
83 |
+def lists(context):
|
|
84 |
+ context.logger.info("Getting list of operations")
|
|
85 |
+ stub = operations_pb2_grpc.OperationsStub(context.channel)
|
|
86 |
+ |
|
87 |
+ request = operations_pb2.ListOperationsRequest(name=context.instance_name)
|
|
88 |
+ |
|
89 |
+ response = stub.ListOperations(request)
|
|
90 |
+ |
|
91 |
+ if not response.operations:
|
|
92 |
+ context.logger.warning("No operations to list")
|
|
93 |
+ return
|
|
94 |
+ |
|
95 |
+ for op in response.operations:
|
|
96 |
+ context.logger.info(op)
|
1 |
- |
|
2 | 1 |
.. _dummy-build:
|
3 | 2 |
|
4 | 3 |
Dummy build
|
... | ... | @@ -8,7 +7,7 @@ In one terminal, start a server: |
8 | 7 |
|
9 | 8 |
.. code-block:: sh
|
10 | 9 |
|
11 |
- bgd server start --allow-insecure
|
|
10 |
+ bgd server start buildgrid/_app/settings/default.yml
|
|
12 | 11 |
|
13 | 12 |
In another terminal, send a request for work:
|
14 | 13 |
|
1 |
- |
|
2 | 1 |
.. _simple-build:
|
3 | 2 |
|
4 | 3 |
Simple build
|
... | ... | @@ -27,7 +26,7 @@ Now start a BuildGrid server, passing it a directory it can write a CAS to: |
27 | 26 |
|
28 | 27 |
.. code-block:: sh
|
29 | 28 |
|
30 |
- bgd server start --allow-insecure --cas disk --cas-cache disk --cas-disk-directory /path/to/empty/directory
|
|
29 |
+ bgd server start buildgrid/_app/settings/default.yml
|
|
31 | 30 |
|
32 | 31 |
Start the following bot session:
|
33 | 32 |
|
... | ... | @@ -28,11 +28,13 @@ from buildgrid._protos.google.bytestream import bytestream_pb2 |
28 | 28 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 as re_pb2
|
29 | 29 |
from buildgrid.server.cas.storage.storage_abc import StorageABC
|
30 | 30 |
from buildgrid.server.cas.instance import ByteStreamInstance, ContentAddressableStorageInstance
|
31 |
+from buildgrid.server.cas import service
|
|
31 | 32 |
from buildgrid.server.cas.service import ByteStreamService, ContentAddressableStorageService
|
32 | 33 |
from buildgrid.settings import HASH
|
33 | 34 |
|
34 | 35 |
|
35 | 36 |
context = mock.create_autospec(_Context)
|
37 |
+server = mock.create_autospec(grpc.server)
|
|
36 | 38 |
|
37 | 39 |
|
38 | 40 |
class SimpleStorage(StorageABC):
|
... | ... | @@ -73,11 +75,12 @@ instances = ["", "test_inst"] |
73 | 75 |
|
74 | 76 |
@pytest.mark.parametrize("data_to_read", test_strings)
|
75 | 77 |
@pytest.mark.parametrize("instance", instances)
|
76 |
-def test_bytestream_read(data_to_read, instance):
|
|
78 |
+@mock.patch.object(service, 'bytestream_pb2_grpc', autospec=True)
|
|
79 |
+def test_bytestream_read(mocked, data_to_read, instance):
|
|
77 | 80 |
storage = SimpleStorage([b"abc", b"defg", data_to_read])
|
78 | 81 |
|
79 | 82 |
bs_instance = ByteStreamInstance(storage)
|
80 |
- servicer = ByteStreamService({instance: bs_instance})
|
|
83 |
+ servicer = ByteStreamService(server, {instance: bs_instance})
|
|
81 | 84 |
|
82 | 85 |
request = bytestream_pb2.ReadRequest()
|
83 | 86 |
if instance != "":
|
... | ... | @@ -91,12 +94,13 @@ def test_bytestream_read(data_to_read, instance): |
91 | 94 |
|
92 | 95 |
|
93 | 96 |
@pytest.mark.parametrize("instance", instances)
|
94 |
-def test_bytestream_read_many(instance):
|
|
97 |
+@mock.patch.object(service, 'bytestream_pb2_grpc', autospec=True)
|
|
98 |
+def test_bytestream_read_many(mocked, instance):
|
|
95 | 99 |
data_to_read = b"testing" * 10000
|
96 | 100 |
|
97 | 101 |
storage = SimpleStorage([b"abc", b"defg", data_to_read])
|
98 | 102 |
bs_instance = ByteStreamInstance(storage)
|
99 |
- servicer = ByteStreamService({instance: bs_instance})
|
|
103 |
+ servicer = ByteStreamService(server, {instance: bs_instance})
|
|
100 | 104 |
|
101 | 105 |
request = bytestream_pb2.ReadRequest()
|
102 | 106 |
if instance != "":
|
... | ... | @@ -111,10 +115,11 @@ def test_bytestream_read_many(instance): |
111 | 115 |
|
112 | 116 |
@pytest.mark.parametrize("instance", instances)
|
113 | 117 |
@pytest.mark.parametrize("extra_data", ["", "/", "/extra/data"])
|
114 |
-def test_bytestream_write(instance, extra_data):
|
|
118 |
+@mock.patch.object(service, 'bytestream_pb2_grpc', autospec=True)
|
|
119 |
+def test_bytestream_write(mocked, instance, extra_data):
|
|
115 | 120 |
storage = SimpleStorage()
|
116 | 121 |
bs_instance = ByteStreamInstance(storage)
|
117 |
- servicer = ByteStreamService({instance: bs_instance})
|
|
122 |
+ servicer = ByteStreamService(server, {instance: bs_instance})
|
|
118 | 123 |
|
119 | 124 |
resource_name = ""
|
120 | 125 |
if instance != "":
|
... | ... | @@ -134,10 +139,11 @@ def test_bytestream_write(instance, extra_data): |
134 | 139 |
assert storage.data[(hash_, 6)] == b'abcdef'
|
135 | 140 |
|
136 | 141 |
|
137 |
-def test_bytestream_write_rejects_wrong_hash():
|
|
142 |
+@mock.patch.object(service, 'bytestream_pb2_grpc', autospec=True)
|
|
143 |
+def test_bytestream_write_rejects_wrong_hash(mocked):
|
|
138 | 144 |
storage = SimpleStorage()
|
139 | 145 |
bs_instance = ByteStreamInstance(storage)
|
140 |
- servicer = ByteStreamService({"": bs_instance})
|
|
146 |
+ servicer = ByteStreamService(server, {"": bs_instance})
|
|
141 | 147 |
|
142 | 148 |
data = b'some data'
|
143 | 149 |
wrong_hash = HASH(b'incorrect').hexdigest()
|
... | ... | @@ -153,10 +159,11 @@ def test_bytestream_write_rejects_wrong_hash(): |
153 | 159 |
|
154 | 160 |
|
155 | 161 |
@pytest.mark.parametrize("instance", instances)
|
156 |
-def test_cas_find_missing_blobs(instance):
|
|
162 |
+@mock.patch.object(service, 'remote_execution_pb2_grpc', autospec=True)
|
|
163 |
+def test_cas_find_missing_blobs(mocked, instance):
|
|
157 | 164 |
storage = SimpleStorage([b'abc', b'def'])
|
158 | 165 |
cas_instance = ContentAddressableStorageInstance(storage)
|
159 |
- servicer = ContentAddressableStorageService({instance: cas_instance})
|
|
166 |
+ servicer = ContentAddressableStorageService(server, {instance: cas_instance})
|
|
160 | 167 |
digests = [
|
161 | 168 |
re_pb2.Digest(hash=HASH(b'def').hexdigest(), size_bytes=3),
|
162 | 169 |
re_pb2.Digest(hash=HASH(b'ghij').hexdigest(), size_bytes=4)
|
... | ... | @@ -168,10 +175,11 @@ def test_cas_find_missing_blobs(instance): |
168 | 175 |
|
169 | 176 |
|
170 | 177 |
@pytest.mark.parametrize("instance", instances)
|
171 |
-def test_cas_batch_update_blobs(instance):
|
|
178 |
+@mock.patch.object(service, 'remote_execution_pb2_grpc', autospec=True)
|
|
179 |
+def test_cas_batch_update_blobs(mocked, instance):
|
|
172 | 180 |
storage = SimpleStorage()
|
173 | 181 |
cas_instance = ContentAddressableStorageInstance(storage)
|
174 |
- servicer = ContentAddressableStorageService({instance: cas_instance})
|
|
182 |
+ servicer = ContentAddressableStorageService(server, {instance: cas_instance})
|
|
175 | 183 |
|
176 | 184 |
update_requests = [
|
177 | 185 |
re_pb2.BatchUpdateBlobsRequest.Request(
|
... | ... | @@ -22,6 +22,7 @@ import tempfile |
22 | 22 |
from unittest import mock
|
23 | 23 |
|
24 | 24 |
import boto3
|
25 |
+import grpc
|
|
25 | 26 |
from grpc._server import _Context
|
26 | 27 |
import pytest
|
27 | 28 |
from moto import mock_s3
|
... | ... | @@ -38,6 +39,7 @@ from buildgrid.settings import HASH |
38 | 39 |
|
39 | 40 |
|
40 | 41 |
context = mock.create_autospec(_Context)
|
42 |
+server = mock.create_autospec(grpc.server)
|
|
41 | 43 |
|
42 | 44 |
abc = b"abc"
|
43 | 45 |
abc_digest = Digest(hash=HASH(abc).hexdigest(), size_bytes=3)
|
... | ... | @@ -66,8 +68,10 @@ class MockStubServer: |
66 | 68 |
def __init__(self):
|
67 | 69 |
instances = {"": MockCASStorage(), "dna": MockCASStorage()}
|
68 | 70 |
self._requests = []
|
69 |
- self._bs_service = service.ByteStreamService(instances)
|
|
70 |
- self._cas_service = service.ContentAddressableStorageService(instances)
|
|
71 |
+ with mock.patch.object(service, 'bytestream_pb2_grpc'):
|
|
72 |
+ self._bs_service = service.ByteStreamService(server, instances)
|
|
73 |
+ with mock.patch.object(service, 'remote_execution_pb2_grpc'):
|
|
74 |
+ self._cas_service = service.ContentAddressableStorageService(server, instances)
|
|
71 | 75 |
|
72 | 76 |
def Read(self, request):
|
73 | 77 |
yield from self._bs_service.Read(request, context)
|
... | ... | @@ -26,10 +26,14 @@ import pytest |
26 | 26 |
|
27 | 27 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
28 | 28 |
from buildgrid.server.cas.storage import lru_memory_cache
|
29 |
+from buildgrid.server.actioncache import service
|
|
29 | 30 |
from buildgrid.server.actioncache.storage import ActionCache
|
30 | 31 |
from buildgrid.server.actioncache.service import ActionCacheService
|
31 | 32 |
|
32 | 33 |
|
34 |
+server = mock.create_autospec(grpc.server)
|
|
35 |
+ |
|
36 |
+ |
|
33 | 37 |
# Can mock this
|
34 | 38 |
@pytest.fixture
|
35 | 39 |
def context():
|
... | ... | @@ -42,36 +46,41 @@ def cas(): |
42 | 46 |
|
43 | 47 |
|
44 | 48 |
@pytest.fixture
|
45 |
-def cache(cas):
|
|
46 |
- yield ActionCache(cas, 50)
|
|
49 |
+def cache_instances(cas):
|
|
50 |
+ yield {"": ActionCache(cas, 50)}
|
|
47 | 51 |
|
48 | 52 |
|
49 |
-def test_simple_action_result(cache, context):
|
|
50 |
- service = ActionCacheService(cache)
|
|
53 |
+def test_simple_action_result(cache_instances, context):
|
|
54 |
+ with mock.patch.object(service, 'remote_execution_pb2_grpc'):
|
|
55 |
+ ac_service = ActionCacheService(server, cache_instances)
|
|
56 |
+ |
|
57 |
+ print(cache_instances)
|
|
51 | 58 |
action_digest = remote_execution_pb2.Digest(hash='sample', size_bytes=4)
|
52 | 59 |
|
53 | 60 |
# Check that before adding the ActionResult, attempting to fetch it fails
|
54 |
- request = remote_execution_pb2.GetActionResultRequest(action_digest=action_digest)
|
|
55 |
- service.GetActionResult(request, context)
|
|
61 |
+ request = remote_execution_pb2.GetActionResultRequest(instance_name="",
|
|
62 |
+ action_digest=action_digest)
|
|
63 |
+ ac_service.GetActionResult(request, context)
|
|
56 | 64 |
context.set_code.assert_called_once_with(grpc.StatusCode.NOT_FOUND)
|
57 | 65 |
|
58 | 66 |
# Add an ActionResult to the cache
|
59 | 67 |
action_result = remote_execution_pb2.ActionResult(stdout_raw=b'example output')
|
60 | 68 |
request = remote_execution_pb2.UpdateActionResultRequest(action_digest=action_digest,
|
61 | 69 |
action_result=action_result)
|
62 |
- service.UpdateActionResult(request, context)
|
|
70 |
+ ac_service.UpdateActionResult(request, context)
|
|
63 | 71 |
|
64 | 72 |
# Check that fetching it now works
|
65 | 73 |
request = remote_execution_pb2.GetActionResultRequest(action_digest=action_digest)
|
66 |
- fetched_result = service.GetActionResult(request, context)
|
|
74 |
+ fetched_result = ac_service.GetActionResult(request, context)
|
|
67 | 75 |
assert fetched_result.stdout_raw == action_result.stdout_raw
|
68 | 76 |
|
69 | 77 |
|
70 |
-def test_disabled_update_action_result(cache, context):
|
|
78 |
+def test_disabled_update_action_result(context):
|
|
71 | 79 |
disabled_push = ActionCache(cas, 50, False)
|
72 |
- service = ActionCacheService(disabled_push)
|
|
80 |
+ with mock.patch.object(service, 'remote_execution_pb2_grpc'):
|
|
81 |
+ ac_service = ActionCacheService(server, {"": disabled_push})
|
|
73 | 82 |
|
74 |
- request = remote_execution_pb2.UpdateActionResultRequest()
|
|
75 |
- service.UpdateActionResult(request, context)
|
|
83 |
+ request = remote_execution_pb2.UpdateActionResultRequest(instance_name='')
|
|
84 |
+ ac_service.UpdateActionResult(request, context)
|
|
76 | 85 |
|
77 | 86 |
context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
|
... | ... | @@ -27,25 +27,21 @@ import pytest |
27 | 27 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
28 | 28 |
from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
|
29 | 29 |
from buildgrid.server import job
|
30 |
-from buildgrid.server.instance import BuildGridInstance
|
|
30 |
+from buildgrid.server.controller import ExecutionController
|
|
31 | 31 |
from buildgrid.server.job import LeaseState
|
32 |
-from buildgrid.server.bots.instance import BotsInterface
|
|
32 |
+from buildgrid.server.bots import service
|
|
33 | 33 |
from buildgrid.server.bots.service import BotsService
|
34 | 34 |
|
35 | 35 |
|
36 |
+server = mock.create_autospec(grpc.server)
|
|
37 |
+ |
|
38 |
+ |
|
36 | 39 |
# GRPC context
|
37 | 40 |
@pytest.fixture
|
38 | 41 |
def context():
|
39 | 42 |
yield mock.MagicMock(spec=_Context)
|
40 | 43 |
|
41 | 44 |
|
42 |
-@pytest.fixture
|
|
43 |
-def action_job():
|
|
44 |
- action_digest = remote_execution_pb2.Digest()
|
|
45 |
- j = job.Job(action_digest, None)
|
|
46 |
- yield j
|
|
47 |
- |
|
48 |
- |
|
49 | 45 |
@pytest.fixture
|
50 | 46 |
def bot_session():
|
51 | 47 |
bot = bots_pb2.BotSession()
|
... | ... | @@ -54,20 +50,16 @@ def bot_session(): |
54 | 50 |
|
55 | 51 |
|
56 | 52 |
@pytest.fixture
|
57 |
-def buildgrid():
|
|
58 |
- yield BuildGridInstance()
|
|
59 |
- |
|
60 |
- |
|
61 |
-@pytest.fixture
|
|
62 |
-def bots(schedule):
|
|
63 |
- yield BotsInterface(schedule)
|
|
53 |
+def controller():
|
|
54 |
+ yield ExecutionController()
|
|
64 | 55 |
|
65 | 56 |
|
66 | 57 |
# Instance to test
|
67 | 58 |
@pytest.fixture
|
68 |
-def instance(buildgrid):
|
|
69 |
- instances = {"": buildgrid}
|
|
70 |
- yield BotsService(instances)
|
|
59 |
+def instance(controller):
|
|
60 |
+ instances = {"": controller.bots_interface}
|
|
61 |
+ with mock.patch.object(service, 'bots_pb2_grpc'):
|
|
62 |
+ yield BotsService(server, instances)
|
|
71 | 63 |
|
72 | 64 |
|
73 | 65 |
def test_create_bot_session(bot_session, context, instance):
|
... | ... | @@ -127,12 +119,11 @@ def test_update_bot_session_bot_id_fail(bot_session, context, instance): |
127 | 119 |
|
128 | 120 |
@pytest.mark.parametrize("number_of_jobs", [0, 1, 3, 500])
|
129 | 121 |
def test_number_of_leases(number_of_jobs, bot_session, context, instance):
|
130 |
- request = bots_pb2.CreateBotSessionRequest(bot_session=bot_session)
|
|
131 |
- # Inject work
|
|
122 |
+ |
|
132 | 123 |
for _ in range(0, number_of_jobs):
|
133 |
- action_digest = remote_execution_pb2.Digest()
|
|
134 |
- instance._instances[""].execute(action_digest, True)
|
|
124 |
+ _inject_work(instance._instances[""]._scheduler)
|
|
135 | 125 |
|
126 |
+ request = bots_pb2.CreateBotSessionRequest(bot_session=bot_session)
|
|
136 | 127 |
response = instance.CreateBotSession(request, context)
|
137 | 128 |
|
138 | 129 |
assert len(response.leases) == number_of_jobs
|
... | ... | @@ -141,9 +132,9 @@ def test_number_of_leases(number_of_jobs, bot_session, context, instance): |
141 | 132 |
def test_update_leases_with_work(bot_session, context, instance):
|
142 | 133 |
request = bots_pb2.CreateBotSessionRequest(parent='',
|
143 | 134 |
bot_session=bot_session)
|
144 |
- # Inject work
|
|
135 |
+ |
|
145 | 136 |
action_digest = remote_execution_pb2.Digest(hash='gaff')
|
146 |
- instance._instances[""].execute(action_digest, True)
|
|
137 |
+ _inject_work(instance._instances[""]._scheduler, action_digest)
|
|
147 | 138 |
|
148 | 139 |
response = instance.CreateBotSession(request, context)
|
149 | 140 |
|
... | ... | @@ -165,7 +156,7 @@ def test_update_leases_work_complete(bot_session, context, instance): |
165 | 156 |
|
166 | 157 |
# Inject work
|
167 | 158 |
action_digest = remote_execution_pb2.Digest(hash='gaff')
|
168 |
- instance._instances[""].execute(action_digest, True)
|
|
159 |
+ _inject_work(instance._instances[""]._scheduler, action_digest)
|
|
169 | 160 |
|
170 | 161 |
request = bots_pb2.UpdateBotSessionRequest(name=response.name,
|
171 | 162 |
bot_session=response)
|
... | ... | @@ -193,7 +184,7 @@ def test_work_rejected_by_bot(bot_session, context, instance): |
193 | 184 |
bot_session=bot_session)
|
194 | 185 |
# Inject work
|
195 | 186 |
action_digest = remote_execution_pb2.Digest(hash='gaff')
|
196 |
- instance._instances[""].execute(action_digest, True)
|
|
187 |
+ _inject_work(instance._instances[""]._scheduler, action_digest)
|
|
197 | 188 |
|
198 | 189 |
# Simulate the severed binding between client and server
|
199 | 190 |
response = copy.deepcopy(instance.CreateBotSession(request, context))
|
... | ... | @@ -215,7 +206,7 @@ def test_work_out_of_sync_from_pending(state, bot_session, context, instance): |
215 | 206 |
bot_session=bot_session)
|
216 | 207 |
# Inject work
|
217 | 208 |
action_digest = remote_execution_pb2.Digest(hash='gaff')
|
218 |
- instance._instances[""].execute(action_digest, True)
|
|
209 |
+ _inject_work(instance._instances[""]._scheduler, action_digest)
|
|
219 | 210 |
|
220 | 211 |
# Simulate the severed binding between client and server
|
221 | 212 |
response = copy.deepcopy(instance.CreateBotSession(request, context))
|
... | ... | @@ -236,7 +227,7 @@ def test_work_out_of_sync_from_active(state, bot_session, context, instance): |
236 | 227 |
bot_session=bot_session)
|
237 | 228 |
# Inject work
|
238 | 229 |
action_digest = remote_execution_pb2.Digest(hash='gaff')
|
239 |
- instance._instances[""].execute(action_digest, True)
|
|
230 |
+ _inject_work(instance._instances[""]._scheduler, action_digest)
|
|
240 | 231 |
|
241 | 232 |
# Simulate the severed binding between client and server
|
242 | 233 |
response = copy.deepcopy(instance.CreateBotSession(request, context))
|
... | ... | @@ -263,7 +254,7 @@ def test_work_active_to_active(bot_session, context, instance): |
263 | 254 |
bot_session=bot_session)
|
264 | 255 |
# Inject work
|
265 | 256 |
action_digest = remote_execution_pb2.Digest(hash='gaff')
|
266 |
- instance._instances[""].execute(action_digest, True)
|
|
257 |
+ _inject_work(instance._instances[""]._scheduler, action_digest)
|
|
267 | 258 |
|
268 | 259 |
# Simulate the severed binding between client and server
|
269 | 260 |
response = copy.deepcopy(instance.CreateBotSession(request, context))
|
... | ... | @@ -283,3 +274,10 @@ def test_post_bot_event_temp(context, instance): |
283 | 274 |
instance.PostBotEventTemp(request, context)
|
284 | 275 |
|
285 | 276 |
context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
|
277 |
+ |
|
278 |
+ |
|
279 |
+def _inject_work(scheduler, action_digest=None):
|
|
280 |
+ if not action_digest:
|
|
281 |
+ action_digest = remote_execution_pb2.Digest()
|
|
282 |
+ j = job.Job(action_digest, False)
|
|
283 |
+ scheduler.append_job(j, True)
|
... | ... | @@ -20,21 +20,25 @@ |
20 | 20 |
import uuid
|
21 | 21 |
from unittest import mock
|
22 | 22 |
|
23 |
+from google.protobuf import any_pb2
|
|
23 | 24 |
import grpc
|
24 | 25 |
from grpc._server import _Context
|
25 | 26 |
import pytest
|
26 |
-from google.protobuf import any_pb2
|
|
27 | 27 |
|
28 | 28 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
29 | 29 |
from buildgrid._protos.google.longrunning import operations_pb2
|
30 | 30 |
|
31 | 31 |
from buildgrid.server import job
|
32 |
-from buildgrid.server.instance import BuildGridInstance
|
|
32 |
+from buildgrid.server.controller import ExecutionController
|
|
33 | 33 |
from buildgrid.server.cas.storage import lru_memory_cache
|
34 | 34 |
from buildgrid.server.actioncache.storage import ActionCache
|
35 |
+from buildgrid.server.execution import service
|
|
35 | 36 |
from buildgrid.server.execution.service import ExecutionService
|
36 | 37 |
|
37 | 38 |
|
39 |
+server = mock.create_autospec(grpc.server)
|
|
40 |
+ |
|
41 |
+ |
|
38 | 42 |
@pytest.fixture
|
39 | 43 |
def context():
|
40 | 44 |
cxt = mock.MagicMock(spec=_Context)
|
... | ... | @@ -42,21 +46,21 @@ def context(): |
42 | 46 |
|
43 | 47 |
|
44 | 48 |
@pytest.fixture(params=["action-cache", "no-action-cache"])
|
45 |
-def buildgrid(request):
|
|
49 |
+def controller(request):
|
|
46 | 50 |
if request.param == "action-cache":
|
47 | 51 |
storage = lru_memory_cache.LRUMemoryCache(1024 * 1024)
|
48 | 52 |
cache = ActionCache(storage, 50)
|
49 |
- |
|
50 |
- return BuildGridInstance(action_cache=cache,
|
|
51 |
- cas_storage=storage)
|
|
52 |
- return BuildGridInstance()
|
|
53 |
+ yield ExecutionController(cache, storage)
|
|
54 |
+ else:
|
|
55 |
+ yield ExecutionController()
|
|
53 | 56 |
|
54 | 57 |
|
55 | 58 |
# Instance to test
|
56 | 59 |
@pytest.fixture
|
57 |
-def instance(buildgrid):
|
|
58 |
- instances = {"": buildgrid}
|
|
59 |
- yield ExecutionService(instances)
|
|
60 |
+def instance(controller):
|
|
61 |
+ instances = {"": controller.execution_instance}
|
|
62 |
+ with mock.patch.object(service, 'remote_execution_pb2_grpc'):
|
|
63 |
+ yield ExecutionService(server, instances)
|
|
60 | 64 |
|
61 | 65 |
|
62 | 66 |
@pytest.mark.parametrize("skip_cache_lookup", [True, False])
|
... | ... | @@ -86,7 +90,7 @@ def test_wrong_execute_instance(instance, context): |
86 | 90 |
context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
|
87 | 91 |
|
88 | 92 |
|
89 |
-def test_wait_execution(instance, buildgrid, context):
|
|
93 |
+def test_wait_execution(instance, controller, context):
|
|
90 | 94 |
action_digest = remote_execution_pb2.Digest()
|
91 | 95 |
action_digest.hash = 'zhora'
|
92 | 96 |
|
... | ... | @@ -95,7 +99,7 @@ def test_wait_execution(instance, buildgrid, context): |
95 | 99 |
|
96 | 100 |
request = remote_execution_pb2.WaitExecutionRequest(name="{}/{}".format('', j.name))
|
97 | 101 |
|
98 |
- buildgrid._scheduler.jobs[j.name] = j
|
|
102 |
+ controller.execution_instance._scheduler.jobs[j.name] = j
|
|
99 | 103 |
|
100 | 104 |
action_result_any = any_pb2.Any()
|
101 | 105 |
action_result = remote_execution_pb2.ActionResult()
|
... | ... | @@ -115,7 +119,7 @@ def test_wait_execution(instance, buildgrid, context): |
115 | 119 |
assert result.done is True
|
116 | 120 |
|
117 | 121 |
|
118 |
-def test_wrong_instance_wait_execution(instance, buildgrid, context):
|
|
122 |
+def test_wrong_instance_wait_execution(instance, context):
|
|
119 | 123 |
request = remote_execution_pb2.WaitExecutionRequest(name="blade")
|
120 | 124 |
next(instance.WaitExecution(request, context))
|
121 | 125 |
|
... | ... | @@ -19,21 +19,22 @@ |
19 | 19 |
|
20 | 20 |
from unittest import mock
|
21 | 21 |
|
22 |
+from google.protobuf import any_pb2
|
|
22 | 23 |
import grpc
|
23 | 24 |
from grpc._server import _Context
|
24 | 25 |
import pytest
|
25 | 26 |
|
26 |
-from google.protobuf import any_pb2
|
|
27 |
- |
|
28 | 27 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
29 | 28 |
from buildgrid._protos.google.longrunning import operations_pb2
|
30 | 29 |
|
31 |
-from buildgrid.server.instance import BuildGridInstance
|
|
30 |
+from buildgrid.server.controller import ExecutionController
|
|
32 | 31 |
from buildgrid.server._exceptions import InvalidArgumentError
|
33 | 32 |
|
33 |
+from buildgrid.server.operations import service
|
|
34 | 34 |
from buildgrid.server.operations.service import OperationsService
|
35 | 35 |
|
36 | 36 |
|
37 |
+server = mock.create_autospec(grpc.server)
|
|
37 | 38 |
instance_name = "blade"
|
38 | 39 |
|
39 | 40 |
|
... | ... | @@ -55,21 +56,22 @@ def execute_request(): |
55 | 56 |
|
56 | 57 |
|
57 | 58 |
@pytest.fixture
|
58 |
-def buildgrid():
|
|
59 |
- yield BuildGridInstance()
|
|
59 |
+def controller():
|
|
60 |
+ yield ExecutionController()
|
|
60 | 61 |
|
61 | 62 |
|
62 | 63 |
# Instance to test
|
63 | 64 |
@pytest.fixture
|
64 |
-def instance(buildgrid):
|
|
65 |
- instances = {instance_name: buildgrid}
|
|
66 |
- yield OperationsService(instances)
|
|
65 |
+def instance(controller):
|
|
66 |
+ instances = {instance_name: controller.operations_instance}
|
|
67 |
+ with mock.patch.object(service, 'operations_pb2_grpc'):
|
|
68 |
+ yield OperationsService(server, instances)
|
|
67 | 69 |
|
68 | 70 |
|
69 | 71 |
# Queue an execution, get operation corresponding to that request
|
70 |
-def test_get_operation(instance, buildgrid, execute_request, context):
|
|
71 |
- response_execute = buildgrid.execute(execute_request.action_digest,
|
|
72 |
- execute_request.skip_cache_lookup)
|
|
72 |
+def test_get_operation(instance, controller, execute_request, context):
|
|
73 |
+ response_execute = controller.execution_instance.execute(execute_request.action_digest,
|
|
74 |
+ execute_request.skip_cache_lookup)
|
|
73 | 75 |
|
74 | 76 |
request = operations_pb2.GetOperationRequest()
|
75 | 77 |
|
... | ... | @@ -94,9 +96,9 @@ def test_get_operation_instance_fail(instance, context): |
94 | 96 |
context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
|
95 | 97 |
|
96 | 98 |
|
97 |
-def test_list_operations(instance, buildgrid, execute_request, context):
|
|
98 |
- response_execute = buildgrid.execute(execute_request.action_digest,
|
|
99 |
- execute_request.skip_cache_lookup)
|
|
99 |
+def test_list_operations(instance, controller, execute_request, context):
|
|
100 |
+ response_execute = controller.execution_instance.execute(execute_request.action_digest,
|
|
101 |
+ execute_request.skip_cache_lookup)
|
|
100 | 102 |
|
101 | 103 |
request = operations_pb2.ListOperationsRequest(name=instance_name)
|
102 | 104 |
response = instance.ListOperations(request, context)
|
... | ... | @@ -104,9 +106,9 @@ def test_list_operations(instance, buildgrid, execute_request, context): |
104 | 106 |
assert response.operations[0].name.split('/')[-1] == response_execute.name
|
105 | 107 |
|
106 | 108 |
|
107 |
-def test_list_operations_instance_fail(instance, buildgrid, execute_request, context):
|
|
108 |
- buildgrid.execute(execute_request.action_digest,
|
|
109 |
- execute_request.skip_cache_lookup)
|
|
109 |
+def test_list_operations_instance_fail(instance, controller, execute_request, context):
|
|
110 |
+ controller.execution_instance.execute(execute_request.action_digest,
|
|
111 |
+ execute_request.skip_cache_lookup)
|
|
110 | 112 |
|
111 | 113 |
request = operations_pb2.ListOperationsRequest()
|
112 | 114 |
instance.ListOperations(request, context)
|
... | ... | @@ -114,16 +116,16 @@ def test_list_operations_instance_fail(instance, buildgrid, execute_request, con |
114 | 116 |
context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
|
115 | 117 |
|
116 | 118 |
|
117 |
-def test_list_operations_with_result(instance, buildgrid, execute_request, context):
|
|
118 |
- response_execute = buildgrid.execute(execute_request.action_digest,
|
|
119 |
- execute_request.skip_cache_lookup)
|
|
119 |
+def test_list_operations_with_result(instance, controller, execute_request, context):
|
|
120 |
+ response_execute = controller.execution_instance.execute(execute_request.action_digest,
|
|
121 |
+ execute_request.skip_cache_lookup)
|
|
120 | 122 |
|
121 | 123 |
action_result = remote_execution_pb2.ActionResult()
|
122 | 124 |
output_file = remote_execution_pb2.OutputFile(path='unicorn')
|
123 | 125 |
action_result.output_files.extend([output_file])
|
124 | 126 |
|
125 |
- buildgrid._scheduler.job_complete(response_execute.name,
|
|
126 |
- _pack_any(action_result))
|
|
127 |
+ controller.operations_instance._scheduler.job_complete(response_execute.name,
|
|
128 |
+ _pack_any(action_result))
|
|
127 | 129 |
|
128 | 130 |
request = operations_pb2.ListOperationsRequest(name=instance_name)
|
129 | 131 |
response = instance.ListOperations(request, context)
|
... | ... | @@ -144,9 +146,9 @@ def test_list_operations_empty(instance, context): |
144 | 146 |
|
145 | 147 |
|
146 | 148 |
# Send execution off, delete, try to find operation should fail
|
147 |
-def test_delete_operation(instance, buildgrid, execute_request, context):
|
|
148 |
- response_execute = buildgrid.execute(execute_request.action_digest,
|
|
149 |
- execute_request.skip_cache_lookup)
|
|
149 |
+def test_delete_operation(instance, controller, execute_request, context):
|
|
150 |
+ response_execute = controller.execution_instance.execute(execute_request.action_digest,
|
|
151 |
+ execute_request.skip_cache_lookup)
|
|
150 | 152 |
request = operations_pb2.DeleteOperationRequest()
|
151 | 153 |
request.name = "{}/{}".format(instance_name, response_execute.name)
|
152 | 154 |
instance.DeleteOperation(request, context)
|
... | ... | @@ -155,7 +157,7 @@ def test_delete_operation(instance, buildgrid, execute_request, context): |
155 | 157 |
request.name = "{}/{}".format(instance_name, response_execute.name)
|
156 | 158 |
|
157 | 159 |
with pytest.raises(InvalidArgumentError):
|
158 |
- buildgrid.get_operation(response_execute.name)
|
|
160 |
+ controller.operations_instance.get_operation(response_execute.name)
|
|
159 | 161 |
|
160 | 162 |
|
161 | 163 |
def test_delete_operation_fail(instance, context):
|
... | ... | @@ -25,10 +25,15 @@ from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_p |
25 | 25 |
from buildgrid._protos.buildstream.v2 import buildstream_pb2
|
26 | 26 |
|
27 | 27 |
from buildgrid.server.cas.storage import lru_memory_cache
|
28 |
+from buildgrid.server.referencestorage import service
|
|
28 | 29 |
from buildgrid.server.referencestorage.service import ReferenceStorageService
|
29 | 30 |
from buildgrid.server.referencestorage.storage import ReferenceCache
|
30 | 31 |
|
31 | 32 |
|
33 |
+server = mock.create_autospec(grpc.server)
|
|
34 |
+instance_name = ''
|
|
35 |
+ |
|
36 |
+ |
|
32 | 37 |
# Can mock this
|
33 | 38 |
@pytest.fixture
|
34 | 39 |
def context():
|
... | ... | @@ -45,41 +50,49 @@ def cache(cas): |
45 | 50 |
yield ReferenceCache(cas, 50)
|
46 | 51 |
|
47 | 52 |
|
48 |
-def test_simple_result(cache, context):
|
|
53 |
+@pytest.fixture
|
|
54 |
+def instance(cache):
|
|
55 |
+ instances = {instance_name: cache}
|
|
56 |
+ with mock.patch.object(service, 'buildstream_pb2_grpc'):
|
|
57 |
+ yield ReferenceStorageService(server, instances)
|
|
58 |
+ |
|
59 |
+ |
|
60 |
+def test_simple_result(instance, context):
|
|
49 | 61 |
keys = ["rick", "roy", "rach"]
|
50 |
- service = ReferenceStorageService(cache)
|
|
51 | 62 |
|
52 | 63 |
# Check that before adding the ReferenceResult, attempting to fetch it fails
|
53 | 64 |
request = buildstream_pb2.GetReferenceRequest(key=keys[0])
|
54 |
- service.GetReference(request, context)
|
|
65 |
+ instance.GetReference(request, context)
|
|
55 | 66 |
context.set_code.assert_called_once_with(grpc.StatusCode.NOT_FOUND)
|
56 | 67 |
|
57 | 68 |
# Add a ReferenceResult to the cache
|
58 | 69 |
reference_result = remote_execution_pb2.Digest(hash='deckard')
|
59 | 70 |
request = buildstream_pb2.UpdateReferenceRequest(keys=keys,
|
60 | 71 |
digest=reference_result)
|
61 |
- service.UpdateReference(request, context)
|
|
72 |
+ instance.UpdateReference(request, context)
|
|
62 | 73 |
|
63 | 74 |
# Check that fetching it now works
|
64 | 75 |
for key in keys:
|
65 | 76 |
request = buildstream_pb2.GetReferenceRequest(key=key)
|
66 |
- fetched_result = service.GetReference(request, context)
|
|
77 |
+ fetched_result = instance.GetReference(request, context)
|
|
67 | 78 |
assert fetched_result.digest == reference_result
|
68 | 79 |
|
69 | 80 |
|
70 |
-def test_disabled_update_result(cache, context):
|
|
81 |
+def test_disabled_update_result(context):
|
|
71 | 82 |
disabled_push = ReferenceCache(cas, 50, False)
|
72 | 83 |
keys = ["rick", "roy", "rach"]
|
73 |
- service = ReferenceStorageService(disabled_push)
|
|
84 |
+ |
|
85 |
+ with mock.patch.object(service, 'buildstream_pb2_grpc'):
|
|
86 |
+ instance = ReferenceStorageService(server, {'': disabled_push})
|
|
74 | 87 |
|
75 | 88 |
# Add a ReferenceResult to the cache
|
76 | 89 |
reference_result = remote_execution_pb2.Digest(hash='deckard')
|
77 | 90 |
request = buildstream_pb2.UpdateReferenceRequest(keys=keys,
|
78 | 91 |
digest=reference_result)
|
79 |
- service.UpdateReference(request, context)
|
|
92 |
+ instance.UpdateReference(request, context)
|
|
80 | 93 |
|
81 | 94 |
request = buildstream_pb2.UpdateReferenceRequest()
|
82 |
- service.UpdateReference(request, context)
|
|
95 |
+ instance.UpdateReference(request, context)
|
|
83 | 96 |
|
84 | 97 |
context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
|
85 | 98 |
|
... | ... | @@ -87,9 +100,10 @@ def test_disabled_update_result(cache, context): |
87 | 100 |
@pytest.mark.parametrize("allow_updates", [True, False])
|
88 | 101 |
def test_status(allow_updates, context):
|
89 | 102 |
cache = ReferenceCache(cas, 5, allow_updates)
|
90 |
- service = ReferenceStorageService(cache)
|
|
103 |
+ with mock.patch.object(service, 'buildstream_pb2_grpc'):
|
|
104 |
+ instance = ReferenceStorageService(server, {'': cache})
|
|
91 | 105 |
|
92 | 106 |
request = buildstream_pb2.StatusRequest()
|
93 |
- response = service.Status(request, context)
|
|
107 |
+ response = instance.Status(request, context)
|
|
94 | 108 |
|
95 | 109 |
assert response.allow_updates == allow_updates
|