finnball pushed to branch finn/artifact-cas at BuildGrid / buildgrid
Commits:
-
83d79fd1
by finn at 2018-08-21T11:19:54Z
-
19bcb6a6
by finn at 2018-08-21T11:19:57Z
-
8312df91
by finn at 2018-08-21T11:19:57Z
13 changed files:
- app/commands/cmd_server.py
- buildgrid/server/build_grid_server.py
- + buildgrid/server/cas/reference_cache.py
- + buildgrid/server/cas/reference_storage_service.py
- + buildgrid/server/execution/action_cache.py
- buildgrid/server/execution/action_cache_service.py
- buildgrid/server/scheduler.py
- docs/source/reference_cli.rst
- tests/action_cache.py
- tests/cas/test_storage.py
- tests/integration/action_cache_service.py
- + tests/integration/reference_storage_service.py
- + tests/reference_cache.py
Changes:
... | ... | @@ -28,11 +28,11 @@ import logging |
28 | 28 |
import click
|
29 | 29 |
|
30 | 30 |
from buildgrid.server import build_grid_server
|
31 |
-from buildgrid.server.action_cache import ActionCache
|
|
32 | 31 |
from buildgrid.server.cas.storage.disk import DiskStorage
|
33 | 32 |
from buildgrid.server.cas.storage.lru_memory_cache import LRUMemoryCache
|
34 | 33 |
from buildgrid.server.cas.storage.s3 import S3Storage
|
35 | 34 |
from buildgrid.server.cas.storage.with_cache import WithCacheStorage
|
35 |
+from buildgrid.server.execution.action_cache import ActionCache
|
|
36 | 36 |
|
37 | 37 |
from ..cli import pass_context
|
38 | 38 |
|
... | ... | @@ -72,36 +72,31 @@ def cli(context): |
72 | 72 |
def start(context, port, max_cached_actions, allow_uar, cas, **cas_args):
|
73 | 73 |
context.logger.info("Starting on port {}".format(port))
|
74 | 74 |
|
75 |
- loop = asyncio.get_event_loop()
|
|
76 |
- |
|
77 | 75 |
cas_storage = _make_cas_storage(context, cas, cas_args)
|
76 |
+ |
|
78 | 77 |
if cas_storage is None:
|
79 | 78 |
context.logger.info("Running without CAS - action cache will be unavailable")
|
80 | 79 |
action_cache = None
|
80 |
+ |
|
81 | 81 |
else:
|
82 |
- action_cache = ActionCache(cas_storage, max_cached_actions)
|
|
82 |
+ action_cache = ActionCache(cas_storage, max_cached_actions, allow_uar)
|
|
83 | 83 |
|
84 | 84 |
server = build_grid_server.BuildGridServer(port,
|
85 | 85 |
cas_storage=cas_storage,
|
86 |
- action_cache=action_cache,
|
|
87 |
- allow_update_action_result=allow_uar)
|
|
88 |
- |
|
86 |
+ action_cache=action_cache)
|
|
87 |
+ loop = asyncio.get_event_loop()
|
|
89 | 88 |
try:
|
90 |
- asyncio.ensure_future(server.start())
|
|
89 |
+ server.start()
|
|
91 | 90 |
loop.run_forever()
|
91 |
+ |
|
92 | 92 |
except KeyboardInterrupt:
|
93 | 93 |
pass
|
94 |
+ |
|
94 | 95 |
finally:
|
95 |
- loop.run_until_complete(server.stop())
|
|
96 |
+ server.stop()
|
|
96 | 97 |
loop.close()
|
97 | 98 |
|
98 | 99 |
|
99 |
-@cli.command('stop', short_help="Request a server to teardown.")
|
|
100 |
-@pass_context
|
|
101 |
-def stop(context):
|
|
102 |
- context.logger.error("Not implemented yet")
|
|
103 |
- |
|
104 |
- |
|
105 | 100 |
def _make_cas_storage(context, cas_type, cas_args):
|
106 | 101 |
"""Returns the storage provider corresponding to the given `cas_type`,
|
107 | 102 |
or None if the provider cannot be created.
|
... | ... | @@ -44,8 +44,7 @@ from .worker.bots_interface import BotsInterface |
44 | 44 |
|
45 | 45 |
class BuildGridServer:
|
46 | 46 |
|
47 |
- def __init__(self, port='50051', max_workers=10, cas_storage=None,
|
|
48 |
- action_cache=None, allow_update_action_result=True):
|
|
47 |
+ def __init__(self, port='50051', max_workers=10, cas_storage=None, action_cache=None):
|
|
49 | 48 |
port = '[::]:{0}'.format(port)
|
50 | 49 |
scheduler = Scheduler(action_cache)
|
51 | 50 |
bots_interface = BotsInterface(scheduler)
|
... | ... | @@ -68,13 +67,12 @@ class BuildGridServer: |
68 | 67 |
bytestream_pb2_grpc.add_ByteStreamServicer_to_server(ByteStreamService(cas_storage),
|
69 | 68 |
self._server)
|
70 | 69 |
if action_cache is not None:
|
71 |
- action_cache_service = ActionCacheService(action_cache,
|
|
72 |
- allow_update_action_result)
|
|
70 |
+ action_cache_service = ActionCacheService(action_cache)
|
|
73 | 71 |
remote_execution_pb2_grpc.add_ActionCacheServicer_to_server(action_cache_service,
|
74 | 72 |
self._server)
|
75 | 73 |
|
76 |
- async def start(self):
|
|
74 |
+ def start(self):
|
|
77 | 75 |
self._server.start()
|
78 | 76 |
|
79 |
- async def stop(self):
|
|
77 |
+ def stop(self):
|
|
80 | 78 |
self._server.stop(0)
|
1 |
+# Copyright (C) 2018 Bloomberg LP
|
|
2 |
+#
|
|
3 |
+# Licensed under the Apache License, Version 2.0 (the "License");
|
|
4 |
+# you may not use this file except in compliance with the License.
|
|
5 |
+# You may obtain a copy of the License at
|
|
6 |
+#
|
|
7 |
+# <http://www.apache.org/licenses/LICENSE-2.0>
|
|
8 |
+#
|
|
9 |
+# Unless required by applicable law or agreed to in writing, software
|
|
10 |
+# distributed under the License is distributed on an "AS IS" BASIS,
|
|
11 |
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
12 |
+# See the License for the specific language governing permissions and
|
|
13 |
+# limitations under the License.
|
|
14 |
+ |
|
15 |
+ |
|
16 |
+"""
|
|
17 |
+Reference Cache
|
|
18 |
+==================
|
|
19 |
+ |
|
20 |
+Implements an in-memory reference cache.
|
|
21 |
+ |
|
22 |
+For a given key, it keeps the digest of the message stored for that key in the storage backend.
|
|
23 |
+"""
|
|
24 |
+ |
|
25 |
+import collections
|
|
26 |
+ |
|
27 |
+from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
|
28 |
+ |
|
29 |
+from .._exceptions import NotFoundError
|
|
30 |
+ |
|
31 |
+ |
|
32 |
+class ReferenceCache:
|
|
33 |
+ |
|
34 |
+ def __init__(self, storage, max_cached_refs, allow_updates=True):
|
|
35 |
+ """ Initialises a new ReferenceCache instance.
|
|
36 |
+ |
|
37 |
+ Args:
|
|
38 |
+ storage (StorageABC): storage backend instance to be used.
|
|
39 |
+ max_cached_refs (int): maximum number of entries to be stored.
|
|
40 |
+ allow_updates (bool): allow the client to write to storage
|
|
41 |
+ """
|
|
42 |
+ self._allow_updates = allow_updates
|
|
43 |
+ self._storage = storage
|
|
44 |
+ self._max_cached_refs = max_cached_refs
|
|
45 |
+ self._digest_map = collections.OrderedDict()
|
|
46 |
+ |
|
47 |
+ @property
|
|
48 |
+ def allow_updates(self):
|
|
49 |
+ return self._allow_updates
|
|
50 |
+ |
|
51 |
+ def get_digest_reference(self, key):
|
|
52 |
+ """Retrieves the cached Digest for the given key.
|
|
53 |
+ |
|
54 |
+ Args:
|
|
55 |
+ key: key for Digest to query.
|
|
56 |
+ |
|
57 |
+ Returns:
|
|
58 |
+ The cached Digest matching the given key or raises
|
|
59 |
+ NotFoundError.
|
|
60 |
+ """
|
|
61 |
+ if key in self._digest_map:
|
|
62 |
+ reference_result = self._storage.get_message(self._digest_map[key], remote_execution_pb2.Digest)
|
|
63 |
+ |
|
64 |
+ if reference_result is not None:
|
|
65 |
+ return reference_result
|
|
66 |
+ |
|
67 |
+ del self._digest_map[key]
|
|
68 |
+ |
|
69 |
+ raise NotFoundError("Key not found: {}".format(key))
|
|
70 |
+ |
|
71 |
+ def get_action_reference(self, key):
|
|
72 |
+ """Retrieves the cached ActionResult for the given Action digest.
|
|
73 |
+ |
|
74 |
+ Args:
|
|
75 |
+ key: key for ActionResult to query.
|
|
76 |
+ |
|
77 |
+ Returns:
|
|
78 |
+ The cached ActionResult matching the given key or raises
|
|
79 |
+ NotFoundError.
|
|
80 |
+ """
|
|
81 |
+ if key in self._digest_map:
|
|
82 |
+ reference_result = self._storage.get_message(self._digest_map[key], remote_execution_pb2.ActionResult)
|
|
83 |
+ |
|
84 |
+ if reference_result is not None:
|
|
85 |
+ if self._action_result_blobs_still_exist(reference_result):
|
|
86 |
+ self._digest_map.move_to_end(key)
|
|
87 |
+ return reference_result
|
|
88 |
+ |
|
89 |
+ del self._digest_map[key]
|
|
90 |
+ |
|
91 |
+ raise NotFoundError("Key not found: {}".format(key))
|
|
92 |
+ |
|
93 |
+ def update_reference(self, key, result):
|
|
94 |
+ """Stores the result in cache for the given key.
|
|
95 |
+ |
|
96 |
+ If the cache size limit has been reached, the oldest cache entries will
|
|
97 |
+ be dropped before insertion so that the cache size never exceeds the
|
|
98 |
+ maximum number of entries allowed.
|
|
99 |
+ |
|
100 |
+ Args:
|
|
101 |
+ key: key to store result.
|
|
102 |
+ result (Digest): result digest to store.
|
|
103 |
+ """
|
|
104 |
+ if not self._allow_updates:
|
|
105 |
+ raise NotImplementedError("Updating cache not allowed")
|
|
106 |
+ |
|
107 |
+ if self._max_cached_refs == 0:
|
|
108 |
+ return
|
|
109 |
+ |
|
110 |
+ while len(self._digest_map) >= self._max_cached_refs:
|
|
111 |
+ self._digest_map.popitem(last=False)
|
|
112 |
+ |
|
113 |
+ result_digest = self._storage.put_message(result)
|
|
114 |
+ self._digest_map[key] = result_digest
|
|
115 |
+ |
|
116 |
+ def _action_result_blobs_still_exist(self, action_result):
|
|
117 |
+ """Checks CAS for ActionResult output blobs existence.
|
|
118 |
+ |
|
119 |
+ Args:
|
|
120 |
+ action_result (ActionResult): ActionResult to search referenced
|
|
121 |
+ output blobs for.
|
|
122 |
+ |
|
123 |
+ Returns:
|
|
124 |
+ True if all referenced blobs are present in CAS, False otherwise.
|
|
125 |
+ """
|
|
126 |
+ blobs_needed = []
|
|
127 |
+ |
|
128 |
+ for output_file in action_result.output_files:
|
|
129 |
+ blobs_needed.append(output_file.digest)
|
|
130 |
+ |
|
131 |
+ for output_directory in action_result.output_directories:
|
|
132 |
+ blobs_needed.append(output_directory.tree_digest)
|
|
133 |
+ tree = self._storage.get_message(output_directory.tree_digest,
|
|
134 |
+ remote_execution_pb2.Tree)
|
|
135 |
+ if tree is None:
|
|
136 |
+ return False
|
|
137 |
+ |
|
138 |
+ for file_node in tree.root.files:
|
|
139 |
+ blobs_needed.append(file_node.digest)
|
|
140 |
+ |
|
141 |
+ for child in tree.children:
|
|
142 |
+ for file_node in child.files:
|
|
143 |
+ blobs_needed.append(file_node.digest)
|
|
144 |
+ |
|
145 |
+ if action_result.stdout_digest.hash and not action_result.stdout_raw:
|
|
146 |
+ blobs_needed.append(action_result.stdout_digest)
|
|
147 |
+ |
|
148 |
+ if action_result.stderr_digest.hash and not action_result.stderr_raw:
|
|
149 |
+ blobs_needed.append(action_result.stderr_digest)
|
|
150 |
+ |
|
151 |
+ missing = self._storage.missing_blobs(blobs_needed)
|
|
152 |
+ return len(missing) == 0
|
1 |
+# Copyright (C) 2018 Bloomberg LP
|
|
2 |
+#
|
|
3 |
+# Licensed under the Apache License, Version 2.0 (the "License");
|
|
4 |
+# you may not use this file except in compliance with the License.
|
|
5 |
+# You may obtain a copy of the License at
|
|
6 |
+#
|
|
7 |
+# <http://www.apache.org/licenses/LICENSE-2.0>
|
|
8 |
+#
|
|
9 |
+# Unless required by applicable law or agreed to in writing, software
|
|
10 |
+# distributed under the License is distributed on an "AS IS" BASIS,
|
|
11 |
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
12 |
+# See the License for the specific language governing permissions and
|
|
13 |
+# limitations under the License.
|
|
14 |
+ |
|
15 |
+import logging
|
|
16 |
+ |
|
17 |
+import grpc
|
|
18 |
+ |
|
19 |
+from buildgrid._protos.buildstream.v2 import buildstream_pb2
|
|
20 |
+from buildgrid._protos.buildstream.v2 import buildstream_pb2_grpc
|
|
21 |
+ |
|
22 |
+from .._exceptions import NotFoundError
|
|
23 |
+ |
|
24 |
+ |
|
25 |
+class ReferenceStorageService(buildstream_pb2_grpc.ReferenceStorageServicer):
|
|
26 |
+ |
|
27 |
+ def __init__(self, reference_cache):
|
|
28 |
+ self._reference_cache = reference_cache
|
|
29 |
+ self.logger = logging.getLogger(__name__)
|
|
30 |
+ |
|
31 |
+ def GetReference(self, request, context):
|
|
32 |
+ try:
|
|
33 |
+ response = buildstream_pb2.GetReferenceResponse()
|
|
34 |
+ response.digest.CopyFrom(self._reference_cache.get_digest_reference(request.key))
|
|
35 |
+ return response
|
|
36 |
+ |
|
37 |
+ except NotFoundError:
|
|
38 |
+ context.set_code(grpc.StatusCode.NOT_FOUND)
|
|
39 |
+ |
|
40 |
+ def UpdateReference(self, request, context):
|
|
41 |
+ try:
|
|
42 |
+ for key in request.keys:
|
|
43 |
+ self._reference_cache.update_reference(key, request.digest)
|
|
44 |
+ |
|
45 |
+ return buildstream_pb2.UpdateReferenceResponse()
|
|
46 |
+ |
|
47 |
+ except NotImplementedError:
|
|
48 |
+ context.set_code(grpc.StatusCode.UNIMPLEMENTED)
|
|
49 |
+ |
|
50 |
+ def Status(self, request, context):
|
|
51 |
+ allow_updates = self._reference_cache.allow_updates
|
|
52 |
+ return buildstream_pb2.StatusResponse(allow_updates=allow_updates)
|
1 |
+# Copyright (C) 2018 Bloomberg LP
|
|
2 |
+#
|
|
3 |
+# Licensed under the Apache License, Version 2.0 (the "License");
|
|
4 |
+# you may not use this file except in compliance with the License.
|
|
5 |
+# You may obtain a copy of the License at
|
|
6 |
+#
|
|
7 |
+# <http://www.apache.org/licenses/LICENSE-2.0>
|
|
8 |
+#
|
|
9 |
+# Unless required by applicable law or agreed to in writing, software
|
|
10 |
+# distributed under the License is distributed on an "AS IS" BASIS,
|
|
11 |
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
12 |
+# See the License for the specific language governing permissions and
|
|
13 |
+# limitations under the License.
|
|
14 |
+ |
|
15 |
+ |
|
16 |
+"""
|
|
17 |
+Action Cache
|
|
18 |
+============
|
|
19 |
+ |
|
20 |
+Implements an in-memory action Cache
|
|
21 |
+"""
|
|
22 |
+ |
|
23 |
+ |
|
24 |
+from ..cas.reference_cache import ReferenceCache
|
|
25 |
+ |
|
26 |
+ |
|
27 |
+class ActionCache(ReferenceCache):
|
|
28 |
+ |
|
29 |
+ def get_action_result(self, action_digest):
|
|
30 |
+ key = self._get_key(action_digest)
|
|
31 |
+ return self.get_action_reference(key)
|
|
32 |
+ |
|
33 |
+ def update_action_result(self, action_digest, action_result):
|
|
34 |
+ key = self._get_key(action_digest)
|
|
35 |
+ self.update_reference(key, action_result)
|
|
36 |
+ |
|
37 |
+ def _get_key(self, action_digest):
|
|
38 |
+ return (action_digest.hash, action_digest.size_bytes)
|
... | ... | @@ -26,27 +26,28 @@ import logging |
26 | 26 |
|
27 | 27 |
import grpc
|
28 | 28 |
|
29 |
-from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
|
30 | 29 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
|
31 | 30 |
|
31 |
+from .._exceptions import NotFoundError
|
|
32 |
+ |
|
32 | 33 |
|
33 | 34 |
class ActionCacheService(remote_execution_pb2_grpc.ActionCacheServicer):
|
34 | 35 |
|
35 |
- def __init__(self, action_cache, allow_updates=True):
|
|
36 |
+ def __init__(self, action_cache):
|
|
36 | 37 |
self._action_cache = action_cache
|
37 |
- self._allow_updates = allow_updates
|
|
38 | 38 |
self.logger = logging.getLogger(__name__)
|
39 | 39 |
|
40 | 40 |
def GetActionResult(self, request, context):
|
41 |
- result = self._action_cache.get_action_result(request.action_digest)
|
|
42 |
- if result is None:
|
|
41 |
+ try:
|
|
42 |
+ return self._action_cache.get_action_result(request.action_digest)
|
|
43 |
+ |
|
44 |
+ except NotFoundError:
|
|
43 | 45 |
context.set_code(grpc.StatusCode.NOT_FOUND)
|
44 |
- return remote_execution_pb2.ActionResult()
|
|
45 |
- return result
|
|
46 | 46 |
|
47 | 47 |
def UpdateActionResult(self, request, context):
|
48 |
- if not self._allow_updates:
|
|
48 |
+ try:
|
|
49 |
+ self._action_cache.update_action_result(request.action_digest, request.action_result)
|
|
50 |
+ return request.action_result
|
|
51 |
+ |
|
52 |
+ except NotImplementedError:
|
|
49 | 53 |
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
|
50 |
- return remote_execution_pb2.ActionResult()
|
|
51 |
- self._action_cache.put_action_result(request.action_digest, request.action_result)
|
|
52 |
- return request.action_result
|
... | ... | @@ -25,6 +25,8 @@ from collections import deque |
25 | 25 |
|
26 | 26 |
from google.protobuf import any_pb2
|
27 | 27 |
|
28 |
+ |
|
29 |
+from buildgrid.server._exceptions import NotFoundError
|
|
28 | 30 |
from buildgrid._protos.google.longrunning import operations_pb2
|
29 | 31 |
|
30 | 32 |
from .job import ExecuteStage, LeaseState
|
... | ... | @@ -35,7 +37,7 @@ class Scheduler: |
35 | 37 |
MAX_N_TRIES = 5
|
36 | 38 |
|
37 | 39 |
def __init__(self, action_cache=None):
|
38 |
- self.action_cache = action_cache
|
|
40 |
+ self._action_cache = action_cache
|
|
39 | 41 |
self.jobs = {}
|
40 | 42 |
self.queue = deque()
|
41 | 43 |
|
... | ... | @@ -50,17 +52,23 @@ class Scheduler: |
50 | 52 |
|
51 | 53 |
def append_job(self, job, skip_cache_lookup=False):
|
52 | 54 |
self.jobs[job.name] = job
|
53 |
- if self.action_cache is not None and not skip_cache_lookup:
|
|
54 |
- cached_result = self.action_cache.get_action_result(job.action_digest)
|
|
55 |
- if cached_result is not None:
|
|
55 |
+ if self._action_cache is not None and not skip_cache_lookup:
|
|
56 |
+ try:
|
|
57 |
+ cached_result = self._action_cache.get_action_result(job.action_digest)
|
|
58 |
+ except NotFoundError:
|
|
59 |
+ self.queue.append(job)
|
|
60 |
+ job.update_execute_stage(ExecuteStage.QUEUED)
|
|
61 |
+ |
|
62 |
+ else:
|
|
56 | 63 |
cached_result_any = any_pb2.Any()
|
57 | 64 |
cached_result_any.Pack(cached_result)
|
58 | 65 |
job.result = cached_result_any
|
59 | 66 |
job.result_cached = True
|
60 | 67 |
job.update_execute_stage(ExecuteStage.COMPLETED)
|
61 |
- return
|
|
62 |
- self.queue.append(job)
|
|
63 |
- job.update_execute_stage(ExecuteStage.QUEUED)
|
|
68 |
+ |
|
69 |
+ else:
|
|
70 |
+ self.queue.append(job)
|
|
71 |
+ job.update_execute_stage(ExecuteStage.QUEUED)
|
|
64 | 72 |
|
65 | 73 |
def retry_job(self, name):
|
66 | 74 |
if name in self.jobs:
|
... | ... | @@ -81,8 +89,8 @@ class Scheduler: |
81 | 89 |
job.result = result
|
82 | 90 |
job.update_execute_stage(ExecuteStage.COMPLETED)
|
83 | 91 |
self.jobs[name] = job
|
84 |
- if not job.do_not_cache and self.action_cache is not None:
|
|
85 |
- self.action_cache.put_action_result(job.action_digest, result)
|
|
92 |
+ if not job.do_not_cache and self._action_cache is not None:
|
|
93 |
+ self._action_cache.put_action_result(job.action_digest, result)
|
|
86 | 94 |
|
87 | 95 |
def get_operations(self):
|
88 | 96 |
response = operations_pb2.ListOperationsResponse()
|
... | ... | @@ -117,10 +117,3 @@ BuildGrid's Command Line Interface (CLI) reference documentation. |
117 | 117 |
|
118 | 118 |
.. click:: app.commands.cmd_server:start
|
119 | 119 |
:prog: bgd server start
|
120 |
- |
|
121 |
-----
|
|
122 |
- |
|
123 |
-.. _invoking_bgd_server_stop:
|
|
124 |
- |
|
125 |
-.. click:: app.commands.cmd_server:stop
|
|
126 |
- :prog: bgd server stop
|
... | ... | @@ -11,17 +11,16 @@ |
11 | 11 |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
12 | 12 |
# See the License for the specific language governing permissions and
|
13 | 13 |
# limitations under the License.
|
14 |
-#
|
|
15 |
-# Authors:
|
|
16 |
-# Carter Sande <csande bloomberg net>
|
|
14 |
+ |
|
17 | 15 |
|
18 | 16 |
# pylint: disable=redefined-outer-name
|
19 | 17 |
|
20 | 18 |
import pytest
|
21 | 19 |
|
22 |
-from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
|
23 |
-from buildgrid.server import action_cache
|
|
24 | 20 |
from buildgrid.server.cas.storage import lru_memory_cache
|
21 |
+from buildgrid.server.execution import action_cache
|
|
22 |
+from buildgrid.server._exceptions import NotFoundError
|
|
23 |
+from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
|
25 | 24 |
|
26 | 25 |
|
27 | 26 |
@pytest.fixture
|
... | ... | @@ -35,8 +34,9 @@ def test_null_action_cache(cas): |
35 | 34 |
action_digest1 = remote_execution_pb2.Digest(hash='alpha', size_bytes=4)
|
36 | 35 |
dummy_result = remote_execution_pb2.ActionResult()
|
37 | 36 |
|
38 |
- cache.put_action_result(action_digest1, dummy_result)
|
|
39 |
- assert cache.get_action_result(action_digest1) is None
|
|
37 |
+ cache.update_action_result(action_digest1, dummy_result)
|
|
38 |
+ with pytest.raises(NotFoundError):
|
|
39 |
+ cache.get_action_result(action_digest1)
|
|
40 | 40 |
|
41 | 41 |
|
42 | 42 |
def test_action_cache_expiry(cas):
|
... | ... | @@ -47,16 +47,18 @@ def test_action_cache_expiry(cas): |
47 | 47 |
action_digest3 = remote_execution_pb2.Digest(hash='charlie', size_bytes=4)
|
48 | 48 |
dummy_result = remote_execution_pb2.ActionResult()
|
49 | 49 |
|
50 |
- cache.put_action_result(action_digest1, dummy_result)
|
|
51 |
- cache.put_action_result(action_digest2, dummy_result)
|
|
50 |
+ cache.update_action_result(action_digest1, dummy_result)
|
|
51 |
+ cache.update_action_result(action_digest2, dummy_result)
|
|
52 | 52 |
|
53 | 53 |
# Get digest 1 (making 2 the least recently used)
|
54 | 54 |
assert cache.get_action_result(action_digest1) is not None
|
55 | 55 |
# Add digest 3 (so 2 gets removed from the cache)
|
56 |
- cache.put_action_result(action_digest3, dummy_result)
|
|
56 |
+ cache.update_action_result(action_digest3, dummy_result)
|
|
57 | 57 |
|
58 | 58 |
assert cache.get_action_result(action_digest1) is not None
|
59 |
- assert cache.get_action_result(action_digest2) is None
|
|
59 |
+ with pytest.raises(NotFoundError):
|
|
60 |
+ cache.get_action_result(action_digest2)
|
|
61 |
+ |
|
60 | 62 |
assert cache.get_action_result(action_digest3) is not None
|
61 | 63 |
|
62 | 64 |
|
... | ... | @@ -67,34 +69,35 @@ def test_action_cache_checks_cas(cas): |
67 | 69 |
action_digest2 = remote_execution_pb2.Digest(hash='bravo', size_bytes=4)
|
68 | 70 |
action_digest3 = remote_execution_pb2.Digest(hash='charlie', size_bytes=4)
|
69 | 71 |
|
70 |
- # Create a tree that references digests in CAS
|
|
72 |
+ # Create a tree that references digests in CAS
|
|
71 | 73 |
sample_digest = cas.put_message(remote_execution_pb2.Command(arguments=["sample"]))
|
72 | 74 |
tree = remote_execution_pb2.Tree()
|
73 | 75 |
tree.root.files.add().digest.CopyFrom(sample_digest)
|
74 | 76 |
tree.children.add().files.add().digest.CopyFrom(sample_digest)
|
75 | 77 |
tree_digest = cas.put_message(tree)
|
76 | 78 |
|
77 |
- # Add an ActionResult that references real digests to the cache
|
|
79 |
+ # Add an ActionResult that references real digests to the cache
|
|
78 | 80 |
action_result1 = remote_execution_pb2.ActionResult()
|
79 | 81 |
action_result1.output_directories.add().tree_digest.CopyFrom(tree_digest)
|
80 | 82 |
action_result1.output_files.add().digest.CopyFrom(sample_digest)
|
81 | 83 |
action_result1.stdout_digest.CopyFrom(sample_digest)
|
82 | 84 |
action_result1.stderr_digest.CopyFrom(sample_digest)
|
83 |
- cache.put_action_result(action_digest1, action_result1)
|
|
85 |
+ cache.update_action_result(action_digest1, action_result1)
|
|
84 | 86 |
|
85 |
- # Add ActionResults that reference fake digests to the cache
|
|
87 |
+ # Add ActionResults that reference fake digests to the cache
|
|
86 | 88 |
action_result2 = remote_execution_pb2.ActionResult()
|
87 | 89 |
action_result2.output_directories.add().tree_digest.hash = "nonexistent"
|
88 | 90 |
action_result2.output_directories[0].tree_digest.size_bytes = 8
|
89 |
- cache.put_action_result(action_digest2, action_result2)
|
|
91 |
+ cache.update_action_result(action_digest2, action_result2)
|
|
90 | 92 |
|
91 | 93 |
action_result3 = remote_execution_pb2.ActionResult()
|
92 | 94 |
action_result3.stdout_digest.hash = "nonexistent"
|
93 | 95 |
action_result3.stdout_digest.size_bytes = 8
|
94 |
- cache.put_action_result(action_digest3, action_result3)
|
|
96 |
+ cache.update_action_result(action_digest3, action_result3)
|
|
95 | 97 |
|
96 | 98 |
# Verify we can get the first ActionResult but not the others
|
97 | 99 |
fetched_result1 = cache.get_action_result(action_digest1)
|
98 | 100 |
assert fetched_result1.output_directories[0].tree_digest.hash == tree_digest.hash
|
99 |
- assert cache.get_action_result(action_digest2) is None
|
|
100 |
- assert cache.get_action_result(action_digest3) is None
|
|
101 |
+ with pytest.raises(NotFoundError):
|
|
102 |
+ cache.get_action_result(action_digest2)
|
|
103 |
+ cache.get_action_result(action_digest3)
|
... | ... | @@ -20,9 +20,10 @@ |
20 | 20 |
import tempfile
|
21 | 21 |
|
22 | 22 |
import boto3
|
23 |
-from moto import mock_s3
|
|
24 | 23 |
import pytest
|
25 | 24 |
|
25 |
+from moto import mock_s3
|
|
26 |
+ |
|
26 | 27 |
from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
|
27 | 28 |
from buildgrid.server.cas.storage.lru_memory_cache import LRUMemoryCache
|
28 | 29 |
from buildgrid.server.cas.storage.disk import DiskStorage
|
... | ... | @@ -23,10 +23,10 @@ import grpc |
23 | 23 |
from grpc._server import _Context
|
24 | 24 |
import pytest
|
25 | 25 |
|
26 |
+ |
|
26 | 27 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
27 |
-from buildgrid.server import action_cache
|
|
28 | 28 |
from buildgrid.server.cas.storage import lru_memory_cache
|
29 |
-from buildgrid.server.execution import action_cache_service
|
|
29 |
+from buildgrid.server.execution import action_cache, action_cache_service
|
|
30 | 30 |
|
31 | 31 |
|
32 | 32 |
# Can mock this
|
... | ... | @@ -67,7 +67,8 @@ def test_simple_action_result(cache, context): |
67 | 67 |
|
68 | 68 |
|
69 | 69 |
def test_disabled_update_action_result(cache, context):
|
70 |
- service = action_cache_service.ActionCacheService(cache, False)
|
|
70 |
+ disabled_push = action_cache.ActionCache(cas, 50, False)
|
|
71 |
+ service = action_cache_service.ActionCacheService(disabled_push)
|
|
71 | 72 |
|
72 | 73 |
request = remote_execution_pb2.UpdateActionResultRequest()
|
73 | 74 |
service.UpdateActionResult(request, context)
|
1 |
+# Copyright (C) 2018 Bloomberg LP
|
|
2 |
+#
|
|
3 |
+# Licensed under the Apache License, Version 2.0 (the "License");
|
|
4 |
+# you may not use this file except in compliance with the License.
|
|
5 |
+# You may obtain a copy of the License at
|
|
6 |
+#
|
|
7 |
+# <http://www.apache.org/licenses/LICENSE-2.0>
|
|
8 |
+#
|
|
9 |
+# Unless required by applicable law or agreed to in writing, software
|
|
10 |
+# distributed under the License is distributed on an "AS IS" BASIS,
|
|
11 |
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
12 |
+# See the License for the specific language governing permissions and
|
|
13 |
+# limitations under the License.
|
|
14 |
+ |
|
15 |
+# pylint: disable=redefined-outer-name
|
|
16 |
+ |
|
17 |
+from unittest import mock
|
|
18 |
+ |
|
19 |
+import grpc
|
|
20 |
+from grpc._server import _Context
|
|
21 |
+ |
|
22 |
+import pytest
|
|
23 |
+ |
|
24 |
+from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
|
25 |
+from buildgrid._protos.buildstream.v2 import buildstream_pb2
|
|
26 |
+ |
|
27 |
+from buildgrid.server.cas.storage import lru_memory_cache
|
|
28 |
+from buildgrid.server.cas import reference_cache, reference_storage_service
|
|
29 |
+ |
|
30 |
+ |
|
31 |
+# Can mock this
|
|
32 |
+@pytest.fixture
|
|
33 |
+def context():
|
|
34 |
+ yield mock.MagicMock(spec=_Context)
|
|
35 |
+ |
|
36 |
+ |
|
37 |
+@pytest.fixture
|
|
38 |
+def cas():
|
|
39 |
+ yield lru_memory_cache.LRUMemoryCache(1024 * 1024)
|
|
40 |
+ |
|
41 |
+ |
|
42 |
+@pytest.fixture
|
|
43 |
+def cache(cas):
|
|
44 |
+ yield reference_cache.ReferenceCache(cas, 50)
|
|
45 |
+ |
|
46 |
+ |
|
47 |
+def test_simple_result(cache, context):
|
|
48 |
+ keys = ["rick", "roy", "rach"]
|
|
49 |
+ service = reference_storage_service.ReferenceStorageService(cache)
|
|
50 |
+ |
|
51 |
+ # Check that before adding the ReferenceResult, attempting to fetch it fails
|
|
52 |
+ request = buildstream_pb2.GetReferenceRequest(key=keys[0])
|
|
53 |
+ service.GetReference(request, context)
|
|
54 |
+ context.set_code.assert_called_once_with(grpc.StatusCode.NOT_FOUND)
|
|
55 |
+ |
|
56 |
+ # Add a ReferenceResult to the cache
|
|
57 |
+ reference_result = remote_execution_pb2.Digest(hash='deckard')
|
|
58 |
+ request = buildstream_pb2.UpdateReferenceRequest(keys=keys,
|
|
59 |
+ digest=reference_result)
|
|
60 |
+ service.UpdateReference(request, context)
|
|
61 |
+ |
|
62 |
+ # Check that fetching it now works
|
|
63 |
+ for key in keys:
|
|
64 |
+ request = buildstream_pb2.GetReferenceRequest(key=key)
|
|
65 |
+ fetched_result = service.GetReference(request, context)
|
|
66 |
+ assert fetched_result.digest == reference_result
|
|
67 |
+ |
|
68 |
+ |
|
69 |
+def test_disabled_update_result(cache, context):
|
|
70 |
+ disabled_push = reference_cache.ReferenceCache(cas, 50, False)
|
|
71 |
+ keys = ["rick", "roy", "rach"]
|
|
72 |
+ service = reference_storage_service.ReferenceStorageService(disabled_push)
|
|
73 |
+ |
|
74 |
+ # Add a ReferenceResult to the cache
|
|
75 |
+ reference_result = remote_execution_pb2.Digest(hash='deckard')
|
|
76 |
+ request = buildstream_pb2.UpdateReferenceRequest(keys=keys,
|
|
77 |
+ digest=reference_result)
|
|
78 |
+ service.UpdateReference(request, context)
|
|
79 |
+ |
|
80 |
+ request = buildstream_pb2.UpdateReferenceRequest()
|
|
81 |
+ service.UpdateReference(request, context)
|
|
82 |
+ |
|
83 |
+ context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
|
|
84 |
+ |
|
85 |
+ |
|
86 |
+@pytest.mark.parametrize("allow_updates", [True, False])
|
|
87 |
+def test_status(allow_updates, context):
|
|
88 |
+ cache = reference_cache.ReferenceCache(cas, 5, allow_updates)
|
|
89 |
+ service = reference_storage_service.ReferenceStorageService(cache)
|
|
90 |
+ |
|
91 |
+ request = buildstream_pb2.StatusRequest()
|
|
92 |
+ response = service.Status(request, context)
|
|
93 |
+ |
|
94 |
+ assert response.allow_updates == allow_updates
|
1 |
+# Copyright (C) 2018 Bloomberg LP
|
|
2 |
+#
|
|
3 |
+# Licensed under the Apache License, Version 2.0 (the "License");
|
|
4 |
+# you may not use this file except in compliance with the License.
|
|
5 |
+# You may obtain a copy of the License at
|
|
6 |
+#
|
|
7 |
+# <http://www.apache.org/licenses/LICENSE-2.0>
|
|
8 |
+#
|
|
9 |
+# Unless required by applicable law or agreed to in writing, software
|
|
10 |
+# distributed under the License is distributed on an "AS IS" BASIS,
|
|
11 |
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
12 |
+# See the License for the specific language governing permissions and
|
|
13 |
+# limitations under the License.
|
|
14 |
+ |
|
15 |
+import pytest
|
|
16 |
+ |
|
17 |
+from buildgrid.server.cas.storage import lru_memory_cache
|
|
18 |
+ |
|
19 |
+ |
|
20 |
+@pytest.fixture
|
|
21 |
+def cas():
|
|
22 |
+ return lru_memory_cache.LRUMemoryCache(1024 * 1024)
|
|
23 |
+ |
|
24 |
+ |
|
25 |
+def test_ref_storage():
|
|
26 |
+ pass
|