finn pushed to branch finn/delete-action-cache at BuildGrid / buildgrid
Commits:
-
ea7e2208
by finn at 2018-08-23T10:56:26Z
5 changed files:
- − buildgrid/server/action_cache.py
- buildgrid/server/execution/action_cache_service.py
- buildgrid/server/execution/execution_service.py
- buildgrid/server/execution/operations_service.py
- buildgrid/server/worker/bots_service.py
Changes:
1 |
-# Copyright (C) 2018 Bloomberg LP
|
|
2 |
-#
|
|
3 |
-# Licensed under the Apache License, Version 2.0 (the "License");
|
|
4 |
-# you may not use this file except in compliance with the License.
|
|
5 |
-# You may obtain a copy of the License at
|
|
6 |
-#
|
|
7 |
-# <http://www.apache.org/licenses/LICENSE-2.0>
|
|
8 |
-#
|
|
9 |
-# Unless required by applicable law or agreed to in writing, software
|
|
10 |
-# distributed under the License is distributed on an "AS IS" BASIS,
|
|
11 |
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
12 |
-# See the License for the specific language governing permissions and
|
|
13 |
-# limitations under the License.
|
|
14 |
- |
|
15 |
- |
|
16 |
-"""
|
|
17 |
-ActionCache
|
|
18 |
-===========
|
|
19 |
- |
|
20 |
-Implements a simple in-memory action cache.
|
|
21 |
- |
|
22 |
-The action cache maps Action to their corresponding ActionResult. An
|
|
23 |
-ActionResult may be found in cache, for any given Action, if that action has
|
|
24 |
-already been executed.
|
|
25 |
- |
|
26 |
-Note:
|
|
27 |
- Action and ActionResult are referenced by their Digest and mapping is stored
|
|
28 |
- in-memory.
|
|
29 |
-"""
|
|
30 |
- |
|
31 |
-import collections
|
|
32 |
- |
|
33 |
-from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 as re_pb2
|
|
34 |
- |
|
35 |
- |
|
36 |
-class ActionCache:
|
|
37 |
- """In-memory Action to ActionResult associative array.
|
|
38 |
- """
|
|
39 |
- |
|
40 |
- def __init__(self, storage, max_cached_actions):
|
|
41 |
- """Initialises a new ActionCache instance.
|
|
42 |
- |
|
43 |
- Args:
|
|
44 |
- storage (StorageABC): storage backend instance to be used.
|
|
45 |
- max_cached_actions (int): maximun number of entries to cache.
|
|
46 |
- """
|
|
47 |
- self._storage = storage
|
|
48 |
- self._max_cached_actions = max_cached_actions
|
|
49 |
- self._digest_map = collections.OrderedDict()
|
|
50 |
- |
|
51 |
- def get_action_result(self, action_digest):
|
|
52 |
- """Retrieves the cached ActionResult for the given Action digest.
|
|
53 |
- |
|
54 |
- Args:
|
|
55 |
- action_digest (Digest): digest of the Action to query.
|
|
56 |
- |
|
57 |
- Returns:
|
|
58 |
- The cached ActionResult matching the given Action digest or None if
|
|
59 |
- the nothing hass been cached yet for that Action.
|
|
60 |
- """
|
|
61 |
- key = (action_digest.hash, action_digest.size_bytes)
|
|
62 |
- if key in self._digest_map:
|
|
63 |
- action_result = self._storage.get_message(self._digest_map[key],
|
|
64 |
- re_pb2.ActionResult)
|
|
65 |
- if action_result is not None:
|
|
66 |
- if self._blobs_still_exist(action_result):
|
|
67 |
- self._digest_map.move_to_end(key)
|
|
68 |
- return action_result
|
|
69 |
- del self._digest_map[key]
|
|
70 |
- return None
|
|
71 |
- |
|
72 |
- def put_action_result(self, action_digest, action_result):
|
|
73 |
- """Stores an ActionResult in cache for the given Action digest.
|
|
74 |
- |
|
75 |
- If the cache size limit has been reached, the oldest cache entries will
|
|
76 |
- be dropped before insertion so that the cache size never exceeds the
|
|
77 |
- maximum numbers of entries allowed.
|
|
78 |
- |
|
79 |
- Args:
|
|
80 |
- action_digest (Digest): digest of the Action to select.
|
|
81 |
- action_result (ActionResult): result object to store.
|
|
82 |
- """
|
|
83 |
- if self._max_cached_actions == 0:
|
|
84 |
- return
|
|
85 |
- |
|
86 |
- while len(self._digest_map) >= self._max_cached_actions:
|
|
87 |
- self._digest_map.popitem(last=False)
|
|
88 |
- |
|
89 |
- key = (action_digest.hash, action_digest.size_bytes)
|
|
90 |
- action_result_digest = self._storage.put_message(action_result)
|
|
91 |
- self._digest_map[key] = action_result_digest
|
|
92 |
- |
|
93 |
- def _blobs_still_exist(self, action_result):
|
|
94 |
- """Checks CAS for ActionResult output blobs existance.
|
|
95 |
- |
|
96 |
- Args:
|
|
97 |
- action_result (ActionResult): ActionResult to search referenced
|
|
98 |
- output blobs for.
|
|
99 |
- |
|
100 |
- Returns:
|
|
101 |
- True if all referenced blobs are present in CAS, False otherwise.
|
|
102 |
- """
|
|
103 |
- blobs_needed = []
|
|
104 |
- |
|
105 |
- for output_file in action_result.output_files:
|
|
106 |
- blobs_needed.append(output_file.digest)
|
|
107 |
- |
|
108 |
- for output_directory in action_result.output_directories:
|
|
109 |
- blobs_needed.append(output_directory.tree_digest)
|
|
110 |
- tree = self._storage.get_message(output_directory.tree_digest,
|
|
111 |
- re_pb2.Tree)
|
|
112 |
- if tree is None:
|
|
113 |
- return False
|
|
114 |
- for file_node in tree.root.files:
|
|
115 |
- blobs_needed.append(file_node.digest)
|
|
116 |
- for child in tree.children:
|
|
117 |
- for file_node in child.files:
|
|
118 |
- blobs_needed.append(file_node.digest)
|
|
119 |
- |
|
120 |
- if action_result.stdout_digest.hash and not action_result.stdout_raw:
|
|
121 |
- blobs_needed.append(action_result.stdout_digest)
|
|
122 |
- if action_result.stderr_digest.hash and not action_result.stderr_raw:
|
|
123 |
- blobs_needed.append(action_result.stderr_digest)
|
|
124 |
- |
|
125 |
- missing = self._storage.missing_blobs(blobs_needed)
|
|
126 |
- return len(missing) == 0
|
... | ... | @@ -24,6 +24,7 @@ import logging |
24 | 24 |
|
25 | 25 |
import grpc
|
26 | 26 |
|
27 |
+from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
|
27 | 28 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
|
28 | 29 |
|
29 | 30 |
from .._exceptions import NotFoundError
|
... | ... | @@ -39,13 +40,19 @@ class ActionCacheService(remote_execution_pb2_grpc.ActionCacheServicer): |
39 | 40 |
try:
|
40 | 41 |
return self._action_cache.get_action_result(request.action_digest)
|
41 | 42 |
|
42 |
- except NotFoundError:
|
|
43 |
+ except NotFoundError as e:
|
|
44 |
+ self.logger.error(e)
|
|
43 | 45 |
context.set_code(grpc.StatusCode.NOT_FOUND)
|
44 | 46 |
|
47 |
+ return remote_execution_pb2.ActionResult()
|
|
48 |
+ |
|
45 | 49 |
def UpdateActionResult(self, request, context):
|
46 | 50 |
try:
|
47 | 51 |
self._action_cache.update_action_result(request.action_digest, request.action_result)
|
48 | 52 |
return request.action_result
|
49 | 53 |
|
50 |
- except NotImplementedError:
|
|
54 |
+ except NotImplementedError as e:
|
|
55 |
+ self.logger.error(e)
|
|
51 | 56 |
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
|
57 |
+ |
|
58 |
+ return remote_execution_pb2.ActionResult()
|
... | ... | @@ -28,6 +28,8 @@ import grpc |
28 | 28 |
|
29 | 29 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
|
30 | 30 |
|
31 |
+from buildgrid._protos.google.longrunning import operations_pb2
|
|
32 |
+ |
|
31 | 33 |
from .._exceptions import InvalidArgumentError
|
32 | 34 |
|
33 | 35 |
|
... | ... | @@ -55,11 +57,13 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): |
55 | 57 |
self.logger.error(e)
|
56 | 58 |
context.set_details(str(e))
|
57 | 59 |
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
60 |
+ yield operations_pb2.Operation()
|
|
58 | 61 |
|
59 | 62 |
except NotImplementedError as e:
|
60 | 63 |
self.logger.error(e)
|
61 | 64 |
context.set_details(str(e))
|
62 | 65 |
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
|
66 |
+ yield operations_pb2.Operation()
|
|
63 | 67 |
|
64 | 68 |
def WaitExecution(self, request, context):
|
65 | 69 |
try:
|
... | ... | @@ -77,6 +81,7 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): |
77 | 81 |
self.logger.error(e)
|
78 | 82 |
context.set_details(str(e))
|
79 | 83 |
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
84 |
+ yield operations_pb2.Operation()
|
|
80 | 85 |
|
81 | 86 |
def _remove_client(self, operation_name, message_queue):
|
82 | 87 |
self._instance.unregister_message_client(operation_name, message_queue)
|
... | ... | @@ -37,6 +37,7 @@ class OperationsService(operations_pb2_grpc.OperationsServicer): |
37 | 37 |
def GetOperation(self, request, context):
|
38 | 38 |
try:
|
39 | 39 |
return self._instance.get_operation(request.name)
|
40 |
+ |
|
40 | 41 |
except InvalidArgumentError as e:
|
41 | 42 |
self.logger.error(e)
|
42 | 43 |
context.set_details(str(e))
|
... | ... | @@ -52,15 +53,19 @@ class OperationsService(operations_pb2_grpc.OperationsServicer): |
52 | 53 |
def DeleteOperation(self, request, context):
|
53 | 54 |
try:
|
54 | 55 |
return self._instance.delete_operation(request.name)
|
56 |
+ |
|
55 | 57 |
except InvalidArgumentError as e:
|
56 | 58 |
self.logger.error(e)
|
57 | 59 |
context.set_details(str(e))
|
58 | 60 |
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
61 |
+ return operations_pb2.Operation()
|
|
59 | 62 |
|
60 | 63 |
def CancelOperation(self, request, context):
|
61 | 64 |
try:
|
62 | 65 |
return self._instance.cancel_operation(request.name)
|
66 |
+ |
|
63 | 67 |
except NotImplementedError as e:
|
64 | 68 |
self.logger.error(e)
|
65 | 69 |
context.set_details(str(e))
|
66 | 70 |
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
|
71 |
+ return operations_pb2.Operation()
|
... | ... | @@ -23,6 +23,9 @@ import logging |
23 | 23 |
|
24 | 24 |
import grpc
|
25 | 25 |
|
26 |
+from google.protobuf.empty_pb2 import Empty
|
|
27 |
+ |
|
28 |
+from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
|
|
26 | 29 |
from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2_grpc
|
27 | 30 |
|
28 | 31 |
from .._exceptions import InvalidArgumentError, OutofSyncError
|
... | ... | @@ -43,6 +46,8 @@ class BotsService(bots_pb2_grpc.BotsServicer): |
43 | 46 |
context.set_details(str(e))
|
44 | 47 |
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
45 | 48 |
|
49 |
+ return bots_pb2.BotSession()
|
|
50 |
+ |
|
46 | 51 |
def UpdateBotSession(self, request, context):
|
47 | 52 |
try:
|
48 | 53 |
return self._instance.update_bot_session(request.name,
|
... | ... | @@ -62,5 +67,8 @@ class BotsService(bots_pb2_grpc.BotsServicer): |
62 | 67 |
context.set_details(str(e))
|
63 | 68 |
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
|
64 | 69 |
|
70 |
+ return bots_pb2.BotSession()
|
|
71 |
+ |
|
65 | 72 |
def PostBotEventTemp(self, request, context):
|
66 | 73 |
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
|
74 |
+ return Empty()
|