Martin Blanchard pushed to branch mablanch/132-gather-state-metrics at BuildGrid / buildgrid
Commits:
-
07963525
by Raoul Hidalgo Charman at 2018-11-08T13:57:26Z
-
aac2f938
by Raoul Hidalgo Charman at 2018-11-08T13:57:26Z
-
76f2e31f
by Martin Blanchard at 2018-11-08T13:57:26Z
-
dcd82219
by Martin Blanchard at 2018-11-08T14:50:25Z
-
dd572340
by Martin Blanchard at 2018-11-08T16:54:57Z
-
1446a9cd
by Martin Blanchard at 2018-11-09T11:26:17Z
-
f0b591cc
by Martin Blanchard at 2018-11-09T11:26:18Z
-
3ef64559
by Martin Blanchard at 2018-11-09T11:26:18Z
-
fedcd0ff
by Martin Blanchard at 2018-11-09T11:26:18Z
-
aa207582
by Martin Blanchard at 2018-11-09T11:26:18Z
-
26eb97ac
by Martin Blanchard at 2018-11-09T11:26:18Z
-
740d4b70
by Martin Blanchard at 2018-11-09T11:26:18Z
29 changed files:
- − buildgrid/_app/_logging.py
- buildgrid/_app/cli.py
- buildgrid/_app/commands/cmd_server.py
- buildgrid/bot/bot.py
- buildgrid/bot/bot_interface.py
- buildgrid/bot/bot_session.py
- buildgrid/server/actioncache/service.py
- buildgrid/server/bots/instance.py
- buildgrid/server/bots/service.py
- buildgrid/server/cas/instance.py
- buildgrid/server/cas/service.py
- buildgrid/server/cas/storage/disk.py
- buildgrid/server/cas/storage/lru_memory_cache.py
- buildgrid/server/cas/storage/remote.py
- buildgrid/server/cas/storage/s3.py
- buildgrid/server/cas/storage/with_cache.py
- buildgrid/server/controller.py
- buildgrid/server/execution/instance.py
- buildgrid/server/execution/service.py
- buildgrid/server/instance.py
- buildgrid/server/job.py
- buildgrid/server/operations/instance.py
- buildgrid/server/operations/service.py
- buildgrid/server/referencestorage/service.py
- buildgrid/server/referencestorage/storage.py
- buildgrid/server/scheduler.py
- buildgrid/settings.py
- setup.py
- tests/cas/test_services.py
Changes:
1 |
-# Copyright (C) 2018 Bloomberg LP
|
|
2 |
-#
|
|
3 |
-# Licensed under the Apache License, Version 2.0 (the "License");
|
|
4 |
-# you may not use this file except in compliance with the License.
|
|
5 |
-# You may obtain a copy of the License at
|
|
6 |
-#
|
|
7 |
-# <http://www.apache.org/licenses/LICENSE-2.0>
|
|
8 |
-#
|
|
9 |
-# Unless required by applicable law or agreed to in writing, software
|
|
10 |
-# distributed under the License is distributed on an "AS IS" BASIS,
|
|
11 |
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
12 |
-# See the License for the specific language governing permissions and
|
|
13 |
-# limitations under the License.
|
|
14 |
- |
|
15 |
- |
|
16 |
-import logging
|
|
17 |
- |
|
18 |
- |
|
19 |
-def bgd_logger():
|
|
20 |
- formatter = logging.Formatter(
|
|
21 |
- fmt='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
|
22 |
- )
|
|
23 |
- |
|
24 |
- logger = logging.getLogger()
|
|
25 |
- logger.setLevel(logging.INFO)
|
|
26 |
- |
|
27 |
- handler = logging.StreamHandler()
|
|
28 |
- handler.setFormatter(formatter)
|
|
29 |
- |
|
30 |
- logger.addHandler(handler)
|
|
31 |
- |
|
32 |
- return logger
|
... | ... | @@ -21,16 +21,14 @@ Any files in the commands/ folder with the name cmd_*.py |
21 | 21 |
will be attempted to be imported.
|
22 | 22 |
"""
|
23 | 23 |
|
24 |
-import os
|
|
25 | 24 |
import logging
|
25 |
+import os
|
|
26 | 26 |
|
27 | 27 |
import click
|
28 | 28 |
import grpc
|
29 | 29 |
|
30 | 30 |
from buildgrid.utils import read_file
|
31 | 31 |
|
32 |
-from . import _logging
|
|
33 |
- |
|
34 | 32 |
CONTEXT_SETTINGS = dict(auto_envvar_prefix='BUILDGRID')
|
35 | 33 |
|
36 | 34 |
|
... | ... | @@ -141,12 +139,27 @@ class BuildGridCLI(click.MultiCommand): |
141 | 139 |
|
142 | 140 |
|
143 | 141 |
@click.command(cls=BuildGridCLI, context_settings=CONTEXT_SETTINGS)
|
144 |
-@click.option('-v', '--verbose', is_flag=True,
|
|
145 |
- help='Enables verbose mode.')
|
|
142 |
+@click.option('-v', '--verbose', count=True,
|
|
143 |
+ help='Increase log verbosity level.')
|
|
146 | 144 |
@pass_context
|
147 | 145 |
def cli(context, verbose):
|
148 | 146 |
"""BuildGrid App"""
|
149 |
- logger = _logging.bgd_logger()
|
|
150 |
- context.verbose = verbose
|
|
151 |
- if verbose:
|
|
147 |
+ logger = logging.getLogger()
|
|
148 |
+ |
|
149 |
+ # Clean-up root logger for any pre-configuration:
|
|
150 |
+ for log_handler in logger.handlers[:]:
|
|
151 |
+ logger.removeHandler(log_handler)
|
|
152 |
+ for log_filter in logger.filters[:]:
|
|
153 |
+ logger.removeFilter(log_filter)
|
|
154 |
+ |
|
155 |
+ logging.basicConfig(
|
|
156 |
+ format='%(asctime)s:%(name)32.32s][%(levelname)5.5s]: %(message)s')
|
|
157 |
+ |
|
158 |
+ if verbose == 1:
|
|
159 |
+ logger.setLevel(logging.WARNING)
|
|
160 |
+ elif verbose == 2:
|
|
161 |
+ logger.setLevel(logging.INFO)
|
|
162 |
+ elif verbose >= 3:
|
|
152 | 163 |
logger.setLevel(logging.DEBUG)
|
164 |
+ else:
|
|
165 |
+ logger.setLevel(logging.ERROR)
|
... | ... | @@ -20,7 +20,6 @@ Server command |
20 | 20 |
Create a BuildGrid server.
|
21 | 21 |
"""
|
22 | 22 |
|
23 |
-import asyncio
|
|
24 | 23 |
import logging
|
25 | 24 |
import sys
|
26 | 25 |
|
... | ... | @@ -52,18 +51,14 @@ def start(context, config): |
52 | 51 |
click.echo("ERROR: Could not parse config: {}.\n".format(str(e)), err=True)
|
53 | 52 |
sys.exit(-1)
|
54 | 53 |
|
55 |
- loop = asyncio.get_event_loop()
|
|
56 | 54 |
try:
|
57 | 55 |
server.start()
|
58 |
- loop.run_forever()
|
|
59 | 56 |
|
60 | 57 |
except KeyboardInterrupt:
|
61 | 58 |
pass
|
62 | 59 |
|
63 | 60 |
finally:
|
64 |
- context.logger.info("Stopping server")
|
|
65 | 61 |
server.stop()
|
66 |
- loop.close()
|
|
67 | 62 |
|
68 | 63 |
|
69 | 64 |
def _create_server_from_config(config):
|
... | ... | @@ -30,7 +30,7 @@ class Bot: |
30 | 30 |
"""
|
31 | 31 |
|
32 | 32 |
def __init__(self, bot_session, update_period=1):
|
33 |
- self.logger = logging.getLogger(__name__)
|
|
33 |
+ self.__logger = logging.getLogger(__name__)
|
|
34 | 34 |
|
35 | 35 |
self._bot_session = bot_session
|
36 | 36 |
self._update_period = update_period
|
... | ... | @@ -31,8 +31,8 @@ class BotInterface: |
31 | 31 |
"""
|
32 | 32 |
|
33 | 33 |
def __init__(self, channel):
|
34 |
- self.logger = logging.getLogger(__name__)
|
|
35 |
- self.logger.info(channel)
|
|
34 |
+ self.__logger = logging.getLogger(__name__)
|
|
35 |
+ |
|
36 | 36 |
self._stub = bots_pb2_grpc.BotsStub(channel)
|
37 | 37 |
|
38 | 38 |
def create_bot_session(self, parent, bot_session):
|
... | ... | @@ -43,8 +43,7 @@ class BotSession: |
43 | 43 |
If a bot attempts to update an invalid session, it must be rejected and
|
44 | 44 |
may be put in quarantine.
|
45 | 45 |
"""
|
46 |
- |
|
47 |
- self.logger = logging.getLogger(__name__)
|
|
46 |
+ self.__logger = logging.getLogger(__name__)
|
|
48 | 47 |
|
49 | 48 |
self._bot_id = '{}.{}'.format(parent, platform.node())
|
50 | 49 |
self._context = None
|
... | ... | @@ -64,20 +63,20 @@ class BotSession: |
64 | 63 |
self._worker = worker
|
65 | 64 |
|
66 | 65 |
def create_bot_session(self, work, context=None):
|
67 |
- self.logger.debug("Creating bot session")
|
|
66 |
+ self.__logger.debug("Creating bot session")
|
|
68 | 67 |
self._work = work
|
69 | 68 |
self._context = context
|
70 | 69 |
|
71 | 70 |
session = self._interface.create_bot_session(self._parent, self.get_pb2())
|
72 | 71 |
self._name = session.name
|
73 | 72 |
|
74 |
- self.logger.info("Created bot session with name: [{}]".format(self._name))
|
|
73 |
+ self.__logger.info("Created bot session with name: [%s]", self._name)
|
|
75 | 74 |
|
76 | 75 |
for lease in session.leases:
|
77 | 76 |
self._update_lease_from_server(lease)
|
78 | 77 |
|
79 | 78 |
def update_bot_session(self):
|
80 |
- self.logger.debug("Updating bot session: [{}]".format(self._bot_id))
|
|
79 |
+ self.__logger.debug("Updating bot session: [%s]", self._bot_id)
|
|
81 | 80 |
session = self._interface.update_bot_session(self.get_pb2())
|
82 | 81 |
for k, v in list(self._leases.items()):
|
83 | 82 |
if v.state == LeaseState.COMPLETED.value:
|
... | ... | @@ -113,25 +112,25 @@ class BotSession: |
113 | 112 |
asyncio.ensure_future(self.create_work(lease))
|
114 | 113 |
|
115 | 114 |
async def create_work(self, lease):
|
116 |
- self.logger.debug("Work created: [{}]".format(lease.id))
|
|
115 |
+ self.__logger.debug("Work created: [%s]", lease.id)
|
|
117 | 116 |
loop = asyncio.get_event_loop()
|
118 | 117 |
|
119 | 118 |
try:
|
120 | 119 |
lease = await loop.run_in_executor(None, self._work, self._context, lease)
|
121 | 120 |
|
122 | 121 |
except grpc.RpcError as e:
|
123 |
- self.logger.error("RPC error thrown: [{}]".format(e))
|
|
122 |
+ self.__logger.error(e)
|
|
124 | 123 |
lease.status.CopyFrom(e.code())
|
125 | 124 |
|
126 | 125 |
except BotError as e:
|
127 |
- self.logger.error("Internal bot error thrown: [{}]".format(e))
|
|
126 |
+ self.__logger.error(e)
|
|
128 | 127 |
lease.status.code = code_pb2.INTERNAL
|
129 | 128 |
|
130 | 129 |
except Exception as e:
|
131 |
- self.logger.error("Exception thrown: [{}]".format(e))
|
|
130 |
+ self.__logger.error(e)
|
|
132 | 131 |
lease.status.code = code_pb2.INTERNAL
|
133 | 132 |
|
134 |
- self.logger.debug("Work complete: [{}]".format(lease.id))
|
|
133 |
+ self.__logger.debug("Work complete: [%s]", lease.id)
|
|
135 | 134 |
self.lease_completed(lease)
|
136 | 135 |
|
137 | 136 |
|
... | ... | @@ -32,7 +32,7 @@ from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_p |
32 | 32 |
class ActionCacheService(remote_execution_pb2_grpc.ActionCacheServicer):
|
33 | 33 |
|
34 | 34 |
def __init__(self, server):
|
35 |
- self.logger = logging.getLogger(__name__)
|
|
35 |
+ self.__logger = logging.getLogger(__name__)
|
|
36 | 36 |
|
37 | 37 |
self._instances = {}
|
38 | 38 |
|
... | ... | @@ -42,34 +42,38 @@ class ActionCacheService(remote_execution_pb2_grpc.ActionCacheServicer): |
42 | 42 |
self._instances[name] = instance
|
43 | 43 |
|
44 | 44 |
def GetActionResult(self, request, context):
|
45 |
+ self.__logger.debug("GetActionResult request from [%s]", context.peer())
|
|
46 |
+ |
|
45 | 47 |
try:
|
46 | 48 |
instance = self._get_instance(request.instance_name)
|
47 | 49 |
return instance.get_action_result(request.action_digest)
|
48 | 50 |
|
49 | 51 |
except InvalidArgumentError as e:
|
50 |
- self.logger.error(e)
|
|
52 |
+ self.__logger.error(e)
|
|
51 | 53 |
context.set_details(str(e))
|
52 | 54 |
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
53 | 55 |
|
54 | 56 |
except NotFoundError as e:
|
55 |
- self.logger.debug(e)
|
|
57 |
+ self.__logger.debug(e)
|
|
56 | 58 |
context.set_code(grpc.StatusCode.NOT_FOUND)
|
57 | 59 |
|
58 | 60 |
return remote_execution_pb2.ActionResult()
|
59 | 61 |
|
60 | 62 |
def UpdateActionResult(self, request, context):
|
63 |
+ self.__logger.debug("UpdateActionResult request from [%s]", context.peer())
|
|
64 |
+ |
|
61 | 65 |
try:
|
62 | 66 |
instance = self._get_instance(request.instance_name)
|
63 | 67 |
instance.update_action_result(request.action_digest, request.action_result)
|
64 | 68 |
return request.action_result
|
65 | 69 |
|
66 | 70 |
except InvalidArgumentError as e:
|
67 |
- self.logger.error(e)
|
|
71 |
+ self.__logger.error(e)
|
|
68 | 72 |
context.set_details(str(e))
|
69 | 73 |
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
70 | 74 |
|
71 | 75 |
except NotImplementedError as e:
|
72 |
- self.logger.error(e)
|
|
76 |
+ self.__logger.error(e)
|
|
73 | 77 |
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
|
74 | 78 |
|
75 | 79 |
return remote_execution_pb2.ActionResult()
|
... | ... | @@ -31,7 +31,7 @@ from ..job import LeaseState |
31 | 31 |
class BotsInterface:
|
32 | 32 |
|
33 | 33 |
def __init__(self, scheduler):
|
34 |
- self.logger = logging.getLogger(__name__)
|
|
34 |
+ self.__logger = logging.getLogger(__name__)
|
|
35 | 35 |
|
36 | 36 |
self._bot_ids = {}
|
37 | 37 |
self._bot_sessions = {}
|
... | ... | @@ -64,7 +64,7 @@ class BotsInterface: |
64 | 64 |
|
65 | 65 |
self._bot_ids[name] = bot_id
|
66 | 66 |
self._bot_sessions[name] = bot_session
|
67 |
- self.logger.info("Created bot session name=[{}] with bot_id=[{}]".format(name, bot_id))
|
|
67 |
+ self.__logger.info("Created bot session name=[%s] with bot_id=[%s]", name, bot_id)
|
|
68 | 68 |
|
69 | 69 |
# TODO: Send worker capabilities to the scheduler!
|
70 | 70 |
leases = self._scheduler.request_job_leases({})
|
... | ... | @@ -77,7 +77,7 @@ class BotsInterface: |
77 | 77 |
""" Client updates the server. Any changes in state to the Lease should be
|
78 | 78 |
registered server side. Assigns available leases with work.
|
79 | 79 |
"""
|
80 |
- self.logger.debug("Updating bot session name={}".format(name))
|
|
80 |
+ self.__logger.debug("Updating bot session name=[%s]", name)
|
|
81 | 81 |
self._check_bot_ids(bot_session.bot_id, name)
|
82 | 82 |
|
83 | 83 |
leases = filter(None, [self.check_states(lease) for lease in bot_session.leases])
|
... | ... | @@ -173,12 +173,12 @@ class BotsInterface: |
173 | 173 |
if bot_id is None:
|
174 | 174 |
raise InvalidArgumentError("Bot id does not exist: [{}]".format(name))
|
175 | 175 |
|
176 |
- self.logger.debug("Attempting to close [{}] with name: [{}]".format(bot_id, name))
|
|
176 |
+ self.__logger.debug("Attempting to close [%s] with name: [%s]", bot_id, name)
|
|
177 | 177 |
for lease in self._bot_sessions[name].leases:
|
178 | 178 |
if lease.state != LeaseState.COMPLETED.value:
|
179 | 179 |
# TODO: Be wary here, may need to handle rejected leases in future
|
180 | 180 |
self._scheduler.retry_job(lease.id)
|
181 | 181 |
|
182 |
- self.logger.debug("Closing bot session: [{}]".format(name))
|
|
182 |
+ self.__logger.debug("Closing bot session: [%s]", name)
|
|
183 | 183 |
self._bot_ids.pop(name)
|
184 |
- self.logger.info("Closed bot [{}] with name: [{}]".format(bot_id, name))
|
|
184 |
+ self.__logger.info("Closed bot [%s] with name: [%s]", bot_id, name)
|
... | ... | @@ -23,8 +23,9 @@ import logging |
23 | 23 |
|
24 | 24 |
import grpc
|
25 | 25 |
|
26 |
-from google.protobuf.empty_pb2 import Empty
|
|
26 |
+from google.protobuf import empty_pb2, timestamp_pb2
|
|
27 | 27 |
|
28 |
+from buildgrid._enums import BotStatus
|
|
28 | 29 |
from buildgrid._exceptions import InvalidArgumentError, OutOfSyncError
|
29 | 30 |
from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
|
30 | 31 |
from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2_grpc
|
... | ... | @@ -32,61 +33,152 @@ from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2_grp |
32 | 33 |
|
33 | 34 |
class BotsService(bots_pb2_grpc.BotsServicer):
|
34 | 35 |
|
35 |
- def __init__(self, server):
|
|
36 |
- self.logger = logging.getLogger(__name__)
|
|
36 |
+ def __init__(self, server, monitor=True):
|
|
37 |
+ self.__logger = logging.getLogger(__name__)
|
|
38 |
+ |
|
39 |
+ self.__bots_by_status = {}
|
|
40 |
+ self.__bots_by_instance = {}
|
|
41 |
+ self.__bots = {}
|
|
37 | 42 |
|
38 | 43 |
self._instances = {}
|
44 |
+ self._is_monitored = True
|
|
39 | 45 |
|
40 | 46 |
bots_pb2_grpc.add_BotsServicer_to_server(self, server)
|
41 | 47 |
|
42 |
- def add_instance(self, name, instance):
|
|
43 |
- self._instances[name] = instance
|
|
48 |
+ if self._is_monitored:
|
|
49 |
+ self.__bots_by_status[BotStatus.OK] = set()
|
|
50 |
+ self.__bots_by_status[BotStatus.UNHEALTHY] = set()
|
|
51 |
+ self.__bots_by_status[BotStatus.HOST_REBOOTING] = set()
|
|
52 |
+ self.__bots_by_status[BotStatus.BOT_TERMINATING] = set()
|
|
53 |
+ |
|
54 |
+ # --- Public API ---
|
|
55 |
+ |
|
56 |
+ def add_instance(self, instance_name, instance):
|
|
57 |
+ self._instances[instance_name] = instance
|
|
58 |
+ |
|
59 |
+ if self._is_monitored:
|
|
60 |
+ self.__bots_by_instance[instance_name] = 0
|
|
61 |
+ |
|
62 |
+ # --- Public API: Servicer ---
|
|
44 | 63 |
|
45 | 64 |
def CreateBotSession(self, request, context):
|
65 |
+ """Handles CreateBotSessionRequest messages.
|
|
66 |
+ |
|
67 |
+ Args:
|
|
68 |
+ request (CreateBotSessionRequest): The incoming RPC request.
|
|
69 |
+ context (grpc.ServicerContext): Context for the RPC call.
|
|
70 |
+ """
|
|
71 |
+ self.__logger.debug("CreateBotSession request from [%s]", context.peer())
|
|
72 |
+ |
|
73 |
+ instance_name = request.parent
|
|
74 |
+ bot_status = BotStatus(request.bot_session.status)
|
|
75 |
+ bot_id = request.bot_session.bot_id
|
|
76 |
+ |
|
46 | 77 |
try:
|
47 |
- parent = request.parent
|
|
48 |
- instance = self._get_instance(request.parent)
|
|
49 |
- return instance.create_bot_session(parent,
|
|
50 |
- request.bot_session)
|
|
78 |
+ instance = self._get_instance(instance_name)
|
|
79 |
+ bot_session = instance.create_bot_session(instance_name,
|
|
80 |
+ request.bot_session)
|
|
81 |
+ now = timestamp_pb2.Timestamp()
|
|
82 |
+ now.GetCurrentTime()
|
|
83 |
+ |
|
84 |
+ if self._is_monitored:
|
|
85 |
+ self.__bots[bot_id] = now
|
|
86 |
+ self.__bots_by_instance[instance_name] += 1
|
|
87 |
+ self.__bots_by_status[bot_status].add(bot_id)
|
|
88 |
+ |
|
89 |
+ return bot_session
|
|
51 | 90 |
|
52 | 91 |
except InvalidArgumentError as e:
|
53 |
- self.logger.error(e)
|
|
92 |
+ self.__logger.error(e)
|
|
54 | 93 |
context.set_details(str(e))
|
55 | 94 |
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
56 | 95 |
|
57 | 96 |
return bots_pb2.BotSession()
|
58 | 97 |
|
59 | 98 |
def UpdateBotSession(self, request, context):
|
99 |
+ """Handles UpdateBotSessionRequest messages.
|
|
100 |
+ |
|
101 |
+ Args:
|
|
102 |
+ request (UpdateBotSessionRequest): The incoming RPC request.
|
|
103 |
+ context (grpc.ServicerContext): Context for the RPC call.
|
|
104 |
+ """
|
|
105 |
+ self.__logger.debug("UpdateBotSession request from [%s]", context.peer())
|
|
106 |
+ |
|
107 |
+ names = request.name.split("/")
|
|
108 |
+ bot_status = BotStatus(request.bot_session.status)
|
|
109 |
+ bot_id = request.bot_session.bot_id
|
|
110 |
+ |
|
60 | 111 |
try:
|
61 |
- names = request.name.split("/")
|
|
62 |
- # Operation name should be in format:
|
|
63 |
- # {instance/name}/{uuid}
|
|
64 |
- instance_name = ''.join(names[0:-1])
|
|
112 |
+ instance_name = '/'.join(names[:-1])
|
|
65 | 113 |
|
66 | 114 |
instance = self._get_instance(instance_name)
|
67 |
- return instance.update_bot_session(request.name,
|
|
68 |
- request.bot_session)
|
|
115 |
+ bot_session = instance.update_bot_session(request.name,
|
|
116 |
+ request.bot_session)
|
|
117 |
+ |
|
118 |
+ if self._is_monitored:
|
|
119 |
+ self.__bots[bot_id].GetCurrentTime()
|
|
120 |
+ if bot_id not in self.__bots_by_status[bot_status]:
|
|
121 |
+ self.__bots_by_status[BotStatus.OK].discard(bot_id)
|
|
122 |
+ self.__bots_by_status[BotStatus.UNHEALTHY].discard(bot_id)
|
|
123 |
+ self.__bots_by_status[BotStatus.HOST_REBOOTING].discard(bot_id)
|
|
124 |
+ self.__bots_by_status[BotStatus.BOT_TERMINATING].discard(bot_id)
|
|
125 |
+ |
|
126 |
+ self.__bots_by_status[bot_status].add(bot_id)
|
|
127 |
+ |
|
128 |
+ return bot_session
|
|
69 | 129 |
|
70 | 130 |
except InvalidArgumentError as e:
|
71 |
- self.logger.error(e)
|
|
131 |
+ self.__logger.error(e)
|
|
72 | 132 |
context.set_details(str(e))
|
73 | 133 |
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
74 | 134 |
|
75 | 135 |
except OutOfSyncError as e:
|
76 |
- self.logger.error(e)
|
|
136 |
+ self.__logger.error(e)
|
|
77 | 137 |
context.set_details(str(e))
|
78 | 138 |
context.set_code(grpc.StatusCode.DATA_LOSS)
|
79 | 139 |
|
80 | 140 |
except NotImplementedError as e:
|
81 |
- self.logger.error(e)
|
|
141 |
+ self.__logger.error(e)
|
|
82 | 142 |
context.set_details(str(e))
|
83 | 143 |
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
|
84 | 144 |
|
85 | 145 |
return bots_pb2.BotSession()
|
86 | 146 |
|
87 | 147 |
def PostBotEventTemp(self, request, context):
|
148 |
+ """Handles PostBotEventTempRequest messages.
|
|
149 |
+ |
|
150 |
+ Args:
|
|
151 |
+ request (PostBotEventTempRequest): The incoming RPC request.
|
|
152 |
+ context (grpc.ServicerContext): Context for the RPC call.
|
|
153 |
+ """
|
|
154 |
+ self.__logger.debug("PostBotEventTemp request from [%s]", context.peer())
|
|
155 |
+ |
|
88 | 156 |
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
|
89 |
- return Empty()
|
|
157 |
+ |
|
158 |
+ return empty_pb2.Empty()
|
|
159 |
+ |
|
160 |
+ # --- Public API: Monitoring ---
|
|
161 |
+ |
|
162 |
+ @property
|
|
163 |
+ def is_monitored(self):
|
|
164 |
+ return self._is_monitored
|
|
165 |
+ |
|
166 |
+ def query_n_bots(self):
|
|
167 |
+ return len(self.__bots)
|
|
168 |
+ |
|
169 |
+ def query_n_bots_for_instance(self, instance_name):
|
|
170 |
+ try:
|
|
171 |
+ return self.__bots_by_instance[instance_name]
|
|
172 |
+ except KeyError:
|
|
173 |
+ return 0
|
|
174 |
+ |
|
175 |
+ def query_n_bots_for_status(self, bot_status):
|
|
176 |
+ try:
|
|
177 |
+ return len(self.__bots_by_status[bot_status])
|
|
178 |
+ except KeyError:
|
|
179 |
+ return 0
|
|
180 |
+ |
|
181 |
+ # --- Private API ---
|
|
90 | 182 |
|
91 | 183 |
def _get_instance(self, name):
|
92 | 184 |
try:
|
... | ... | @@ -19,6 +19,8 @@ Storage Instances |
19 | 19 |
Instances of CAS and ByteStream
|
20 | 20 |
"""
|
21 | 21 |
|
22 |
+import logging
|
|
23 |
+ |
|
22 | 24 |
from buildgrid._exceptions import InvalidArgumentError, NotFoundError, OutOfRangeError
|
23 | 25 |
from buildgrid._protos.google.bytestream import bytestream_pb2
|
24 | 26 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 as re_pb2
|
... | ... | @@ -28,6 +30,8 @@ from buildgrid.settings import HASH |
28 | 30 |
class ContentAddressableStorageInstance:
|
29 | 31 |
|
30 | 32 |
def __init__(self, storage):
|
33 |
+ self.__logger = logging.getLogger(__name__)
|
|
34 |
+ |
|
31 | 35 |
self._storage = storage
|
32 | 36 |
|
33 | 37 |
def register_instance_with_server(self, instance_name, server):
|
... | ... | @@ -60,6 +64,8 @@ class ByteStreamInstance: |
60 | 64 |
BLOCK_SIZE = 1 * 1024 * 1024 # 1 MB block size
|
61 | 65 |
|
62 | 66 |
def __init__(self, storage):
|
67 |
+ self.__logger = logging.getLogger(__name__)
|
|
68 |
+ |
|
63 | 69 |
self._storage = storage
|
64 | 70 |
|
65 | 71 |
def register_instance_with_server(self, instance_name, server):
|
... | ... | @@ -35,7 +35,7 @@ from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_p |
35 | 35 |
class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
|
36 | 36 |
|
37 | 37 |
def __init__(self, server):
|
38 |
- self.logger = logging.getLogger(__name__)
|
|
38 |
+ self.__logger = logging.getLogger(__name__)
|
|
39 | 39 |
|
40 | 40 |
self._instances = {}
|
41 | 41 |
|
... | ... | @@ -45,42 +45,48 @@ class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressa |
45 | 45 |
self._instances[name] = instance
|
46 | 46 |
|
47 | 47 |
def FindMissingBlobs(self, request, context):
|
48 |
+ self.__logger.debug("FindMissingBlobs request from [%s]", context.peer())
|
|
49 |
+ |
|
48 | 50 |
try:
|
49 |
- self.logger.debug("FindMissingBlobs request: [{}]".format(request))
|
|
50 | 51 |
instance = self._get_instance(request.instance_name)
|
51 | 52 |
response = instance.find_missing_blobs(request.blob_digests)
|
52 |
- self.logger.debug("FindMissingBlobs response: [{}]".format(response))
|
|
53 |
+ |
|
53 | 54 |
return response
|
54 | 55 |
|
55 | 56 |
except InvalidArgumentError as e:
|
56 |
- self.logger.error(e)
|
|
57 |
+ self.__logger.error(e)
|
|
57 | 58 |
context.set_details(str(e))
|
58 | 59 |
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
59 | 60 |
|
60 | 61 |
return remote_execution_pb2.FindMissingBlobsResponse()
|
61 | 62 |
|
62 | 63 |
def BatchUpdateBlobs(self, request, context):
|
64 |
+ self.__logger.debug("BatchUpdateBlobs request from [%s]", context.peer())
|
|
65 |
+ |
|
63 | 66 |
try:
|
64 |
- self.logger.debug("BatchUpdateBlobs request: [{}]".format(request))
|
|
65 | 67 |
instance = self._get_instance(request.instance_name)
|
66 | 68 |
response = instance.batch_update_blobs(request.requests)
|
67 |
- self.logger.debug("FindMissingBlobs response: [{}]".format(response))
|
|
69 |
+ |
|
68 | 70 |
return response
|
69 | 71 |
|
70 | 72 |
except InvalidArgumentError as e:
|
71 |
- self.logger.error(e)
|
|
73 |
+ self.__logger.error(e)
|
|
72 | 74 |
context.set_details(str(e))
|
73 | 75 |
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
74 | 76 |
|
75 | 77 |
return remote_execution_pb2.BatchReadBlobsResponse()
|
76 | 78 |
|
77 | 79 |
def BatchReadBlobs(self, request, context):
|
80 |
+ self.__logger.debug("BatchReadBlobs request from [%s]", context.peer())
|
|
81 |
+ |
|
78 | 82 |
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
|
79 | 83 |
context.set_details('Method not implemented!')
|
80 | 84 |
|
81 | 85 |
return remote_execution_pb2.BatchReadBlobsResponse()
|
82 | 86 |
|
83 | 87 |
def GetTree(self, request, context):
|
88 |
+ self.__logger.debug("GetTree request from [%s]", context.peer())
|
|
89 |
+ |
|
84 | 90 |
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
|
85 | 91 |
context.set_details('Method not implemented!')
|
86 | 92 |
|
... | ... | @@ -97,7 +103,7 @@ class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressa |
97 | 103 |
class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer):
|
98 | 104 |
|
99 | 105 |
def __init__(self, server):
|
100 |
- self.logger = logging.getLogger(__name__)
|
|
106 |
+ self.__logger = logging.getLogger(__name__)
|
|
101 | 107 |
|
102 | 108 |
self._instances = {}
|
103 | 109 |
|
... | ... | @@ -107,8 +113,9 @@ class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer): |
107 | 113 |
self._instances[name] = instance
|
108 | 114 |
|
109 | 115 |
def Read(self, request, context):
|
116 |
+ self.__logger.debug("Read request from [%s]", context.peer())
|
|
117 |
+ |
|
110 | 118 |
try:
|
111 |
- self.logger.debug("Read request: [{}]".format(request))
|
|
112 | 119 |
path = request.resource_name.split("/")
|
113 | 120 |
instance_name = path[0]
|
114 | 121 |
|
... | ... | @@ -131,30 +138,29 @@ class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer): |
131 | 138 |
request.read_limit)
|
132 | 139 |
|
133 | 140 |
except InvalidArgumentError as e:
|
134 |
- self.logger.error(e)
|
|
141 |
+ self.__logger.error(e)
|
|
135 | 142 |
context.set_details(str(e))
|
136 | 143 |
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
137 | 144 |
yield bytestream_pb2.ReadResponse()
|
138 | 145 |
|
139 | 146 |
except NotFoundError as e:
|
140 |
- self.logger.error(e)
|
|
147 |
+ self.__logger.error(e)
|
|
141 | 148 |
context.set_details(str(e))
|
142 | 149 |
context.set_code(grpc.StatusCode.NOT_FOUND)
|
143 | 150 |
yield bytestream_pb2.ReadResponse()
|
144 | 151 |
|
145 | 152 |
except OutOfRangeError as e:
|
146 |
- self.logger.error(e)
|
|
153 |
+ self.__logger.error(e)
|
|
147 | 154 |
context.set_details(str(e))
|
148 | 155 |
context.set_code(grpc.StatusCode.OUT_OF_RANGE)
|
149 | 156 |
yield bytestream_pb2.ReadResponse()
|
150 | 157 |
|
151 |
- self.logger.debug("Read finished.")
|
|
152 |
- |
|
153 | 158 |
def Write(self, requests, context):
|
159 |
+ self.__logger.debug("Write request from [%s]", context.peer())
|
|
160 |
+ |
|
154 | 161 |
try:
|
155 | 162 |
requests, request_probe = tee(requests, 2)
|
156 | 163 |
first_request = next(request_probe)
|
157 |
- self.logger.debug("First write request: [{}]".format(first_request))
|
|
158 | 164 |
|
159 | 165 |
path = first_request.resource_name.split("/")
|
160 | 166 |
|
... | ... | @@ -175,21 +181,21 @@ class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer): |
175 | 181 |
|
176 | 182 |
instance = self._get_instance(instance_name)
|
177 | 183 |
response = instance.write(requests)
|
178 |
- self.logger.debug("Write response: [{}]".format(response))
|
|
184 |
+ |
|
179 | 185 |
return response
|
180 | 186 |
|
181 | 187 |
except NotImplementedError as e:
|
182 |
- self.logger.error(e)
|
|
188 |
+ self.__logger.error(e)
|
|
183 | 189 |
context.set_details(str(e))
|
184 | 190 |
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
|
185 | 191 |
|
186 | 192 |
except InvalidArgumentError as e:
|
187 |
- self.logger.error(e)
|
|
193 |
+ self.__logger.error(e)
|
|
188 | 194 |
context.set_details(str(e))
|
189 | 195 |
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
190 | 196 |
|
191 | 197 |
except NotFoundError as e:
|
192 |
- self.logger.error(e)
|
|
198 |
+ self.__logger.error(e)
|
|
193 | 199 |
context.set_details(str(e))
|
194 | 200 |
context.set_code(grpc.StatusCode.NOT_FOUND)
|
195 | 201 |
|
... | ... | @@ -20,6 +20,7 @@ DiskStorage |
20 | 20 |
A CAS storage provider that stores files as blobs on disk.
|
21 | 21 |
"""
|
22 | 22 |
|
23 |
+import logging
|
|
23 | 24 |
import os
|
24 | 25 |
import tempfile
|
25 | 26 |
|
... | ... | @@ -29,6 +30,8 @@ from .storage_abc import StorageABC |
29 | 30 |
class DiskStorage(StorageABC):
|
30 | 31 |
|
31 | 32 |
def __init__(self, path):
|
33 |
+ self.__logger = logging.getLogger(__name__)
|
|
34 |
+ |
|
32 | 35 |
if not os.path.isabs(path):
|
33 | 36 |
self.__root_path = os.path.abspath(path)
|
34 | 37 |
else:
|
... | ... | @@ -43,6 +43,8 @@ class _NullBytesIO(io.BufferedIOBase): |
43 | 43 |
class LRUMemoryCache(StorageABC):
|
44 | 44 |
|
45 | 45 |
def __init__(self, limit):
|
46 |
+ self.__logger = logging.getLogger(__name__)
|
|
47 |
+ |
|
46 | 48 |
self._limit = limit
|
47 | 49 |
self._storage = collections.OrderedDict()
|
48 | 50 |
self._bytes_stored = 0
|
... | ... | @@ -35,7 +35,7 @@ from .storage_abc import StorageABC |
35 | 35 |
class RemoteStorage(StorageABC):
|
36 | 36 |
|
37 | 37 |
def __init__(self, channel, instance_name):
|
38 |
- self.logger = logging.getLogger(__name__)
|
|
38 |
+ self.__logger = logging.getLogger(__name__)
|
|
39 | 39 |
|
40 | 40 |
self.instance_name = instance_name
|
41 | 41 |
self.channel = channel
|
... | ... | @@ -21,6 +21,7 @@ A storage provider that stores data in an Amazon S3 bucket. |
21 | 21 |
"""
|
22 | 22 |
|
23 | 23 |
import io
|
24 |
+import logging
|
|
24 | 25 |
|
25 | 26 |
import boto3
|
26 | 27 |
from botocore.exceptions import ClientError
|
... | ... | @@ -31,6 +32,8 @@ from .storage_abc import StorageABC |
31 | 32 |
class S3Storage(StorageABC):
|
32 | 33 |
|
33 | 34 |
def __init__(self, bucket, **kwargs):
|
35 |
+ self.__logger = logging.getLogger(__name__)
|
|
36 |
+ |
|
34 | 37 |
self._bucket = bucket
|
35 | 38 |
self._s3 = boto3.resource('s3', **kwargs)
|
36 | 39 |
|
... | ... | @@ -26,6 +26,7 @@ the fallback. |
26 | 26 |
"""
|
27 | 27 |
|
28 | 28 |
import io
|
29 |
+import logging
|
|
29 | 30 |
|
30 | 31 |
from .storage_abc import StorageABC
|
31 | 32 |
|
... | ... | @@ -118,6 +119,8 @@ class _CachingTee(io.RawIOBase): |
118 | 119 |
class WithCacheStorage(StorageABC):
|
119 | 120 |
|
120 | 121 |
def __init__(self, cache, fallback):
|
122 |
+ self.__logger = logging.getLogger(__name__)
|
|
123 |
+ |
|
121 | 124 |
self._cache = cache
|
122 | 125 |
self._fallback = fallback
|
123 | 126 |
|
... | ... | @@ -37,9 +37,9 @@ from .operations.instance import OperationsInstance |
37 | 37 |
class ExecutionController:
|
38 | 38 |
|
39 | 39 |
def __init__(self, action_cache=None, storage=None):
|
40 |
- scheduler = Scheduler(action_cache)
|
|
40 |
+ self.__logger = logging.getLogger(__name__)
|
|
41 | 41 |
|
42 |
- self.logger = logging.getLogger(__name__)
|
|
42 |
+ scheduler = Scheduler(action_cache)
|
|
43 | 43 |
|
44 | 44 |
self._execution_instance = ExecutionInstance(scheduler, storage)
|
45 | 45 |
self._bots_interface = BotsInterface(scheduler)
|
... | ... | @@ -30,7 +30,8 @@ from ..job import Job |
30 | 30 |
class ExecutionInstance:
|
31 | 31 |
|
32 | 32 |
def __init__(self, scheduler, storage):
|
33 |
- self.logger = logging.getLogger(__name__)
|
|
33 |
+ self.__logger = logging.getLogger(__name__)
|
|
34 |
+ |
|
34 | 35 |
self._storage = storage
|
35 | 36 |
self._scheduler = scheduler
|
36 | 37 |
|
... | ... | @@ -33,29 +33,60 @@ from buildgrid._protos.google.longrunning import operations_pb2 |
33 | 33 |
|
34 | 34 |
class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
|
35 | 35 |
|
36 |
- def __init__(self, server):
|
|
37 |
- self.logger = logging.getLogger(__name__)
|
|
36 |
+ def __init__(self, server, monitor=True):
|
|
37 |
+ self.__logger = logging.getLogger(__name__)
|
|
38 |
+ |
|
39 |
+ self.__peers_by_instance = {}
|
|
40 |
+ self.__peers = {}
|
|
41 |
+ |
|
38 | 42 |
self._instances = {}
|
43 |
+ self._is_monitored = True
|
|
44 |
+ |
|
39 | 45 |
remote_execution_pb2_grpc.add_ExecutionServicer_to_server(self, server)
|
40 | 46 |
|
41 |
- def add_instance(self, name, instance):
|
|
42 |
- self._instances[name] = instance
|
|
47 |
+ # --- Public API ---
|
|
48 |
+ |
|
49 |
+ def add_instance(self, instance_name, instance):
|
|
50 |
+ self._instances[instance_name] = instance
|
|
51 |
+ |
|
52 |
+ if self._is_monitored:
|
|
53 |
+ self.__peers_by_instance[instance_name] = set()
|
|
54 |
+ |
|
55 |
+ # --- Public API: Servicer ---
|
|
43 | 56 |
|
44 | 57 |
def Execute(self, request, context):
|
58 |
+ """Handles ExecuteRequest messages.
|
|
59 |
+ |
|
60 |
+ Args:
|
|
61 |
+ request (ExecuteRequest): The incoming RPC request.
|
|
62 |
+ context (grpc.ServicerContext): Context for the RPC call.
|
|
63 |
+ """
|
|
64 |
+ self.__logger.debug("Execute request from [%s]", context.peer())
|
|
65 |
+ |
|
66 |
+ instance_name = request.instance_name
|
|
67 |
+ message_queue = queue.Queue()
|
|
68 |
+ peer = context.peer()
|
|
69 |
+ |
|
45 | 70 |
try:
|
46 |
- message_queue = queue.Queue()
|
|
47 |
- instance = self._get_instance(request.instance_name)
|
|
71 |
+ instance = self._get_instance(instance_name)
|
|
48 | 72 |
operation = instance.execute(request.action_digest,
|
49 | 73 |
request.skip_cache_lookup,
|
50 | 74 |
message_queue)
|
51 | 75 |
|
52 |
- context.add_callback(partial(instance.unregister_message_client,
|
|
53 |
- operation.name, message_queue))
|
|
76 |
+ context.add_callback(partial(self._rpc_termination_callback,
|
|
77 |
+ peer, instance_name, operation.name, message_queue))
|
|
54 | 78 |
|
55 |
- instanced_op_name = "{}/{}".format(request.instance_name,
|
|
56 |
- operation.name)
|
|
79 |
+ if self._is_monitored:
|
|
80 |
+ if peer in self.__peers:
|
|
81 |
+ self.__peers[peer] += 1
|
|
82 |
+ else:
|
|
83 |
+ self.__peers[peer] = 1
|
|
57 | 84 |
|
58 |
- self.logger.info("Operation name: [{}]".format(instanced_op_name))
|
|
85 |
+ self.__peers_by_instance[instance_name].add(peer)
|
|
86 |
+ |
|
87 |
+ instanced_op_name = "{}/{}".format(instance_name, operation.name)
|
|
88 |
+ |
|
89 |
+ self.__logger.info("Operation name: [%s]", instanced_op_name)
|
|
59 | 90 |
|
60 | 91 |
for operation in instance.stream_operation_updates(message_queue,
|
61 | 92 |
operation.name):
|
... | ... | @@ -65,33 +96,48 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): |
65 | 96 |
yield op
|
66 | 97 |
|
67 | 98 |
except InvalidArgumentError as e:
|
68 |
- self.logger.error(e)
|
|
99 |
+ self.__logger.error(e)
|
|
69 | 100 |
context.set_details(str(e))
|
70 | 101 |
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
71 | 102 |
yield operations_pb2.Operation()
|
72 | 103 |
|
73 | 104 |
except FailedPreconditionError as e:
|
74 |
- self.logger.error(e)
|
|
105 |
+ self.__logger.error(e)
|
|
75 | 106 |
context.set_details(str(e))
|
76 | 107 |
context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
|
77 | 108 |
yield operations_pb2.Operation()
|
78 | 109 |
|
79 | 110 |
def WaitExecution(self, request, context):
|
80 |
- try:
|
|
81 |
- names = request.name.split("/")
|
|
111 |
+ """Handles WaitExecutionRequest messages.
|
|
82 | 112 |
|
83 |
- # Operation name should be in format:
|
|
84 |
- # {instance/name}/{operation_id}
|
|
85 |
- instance_name = ''.join(names[0:-1])
|
|
113 |
+ Args:
|
|
114 |
+ request (WaitExecutionRequest): The incoming RPC request.
|
|
115 |
+ context (grpc.ServicerContext): Context for the RPC call.
|
|
116 |
+ """
|
|
117 |
+ self.__logger.debug("WaitExecution request from [%s]", context.peer())
|
|
118 |
+ |
|
119 |
+ names = request.name.split('/')
|
|
120 |
+ instance_name = '/'.join(names[:-1])
|
|
121 |
+ operation_name = names[-1]
|
|
122 |
+ message_queue = queue.Queue()
|
|
123 |
+ peer = context.peer()
|
|
124 |
+ |
|
125 |
+ try:
|
|
126 |
+ if instance_name != request.instance_name:
|
|
127 |
+ raise InvalidArgumentError("Invalid operation [{}] for instance [{}]"
|
|
128 |
+ .format(request.name, instance_name))
|
|
86 | 129 |
|
87 |
- message_queue = queue.Queue()
|
|
88 |
- operation_name = names[-1]
|
|
89 | 130 |
instance = self._get_instance(instance_name)
|
90 | 131 |
|
91 | 132 |
instance.register_message_client(operation_name, message_queue)
|
133 |
+ context.add_callback(partial(self._rpc_termination_callback,
|
|
134 |
+ peer, instance_name, operation_name, message_queue))
|
|
92 | 135 |
|
93 |
- context.add_callback(partial(instance.unregister_message_client,
|
|
94 |
- operation_name, message_queue))
|
|
136 |
+ if self._is_monitored:
|
|
137 |
+ if peer in self.__peers:
|
|
138 |
+ self.__peers[peer] += 1
|
|
139 |
+ else:
|
|
140 |
+ self.__peers[peer] = 1
|
|
95 | 141 |
|
96 | 142 |
for operation in instance.stream_operation_updates(message_queue,
|
97 | 143 |
operation_name):
|
... | ... | @@ -101,11 +147,39 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): |
101 | 147 |
yield op
|
102 | 148 |
|
103 | 149 |
except InvalidArgumentError as e:
|
104 |
- self.logger.error(e)
|
|
150 |
+ self.__logger.error(e)
|
|
105 | 151 |
context.set_details(str(e))
|
106 | 152 |
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
107 | 153 |
yield operations_pb2.Operation()
|
108 | 154 |
|
155 |
+ # --- Public API: Monitoring ---
|
|
156 |
+ |
|
157 |
+ @property
|
|
158 |
+ def is_monitored(self):
|
|
159 |
+ return self._is_monitored
|
|
160 |
+ |
|
161 |
+ def query_n_clients(self):
|
|
162 |
+ return len(self.__peers)
|
|
163 |
+ |
|
164 |
+ def query_n_clients_for_instance(self, instance_name):
|
|
165 |
+ try:
|
|
166 |
+ return len(self.__peers_by_instance[instance_name])
|
|
167 |
+ except KeyError:
|
|
168 |
+ return 0
|
|
169 |
+ |
|
170 |
+ # --- Private API ---
|
|
171 |
+ |
|
172 |
+ def _rpc_termination_callback(self, peer, instance_name, job_name, message_queue):
|
|
173 |
+ instance = self._get_instance(instance_name)
|
|
174 |
+ |
|
175 |
+ instance.unregister_message_client(job_name, message_queue)
|
|
176 |
+ |
|
177 |
+ if self._is_monitored:
|
|
178 |
+ if self.__peers[peer] > 1:
|
|
179 |
+ self.__peers[peer] -= 1
|
|
180 |
+ else:
|
|
181 |
+ del self.__peers[peer]
|
|
182 |
+ |
|
109 | 183 |
def _get_instance(self, name):
|
110 | 184 |
try:
|
111 | 185 |
return self._instances[name]
|
... | ... | @@ -13,18 +13,21 @@ |
13 | 13 |
# limitations under the License.
|
14 | 14 |
|
15 | 15 |
|
16 |
+import asyncio
|
|
17 |
+from concurrent import futures
|
|
16 | 18 |
import logging
|
17 | 19 |
import os
|
18 |
-from concurrent import futures
|
|
20 |
+import time
|
|
19 | 21 |
|
20 | 22 |
import grpc
|
21 | 23 |
|
22 |
-from .cas.service import ByteStreamService, ContentAddressableStorageService
|
|
23 |
-from .actioncache.service import ActionCacheService
|
|
24 |
-from .execution.service import ExecutionService
|
|
25 |
-from .operations.service import OperationsService
|
|
26 |
-from .bots.service import BotsService
|
|
27 |
-from .referencestorage.service import ReferenceStorageService
|
|
24 |
+from buildgrid.server.cas.service import ByteStreamService, ContentAddressableStorageService
|
|
25 |
+from buildgrid.server.actioncache.service import ActionCacheService
|
|
26 |
+from buildgrid.server.execution.service import ExecutionService
|
|
27 |
+from buildgrid.server.operations.service import OperationsService
|
|
28 |
+from buildgrid.server.bots.service import BotsService
|
|
29 |
+from buildgrid.server.referencestorage.service import ReferenceStorageService
|
|
30 |
+from buildgrid.settings import MONITORING_PERIOD
|
|
28 | 31 |
|
29 | 32 |
|
30 | 33 |
class BuildGridServer:
|
... | ... | @@ -40,16 +43,17 @@ class BuildGridServer: |
40 | 43 |
Args:
|
41 | 44 |
max_workers (int, optional): A pool of max worker threads.
|
42 | 45 |
"""
|
43 |
- |
|
44 |
- self.logger = logging.getLogger(__name__)
|
|
46 |
+ self.__logger = logging.getLogger(__name__)
|
|
45 | 47 |
|
46 | 48 |
if max_workers is None:
|
47 | 49 |
# Use max_workers default from Python 3.5+
|
48 | 50 |
max_workers = (os.cpu_count() or 1) * 5
|
49 | 51 |
|
50 |
- server = grpc.server(futures.ThreadPoolExecutor(max_workers))
|
|
52 |
+ self.__grpc_executor = futures.ThreadPoolExecutor(max_workers)
|
|
53 |
+ self.__grpc_server = grpc.server(self.__grpc_executor)
|
|
51 | 54 |
|
52 |
- self._server = server
|
|
55 |
+ self.__main_loop = asyncio.get_event_loop()
|
|
56 |
+ self.__monitoring_task = None
|
|
53 | 57 |
|
54 | 58 |
self._execution_service = None
|
55 | 59 |
self._bots_service = None
|
... | ... | @@ -59,15 +63,33 @@ class BuildGridServer: |
59 | 63 |
self._cas_service = None
|
60 | 64 |
self._bytestream_service = None
|
61 | 65 |
|
66 |
+ self.__execution_instances = []
|
|
67 |
+ self.__bots_instances = []
|
|
68 |
+ |
|
69 |
+ # --- Public API ---
|
|
70 |
+ |
|
62 | 71 |
def start(self):
|
63 |
- """Starts the server.
|
|
72 |
+ """Starts the BuildGrid server.
|
|
64 | 73 |
"""
|
65 |
- self._server.start()
|
|
74 |
+ self.__grpc_server.start()
|
|
75 |
+ |
|
76 |
+ self.__monitoring_task = asyncio.ensure_future(
|
|
77 |
+ self._monitoring_worker(period=MONITORING_PERIOD), loop=self.__main_loop)
|
|
78 |
+ self.__main_loop.run_forever()
|
|
66 | 79 |
|
67 | 80 |
def stop(self, grace=0):
|
68 |
- """Stops the server.
|
|
81 |
+ """Stops the BuildGrid server.
|
|
82 |
+ |
|
83 |
+ Args:
|
|
84 |
+ grace (int, optional): A duration of time in seconds. Defaults to 0.
|
|
69 | 85 |
"""
|
70 |
- self._server.stop(grace)
|
|
86 |
+ if self.__monitoring_task is not None:
|
|
87 |
+ self.__monitoring_task.cancel()
|
|
88 |
+ |
|
89 |
+ self.__grpc_server.stop(grace)
|
|
90 |
+ |
|
91 |
+ if grace > 0:
|
|
92 |
+ time.sleep(grace)
|
|
71 | 93 |
|
72 | 94 |
def add_port(self, address, credentials):
|
73 | 95 |
"""Adds a port to the server.
|
... | ... | @@ -80,12 +102,12 @@ class BuildGridServer: |
80 | 102 |
credentials (:obj:`grpc.ChannelCredentials`): Credentials object.
|
81 | 103 |
"""
|
82 | 104 |
if credentials is not None:
|
83 |
- self.logger.info("Adding secure connection on: [{}]".format(address))
|
|
84 |
- self._server.add_secure_port(address, credentials)
|
|
105 |
+ self.__logger.info("Adding secure connection on: [%s]", address)
|
|
106 |
+ self.__grpc_server.add_secure_port(address, credentials)
|
|
85 | 107 |
|
86 | 108 |
else:
|
87 |
- self.logger.info("Adding insecure connection on [{}]".format(address))
|
|
88 |
- self._server.add_insecure_port(address)
|
|
109 |
+ self.__logger.info("Adding insecure connection on [%s]", address)
|
|
110 |
+ self.__grpc_server.add_insecure_port(address)
|
|
89 | 111 |
|
90 | 112 |
def add_execution_instance(self, instance, instance_name):
|
91 | 113 |
"""Adds an :obj:`ExecutionInstance` to the service.
|
... | ... | @@ -97,10 +119,11 @@ class BuildGridServer: |
97 | 119 |
instance_name (str): Instance name.
|
98 | 120 |
"""
|
99 | 121 |
if self._execution_service is None:
|
100 |
- self._execution_service = ExecutionService(self._server)
|
|
101 |
- |
|
122 |
+ self._execution_service = ExecutionService(self.__grpc_server)
|
|
102 | 123 |
self._execution_service.add_instance(instance_name, instance)
|
103 | 124 |
|
125 |
+ self.__execution_instances.append(instance_name)
|
|
126 |
+ |
|
104 | 127 |
def add_bots_interface(self, instance, instance_name):
|
105 | 128 |
"""Adds a :obj:`BotsInterface` to the service.
|
106 | 129 |
|
... | ... | @@ -111,10 +134,11 @@ class BuildGridServer: |
111 | 134 |
instance_name (str): Instance name.
|
112 | 135 |
"""
|
113 | 136 |
if self._bots_service is None:
|
114 |
- self._bots_service = BotsService(self._server)
|
|
115 |
- |
|
137 |
+ self._bots_service = BotsService(self.__grpc_server)
|
|
116 | 138 |
self._bots_service.add_instance(instance_name, instance)
|
117 | 139 |
|
140 |
+ self.__bots_instances.append(instance_name)
|
|
141 |
+ |
|
118 | 142 |
def add_operations_instance(self, instance, instance_name):
|
119 | 143 |
"""Adds an :obj:`OperationsInstance` to the service.
|
120 | 144 |
|
... | ... | @@ -125,8 +149,7 @@ class BuildGridServer: |
125 | 149 |
instance_name (str): Instance name.
|
126 | 150 |
"""
|
127 | 151 |
if self._operations_service is None:
|
128 |
- self._operations_service = OperationsService(self._server)
|
|
129 |
- |
|
152 |
+ self._operations_service = OperationsService(self.__grpc_server)
|
|
130 | 153 |
self._operations_service.add_instance(instance_name, instance)
|
131 | 154 |
|
132 | 155 |
def add_reference_storage_instance(self, instance, instance_name):
|
... | ... | @@ -139,8 +162,7 @@ class BuildGridServer: |
139 | 162 |
instance_name (str): Instance name.
|
140 | 163 |
"""
|
141 | 164 |
if self._reference_storage_service is None:
|
142 |
- self._reference_storage_service = ReferenceStorageService(self._server)
|
|
143 |
- |
|
165 |
+ self._reference_storage_service = ReferenceStorageService(self.__grpc_server)
|
|
144 | 166 |
self._reference_storage_service.add_instance(instance_name, instance)
|
145 | 167 |
|
146 | 168 |
def add_action_cache_instance(self, instance, instance_name):
|
... | ... | @@ -153,8 +175,7 @@ class BuildGridServer: |
153 | 175 |
instance_name (str): Instance name.
|
154 | 176 |
"""
|
155 | 177 |
if self._action_cache_service is None:
|
156 |
- self._action_cache_service = ActionCacheService(self._server)
|
|
157 |
- |
|
178 |
+ self._action_cache_service = ActionCacheService(self.__grpc_server)
|
|
158 | 179 |
self._action_cache_service.add_instance(instance_name, instance)
|
159 | 180 |
|
160 | 181 |
def add_cas_instance(self, instance, instance_name):
|
... | ... | @@ -167,8 +188,7 @@ class BuildGridServer: |
167 | 188 |
instance_name (str): Instance name.
|
168 | 189 |
"""
|
169 | 190 |
if self._cas_service is None:
|
170 |
- self._cas_service = ContentAddressableStorageService(self._server)
|
|
171 |
- |
|
191 |
+ self._cas_service = ContentAddressableStorageService(self.__grpc_server)
|
|
172 | 192 |
self._cas_service.add_instance(instance_name, instance)
|
173 | 193 |
|
174 | 194 |
def add_bytestream_instance(self, instance, instance_name):
|
... | ... | @@ -181,6 +201,23 @@ class BuildGridServer: |
181 | 201 |
instance_name (str): Instance name.
|
182 | 202 |
"""
|
183 | 203 |
if self._bytestream_service is None:
|
184 |
- self._bytestream_service = ByteStreamService(self._server)
|
|
185 |
- |
|
204 |
+ self._bytestream_service = ByteStreamService(self.__grpc_server)
|
|
186 | 205 |
self._bytestream_service.add_instance(instance_name, instance)
|
206 |
+ |
|
207 |
+ # --- Private API ---
|
|
208 |
+ |
|
209 |
+ async def _monitoring_worker(self, period=1):
|
|
210 |
+ while True:
|
|
211 |
+ try:
|
|
212 |
+ n_clients = self._execution_service.query_n_clients()
|
|
213 |
+ n_bots = self._bots_service.query_n_bots()
|
|
214 |
+ |
|
215 |
+ print('---')
|
|
216 |
+ print('n_clients={}'.format(n_clients))
|
|
217 |
+ print('n_bots={}'.format(n_bots))
|
|
218 |
+ |
|
219 |
+ await asyncio.sleep(period)
|
|
220 |
+ except asyncio.CancelledError:
|
|
221 |
+ break
|
|
222 |
+ |
|
223 |
+ self.__main_loop.stop()
|
... | ... | @@ -27,7 +27,7 @@ from buildgrid._protos.google.longrunning import operations_pb2 |
27 | 27 |
class Job:
|
28 | 28 |
|
29 | 29 |
def __init__(self, action, action_digest):
|
30 |
- self.logger = logging.getLogger(__name__)
|
|
30 |
+ self.__logger = logging.getLogger(__name__)
|
|
31 | 31 |
|
32 | 32 |
self._name = str(uuid.uuid4())
|
33 | 33 |
self._action = remote_execution_pb2.Action()
|
... | ... | @@ -28,7 +28,8 @@ from buildgrid._protos.google.longrunning import operations_pb2 |
28 | 28 |
class OperationsInstance:
|
29 | 29 |
|
30 | 30 |
def __init__(self, scheduler):
|
31 |
- self.logger = logging.getLogger(__name__)
|
|
31 |
+ self.__logger = logging.getLogger(__name__)
|
|
32 |
+ |
|
32 | 33 |
self._scheduler = scheduler
|
33 | 34 |
|
34 | 35 |
def register_instance_with_server(self, instance_name, server):
|
... | ... | @@ -32,7 +32,7 @@ from buildgrid._protos.google.longrunning import operations_pb2_grpc, operations |
32 | 32 |
class OperationsService(operations_pb2_grpc.OperationsServicer):
|
33 | 33 |
|
34 | 34 |
def __init__(self, server):
|
35 |
- self.logger = logging.getLogger(__name__)
|
|
35 |
+ self.__logger = logging.getLogger(__name__)
|
|
36 | 36 |
|
37 | 37 |
self._instances = {}
|
38 | 38 |
|
... | ... | @@ -42,6 +42,8 @@ class OperationsService(operations_pb2_grpc.OperationsServicer): |
42 | 42 |
self._instances[name] = instance
|
43 | 43 |
|
44 | 44 |
def GetOperation(self, request, context):
|
45 |
+ self.__logger.debug("GetOperation request from [%s]", context.peer())
|
|
46 |
+ |
|
45 | 47 |
try:
|
46 | 48 |
name = request.name
|
47 | 49 |
|
... | ... | @@ -56,13 +58,15 @@ class OperationsService(operations_pb2_grpc.OperationsServicer): |
56 | 58 |
return op
|
57 | 59 |
|
58 | 60 |
except InvalidArgumentError as e:
|
59 |
- self.logger.error(e)
|
|
61 |
+ self.__logger.error(e)
|
|
60 | 62 |
context.set_details(str(e))
|
61 | 63 |
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
62 | 64 |
|
63 | 65 |
return operations_pb2.Operation()
|
64 | 66 |
|
65 | 67 |
def ListOperations(self, request, context):
|
68 |
+ self.__logger.debug("ListOperations request from [%s]", context.peer())
|
|
69 |
+ |
|
66 | 70 |
try:
|
67 | 71 |
# The request name should be the collection name
|
68 | 72 |
# In our case, this is just the instance_name
|
... | ... | @@ -79,13 +83,15 @@ class OperationsService(operations_pb2_grpc.OperationsServicer): |
79 | 83 |
return result
|
80 | 84 |
|
81 | 85 |
except InvalidArgumentError as e:
|
82 |
- self.logger.error(e)
|
|
86 |
+ self.__logger.error(e)
|
|
83 | 87 |
context.set_details(str(e))
|
84 | 88 |
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
85 | 89 |
|
86 | 90 |
return operations_pb2.ListOperationsResponse()
|
87 | 91 |
|
88 | 92 |
def DeleteOperation(self, request, context):
|
93 |
+ self.__logger.debug("DeleteOperation request from [%s]", context.peer())
|
|
94 |
+ |
|
89 | 95 |
try:
|
90 | 96 |
name = request.name
|
91 | 97 |
|
... | ... | @@ -96,13 +102,15 @@ class OperationsService(operations_pb2_grpc.OperationsServicer): |
96 | 102 |
instance.delete_operation(operation_name)
|
97 | 103 |
|
98 | 104 |
except InvalidArgumentError as e:
|
99 |
- self.logger.error(e)
|
|
105 |
+ self.__logger.error(e)
|
|
100 | 106 |
context.set_details(str(e))
|
101 | 107 |
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
102 | 108 |
|
103 | 109 |
return Empty()
|
104 | 110 |
|
105 | 111 |
def CancelOperation(self, request, context):
|
112 |
+ self.__logger.debug("CancelOperation request from [%s]", context.peer())
|
|
113 |
+ |
|
106 | 114 |
try:
|
107 | 115 |
name = request.name
|
108 | 116 |
|
... | ... | @@ -113,12 +121,12 @@ class OperationsService(operations_pb2_grpc.OperationsServicer): |
113 | 121 |
instance.cancel_operation(operation_name)
|
114 | 122 |
|
115 | 123 |
except NotImplementedError as e:
|
116 |
- self.logger.error(e)
|
|
124 |
+ self.__logger.error(e)
|
|
117 | 125 |
context.set_details(str(e))
|
118 | 126 |
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
|
119 | 127 |
|
120 | 128 |
except InvalidArgumentError as e:
|
121 |
- self.logger.error(e)
|
|
129 |
+ self.__logger.error(e)
|
|
122 | 130 |
context.set_details(str(e))
|
123 | 131 |
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
124 | 132 |
|
... | ... | @@ -25,7 +25,7 @@ from buildgrid._protos.buildstream.v2 import buildstream_pb2_grpc |
25 | 25 |
class ReferenceStorageService(buildstream_pb2_grpc.ReferenceStorageServicer):
|
26 | 26 |
|
27 | 27 |
def __init__(self, server):
|
28 |
- self.logger = logging.getLogger(__name__)
|
|
28 |
+ self.__logger = logging.getLogger(__name__)
|
|
29 | 29 |
|
30 | 30 |
self._instances = {}
|
31 | 31 |
|
... | ... | @@ -35,6 +35,8 @@ class ReferenceStorageService(buildstream_pb2_grpc.ReferenceStorageServicer): |
35 | 35 |
self._instances[name] = instance
|
36 | 36 |
|
37 | 37 |
def GetReference(self, request, context):
|
38 |
+ self.__logger.debug("GetReference request from [%s]", context.peer())
|
|
39 |
+ |
|
38 | 40 |
try:
|
39 | 41 |
instance = self._get_instance(request.instance_name)
|
40 | 42 |
digest = instance.get_digest_reference(request.key)
|
... | ... | @@ -43,17 +45,19 @@ class ReferenceStorageService(buildstream_pb2_grpc.ReferenceStorageServicer): |
43 | 45 |
return response
|
44 | 46 |
|
45 | 47 |
except InvalidArgumentError as e:
|
46 |
- self.logger.error(e)
|
|
48 |
+ self.__logger.error(e)
|
|
47 | 49 |
context.set_details(str(e))
|
48 | 50 |
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
49 | 51 |
|
50 | 52 |
except NotFoundError as e:
|
51 |
- self.logger.debug(e)
|
|
53 |
+ self.__logger.debug(e)
|
|
52 | 54 |
context.set_code(grpc.StatusCode.NOT_FOUND)
|
53 | 55 |
|
54 | 56 |
return buildstream_pb2.GetReferenceResponse()
|
55 | 57 |
|
56 | 58 |
def UpdateReference(self, request, context):
|
59 |
+ self.__logger.debug("UpdateReference request from [%s]", context.peer())
|
|
60 |
+ |
|
57 | 61 |
try:
|
58 | 62 |
instance = self._get_instance(request.instance_name)
|
59 | 63 |
digest = request.digest
|
... | ... | @@ -62,7 +66,7 @@ class ReferenceStorageService(buildstream_pb2_grpc.ReferenceStorageServicer): |
62 | 66 |
instance.update_reference(key, digest)
|
63 | 67 |
|
64 | 68 |
except InvalidArgumentError as e:
|
65 |
- self.logger.error(e)
|
|
69 |
+ self.__logger.error(e)
|
|
66 | 70 |
context.set_details(str(e))
|
67 | 71 |
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
68 | 72 |
|
... | ... | @@ -72,13 +76,15 @@ class ReferenceStorageService(buildstream_pb2_grpc.ReferenceStorageServicer): |
72 | 76 |
return buildstream_pb2.UpdateReferenceResponse()
|
73 | 77 |
|
74 | 78 |
def Status(self, request, context):
|
79 |
+ self.__logger.debug("Status request from [%s]", context.peer())
|
|
80 |
+ |
|
75 | 81 |
try:
|
76 | 82 |
instance = self._get_instance(request.instance_name)
|
77 | 83 |
allow_updates = instance.allow_updates
|
78 | 84 |
return buildstream_pb2.StatusResponse(allow_updates=allow_updates)
|
79 | 85 |
|
80 | 86 |
except InvalidArgumentError as e:
|
81 |
- self.logger.error(e)
|
|
87 |
+ self.__logger.error(e)
|
|
82 | 88 |
context.set_details(str(e))
|
83 | 89 |
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
84 | 90 |
|
... | ... | @@ -23,6 +23,7 @@ For a given key, it |
23 | 23 |
"""
|
24 | 24 |
|
25 | 25 |
import collections
|
26 |
+import logging
|
|
26 | 27 |
|
27 | 28 |
from buildgrid._exceptions import NotFoundError
|
28 | 29 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
... | ... | @@ -38,6 +39,8 @@ class ReferenceCache: |
38 | 39 |
max_cached_refs (int): maximum number of entries to be stored.
|
39 | 40 |
allow_updates (bool): allow the client to write to storage
|
40 | 41 |
"""
|
42 |
+ self.__logger = logging.getLogger(__name__)
|
|
43 |
+ |
|
41 | 44 |
self._allow_updates = allow_updates
|
42 | 45 |
self._storage = storage
|
43 | 46 |
self._max_cached_refs = max_cached_refs
|
... | ... | @@ -20,21 +20,33 @@ Schedules jobs. |
20 | 20 |
"""
|
21 | 21 |
|
22 | 22 |
from collections import deque
|
23 |
+import logging
|
|
23 | 24 |
|
24 |
-from buildgrid._exceptions import NotFoundError
|
|
25 |
+from google.protobuf import duration_pb2
|
|
25 | 26 |
|
26 |
-from .job import OperationStage, LeaseState
|
|
27 |
+from buildgrid._enums import LeaseState, OperationStage
|
|
28 |
+from buildgrid._exceptions import NotFoundError
|
|
27 | 29 |
|
28 | 30 |
|
29 | 31 |
class Scheduler:
|
30 | 32 |
|
31 | 33 |
MAX_N_TRIES = 5
|
32 | 34 |
|
33 |
- def __init__(self, action_cache=None):
|
|
35 |
+ def __init__(self, action_cache=None, monitor=True):
|
|
36 |
+ self.__logger = logging.getLogger(__name__)
|
|
37 |
+ |
|
38 |
+ self.__queue_times_by_priority = {}
|
|
39 |
+ self.__queue_time = duration_pb2.Duration()
|
|
40 |
+ self.__retries_by_error = {}
|
|
41 |
+ self.__retries_count = 0
|
|
42 |
+ |
|
34 | 43 |
self._action_cache = action_cache
|
44 |
+ self._is_monitored = True
|
|
35 | 45 |
self.jobs = {}
|
36 | 46 |
self.queue = deque()
|
37 | 47 |
|
48 |
+ # --- Public API ---
|
|
49 |
+ |
|
38 | 50 |
def register_client(self, job_name, queue):
|
39 | 51 |
self.jobs[job_name].register_client(queue)
|
40 | 52 |
|
... | ... | @@ -133,3 +145,42 @@ class Scheduler: |
133 | 145 |
def get_job_operation(self, job_name):
|
134 | 146 |
"""Returns the operation associated to job."""
|
135 | 147 |
return self.jobs[job_name].operation
|
148 |
+ |
|
149 |
+ # --- Public API: Monitoring ---
|
|
150 |
+ |
|
151 |
+ @property
|
|
152 |
+ def is_monitored(self):
|
|
153 |
+ return self._is_monitored
|
|
154 |
+ |
|
155 |
+ def query_n_jobs(self):
|
|
156 |
+ return len(self.jobs)
|
|
157 |
+ |
|
158 |
+ def query_n_operations(self):
|
|
159 |
+ return len(self.jobs)
|
|
160 |
+ |
|
161 |
+ def query_n_operations_by_stage(self):
|
|
162 |
+ return len(self.jobs)
|
|
163 |
+ |
|
164 |
+ def query_n_leases(self):
|
|
165 |
+ return len(self.jobs)
|
|
166 |
+ |
|
167 |
+ def query_n_leases_by_state(self):
|
|
168 |
+ return len(self.jobs)
|
|
169 |
+ |
|
170 |
+ def query_n_retries(self):
|
|
171 |
+ return self.__retries_count
|
|
172 |
+ |
|
173 |
+ def query_n_retries_for_error(self, error_type):
|
|
174 |
+ try:
|
|
175 |
+ return self.__retries_by_error[error_type]
|
|
176 |
+ except KeyError:
|
|
177 |
+ return 0
|
|
178 |
+ |
|
179 |
+ def query_am_queue_time(self):
|
|
180 |
+ return self.__average_queue_time
|
|
181 |
+ |
|
182 |
+ def query_am_queue_time_for_priority(self, priority_level):
|
|
183 |
+ try:
|
|
184 |
+ return self.__queue_times_by_priority[priority_level]
|
|
185 |
+ except KeyError:
|
|
186 |
+ return 0
|
1 |
+# Copyright (C) 2018 Bloomberg LP
|
|
2 |
+#
|
|
3 |
+# Licensed under the Apache License, Version 2.0 (the "License");
|
|
4 |
+# you may not use this file except in compliance with the License.
|
|
5 |
+# You may obtain a copy of the License at
|
|
6 |
+#
|
|
7 |
+# <http://www.apache.org/licenses/LICENSE-2.0>
|
|
8 |
+#
|
|
9 |
+# Unless required by applicable law or agreed to in writing, software
|
|
10 |
+# distributed under the License is distributed on an "AS IS" BASIS,
|
|
11 |
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
12 |
+# See the License for the specific language governing permissions and
|
|
13 |
+# limitations under the License.
|
|
14 |
+ |
|
15 |
+ |
|
1 | 16 |
import hashlib
|
2 | 17 |
|
3 | 18 |
|
4 |
-# The hash function that CAS uses
|
|
19 |
+# Hash function used for computing digests:
|
|
5 | 20 |
HASH = hashlib.sha256
|
21 |
+ |
|
22 |
+# Length in bytes of a hash string returned by HASH:
|
|
6 | 23 |
HASH_LENGTH = HASH().digest_size * 2
|
24 |
+ |
|
25 |
+# Period, in seconds, for the monitoring cycle:
|
|
26 |
+MONITORING_PERIOD = 5.0
|
... | ... | @@ -112,13 +112,15 @@ setup( |
112 | 112 |
license="Apache License, Version 2.0",
|
113 | 113 |
description="A remote execution service",
|
114 | 114 |
packages=find_packages(),
|
115 |
+ python_requires='>= 3.5.3', # janus requirement
|
|
115 | 116 |
install_requires=[
|
116 |
- 'protobuf',
|
|
117 |
- 'grpcio',
|
|
118 |
- 'Click',
|
|
119 |
- 'PyYAML',
|
|
120 | 117 |
'boto3 < 1.8.0',
|
121 | 118 |
'botocore < 1.11.0',
|
119 |
+ 'click',
|
|
120 |
+ 'grpcio',
|
|
121 |
+ 'janus',
|
|
122 |
+ 'protobuf',
|
|
123 |
+ 'pyyaml',
|
|
122 | 124 |
],
|
123 | 125 |
entry_points={
|
124 | 126 |
'console_scripts': [
|
... | ... | @@ -89,7 +89,7 @@ def test_bytestream_read(mocked, data_to_read, instance): |
89 | 89 |
request.resource_name += "blobs/{}/{}".format(HASH(data_to_read).hexdigest(), len(data_to_read))
|
90 | 90 |
|
91 | 91 |
data = b""
|
92 |
- for response in servicer.Read(request, None):
|
|
92 |
+ for response in servicer.Read(request, context):
|
|
93 | 93 |
data += response.data
|
94 | 94 |
assert data == data_to_read
|
95 | 95 |
|
... | ... | @@ -111,7 +111,7 @@ def test_bytestream_read_many(mocked, instance): |
111 | 111 |
request.resource_name += "blobs/{}/{}".format(HASH(data_to_read).hexdigest(), len(data_to_read))
|
112 | 112 |
|
113 | 113 |
data = b""
|
114 |
- for response in servicer.Read(request, None):
|
|
114 |
+ for response in servicer.Read(request, context):
|
|
115 | 115 |
data += response.data
|
116 | 116 |
assert data == data_to_read
|
117 | 117 |
|
... | ... | @@ -137,7 +137,7 @@ def test_bytestream_write(mocked, instance, extra_data): |
137 | 137 |
bytestream_pb2.WriteRequest(data=b'def', write_offset=3, finish_write=True)
|
138 | 138 |
]
|
139 | 139 |
|
140 |
- response = servicer.Write(requests, None)
|
|
140 |
+ response = servicer.Write(requests, context)
|
|
141 | 141 |
assert response.committed_size == 6
|
142 | 142 |
assert len(storage.data) == 1
|
143 | 143 |
assert (hash_, 6) in storage.data
|
... | ... | @@ -178,7 +178,7 @@ def test_cas_find_missing_blobs(mocked, instance): |
178 | 178 |
re_pb2.Digest(hash=HASH(b'ghij').hexdigest(), size_bytes=4)
|
179 | 179 |
]
|
180 | 180 |
request = re_pb2.FindMissingBlobsRequest(instance_name=instance, blob_digests=digests)
|
181 |
- response = servicer.FindMissingBlobs(request, None)
|
|
181 |
+ response = servicer.FindMissingBlobs(request, context)
|
|
182 | 182 |
assert len(response.missing_blob_digests) == 1
|
183 | 183 |
assert response.missing_blob_digests[0] == digests[1]
|
184 | 184 |
|
... | ... | @@ -201,7 +201,7 @@ def test_cas_batch_update_blobs(mocked, instance): |
201 | 201 |
]
|
202 | 202 |
|
203 | 203 |
request = re_pb2.BatchUpdateBlobsRequest(instance_name=instance, requests=update_requests)
|
204 |
- response = servicer.BatchUpdateBlobs(request, None)
|
|
204 |
+ response = servicer.BatchUpdateBlobs(request, context)
|
|
205 | 205 |
assert len(response.responses) == 2
|
206 | 206 |
|
207 | 207 |
for blob_response in response.responses:
|