[Notes] [Git][BuildGrid/buildgrid][mablanch/75-requests-multiplexing] 6 commits: job.py: Add an integer priority attribute



Title: GitLab

Martin Blanchard pushed to branch mablanch/75-requests-multiplexing at BuildGrid / buildgrid

Commits:

8 changed files:

Changes:

  • buildgrid/server/bots/instance.py
    ... ... @@ -123,7 +123,7 @@ class BotsInterface:
    123 123
                 # Job does not exist, remove from bot.
    
    124 124
                 return None
    
    125 125
     
    
    126
    -        self._scheduler.update_job_lease(lease)
    
    126
    +        self._scheduler.update_job_lease(lease.id, lease)
    
    127 127
     
    
    128 128
             if lease_state == LeaseState.COMPLETED:
    
    129 129
                 return None
    

  • buildgrid/server/execution/instance.py
    ... ... @@ -21,11 +21,9 @@ An instance of the Remote Execution Service.
    21 21
     
    
    22 22
     import logging
    
    23 23
     
    
    24
    -from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError
    
    24
    +from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError, NotFoundError
    
    25 25
     from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action
    
    26
    -
    
    27
    -from ..job import Job
    
    28
    -from ...utils import get_hash_type
    
    26
    +from buildgrid.utils import get_hash_type
    
    29 27
     
    
    30 28
     
    
    31 29
     class ExecutionInstance:
    
    ... ... @@ -46,44 +44,45 @@ class ExecutionInstance:
    46 44
         def hash_type(self):
    
    47 45
             return get_hash_type()
    
    48 46
     
    
    49
    -    def execute(self, action_digest, skip_cache_lookup, message_queue=None):
    
    47
    +    def execute(self, action_digest, skip_cache_lookup, peer):
    
    50 48
             """ Sends a job for execution.
    
    51 49
             Queues an action and creates an Operation instance to be associated with
    
    52 50
             this action.
    
    53 51
             """
    
    54
    -
    
    55 52
             action = self._storage.get_message(action_digest, Action)
    
    56 53
     
    
    57 54
             if not action:
    
    58 55
                 raise FailedPreconditionError("Could not get action from storage.")
    
    59 56
     
    
    60
    -        job = Job(action, action_digest)
    
    61
    -        if message_queue is not None:
    
    62
    -            job.register_client(message_queue)
    
    57
    +        return self._scheduler.queue_job(peer, action, action_digest, skip_cache_lookup)
    
    63 58
     
    
    64
    -        self._scheduler.queue_job(job, skip_cache_lookup)
    
    59
    +    def register_operation_client(self, operation_name, peer, message_queue):
    
    60
    +        try:
    
    61
    +            self._scheduler.register_operation_client(operation_name,
    
    62
    +                                                      peer, message_queue)
    
    65 63
     
    
    66
    -        return job.operation
    
    64
    +        except NotFoundError:
    
    65
    +            raise InvalidArgumentError("Operation name does not exist: [{}]"
    
    66
    +                                       .format(operation_name))
    
    67 67
     
    
    68
    -    def register_message_client(self, name, queue):
    
    68
    +    def unregister_operation_client(self, operation_name, peer):
    
    69 69
             try:
    
    70
    -            self._scheduler.register_client(name, queue)
    
    70
    +            self._scheduler.unregister_operation_client(operation_name, peer)
    
    71 71
     
    
    72
    -        except KeyError:
    
    73
    -            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    72
    +        except NotFoundError:
    
    73
    +            raise InvalidArgumentError("Operation name does not exist: [{}]"
    
    74
    +                                       .format(operation_name))
    
    74 75
     
    
    75
    -    def unregister_message_client(self, name, queue):
    
    76
    -        try:
    
    77
    -            self._scheduler.unregister_client(name, queue)
    
    76
    +    def stream_operation_updates(self, message_queue):
    
    77
    +        error, operation = message_queue.get()
    
    78
    +        if error is not None:
    
    79
    +            raise error
    
    78 80
     
    
    79
    -        except KeyError:
    
    80
    -            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    81
    +        while not operation.done:
    
    82
    +            yield operation
    
    81 83
     
    
    82
    -    def stream_operation_updates(self, message_queue, operation_name):
    
    83
    -        job = message_queue.get()
    
    84
    -        while not job.operation.done:
    
    85
    -            yield job.operation
    
    86
    -            job = message_queue.get()
    
    87
    -            job.check_operation_status()
    
    84
    +            error, operation = message_queue.get()
    
    85
    +            if error is not None:
    
    86
    +                raise error
    
    88 87
     
    
    89
    -        yield job.operation
    88
    +        yield operation

  • buildgrid/server/execution/service.py
    ... ... @@ -96,12 +96,15 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    96 96
     
    
    97 97
             try:
    
    98 98
                 instance = self._get_instance(instance_name)
    
    99
    -            operation = instance.execute(request.action_digest,
    
    100
    -                                         request.skip_cache_lookup,
    
    101
    -                                         message_queue)
    
    99
    +
    
    100
    +            operation_name = instance.execute(request.action_digest,
    
    101
    +                                              request.skip_cache_lookup)
    
    102
    +
    
    103
    +            instance.register_operation_client(operation_name,
    
    104
    +                                               peer, message_queue)
    
    102 105
     
    
    103 106
                 context.add_callback(partial(self._rpc_termination_callback,
    
    104
    -                                         peer, instance_name, operation.name, message_queue))
    
    107
    +                                         peer, instance_name, operation_name))
    
    105 108
     
    
    106 109
                 if self._is_instrumented:
    
    107 110
                     if peer not in self.__peers:
    
    ... ... @@ -110,16 +113,13 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    110 113
                     else:
    
    111 114
                         self.__peers[peer] += 1
    
    112 115
     
    
    113
    -            instanced_op_name = "{}/{}".format(instance_name, operation.name)
    
    116
    +            operation_full_name = "{}/{}".format(instance_name, operation_name)
    
    114 117
     
    
    115
    -            self.__logger.info("Operation name: [%s]", instanced_op_name)
    
    118
    +            self.__logger.info("Operation name: [%s]", operation_full_name)
    
    116 119
     
    
    117
    -            for operation in instance.stream_operation_updates(message_queue,
    
    118
    -                                                               operation.name):
    
    119
    -                op = operations_pb2.Operation()
    
    120
    -                op.CopyFrom(operation)
    
    121
    -                op.name = instanced_op_name
    
    122
    -                yield op
    
    120
    +            for operation in instance.stream_operation_updates(message_queue):
    
    121
    +                operation.name = operation_full_name
    
    122
    +                yield operation
    
    123 123
     
    
    124 124
             except InvalidArgumentError as e:
    
    125 125
                 self.__logger.error(e)
    
    ... ... @@ -151,15 +151,18 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    151 151
             names = request.name.split('/')
    
    152 152
             instance_name = '/'.join(names[:-1])
    
    153 153
             operation_name = names[-1]
    
    154
    +        operation_full_name = request.name
    
    154 155
             message_queue = queue.Queue()
    
    155 156
             peer = context.peer()
    
    156 157
     
    
    157 158
             try:
    
    158 159
                 instance = self._get_instance(instance_name)
    
    159 160
     
    
    160
    -            instance.register_message_client(operation_name, message_queue)
    
    161
    +            instance.register_operation_client(operation_name,
    
    162
    +                                               peer, message_queue)
    
    163
    +
    
    161 164
                 context.add_callback(partial(self._rpc_termination_callback,
    
    162
    -                                         peer, instance_name, operation_name, message_queue))
    
    165
    +                                         peer, instance_name, operation_name))
    
    163 166
     
    
    164 167
                 if self._is_instrumented:
    
    165 168
                     if peer not in self.__peers:
    
    ... ... @@ -168,12 +171,9 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    168 171
                     else:
    
    169 172
                         self.__peers[peer] += 1
    
    170 173
     
    
    171
    -            for operation in instance.stream_operation_updates(message_queue,
    
    172
    -                                                               operation_name):
    
    173
    -                op = operations_pb2.Operation()
    
    174
    -                op.CopyFrom(operation)
    
    175
    -                op.name = request.name
    
    176
    -                yield op
    
    174
    +            for operation in instance.stream_operation_updates(message_queue):
    
    175
    +                operation.name = operation_full_name
    
    176
    +                yield operation
    
    177 177
     
    
    178 178
             except InvalidArgumentError as e:
    
    179 179
                 self.__logger.error(e)
    
    ... ... @@ -208,10 +208,10 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    208 208
     
    
    209 209
         # --- Private API ---
    
    210 210
     
    
    211
    -    def _rpc_termination_callback(self, peer, instance_name, job_name, message_queue):
    
    211
    +    def _rpc_termination_callback(self, peer, instance_name, operation_name):
    
    212 212
             instance = self._get_instance(instance_name)
    
    213 213
     
    
    214
    -        instance.unregister_message_client(job_name, message_queue)
    
    214
    +        instance.unregister_operation_client(operation_name, peer)
    
    215 215
     
    
    216 216
             if self._is_instrumented:
    
    217 217
                 if self.__peers[peer] > 1:
    

  • buildgrid/server/job.py
    ... ... @@ -29,12 +29,13 @@ from buildgrid._protos.google.rpc import code_pb2
    29 29
     
    
    30 30
     class Job:
    
    31 31
     
    
    32
    -    def __init__(self, action, action_digest):
    
    32
    +    def __init__(self, action, action_digest, priority=0):
    
    33 33
             self.__logger = logging.getLogger(__name__)
    
    34 34
     
    
    35 35
             self._name = str(uuid.uuid4())
    
    36
    +        self._priority = priority
    
    36 37
             self._action = remote_execution_pb2.Action()
    
    37
    -        self._operation = operations_pb2.Operation()
    
    38
    +        self._operations = []
    
    38 39
             self._lease = None
    
    39 40
     
    
    40 41
             self.__execute_response = None
    
    ... ... @@ -45,25 +46,38 @@ class Job:
    45 46
             self.__worker_start_timestamp = timestamp_pb2.Timestamp()
    
    46 47
             self.__worker_completed_timestamp = timestamp_pb2.Timestamp()
    
    47 48
     
    
    48
    -        self.__operation_cancelled = False
    
    49
    +        self.__operations_by_name = {}
    
    50
    +        self.__operations_by_peer = {}
    
    51
    +        self.__operations_message_queues = {}
    
    52
    +        self.__operations_cancelled = []
    
    49 53
             self.__lease_cancelled = False
    
    54
    +        self.__job_cancelled = False
    
    50 55
     
    
    51 56
             self.__operation_metadata.action_digest.CopyFrom(action_digest)
    
    52 57
             self.__operation_metadata.stage = OperationStage.UNKNOWN.value
    
    53 58
     
    
    54 59
             self._action.CopyFrom(action)
    
    55 60
             self._do_not_cache = self._action.do_not_cache
    
    56
    -        self._operation_update_queues = []
    
    57
    -        self._operation.name = self._name
    
    58
    -        self._operation.done = False
    
    59 61
             self._n_tries = 0
    
    60 62
     
    
    63
    +    def __eq__(self, other):
    
    64
    +        if isinstance(other, Job):
    
    65
    +            return self.name == other.name
    
    66
    +        return False
    
    67
    +
    
    68
    +    def __ne__(self, other):
    
    69
    +        return not self.__eq__(other)
    
    70
    +
    
    61 71
         # --- Public API ---
    
    62 72
     
    
    63 73
         @property
    
    64 74
         def name(self):
    
    65 75
             return self._name
    
    66 76
     
    
    77
    +    @property
    
    78
    +    def priority(self):
    
    79
    +        return self._priority
    
    80
    +
    
    67 81
         @property
    
    68 82
         def do_not_cache(self):
    
    69 83
             return self._do_not_cache
    
    ... ... @@ -98,6 +112,9 @@ class Job:
    98 112
         def operation_stage(self):
    
    99 113
             return OperationStage(self.__operation_metadata.state)
    
    100 114
     
    
    115
    +    def operations(self):
    
    116
    +        return self._operations
    
    117
    +
    
    101 118
         @property
    
    102 119
         def lease(self):
    
    103 120
             return self._lease
    
    ... ... @@ -119,26 +136,52 @@ class Job:
    119 136
     
    
    120 137
         @property
    
    121 138
         def n_clients(self):
    
    122
    -        return len(self._operation_update_queues)
    
    139
    +        return len(self.__operations_message_queues)
    
    123 140
     
    
    124
    -    def register_client(self, queue):
    
    125
    -        """Subscribes to the job's :class:`Operation` stage change events.
    
    141
    +    # --- Public API: REAPI-side ---
    
    142
    +
    
    143
    +    def register_operation_client(self, peer, message_queue):
    
    144
    +        """Subscribes to the job's :class:`Operation` stage changes.
    
    126 145
     
    
    127 146
             Queues this :object:`Job` instance.
    
    128 147
     
    
    129 148
             Args:
    
    130
    -            queue (queue.Queue): the event queue to register.
    
    149
    +            peer (str): a unique string identifying the client.
    
    150
    +            message_queue (queue.Queue): the event queue to register.
    
    151
    +
    
    152
    +        Return:
    
    153
    +            str: the subscribed operation's name.
    
    131 154
             """
    
    132
    -        self._operation_update_queues.append(queue)
    
    133
    -        queue.put(self)
    
    155
    +        if peer not in self.__operations_message_queues:
    
    156
    +            self.__operations_message_queues[peer] = message_queue
    
    157
    +        elif self.__operations_message_queues[peer] != message_queue:
    
    158
    +            self.__operations_message_queues[peer] = message_queue
    
    159
    +
    
    160
    +        if peer not in self.__operations_by_peer:
    
    161
    +            operation = operations_pb2.Operation()
    
    162
    +            operation.name = str(uuid.uuid4())
    
    163
    +            operation.done = False
    
    164
    +
    
    165
    +            self.__operations_by_name[operation.name] = operation
    
    166
    +            self.__operations_by_peer[peer] = operation
    
    167
    +            self._operations.append(operation)
    
    168
    +
    
    169
    +            send_operation_update = True
    
    134 170
     
    
    135
    -    def unregister_client(self, queue):
    
    136
    -        """Unsubscribes to the job's :class:`Operation` stage change events.
    
    171
    +        if send_operation_update:
    
    172
    +            message = (None, self._copy_operation(self._operation),)
    
    173
    +            message_queue.put(message)
    
    174
    +
    
    175
    +        return self._operation.name
    
    176
    +
    
    177
    +    def unregister_operation_client(self, peer):
    
    178
    +        """Unsubscribes to the job's :class:`Operation` stage change.
    
    137 179
     
    
    138 180
             Args:
    
    139
    -            queue (queue.Queue): the event queue to unregister.
    
    181
    +            peer (str): a unique string identifying the client.
    
    140 182
             """
    
    141
    -        self._operation_update_queues.remove(queue)
    
    183
    +        if peer not in self.__operations_message_queues:
    
    184
    +            del self.__operations_message_queues[peer]
    
    142 185
     
    
    143 186
         def set_cached_result(self, action_result):
    
    144 187
             """Allows specifying an action result form the action cache for the job.
    
    ... ... @@ -147,6 +190,50 @@ class Job:
    147 190
             self.__execute_response.result.CopyFrom(action_result)
    
    148 191
             self.__execute_response.cached_result = True
    
    149 192
     
    
    193
    +    def update_operation_stage(self, stage):
    
    194
    +        """Operates a stage transition for the job's :class:Operation.
    
    195
    +
    
    196
    +        Args:
    
    197
    +            stage (OperationStage): the operation stage to transition to.
    
    198
    +        """
    
    199
    +        if stage.value == self.__operation_metadata.stage:
    
    200
    +            return
    
    201
    +
    
    202
    +        self.__operation_metadata.stage = stage.value
    
    203
    +
    
    204
    +        if self.__operation_metadata.stage == OperationStage.QUEUED.value:
    
    205
    +            if self.__queued_timestamp.ByteSize() == 0:
    
    206
    +                self.__queued_timestamp.GetCurrentTime()
    
    207
    +            self._n_tries += 1
    
    208
    +
    
    209
    +        self._update_operations()
    
    210
    +
    
    211
    +    def cancel_operation(self):
    
    212
    +        """Triggers a job's :class:Operation cancellation.
    
    213
    +
    
    214
    +        This will also cancel any job's :class:Lease that may have been issued.
    
    215
    +        """
    
    216
    +        self.__operation_cancelled = True
    
    217
    +        if self._lease is not None:
    
    218
    +            self.cancel_lease()
    
    219
    +
    
    220
    +        self.__execute_response = remote_execution_pb2.ExecuteResponse()
    
    221
    +        self.__execute_response.status.code = code_pb2.CANCELLED
    
    222
    +        self.__execute_response.status.message = "Operation cancelled by client."
    
    223
    +
    
    224
    +        self.update_operation_stage(OperationStage.COMPLETED)
    
    225
    +
    
    226
    +    def check_operation_status(self):
    
    227
    +        """Reports errors on unexpected job's :class:Operation state.
    
    228
    +
    
    229
    +        Raises:
    
    230
    +            CancelledError: if the job's :class:Operation was cancelled.
    
    231
    +        """
    
    232
    +        if self.__operation_cancelled:
    
    233
    +            raise CancelledError(self.__execute_response.status.message)
    
    234
    +
    
    235
    +    # --- Public API: RWAPI-side ---
    
    236
    +
    
    150 237
         def create_lease(self):
    
    151 238
             """Emits a new :class:`Lease` for the job.
    
    152 239
     
    
    ... ... @@ -222,44 +309,39 @@ class Job:
    222 309
             if self._lease is not None:
    
    223 310
                 self.update_lease_state(LeaseState.CANCELLED)
    
    224 311
     
    
    225
    -    def update_operation_stage(self, stage):
    
    226
    -        """Operates a stage transition for the job's :class:Operation.
    
    312
    +    # --- Private API ---
    
    227 313
     
    
    228
    -        Args:
    
    229
    -            stage (OperationStage): the operation stage to transition to.
    
    230
    -        """
    
    231
    -        if stage.value == self.__operation_metadata.stage:
    
    232
    -            return
    
    314
    +    def _copy_operation(self, operation):
    
    315
    +        new_operation = operations_pb2.Operation()
    
    233 316
     
    
    234
    -        self.__operation_metadata.stage = stage.value
    
    317
    +        new_operation.CopyFrom(operation)
    
    235 318
     
    
    236
    -        if self.__operation_metadata.stage == OperationStage.QUEUED.value:
    
    237
    -            if self.__queued_timestamp.ByteSize() == 0:
    
    238
    -                self.__queued_timestamp.GetCurrentTime()
    
    239
    -            self._n_tries += 1
    
    319
    +        return new_operation
    
    240 320
     
    
    241
    -        elif self.__operation_metadata.stage == OperationStage.EXECUTING.value:
    
    242
    -            queue_in, queue_out = self.__queued_timestamp.ToDatetime(), datetime.now()
    
    243
    -            self.__queued_time_duration.FromTimedelta(queue_out - queue_in)
    
    321
    +    def _update_operations(self, restricted_peers_to_notify=None):
    
    322
    +        for operation in self._operations:
    
    323
    +            if operation.name not in self.__operations_by_name:
    
    324
    +                continue
    
    244 325
     
    
    245
    -        elif self.__operation_metadata.stage == OperationStage.COMPLETED.value:
    
    246
    -            if self.__execute_response is not None:
    
    247
    -                self._operation.response.Pack(self.__execute_response)
    
    248
    -            self._operation.done = True
    
    326
    +            elif self.__operation_metadata.stage == OperationStage.EXECUTING.value:
    
    327
    +                queue_in, queue_out = self.__queued_timestamp.ToDatetime(), datetime.now()
    
    328
    +                self.__queued_time_duration.FromTimedelta(queue_out - queue_in)
    
    249 329
     
    
    250
    -        self._operation.metadata.Pack(self.__operation_metadata)
    
    330
    +            elif self.__operation_metadata.stage == OperationStage.COMPLETED.value:
    
    331
    +                if self.__execute_response is not None:
    
    332
    +                    self._operation.response.Pack(self.__execute_response)
    
    333
    +                self._operation.done = True
    
    251 334
     
    
    252
    -        for queue in self._operation_update_queues:
    
    253
    -            queue.put(self)
    
    335
    +            operation.metadata.Pack(self.__operation_metadata)
    
    254 336
     
    
    255
    -    def check_operation_status(self):
    
    256
    -        """Reports errors on unexpected job's :class:Operation state.
    
    337
    +        if not self.__operation_cancelled:
    
    338
    +            message = (None, self._copy_operation(self._operation),)
    
    339
    +        else:
    
    340
    +            message = (CancelledError(self.__execute_response.status.message),
    
    341
    +                       self._copy_operation(self._operation),)
    
    257 342
     
    
    258
    -        Raises:
    
    259
    -            CancelledError: if the job's :class:Operation was cancelled.
    
    260
    -        """
    
    261
    -        if self.__operation_cancelled:
    
    262
    -            raise CancelledError(self.__execute_response.status.message)
    
    343
    +        for message_queue in self.__operation_message_queues.values():
    
    344
    +            message_queue.put(message)
    
    263 345
     
    
    264 346
         def cancel_operation(self):
    
    265 347
             """Triggers a job's :class:Operation cancellation.
    
    ... ... @@ -283,3 +365,12 @@ class Job:
    283 365
     
    
    284 366
         def query_n_retries(self):
    
    285 367
             return self._n_tries - 1 if self._n_tries > 0 else 0
    
    368
    +
    
    369
    +    # --- Private API ---
    
    370
    +
    
    371
    +    def _copy_operation(self, operation):
    
    372
    +        new_operation = operations_pb2.Operation()
    
    373
    +
    
    374
    +        new_operation.CopyFrom(operation)
    
    375
    +
    
    376
    +        return new_operation

  • buildgrid/server/operations/instance.py
    ... ... @@ -21,7 +21,7 @@ An instance of the LongRunningOperations Service.
    21 21
     
    
    22 22
     import logging
    
    23 23
     
    
    24
    -from buildgrid._exceptions import InvalidArgumentError
    
    24
    +from buildgrid._exceptions import InvalidArgumentError, NotFoundError
    
    25 25
     from buildgrid._protos.google.longrunning import operations_pb2
    
    26 26
     
    
    27 27
     
    
    ... ... @@ -39,14 +39,14 @@ class OperationsInstance:
    39 39
         def register_instance_with_server(self, instance_name, server):
    
    40 40
             server.add_operations_instance(self, instance_name)
    
    41 41
     
    
    42
    -    def get_operation(self, name):
    
    43
    -        job = self._scheduler.jobs.get(name)
    
    42
    +    def get_operation(self, job_name):
    
    43
    +        try:
    
    44
    +            operation = self._scheduler.get_job_operation(job_name)
    
    44 45
     
    
    45
    -        if job is None:
    
    46
    -            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    46
    +        except NotFoundError:
    
    47
    +            raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))
    
    47 48
     
    
    48
    -        else:
    
    49
    -            return job.operation
    
    49
    +        return operation
    
    50 50
     
    
    51 51
         def list_operations(self, list_filter, page_size, page_token):
    
    52 52
             # TODO: Pages
    
    ... ... @@ -62,39 +62,17 @@ class OperationsInstance:
    62 62
     
    
    63 63
             return response
    
    64 64
     
    
    65
    -    def delete_operation(self, name):
    
    65
    +    def delete_operation(self, job_name):
    
    66 66
             try:
    
    67
    -            self._scheduler.jobs.pop(name)
    
    67
    +            # TODO: Unregister the caller client
    
    68
    +            pass
    
    68 69
     
    
    69
    -        except KeyError:
    
    70
    -            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    70
    +        except NotFoundError:
    
    71
    +            raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))
    
    71 72
     
    
    72
    -    def cancel_operation(self, name):
    
    73
    +    def cancel_operation(self, job_name):
    
    73 74
             try:
    
    74
    -            self._scheduler.cancel_job_operation(name)
    
    75
    -
    
    76
    -        except KeyError:
    
    77
    -            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    78
    -
    
    79
    -    def register_message_client(self, name, queue):
    
    80
    -        try:
    
    81
    -            self._scheduler.register_client(name, queue)
    
    82
    -
    
    83
    -        except KeyError:
    
    84
    -            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    85
    -
    
    86
    -    def unregister_message_client(self, name, queue):
    
    87
    -        try:
    
    88
    -            self._scheduler.unregister_client(name, queue)
    
    89
    -
    
    90
    -        except KeyError:
    
    91
    -            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    92
    -
    
    93
    -    def stream_operation_updates(self, message_queue, operation_name):
    
    94
    -        job = message_queue.get()
    
    95
    -        while not job.operation.done:
    
    96
    -            yield job.operation
    
    97
    -            job = message_queue.get()
    
    98
    -            job.check_operation_status()
    
    75
    +            self._scheduler.cancel_job_operation(job_name)
    
    99 76
     
    
    100
    -        yield job.operation
    77
    +        except NotFoundError:
    
    78
    +            raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))

  • buildgrid/server/scheduler.py
    ... ... @@ -25,6 +25,7 @@ import logging
    25 25
     
    
    26 26
     from buildgrid._enums import LeaseState, OperationStage
    
    27 27
     from buildgrid._exceptions import NotFoundError
    
    28
    +from buildgrid.server.job import Job
    
    28 29
     
    
    29 30
     
    
    30 31
     class Scheduler:
    
    ... ... @@ -42,8 +43,11 @@ class Scheduler:
    42 43
             self.__retries_count = 0
    
    43 44
     
    
    44 45
             self._action_cache = action_cache
    
    45
    -        self.jobs = {}
    
    46
    -        self.queue = deque()
    
    46
    +
    
    47
    +        self.__jobs_by_action = {}
    
    48
    +        self.__jobs_by_name = {}
    
    49
    +
    
    50
    +        self.__queue = deque()
    
    47 51
     
    
    48 52
             self._is_instrumented = monitor
    
    49 53
     
    
    ... ... @@ -52,18 +56,49 @@ class Scheduler:
    52 56
     
    
    53 57
         # --- Public API ---
    
    54 58
     
    
    55
    -    def register_client(self, job_name, queue):
    
    56
    -        job = self.jobs[job_name]
    
    59
    +    def register_operation_client(self, operation_name, peer, message_queue):
    
    60
    +        """Subscribes to one of the job's :class:`Operation` stage changes.
    
    61
    +
    
    62
    +        Args:
    
    63
    +            operation_name (str): name of the operation to subscribe to.
    
    64
    +            peer (str): a unique string identifying the client.
    
    65
    +            message_queue (queue.Queue): the event queue to register.
    
    66
    +
    
    67
    +        Returns:
    
    68
    +            str: the subscribed operation's name.
    
    69
    +
    
    70
    +        Raises:
    
    71
    +            NotFoundError: If no operation with `operation_name` exists.
    
    72
    +        """
    
    73
    +        try:
    
    74
    +            job = self.__jobs_by_name[operation_name]
    
    75
    +        except KeyError:
    
    76
    +            raise NotFoundError("Job name does not exist: [{}]"
    
    77
    +                                .format(operation_name))
    
    78
    +
    
    79
    +        job.register_operation_client(peer, message_queue)
    
    57 80
     
    
    58
    -        job.register_client(queue)
    
    81
    +    def unregister_operation_client(self, operation_name, peer):
    
    82
    +        """Unsubscribes from one of the job's :class:`Operation` stage changes.
    
    83
    +
    
    84
    +        Args:
    
    85
    +            operation_name (str): name of the operation to unsubscribe from.
    
    86
    +            peer (str): a unique string identifying the client.
    
    59 87
     
    
    60
    -    def unregister_client(self, job_name, queue):
    
    61
    -        job = self.jobs[job_name]
    
    88
    +        Raises:
    
    89
    +            NotFoundError: If no operation with `operation_name` exists.
    
    90
    +        """
    
    91
    +        try:
    
    92
    +            job = self.__jobs_by_name[operation_name]
    
    93
    +        except KeyError:
    
    94
    +            raise NotFoundError("Operation name does not exist: [{}]"
    
    95
    +                                .format(operation_name))
    
    62 96
     
    
    63
    -        job.unregister_client(queue)
    
    97
    +        job.unregister_operation_client(peer)
    
    64 98
     
    
    65
    -        if not job.n_clients and job.operation.done:
    
    66
    -            del self.jobs[job_name]
    
    99
    +        if job.n_clients == 0 and job.operation.done:
    
    100
    +            del self.__jobs_by_action[job.action_digest]
    
    101
    +            del self.__jobs_by_name[job.name]
    
    67 102
     
    
    68 103
                 if self._is_instrumented:
    
    69 104
                     self.__operations_by_stage[OperationStage.CACHE_CHECK].discard(job_name)
    
    ... ... @@ -75,16 +110,59 @@ class Scheduler:
    75 110
                     self.__leases_by_state[LeaseState.ACTIVE].discard(job_name)
    
    76 111
                     self.__leases_by_state[LeaseState.COMPLETED].discard(job_name)
    
    77 112
     
    
    78
    -    def queue_job(self, job, skip_cache_lookup=False):
    
    79
    -        self.jobs[job.name] = job
    
    113
    +
    
    114
    +    def queue_job(self, peer, action, action_digest, priority=0, skip_cache_lookup=False):
    
    115
    +        """Inserts a newly created job into the execution queue.
    
    116
    +
    
    117
    +        Args:
    
    118
    +            peer (str): a unique string identifying the client.
    
    119
    +            action (Action): the given action to queue for execution.
    
    120
    +            action_digest (Digest): the digest of the given action.
    
    121
    +            priority (int): the execution job's priority.
    
    122
    +            skip_cache_lookup (bool): whether or not to look for pre-computed
    
    123
    +                result for the given action.
    
    124
    +
    
    125
    +        Returns:
    
    126
    +            str: the newly created operation's name.
    
    127
    +        """
    
    128
    +        def __queue_job(jobs_queue, new_job):
    
    129
    +            index = 0
    
    130
    +            for queued_job in reversed(jobs_queue):
    
    131
    +                if new_job.priority < queued_job.priority:
    
    132
    +                    index += 1
    
    133
    +                else:
    
    134
    +                    break
    
    135
    +
    
    136
    +            index = len(jobs_queue) - index
    
    137
    +
    
    138
    +            jobs_queue.insert(index, new_job)
    
    139
    +
    
    140
    +        if action_digest.hash in self.__jobs_by_action:
    
    141
    +            job = self.__jobs_by_action[action_digest.hash]
    
    142
    +
    
    143
    +            # Reschedule if priority is now greater:
    
    144
    +            if priority < job.priority:
    
    145
    +                job.priority = priority
    
    146
    +
    
    147
    +                if job in self.__queue:
    
    148
    +                    self.__queue.remove(job)
    
    149
    +                    __queue_job(self.__queue, job)
    
    150
    +
    
    151
    +            return job.name
    
    152
    +
    
    153
    +        job = Job(action, action_digest, priority=priority)
    
    154
    +
    
    155
    +        self.__jobs_by_action[job.action_digest.hash] = job
    
    156
    +        self.__jobs_by_name[job.name] = job
    
    80 157
     
    
    81 158
             operation_stage = None
    
    82 159
             if self._action_cache is not None and not skip_cache_lookup:
    
    83 160
                 try:
    
    84 161
                     action_result = self._action_cache.get_action_result(job.action_digest)
    
    162
    +
    
    85 163
                 except NotFoundError:
    
    86 164
                     operation_stage = OperationStage.QUEUED
    
    87
    -                self.queue.append(job)
    
    165
    +                __queue_job(self.__queue, job)
    
    88 166
     
    
    89 167
                 else:
    
    90 168
                     job.set_cached_result(action_result)
    
    ... ... @@ -95,12 +173,17 @@ class Scheduler:
    95 173
     
    
    96 174
             else:
    
    97 175
                 operation_stage = OperationStage.QUEUED
    
    98
    -            self.queue.append(job)
    
    176
    +            __queue_job(self.__queue, job)
    
    99 177
     
    
    100 178
             self._update_job_operation_stage(job.name, operation_stage)
    
    101 179
     
    
    180
    +        return job.name
    
    181
    +
    
    102 182
         def retry_job(self, job_name):
    
    103
    -        job = self.jobs[job_name]
    
    183
    +        try:
    
    184
    +            job = self.__jobs_by_name[job_name]
    
    185
    +        except KeyError:
    
    186
    +            raise NotFoundError("Job name does not exist: [{}]".format(job_name))
    
    104 187
     
    
    105 188
             operation_stage = None
    
    106 189
             if job.n_tries >= self.MAX_N_TRIES:
    
    ... ... @@ -111,12 +194,12 @@ class Scheduler:
    111 194
             else:
    
    112 195
                 operation_stage = OperationStage.QUEUED
    
    113 196
                 job.update_lease_state(LeaseState.PENDING)
    
    114
    -            self.queue.append(job)
    
    197
    +            self.__queue.append(job)
    
    115 198
     
    
    116 199
             self._update_job_operation_stage(job_name, operation_stage)
    
    117 200
     
    
    118 201
         def list_jobs(self):
    
    119
    -        return self.jobs.values()
    
    202
    +        return self.__jobs_by_name.values()
    
    120 203
     
    
    121 204
         def request_job_leases(self, worker_capabilities):
    
    122 205
             """Generates a list of the highest priority leases to be run.
    
    ... ... @@ -126,10 +209,10 @@ class Scheduler:
    126 209
                     worker properties, configuration and state at the time of the
    
    127 210
                     request.
    
    128 211
             """
    
    129
    -        if not self.queue:
    
    212
    +        if not self.__queue:
    
    130 213
                 return []
    
    131 214
     
    
    132
    -        job = self.queue.popleft()
    
    215
    +        job = self.__queue.popleft()
    
    133 216
     
    
    134 217
             lease = job.lease
    
    135 218
     
    
    ... ... @@ -142,18 +225,21 @@ class Scheduler:
    142 225
     
    
    143 226
             return None
    
    144 227
     
    
    145
    -    def update_job_lease(self, lease):
    
    228
    +    def update_job_lease(self, job_name, lease):
    
    146 229
             """Requests a state transition for a job's current :class:Lease.
    
    147 230
     
    
    148 231
             Args:
    
    149 232
                 job_name (str): name of the job to query.
    
    150
    -            lease_state (LeaseState): the lease state to transition to.
    
    151
    -            lease_status (google.rpc.Status): the lease execution status, only
    
    152
    -                required if `lease_state` is `COMPLETED`.
    
    153
    -            lease_result (google.protobuf.Any): the lease execution result, only
    
    154
    -                required if `lease_state` is `COMPLETED`.
    
    233
    +            lease (Lease): the lease holding the new state.
    
    234
    +
    
    235
    +        Raises:
    
    236
    +            NotFoundError: If no job with `job_name` exists.
    
    155 237
             """
    
    156
    -        job = self.jobs[lease.id]
    
    238
    +        try:
    
    239
    +            job = self.__jobs_by_name[job_name]
    
    240
    +        except KeyError:
    
    241
    +            raise NotFoundError("Job name does not exist: [{}]".format(job_name))
    
    242
    +
    
    157 243
             lease_state = LeaseState(lease.state)
    
    158 244
     
    
    159 245
             operation_stage = None
    
    ... ... @@ -189,19 +275,55 @@ class Scheduler:
    189 275
                     self.__leases_by_state[LeaseState.ACTIVE].discard(lease.id)
    
    190 276
                     self.__leases_by_state[LeaseState.COMPLETED].add(lease.id)
    
    191 277
     
    
    192
    -        self._update_job_operation_stage(lease.id, operation_stage)
    
    278
    +        self._update_job_operation_stage(job_name, operation_stage)
    
    193 279
     
    
    194 280
         def get_job_lease(self, job_name):
    
    195
    -        """Returns the lease associated to job, if any have been emitted yet."""
    
    196
    -        return self.jobs[job_name].lease
    
    281
    +        """Returns the lease associated to job, if any have been emitted yet.
    
    282
    +
    
    283
    +        Args:
    
    284
    +            job_name (str): name of the job to query.
    
    285
    +
    
    286
    +        Raises:
    
    287
    +            NotFoundError: If no job with `job_name` exists.
    
    288
    +        """
    
    289
    +        try:
    
    290
    +            job = self.__jobs_by_name[job_name]
    
    291
    +        except KeyError:
    
    292
    +            raise NotFoundError("Job name does not exist: [{}]".format(job_name))
    
    293
    +
    
    294
    +        return job.lease
    
    197 295
     
    
    198 296
         def get_job_lease_cancelled(self, job_name):
    
    199
    -        """Returns true if the lease is cancelled"""
    
    200
    -        return self.jobs[job_name].lease_cancelled
    
    297
    +        """Returns true if the lease is cancelled.
    
    298
    +
    
    299
    +        Args:
    
    300
    +            job_name (str): name of the job to query.
    
    301
    +
    
    302
    +        Raises:
    
    303
    +            NotFoundError: If no job with `job_name` exists.
    
    304
    +        """
    
    305
    +        try:
    
    306
    +            job = self.__jobs_by_name[job_name]
    
    307
    +        except KeyError:
    
    308
    +            raise NotFoundError("Job name does not exist: [{}]".format(job_name))
    
    309
    +
    
    310
    +        return job.lease_cancelled
    
    201 311
     
    
    202 312
         def get_job_operation(self, job_name):
    
    203
    -        """Returns the operation associated to job."""
    
    204
    -        return self.jobs[job_name].operation
    
    313
    +        """Returns the operation associated to job.
    
    314
    +
    
    315
    +        Args:
    
    316
    +            job_name (str): name of the job to query.
    
    317
    +
    
    318
    +        Raises:
    
    319
    +            NotFoundError: If no job with `job_name` exists.
    
    320
    +        """
    
    321
    +        try:
    
    322
    +            job = self.__jobs_by_name[job_name]
    
    323
    +        except KeyError:
    
    324
    +            raise NotFoundError("Job name does not exist: [{}]".format(job_name))
    
    325
    +
    
    326
    +        return job.operation
    
    205 327
     
    
    206 328
         def cancel_job_operation(self, job_name):
    
    207 329
             """"Cancels the underlying operation of a given job.
    
    ... ... @@ -211,7 +333,12 @@ class Scheduler:
    211 333
             Args:
    
    212 334
                 job_name (str): name of the job holding the operation to cancel.
    
    213 335
             """
    
    214
    -        self.jobs[job_name].cancel_operation()
    
    336
    +        try:
    
    337
    +            job = self.__jobs_by_name[job_name]
    
    338
    +        except KeyError:
    
    339
    +            raise NotFoundError("Job name does not exist: [{}]".format(job_name))
    
    340
    +
    
    341
    +        job.cancel_operation()
    
    215 342
     
    
    216 343
         # --- Public API: Monitoring ---
    
    217 344
     
    
    ... ... @@ -261,11 +388,11 @@ class Scheduler:
    261 388
                 self.__build_metadata_queues.append(message_queue)
    
    262 389
     
    
    263 390
         def query_n_jobs(self):
    
    264
    -        return len(self.jobs)
    
    391
    +        return len(self.__jobs_by_name)
    
    265 392
     
    
    266 393
         def query_n_operations(self):
    
    267 394
             # For now n_operations == n_jobs:
    
    268
    -        return len(self.jobs)
    
    395
    +        return len(self.__jobs_by_name)
    
    269 396
     
    
    270 397
         def query_n_operations_by_stage(self, operation_stage):
    
    271 398
             try:
    
    ... ... @@ -276,7 +403,7 @@ class Scheduler:
    276 403
             return 0
    
    277 404
     
    
    278 405
         def query_n_leases(self):
    
    279
    -        return len(self.jobs)
    
    406
    +        return len(self.__jobs_by_name)
    
    280 407
     
    
    281 408
         def query_n_leases_by_state(self, lease_state):
    
    282 409
             try:
    
    ... ... @@ -303,7 +430,7 @@ class Scheduler:
    303 430
                 job_name (str): name of the job to query.
    
    304 431
                 operation_stage (OperationStage): the stage to transition to.
    
    305 432
             """
    
    306
    -        job = self.jobs[job_name]
    
    433
    +        job = self.__jobs_by_name[job_name]
    
    307 434
     
    
    308 435
             if operation_stage == OperationStage.CACHE_CHECK:
    
    309 436
                 job.update_operation_stage(OperationStage.CACHE_CHECK)
    

  • tests/integration/bots_service.py
    ... ... @@ -25,7 +25,6 @@ import pytest
    25 25
     
    
    26 26
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    27 27
     from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
    
    28
    -from buildgrid.server import job
    
    29 28
     from buildgrid.server.controller import ExecutionController
    
    30 29
     from buildgrid.server.job import LeaseState
    
    31 30
     from buildgrid.server.bots import service
    
    ... ... @@ -159,7 +158,8 @@ def test_post_bot_event_temp(context, instance):
    159 158
     def _inject_work(scheduler, action=None, action_digest=None):
    
    160 159
         if not action:
    
    161 160
             action = remote_execution_pb2.Action()
    
    161
    +
    
    162 162
         if not action_digest:
    
    163 163
             action_digest = remote_execution_pb2.Digest()
    
    164
    -    j = job.Job(action, action_digest)
    
    165
    -    scheduler.queue_job(j, True)
    164
    +
    
    165
    +    scheduler.queue_job(action, action_digest, skip_cache_lookup=True)

  • tests/integration/execution_service.py
    ... ... @@ -106,13 +106,13 @@ def test_no_action_digest_in_storage(instance, context):
    106 106
     
    
    107 107
     
    
    108 108
     def test_wait_execution(instance, controller, context):
    
    109
    -    j = job.Job(action, action_digest)
    
    109
    +    j = controller.execution_instance._scheduler.queue_job(action,
    
    110
    +                                                           action_digest,
    
    111
    +                                                           skip_cache_lookup=True)
    
    110 112
         j._operation.done = True
    
    111 113
     
    
    112 114
         request = remote_execution_pb2.WaitExecutionRequest(name=j.name)
    
    113 115
     
    
    114
    -    controller.execution_instance._scheduler.jobs[j.name] = j
    
    115
    -
    
    116 116
         action_result_any = any_pb2.Any()
    
    117 117
         action_result = remote_execution_pb2.ActionResult()
    
    118 118
         action_result_any.Pack(action_result)
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]