[Notes] [Git][BuildGrid/buildgrid][mablanch/75-requests-multiplexing] 8 commits: Message queue now queues the job.



Title: GitLab

Martin Blanchard pushed to branch mablanch/75-requests-multiplexing at BuildGrid / buildgrid

Commits:

9 changed files:

Changes:

  • buildgrid/_app/commands/cmd_operation.py
    ... ... @@ -155,6 +155,25 @@ def status(context, operation_name, json):
    155 155
             click.echo(json_format.MessageToJson(operation))
    
    156 156
     
    
    157 157
     
    
    158
    +@cli.command('cancel', short_help="Cancel an operation.")
    
    159
    +@click.argument('operation-name', nargs=1, type=click.STRING, required=True)
    
    160
    +@pass_context
    
    161
    +def cancel(context, operation_name):
    
    162
    +    context.logger.info("Cancelling an operation...")
    
    163
    +    stub = operations_pb2_grpc.OperationsStub(context.channel)
    
    164
    +
    
    165
    +    request = operations_pb2.CancelOperationRequest(name=operation_name)
    
    166
    +
    
    167
    +    try:
    
    168
    +        stub.CancelOperation(request)
    
    169
    +    except grpc.RpcError as e:
    
    170
    +        status_code = e.code()
    
    171
    +        if status_code != grpc.StatusCode.CANCELLED:
    
    172
    +            raise
    
    173
    +
    
    174
    +    context.logger.info("Operation cancelled: [{}]".format(request))
    
    175
    +
    
    176
    +
    
    158 177
     @cli.command('list', short_help="List operations.")
    
    159 178
     @click.option('--json', is_flag=True, show_default=True,
    
    160 179
                   help="Print operations list in JSON format.")
    

  • buildgrid/server/execution/instance.py
    ... ... @@ -21,11 +21,9 @@ An instance of the Remote Execution Service.
    21 21
     
    
    22 22
     import logging
    
    23 23
     
    
    24
    -from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError
    
    24
    +from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError, NotFoundError
    
    25 25
     from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action
    
    26 26
     
    
    27
    -from ..job import Job
    
    28
    -
    
    29 27
     
    
    30 28
     class ExecutionInstance:
    
    31 29
     
    
    ... ... @@ -37,7 +35,7 @@ class ExecutionInstance:
    37 35
         def register_instance_with_server(self, instance_name, server):
    
    38 36
             server.add_execution_instance(self, instance_name)
    
    39 37
     
    
    40
    -    def execute(self, action_digest, skip_cache_lookup, message_queue=None):
    
    38
    +    def execute(self, action_digest, skip_cache_lookup, peer, message_queue):
    
    41 39
             """ Sends a job for execution.
    
    42 40
             Queues an action and creates an Operation instance to be associated with
    
    43 41
             this action.
    
    ... ... @@ -48,31 +46,34 @@ class ExecutionInstance:
    48 46
             if not action:
    
    49 47
                 raise FailedPreconditionError("Could not get action from storage.")
    
    50 48
     
    
    51
    -        job = Job(action, action_digest)
    
    52
    -        if message_queue is not None:
    
    53
    -            job.register_client(message_queue)
    
    49
    +        job = self._scheduler.queue_job(peer, action, action_digest, skip_cache_lookup)
    
    54 50
     
    
    55
    -        self._scheduler.queue_job(job, skip_cache_lookup)
    
    51
    +        if peer is not None and message_queue is not None:
    
    52
    +            job.register_client(peer, message_queue)
    
    56 53
     
    
    57 54
             return job.operation
    
    58 55
     
    
    59
    -    def register_message_client(self, name, queue):
    
    56
    +    def register_message_client(self, job_name, peer, message_queue):
    
    60 57
             try:
    
    61
    -            self._scheduler.register_client(name, queue)
    
    58
    +            self._scheduler.register_client(job_name, peer, message_queue)
    
    62 59
     
    
    63
    -        except KeyError:
    
    64
    -            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    60
    +        except NotFoundError:
    
    61
    +            raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))
    
    65 62
     
    
    66
    -    def unregister_message_client(self, name, queue):
    
    63
    +    def unregister_message_client(self, job_name, peer):
    
    67 64
             try:
    
    68
    -            self._scheduler.unregister_client(name, queue)
    
    65
    +            self._scheduler.unregister_client(job_name, peer)
    
    69 66
     
    
    70
    -        except KeyError:
    
    71
    -            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    67
    +        except NotFoundError:
    
    68
    +            raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))
    
    72 69
     
    
    73 70
         def stream_operation_updates(self, message_queue, operation_name):
    
    74
    -        operation = message_queue.get()
    
    75
    -        while not operation.done:
    
    76
    -            yield operation
    
    77
    -            operation = message_queue.get()
    
    78
    -        yield operation
    71
    +        while True:
    
    72
    +            message = message_queue.get()
    
    73
    +            if isinstance(message, Exception):
    
    74
    +                raise message
    
    75
    +            elif message.done:
    
    76
    +                break
    
    77
    +            yield message
    
    78
    +
    
    79
    +        yield message

  • buildgrid/server/execution/service.py
    ... ... @@ -47,10 +47,11 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    47 47
                 instance = self._get_instance(request.instance_name)
    
    48 48
                 operation = instance.execute(request.action_digest,
    
    49 49
                                              request.skip_cache_lookup,
    
    50
    -                                         message_queue)
    
    50
    +                                         peer=context.peer(),
    
    51
    +                                         message_queue=message_queue)
    
    51 52
     
    
    52 53
                 context.add_callback(partial(instance.unregister_message_client,
    
    53
    -                                         operation.name, message_queue))
    
    54
    +                                         operation.name, context.peer()))
    
    54 55
     
    
    55 56
                 instanced_op_name = "{}/{}".format(request.instance_name,
    
    56 57
                                                    operation.name)
    
    ... ... @@ -94,10 +95,11 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    94 95
                 operation_name = names[-1]
    
    95 96
                 instance = self._get_instance(instance_name)
    
    96 97
     
    
    97
    -            instance.register_message_client(operation_name, message_queue)
    
    98
    +            instance.register_message_client(operation_name,
    
    99
    +                                             context.peer(), message_queue)
    
    98 100
     
    
    99 101
                 context.add_callback(partial(instance.unregister_message_client,
    
    100
    -                                         operation_name, message_queue))
    
    102
    +                                         operation_name, context.peer()))
    
    101 103
     
    
    102 104
                 for operation in instance.stream_operation_updates(message_queue,
    
    103 105
                                                                    operation_name):
    

  • buildgrid/server/job.py
    ... ... @@ -28,12 +28,13 @@ from buildgrid._protos.google.rpc import code_pb2
    28 28
     
    
    29 29
     class Job:
    
    30 30
     
    
    31
    -    def __init__(self, action, action_digest):
    
    31
    +    def __init__(self, action, action_digest, priority=0):
    
    32 32
             self.logger = logging.getLogger(__name__)
    
    33 33
     
    
    34 34
             self._name = str(uuid.uuid4())
    
    35
    +        self._priority = priority
    
    35 36
             self._action = remote_execution_pb2.Action()
    
    36
    -        self._operation = operations_pb2.Operation()
    
    37
    +        self._operations = []
    
    37 38
             self._lease = None
    
    38 39
     
    
    39 40
             self.__execute_response = None
    
    ... ... @@ -43,23 +44,38 @@ class Job:
    43 44
             self.__worker_start_timestamp = timestamp_pb2.Timestamp()
    
    44 45
             self.__worker_completed_timestamp = timestamp_pb2.Timestamp()
    
    45 46
     
    
    46
    -        self.__operation_cancelled = False
    
    47
    +        self.__operations_by_name = {}
    
    48
    +        self.__operations_by_peer = {}
    
    49
    +        self.__operations_message_queues = {}
    
    50
    +        self.__operations_cancelled = []
    
    47 51
             self.__lease_cancelled = False
    
    52
    +        self.__job_cancelled = False
    
    48 53
     
    
    49 54
             self.__operation_metadata.action_digest.CopyFrom(action_digest)
    
    50 55
             self.__operation_metadata.stage = OperationStage.UNKNOWN.value
    
    51 56
     
    
    52 57
             self._action.CopyFrom(action)
    
    53 58
             self._do_not_cache = self._action.do_not_cache
    
    54
    -        self._operation_update_queues = []
    
    55
    -        self._operation.name = self._name
    
    56
    -        self._operation.done = False
    
    57 59
             self._n_tries = 0
    
    58 60
     
    
    61
    +    def __eq__(self, other):
    
    62
    +        if isinstance(other, Job):
    
    63
    +            return self.name == other.name
    
    64
    +        return False
    
    65
    +
    
    66
    +    def __ne__(self, other):
    
    67
    +        return not self.__eq__(other)
    
    68
    +
    
    69
    +    # --- Public API ---
    
    70
    +
    
    59 71
         @property
    
    60 72
         def name(self):
    
    61 73
             return self._name
    
    62 74
     
    
    75
    +    @property
    
    76
    +    def priority(self):
    
    77
    +        return self._priority
    
    78
    +
    
    63 79
         @property
    
    64 80
         def do_not_cache(self):
    
    65 81
             return self._do_not_cache
    
    ... ... @@ -80,48 +96,57 @@ class Job:
    80 96
                 return None
    
    81 97
     
    
    82 98
         @property
    
    83
    -    def operation(self):
    
    84
    -        return self._operation
    
    85
    -
    
    86
    -    @property
    
    87
    -    def operation_stage(self):
    
    88
    -        return OperationStage(self.__operation_metadata.state)
    
    99
    +    def operations(self):
    
    100
    +        return self._operations
    
    89 101
     
    
    90 102
         @property
    
    91 103
         def lease(self):
    
    92 104
             return self._lease
    
    93 105
     
    
    94
    -    @property
    
    95
    -    def lease_state(self):
    
    96
    -        if self._lease is not None:
    
    97
    -            return LeaseState(self._lease.state)
    
    98
    -        else:
    
    99
    -            return None
    
    100
    -
    
    101 106
         @property
    
    102 107
         def n_tries(self):
    
    103 108
             return self._n_tries
    
    104 109
     
    
    105 110
         @property
    
    106 111
         def n_clients(self):
    
    107
    -        return len(self._operation_update_queues)
    
    112
    +        return len(self.__operations_message_queues)
    
    113
    +
    
    114
    +    # --- Public API: REAPI-side ---
    
    108 115
     
    
    109
    -    def register_client(self, queue):
    
    110
    -        """Subscribes to the job's :class:`Operation` stage change events.
    
    116
    +    def register_client(self, peer, message_queue):
    
    117
    +        """Subscribes to the job's :class:`Operation` stage changes.
    
    111 118
     
    
    112 119
             Args:
    
    113
    -            queue (queue.Queue): the event queue to register.
    
    120
    +            peer (str): a unique string identifying the client.
    
    121
    +            message_queue (queue.Queue): the event queue to register.
    
    114 122
             """
    
    115
    -        self._operation_update_queues.append(queue)
    
    116
    -        queue.put(self._operation)
    
    123
    +        if peer not in self.__operations_message_queues:
    
    124
    +            self.__operations_message_queues[peer] = message_queue
    
    125
    +        elif self.__operations_message_queues[peer] != message_queue:
    
    126
    +            self.__operations_message_queues[peer] = message_queue
    
    127
    +
    
    128
    +        if peer not in self.__operations_by_peer:
    
    129
    +            operation = operations_pb2.Operation()
    
    130
    +            operation.name = str(uuid.uuid4())
    
    131
    +            operation.done = False
    
    132
    +
    
    133
    +            self.__operations_by_name[operation.name] = operation
    
    134
    +            self.__operations_by_peer[peer] = operation
    
    135
    +            self._operations.append(operation)
    
    136
    +
    
    137
    +            send_operation_update = True
    
    117 138
     
    
    118
    -    def unregister_client(self, queue):
    
    119
    -        """Unsubscribes to the job's :class:`Operation` stage change events.
    
    139
    +        if send_operation_update:
    
    140
    +            message_queue.put(self._copy_operation(self._operation))
    
    141
    +
    
    142
    +    def unregister_client(self, peer):
    
    143
    +        """Unsubscribes to the job's :class:`Operation` stage change.
    
    120 144
     
    
    121 145
             Args:
    
    122
    -            queue (queue.Queue): the event queue to unregister.
    
    146
    +            peer (str): a unique string identifying the client.
    
    123 147
             """
    
    124
    -        self._operation_update_queues.remove(queue)
    
    148
    +        if peer not in self.__operations_message_queues:
    
    149
    +            del self.__operations_message_queues[peer]
    
    125 150
     
    
    126 151
         def set_cached_result(self, action_result):
    
    127 152
             """Allows specifying an action result form the action cache for the job.
    
    ... ... @@ -130,6 +155,50 @@ class Job:
    130 155
             self.__execute_response.result.CopyFrom(action_result)
    
    131 156
             self.__execute_response.cached_result = True
    
    132 157
     
    
    158
    +    def update_operation_stage(self, stage):
    
    159
    +        """Operates a stage transition for the job's :class:Operation.
    
    160
    +
    
    161
    +        Args:
    
    162
    +            stage (OperationStage): the operation stage to transition to.
    
    163
    +        """
    
    164
    +        if stage.value == self.__operation_metadata.stage:
    
    165
    +            return
    
    166
    +
    
    167
    +        self.__operation_metadata.stage = stage.value
    
    168
    +
    
    169
    +        if self.__operation_metadata.stage == OperationStage.QUEUED.value:
    
    170
    +            if self.__queued_timestamp.ByteSize() == 0:
    
    171
    +                self.__queued_timestamp.GetCurrentTime()
    
    172
    +            self._n_tries += 1
    
    173
    +
    
    174
    +        self._update_operations()
    
    175
    +
    
    176
    +    def cancel_operation(self):
    
    177
    +        """Triggers a job's :class:Operation cancellation.
    
    178
    +
    
    179
    +        This will also cancel any job's :class:Lease that may have been issued.
    
    180
    +        """
    
    181
    +        self.__operation_cancelled = True
    
    182
    +        if self._lease is not None:
    
    183
    +            self.cancel_lease()
    
    184
    +
    
    185
    +        self.__execute_response = remote_execution_pb2.ExecuteResponse()
    
    186
    +        self.__execute_response.status.code = code_pb2.CANCELLED
    
    187
    +        self.__execute_response.status.message = "Operation cancelled by client."
    
    188
    +
    
    189
    +        self.update_operation_stage(OperationStage.COMPLETED)
    
    190
    +
    
    191
    +    def check_operation_status(self):
    
    192
    +        """Reports errors on unexpected job's :class:Operation state.
    
    193
    +
    
    194
    +        Raises:
    
    195
    +            CancelledError: if the job's :class:Operation was cancelled.
    
    196
    +        """
    
    197
    +        if self.__operation_cancelled:
    
    198
    +            raise CancelledError(self.__execute_response.status.message)
    
    199
    +
    
    200
    +    # --- Public API: RWAPI-side ---
    
    201
    +
    
    133 202
         def create_lease(self):
    
    134 203
             """Emits a new :class:`Lease` for the job.
    
    135 204
     
    
    ... ... @@ -205,45 +274,29 @@ class Job:
    205 274
             if self._lease is not None:
    
    206 275
                 self.update_lease_state(LeaseState.CANCELLED)
    
    207 276
     
    
    208
    -    def update_operation_stage(self, stage):
    
    209
    -        """Operates a stage transition for the job's :class:Operation.
    
    210
    -
    
    211
    -        Args:
    
    212
    -            stage (OperationStage): the operation stage to transition to.
    
    213
    -        """
    
    214
    -        if stage.value == self.__operation_metadata.stage:
    
    215
    -            return
    
    216
    -
    
    217
    -        self.__operation_metadata.stage = stage.value
    
    218
    -
    
    219
    -        if self.__operation_metadata.stage == OperationStage.QUEUED.value:
    
    220
    -            if self.__queued_timestamp.ByteSize() == 0:
    
    221
    -                self.__queued_timestamp.GetCurrentTime()
    
    222
    -            self._n_tries += 1
    
    223
    -
    
    224
    -        elif self.__operation_metadata.stage == OperationStage.COMPLETED.value:
    
    225
    -            if self.__execute_response is not None:
    
    226
    -                self._operation.response.Pack(self.__execute_response)
    
    227
    -            self._operation.done = True
    
    277
    +    # --- Private API ---
    
    228 278
     
    
    229
    -        self._operation.metadata.Pack(self.__operation_metadata)
    
    279
    +    def _copy_operation(self, operation):
    
    280
    +        new_operation = operations_pb2.Operation()
    
    230 281
     
    
    231
    -        for queue in self._operation_update_queues:
    
    232
    -            queue.put(self._operation)
    
    282
    +        new_operation.CopyFrom(operation)
    
    233 283
     
    
    234
    -    def cancel_operation(self):
    
    235
    -        """Triggers a job's :class:Operation cancellation.
    
    284
    +        return new_operation
    
    236 285
     
    
    237
    -        This will also cancel any job's :class:Lease that may have been issued.
    
    238
    -        """
    
    239
    -        self.__operation_cancelled = True
    
    240
    -        if self._lease is not None:
    
    241
    -            self.cancel_lease()
    
    286
    +    def _update_operations(self, restricted_peers_to_notify=None):
    
    287
    +        for operation in self._operations:
    
    288
    +            if operation.name not in self.__operations_by_name:
    
    289
    +                continue
    
    242 290
     
    
    243
    -        self.__execute_response = remote_execution_pb2.ExecuteResponse()
    
    244
    -        self.__execute_response.status.code = code_pb2.CANCELLED
    
    245
    -        self.__execute_response.status.message = "Operation cancelled by client."
    
    291
    +            operation.metadata.Pack(self.__operation_metadata)
    
    246 292
     
    
    247
    -        self.update_operation_stage(OperationStage.COMPLETED)
    
    293
    +            if self.__operation_metadata.stage == OperationStage.COMPLETED.value:
    
    294
    +                if self.__execute_response is not None:
    
    295
    +                    self._operation.response.Pack(self.__execute_response)
    
    296
    +                self._operation.done = True
    
    248 297
     
    
    249
    -        raise CancelledError("Operation cancelled: {}".format(self._name))
    298
    +        for message_queue in self.__operation_message_queues.values():
    
    299
    +            if self.__operation_cancelled:
    
    300
    +                message_queue.put(CancelledError(self.__execute_response.status.message))
    
    301
    +            else:
    
    302
    +                message_queue.put(self._copy_operation(self._operation))

  • buildgrid/server/operations/instance.py
    ... ... @@ -21,7 +21,7 @@ An instance of the LongRunningOperations Service.
    21 21
     
    
    22 22
     import logging
    
    23 23
     
    
    24
    -from buildgrid._exceptions import InvalidArgumentError
    
    24
    +from buildgrid._exceptions import InvalidArgumentError, NotFoundError
    
    25 25
     from buildgrid._protos.google.longrunning import operations_pb2
    
    26 26
     
    
    27 27
     
    
    ... ... @@ -34,14 +34,14 @@ class OperationsInstance:
    34 34
         def register_instance_with_server(self, instance_name, server):
    
    35 35
             server.add_operations_instance(self, instance_name)
    
    36 36
     
    
    37
    -    def get_operation(self, name):
    
    38
    -        job = self._scheduler.jobs.get(name)
    
    37
    +    def get_operation(self, job_name):
    
    38
    +        try:
    
    39
    +            operation = self._scheduler.get_job_operation(job_name)
    
    39 40
     
    
    40
    -        if job is None:
    
    41
    -            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    41
    +        except NotFoundError:
    
    42
    +            raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))
    
    42 43
     
    
    43
    -        else:
    
    44
    -            return job.operation
    
    44
    +        return operation
    
    45 45
     
    
    46 46
         def list_operations(self, list_filter, page_size, page_token):
    
    47 47
             # TODO: Pages
    
    ... ... @@ -57,37 +57,17 @@ class OperationsInstance:
    57 57
     
    
    58 58
             return response
    
    59 59
     
    
    60
    -    def delete_operation(self, name):
    
    61
    -        try:
    
    62
    -            self._scheduler.jobs.pop(name)
    
    63
    -
    
    64
    -        except KeyError:
    
    65
    -            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    66
    -
    
    67
    -    def cancel_operation(self, name):
    
    60
    +    def delete_operation(self, job_name):
    
    68 61
             try:
    
    69
    -            self._scheduler.cancel_job_operation(name)
    
    62
    +            # TODO: Unregister the caller client
    
    63
    +            pass
    
    70 64
     
    
    71
    -        except KeyError:
    
    72
    -            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    65
    +        except NotFoundError:
    
    66
    +            raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))
    
    73 67
     
    
    74
    -    def register_message_client(self, name, queue):
    
    68
    +    def cancel_operation(self, job_name):
    
    75 69
             try:
    
    76
    -            self._scheduler.register_client(name, queue)
    
    77
    -
    
    78
    -        except KeyError:
    
    79
    -            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    80
    -
    
    81
    -    def unregister_message_client(self, name, queue):
    
    82
    -        try:
    
    83
    -            self._scheduler.unregister_client(name, queue)
    
    84
    -
    
    85
    -        except KeyError:
    
    86
    -            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    70
    +            self._scheduler.cancel_job_operation(job_name)
    
    87 71
     
    
    88
    -    def stream_operation_updates(self, message_queue, operation_name):
    
    89
    -        operation = message_queue.get()
    
    90
    -        while not operation.done:
    
    91
    -            yield operation
    
    92
    -            operation = message_queue.get()
    
    93
    -        yield operation
    72
    +        except NotFoundError:
    
    73
    +            raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))

  • buildgrid/server/scheduler.py
    ... ... @@ -23,7 +23,7 @@ from collections import deque
    23 23
     
    
    24 24
     from buildgrid._exceptions import NotFoundError
    
    25 25
     
    
    26
    -from .job import OperationStage, LeaseState
    
    26
    +from .job import Job, OperationStage, LeaseState
    
    27 27
     
    
    28 28
     
    
    29 29
     class Scheduler:
    
    ... ... @@ -32,28 +32,98 @@ class Scheduler:
    32 32
     
    
    33 33
         def __init__(self, action_cache=None):
    
    34 34
             self._action_cache = action_cache
    
    35
    -        self.jobs = {}
    
    36
    -        self.queue = deque()
    
    37 35
     
    
    38
    -    def register_client(self, job_name, queue):
    
    39
    -        self.jobs[job_name].register_client(queue)
    
    36
    +        self.__jobs_by_action = {}
    
    37
    +        self.__jobs_by_name = {}
    
    40 38
     
    
    41
    -    def unregister_client(self, job_name, queue):
    
    42
    -        self.jobs[job_name].unregister_client(queue)
    
    39
    +        self.__queue = deque()
    
    43 40
     
    
    44
    -        if not self.jobs[job_name].n_clients and self.jobs[job_name].operation.done:
    
    45
    -            del self.jobs[job_name]
    
    41
    +    def register_client(self, job_name, peer, message_queue):
    
    42
    +        """Subscribes to one of the job's :class:`Operation` stage changes.
    
    46 43
     
    
    47
    -    def queue_job(self, job, skip_cache_lookup=False):
    
    48
    -        self.jobs[job.name] = job
    
    44
    +        Args:
    
    45
    +            job_name (str): name of the job subscribe to.
    
    46
    +            peer (str): a unique string identifying the client.
    
    47
    +            message_queue (queue.Queue): the event queue to register.
    
    48
    +
    
    49
    +        Raises:
    
    50
    +            NotFoundError: If no job with `job_name` exists.
    
    51
    +        """
    
    52
    +        try:
    
    53
    +            job = self.__jobs_by_name[job_name]
    
    54
    +        except KeyError:
    
    55
    +            raise NotFoundError('No job named {} found.'.format(job_name))
    
    56
    +
    
    57
    +        job.register_client(peer, message_queue)
    
    58
    +
    
    59
    +    def unregister_client(self, job_name, peer):
    
    60
    +        """Unsubscribes to one of the job's :class:`Operation` stage change.
    
    61
    +
    
    62
    +        Args:
    
    63
    +            job_name (str): name of the job to unsubscribe from.
    
    64
    +            peer (str): a unique string identifying the client.
    
    65
    +
    
    66
    +        Raises:
    
    67
    +            NotFoundError: If no job with `job_name` exists.
    
    68
    +        """
    
    69
    +        try:
    
    70
    +            job = self.__jobs_by_name[job_name]
    
    71
    +        except KeyError:
    
    72
    +            raise NotFoundError('No job named {} found.'.format(job_name))
    
    73
    +
    
    74
    +        job.unregister_client(peer)
    
    75
    +
    
    76
    +        if job.n_clients == 0 and job.operation.done:
    
    77
    +            del self.__jobs_by_action[job.action_digest]
    
    78
    +            del self.__jobs_by_name[job.name]
    
    79
    +
    
    80
    +    def queue_job(self, peer, action, action_digest, priority=0, skip_cache_lookup=False):
    
    81
    +        """Inserts a newly created job into the execution queue.
    
    82
    +
    
    83
    +        Args:
    
    84
    +            action (Action): the given action to queue for execution.
    
    85
    +            action_digest (Digest): the digest of the given action.
    
    86
    +            priority (int): the execution job's priority.
    
    87
    +            skip_cache_lookup (bool): whether or not to look for pre-computed
    
    88
    +                result for the given action.
    
    89
    +        """
    
    90
    +        def __queue_job(queue, new_job):
    
    91
    +            index = 0
    
    92
    +            for queued_job in reversed(queue):
    
    93
    +                if new_job.priority < queued_job.priority:
    
    94
    +                    index += 1
    
    95
    +                else:
    
    96
    +                    break
    
    97
    +
    
    98
    +            index = len(queue) - index
    
    99
    +
    
    100
    +            queue.insert(index, new_job)
    
    101
    +
    
    102
    +        if action_digest.hash in self.__jobs_by_action:
    
    103
    +            job = self.__jobs_by_action[action_digest.hash]
    
    104
    +
    
    105
    +            if priority < job.priority:
    
    106
    +                job.priority = priority
    
    107
    +
    
    108
    +                if job in self.__queue:
    
    109
    +                    self.__queue.remove(job)
    
    110
    +                    __queue_job(self.__queue, job)
    
    111
    +
    
    112
    +            return job
    
    113
    +
    
    114
    +        job = Job(action, action_digest, priority=priority)
    
    115
    +
    
    116
    +        self.__jobs_by_action[job.action_digest.hash] = job
    
    117
    +        self.__jobs_by_name[job.name] = job
    
    49 118
     
    
    50 119
             operation_stage = None
    
    51 120
             if self._action_cache is not None and not skip_cache_lookup:
    
    52 121
                 try:
    
    53 122
                     action_result = self._action_cache.get_action_result(job.action_digest)
    
    123
    +
    
    54 124
                 except NotFoundError:
    
    55 125
                     operation_stage = OperationStage.QUEUED
    
    56
    -                self.queue.append(job)
    
    126
    +                __queue_job(self.__queue, job)
    
    57 127
     
    
    58 128
                 else:
    
    59 129
                     job.set_cached_result(action_result)
    
    ... ... @@ -61,23 +131,25 @@ class Scheduler:
    61 131
     
    
    62 132
             else:
    
    63 133
                 operation_stage = OperationStage.QUEUED
    
    64
    -            self.queue.append(job)
    
    134
    +            __queue_job(self.__queue, job)
    
    65 135
     
    
    66 136
             job.update_operation_stage(operation_stage)
    
    67 137
     
    
    138
    +        return job
    
    139
    +
    
    68 140
         def retry_job(self, job_name):
    
    69
    -        if job_name in self.jobs:
    
    70
    -            job = self.jobs[job_name]
    
    141
    +        if job_name in self.__jobs_by_name:
    
    142
    +            job = self.__jobs_by_name[job_name]
    
    71 143
                 if job.n_tries >= self.MAX_N_TRIES:
    
    72 144
                     # TODO: Decide what to do with these jobs
    
    73 145
                     job.update_operation_stage(OperationStage.COMPLETED)
    
    74 146
                     # TODO: Mark these jobs as done
    
    75 147
                 else:
    
    76 148
                     job.update_operation_stage(OperationStage.QUEUED)
    
    77
    -                self.queue.appendleft(job)
    
    149
    +                self.__queue.appendleft(job)
    
    78 150
     
    
    79 151
         def list_jobs(self):
    
    80
    -        return self.jobs.values()
    
    152
    +        return self.__jobs_by_name.values()
    
    81 153
     
    
    82 154
         def request_job_leases(self, worker_capabilities):
    
    83 155
             """Generates a list of the highest priority leases to be run.
    
    ... ... @@ -87,10 +159,10 @@ class Scheduler:
    87 159
                     worker properties, configuration and state at the time of the
    
    88 160
                     request.
    
    89 161
             """
    
    90
    -        if not self.queue:
    
    162
    +        if not self.__queue:
    
    91 163
                 return []
    
    92 164
     
    
    93
    -        job = self.queue.popleft()
    
    165
    +        job = self.__queue.popleft()
    
    94 166
             # For now, one lease at a time:
    
    95 167
             lease = job.create_lease()
    
    96 168
     
    
    ... ... @@ -110,7 +182,7 @@ class Scheduler:
    110 182
                 lease_result (google.protobuf.Any): the lease execution result, only
    
    111 183
                     required if `lease_state` is `COMPLETED`.
    
    112 184
             """
    
    113
    -        job = self.jobs[job_name]
    
    185
    +        job = self.__jobs_by_name[job_name]
    
    114 186
     
    
    115 187
             if lease_state == LeaseState.PENDING:
    
    116 188
                 job.update_lease_state(LeaseState.PENDING)
    
    ... ... @@ -130,12 +202,20 @@ class Scheduler:
    130 202
                 job.update_operation_stage(OperationStage.COMPLETED)
    
    131 203
     
    
    132 204
         def get_job_lease(self, job_name):
    
    133
    -        """Returns the lease associated to job, if any have been emitted yet."""
    
    134
    -        return self.jobs[job_name].lease
    
    205
    +        """Returns the lease associated to job, if any have been emitted yet.
    
    135 206
     
    
    136
    -    def get_job_operation(self, job_name):
    
    137
    -        """Returns the operation associated to job."""
    
    138
    -        return self.jobs[job_name].operation
    
    207
    +        Args:
    
    208
    +            job_name (str): name of the job to query.
    
    209
    +
    
    210
    +        Raises:
    
    211
    +            NotFoundError: If no job with `job_name` exists.
    
    212
    +        """
    
    213
    +        try:
    
    214
    +            job = self.__jobs_by_name[job_name]
    
    215
    +        except KeyError:
    
    216
    +            raise NotFoundError('No job named {} found.'.format(job_name))
    
    217
    +
    
    218
    +        return job.lease
    
    139 219
     
    
    140 220
         def cancel_job_operation(self, job_name):
    
    141 221
             """"Cancels the underlying operation of a given job.
    
    ... ... @@ -145,4 +225,25 @@ class Scheduler:
    145 225
             Args:
    
    146 226
                 job_name (str): name of the job holding the operation to cancel.
    
    147 227
             """
    
    148
    -        self.jobs[job_name].cancel_operation()
    228
    +        try:
    
    229
    +            job = self.__jobs_by_name[job_name]
    
    230
    +        except KeyError:
    
    231
    +            raise NotFoundError('No job named {} found.'.format(job_name))
    
    232
    +
    
    233
    +        job.cancel_operation()
    
    234
    +
    
    235
    +    def get_job_operation(self, job_name):
    
    236
    +        """Returns the operation associated to job.
    
    237
    +
    
    238
    +        Args:
    
    239
    +            job_name (str): name of the job to query.
    
    240
    +
    
    241
    +        Raises:
    
    242
    +            NotFoundError: If no job with `job_name` exists.
    
    243
    +        """
    
    244
    +        try:
    
    245
    +            job = self.__jobs_by_name[job_name]
    
    246
    +        except KeyError:
    
    247
    +            raise NotFoundError('No job named {} found.'.format(job_name))
    
    248
    +
    
    249
    +        return job.operation

  • tests/integration/bots_service.py
    ... ... @@ -26,7 +26,6 @@ import pytest
    26 26
     
    
    27 27
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    28 28
     from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
    
    29
    -from buildgrid.server import job
    
    30 29
     from buildgrid.server.controller import ExecutionController
    
    31 30
     from buildgrid.server.job import LeaseState
    
    32 31
     from buildgrid.server.bots import service
    
    ... ... @@ -283,7 +282,8 @@ def test_post_bot_event_temp(context, instance):
    283 282
     def _inject_work(scheduler, action=None, action_digest=None):
    
    284 283
         if not action:
    
    285 284
             action = remote_execution_pb2.Action()
    
    285
    +
    
    286 286
         if not action_digest:
    
    287 287
             action_digest = remote_execution_pb2.Digest()
    
    288
    -    j = job.Job(action, action_digest)
    
    289
    -    scheduler.queue_job(j, True)
    288
    +
    
    289
    +    scheduler.queue_job(action, action_digest, skip_cache_lookup=True)

  • tests/integration/execution_service.py
    ... ... @@ -106,13 +106,13 @@ def test_no_action_digest_in_storage(instance, context):
    106 106
     
    
    107 107
     
    
    108 108
     def test_wait_execution(instance, controller, context):
    
    109
    -    j = job.Job(action, action_digest)
    
    109
    +    j = controller.execution_instance._scheduler.queue_job(action,
    
    110
    +                                                           action_digest,
    
    111
    +                                                           skip_cache_lookup=True)
    
    110 112
         j._operation.done = True
    
    111 113
     
    
    112 114
         request = remote_execution_pb2.WaitExecutionRequest(name=j.name)
    
    113 115
     
    
    114
    -    controller.execution_instance._scheduler.jobs[j.name] = j
    
    115
    -
    
    116 116
         action_result_any = any_pb2.Any()
    
    117 117
         action_result = remote_execution_pb2.ActionResult()
    
    118 118
         action_result_any.Pack(action_result)
    

  • tests/integration/operations_service.py
    ... ... @@ -24,6 +24,7 @@ import grpc
    24 24
     from grpc._server import _Context
    
    25 25
     import pytest
    
    26 26
     
    
    27
    +from buildgrid._enums import OperationStage
    
    27 28
     from buildgrid._exceptions import InvalidArgumentError
    
    28 29
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    29 30
     from buildgrid._protos.google.longrunning import operations_pb2
    
    ... ... @@ -173,7 +174,7 @@ def test_list_operations_with_result(instance, controller, execute_request, cont
    173 174
         output_file = remote_execution_pb2.OutputFile(path='unicorn')
    
    174 175
         action_result.output_files.extend([output_file])
    
    175 176
     
    
    176
    -    controller.operations_instance._scheduler.jobs[response_execute.name].create_lease()
    
    177
    +    controller.operations_instance._scheduler.request_job_leases({})
    
    177 178
         controller.operations_instance._scheduler.update_job_lease_state(response_execute.name,
    
    178 179
                                                                          LeaseState.COMPLETED,
    
    179 180
                                                                          lease_status=status_pb2.Status(),
    
    ... ... @@ -236,12 +237,26 @@ def test_delete_operation_fail(instance, context):
    236 237
         context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
    
    237 238
     
    
    238 239
     
    
    239
    -def test_cancel_operation(instance, context):
    
    240
    +def test_cancel_operation(instance, controller, execute_request, context):
    
    241
    +    response_execute = controller.execution_instance.execute(execute_request.action_digest,
    
    242
    +                                                             execute_request.skip_cache_lookup)
    
    243
    +
    
    240 244
         request = operations_pb2.CancelOperationRequest()
    
    241
    -    request.name = "{}/{}".format(instance_name, "runner")
    
    245
    +    request.name = "{}/{}".format(instance_name, response_execute.name)
    
    246
    +
    
    242 247
         instance.CancelOperation(request, context)
    
    243 248
     
    
    244
    -    context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
    
    249
    +    context.set_code.assert_called_once_with(grpc.StatusCode.CANCELLED)
    
    250
    +
    
    251
    +    request = operations_pb2.ListOperationsRequest(name=instance_name)
    
    252
    +    response = instance.ListOperations(request, context)
    
    253
    +
    
    254
    +    assert len(response.operations) is 1
    
    255
    +
    
    256
    +    for operation in response.operations:
    
    257
    +        operation_metadata = remote_execution_pb2.ExecuteOperationMetadata()
    
    258
    +        operation.metadata.Unpack(operation_metadata)
    
    259
    +        assert operation_metadata.stage == OperationStage.COMPLETED.value
    
    245 260
     
    
    246 261
     
    
    247 262
     def test_cancel_operation_blank(blank_instance, context):
    
    ... ... @@ -249,7 +264,7 @@ def test_cancel_operation_blank(blank_instance, context):
    249 264
         request.name = "runner"
    
    250 265
         blank_instance.CancelOperation(request, context)
    
    251 266
     
    
    252
    -    context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
    
    267
    +    context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
    
    253 268
     
    
    254 269
     
    
    255 270
     def test_cancel_operation_instance_fail(instance, context):
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]