[Notes] [Git][BuildGrid/buildgrid][mablanch/75-requests-multiplexing] 12 commits: setup.py: Fix the PyYAML dependency



Title: GitLab

Martin Blanchard pushed to branch mablanch/75-requests-multiplexing at BuildGrid / buildgrid

Commits:

12 changed files:

Changes:

  • buildgrid/_app/commands/cmd_operation.py
    ... ... @@ -155,6 +155,19 @@ def status(context, operation_name, json):
    155 155
             click.echo(json_format.MessageToJson(operation))
    
    156 156
     
    
    157 157
     
    
    158
    +@cli.command('cancel', short_help="Cancel an operation.")
    
    159
    +@click.argument('operation-name', nargs=1, type=click.STRING, required=True)
    
    160
    +@pass_context
    
    161
    +def cancel(context, operation_name):
    
    162
    +    context.logger.info("Cancelling an operation...")
    
    163
    +    stub = operations_pb2_grpc.OperationsStub(context.channel)
    
    164
    +
    
    165
    +    request = operations_pb2.CancelOperationRequest(name=operation_name)
    
    166
    +
    
    167
    +    stub.CancelOperation(request)
    
    168
    +    context.logger.info("Operation cancelled: [{}]".format(request))
    
    169
    +
    
    170
    +
    
    158 171
     @cli.command('list', short_help="List operations.")
    
    159 172
     @click.option('--json', is_flag=True, show_default=True,
    
    160 173
                   help="Print operations list in JSON format.")
    

  • buildgrid/_exceptions.py
    ... ... @@ -52,6 +52,12 @@ class BotError(BgdError):
    52 52
             super().__init__(message, detail=detail, domain=ErrorDomain.BOT, reason=reason)
    
    53 53
     
    
    54 54
     
    
    55
    +class CancelledError(BgdError):
    
    56
    +    """The job was cancelled and any callers should be notified"""
    
    57
    +    def __init__(self, message, detail=None, reason=None):
    
    58
    +        super().__init__(message, detail=detail, domain=ErrorDomain.SERVER, reason=reason)
    
    59
    +
    
    60
    +
    
    55 61
     class InvalidArgumentError(BgdError):
    
    56 62
         """A bad argument was passed, such as a name which doesn't exist."""
    
    57 63
         def __init__(self, message, detail=None, reason=None):
    

  • buildgrid/server/execution/instance.py
    ... ... @@ -21,11 +21,9 @@ An instance of the Remote Execution Service.
    21 21
     
    
    22 22
     import logging
    
    23 23
     
    
    24
    -from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError
    
    24
    +from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError, NotFoundError
    
    25 25
     from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action
    
    26 26
     
    
    27
    -from ..job import Job
    
    28
    -
    
    29 27
     
    
    30 28
     class ExecutionInstance:
    
    31 29
     
    
    ... ... @@ -37,7 +35,7 @@ class ExecutionInstance:
    37 35
         def register_instance_with_server(self, instance_name, server):
    
    38 36
             server.add_execution_instance(self, instance_name)
    
    39 37
     
    
    40
    -    def execute(self, action_digest, skip_cache_lookup, message_queue=None):
    
    38
    +    def execute(self, action_digest, skip_cache_lookup, peer=None, message_queue=None):
    
    41 39
             """ Sends a job for execution.
    
    42 40
             Queues an action and creates an Operation instance to be associated with
    
    43 41
             this action.
    
    ... ... @@ -48,27 +46,26 @@ class ExecutionInstance:
    48 46
             if not action:
    
    49 47
                 raise FailedPreconditionError("Could not get action from storage.")
    
    50 48
     
    
    51
    -        job = Job(action, action_digest)
    
    52
    -        if message_queue is not None:
    
    53
    -            job.register_client(message_queue)
    
    49
    +        job = self._scheduler.queue_job(action, action_digest, skip_cache_lookup)
    
    54 50
     
    
    55
    -        self._scheduler.queue_job(job, skip_cache_lookup)
    
    51
    +        if peer is not None and message_queue is not None:
    
    52
    +            job.register_client(peer, message_queue)
    
    56 53
     
    
    57 54
             return job.operation
    
    58 55
     
    
    59
    -    def register_message_client(self, name, queue):
    
    56
    +    def register_message_client(self, job_name, peer, message_queue):
    
    60 57
             try:
    
    61
    -            self._scheduler.register_client(name, queue)
    
    58
    +            self._scheduler.register_client(job_name, peer, message_queue)
    
    62 59
     
    
    63
    -        except KeyError:
    
    64
    -            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    60
    +        except NotFoundError:
    
    61
    +            raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))
    
    65 62
     
    
    66
    -    def unregister_message_client(self, name, queue):
    
    63
    +    def unregister_message_client(self, job_name, peer):
    
    67 64
             try:
    
    68
    -            self._scheduler.unregister_client(name, queue)
    
    65
    +            self._scheduler.unregister_client(job_name, peer)
    
    69 66
     
    
    70
    -        except KeyError:
    
    71
    -            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    67
    +        except NotFoundError:
    
    68
    +            raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))
    
    72 69
     
    
    73 70
         def stream_operation_updates(self, message_queue, operation_name):
    
    74 71
             operation = message_queue.get()
    

  • buildgrid/server/execution/service.py
    ... ... @@ -26,7 +26,7 @@ from functools import partial
    26 26
     
    
    27 27
     import grpc
    
    28 28
     
    
    29
    -from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError
    
    29
    +from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError, CancelledError
    
    30 30
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
    
    31 31
     from buildgrid._protos.google.longrunning import operations_pb2
    
    32 32
     
    
    ... ... @@ -47,10 +47,11 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    47 47
                 instance = self._get_instance(request.instance_name)
    
    48 48
                 operation = instance.execute(request.action_digest,
    
    49 49
                                              request.skip_cache_lookup,
    
    50
    -                                         message_queue)
    
    50
    +                                         peer=context.peer(),
    
    51
    +                                         message_queue=message_queue)
    
    51 52
     
    
    52 53
                 context.add_callback(partial(instance.unregister_message_client,
    
    53
    -                                         operation.name, message_queue))
    
    54
    +                                         operation.name, context.peer()))
    
    54 55
     
    
    55 56
                 instanced_op_name = "{}/{}".format(request.instance_name,
    
    56 57
                                                    operation.name)
    
    ... ... @@ -76,6 +77,12 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    76 77
                 context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
    
    77 78
                 yield operations_pb2.Operation()
    
    78 79
     
    
    80
    +        except CancelledError as e:
    
    81
    +            self.logger.error(e)
    
    82
    +            context.set_details(str(e))
    
    83
    +            context.set_code(grpc.StatusCode.CANCELLED)
    
    84
    +            yield operations_pb2.Operation()
    
    85
    +
    
    79 86
         def WaitExecution(self, request, context):
    
    80 87
             try:
    
    81 88
                 names = request.name.split("/")
    
    ... ... @@ -88,10 +95,11 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    88 95
                 operation_name = names[-1]
    
    89 96
                 instance = self._get_instance(instance_name)
    
    90 97
     
    
    91
    -            instance.register_message_client(operation_name, message_queue)
    
    98
    +            instance.register_message_client(operation_name,
    
    99
    +                                             context.peer(), message_queue)
    
    92 100
     
    
    93 101
                 context.add_callback(partial(instance.unregister_message_client,
    
    94
    -                                         operation_name, message_queue))
    
    102
    +                                         operation_name, context.peer()))
    
    95 103
     
    
    96 104
                 for operation in instance.stream_operation_updates(message_queue,
    
    97 105
                                                                    operation_name):
    
    ... ... @@ -106,6 +114,12 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    106 114
                 context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
    
    107 115
                 yield operations_pb2.Operation()
    
    108 116
     
    
    117
    +        except CancelledError as e:
    
    118
    +            self.logger.error(e)
    
    119
    +            context.set_details(str(e))
    
    120
    +            context.set_code(grpc.StatusCode.CANCELLED)
    
    121
    +            yield operations_pb2.Operation()
    
    122
    +
    
    109 123
         def _get_instance(self, name):
    
    110 124
             try:
    
    111 125
                 return self._instances[name]
    

  • buildgrid/server/job.py
    ... ... @@ -19,33 +19,40 @@ import uuid
    19 19
     from google.protobuf import timestamp_pb2
    
    20 20
     
    
    21 21
     from buildgrid._enums import LeaseState, OperationStage
    
    22
    +from buildgrid._exceptions import CancelledError
    
    22 23
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    23 24
     from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
    
    24 25
     from buildgrid._protos.google.longrunning import operations_pb2
    
    26
    +from buildgrid._protos.google.rpc import code_pb2
    
    25 27
     
    
    26 28
     
    
    27 29
     class Job:
    
    28 30
     
    
    29
    -    def __init__(self, action, action_digest):
    
    31
    +    def __init__(self, action, action_digest, priority=0):
    
    30 32
             self.logger = logging.getLogger(__name__)
    
    31 33
     
    
    32 34
             self._name = str(uuid.uuid4())
    
    35
    +        self._priority = priority
    
    33 36
             self._action = remote_execution_pb2.Action()
    
    34 37
             self._operation = operations_pb2.Operation()
    
    35 38
             self._lease = None
    
    36 39
     
    
    37 40
             self.__execute_response = None
    
    38 41
             self.__operation_metadata = remote_execution_pb2.ExecuteOperationMetadata()
    
    42
    +
    
    39 43
             self.__queued_timestamp = timestamp_pb2.Timestamp()
    
    40 44
             self.__worker_start_timestamp = timestamp_pb2.Timestamp()
    
    41 45
             self.__worker_completed_timestamp = timestamp_pb2.Timestamp()
    
    42 46
     
    
    47
    +        self.__operation_cancelled = False
    
    48
    +        self.__lease_cancelled = False
    
    49
    +
    
    43 50
             self.__operation_metadata.action_digest.CopyFrom(action_digest)
    
    44 51
             self.__operation_metadata.stage = OperationStage.UNKNOWN.value
    
    45 52
     
    
    46 53
             self._action.CopyFrom(action)
    
    47 54
             self._do_not_cache = self._action.do_not_cache
    
    48
    -        self._operation_update_queues = []
    
    55
    +        self._operation_update_queues = {}
    
    49 56
             self._operation.name = self._name
    
    50 57
             self._operation.done = False
    
    51 58
             self._n_tries = 0
    
    ... ... @@ -54,6 +61,10 @@ class Job:
    54 61
         def name(self):
    
    55 62
             return self._name
    
    56 63
     
    
    64
    +    @property
    
    65
    +    def priority(self):
    
    66
    +        return self._priority
    
    67
    +
    
    57 68
         @property
    
    58 69
         def do_not_cache(self):
    
    59 70
             return self._do_not_cache
    
    ... ... @@ -100,22 +111,34 @@ class Job:
    100 111
         def n_clients(self):
    
    101 112
             return len(self._operation_update_queues)
    
    102 113
     
    
    103
    -    def register_client(self, queue):
    
    104
    -        """Subscribes to the job's :class:`Operation` stage change events.
    
    114
    +    def __eq__(self, other):
    
    115
    +        if isinstance(other, Job):
    
    116
    +            return self.name == other.name
    
    117
    +        return False
    
    118
    +
    
    119
    +    def __ne__(self, other):
    
    120
    +        return not self.__eq__(other)
    
    121
    +
    
    122
    +    def register_client(self, peer, message_queue):
    
    123
    +        """Subscribes to the job's :class:`Operation` stage changes.
    
    105 124
     
    
    106 125
             Args:
    
    107
    -            queue (queue.Queue): the event queue to register.
    
    126
    +            peer (str): a unique string identifying the client.
    
    127
    +            message_queue (queue.Queue): the event queue to register.
    
    108 128
             """
    
    109
    -        self._operation_update_queues.append(queue)
    
    110
    -        queue.put(self._operation)
    
    129
    +        if peer not in self._operation_update_queues:
    
    130
    +            self._operation_update_queues[peer] = message_queue
    
    111 131
     
    
    112
    -    def unregister_client(self, queue):
    
    113
    -        """Unsubscribes to the job's :class:`Operation` stage change events.
    
    132
    +        message_queue.put(self._operation)
    
    133
    +
    
    134
    +    def unregister_client(self, peer):
    
    135
    +        """Unsubscribes to the job's :class:`Operation` stage change.
    
    114 136
     
    
    115 137
             Args:
    
    116
    -            queue (queue.Queue): the event queue to unregister.
    
    138
    +            peer (str): a unique string identifying the client.
    
    117 139
             """
    
    118
    -        self._operation_update_queues.remove(queue)
    
    140
    +        if peer not in self._operation_update_queues:
    
    141
    +            del self._operation_update_queues[peer]
    
    119 142
     
    
    120 143
         def set_cached_result(self, action_result):
    
    121 144
             """Allows specifying an action result form the action cache for the job.
    
    ... ... @@ -130,7 +153,9 @@ class Job:
    130 153
             Only one :class:`Lease` can be emitted for a given job. This method
    
    131 154
             should only be used once, any further calls are ignored.
    
    132 155
             """
    
    133
    -        if self._lease is not None:
    
    156
    +        if self.__operation_cancelled:
    
    157
    +            return None
    
    158
    +        elif self._lease is not None:
    
    134 159
                 return None
    
    135 160
     
    
    136 161
             self._lease = bots_pb2.Lease()
    
    ... ... @@ -171,7 +196,7 @@ class Job:
    171 196
                 action_result = remote_execution_pb2.ActionResult()
    
    172 197
     
    
    173 198
                 # TODO: Make a distinction between build and bot failures!
    
    174
    -            if status.code != 0:
    
    199
    +            if status.code != code_pb2.OK:
    
    175 200
                     self._do_not_cache = True
    
    176 201
     
    
    177 202
                 if result is not None:
    
    ... ... @@ -188,6 +213,15 @@ class Job:
    188 213
                 self.__execute_response.cached_result = False
    
    189 214
                 self.__execute_response.status.CopyFrom(status)
    
    190 215
     
    
    216
    +    def cancel_lease(self):
    
    217
    +        """Triggers a job's :class:Lease cancellation.
    
    218
    +
    
    219
    +        This will not cancel the job's :class:Operation.
    
    220
    +        """
    
    221
    +        self.__lease_cancelled = True
    
    222
    +        if self._lease is not None:
    
    223
    +            self.update_lease_state(LeaseState.CANCELLED)
    
    224
    +
    
    191 225
         def update_operation_stage(self, stage):
    
    192 226
             """Operates a stage transition for the job's :class:Operation.
    
    193 227
     
    
    ... ... @@ -211,5 +245,22 @@ class Job:
    211 245
     
    
    212 246
             self._operation.metadata.Pack(self.__operation_metadata)
    
    213 247
     
    
    214
    -        for queue in self._operation_update_queues:
    
    248
    +        for queue in self._operation_update_queues.values():
    
    215 249
                 queue.put(self._operation)
    
    250
    +
    
    251
    +    def cancel_operation(self):
    
    252
    +        """Triggers a job's :class:Operation cancellation.
    
    253
    +
    
    254
    +        This will also cancel any job's :class:Lease that may have been issued.
    
    255
    +        """
    
    256
    +        self.__operation_cancelled = True
    
    257
    +        if self._lease is not None:
    
    258
    +            self.cancel_lease()
    
    259
    +
    
    260
    +        self.__execute_response = remote_execution_pb2.ExecuteResponse()
    
    261
    +        self.__execute_response.status.code = code_pb2.CANCELLED
    
    262
    +        self.__execute_response.status.message = "Operation cancelled by client."
    
    263
    +
    
    264
    +        self.update_operation_stage(OperationStage.COMPLETED)
    
    265
    +
    
    266
    +        raise CancelledError("Operation cancelled: {}".format(self._name))

  • buildgrid/server/operations/instance.py
    ... ... @@ -21,7 +21,7 @@ An instance of the LongRunningOperations Service.
    21 21
     
    
    22 22
     import logging
    
    23 23
     
    
    24
    -from buildgrid._exceptions import InvalidArgumentError
    
    24
    +from buildgrid._exceptions import InvalidArgumentError, NotFoundError
    
    25 25
     from buildgrid._protos.google.longrunning import operations_pb2
    
    26 26
     
    
    27 27
     
    
    ... ... @@ -34,14 +34,14 @@ class OperationsInstance:
    34 34
         def register_instance_with_server(self, instance_name, server):
    
    35 35
             server.add_operations_instance(self, instance_name)
    
    36 36
     
    
    37
    -    def get_operation(self, name):
    
    38
    -        job = self._scheduler.jobs.get(name)
    
    37
    +    def get_operation(self, job_name):
    
    38
    +        try:
    
    39
    +            operation = self._scheduler.get_job_operation(job_name)
    
    39 40
     
    
    40
    -        if job is None:
    
    41
    -            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    41
    +        except NotFoundError:
    
    42
    +            raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))
    
    42 43
     
    
    43
    -        else:
    
    44
    -            return job.operation
    
    44
    +        return operation
    
    45 45
     
    
    46 46
         def list_operations(self, list_filter, page_size, page_token):
    
    47 47
             # TODO: Pages
    
    ... ... @@ -57,34 +57,16 @@ class OperationsInstance:
    57 57
     
    
    58 58
             return response
    
    59 59
     
    
    60
    -    def delete_operation(self, name):
    
    61
    -        try:
    
    62
    -            self._scheduler.jobs.pop(name)
    
    63
    -
    
    64
    -        except KeyError:
    
    65
    -            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    66
    -
    
    67
    -    def register_message_client(self, name, queue):
    
    60
    +    def delete_operation(self, job_name, peer):
    
    68 61
             try:
    
    69
    -            self._scheduler.register_client(name, queue)
    
    62
    +            self._scheduler.unregister_client(job_name, peer)
    
    70 63
     
    
    71
    -        except KeyError:
    
    72
    -            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    64
    +        except NotFoundError:
    
    65
    +            raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))
    
    73 66
     
    
    74
    -    def unregister_message_client(self, name, queue):
    
    67
    +    def cancel_operation(self, job_name):
    
    75 68
             try:
    
    76
    -            self._scheduler.unregister_client(name, queue)
    
    77
    -
    
    78
    -        except KeyError:
    
    79
    -            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    80
    -
    
    81
    -    def stream_operation_updates(self, message_queue, operation_name):
    
    82
    -        operation = message_queue.get()
    
    83
    -        while not operation.done:
    
    84
    -            yield operation
    
    85
    -            operation = message_queue.get()
    
    86
    -        yield operation
    
    69
    +            self._scheduler.cancel_job_operation(job_name)
    
    87 70
     
    
    88
    -    def cancel_operation(self, name):
    
    89
    -        # TODO: Cancel leases
    
    90
    -        raise NotImplementedError("Cancelled operations not supported")
    71
    +        except NotFoundError:
    
    72
    +            raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))

  • buildgrid/server/operations/service.py
    ... ... @@ -25,7 +25,7 @@ import grpc
    25 25
     
    
    26 26
     from google.protobuf.empty_pb2 import Empty
    
    27 27
     
    
    28
    -from buildgrid._exceptions import InvalidArgumentError
    
    28
    +from buildgrid._exceptions import CancelledError, InvalidArgumentError
    
    29 29
     from buildgrid._protos.google.longrunning import operations_pb2_grpc, operations_pb2
    
    30 30
     
    
    31 31
     
    
    ... ... @@ -93,7 +93,7 @@ class OperationsService(operations_pb2_grpc.OperationsServicer):
    93 93
                 instance = self._get_instance(instance_name)
    
    94 94
     
    
    95 95
                 operation_name = self._parse_operation_name(name)
    
    96
    -            instance.delete_operation(operation_name)
    
    96
    +            instance.delete_operation(operation_name, context.peer())
    
    97 97
     
    
    98 98
             except InvalidArgumentError as e:
    
    99 99
                 self.logger.error(e)
    
    ... ... @@ -112,10 +112,10 @@ class OperationsService(operations_pb2_grpc.OperationsServicer):
    112 112
                 operation_name = self._parse_operation_name(name)
    
    113 113
                 instance.cancel_operation(operation_name)
    
    114 114
     
    
    115
    -        except NotImplementedError as e:
    
    115
    +        except CancelledError as e:
    
    116 116
                 self.logger.error(e)
    
    117 117
                 context.set_details(str(e))
    
    118
    -            context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    
    118
    +            context.set_code(grpc.StatusCode.CANCELLED)
    
    119 119
     
    
    120 120
             except InvalidArgumentError as e:
    
    121 121
                 self.logger.error(e)
    

  • buildgrid/server/scheduler.py
    ... ... @@ -23,7 +23,7 @@ from collections import deque
    23 23
     
    
    24 24
     from buildgrid._exceptions import NotFoundError
    
    25 25
     
    
    26
    -from .job import OperationStage, LeaseState
    
    26
    +from .job import Job, OperationStage, LeaseState
    
    27 27
     
    
    28 28
     
    
    29 29
     class Scheduler:
    
    ... ... @@ -32,28 +32,97 @@ class Scheduler:
    32 32
     
    
    33 33
         def __init__(self, action_cache=None):
    
    34 34
             self._action_cache = action_cache
    
    35
    -        self.jobs = {}
    
    36
    -        self.queue = deque()
    
    37 35
     
    
    38
    -    def register_client(self, job_name, queue):
    
    39
    -        self.jobs[job_name].register_client(queue)
    
    36
    +        self.__jobs_by_action = {}
    
    37
    +        self.__jobs_by_name = {}
    
    38
    +        self.__queue = deque()
    
    40 39
     
    
    41
    -    def unregister_client(self, job_name, queue):
    
    42
    -        self.jobs[job_name].unregister_client(queue)
    
    40
    +    def register_client(self, job_name, peer, message_queue):
    
    41
    +        """Subscribes to one of the job's :class:`Operation` stage changes.
    
    43 42
     
    
    44
    -        if not self.jobs[job_name].n_clients and self.jobs[job_name].operation.done:
    
    45
    -            del self.jobs[job_name]
    
    43
    +        Args:
    
    44
    +            job_name (str): name of the job to subscribe to.
    
    45
    +            peer (str): a unique string identifying the client.
    
    46
    +            message_queue (queue.Queue): the event queue to register.
    
    47
    +
    
    48
    +        Raises:
    
    49
    +            NotFoundError: If no job with `job_name` exists.
    
    50
    +        """
    
    51
    +        try:
    
    52
    +            job = self.__jobs_by_name[job_name]
    
    53
    +        except KeyError:
    
    54
    +            raise NotFoundError('No job named {} found.'.format(job_name))
    
    55
    +
    
    56
    +        job.register_client(peer, message_queue)
    
    57
    +
    
    58
    +    def unregister_client(self, job_name, peer):
    
    59
    +        """Unsubscribes to one of the job's :class:`Operation` stage change.
    
    60
    +
    
    61
    +        Args:
    
    62
    +            job_name (str): name of the job to unsubscribe from.
    
    63
    +            peer (str): a unique string identifying the client.
    
    64
    +
    
    65
    +        Raises:
    
    66
    +            NotFoundError: If no job with `job_name` exists.
    
    67
    +        """
    
    68
    +        try:
    
    69
    +            job = self.__jobs_by_name[job_name]
    
    70
    +        except KeyError:
    
    71
    +            raise NotFoundError('No job named {} found.'.format(job_name))
    
    72
    +
    
    73
    +        job.unregister_client(peer)
    
    74
    +
    
    75
    +        if job.n_clients == 0 and job.operation.done:
    
    76
    +            del self.__jobs_by_action[job.action_digest]
    
    77
    +            del self.__jobs_by_name[job.name]
    
    78
    +
    
    79
    +    def queue_job(self, action, action_digest, priority=0, skip_cache_lookup=False):
    
    80
    +        """Inserts a newly created job into the execution queue.
    
    81
    +
    
    82
    +        Args:
    
    83
    +            action (Action): the given action to queue for execution.
    
    84
    +            action_digest (Digest): the digest of the given action.
    
    85
    +            priority (int): the execution job's priority.
    
    86
    +            skip_cache_lookup (bool): whether or not to look for pre-computed
    
    87
    +                result for the given action.
    
    88
    +        """
    
    89
    +        def __queue_job(queue, new_job):
    
    90
    +            index = 0
    
    91
    +            for queued_job in reversed(queue):
    
    92
    +                if new_job.priority < queued_job.priority:
    
    93
    +                    index += 1
    
    94
    +                else:
    
    95
    +                    break
    
    96
    +
    
    97
    +            index = len(queue) - index
    
    98
    +
    
    99
    +            queue.insert(index, new_job)
    
    100
    +
    
    101
    +        if action_digest.hash in self.__jobs_by_action:
    
    102
    +            job = self.__jobs_by_action[action_digest.hash]
    
    103
    +
    
    104
    +            if priority < job.priority:
    
    105
    +                job.priority = priority
    
    106
    +
    
    107
    +                if job in self.__queue:
    
    108
    +                    self.__queue.remove(job)
    
    109
    +                    __queue_job(self.__queue, job)
    
    110
    +
    
    111
    +            return job
    
    112
    +
    
    113
    +        job = Job(action, action_digest, priority=priority)
    
    46 114
     
    
    47
    -    def queue_job(self, job, skip_cache_lookup=False):
    
    48
    -        self.jobs[job.name] = job
    
    115
    +        self.__jobs_by_action[job.action_digest.hash] = job
    
    116
    +        self.__jobs_by_name[job.name] = job
    
    49 117
     
    
    50 118
             operation_stage = None
    
    51 119
             if self._action_cache is not None and not skip_cache_lookup:
    
    52 120
                 try:
    
    53 121
                     action_result = self._action_cache.get_action_result(job.action_digest)
    
    122
    +
    
    54 123
                 except NotFoundError:
    
    55 124
                     operation_stage = OperationStage.QUEUED
    
    56
    -                self.queue.append(job)
    
    125
    +                __queue_job(self.__queue, job)
    
    57 126
     
    
    58 127
                 else:
    
    59 128
                     job.set_cached_result(action_result)
    
    ... ... @@ -61,23 +130,25 @@ class Scheduler:
    61 130
     
    
    62 131
             else:
    
    63 132
                 operation_stage = OperationStage.QUEUED
    
    64
    -            self.queue.append(job)
    
    133
    +            __queue_job(self.__queue, job)
    
    65 134
     
    
    66 135
             job.update_operation_stage(operation_stage)
    
    67 136
     
    
    137
    +        return job
    
    138
    +
    
    68 139
         def retry_job(self, job_name):
    
    69
    -        if job_name in self.jobs:
    
    70
    -            job = self.jobs[job_name]
    
    140
    +        if job_name in self.__jobs_by_name:
    
    141
    +            job = self.__jobs_by_name[job_name]
    
    71 142
                 if job.n_tries >= self.MAX_N_TRIES:
    
    72 143
                     # TODO: Decide what to do with these jobs
    
    73 144
                     job.update_operation_stage(OperationStage.COMPLETED)
    
    74 145
                     # TODO: Mark these jobs as done
    
    75 146
                 else:
    
    76 147
                     job.update_operation_stage(OperationStage.QUEUED)
    
    77
    -                self.queue.appendleft(job)
    
    148
    +                self.__queue.appendleft(job)
    
    78 149
     
    
    79 150
         def list_jobs(self):
    
    80
    -        return self.jobs.values()
    
    151
    +        return self.__jobs_by_name.values()
    
    81 152
     
    
    82 153
         def request_job_leases(self, worker_capabilities):
    
    83 154
             """Generates a list of the highest priority leases to be run.
    
    ... ... @@ -87,14 +158,17 @@ class Scheduler:
    87 158
                     worker properties, configuration and state at the time of the
    
    88 159
                     request.
    
    89 160
             """
    
    90
    -        if not self.queue:
    
    161
    +        if not self.__queue:
    
    91 162
                 return []
    
    92 163
     
    
    93
    -        job = self.queue.popleft()
    
    164
    +        job = self.__queue.popleft()
    
    94 165
             # For now, one lease at a time:
    
    95 166
             lease = job.create_lease()
    
    96 167
     
    
    97
    -        return [lease]
    
    168
    +        if lease:
    
    169
    +            return [lease]
    
    170
    +
    
    171
    +        return None
    
    98 172
     
    
    99 173
         def update_job_lease_state(self, job_name, lease_state, lease_status=None, lease_result=None):
    
    100 174
             """Requests a state transition for a job's current :class:`Lease`.
    
    ... ... @@ -107,7 +181,7 @@ class Scheduler:
    107 181
                 lease_result (google.protobuf.Any): the lease execution result, only
    
    108 182
                     required if `lease_state` is `COMPLETED`.
    
    109 183
             """
    
    110
    -        job = self.jobs[job_name]
    
    184
    +        job = self.__jobs_by_name[job_name]
    
    111 185
     
    
    112 186
             if lease_state == LeaseState.PENDING:
    
    113 187
                 job.update_lease_state(LeaseState.PENDING)
    
    ... ... @@ -127,9 +201,48 @@ class Scheduler:
    127 201
                 job.update_operation_stage(OperationStage.COMPLETED)
    
    128 202
     
    
    129 203
         def get_job_lease(self, job_name):
    
    130
    -        """Returns the lease associated to job, if any have been emitted yet."""
    
    131
    -        return self.jobs[job_name].lease
    
    204
    +        """Returns the lease associated to job, if any have been emitted yet.
    
    205
    +
    
    206
    +        Args:
    
    207
    +            job_name (str): name of the job to query.
    
    208
    +
    
    209
    +        Raises:
    
    210
    +            NotFoundError: If no job with `job_name` exists.
    
    211
    +        """
    
    212
    +        try:
    
    213
    +            job = self.__jobs_by_name[job_name]
    
    214
    +        except KeyError:
    
    215
    +            raise NotFoundError('No job named {} found.'.format(job_name))
    
    216
    +
    
    217
    +        return job.lease
    
    218
    +
    
    219
    +    def cancel_job_operation(self, job_name):
    
    220
    +        """Cancels the underlying operation of a given job.
    
    221
    +
    
    222
    +        This will also cancel any job's lease that may have been issued.
    
    223
    +
    
    224
    +        Args:
    
    225
    +            job_name (str): name of the job holding the operation to cancel.
    
    226
    +        """
    
    227
    +        try:
    
    228
    +            job = self.__jobs_by_name[job_name]
    
    229
    +        except KeyError:
    
    230
    +            raise NotFoundError('No job named {} found.'.format(job_name))
    
    231
    +
    
    232
    +        job.cancel_operation()
    
    132 233
     
    
    133 234
         def get_job_operation(self, job_name):
    
    134
    -        """Returns the operation associated to job."""
    
    135
    -        return self.jobs[job_name].operation
    235
    +        """Returns the operation associated to job.
    
    236
    +
    
    237
    +        Args:
    
    238
    +            job_name (str): name of the job to query.
    
    239
    +
    
    240
    +        Raises:
    
    241
    +            NotFoundError: If no job with `job_name` exists.
    
    242
    +        """
    
    243
    +        try:
    
    244
    +            job = self.__jobs_by_name[job_name]
    
    245
    +        except KeyError:
    
    246
    +            raise NotFoundError('No job named {} found.'.format(job_name))
    
    247
    +
    
    248
    +        return job.operation

  • setup.py
    ... ... @@ -116,7 +116,7 @@ setup(
    116 116
             'protobuf',
    
    117 117
             'grpcio',
    
    118 118
             'Click',
    
    119
    -        'pyaml',
    
    119
    +        'PyYAML',
    
    120 120
             'boto3 < 1.8.0',
    
    121 121
             'botocore < 1.11.0',
    
    122 122
         ],
    

  • tests/integration/bots_service.py
    ... ... @@ -26,7 +26,6 @@ import pytest
    26 26
     
    
    27 27
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    28 28
     from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
    
    29
    -from buildgrid.server import job
    
    30 29
     from buildgrid.server.controller import ExecutionController
    
    31 30
     from buildgrid.server.job import LeaseState
    
    32 31
     from buildgrid.server.bots import service
    
    ... ... @@ -283,7 +282,8 @@ def test_post_bot_event_temp(context, instance):
    283 282
     def _inject_work(scheduler, action=None, action_digest=None):
    
    284 283
         if not action:
    
    285 284
             action = remote_execution_pb2.Action()
    
    285
    +
    
    286 286
         if not action_digest:
    
    287 287
             action_digest = remote_execution_pb2.Digest()
    
    288
    -    j = job.Job(action, action_digest)
    
    289
    -    scheduler.queue_job(j, True)
    288
    +
    
    289
    +    scheduler.queue_job(action, action_digest, skip_cache_lookup=True)

  • tests/integration/execution_service.py
    ... ... @@ -106,13 +106,13 @@ def test_no_action_digest_in_storage(instance, context):
    106 106
     
    
    107 107
     
    
    108 108
     def test_wait_execution(instance, controller, context):
    
    109
    -    j = job.Job(action, action_digest)
    
    109
    +    j = controller.execution_instance._scheduler.queue_job(action,
    
    110
    +                                                           action_digest,
    
    111
    +                                                           skip_cache_lookup=True)
    
    110 112
         j._operation.done = True
    
    111 113
     
    
    112 114
         request = remote_execution_pb2.WaitExecutionRequest(name=j.name)
    
    113 115
     
    
    114
    -    controller.execution_instance._scheduler.jobs[j.name] = j
    
    115
    -
    
    116 116
         action_result_any = any_pb2.Any()
    
    117 117
         action_result = remote_execution_pb2.ActionResult()
    
    118 118
         action_result_any.Pack(action_result)
    

  • tests/integration/operations_service.py
    ... ... @@ -24,6 +24,7 @@ import grpc
    24 24
     from grpc._server import _Context
    
    25 25
     import pytest
    
    26 26
     
    
    27
    +from buildgrid._enums import OperationStage
    
    27 28
     from buildgrid._exceptions import InvalidArgumentError
    
    28 29
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    29 30
     from buildgrid._protos.google.longrunning import operations_pb2
    
    ... ... @@ -173,7 +174,7 @@ def test_list_operations_with_result(instance, controller, execute_request, cont
    173 174
         output_file = remote_execution_pb2.OutputFile(path='unicorn')
    
    174 175
         action_result.output_files.extend([output_file])
    
    175 176
     
    
    176
    -    controller.operations_instance._scheduler.jobs[response_execute.name].create_lease()
    
    177
    +    controller.operations_instance._scheduler.request_job_leases({})
    
    177 178
         controller.operations_instance._scheduler.update_job_lease_state(response_execute.name,
    
    178 179
                                                                          LeaseState.COMPLETED,
    
    179 180
                                                                          lease_status=status_pb2.Status(),
    
    ... ... @@ -236,12 +237,26 @@ def test_delete_operation_fail(instance, context):
    236 237
         context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
    
    237 238
     
    
    238 239
     
    
    239
    -def test_cancel_operation(instance, context):
    
    240
    +def test_cancel_operation(instance, controller, execute_request, context):
    
    241
    +    response_execute = controller.execution_instance.execute(execute_request.action_digest,
    
    242
    +                                                             execute_request.skip_cache_lookup)
    
    243
    +
    
    240 244
         request = operations_pb2.CancelOperationRequest()
    
    241
    -    request.name = "{}/{}".format(instance_name, "runner")
    
    245
    +    request.name = "{}/{}".format(instance_name, response_execute.name)
    
    246
    +
    
    242 247
         instance.CancelOperation(request, context)
    
    243 248
     
    
    244
    -    context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
    
    249
    +    context.set_code.assert_called_once_with(grpc.StatusCode.CANCELLED)
    
    250
    +
    
    251
    +    request = operations_pb2.ListOperationsRequest(name=instance_name)
    
    252
    +    response = instance.ListOperations(request, context)
    
    253
    +
    
    254
    +    assert len(response.operations) is 1
    
    255
    +
    
    256
    +    for operation in response.operations:
    
    257
    +        operation_metadata = remote_execution_pb2.ExecuteOperationMetadata()
    
    258
    +        operation.metadata.Unpack(operation_metadata)
    
    259
    +        assert operation_metadata.stage == OperationStage.COMPLETED.value
    
    245 260
     
    
    246 261
     
    
    247 262
     def test_cancel_operation_blank(blank_instance, context):
    
    ... ... @@ -249,7 +264,7 @@ def test_cancel_operation_blank(blank_instance, context):
    249 264
         request.name = "runner"
    
    250 265
         blank_instance.CancelOperation(request, context)
    
    251 266
     
    
    252
    -    context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
    
    267
    +    context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
    
    253 268
     
    
    254 269
     
    
    255 270
     def test_cancel_operation_instance_fail(instance, context):
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]