[Notes] [Git][BuildGrid/buildgrid][mablanch/152-servicer-level-auth] 13 commits: bots/instance.py: Fix how cancelled leases get reported



Title: GitLab

Martin Blanchard pushed to branch mablanch/152-servicer-level-auth at BuildGrid / buildgrid

Commits:

13 changed files:

Changes:

  • buildgrid/client/authentication.py
    ... ... @@ -191,7 +191,7 @@ class AuthMetadataClientInterceptor(
    191 191
     
    
    192 192
             class _ClientCallDetails(
    
    193 193
                     namedtuple('_ClientCallDetails',
    
    194
    -                           ('method', 'timeout', 'credentials', 'metadata')),
    
    194
    +                           ('method', 'timeout', 'credentials', 'metadata',)),
    
    195 195
                     grpc.ClientCallDetails):
    
    196 196
                 pass
    
    197 197
     
    

  • buildgrid/server/_authentication.py
    ... ... @@ -13,8 +13,10 @@
    13 13
     # limitations under the License.
    
    14 14
     
    
    15 15
     
    
    16
    +from collections import namedtuple
    
    16 17
     from datetime import datetime
    
    17 18
     from enum import Enum
    
    19
    +import functools
    
    18 20
     import logging
    
    19 21
     
    
    20 22
     import grpc
    
    ... ... @@ -55,6 +57,11 @@ class AuthMetadataAlgorithm(Enum):
    55 57
         JWT_RS512 = 'rs512'  # RSASSA-PKCS1-v1_5 signature algorithm using SHA-512 hash algorithm
    
    56 58
     
    
    57 59
     
    
    60
    +class AuthContext:
    
    61
    +
    
    62
    +    interceptor = None
    
    63
    +
    
    64
    +
    
    58 65
     class _InvalidTokenError(Exception):
    
    59 66
         pass
    
    60 67
     
    
    ... ... @@ -67,14 +74,50 @@ class _UnboundedTokenError(Exception):
    67 74
         pass
    
    68 75
     
    
    69 76
     
    
    77
    +def authorize(auth_context):
    
    78
    +
    
    79
    +    def __authorize_decorator(behavior):
    
    80
    +
    
    81
    +        _HandlerCallDetails = namedtuple(
    
    82
    +            '_HandlerCallDetails', ('invocation_metadata', 'method',))
    
    83
    +
    
    84
    +        @functools.wraps(behavior)
    
    85
    +        def __authorize_wrapper(self, request, context):
    
    86
    +            if auth_context.interceptor is None:
    
    87
    +                return behavior(self, request, context)
    
    88
    +
    
    89
    +            authorized = False
    
    90
    +
    
    91
    +            def __continuator(handler_call_details):
    
    92
    +                nonlocal authorized
    
    93
    +                authorized = True
    
    94
    +
    
    95
    +            details = _HandlerCallDetails(context.invocation_metadata(),
    
    96
    +                                          behavior.__name__)
    
    97
    +
    
    98
    +            auth_context.interceptor.intercept_service(__continuator, details)
    
    99
    +
    
    100
    +            if authorized:
    
    101
    +                return behavior(self, request, context)
    
    102
    +
    
    103
    +            context.abort(grpc.StatusCode.UNAUTHENTICATED,
    
    104
    +                          "No valid authorization or authentication provided")
    
    105
    +
    
    106
    +            return None
    
    107
    +
    
    108
    +        return __authorize_wrapper
    
    109
    +
    
    110
    +    return __authorize_decorator
    
    111
    +
    
    112
    +
    
    70 113
     class AuthMetadataServerInterceptor(grpc.ServerInterceptor):
    
    71 114
     
    
    72 115
         __auth_errors = {
    
    73
    -        'missing-bearer': 'Missing authentication header field',
    
    74
    -        'invalid-bearer': 'Invalid authentication header field',
    
    75
    -        'invalid-token': 'Invalid authentication token',
    
    76
    -        'expired-token': 'Expired authentication token',
    
    77
    -        'unbounded-token': 'Unbounded authentication token',
    
    116
    +        'missing-bearer': "Missing authentication header field",
    
    117
    +        'invalid-bearer': "Invalid authentication header field",
    
    118
    +        'invalid-token': "Invalid authentication token",
    
    119
    +        'expired-token': "Expired authentication token",
    
    120
    +        'unbounded-token': "Unbounded authentication token",
    
    78 121
         }
    
    79 122
     
    
    80 123
         def __init__(self, method, secret=None, algorithm=AuthMetadataAlgorithm.UNSPECIFIED):
    

  • buildgrid/server/actioncache/service.py
    ... ... @@ -27,6 +27,7 @@ import grpc
    27 27
     from buildgrid._exceptions import InvalidArgumentError, NotFoundError
    
    28 28
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    29 29
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
    
    30
    +from buildgrid.server._authentication import AuthContext, authorize
    
    30 31
     
    
    31 32
     
    
    32 33
     class ActionCacheService(remote_execution_pb2_grpc.ActionCacheServicer):
    
    ... ... @@ -38,9 +39,14 @@ class ActionCacheService(remote_execution_pb2_grpc.ActionCacheServicer):
    38 39
     
    
    39 40
             remote_execution_pb2_grpc.add_ActionCacheServicer_to_server(self, server)
    
    40 41
     
    
    42
    +    # --- Public API ---
    
    43
    +
    
    41 44
         def add_instance(self, name, instance):
    
    42 45
             self._instances[name] = instance
    
    43 46
     
    
    47
    +    # --- Public API: Servicer ---
    
    48
    +
    
    49
    +    @authorize(AuthContext)
    
    44 50
         def GetActionResult(self, request, context):
    
    45 51
             self.__logger.debug("GetActionResult request from [%s]", context.peer())
    
    46 52
     
    
    ... ... @@ -59,6 +65,7 @@ class ActionCacheService(remote_execution_pb2_grpc.ActionCacheServicer):
    59 65
     
    
    60 66
             return remote_execution_pb2.ActionResult()
    
    61 67
     
    
    68
    +    @authorize(AuthContext)
    
    62 69
         def UpdateActionResult(self, request, context):
    
    63 70
             self.__logger.debug("UpdateActionResult request from [%s]", context.peer())
    
    64 71
     
    
    ... ... @@ -78,6 +85,8 @@ class ActionCacheService(remote_execution_pb2_grpc.ActionCacheServicer):
    78 85
     
    
    79 86
             return remote_execution_pb2.ActionResult()
    
    80 87
     
    
    88
    +    # --- Private API ---
    
    89
    +
    
    81 90
         def _get_instance(self, instance_name):
    
    82 91
             try:
    
    83 92
                 return self._instances[instance_name]
    

  • buildgrid/server/bots/instance.py
    ... ... @@ -83,7 +83,7 @@ class BotsInterface:
    83 83
             self._check_bot_ids(bot_session.bot_id, name)
    
    84 84
             self._check_assigned_leases(bot_session)
    
    85 85
     
    
    86
    -        for lease in bot_session.leases:
    
    86
    +        for lease in list(bot_session.leases):
    
    87 87
                 checked_lease = self._check_lease_state(lease)
    
    88 88
                 if not checked_lease:
    
    89 89
                     # TODO: Make sure we don't need this
    
    ... ... @@ -91,7 +91,10 @@ class BotsInterface:
    91 91
                         self._assigned_leases[name].remove(lease.id)
    
    92 92
                     except KeyError:
    
    93 93
                         pass
    
    94
    -                lease.Clear()
    
    94
    +
    
    95
    +                self._scheduler.delete_job_lease(lease.id)
    
    96
    +
    
    97
    +                bot_session.leases.remove(lease)
    
    95 98
     
    
    96 99
             self._request_leases(bot_session)
    
    97 100
             return bot_session
    
    ... ... @@ -117,7 +120,7 @@ class BotsInterface:
    117 120
     
    
    118 121
             try:
    
    119 122
                 if self._scheduler.get_job_lease_cancelled(lease.id):
    
    120
    -                lease.state.CopyFrom(LeaseState.CANCELLED.value)
    
    123
    +                lease.state = LeaseState.CANCELLED.value
    
    121 124
                     return lease
    
    122 125
             except KeyError:
    
    123 126
                 # Job does not exist, remove from bot.
    

  • buildgrid/server/bots/service.py
    ... ... @@ -29,6 +29,7 @@ from buildgrid._enums import BotStatus
    29 29
     from buildgrid._exceptions import InvalidArgumentError, OutOfSyncError
    
    30 30
     from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
    
    31 31
     from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2_grpc
    
    32
    +from buildgrid.server._authentication import AuthContext, authorize
    
    32 33
     
    
    33 34
     
    
    34 35
     class BotsService(bots_pb2_grpc.BotsServicer):
    
    ... ... @@ -86,6 +87,7 @@ class BotsService(bots_pb2_grpc.BotsServicer):
    86 87
     
    
    87 88
         # --- Public API: Servicer ---
    
    88 89
     
    
    90
    +    @authorize(AuthContext)
    
    89 91
         def CreateBotSession(self, request, context):
    
    90 92
             """Handles CreateBotSessionRequest messages.
    
    91 93
     
    
    ... ... @@ -121,6 +123,7 @@ class BotsService(bots_pb2_grpc.BotsServicer):
    121 123
     
    
    122 124
             return bots_pb2.BotSession()
    
    123 125
     
    
    126
    +    @authorize(AuthContext)
    
    124 127
         def UpdateBotSession(self, request, context):
    
    125 128
             """Handles UpdateBotSessionRequest messages.
    
    126 129
     
    
    ... ... @@ -175,6 +178,7 @@ class BotsService(bots_pb2_grpc.BotsServicer):
    175 178
     
    
    176 179
             return bots_pb2.BotSession()
    
    177 180
     
    
    181
    +    @authorize(AuthContext)
    
    178 182
         def PostBotEventTemp(self, request, context):
    
    179 183
             """Handles PostBotEventTempRequest messages.
    
    180 184
     
    

  • buildgrid/server/capabilities/service.py
    ... ... @@ -19,15 +19,20 @@ import grpc
    19 19
     
    
    20 20
     from buildgrid._exceptions import InvalidArgumentError
    
    21 21
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    
    22
    +from buildgrid.server._authentication import AuthContext, authorize
    
    22 23
     
    
    23 24
     
    
    24 25
     class CapabilitiesService(remote_execution_pb2_grpc.CapabilitiesServicer):
    
    25 26
     
    
    26 27
         def __init__(self, server):
    
    27 28
             self.__logger = logging.getLogger(__name__)
    
    29
    +
    
    28 30
             self.__instances = {}
    
    31
    +
    
    29 32
             remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(self, server)
    
    30 33
     
    
    34
    +    # --- Public API ---
    
    35
    +
    
    31 36
         def add_instance(self, name, instance):
    
    32 37
             self.__instances[name] = instance
    
    33 38
     
    
    ... ... @@ -40,6 +45,9 @@ class CapabilitiesService(remote_execution_pb2_grpc.CapabilitiesServicer):
    40 45
         def add_execution_instance(self, name, instance):
    
    41 46
             self.__instances[name].add_execution_instance(instance)
    
    42 47
     
    
    48
    +    # --- Public API: Servicer ---
    
    49
    +
    
    50
    +    @authorize(AuthContext)
    
    43 51
         def GetCapabilities(self, request, context):
    
    44 52
             try:
    
    45 53
                 instance = self._get_instance(request.instance_name)
    
    ... ... @@ -52,6 +60,8 @@ class CapabilitiesService(remote_execution_pb2_grpc.CapabilitiesServicer):
    52 60
     
    
    53 61
             return remote_execution_pb2.ServerCapabilities()
    
    54 62
     
    
    63
    +    # --- Private API ---
    
    64
    +
    
    55 65
         def _get_instance(self, name):
    
    56 66
             try:
    
    57 67
                 return self.__instances[name]
    

  • buildgrid/server/cas/service.py
    ... ... @@ -29,6 +29,7 @@ from buildgrid._exceptions import InvalidArgumentError, NotFoundError, OutOfRang
    29 29
     from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
    
    30 30
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    31 31
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
    
    32
    +from buildgrid.server._authentication import AuthContext, authorize
    
    32 33
     
    
    33 34
     
    
    34 35
     class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
    
    ... ... @@ -40,9 +41,14 @@ class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressa
    40 41
     
    
    41 42
             remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(self, server)
    
    42 43
     
    
    44
    +    # --- Public API ---
    
    45
    +
    
    43 46
         def add_instance(self, name, instance):
    
    44 47
             self._instances[name] = instance
    
    45 48
     
    
    49
    +    # --- Public API: Servicer ---
    
    50
    +
    
    51
    +    @authorize(AuthContext)
    
    46 52
         def FindMissingBlobs(self, request, context):
    
    47 53
             self.__logger.debug("FindMissingBlobs request from [%s]", context.peer())
    
    48 54
     
    
    ... ... @@ -59,6 +65,7 @@ class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressa
    59 65
     
    
    60 66
             return remote_execution_pb2.FindMissingBlobsResponse()
    
    61 67
     
    
    68
    +    @authorize(AuthContext)
    
    62 69
         def BatchUpdateBlobs(self, request, context):
    
    63 70
             self.__logger.debug("BatchUpdateBlobs request from [%s]", context.peer())
    
    64 71
     
    
    ... ... @@ -75,6 +82,7 @@ class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressa
    75 82
     
    
    76 83
             return remote_execution_pb2.BatchReadBlobsResponse()
    
    77 84
     
    
    85
    +    @authorize(AuthContext)
    
    78 86
         def BatchReadBlobs(self, request, context):
    
    79 87
             self.__logger.debug("BatchReadBlobs request from [%s]", context.peer())
    
    80 88
     
    
    ... ... @@ -83,6 +91,7 @@ class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressa
    83 91
     
    
    84 92
             return remote_execution_pb2.BatchReadBlobsResponse()
    
    85 93
     
    
    94
    +    @authorize(AuthContext)
    
    86 95
         def GetTree(self, request, context):
    
    87 96
             self.__logger.debug("GetTree request from [%s]", context.peer())
    
    88 97
     
    
    ... ... @@ -97,6 +106,8 @@ class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressa
    97 106
     
    
    98 107
                 yield remote_execution_pb2.GetTreeResponse()
    
    99 108
     
    
    109
    +    # --- Private API ---
    
    110
    +
    
    100 111
         def _get_instance(self, instance_name):
    
    101 112
             try:
    
    102 113
                 return self._instances[instance_name]
    
    ... ... @@ -114,9 +125,14 @@ class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer):
    114 125
     
    
    115 126
             bytestream_pb2_grpc.add_ByteStreamServicer_to_server(self, server)
    
    116 127
     
    
    128
    +    # --- Public API ---
    
    129
    +
    
    117 130
         def add_instance(self, name, instance):
    
    118 131
             self._instances[name] = instance
    
    119 132
     
    
    133
    +    # --- Public API: Servicer ---
    
    134
    +
    
    135
    +    @authorize(AuthContext)
    
    120 136
         def Read(self, request, context):
    
    121 137
             self.__logger.debug("Read request from [%s]", context.peer())
    
    122 138
     
    
    ... ... @@ -163,6 +179,7 @@ class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer):
    163 179
                 context.set_code(grpc.StatusCode.OUT_OF_RANGE)
    
    164 180
                 yield bytestream_pb2.ReadResponse()
    
    165 181
     
    
    182
    +    @authorize(AuthContext)
    
    166 183
         def Write(self, requests, context):
    
    167 184
             self.__logger.debug("Write request from [%s]", context.peer())
    
    168 185
     
    
    ... ... @@ -209,12 +226,15 @@ class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer):
    209 226
     
    
    210 227
             return bytestream_pb2.WriteResponse()
    
    211 228
     
    
    229
    +    @authorize(AuthContext)
    
    212 230
         def QueryWriteStatus(self, request, context):
    
    213 231
             context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    
    214 232
             context.set_details('Method not implemented!')
    
    215 233
     
    
    216 234
             return bytestream_pb2.QueryWriteStatusResponse()
    
    217 235
     
    
    236
    +    # --- Private API ---
    
    237
    +
    
    218 238
         def _get_instance(self, instance_name):
    
    219 239
             try:
    
    220 240
                 return self._instances[instance_name]
    

  • buildgrid/server/execution/service.py
    ... ... @@ -29,6 +29,7 @@ import grpc
    29 29
     from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError, CancelledError
    
    30 30
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
    
    31 31
     from buildgrid._protos.google.longrunning import operations_pb2
    
    32
    +from buildgrid.server._authentication import AuthContext, authorize
    
    32 33
     
    
    33 34
     
    
    34 35
     class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    
    ... ... @@ -81,6 +82,7 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    81 82
     
    
    82 83
         # --- Public API: Servicer ---
    
    83 84
     
    
    85
    +    @authorize(AuthContext)
    
    84 86
         def Execute(self, request, context):
    
    85 87
             """Handles ExecuteRequest messages.
    
    86 88
     
    
    ... ... @@ -139,6 +141,7 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    139 141
                 context.set_code(grpc.StatusCode.CANCELLED)
    
    140 142
                 yield operations_pb2.Operation()
    
    141 143
     
    
    144
    +    @authorize(AuthContext)
    
    142 145
         def WaitExecution(self, request, context):
    
    143 146
             """Handles WaitExecutionRequest messages.
    
    144 147
     
    

  • buildgrid/server/instance.py
    ... ... @@ -29,7 +29,8 @@ import janus
    29 29
     from buildgrid._enums import BotStatus, LogRecordLevel, MetricRecordDomain, MetricRecordType
    
    30 30
     from buildgrid._protos.buildgrid.v2 import monitoring_pb2
    
    31 31
     from buildgrid.server.actioncache.service import ActionCacheService
    
    32
    -from buildgrid.server._authentication import AuthMetadataMethod, AuthMetadataAlgorithm, AuthMetadataServerInterceptor
    
    32
    +from buildgrid.server._authentication import AuthMetadataMethod, AuthMetadataAlgorithm
    
    33
    +from buildgrid.server._authentication import AuthContext, AuthMetadataServerInterceptor
    
    33 34
     from buildgrid.server.bots.service import BotsService
    
    34 35
     from buildgrid.server.capabilities.instance import CapabilitiesInstance
    
    35 36
     from buildgrid.server.capabilities.service import CapabilitiesService
    
    ... ... @@ -78,16 +79,15 @@ class BuildGridServer:
    78 79
                 max_workers = (os.cpu_count() or 1) * 5
    
    79 80
     
    
    80 81
             self.__grpc_auth_interceptor = None
    
    82
    +
    
    81 83
             if auth_method != AuthMetadataMethod.NONE:
    
    82 84
                 self.__grpc_auth_interceptor = AuthMetadataServerInterceptor(
    
    83 85
                     method=auth_method, secret=auth_secret, algorithm=auth_algorithm)
    
    84
    -        self.__grpc_executor = futures.ThreadPoolExecutor(max_workers)
    
    85 86
     
    
    86
    -        if self.__grpc_auth_interceptor is not None:
    
    87
    -            self.__grpc_server = grpc.server(
    
    88
    -                self.__grpc_executor, interceptors=(self.__grpc_auth_interceptor,))
    
    89
    -        else:
    
    90
    -            self.__grpc_server = grpc.server(self.__grpc_executor)
    
    87
    +            AuthContext.interceptor = self.__grpc_auth_interceptor
    
    88
    +
    
    89
    +        self.__grpc_executor = futures.ThreadPoolExecutor(max_workers)
    
    90
    +        self.__grpc_server = grpc.server(self.__grpc_executor)
    
    91 91
     
    
    92 92
             self.__main_loop = asyncio.get_event_loop()
    
    93 93
     
    

  • buildgrid/server/job.py
    ... ... @@ -222,6 +222,13 @@ class Job:
    222 222
             if self._lease is not None:
    
    223 223
                 self.update_lease_state(LeaseState.CANCELLED)
    
    224 224
     
    
    225
    +    def delete_lease(self):
    
    226
    +        """Discard the job's :class:`Lease`."""
    
    227
    +        self.__worker_start_timestamp.Clear()
    
    228
    +        self.__worker_completed_timestamp.Clear()
    
    229
    +
    
    230
    +        self._lease = None
    
    231
    +
    
    225 232
         def update_operation_stage(self, stage):
    
    226 233
             """Operates a stage transition for the job's :class:`Operation`.
    
    227 234
     
    

  • buildgrid/server/operations/service.py
    ... ... @@ -27,6 +27,7 @@ from google.protobuf.empty_pb2 import Empty
    27 27
     
    
    28 28
     from buildgrid._exceptions import InvalidArgumentError
    
    29 29
     from buildgrid._protos.google.longrunning import operations_pb2_grpc, operations_pb2
    
    30
    +from buildgrid.server._authentication import AuthContext, authorize
    
    30 31
     
    
    31 32
     
    
    32 33
     class OperationsService(operations_pb2_grpc.OperationsServicer):
    
    ... ... @@ -51,6 +52,7 @@ class OperationsService(operations_pb2_grpc.OperationsServicer):
    51 52
     
    
    52 53
         # --- Public API: Servicer ---
    
    53 54
     
    
    55
    +    @authorize(AuthContext)
    
    54 56
         def GetOperation(self, request, context):
    
    55 57
             self.__logger.debug("GetOperation request from [%s]", context.peer())
    
    56 58
     
    
    ... ... @@ -74,6 +76,7 @@ class OperationsService(operations_pb2_grpc.OperationsServicer):
    74 76
     
    
    75 77
             return operations_pb2.Operation()
    
    76 78
     
    
    79
    +    @authorize(AuthContext)
    
    77 80
         def ListOperations(self, request, context):
    
    78 81
             self.__logger.debug("ListOperations request from [%s]", context.peer())
    
    79 82
     
    
    ... ... @@ -99,6 +102,7 @@ class OperationsService(operations_pb2_grpc.OperationsServicer):
    99 102
     
    
    100 103
             return operations_pb2.ListOperationsResponse()
    
    101 104
     
    
    105
    +    @authorize(AuthContext)
    
    102 106
         def DeleteOperation(self, request, context):
    
    103 107
             self.__logger.debug("DeleteOperation request from [%s]", context.peer())
    
    104 108
     
    
    ... ... @@ -118,6 +122,7 @@ class OperationsService(operations_pb2_grpc.OperationsServicer):
    118 122
     
    
    119 123
             return Empty()
    
    120 124
     
    
    125
    +    @authorize(AuthContext)
    
    121 126
         def CancelOperation(self, request, context):
    
    122 127
             self.__logger.debug("CancelOperation request from [%s]", context.peer())
    
    123 128
     
    

  • buildgrid/server/referencestorage/service.py
    ... ... @@ -20,6 +20,7 @@ import grpc
    20 20
     from buildgrid._exceptions import InvalidArgumentError, NotFoundError
    
    21 21
     from buildgrid._protos.buildstream.v2 import buildstream_pb2
    
    22 22
     from buildgrid._protos.buildstream.v2 import buildstream_pb2_grpc
    
    23
    +from buildgrid.server._authentication import AuthContext, authorize
    
    23 24
     
    
    24 25
     
    
    25 26
     class ReferenceStorageService(buildstream_pb2_grpc.ReferenceStorageServicer):
    
    ... ... @@ -31,9 +32,14 @@ class ReferenceStorageService(buildstream_pb2_grpc.ReferenceStorageServicer):
    31 32
     
    
    32 33
             buildstream_pb2_grpc.add_ReferenceStorageServicer_to_server(self, server)
    
    33 34
     
    
    35
    +    # --- Public API ---
    
    36
    +
    
    34 37
         def add_instance(self, name, instance):
    
    35 38
             self._instances[name] = instance
    
    36 39
     
    
    40
    +    # --- Public API: Servicer ---
    
    41
    +
    
    42
    +    @authorize(AuthContext)
    
    37 43
         def GetReference(self, request, context):
    
    38 44
             self.__logger.debug("GetReference request from [%s]", context.peer())
    
    39 45
     
    
    ... ... @@ -55,6 +61,7 @@ class ReferenceStorageService(buildstream_pb2_grpc.ReferenceStorageServicer):
    55 61
     
    
    56 62
             return buildstream_pb2.GetReferenceResponse()
    
    57 63
     
    
    64
    +    @authorize(AuthContext)
    
    58 65
         def UpdateReference(self, request, context):
    
    59 66
             self.__logger.debug("UpdateReference request from [%s]", context.peer())
    
    60 67
     
    
    ... ... @@ -75,6 +82,7 @@ class ReferenceStorageService(buildstream_pb2_grpc.ReferenceStorageServicer):
    75 82
     
    
    76 83
             return buildstream_pb2.UpdateReferenceResponse()
    
    77 84
     
    
    85
    +    @authorize(AuthContext)
    
    78 86
         def Status(self, request, context):
    
    79 87
             self.__logger.debug("Status request from [%s]", context.peer())
    
    80 88
     
    
    ... ... @@ -90,6 +98,8 @@ class ReferenceStorageService(buildstream_pb2_grpc.ReferenceStorageServicer):
    90 98
     
    
    91 99
             return buildstream_pb2.StatusResponse()
    
    92 100
     
    
    101
    +    # --- Private API ---
    
    102
    +
    
    93 103
         def _get_instance(self, instance_name):
    
    94 104
             try:
    
    95 105
                 return self._instances[instance_name]
    

  • buildgrid/server/scheduler.py
    ... ... @@ -62,18 +62,8 @@ class Scheduler:
    62 62
     
    
    63 63
             job.unregister_client(queue)
    
    64 64
     
    
    65
    -        if not job.n_clients and job.operation.done:
    
    66
    -            del self.jobs[job_name]
    
    67
    -
    
    68
    -            if self._is_instrumented:
    
    69
    -                self.__operations_by_stage[OperationStage.CACHE_CHECK].discard(job_name)
    
    70
    -                self.__operations_by_stage[OperationStage.QUEUED].discard(job_name)
    
    71
    -                self.__operations_by_stage[OperationStage.EXECUTING].discard(job_name)
    
    72
    -                self.__operations_by_stage[OperationStage.COMPLETED].discard(job_name)
    
    73
    -
    
    74
    -                self.__leases_by_state[LeaseState.PENDING].discard(job_name)
    
    75
    -                self.__leases_by_state[LeaseState.ACTIVE].discard(job_name)
    
    76
    -                self.__leases_by_state[LeaseState.COMPLETED].discard(job_name)
    
    65
    +        if not job.n_clients and job.operation.done and not job.lease:
    
    66
    +            self._delete_job(job.name)
    
    77 67
     
    
    78 68
         def queue_job(self, job, skip_cache_lookup=False):
    
    79 69
             self.jobs[job.name] = job
    
    ... ... @@ -199,6 +189,15 @@ class Scheduler:
    199 189
             """Returns true if the lease is cancelled"""
    
    200 190
             return self.jobs[job_name].lease_cancelled
    
    201 191
     
    
    192
    +    def delete_job_lease(self, job_name):
    
    193
    +        """Discards the lease associated to a job."""
    
    194
    +        job = self.jobs[job_name]
    
    195
    +
    
    196
    +        self.jobs[job.name].delete_lease()
    
    197
    +
    
    198
    +        if not job.n_clients and job.operation.done:
    
    199
    +            self._delete_job(job.name)
    
    200
    +
    
    202 201
         def get_job_operation(self, job_name):
    
    203 202
             """Returns the operation associated to job."""
    
    204 203
             return self.jobs[job_name].operation
    
    ... ... @@ -296,6 +295,20 @@ class Scheduler:
    296 295
     
    
    297 296
         # --- Private API ---
    
    298 297
     
    
    298
    +    def _delete_job(self, job_name):
    
    299
    +        """Drops an entry from the internal list of jobs."""
    
    300
    +        del self.jobs[job_name]
    
    301
    +
    
    302
    +        if self._is_instrumented:
    
    303
    +            self.__operations_by_stage[OperationStage.CACHE_CHECK].discard(job_name)
    
    304
    +            self.__operations_by_stage[OperationStage.QUEUED].discard(job_name)
    
    305
    +            self.__operations_by_stage[OperationStage.EXECUTING].discard(job_name)
    
    306
    +            self.__operations_by_stage[OperationStage.COMPLETED].discard(job_name)
    
    307
    +
    
    308
    +            self.__leases_by_state[LeaseState.PENDING].discard(job_name)
    
    309
    +            self.__leases_by_state[LeaseState.ACTIVE].discard(job_name)
    
    310
    +            self.__leases_by_state[LeaseState.COMPLETED].discard(job_name)
    
    311
    +
    
    299 312
         def _update_job_operation_stage(self, job_name, operation_stage):
    
    300 313
             """Requests a stage transition for the job's :class:`Operations`.
    
    301 314
     
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]