| ... | 
... | 
@@ -13,18 +13,21 @@ | 
| 
13
 | 
13
 | 
 # limitations under the License.
 
 | 
| 
14
 | 
14
 | 
 
 
 | 
| 
15
 | 
15
 | 
 
 
 | 
| 
 
 | 
16
 | 
+import asyncio
 
 | 
| 
16
 | 
17
 | 
 from concurrent import futures
 
 | 
| 
17
 | 
18
 | 
 import logging
 
 | 
| 
18
 | 
19
 | 
 import os
 
 | 
| 
 
 | 
20
 | 
+import time
 
 | 
| 
19
 | 
21
 | 
 
 
 | 
| 
20
 | 
22
 | 
 import grpc
 
 | 
| 
21
 | 
23
 | 
 
 
 | 
| 
22
 | 
 
 | 
-from .cas.service import ByteStreamService, ContentAddressableStorageService
 
 | 
| 
23
 | 
 
 | 
-from .actioncache.service import ActionCacheService
 
 | 
| 
24
 | 
 
 | 
-from .execution.service import ExecutionService
 
 | 
| 
25
 | 
 
 | 
-from .operations.service import OperationsService
 
 | 
| 
26
 | 
 
 | 
-from .bots.service import BotsService
 
 | 
| 
27
 | 
 
 | 
-from .referencestorage.service import ReferenceStorageService
 
 | 
| 
 
 | 
24
 | 
+from buildgrid.server.cas.service import ByteStreamService, ContentAddressableStorageService
 
 | 
| 
 
 | 
25
 | 
+from buildgrid.server.actioncache.service import ActionCacheService
 
 | 
| 
 
 | 
26
 | 
+from buildgrid.server.execution.service import ExecutionService
 
 | 
| 
 
 | 
27
 | 
+from buildgrid.server.operations.service import OperationsService
 
 | 
| 
 
 | 
28
 | 
+from buildgrid.server.bots.service import BotsService
 
 | 
| 
 
 | 
29
 | 
+from buildgrid.server.referencestorage.service import ReferenceStorageService
 
 | 
| 
 
 | 
30
 | 
+from buildgrid.settings import MONITORING_PERIOD
 
 | 
| 
28
 | 
31
 | 
 
 
 | 
| 
29
 | 
32
 | 
 
 
 | 
| 
30
 | 
33
 | 
 class BuildGridServer:
 
 | 
| ... | 
... | 
@@ -46,9 +49,11 @@ class BuildGridServer: | 
| 
46
 | 
49
 | 
             # Use max_workers default from Python 3.5+
 
 | 
| 
47
 | 
50
 | 
             max_workers = (os.cpu_count() or 1) * 5
 
 | 
| 
48
 | 
51
 | 
 
 
 | 
| 
49
 | 
 
 | 
-        server = grpc.server(futures.ThreadPoolExecutor(max_workers))
 
 | 
| 
 
 | 
52
 | 
+        self.__grpc_executor = futures.ThreadPoolExecutor(max_workers)
 
 | 
| 
 
 | 
53
 | 
+        self.__grpc_server = grpc.server(self.__grpc_executor)
 
 | 
| 
50
 | 
54
 | 
 
 
 | 
| 
51
 | 
 
 | 
-        self._server = server
 
 | 
| 
 
 | 
55
 | 
+        self.__main_loop = asyncio.get_event_loop()
 
 | 
| 
 
 | 
56
 | 
+        self.__monitoring_task = None
 
 | 
| 
52
 | 
57
 | 
 
 
 | 
| 
53
 | 
58
 | 
         self._execution_service = None
 
 | 
| 
54
 | 
59
 | 
         self._bots_service = None
 
 | 
| ... | 
... | 
@@ -58,15 +63,32 @@ class BuildGridServer: | 
| 
58
 | 
63
 | 
         self._cas_service = None
 
 | 
| 
59
 | 
64
 | 
         self._bytestream_service = None
 
 | 
| 
60
 | 
65
 | 
 
 
 | 
| 
 
 | 
66
 | 
+        self._instances = set()
 
 | 
| 
 
 | 
67
 | 
+
 
 | 
| 
 
 | 
68
 | 
+    # --- Public API ---
 
 | 
| 
 
 | 
69
 | 
+
 
 | 
| 
61
 | 
70
 | 
     def start(self):
 
 | 
| 
62
 | 
 
 | 
-        """Starts the server.
 
 | 
| 
 
 | 
71
 | 
+        """Starts the BuildGrid server.
 
 | 
| 
63
 | 
72
 | 
         """
 
 | 
| 
64
 | 
 
 | 
-        self._server.start()
 
 | 
| 
 
 | 
73
 | 
+        self.__grpc_server.start()
 
 | 
| 
 
 | 
74
 | 
+
 
 | 
| 
 
 | 
75
 | 
+        self.__monitoring_task = asyncio.ensure_future(
 
 | 
| 
 
 | 
76
 | 
+            self._monitoring_worker(period=MONITORING_PERIOD), loop=self.__main_loop)
 
 | 
| 
 
 | 
77
 | 
+        self.__main_loop.run_forever()
 
 | 
| 
65
 | 
78
 | 
 
 
 | 
| 
66
 | 
79
 | 
     def stop(self, grace=0):
 
 | 
| 
67
 | 
 
 | 
-        """Stops the server.
 
 | 
| 
 
 | 
80
 | 
+        """Stops the BuildGrid server.
 
 | 
| 
 
 | 
81
 | 
+
 
 | 
| 
 
 | 
82
 | 
+        Args:
 
 | 
| 
 
 | 
83
 | 
+            grace (int, optional): A duration of time in seconds. Defaults to 0.
 
 | 
| 
68
 | 
84
 | 
         """
 
 | 
| 
69
 | 
 
 | 
-        self._server.stop(grace)
 
 | 
| 
 
 | 
85
 | 
+        if self.__monitoring_task is not None:
 
 | 
| 
 
 | 
86
 | 
+            self.__monitoring_task.cancel()
 
 | 
| 
 
 | 
87
 | 
+
 
 | 
| 
 
 | 
88
 | 
+        self.__grpc_server.stop(grace)
 
 | 
| 
 
 | 
89
 | 
+
 
 | 
| 
 
 | 
90
 | 
+        if grace > 0:
 
 | 
| 
 
 | 
91
 | 
+            time.sleep(grace)
 
 | 
| 
70
 | 
92
 | 
 
 
 | 
| 
71
 | 
93
 | 
     def add_port(self, address, credentials):
 
 | 
| 
72
 | 
94
 | 
         """Adds a port to the server.
 
 | 
| ... | 
... | 
@@ -80,11 +102,11 @@ class BuildGridServer: | 
| 
80
 | 
102
 | 
         """
 
 | 
| 
81
 | 
103
 | 
         if credentials is not None:
 
 | 
| 
82
 | 
104
 | 
             self.__logger.info("Adding secure connection on: [%s]", address)
 | 
| 
83
 | 
 
 | 
-            self._server.add_secure_port(address, credentials)
 
 | 
| 
 
 | 
105
 | 
+            self.__grpc_server.add_secure_port(address, credentials)
 
 | 
| 
84
 | 
106
 | 
 
 
 | 
| 
85
 | 
107
 | 
         else:
 
 | 
| 
86
 | 
108
 | 
             self.__logger.info("Adding insecure connection on [%s]", address)
 | 
| 
87
 | 
 
 | 
-            self._server.add_insecure_port(address)
 
 | 
| 
 
 | 
109
 | 
+            self.__grpc_server.add_insecure_port(address)
 
 | 
| 
88
 | 
110
 | 
 
 
 | 
| 
89
 | 
111
 | 
     def add_execution_instance(self, instance, instance_name):
 
 | 
| 
90
 | 
112
 | 
         """Adds an :obj:`ExecutionInstance` to the service.
 
 | 
| ... | 
... | 
@@ -96,10 +118,11 @@ class BuildGridServer: | 
| 
96
 | 
118
 | 
             instance_name (str): Instance name.
 
 | 
| 
97
 | 
119
 | 
         """
 
 | 
| 
98
 | 
120
 | 
         if self._execution_service is None:
 
 | 
| 
99
 | 
 
 | 
-            self._execution_service = ExecutionService(self._server)
 
 | 
| 
100
 | 
 
 | 
-
 
 | 
| 
 
 | 
121
 | 
+            self._execution_service = ExecutionService(self.__grpc_server)
 
 | 
| 
101
 | 
122
 | 
         self._execution_service.add_instance(instance_name, instance)
 
 | 
| 
102
 | 
123
 | 
 
 
 | 
| 
 
 | 
124
 | 
+        self._instances.add(instance_name)
 
 | 
| 
 
 | 
125
 | 
+
 
 | 
| 
103
 | 
126
 | 
     def add_bots_interface(self, instance, instance_name):
 
 | 
| 
104
 | 
127
 | 
         """Adds a :obj:`BotsInterface` to the service.
 
 | 
| 
105
 | 
128
 | 
 
 
 | 
| ... | 
... | 
@@ -110,10 +133,11 @@ class BuildGridServer: | 
| 
110
 | 
133
 | 
             instance_name (str): Instance name.
 
 | 
| 
111
 | 
134
 | 
         """
 
 | 
| 
112
 | 
135
 | 
         if self._bots_service is None:
 
 | 
| 
113
 | 
 
 | 
-            self._bots_service = BotsService(self._server)
 
 | 
| 
114
 | 
 
 | 
-
 
 | 
| 
 
 | 
136
 | 
+            self._bots_service = BotsService(self.__grpc_server)
 
 | 
| 
115
 | 
137
 | 
         self._bots_service.add_instance(instance_name, instance)
 
 | 
| 
116
 | 
138
 | 
 
 
 | 
| 
 
 | 
139
 | 
+        self._instances.add(instance_name)
 
 | 
| 
 
 | 
140
 | 
+
 
 | 
| 
117
 | 
141
 | 
     def add_operations_instance(self, instance, instance_name):
 
 | 
| 
118
 | 
142
 | 
         """Adds an :obj:`OperationsInstance` to the service.
 
 | 
| 
119
 | 
143
 | 
 
 
 | 
| ... | 
... | 
@@ -124,8 +148,7 @@ class BuildGridServer: | 
| 
124
 | 
148
 | 
             instance_name (str): Instance name.
 
 | 
| 
125
 | 
149
 | 
         """
 
 | 
| 
126
 | 
150
 | 
         if self._operations_service is None:
 
 | 
| 
127
 | 
 
 | 
-            self._operations_service = OperationsService(self._server)
 
 | 
| 
128
 | 
 
 | 
-
 
 | 
| 
 
 | 
151
 | 
+            self._operations_service = OperationsService(self.__grpc_server)
 
 | 
| 
129
 | 
152
 | 
         self._operations_service.add_instance(instance_name, instance)
 
 | 
| 
130
 | 
153
 | 
 
 
 | 
| 
131
 | 
154
 | 
     def add_reference_storage_instance(self, instance, instance_name):
 
 | 
| ... | 
... | 
@@ -138,8 +161,7 @@ class BuildGridServer: | 
| 
138
 | 
161
 | 
             instance_name (str): Instance name.
 
 | 
| 
139
 | 
162
 | 
         """
 
 | 
| 
140
 | 
163
 | 
         if self._reference_storage_service is None:
 
 | 
| 
141
 | 
 
 | 
-            self._reference_storage_service = ReferenceStorageService(self._server)
 
 | 
| 
142
 | 
 
 | 
-
 
 | 
| 
 
 | 
164
 | 
+            self._reference_storage_service = ReferenceStorageService(self.__grpc_server)
 
 | 
| 
143
 | 
165
 | 
         self._reference_storage_service.add_instance(instance_name, instance)
 
 | 
| 
144
 | 
166
 | 
 
 
 | 
| 
145
 | 
167
 | 
     def add_action_cache_instance(self, instance, instance_name):
 
 | 
| ... | 
... | 
@@ -152,8 +174,7 @@ class BuildGridServer: | 
| 
152
 | 
174
 | 
             instance_name (str): Instance name.
 
 | 
| 
153
 | 
175
 | 
         """
 
 | 
| 
154
 | 
176
 | 
         if self._action_cache_service is None:
 
 | 
| 
155
 | 
 
 | 
-            self._action_cache_service = ActionCacheService(self._server)
 
 | 
| 
156
 | 
 
 | 
-
 
 | 
| 
 
 | 
177
 | 
+            self._action_cache_service = ActionCacheService(self.__grpc_server)
 
 | 
| 
157
 | 
178
 | 
         self._action_cache_service.add_instance(instance_name, instance)
 
 | 
| 
158
 | 
179
 | 
 
 
 | 
| 
159
 | 
180
 | 
     def add_cas_instance(self, instance, instance_name):
 
 | 
| ... | 
... | 
@@ -166,8 +187,7 @@ class BuildGridServer: | 
| 
166
 | 
187
 | 
             instance_name (str): Instance name.
 
 | 
| 
167
 | 
188
 | 
         """
 
 | 
| 
168
 | 
189
 | 
         if self._cas_service is None:
 
 | 
| 
169
 | 
 
 | 
-            self._cas_service = ContentAddressableStorageService(self._server)
 
 | 
| 
170
 | 
 
 | 
-
 
 | 
| 
 
 | 
190
 | 
+            self._cas_service = ContentAddressableStorageService(self.__grpc_server)
 
 | 
| 
171
 | 
191
 | 
         self._cas_service.add_instance(instance_name, instance)
 
 | 
| 
172
 | 
192
 | 
 
 
 | 
| 
173
 | 
193
 | 
     def add_bytestream_instance(self, instance, instance_name):
 
 | 
| ... | 
... | 
@@ -180,6 +200,31 @@ class BuildGridServer: | 
| 
180
 | 
200
 | 
             instance_name (str): Instance name.
 
 | 
| 
181
 | 
201
 | 
         """
 
 | 
| 
182
 | 
202
 | 
         if self._bytestream_service is None:
 
 | 
| 
183
 | 
 
 | 
-            self._bytestream_service = ByteStreamService(self._server)
 
 | 
| 
184
 | 
 
 | 
-
 
 | 
| 
 
 | 
203
 | 
+            self._bytestream_service = ByteStreamService(self.__grpc_server)
 
 | 
| 
185
 | 
204
 | 
         self._bytestream_service.add_instance(instance_name, instance)
 
 | 
| 
 
 | 
205
 | 
+
 
 | 
| 
 
 | 
206
 | 
+    # --- Private API ---
 
 | 
| 
 
 | 
207
 | 
+
 
 | 
| 
 
 | 
208
 | 
+    async def _monitoring_worker(self, period=1):
 
 | 
| 
 
 | 
209
 | 
+        while True:
 
 | 
| 
 
 | 
210
 | 
+            try:
 
 | 
| 
 
 | 
211
 | 
+                n_clients = self._execution_service.query_n_clients()
 
 | 
| 
 
 | 
212
 | 
+                n_bots = self._bots_service.query_n_bots()
 
 | 
| 
 
 | 
213
 | 
+
 
 | 
| 
 
 | 
214
 | 
+                print('---')
 | 
| 
 
 | 
215
 | 
+                print('Totals: n_clients={}, n_bots={}'.format(n_clients, n_bots))
 | 
| 
 
 | 
216
 | 
+                print('Per instances:')
 | 
| 
 
 | 
217
 | 
+                for instance_name in self._instances:
 
 | 
| 
 
 | 
218
 | 
+                    n_clients = self._execution_service.query_n_clients_for_instance(instance_name)
 
 | 
| 
 
 | 
219
 | 
+                    n_bots = self._bots_service.query_n_bots_for_instance(instance_name)
 
 | 
| 
 
 | 
220
 | 
+
 
 | 
| 
 
 | 
221
 | 
+                    instance_name = instance_name or 'empty'
 
 | 
| 
 
 | 
222
 | 
+
 
 | 
| 
 
 | 
223
 | 
+                    print(' - {}: n_clients={}, n_bots={}'.format(instance_name, n_clients, n_bots))
 | 
| 
 
 | 
224
 | 
+
 
 | 
| 
 
 | 
225
 | 
+                await asyncio.sleep(period)
 
 | 
| 
 
 | 
226
 | 
+
 
 | 
| 
 
 | 
227
 | 
+            except asyncio.CancelledError:
 
 | 
| 
 
 | 
228
 | 
+                break
 
 | 
| 
 
 | 
229
 | 
+
 
 | 
| 
 
 | 
230
 | 
+        self.__main_loop.stop() 
 |