[Notes] [Git][BuildGrid/buildgrid][mablanch/75-requests-multiplexing] 10 commits: Downloader._fetch_directory(): fix error handling



Title: GitLab

Martin Blanchard pushed to branch mablanch/75-requests-multiplexing at BuildGrid / buildgrid

Commits:

11 changed files:

Changes:

  • buildgrid/client/cas.py
    ... ... @@ -391,7 +391,7 @@ class Downloader:
    391 391
                 except grpc.RpcError as e:
    
    392 392
                     status_code = e.code()
    
    393 393
                     if status_code == grpc.StatusCode.UNIMPLEMENTED:
    
    394
    -                    _CallCache.mark_unimplemented(self.channel, 'BatchUpdateBlobs')
    
    394
    +                    _CallCache.mark_unimplemented(self.channel, 'GetTree')
    
    395 395
     
    
    396 396
                     elif status_code == grpc.StatusCode.NOT_FOUND:
    
    397 397
                         raise NotFoundError("Requested directory does not exist on the remote.")
    

  • buildgrid/server/bots/instance.py
    ... ... @@ -126,7 +126,7 @@ class BotsInterface:
    126 126
                 # Job does not exist, remove from bot.
    
    127 127
                 return None
    
    128 128
     
    
    129
    -        self._scheduler.update_job_lease(lease)
    
    129
    +        self._scheduler.update_job_lease_state(lease.id, lease)
    
    130 130
     
    
    131 131
             if lease_state == LeaseState.COMPLETED:
    
    132 132
                 return None
    
    ... ... @@ -164,7 +164,7 @@ class BotsInterface:
    164 164
                     self.__logger.error("Assigned lease id=[%s],"
    
    165 165
                                         " not found on bot with name=[%s] and id=[%s]."
    
    166 166
                                         " Retrying job", lease_id, bot_session.name, bot_session.bot_id)
    
    167
    -                self._scheduler.retry_job(lease_id)
    
    167
    +                self._scheduler.retry_job_lease(lease_id)
    
    168 168
     
    
    169 169
         def _close_bot_session(self, name):
    
    170 170
             """ Before removing the session, close any leases and
    
    ... ... @@ -177,7 +177,7 @@ class BotsInterface:
    177 177
     
    
    178 178
             self.__logger.debug("Attempting to close [%s] with name: [%s]", bot_id, name)
    
    179 179
             for lease_id in self._assigned_leases[name]:
    
    180
    -            self._scheduler.retry_job(lease_id)
    
    180
    +            self._scheduler.retry_job_lease(lease_id)
    
    181 181
             self._assigned_leases.pop(name)
    
    182 182
     
    
    183 183
             self.__logger.debug("Closing bot session: [%s]", name)
    

  • buildgrid/server/execution/instance.py
    ... ... @@ -21,11 +21,9 @@ An instance of the Remote Execution Service.
    21 21
     
    
    22 22
     import logging
    
    23 23
     
    
    24
    -from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError
    
    24
    +from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError, NotFoundError
    
    25 25
     from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action
    
    26
    -
    
    27
    -from ..job import Job
    
    28
    -from ...utils import get_hash_type
    
    26
    +from buildgrid.utils import get_hash_type
    
    29 27
     
    
    30 28
     
    
    31 29
     class ExecutionInstance:
    
    ... ... @@ -46,44 +44,46 @@ class ExecutionInstance:
    46 44
         def hash_type(self):
    
    47 45
             return get_hash_type()
    
    48 46
     
    
    49
    -    def execute(self, action_digest, skip_cache_lookup, message_queue=None):
    
    47
    +    def execute(self, action_digest, skip_cache_lookup):
    
    50 48
             """ Sends a job for execution.
    
    51 49
             Queues an action and creates an Operation instance to be associated with
    
    52 50
             this action.
    
    53 51
             """
    
    54
    -
    
    55 52
             action = self._storage.get_message(action_digest, Action)
    
    56 53
     
    
    57 54
             if not action:
    
    58 55
                 raise FailedPreconditionError("Could not get action from storage.")
    
    59 56
     
    
    60
    -        job = Job(action, action_digest)
    
    61
    -        if message_queue is not None:
    
    62
    -            job.register_client(message_queue)
    
    57
    +        return self._scheduler.queue_job_operation(action, action_digest,
    
    58
    +                                                   skip_cache_lookup=skip_cache_lookup)
    
    63 59
     
    
    64
    -        self._scheduler.queue_job(job, skip_cache_lookup)
    
    60
    +    def register_operation_peer(self, operation_name, peer, message_queue):
    
    61
    +        try:
    
    62
    +            return self._scheduler.register_job_operation_peer(operation_name,
    
    63
    +                                                               peer, message_queue)
    
    65 64
     
    
    66
    -        return job.operation
    
    65
    +        except NotFoundError:
    
    66
    +            raise InvalidArgumentError("Operation name does not exist: [{}]"
    
    67
    +                                       .format(operation_name))
    
    67 68
     
    
    68
    -    def register_message_client(self, name, queue):
    
    69
    +    def unregister_operation_peer(self, operation_name, peer):
    
    69 70
             try:
    
    70
    -            self._scheduler.register_client(name, queue)
    
    71
    +            self._scheduler.unregister_job_operation_peer(operation_name, peer)
    
    71 72
     
    
    72
    -        except KeyError:
    
    73
    -            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    73
    +        except NotFoundError:
    
    74
    +            raise InvalidArgumentError("Operation name does not exist: [{}]"
    
    75
    +                                       .format(operation_name))
    
    74 76
     
    
    75
    -    def unregister_message_client(self, name, queue):
    
    76
    -        try:
    
    77
    -            self._scheduler.unregister_client(name, queue)
    
    77
    +    def stream_operation_updates(self, message_queue):
    
    78
    +        error, operation = message_queue.get()
    
    79
    +        if error is not None:
    
    80
    +            raise error
    
    78 81
     
    
    79
    -        except KeyError:
    
    80
    -            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    82
    +        while not operation.done:
    
    83
    +            yield operation
    
    81 84
     
    
    82
    -    def stream_operation_updates(self, message_queue, operation_name):
    
    83
    -        job = message_queue.get()
    
    84
    -        while not job.operation.done:
    
    85
    -            yield job.operation
    
    86
    -            job = message_queue.get()
    
    87
    -            job.check_operation_status()
    
    85
    +            error, operation = message_queue.get()
    
    86
    +            if error is not None:
    
    87
    +                raise error
    
    88 88
     
    
    89
    -        yield job.operation
    89
    +        yield operation

  • buildgrid/server/execution/service.py
    ... ... @@ -98,12 +98,15 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    98 98
     
    
    99 99
             try:
    
    100 100
                 instance = self._get_instance(instance_name)
    
    101
    -            operation = instance.execute(request.action_digest,
    
    102
    -                                         request.skip_cache_lookup,
    
    103
    -                                         message_queue)
    
    101
    +
    
    102
    +            job_name = instance.execute(request.action_digest,
    
    103
    +                                        request.skip_cache_lookup)
    
    104
    +
    
    105
    +            operation_name = instance.register_operation_peer(job_name,
    
    106
    +                                                              peer, message_queue)
    
    104 107
     
    
    105 108
                 context.add_callback(partial(self._rpc_termination_callback,
    
    106
    -                                         peer, instance_name, operation.name, message_queue))
    
    109
    +                                         peer, instance_name, operation_name))
    
    107 110
     
    
    108 111
                 if self._is_instrumented:
    
    109 112
                     if peer not in self.__peers:
    
    ... ... @@ -112,16 +115,13 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    112 115
                     else:
    
    113 116
                         self.__peers[peer] += 1
    
    114 117
     
    
    115
    -            instanced_op_name = "{}/{}".format(instance_name, operation.name)
    
    118
    +            operation_full_name = "{}/{}".format(instance_name, operation_name)
    
    116 119
     
    
    117
    -            self.__logger.info("Operation name: [%s]", instanced_op_name)
    
    120
    +            self.__logger.info("Operation name: [%s]", operation_full_name)
    
    118 121
     
    
    119
    -            for operation in instance.stream_operation_updates(message_queue,
    
    120
    -                                                               operation.name):
    
    121
    -                op = operations_pb2.Operation()
    
    122
    -                op.CopyFrom(operation)
    
    123
    -                op.name = instanced_op_name
    
    124
    -                yield op
    
    122
    +            for operation in instance.stream_operation_updates(message_queue):
    
    123
    +                operation.name = operation_full_name
    
    124
    +                yield operation
    
    125 125
     
    
    126 126
             except InvalidArgumentError as e:
    
    127 127
                 self.__logger.error(e)
    
    ... ... @@ -160,9 +160,11 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    160 160
             try:
    
    161 161
                 instance = self._get_instance(instance_name)
    
    162 162
     
    
    163
    -            instance.register_message_client(operation_name, message_queue)
    
    163
    +            operation_name = instance.register_operation_peer(operation_name,
    
    164
    +                                                              peer, message_queue)
    
    165
    +
    
    164 166
                 context.add_callback(partial(self._rpc_termination_callback,
    
    165
    -                                         peer, instance_name, operation_name, message_queue))
    
    167
    +                                         peer, instance_name, operation_name))
    
    166 168
     
    
    167 169
                 if self._is_instrumented:
    
    168 170
                     if peer not in self.__peers:
    
    ... ... @@ -171,12 +173,11 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    171 173
                     else:
    
    172 174
                         self.__peers[peer] += 1
    
    173 175
     
    
    174
    -            for operation in instance.stream_operation_updates(message_queue,
    
    175
    -                                                               operation_name):
    
    176
    -                op = operations_pb2.Operation()
    
    177
    -                op.CopyFrom(operation)
    
    178
    -                op.name = request.name
    
    179
    -                yield op
    
    176
    +            operation_full_name = "{}/{}".format(instance_name, operation_name)
    
    177
    +
    
    178
    +            for operation in instance.stream_operation_updates(message_queue):
    
    179
    +                operation.name = operation_full_name
    
    180
    +                yield operation
    
    180 181
     
    
    181 182
             except InvalidArgumentError as e:
    
    182 183
                 self.__logger.error(e)
    
    ... ... @@ -211,10 +212,10 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    211 212
     
    
    212 213
         # --- Private API ---
    
    213 214
     
    
    214
    -    def _rpc_termination_callback(self, peer, instance_name, job_name, message_queue):
    
    215
    +    def _rpc_termination_callback(self, peer, instance_name, operation_name):
    
    215 216
             instance = self._get_instance(instance_name)
    
    216 217
     
    
    217
    -        instance.unregister_message_client(job_name, message_queue)
    
    218
    +        instance.unregister_operation_peer(operation_name, peer)
    
    218 219
     
    
    219 220
             if self._is_instrumented:
    
    220 221
                 if self.__peers[peer] > 1:
    

  • buildgrid/server/job.py
    ... ... @@ -20,7 +20,7 @@ import uuid
    20 20
     from google.protobuf import duration_pb2, timestamp_pb2
    
    21 21
     
    
    22 22
     from buildgrid._enums import LeaseState, OperationStage
    
    23
    -from buildgrid._exceptions import CancelledError
    
    23
    +from buildgrid._exceptions import CancelledError, NotFoundError
    
    24 24
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    25 25
     from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
    
    26 26
     from buildgrid._protos.google.longrunning import operations_pb2
    
    ... ... @@ -29,35 +29,70 @@ from buildgrid._protos.google.rpc import code_pb2
    29 29
     
    
    30 30
     class Job:
    
    31 31
     
    
    32
    -    def __init__(self, action, action_digest):
    
    32
    +    def __init__(self, action, action_digest, priority=0):
    
    33 33
             self.__logger = logging.getLogger(__name__)
    
    34 34
     
    
    35 35
             self._name = str(uuid.uuid4())
    
    36
    +        self._priority = priority
    
    36 37
             self._action = remote_execution_pb2.Action()
    
    37
    -        self._operation = operations_pb2.Operation()
    
    38 38
             self._lease = None
    
    39 39
     
    
    40 40
             self.__execute_response = None
    
    41 41
             self.__operation_metadata = remote_execution_pb2.ExecuteOperationMetadata()
    
    42
    +        self.__operations_by_name = {}  # Name to Operation 1:1 mapping
    
    43
    +        self.__operations_by_peer = {}  # Peer to Operation 1:1 mapping
    
    42 44
     
    
    43 45
             self.__queued_timestamp = timestamp_pb2.Timestamp()
    
    44 46
             self.__queued_time_duration = duration_pb2.Duration()
    
    45 47
             self.__worker_start_timestamp = timestamp_pb2.Timestamp()
    
    46 48
             self.__worker_completed_timestamp = timestamp_pb2.Timestamp()
    
    47 49
     
    
    48
    -        self.__operation_cancelled = False
    
    50
    +        self.__operations_message_queues = {}
    
    51
    +        self.__operations_cancelled = set()
    
    49 52
             self.__lease_cancelled = False
    
    53
    +        self.__job_cancelled = False
    
    50 54
     
    
    51 55
             self.__operation_metadata.action_digest.CopyFrom(action_digest)
    
    52 56
             self.__operation_metadata.stage = OperationStage.UNKNOWN.value
    
    53 57
     
    
    54 58
             self._action.CopyFrom(action)
    
    55 59
             self._do_not_cache = self._action.do_not_cache
    
    56
    -        self._operation_update_queues = []
    
    57
    -        self._operation.name = self._name
    
    58
    -        self._operation.done = False
    
    59 60
             self._n_tries = 0
    
    60 61
     
    
    62
    +        self._done = False
    
    63
    +
    
    64
    +    def __lt__(self, other):
    
    65
    +        try:
    
    66
    +            return self.priority < other.priority
    
    67
    +        except AttributeError:
    
    68
    +            return NotImplemented
    
    69
    +
    
    70
    +    def __le__(self, other):
    
    71
    +        try:
    
    72
    +            return self.priority <= other.priority
    
    73
    +        except AttributeError:
    
    74
    +            return NotImplemented
    
    75
    +
    
    76
    +    def __eq__(self, other):
    
    77
    +        if isinstance(other, Job):
    
    78
    +            return self.name == other.name
    
    79
    +        return False
    
    80
    +
    
    81
    +    def __ne__(self, other):
    
    82
    +        return not self.__eq__(other)
    
    83
    +
    
    84
    +    def __gt__(self, other):
    
    85
    +        try:
    
    86
    +            return self.priority > other.priority
    
    87
    +        except AttributeError:
    
    88
    +            return NotImplemented
    
    89
    +
    
    90
    +    def __ge__(self, other):
    
    91
    +        try:
    
    92
    +            return self.priority >= other.priority
    
    93
    +        except AttributeError:
    
    94
    +            return NotImplemented
    
    95
    +
    
    61 96
         # --- Public API ---
    
    62 97
     
    
    63 98
         @property
    
    ... ... @@ -65,17 +100,27 @@ class Job:
    65 100
             return self._name
    
    66 101
     
    
    67 102
         @property
    
    68
    -    def do_not_cache(self):
    
    69
    -        return self._do_not_cache
    
    103
    +    def priority(self):
    
    104
    +        return self._priority
    
    105
    +
    
    106
    +    @property
    
    107
    +    def done(self):
    
    108
    +        return self._done
    
    109
    +
    
    110
    +    # --- Public API: REAPI ---
    
    70 111
     
    
    71 112
         @property
    
    72
    -    def action(self):
    
    73
    -        return self._action
    
    113
    +    def do_not_cache(self):
    
    114
    +        return self._do_not_cache
    
    74 115
     
    
    75 116
         @property
    
    76 117
         def action_digest(self):
    
    77 118
             return self.__operation_metadata.action_digest
    
    78 119
     
    
    120
    +    @property
    
    121
    +    def operation_stage(self):
    
    122
    +        return OperationStage(self.__operation_metadata.stage)
    
    123
    +
    
    79 124
         @property
    
    80 125
         def action_result(self):
    
    81 126
             if self.__execute_response is not None:
    
    ... ... @@ -84,19 +129,177 @@ class Job:
    84 129
                 return None
    
    85 130
     
    
    86 131
         @property
    
    87
    -    def holds_cached_action_result(self):
    
    132
    +    def holds_cached_result(self):
    
    88 133
             if self.__execute_response is not None:
    
    89 134
                 return self.__execute_response.cached_result
    
    90 135
             else:
    
    91 136
                 return False
    
    92 137
     
    
    93
    -    @property
    
    94
    -    def operation(self):
    
    95
    -        return self._operation
    
    138
    +    def set_cached_result(self, action_result):
    
    139
    +        """Allows specifying an action result from the action cache for the job.
    
    140
    +
    
    141
    +        Note:
    
    142
    +            This won't trigger any :class:`Operation` stage transition.
    
    143
    +
    
    144
    +        Args:
    
    145
    +            action_result (ActionResult): The result from cache.
    
    146
    +        """
    
    147
    +        self.__execute_response = remote_execution_pb2.ExecuteResponse()
    
    148
    +        self.__execute_response.result.CopyFrom(action_result)
    
    149
    +        self.__execute_response.cached_result = True
    
    96 150
     
    
    97 151
         @property
    
    98
    -    def operation_stage(self):
    
    99
    -        return OperationStage(self.__operation_metadata.state)
    
    152
    +    def n_peers(self):
    
    153
    +        return len(self.__operations_message_queues)
    
    154
    +
    
    155
    +    def register_operation_peer(self, peer, message_queue):
    
    156
    +        """Subscribes to the job's :class:`Operation` stage changes.
    
    157
    +
    
    158
    +        Args:
    
    159
    +            peer (str): a unique string identifying the client.
    
    160
    +            message_queue (queue.Queue): the event queue to register.
    
    161
    +
    
    162
    +        Returns:
    
    163
    +            str: The name of the subscribed :class:`Operation`.
    
    164
    +        """
    
    165
    +        if peer in self.__operations_by_peer:
    
    166
    +            operation = self.__operations_by_peer[peer]
    
    167
    +        else:
    
    168
    +            operation = self.create_operation_for_peer(peer)
    
    169
    +
    
    170
    +        self.__operations_message_queues[peer] = message_queue
    
    171
    +
    
    172
    +        self._send_operations_updates(peers=[peer])
    
    173
    +
    
    174
    +        return operation.name
    
    175
    +
    
    176
    +    def unregister_operation_peer(self, peer):
    
    177
    +        """Unsubscribes to the job's :class:`Operation` stage change.
    
    178
    +
    
    179
    +        Args:
    
    180
    +            peer (str): a unique string identifying the client.
    
    181
    +        """
    
    182
    +        if peer in self.__operations_message_queues:
    
    183
    +            del self.__operations_message_queues[peer]
    
    184
    +
    
    185
    +        # Drop the operation if nobody is watching it anymore:
    
    186
    +        if peer in self.__operations_by_peer:
    
    187
    +            operation = self.__operations_by_peer.pop(peer)
    
    188
    +
    
    189
    +            if operation not in self.__operations_by_peer.values():
    
    190
    +                del self.__operations_by_name[operation.name]
    
    191
    +
    
    192
    +            self.__operations_cancelled.discard(operation.name)
    
    193
    +
    
    194
    +    def create_operation_for_peer(self, peer):
    
    195
    +        """Generates a new :class:`Operation` for `peer`.
    
    196
    +
    
    197
    +        Args:
    
    198
    +            peer (str): a unique string identifying the client.
    
    199
    +        """
    
    200
    +        if peer in self.__operations_by_peer:
    
    201
    +            return self.__operations_by_peer[peer]
    
    202
    +
    
    203
    +        new_operation = operations_pb2.Operation()
    
    204
    +        # Copy state from first existing and non cancelled operation:
    
    205
    +        for operation in self.__operations_by_name.values():
    
    206
    +            if operation.name not in self.__operations_cancelled:
    
    207
    +                new_operation.CopyFrom(operation)
    
    208
    +                break
    
    209
    +
    
    210
    +        new_operation.name = str(uuid.uuid4())
    
    211
    +
    
    212
    +        self.__operations_by_name[new_operation.name] = new_operation
    
    213
    +        self.__operations_by_peer[peer] = new_operation
    
    214
    +
    
    215
    +        return new_operation
    
    216
    +
    
    217
    +    def cancel_operation_for_peer(self, peer):
    
    218
    +        """Triggers a job's :class:`Operation` cancellation.
    
    219
    +
    
    220
    +        This may cancel any job's :class:`Lease` that may have been issued.
    
    221
    +
    
    222
    +        Args:
    
    223
    +            peer (str): a unique string identifying the client.
    
    224
    +        """
    
    225
    +        operations_to_cancel, peers_to_notify = set(), set()
    
    226
    +        # If the peer is watching the job, only cancel its operation, if not,
    
    227
    +        # cancel the entire job (including all operations and lease):
    
    228
    +        if peer in self.__operations_by_peer:
    
    229
    +            operations_to_cancel.add(self.__operations_by_peer[peer].name)
    
    230
    +            peers_to_notify.add(peer)
    
    231
    +
    
    232
    +        else:
    
    233
    +            operations_to_cancel.update(self.__operations_by_name.keys())
    
    234
    +            peers_to_notify.update(self.__operations_by_peer.keys())
    
    235
    +
    
    236
    +        operations_to_cancel = operations_to_cancel - self.__operations_cancelled
    
    237
    +        if not operations_to_cancel:
    
    238
    +            return
    
    239
    +
    
    240
    +        self.__operations_cancelled.update(operations_to_cancel)
    
    241
    +
    
    242
    +        ongoing_operations = set(self.__operations_by_name.keys())
    
    243
    +        # Job is cancelled if all the operations are:
    
    244
    +        self.__job_cancelled = ongoing_operations.issubset(self.__operations_cancelled)
    
    245
    +
    
    246
    +        if self.__job_cancelled and self._lease is not None:
    
    247
    +            self.cancel_lease()
    
    248
    +
    
    249
    +        self._send_operations_updates(peers=peers_to_notify, notify_cancelled=True)
    
    250
    +
    
    251
    +    def list_operations(self):
    
    252
    +        """Lists the :class:`Operation` related to a job.
    
    253
    +
    
    254
    +        Returns:
    
    255
    +            list: A list of :class:`Operation` names.
    
    256
    +        """
    
    257
    +        return list(self.__operations_by_name.keys())
    
    258
    +
    
    259
    +    def get_operation(self, operation_name):
    
    260
    +        """Returns a copy of the job's :class:`Operation`.
    
    261
    +
    
    262
    +        Args:
    
    263
    +            operation_name (str): the operation's name.
    
    264
    +
    
    265
    +        Raises:
    
    266
    +            NotFoundError: If no operation with `operation_name` exists.
    
    267
    +        """
    
    268
    +        try:
    
    269
    +            operation = self.__operations_by_name[operation_name]
    
    270
    +
    
    271
    +        except KeyError:
    
    272
    +            raise NotFoundError("Operation name does not exist: [{}]"
    
    273
    +                                .format(operation_name))
    
    274
    +
    
    275
    +        return self._copy_operation(operation)
    
    276
    +
    
    277
    +    def update_operation_stage(self, stage):
    
    278
    +        """Operates a stage transition for the job's :class:`Operation`.
    
    279
    +
    
    280
    +        Args:
    
    281
    +            stage (OperationStage): the operation stage to transition to.
    
    282
    +        """
    
    283
    +        if stage.value == self.__operation_metadata.stage:
    
    284
    +            return
    
    285
    +
    
    286
    +        self.__operation_metadata.stage = stage.value
    
    287
    +
    
    288
    +        if self.__operation_metadata.stage == OperationStage.QUEUED.value:
    
    289
    +            if self.__queued_timestamp.ByteSize() == 0:
    
    290
    +                self.__queued_timestamp.GetCurrentTime()
    
    291
    +            self._n_tries += 1
    
    292
    +
    
    293
    +        elif self.__operation_metadata.stage == OperationStage.EXECUTING.value:
    
    294
    +            queue_in, queue_out = self.__queued_timestamp.ToDatetime(), datetime.now()
    
    295
    +            self.__queued_time_duration.FromTimedelta(queue_out - queue_in)
    
    296
    +
    
    297
    +        elif self.__operation_metadata.stage == OperationStage.COMPLETED.value:
    
    298
    +            self._done = True
    
    299
    +
    
    300
    +        self._send_operations_updates()
    
    301
    +
    
    302
    +    # --- Public API: RWAPI ---
    
    100 303
     
    
    101 304
         @property
    
    102 305
         def lease(self):
    
    ... ... @@ -117,45 +320,15 @@ class Job:
    117 320
         def n_tries(self):
    
    118 321
             return self._n_tries
    
    119 322
     
    
    120
    -    @property
    
    121
    -    def n_clients(self):
    
    122
    -        return len(self._operation_update_queues)
    
    123
    -
    
    124
    -    def register_client(self, queue):
    
    125
    -        """Subscribes to the job's :class:`Operation` stage change events.
    
    126
    -
    
    127
    -        Queues this :object:`Job` instance.
    
    128
    -
    
    129
    -        Args:
    
    130
    -            queue (queue.Queue): the event queue to register.
    
    131
    -        """
    
    132
    -        self._operation_update_queues.append(queue)
    
    133
    -        queue.put(self)
    
    134
    -
    
    135
    -    def unregister_client(self, queue):
    
    136
    -        """Unsubscribes to the job's :class:`Operation` stage change events.
    
    137
    -
    
    138
    -        Args:
    
    139
    -            queue (queue.Queue): the event queue to unregister.
    
    140
    -        """
    
    141
    -        self._operation_update_queues.remove(queue)
    
    142
    -
    
    143
    -    def set_cached_result(self, action_result):
    
    144
    -        """Allows specifying an action result form the action cache for the job.
    
    145
    -        """
    
    146
    -        self.__execute_response = remote_execution_pb2.ExecuteResponse()
    
    147
    -        self.__execute_response.result.CopyFrom(action_result)
    
    148
    -        self.__execute_response.cached_result = True
    
    149
    -
    
    150 323
         def create_lease(self):
    
    151 324
             """Emits a new :class:`Lease` for the job.
    
    152 325
     
    
    153 326
             Only one :class:`Lease` can be emitted for a given job. This method
    
    154
    -        should only be used once, any furhter calls are ignored.
    
    327
    +        should only be used once, any further calls are ignored.
    
    155 328
             """
    
    156
    -        if self.__operation_cancelled:
    
    157
    -            return None
    
    158
    -        elif self._lease is not None:
    
    329
    +        if self._lease is not None:
    
    330
    +            return self._lease
    
    331
    +        elif self.__job_cancelled:
    
    159 332
                 return None
    
    160 333
     
    
    161 334
             self._lease = bots_pb2.Lease()
    
    ... ... @@ -166,14 +339,14 @@ class Job:
    166 339
             return self._lease
    
    167 340
     
    
    168 341
         def update_lease_state(self, state, status=None, result=None):
    
    169
    -        """Operates a state transition for the job's current :class:Lease.
    
    342
    +        """Operates a state transition for the job's current :class:`Lease`.
    
    170 343
     
    
    171 344
             Args:
    
    172 345
                 state (LeaseState): the lease state to transition to.
    
    173
    -            status (google.rpc.Status): the lease execution status, only
    
    174
    -                required if `state` is `COMPLETED`.
    
    175
    -            result (google.protobuf.Any): the lease execution result, only
    
    176
    -                required if `state` is `COMPLETED`.
    
    346
    +            status (google.rpc.Status, optional): the lease execution status,
    
    347
    +                only required if `state` is `COMPLETED`.
    
    348
    +            result (google.protobuf.Any, optional): the lease execution result,
    
    349
    +                only required if `state` is `COMPLETED`.
    
    177 350
             """
    
    178 351
             if state.value == self._lease.state:
    
    179 352
                 return
    
    ... ... @@ -214,79 +387,96 @@ class Job:
    214 387
                 self.__execute_response.status.CopyFrom(status)
    
    215 388
     
    
    216 389
         def cancel_lease(self):
    
    217
    -        """Triggers a job's :class:Lease cancellation.
    
    390
    +        """Triggers a job's :class:`Lease` cancellation.
    
    218 391
     
    
    219
    -        This will not cancel the job's :class:Operation.
    
    392
    +        Note:
    
    393
    +            This will not cancel the job's :class:`Operation`.
    
    220 394
             """
    
    221 395
             self.__lease_cancelled = True
    
    222 396
             if self._lease is not None:
    
    223 397
                 self.update_lease_state(LeaseState.CANCELLED)
    
    224 398
     
    
    225 399
         def delete_lease(self):
    
    226
    -        """Discard the job's :class:Lease."""
    
    400
    +        """Discard the job's :class:`Lease`.
    
    401
    +
    
    402
    +        Note:
    
    403
    +            This will not cancel the job's :class:`Operation`.
    
    404
    +        """
    
    227 405
             self.__worker_start_timestamp.Clear()
    
    228 406
             self.__worker_completed_timestamp.Clear()
    
    229 407
     
    
    230 408
             self._lease = None
    
    231 409
     
    
    232
    -    def update_operation_stage(self, stage):
    
    233
    -        """Operates a stage transition for the job's :class:Operation.
    
    410
    +    # --- Public API: Monitoring ---
    
    234 411
     
    
    235
    -        Args:
    
    236
    -            stage (OperationStage): the operation stage to transition to.
    
    237
    -        """
    
    238
    -        if stage.value == self.__operation_metadata.stage:
    
    239
    -            return
    
    412
    +    def query_queue_time(self):
    
    413
    +        return self.__queued_time_duration.ToTimedelta()
    
    240 414
     
    
    241
    -        self.__operation_metadata.stage = stage.value
    
    415
    +    def query_n_retries(self):
    
    416
    +        return self._n_tries - 1 if self._n_tries > 0 else 0
    
    242 417
     
    
    243
    -        if self.__operation_metadata.stage == OperationStage.QUEUED.value:
    
    244
    -            if self.__queued_timestamp.ByteSize() == 0:
    
    245
    -                self.__queued_timestamp.GetCurrentTime()
    
    246
    -            self._n_tries += 1
    
    418
    +    # --- Private API ---
    
    247 419
     
    
    248
    -        elif self.__operation_metadata.stage == OperationStage.EXECUTING.value:
    
    249
    -            queue_in, queue_out = self.__queued_timestamp.ToDatetime(), datetime.now()
    
    250
    -            self.__queued_time_duration.FromTimedelta(queue_out - queue_in)
    
    420
    +    def _copy_operation(self, operation):
    
    421
    +        """Simply duplicates a given :class:`Operation` object."""
    
    422
    +        new_operation = operations_pb2.Operation()
    
    423
    +        new_operation.CopyFrom(operation)
    
    251 424
     
    
    252
    -        elif self.__operation_metadata.stage == OperationStage.COMPLETED.value:
    
    253
    -            if self.__execute_response is not None:
    
    254
    -                self._operation.response.Pack(self.__execute_response)
    
    255
    -            self._operation.done = True
    
    425
    +        return new_operation
    
    256 426
     
    
    257
    -        self._operation.metadata.Pack(self.__operation_metadata)
    
    427
    +    def _update_operation(self, operation, operation_metadata, execute_response=None, done=False):
    
    428
    +        """Forges a :class:`Operation` message given input data."""
    
    429
    +        operation.metadata.Pack(operation_metadata)
    
    258 430
     
    
    259
    -        for queue in self._operation_update_queues:
    
    260
    -            queue.put(self)
    
    431
    +        if execute_response is not None:
    
    432
    +            operation.response.Pack(execute_response)
    
    261 433
     
    
    262
    -    def check_operation_status(self):
    
    263
    -        """Reports errors on unexpected job's :class:Operation state.
    
    434
    +        operation.done = done
    
    264 435
     
    
    265
    -        Raises:
    
    266
    -            CancelledError: if the job's :class:Operation was cancelled.
    
    267
    -        """
    
    268
    -        if self.__operation_cancelled:
    
    269
    -            raise CancelledError(self.__execute_response.status.message)
    
    436
    +    def _update_cancelled_operation(self, operation, operation_metadata, execute_response=None):
    
    437
    +        """Forges a cancelled :class:`Operation` message given input data."""
    
    438
    +        cancelled_operation_metadata = remote_execution_pb2.ExecuteOperationMetadata()
    
    439
    +        cancelled_operation_metadata.CopyFrom(operation_metadata)
    
    440
    +        cancelled_operation_metadata.stage = OperationStage.COMPLETED.value
    
    270 441
     
    
    271
    -    def cancel_operation(self):
    
    272
    -        """Triggers a job's :class:Operation cancellation.
    
    442
    +        operation.metadata.Pack(cancelled_operation_metadata)
    
    273 443
     
    
    274
    -        This will also cancel any job's :class:Lease that may have been issued.
    
    275
    -        """
    
    276
    -        self.__operation_cancelled = True
    
    277
    -        if self._lease is not None:
    
    278
    -            self.cancel_lease()
    
    444
    +        cancelled_execute_response = remote_execution_pb2.ExecuteResponse()
    
    445
    +        if execute_response is not None:
    
    446
    +            cancelled_execute_response.CopyFrom(self.__execute_response)
    
    447
    +        cancelled_execute_response.status.code = code_pb2.CANCELLED
    
    448
    +        cancelled_execute_response.status.message = "Operation cancelled by client."
    
    279 449
     
    
    280
    -        self.__execute_response = remote_execution_pb2.ExecuteResponse()
    
    281
    -        self.__execute_response.status.code = code_pb2.CANCELLED
    
    282
    -        self.__execute_response.status.message = "Operation cancelled by client."
    
    450
    +        operation.response.Pack(cancelled_execute_response)
    
    283 451
     
    
    284
    -        self.update_operation_stage(OperationStage.COMPLETED)
    
    452
    +        operation.done = True
    
    285 453
     
    
    286
    -    # --- Public API: Monitoring ---
    
    454
    +    def _send_operations_updates(self, peers=None, notify_cancelled=False):
    
    455
    +        """Sends :class:`Operation` stage change messages to watchers."""
    
    456
    +        for operation in self.__operations_by_name.values():
    
    457
    +            if operation.name in self.__operations_cancelled:
    
    458
    +                self._update_cancelled_operation(operation, self.__operation_metadata,
    
    459
    +                                                 execute_response=self.__execute_response)
    
    287 460
     
    
    288
    -    def query_queue_time(self):
    
    289
    -        return self.__queued_time_duration.ToTimedelta()
    
    461
    +            else:
    
    462
    +                self._update_operation(operation, self.__operation_metadata,
    
    463
    +                                       execute_response=self.__execute_response,
    
    464
    +                                       done=self._done)
    
    290 465
     
    
    291
    -    def query_n_retries(self):
    
    292
    -        return self._n_tries - 1 if self._n_tries > 0 else 0
    466
    +        for peer, message_queue in self.__operations_message_queues.items():
    
    467
    +            if peer not in self.__operations_by_peer:
    
    468
    +                continue
    
    469
    +            elif peers and peer not in peers:
    
    470
    +                continue
    
    471
    +
    
    472
    +            operation = self.__operations_by_peer[peer]
    
    473
    +            # Messages are pairs of (Exception, Operation,):
    
    474
    +            if not notify_cancelled and operation.name in self.__operations_cancelled:
    
    475
    +                continue
    
    476
    +            elif operation.name not in self.__operations_cancelled:
    
    477
    +                message = (None, self._copy_operation(operation),)
    
    478
    +            else:
    
    479
    +                message = (CancelledError("Operation has been cancelled"),
    
    480
    +                           self._copy_operation(operation),)
    
    481
    +
    
    482
    +            message_queue.put(message)

  • buildgrid/server/operations/instance.py
    ... ... @@ -21,7 +21,7 @@ An instance of the LongRunningOperations Service.
    21 21
     
    
    22 22
     import logging
    
    23 23
     
    
    24
    -from buildgrid._exceptions import InvalidArgumentError
    
    24
    +from buildgrid._exceptions import InvalidArgumentError, NotFoundError
    
    25 25
     from buildgrid._protos.google.longrunning import operations_pb2
    
    26 26
     
    
    27 27
     
    
    ... ... @@ -39,62 +39,43 @@ class OperationsInstance:
    39 39
         def register_instance_with_server(self, instance_name, server):
    
    40 40
             server.add_operations_instance(self, instance_name)
    
    41 41
     
    
    42
    -    def get_operation(self, name):
    
    43
    -        job = self._scheduler.jobs.get(name)
    
    42
    +    def get_operation(self, job_name):
    
    43
    +        try:
    
    44
    +            operation = self._scheduler.get_job_operation(job_name)
    
    44 45
     
    
    45
    -        if job is None:
    
    46
    -            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    46
    +        except NotFoundError:
    
    47
    +            raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))
    
    47 48
     
    
    48
    -        else:
    
    49
    -            return job.operation
    
    49
    +        return operation
    
    50 50
     
    
    51 51
         def list_operations(self, list_filter, page_size, page_token):
    
    52 52
             # TODO: Pages
    
    53 53
             # Spec says number of pages and length of a page are optional
    
    54 54
             response = operations_pb2.ListOperationsResponse()
    
    55
    +
    
    56
    +        operation_names = [operation_name for job_name in
    
    57
    +                           self._scheduler.list_current_jobs() for operation_name in
    
    58
    +                           self._scheduler.list_job_operations(job_name)]
    
    59
    +
    
    55 60
             operations = []
    
    56
    -        for job in self._scheduler.list_jobs():
    
    57
    -            op = operations_pb2.Operation()
    
    58
    -            op.CopyFrom(job.operation)
    
    59
    -            operations.append(op)
    
    61
    +        for operation_name in operation_names:
    
    62
    +            operation = self._scheduler.get_job_operation(operation_name)
    
    63
    +            operations.append(operation)
    
    60 64
     
    
    61 65
             response.operations.extend(operations)
    
    62 66
     
    
    63 67
             return response
    
    64 68
     
    
    65
    -    def delete_operation(self, name):
    
    66
    -        try:
    
    67
    -            self._scheduler.jobs.pop(name)
    
    68
    -
    
    69
    -        except KeyError:
    
    70
    -            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    71
    -
    
    72
    -    def cancel_operation(self, name):
    
    69
    +    def delete_operation(self, job_name):
    
    73 70
             try:
    
    74
    -            self._scheduler.cancel_job_operation(name)
    
    71
    +            self._scheduler.delete_job_operation(job_name)
    
    75 72
     
    
    76
    -        except KeyError:
    
    77
    -            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    73
    +        except NotFoundError:
    
    74
    +            raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))
    
    78 75
     
    
    79
    -    def register_message_client(self, name, queue):
    
    76
    +    def cancel_operation(self, job_name):
    
    80 77
             try:
    
    81
    -            self._scheduler.register_client(name, queue)
    
    82
    -
    
    83
    -        except KeyError:
    
    84
    -            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    85
    -
    
    86
    -    def unregister_message_client(self, name, queue):
    
    87
    -        try:
    
    88
    -            self._scheduler.unregister_client(name, queue)
    
    89
    -
    
    90
    -        except KeyError:
    
    91
    -            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    92
    -
    
    93
    -    def stream_operation_updates(self, message_queue, operation_name):
    
    94
    -        job = message_queue.get()
    
    95
    -        while not job.operation.done:
    
    96
    -            yield job.operation
    
    97
    -            job = message_queue.get()
    
    98
    -            job.check_operation_status()
    
    78
    +            self._scheduler.cancel_job_operation(job_name)
    
    99 79
     
    
    100
    -        yield job.operation
    80
    +        except NotFoundError:
    
    81
    +            raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))

  • buildgrid/server/scheduler.py
    ... ... @@ -19,12 +19,13 @@ Scheduler
    19 19
     Schedules jobs.
    
    20 20
     """
    
    21 21
     
    
    22
    -from collections import deque
    
    22
    +import bisect
    
    23 23
     from datetime import timedelta
    
    24 24
     import logging
    
    25 25
     
    
    26 26
     from buildgrid._enums import LeaseState, OperationStage
    
    27 27
     from buildgrid._exceptions import NotFoundError
    
    28
    +from buildgrid.server.job import Job
    
    28 29
     
    
    29 30
     
    
    30 31
     class Scheduler:
    
    ... ... @@ -42,8 +43,12 @@ class Scheduler:
    42 43
             self.__retries_count = 0
    
    43 44
     
    
    44 45
             self._action_cache = action_cache
    
    45
    -        self.jobs = {}
    
    46
    -        self.queue = deque()
    
    46
    +
    
    47
    +        self.__jobs_by_action = {}  # Action to Job 1:1 mapping
    
    48
    +        self.__jobs_by_operation = {}  # Operation to Job 1:1 mapping
    
    49
    +        self.__jobs_by_name = {}  # Name to Job 1:1 mapping
    
    50
    +
    
    51
    +        self.__queue = []
    
    47 52
     
    
    48 53
             self._is_instrumented = monitor
    
    49 54
     
    
    ... ... @@ -52,61 +57,189 @@ class Scheduler:
    52 57
     
    
    53 58
         # --- Public API ---
    
    54 59
     
    
    55
    -    def register_client(self, job_name, queue):
    
    56
    -        job = self.jobs[job_name]
    
    60
    +    def list_current_jobs(self):
    
    61
    +        """Returns a list of the :class:`Job` names currently managed."""
    
    62
    +        return self.__jobs_by_name.keys()
    
    63
    +
    
    64
    +    def list_job_operations(self, job_name):
    
    65
    +        """Returns a list of :class:`Operation` names for a :class:`Job`."""
    
    66
    +        if job_name in self.__jobs_by_name:
    
    67
    +            return self.__jobs_by_name[job_name].list_operations()
    
    68
    +        else:
    
    69
    +            return []
    
    57 70
     
    
    58
    -        job.register_client(queue)
    
    71
    +    # --- Public API: REAPI ---
    
    59 72
     
    
    60
    -    def unregister_client(self, job_name, queue):
    
    61
    -        job = self.jobs[job_name]
    
    73
    +    def register_job_operation_peer(self, operation_name, peer, message_queue):
    
    74
    +        """Subscribes to one of the job's :class:`Operation` stage changes.
    
    62 75
     
    
    63
    -        job.unregister_client(queue)
    
    76
    +        Args:
    
    77
    +            operation_name (str): name of the operation to subscribe to.
    
    78
    +            peer (str): a unique string identifying the client.
    
    79
    +            message_queue (queue.Queue): the event queue to register.
    
    80
    +
    
    81
    +        Returns:
    
    82
    +            str: The name of the subscribed :class:`Operation`.
    
    83
    +
    
    84
    +        Raises:
    
    85
    +            NotFoundError: If no operation with `operation_name` exists.
    
    86
    +        """
    
    87
    +        if operation_name in self.__jobs_by_operation:
    
    88
    +            job = self.__jobs_by_operation[operation_name]
    
    64 89
     
    
    65
    -        if not job.n_clients and job.operation.done and not job.lease:
    
    90
    +        elif operation_name in self.__jobs_by_name:
    
    91
    +            job = self.__jobs_by_name[operation_name]
    
    92
    +
    
    93
    +        else:
    
    94
    +            raise NotFoundError("Operation name does not exist: [{}]"
    
    95
    +                                .format(operation_name))
    
    96
    +
    
    97
    +        operation_name = job.register_operation_peer(peer, message_queue)
    
    98
    +
    
    99
    +        self.__jobs_by_operation[operation_name] = job
    
    100
    +
    
    101
    +        return operation_name
    
    102
    +
    
    103
    +    def unregister_job_operation_peer(self, operation_name, peer):
    
    104
    +        """Unsubscribes from one of the job's :class:`Operation` stage changes.
    
    105
    +
    
    106
    +        Args:
    
    107
    +            operation_name (str): name of the operation to unsubscribe from.
    
    108
    +            peer (str): a unique string identifying the client.
    
    109
    +
    
    110
    +        Raises:
    
    111
    +            NotFoundError: If no operation with `operation_name` exists.
    
    112
    +        """
    
    113
    +        if operation_name in self.__jobs_by_operation:
    
    114
    +            job = self.__jobs_by_operation[operation_name]
    
    115
    +
    
    116
    +        else:
    
    117
    +            raise NotFoundError("Operation name does not exist: [{}]"
    
    118
    +                                .format(operation_name))
    
    119
    +
    
    120
    +        if operation_name in self.__jobs_by_operation:
    
    121
    +            del self.__jobs_by_operation[operation_name]
    
    122
    +
    
    123
    +        job.unregister_operation_peer(peer)
    
    124
    +
    
    125
    +        if not job.n_peers and job.done and not job.lease:
    
    66 126
                 self._delete_job(job.name)
    
    67 127
     
    
    68
    -    def queue_job(self, job, skip_cache_lookup=False):
    
    69
    -        self.jobs[job.name] = job
    
    128
    +    def queue_job_operation(self, action, action_digest, priority=0, skip_cache_lookup=False):
    
    129
    +        """Inserts a newly created job into the execution queue.
    
    130
    +
    
    131
    +        Warning:
    
    132
    +            Priority is handled like a POSIX ``nice`` value: a higher value
    
    133
    +            means a low priority, 0 being default priority.
    
    134
    +
    
    135
    +        Args:
    
    136
    +            action (Action): the given action to queue for execution.
    
    137
    +            action_digest (Digest): the digest of the given action.
    
    138
    +            priority (int): the execution job's priority.
    
    139
    +            skip_cache_lookup (bool): whether or not to look for pre-computed
    
    140
    +                result for the given action.
    
    141
    +
    
    142
    +        Returns:
    
    143
    +            str: the newly created operation's name.
    
    144
    +        """
    
    145
    +        if action_digest.hash in self.__jobs_by_action:
    
    146
    +            job = self.__jobs_by_action[action_digest.hash]
    
    147
    +
    
    148
    +            # Reschedule if priority is now greater:
    
    149
    +            if priority < job.priority:
    
    150
    +                job.priority = priority
    
    151
    +
    
    152
    +                if job.operation_stage == OperationStage.QUEUED:
    
    153
    +                    self._queue_job(job.name)
    
    154
    +
    
    155
    +            return job.name
    
    156
    +
    
    157
    +        job = Job(action, action_digest, priority=priority)
    
    158
    +
    
    159
    +        self.__jobs_by_action[job.action_digest.hash] = job
    
    160
    +        self.__jobs_by_name[job.name] = job
    
    70 161
     
    
    71 162
             operation_stage = None
    
    163
    +
    
    72 164
             if self._action_cache is not None and not skip_cache_lookup:
    
    73 165
                 try:
    
    74 166
                     action_result = self._action_cache.get_action_result(job.action_digest)
    
    167
    +
    
    75 168
                 except NotFoundError:
    
    76 169
                     operation_stage = OperationStage.QUEUED
    
    77
    -                self.queue.append(job)
    
    170
    +                self._queue_job(job.name)
    
    78 171
     
    
    79 172
                 else:
    
    80
    -                job.set_cached_result(action_result)
    
    81 173
                     operation_stage = OperationStage.COMPLETED
    
    174
    +                job.set_cached_result(action_result)
    
    82 175
     
    
    83 176
                     if self._is_instrumented:
    
    84 177
                         self.__retries_count += 1
    
    85 178
     
    
    86 179
             else:
    
    87 180
                 operation_stage = OperationStage.QUEUED
    
    88
    -            self.queue.append(job)
    
    181
    +            self._queue_job(job.name)
    
    89 182
     
    
    90 183
             self._update_job_operation_stage(job.name, operation_stage)
    
    91 184
     
    
    92
    -    def retry_job(self, job_name):
    
    93
    -        job = self.jobs[job_name]
    
    185
    +        return job.name
    
    94 186
     
    
    95
    -        operation_stage = None
    
    96
    -        if job.n_tries >= self.MAX_N_TRIES:
    
    97
    -            # TODO: Decide what to do with these jobs
    
    98
    -            operation_stage = OperationStage.COMPLETED
    
    99
    -            # TODO: Mark these jobs as done
    
    187
    +    def get_job_operation(self, operation_name):
    
    188
    +        """Retrieves a job's :class:`Operation` by name.
    
    100 189
     
    
    101
    -        else:
    
    102
    -            operation_stage = OperationStage.QUEUED
    
    103
    -            job.update_lease_state(LeaseState.PENDING)
    
    104
    -            self.queue.append(job)
    
    190
    +        Args:
    
    191
    +            operation_name (str): name of the operation to query.
    
    105 192
     
    
    106
    -        self._update_job_operation_stage(job_name, operation_stage)
    
    193
    +        Raises:
    
    194
    +            NotFoundError: If no operation with `operation_name` exists.
    
    195
    +        """
    
    196
    +        try:
    
    197
    +            job = self.__jobs_by_operation[operation_name]
    
    107 198
     
    
    108
    -    def list_jobs(self):
    
    109
    -        return self.jobs.values()
    
    199
    +        except KeyError:
    
    200
    +            raise NotFoundError("Operation name does not exist: [{}]"
    
    201
    +                                .format(operation_name))
    
    202
    +
    
    203
    +        return job.get_operation(operation_name)
    
    204
    +
    
    205
    +    def cancel_job_operation(self, operation_name):
    
    206
    +        """Cancels a job's :class:`Operation` by name.
    
    207
    +
    
    208
    +        Args:
    
    209
    +            operation_name (str): name of the operation to cancel.
    
    210
    +
    
    211
    +        Raises:
    
    212
    +            NotFoundError: If no operation with `operation_name` exists.
    
    213
    +        """
    
    214
    +        try:
    
    215
    +            job = self.__jobs_by_operation[operation_name]
    
    216
    +
    
    217
    +        except KeyError:
    
    218
    +            raise NotFoundError("Operation name does not exist: [{}]"
    
    219
    +                                .format(operation_name))
    
    220
    +
    
    221
    +        job.cancel_operation_for_peer(operation_name)
    
    222
    +
    
    223
    +    def delete_job_operation(self, operation_name):
    
    224
    +        """Removes a job.
    
    225
    +
    
    226
    +        Args:
    
    227
    +            operation_name (str): name of the operation to delete.
    
    228
    +
    
    229
    +        Raises:
    
    230
    +            NotFoundError: If no operation with `operation_name` exists.
    
    231
    +        """
    
    232
    +        try:
    
    233
    +            job = self.__jobs_by_operation[operation_name]
    
    234
    +
    
    235
    +        except KeyError:
    
    236
    +            raise NotFoundError("Operation name does not exist: [{}]"
    
    237
    +                                .format(operation_name))
    
    238
    +
    
    239
    +        if not job.n_peers and job.done and not job.lease:
    
    240
    +            self._delete_job(job.name)
    
    241
    +
    
    242
    +    # --- Public API: RWAPI ---
    
    110 243
     
    
    111 244
         def request_job_leases(self, worker_capabilities):
    
    112 245
             """Generates a list of the highest priority leases to be run.
    
    ... ... @@ -115,11 +248,14 @@ class Scheduler:
    115 248
                 worker_capabilities (dict): a set of key-value pairs decribing the
    
    116 249
                     worker properties, configuration and state at the time of the
    
    117 250
                     request.
    
    251
    +
    
    252
    +        Warning: Worker capabilities handling is not implemented at the moment!
    
    118 253
             """
    
    119
    -        if not self.queue:
    
    254
    +        if not self.__queue:
    
    120 255
                 return []
    
    121 256
     
    
    122
    -        job = self.queue.popleft()
    
    257
    +        # TODO: Try to match worker_capabilities with jobs properties.
    
    258
    +        job = self.__queue.pop()
    
    123 259
     
    
    124 260
             lease = job.lease
    
    125 261
     
    
    ... ... @@ -132,18 +268,25 @@ class Scheduler:
    132 268
     
    
    133 269
             return None
    
    134 270
     
    
    135
    -    def update_job_lease(self, lease):
    
    271
    +    def update_job_lease_state(self, job_name, lease):
    
    136 272
             """Requests a state transition for a job's current :class:Lease.
    
    137 273
     
    
    274
    +        Note:
    
    275
    +            This may trigger a job's :class:`Operation` stage transition.
    
    276
    +
    
    138 277
             Args:
    
    139
    -            job_name (str): name of the job to query.
    
    140
    -            lease_state (LeaseState): the lease state to transition to.
    
    141
    -            lease_status (google.rpc.Status): the lease execution status, only
    
    142
    -                required if `lease_state` is `COMPLETED`.
    
    143
    -            lease_result (google.protobuf.Any): the lease execution result, only
    
    144
    -                required if `lease_state` is `COMPLETED`.
    
    278
    +            job_name (str): name of the job to update lease state from.
    
    279
    +            lease (Lease): the lease holding the new state.
    
    280
    +
    
    281
    +        Raises:
    
    282
    +            NotFoundError: If no job with `job_name` exists.
    
    145 283
             """
    
    146
    -        job = self.jobs[lease.id]
    
    284
    +        try:
    
    285
    +            job = self.__jobs_by_name[job_name]
    
    286
    +
    
    287
    +        except KeyError:
    
    288
    +            raise NotFoundError("Job name does not exist: [{}]".format(job_name))
    
    289
    +
    
    147 290
             lease_state = LeaseState(lease.state)
    
    148 291
     
    
    149 292
             operation_stage = None
    
    ... ... @@ -179,38 +322,93 @@ class Scheduler:
    179 322
                     self.__leases_by_state[LeaseState.ACTIVE].discard(lease.id)
    
    180 323
                     self.__leases_by_state[LeaseState.COMPLETED].add(lease.id)
    
    181 324
     
    
    182
    -        self._update_job_operation_stage(lease.id, operation_stage)
    
    325
    +        self._update_job_operation_stage(job_name, operation_stage)
    
    326
    +
    
    327
    +    def retry_job_lease(self, job_name):
    
    328
    +        """Re-queues a job on lease execution failure.
    
    329
    +
    
    330
    +        Note:
    
    331
    +            This may trigger a job's :class:`Operation` stage transition.
    
    332
    +
    
    333
    +        Args:
    
    334
    +            job_name (str): name of the job to retry the lease from.
    
    335
    +
    
    336
    +        Raises:
    
    337
    +            NotFoundError: If no job with `job_name` exists.
    
    338
    +        """
    
    339
    +        try:
    
    340
    +            job = self.__jobs_by_name[job_name]
    
    341
    +
    
    342
    +        except KeyError:
    
    343
    +            raise NotFoundError("Job name does not exist: [{}]".format(job_name))
    
    344
    +
    
    345
    +        operation_stage = None
    
    346
    +        if job.n_tries >= self.MAX_N_TRIES:
    
    347
    +            # TODO: Decide what to do with these jobs
    
    348
    +            operation_stage = OperationStage.COMPLETED
    
    349
    +            # TODO: Mark these jobs as done
    
    350
    +
    
    351
    +        else:
    
    352
    +            operation_stage = OperationStage.QUEUED
    
    353
    +            self._queue_job(job.name)
    
    354
    +
    
    355
    +            job.update_lease_state(LeaseState.PENDING)
    
    356
    +
    
    357
    +        self._update_job_operation_stage(job_name, operation_stage)
    
    183 358
     
    
    184 359
         def get_job_lease(self, job_name):
    
    185
    -        """Returns the lease associated to job, if any have been emitted yet."""
    
    186
    -        return self.jobs[job_name].lease
    
    360
    +        """Returns the lease associated to job, if any have been emitted yet.
    
    187 361
     
    
    188
    -    def get_job_lease_cancelled(self, job_name):
    
    189
    -        """Returns true if the lease is cancelled"""
    
    190
    -        return self.jobs[job_name].lease_cancelled
    
    362
    +        Args:
    
    363
    +            job_name (str): name of the job to query the lease from.
    
    364
    +
    
    365
    +        Raises:
    
    366
    +            NotFoundError: If no job with `job_name` exists.
    
    367
    +        """
    
    368
    +        try:
    
    369
    +            job = self.__jobs_by_name[job_name]
    
    370
    +
    
    371
    +        except KeyError:
    
    372
    +            raise NotFoundError("Job name does not exist: [{}]".format(job_name))
    
    373
    +
    
    374
    +        return job.lease
    
    191 375
     
    
    192 376
         def delete_job_lease(self, job_name):
    
    193
    -        """Discards the lease associated to a job."""
    
    194
    -        job = self.jobs[job_name]
    
    377
    +        """Discards the lease associated with a job.
    
    195 378
     
    
    196
    -        self.jobs[job.name].delete_lease()
    
    379
    +        Args:
    
    380
    +            job_name (str): name of the job to delete the lease from.
    
    197 381
     
    
    198
    -        if not job.n_clients and job.operation.done:
    
    199
    -            self._delete_job(job.name)
    
    382
    +        Raises:
    
    383
    +            NotFoundError: If no job with `job_name` exists.
    
    384
    +        """
    
    385
    +        try:
    
    386
    +            job = self.__jobs_by_name[job_name]
    
    387
    +
    
    388
    +        except KeyError:
    
    389
    +            raise NotFoundError("Job name does not exist: [{}]".format(job_name))
    
    200 390
     
    
    201
    -    def get_job_operation(self, job_name):
    
    202
    -        """Returns the operation associated to job."""
    
    203
    -        return self.jobs[job_name].operation
    
    391
    +        job.delete_lease()
    
    204 392
     
    
    205
    -    def cancel_job_operation(self, job_name):
    
    206
    -        """"Cancels the underlying operation of a given job.
    
    393
    +        if not job.n_peers and job.operation.done:
    
    394
    +            self._delete_job(job.name)
    
    207 395
     
    
    208
    -        This will also cancel any job's lease that may have been issued.
    
    396
    +    def get_job_lease_cancelled(self, job_name):
    
    397
    +        """Returns true if the lease is cancelled.
    
    209 398
     
    
    210 399
             Args:
    
    211
    -            job_name (str): name of the job holding the operation to cancel.
    
    400
    +            job_name (str): name of the job to query the lease state from.
    
    401
    +
    
    402
    +        Raises:
    
    403
    +            NotFoundError: If no job with `job_name` exists.
    
    212 404
             """
    
    213
    -        self.jobs[job_name].cancel_operation()
    
    405
    +        try:
    
    406
    +            job = self.__jobs_by_name[job_name]
    
    407
    +
    
    408
    +        except KeyError:
    
    409
    +            raise NotFoundError("Job name does not exist: [{}]".format(job_name))
    
    410
    +
    
    411
    +        return job.lease_cancelled
    
    214 412
     
    
    215 413
         # --- Public API: Monitoring ---
    
    216 414
     
    
    ... ... @@ -260,11 +458,11 @@ class Scheduler:
    260 458
                 self.__build_metadata_queues.append(message_queue)
    
    261 459
     
    
    262 460
         def query_n_jobs(self):
    
    263
    -        return len(self.jobs)
    
    461
    +        return len(self.__jobs_by_name)
    
    264 462
     
    
    265 463
         def query_n_operations(self):
    
    266 464
             # For now n_operations == n_jobs:
    
    267
    -        return len(self.jobs)
    
    465
    +        return len(self.__jobs_by_operation)
    
    268 466
     
    
    269 467
         def query_n_operations_by_stage(self, operation_stage):
    
    270 468
             try:
    
    ... ... @@ -275,7 +473,7 @@ class Scheduler:
    275 473
             return 0
    
    276 474
     
    
    277 475
         def query_n_leases(self):
    
    278
    -        return len(self.jobs)
    
    476
    +        return len(self.__jobs_by_name)
    
    279 477
     
    
    280 478
         def query_n_leases_by_state(self, lease_state):
    
    281 479
             try:
    
    ... ... @@ -295,19 +493,35 @@ class Scheduler:
    295 493
     
    
    296 494
         # --- Private API ---
    
    297 495
     
    
    496
    +    def _queue_job(self, job_name):
    
    497
    +        """Schedules or reschedules a job."""
    
    498
    +        job = self.__jobs_by_name[job_name]
    
    499
    +
    
    500
    +        if job.operation_stage == OperationStage.QUEUED:
    
    501
    +            self.__queue.sort()
    
    502
    +
    
    503
    +        else:
    
    504
    +            bisect.insort(self.__queue, job)
    
    505
    +
    
    298 506
         def _delete_job(self, job_name):
    
    299 507
             """Drops an entry from the internal list of jobs."""
    
    300
    -        del self.jobs[job_name]
    
    508
    +        job = self.__jobs_by_name[job_name]
    
    509
    +
    
    510
    +        if job.operation_stage == OperationStage.QUEUED:
    
    511
    +            self.__queue.remove(job)
    
    512
    +
    
    513
    +        del self.__jobs_by_action[job.action_digest.hash]
    
    514
    +        del self.__jobs_by_name[job.name]
    
    301 515
     
    
    302 516
             if self._is_instrumented:
    
    303
    -            self.__operations_by_stage[OperationStage.CACHE_CHECK].discard(job_name)
    
    304
    -            self.__operations_by_stage[OperationStage.QUEUED].discard(job_name)
    
    305
    -            self.__operations_by_stage[OperationStage.EXECUTING].discard(job_name)
    
    306
    -            self.__operations_by_stage[OperationStage.COMPLETED].discard(job_name)
    
    517
    +            self.__operations_by_stage[OperationStage.CACHE_CHECK].discard(job.name)
    
    518
    +            self.__operations_by_stage[OperationStage.QUEUED].discard(job.name)
    
    519
    +            self.__operations_by_stage[OperationStage.EXECUTING].discard(job.name)
    
    520
    +            self.__operations_by_stage[OperationStage.COMPLETED].discard(job.name)
    
    307 521
     
    
    308
    -            self.__leases_by_state[LeaseState.PENDING].discard(job_name)
    
    309
    -            self.__leases_by_state[LeaseState.ACTIVE].discard(job_name)
    
    310
    -            self.__leases_by_state[LeaseState.COMPLETED].discard(job_name)
    
    522
    +            self.__leases_by_state[LeaseState.PENDING].discard(job.name)
    
    523
    +            self.__leases_by_state[LeaseState.ACTIVE].discard(job.name)
    
    524
    +            self.__leases_by_state[LeaseState.COMPLETED].discard(job.name)
    
    311 525
     
    
    312 526
         def _update_job_operation_stage(self, job_name, operation_stage):
    
    313 527
             """Requests a stage transition for the job's :class:Operations.
    
    ... ... @@ -316,7 +530,7 @@ class Scheduler:
    316 530
                 job_name (str): name of the job to query.
    
    317 531
                 operation_stage (OperationStage): the stage to transition to.
    
    318 532
             """
    
    319
    -        job = self.jobs[job_name]
    
    533
    +        job = self.__jobs_by_name[job_name]
    
    320 534
     
    
    321 535
             if operation_stage == OperationStage.CACHE_CHECK:
    
    322 536
                 job.update_operation_stage(OperationStage.CACHE_CHECK)
    
    ... ... @@ -365,7 +579,7 @@ class Scheduler:
    365 579
     
    
    366 580
                     self.__queue_time_average = average_order, average_time
    
    367 581
     
    
    368
    -                if not job.holds_cached_action_result:
    
    582
    +                if not job.holds_cached_result:
    
    369 583
                         execution_metadata = job.action_result.execution_metadata
    
    370 584
                         context_metadata = {'job-is': job.name}
    
    371 585
     
    

  • docs/source/conf.py
    ... ... @@ -182,3 +182,11 @@ texinfo_documents = [
    182 182
          author, 'BuildGrid', 'One line description of project.',
    
    183 183
          'Miscellaneous'),
    
    184 184
     ]
    
    185
    +
    
    186
    +# -- Options for the autodoc extension ----------------------------------------
    
    187
    +
    
    188
    +# This value selects if automatically documented members are sorted
    
    189
    +# alphabetical (value 'alphabetical'), by member type (value 'groupwise') or
    
    190
    +# by source order (value 'bysource'). The default is alphabetical.
    
    191
    +autodoc_member_order = 'bysource'
    
    192
    +

  • tests/integration/bots_service.py
    ... ... @@ -25,7 +25,6 @@ import pytest
    25 25
     
    
    26 26
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    27 27
     from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
    
    28
    -from buildgrid.server import job
    
    29 28
     from buildgrid.server.controller import ExecutionController
    
    30 29
     from buildgrid.server.job import LeaseState
    
    31 30
     from buildgrid.server.bots import service
    
    ... ... @@ -159,7 +158,8 @@ def test_post_bot_event_temp(context, instance):
    159 158
     def _inject_work(scheduler, action=None, action_digest=None):
    
    160 159
         if not action:
    
    161 160
             action = remote_execution_pb2.Action()
    
    161
    +
    
    162 162
         if not action_digest:
    
    163 163
             action_digest = remote_execution_pb2.Digest()
    
    164
    -    j = job.Job(action, action_digest)
    
    165
    -    scheduler.queue_job(j, True)
    164
    +
    
    165
    +    scheduler.queue_job_operation(action, action_digest, skip_cache_lookup=True)

  • tests/integration/execution_service.py
    ... ... @@ -20,11 +20,11 @@
    20 20
     import uuid
    
    21 21
     from unittest import mock
    
    22 22
     
    
    23
    -from google.protobuf import any_pb2
    
    24 23
     import grpc
    
    25 24
     from grpc._server import _Context
    
    26 25
     import pytest
    
    27 26
     
    
    27
    +from buildgrid._enums import OperationStage
    
    28 28
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    29 29
     from buildgrid._protos.google.longrunning import operations_pb2
    
    30 30
     
    
    ... ... @@ -82,7 +82,7 @@ def test_execute(skip_cache_lookup, instance, context):
    82 82
         assert isinstance(result, operations_pb2.Operation)
    
    83 83
         metadata = remote_execution_pb2.ExecuteOperationMetadata()
    
    84 84
         result.metadata.Unpack(metadata)
    
    85
    -    assert metadata.stage == job.OperationStage.QUEUED.value
    
    85
    +    assert metadata.stage == OperationStage.QUEUED.value
    
    86 86
         operation_uuid = result.name.split('/')[-1]
    
    87 87
         assert uuid.UUID(operation_uuid, version=4)
    
    88 88
         assert result.done is False
    
    ... ... @@ -106,18 +106,14 @@ def test_no_action_digest_in_storage(instance, context):
    106 106
     
    
    107 107
     
    
    108 108
     def test_wait_execution(instance, controller, context):
    
    109
    -    j = job.Job(action, action_digest)
    
    110
    -    j._operation.done = True
    
    109
    +    job_name = controller.execution_instance._scheduler.queue_job_operation(action,
    
    110
    +                                                                            action_digest,
    
    111
    +                                                                            skip_cache_lookup=True)
    
    111 112
     
    
    112
    -    request = remote_execution_pb2.WaitExecutionRequest(name=j.name)
    
    113
    +    controller.execution_instance._scheduler._update_job_operation_stage(job_name,
    
    114
    +                                                                         OperationStage.COMPLETED)
    
    113 115
     
    
    114
    -    controller.execution_instance._scheduler.jobs[j.name] = j
    
    115
    -
    
    116
    -    action_result_any = any_pb2.Any()
    
    117
    -    action_result = remote_execution_pb2.ActionResult()
    
    118
    -    action_result_any.Pack(action_result)
    
    119
    -
    
    120
    -    j.update_operation_stage(job.OperationStage.COMPLETED)
    
    116
    +    request = remote_execution_pb2.WaitExecutionRequest(name=job_name)
    
    121 117
     
    
    122 118
         response = instance.WaitExecution(request, context)
    
    123 119
     
    
    ... ... @@ -127,7 +123,6 @@ def test_wait_execution(instance, controller, context):
    127 123
         metadata = remote_execution_pb2.ExecuteOperationMetadata()
    
    128 124
         result.metadata.Unpack(metadata)
    
    129 125
         assert metadata.stage == job.OperationStage.COMPLETED.value
    
    130
    -    assert uuid.UUID(result.name, version=4)
    
    131 126
         assert result.done is True
    
    132 127
     
    
    133 128
     
    

  • tests/integration/operations_service.py
    ... ... @@ -17,6 +17,7 @@
    17 17
     
    
    18 18
     # pylint: disable=redefined-outer-name
    
    19 19
     
    
    20
    +import queue
    
    20 21
     from unittest import mock
    
    21 22
     
    
    22 23
     from google.protobuf import any_pb2
    
    ... ... @@ -86,8 +87,13 @@ def blank_instance(controller):
    86 87
     
    
    87 88
     # Queue an execution, get operation corresponding to that request
    
    88 89
     def test_get_operation(instance, controller, execute_request, context):
    
    89
    -    response_execute = controller.execution_instance.execute(execute_request.action_digest,
    
    90
    -                                                             execute_request.skip_cache_lookup)
    
    90
    +    job_name = controller.execution_instance.execute(execute_request.action_digest,
    
    91
    +                                                     execute_request.skip_cache_lookup)
    
    92
    +
    
    93
    +    message_queue = queue.Queue()
    
    94
    +    operation_name = controller.execution_instance.register_operation_peer(job_name,
    
    95
    +                                                                           context.peer(),
    
    96
    +                                                                           message_queue)
    
    91 97
     
    
    92 98
         request = operations_pb2.GetOperationRequest()
    
    93 99
     
    
    ... ... @@ -95,25 +101,28 @@ def test_get_operation(instance, controller, execute_request, context):
    95 101
         # we're manually creating the instance here, it doesn't get a name.
    
    96 102
         # Therefore we need to manually add the instance name to the operation
    
    97 103
         # name in the GetOperation request.
    
    98
    -    request.name = "{}/{}".format(instance_name, response_execute.name)
    
    104
    +    request.name = "{}/{}".format(instance_name, operation_name)
    
    99 105
     
    
    100 106
         response = instance.GetOperation(request, context)
    
    101
    -    assert response.name == "{}/{}".format(instance_name, response_execute.name)
    
    102
    -    assert response.done == response_execute.done
    
    107
    +    assert response.name == "{}/{}".format(instance_name, operation_name)
    
    103 108
     
    
    104 109
     
    
    105 110
     # Queue an execution, get operation corresponding to that request
    
    106 111
     def test_get_operation_blank(blank_instance, controller, execute_request, context):
    
    107
    -    response_execute = controller.execution_instance.execute(execute_request.action_digest,
    
    108
    -                                                             execute_request.skip_cache_lookup)
    
    112
    +    job_name = controller.execution_instance.execute(execute_request.action_digest,
    
    113
    +                                                     execute_request.skip_cache_lookup)
    
    114
    +
    
    115
    +    message_queue = queue.Queue()
    
    116
    +    operation_name = controller.execution_instance.register_operation_peer(job_name,
    
    117
    +                                                                           context.peer(),
    
    118
    +                                                                           message_queue)
    
    109 119
     
    
    110 120
         request = operations_pb2.GetOperationRequest()
    
    111 121
     
    
    112
    -    request.name = response_execute.name
    
    122
    +    request.name = operation_name
    
    113 123
     
    
    114 124
         response = blank_instance.GetOperation(request, context)
    
    115
    -    assert response.name == response_execute.name
    
    116
    -    assert response.done == response_execute.done
    
    125
    +    assert response.name == operation_name
    
    117 126
     
    
    118 127
     
    
    119 128
     def test_get_operation_fail(instance, context):
    
    ... ... @@ -133,25 +142,35 @@ def test_get_operation_instance_fail(instance, context):
    133 142
     
    
    134 143
     
    
    135 144
     def test_list_operations(instance, controller, execute_request, context):
    
    136
    -    response_execute = controller.execution_instance.execute(execute_request.action_digest,
    
    137
    -                                                             execute_request.skip_cache_lookup)
    
    145
    +    job_name = controller.execution_instance.execute(execute_request.action_digest,
    
    146
    +                                                     execute_request.skip_cache_lookup)
    
    147
    +
    
    148
    +    message_queue = queue.Queue()
    
    149
    +    operation_name = controller.execution_instance.register_operation_peer(job_name,
    
    150
    +                                                                           context.peer(),
    
    151
    +                                                                           message_queue)
    
    138 152
     
    
    139 153
         request = operations_pb2.ListOperationsRequest(name=instance_name)
    
    140 154
         response = instance.ListOperations(request, context)
    
    141 155
     
    
    142 156
         names = response.operations[0].name.split('/')
    
    143 157
         assert names[0] == instance_name
    
    144
    -    assert names[1] == response_execute.name
    
    158
    +    assert names[1] == operation_name
    
    145 159
     
    
    146 160
     
    
    147 161
     def test_list_operations_blank(blank_instance, controller, execute_request, context):
    
    148
    -    response_execute = controller.execution_instance.execute(execute_request.action_digest,
    
    149
    -                                                             execute_request.skip_cache_lookup)
    
    162
    +    job_name = controller.execution_instance.execute(execute_request.action_digest,
    
    163
    +                                                     execute_request.skip_cache_lookup)
    
    164
    +
    
    165
    +    message_queue = queue.Queue()
    
    166
    +    operation_name = controller.execution_instance.register_operation_peer(job_name,
    
    167
    +                                                                           context.peer(),
    
    168
    +                                                                           message_queue)
    
    150 169
     
    
    151 170
         request = operations_pb2.ListOperationsRequest(name='')
    
    152 171
         response = blank_instance.ListOperations(request, context)
    
    153 172
     
    
    154
    -    assert response.operations[0].name.split('/')[-1] == response_execute.name
    
    173
    +    assert response.operations[0].name.split('/')[-1] == operation_name
    
    155 174
     
    
    156 175
     
    
    157 176
     def test_list_operations_instance_fail(instance, controller, execute_request, context):
    
    ... ... @@ -174,14 +193,19 @@ def test_list_operations_empty(instance, context):
    174 193
     
    
    175 194
     # Send execution off, delete, try to find operation should fail
    
    176 195
     def test_delete_operation(instance, controller, execute_request, context):
    
    177
    -    response_execute = controller.execution_instance.execute(execute_request.action_digest,
    
    178
    -                                                             execute_request.skip_cache_lookup)
    
    196
    +    job_name = controller.execution_instance.execute(execute_request.action_digest,
    
    197
    +                                                     execute_request.skip_cache_lookup)
    
    198
    +
    
    199
    +    message_queue = queue.Queue()
    
    200
    +    operation_name = controller.execution_instance.register_operation_peer(job_name,
    
    201
    +                                                                           context.peer(),
    
    202
    +                                                                           message_queue)
    
    179 203
     
    
    180 204
         request = operations_pb2.DeleteOperationRequest()
    
    181
    -    request.name = response_execute.name
    
    205
    +    request.name = operation_name
    
    182 206
         instance.DeleteOperation(request, context)
    
    183 207
     
    
    184
    -    request_name = "{}/{}".format(instance_name, response_execute.name)
    
    208
    +    request_name = "{}/{}".format(instance_name, operation_name)
    
    185 209
     
    
    186 210
         with pytest.raises(InvalidArgumentError):
    
    187 211
             controller.operations_instance.get_operation(request_name)
    
    ... ... @@ -189,17 +213,11 @@ def test_delete_operation(instance, controller, execute_request, context):
    189 213
     
    
    190 214
     # Send execution off, delete, try to find operation should fail
    
    191 215
     def test_delete_operation_blank(blank_instance, controller, execute_request, context):
    
    192
    -    response_execute = controller.execution_instance.execute(execute_request.action_digest,
    
    193
    -                                                             execute_request.skip_cache_lookup)
    
    194
    -
    
    195 216
         request = operations_pb2.DeleteOperationRequest()
    
    196
    -    request.name = response_execute.name
    
    217
    +    request.name = "runner"
    
    197 218
         blank_instance.DeleteOperation(request, context)
    
    198 219
     
    
    199
    -    request_name = response_execute.name
    
    200
    -
    
    201
    -    with pytest.raises(InvalidArgumentError):
    
    202
    -        controller.operations_instance.get_operation(request_name)
    
    220
    +    context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
    
    203 221
     
    
    204 222
     
    
    205 223
     def test_delete_operation_fail(instance, context):
    
    ... ... @@ -211,11 +229,16 @@ def test_delete_operation_fail(instance, context):
    211 229
     
    
    212 230
     
    
    213 231
     def test_cancel_operation(instance, controller, execute_request, context):
    
    214
    -    response_execute = controller.execution_instance.execute(execute_request.action_digest,
    
    215
    -                                                             execute_request.skip_cache_lookup)
    
    232
    +    job_name = controller.execution_instance.execute(execute_request.action_digest,
    
    233
    +                                                     execute_request.skip_cache_lookup)
    
    234
    +
    
    235
    +    message_queue = queue.Queue()
    
    236
    +    operation_name = controller.execution_instance.register_operation_peer(job_name,
    
    237
    +                                                                           context.peer(),
    
    238
    +                                                                           message_queue)
    
    216 239
     
    
    217 240
         request = operations_pb2.CancelOperationRequest()
    
    218
    -    request.name = "{}/{}".format(instance_name, response_execute.name)
    
    241
    +    request.name = "{}/{}".format(instance_name, operation_name)
    
    219 242
     
    
    220 243
         instance.CancelOperation(request, context)
    
    221 244
     
    
    ... ... @@ -238,7 +261,7 @@ def test_cancel_operation_blank(blank_instance, context):
    238 261
         context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
    
    239 262
     
    
    240 263
     
    
    241
    -def test_cancel_operation_instance_fail(instance, context):
    
    264
    +def test_cancel_operation__fail(instance, context):
    
    242 265
         request = operations_pb2.CancelOperationRequest()
    
    243 266
         instance.CancelOperation(request, context)
    
    244 267
     
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]