Martin Blanchard pushed to branch mablanch/75-requests-multiplexing at BuildGrid / buildgrid
Commits:
- 33158b7f by Valentin David at 2018-12-18T13:56:43Z
- a05de331 by Martin Blanchard at 2018-12-18T15:34:36Z
- 939d3763 by Martin Blanchard at 2018-12-18T15:34:36Z
- b1c0ef09 by Martin Blanchard at 2018-12-18T15:34:36Z
- 3bfd2b48 by Martin Blanchard at 2018-12-18T15:34:36Z
- 3369374c by Martin Blanchard at 2018-12-18T15:34:36Z
- 0deda7b7 by Martin Blanchard at 2018-12-18T15:34:36Z
- b2546189 by Martin Blanchard at 2018-12-18T15:34:36Z
- a43da410 by Martin Blanchard at 2018-12-18T15:41:03Z
- cb739d94 by Martin Blanchard at 2018-12-18T15:41:04Z
14 changed files:
- buildgrid/_app/commands/cmd_capabilities.py
- buildgrid/_app/commands/cmd_cas.py
- buildgrid/_app/commands/cmd_execute.py
- buildgrid/_app/commands/cmd_operation.py
- buildgrid/server/bots/instance.py
- buildgrid/server/execution/instance.py
- buildgrid/server/execution/service.py
- buildgrid/server/job.py
- buildgrid/server/operations/instance.py
- buildgrid/server/scheduler.py
- docs/source/conf.py
- tests/integration/bots_service.py
- tests/integration/execution_service.py
- tests/integration/operations_service.py
Changes:
buildgrid/_app/commands/cmd_capabilities.py

@@ -43,7 +43,8 @@ def cli(context, remote, instance_name, auth_token, client_key, client_cert, ser
     """Entry point for the bgd-capabilities CLI command group."""
     try:
         context.channel, _ = setup_channel(remote, auth_token=auth_token,
-                                           client_key=client_key, client_cert=client_cert)
+                                           client_key=client_key, client_cert=client_cert,
+                                           server_cert=server_cert)

     except InvalidArgumentError as e:
         click.echo("Error: {}.".format(e), err=True)
buildgrid/_app/commands/cmd_cas.py

@@ -52,7 +52,8 @@ def cli(context, remote, instance_name, auth_token, client_key, client_cert, ser
     """Entry point for the bgd-cas CLI command group."""
     try:
         context.channel, _ = setup_channel(remote, auth_token=auth_token,
-                                           client_key=client_key, client_cert=client_cert)
+                                           client_key=client_key, client_cert=client_cert,
+                                           server_cert=server_cert)

     except InvalidArgumentError as e:
         click.echo("Error: {}.".format(e), err=True)
buildgrid/_app/commands/cmd_execute.py

@@ -53,7 +53,8 @@ def cli(context, remote, instance_name, auth_token, client_key, client_cert, ser
     """Entry point for the bgd-execute CLI command group."""
     try:
         context.channel, _ = setup_channel(remote, auth_token=auth_token,
-                                           client_key=client_key, client_cert=client_cert)
+                                           client_key=client_key, client_cert=client_cert,
+                                           server_cert=server_cert)

     except InvalidArgumentError as e:
         click.echo("Error: {}.".format(e), err=True)
buildgrid/_app/commands/cmd_operation.py

@@ -56,7 +56,8 @@ def cli(context, remote, instance_name, auth_token, client_key, client_cert, ser
     """Entry point for the bgd-operation CLI command group."""
     try:
         context.channel, _ = setup_channel(remote, auth_token=auth_token,
-                                           client_key=client_key, client_cert=client_cert)
+                                           client_key=client_key, client_cert=client_cert,
+                                           server_cert=server_cert)

     except InvalidArgumentError as e:
         click.echo("Error: {}.".format(e), err=True)
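Note: the four CLI entry points above now forward the server certificate to setup_channel. For context, here is a minimal sketch of what a channel helper typically does with that extra argument; this is illustrative gRPC usage, not BuildGrid's actual setup_channel implementation, and all names below are hypothetical:

    import grpc

    def make_channel(remote, client_key=None, client_cert=None, server_cert=None):
        """Open a gRPC channel, using TLS when any credential material is given."""
        if not (client_key or client_cert or server_cert):
            return grpc.insecure_channel(remote)

        credentials = grpc.ssl_channel_credentials(
            root_certificates=server_cert,   # verify/pin the server's certificate
            private_key=client_key,          # optional client identity (mTLS)
            certificate_chain=client_cert)
        return grpc.secure_channel(remote, credentials)

Passing the server's certificate lets the client verify (or pin) the server it is talking to instead of relying on system defaults, which is presumably why the CLI commands now thread server_cert through.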
buildgrid/server/bots/instance.py

@@ -123,7 +123,7 @@ class BotsInterface:
             # Job does not exist, remove from bot.
             return None

-        self._scheduler.update_job_lease(lease)
+        self._scheduler.update_job_lease_state(lease.id, lease)

         if lease_state == LeaseState.COMPLETED:
             return None
@@ -161,7 +161,7 @@ class BotsInterface:
                 self.__logger.error("Assigned lease id=[%s],"
                                     " not found on bot with name=[%s] and id=[%s]."
                                     " Retrying job", lease_id, bot_session.name, bot_session.bot_id)
-                self._scheduler.retry_job(lease_id)
+                self._scheduler.retry_job_lease(lease_id)

     def _close_bot_session(self, name):
         """ Before removing the session, close any leases and
@@ -174,7 +174,7 @@ class BotsInterface:

         self.__logger.debug("Attempting to close [%s] with name: [%s]", bot_id, name)
         for lease_id in self._assigned_leases[name]:
-            self._scheduler.retry_job(lease_id)
+            self._scheduler.retry_job_lease(lease_id)
         self._assigned_leases.pop(name)

         self.__logger.debug("Closing bot session: [%s]", name)
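The BotsInterface changes are plain renames on the scheduler surface. As a hedged reminder of the new names (the scheduler and lease objects below are stand-ins; the real call sites are the ones shown in the hunks above):

    def relay_lease_update(scheduler, lease):
        # update_job_lease(lease) became update_job_lease_state(lease.id, lease):
        # the scheduler is now addressed explicitly with the lease id.
        scheduler.update_job_lease_state(lease.id, lease)

    def requeue_lost_lease(scheduler, lease_id):
        # retry_job(lease_id) became retry_job_lease(lease_id); BotsInterface calls
        # it for assigned leases no longer reported by the bot and when closing a
        # bot session.
        scheduler.retry_job_lease(lease_id)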
buildgrid/server/execution/instance.py

@@ -21,11 +21,9 @@ An instance of the Remote Execution Service.

 import logging

-from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError
+from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError, NotFoundError
 from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action
-
-from ..job import Job
-from ...utils import get_hash_type
+from buildgrid.utils import get_hash_type


 class ExecutionInstance:
@@ -46,44 +44,45 @@ class ExecutionInstance:
     def hash_type(self):
         return get_hash_type()

-    def execute(self, action_digest, skip_cache_lookup, message_queue=None):
+    def execute(self, action_digest, skip_cache_lookup):
         """ Sends a job for execution.
         Queues an action and creates an Operation instance to be associated with
         this action.
         """
-
         action = self._storage.get_message(action_digest, Action)

         if not action:
             raise FailedPreconditionError("Could not get action from storage.")

-        job = Job(action, action_digest)
-        if message_queue is not None:
-            job.register_client(message_queue)
+        return self._scheduler.queue_job_operation(action, action_digest, skip_cache_lookup)

-        self._scheduler.queue_job(job, skip_cache_lookup)
+    def register_operation_client(self, operation_name, peer, message_queue):
+        try:
+            return self._scheduler.register_job_operation_client(operation_name,
+                                                                 peer, message_queue)

-        return job.operation
+        except NotFoundError:
+            raise InvalidArgumentError("Operation name does not exist: [{}]"
+                                       .format(operation_name))

-    def register_message_client(self, name, queue):
+    def unregister_operation_client(self, operation_name, peer):
         try:
-            self._scheduler.register_client(name, queue)
+            self._scheduler.unregister_job_operation_client(operation_name, peer)

-        except KeyError:
-            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
+        except NotFoundError:
+            raise InvalidArgumentError("Operation name does not exist: [{}]"
+                                       .format(operation_name))

-    def unregister_message_client(self, name, queue):
-        try:
-            self._scheduler.unregister_client(name, queue)
+    def stream_operation_updates(self, message_queue):
+        error, operation = message_queue.get()
+        if error is not None:
+            raise error

-        except KeyError:
-            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
+        while not operation.done:
+            yield operation

-    def stream_operation_updates(self, message_queue, operation_name):
-        job = message_queue.get()
-        while not job.operation.done:
-            yield job.operation
-            job = message_queue.get()
-            job.check_operation_status()
+            error, operation = message_queue.get()
+            if error is not None:
+                raise error

-        yield job.operation
+        yield operation
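The ExecutionInstance API is reshaped around the scheduler: execute() only queues the action and returns a job name, watching becomes an explicit per-peer registration, and queue messages are now (error, operation) pairs. A hypothetical caller-side sketch, where the instance object and peer id are assumed to come from the surrounding service code:

    from queue import Queue

    def watch_job(instance, action_digest, peer):
        message_queue = Queue()

        # execute() no longer takes a message queue; it queues the action and
        # hands back the job's name.
        job_name = instance.execute(action_digest, skip_cache_lookup=False)

        # Watching is an explicit, per-peer subscription that returns the name
        # of the Operation created (or reused) for this peer.
        operation_name = instance.register_operation_client(job_name, peer, message_queue)

        try:
            # Messages on the queue are (error, operation) pairs; errors such as
            # CancelledError are re-raised by stream_operation_updates().
            for operation in instance.stream_operation_updates(message_queue):
                print(operation_name, operation.done)
        finally:
            instance.unregister_operation_client(operation_name, peer)

This mirrors what ExecutionService.Execute() does in the next file, where each yielded operation is additionally renamed to "<instance_name>/<operation_name>" before being streamed to the client.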
| ... | ... | @@ -96,12 +96,15 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): | 
| 96 | 96 | 
 | 
| 97 | 97 | 
         try:
 | 
| 98 | 98 | 
             instance = self._get_instance(instance_name)
 | 
| 99 | 
-            operation = instance.execute(request.action_digest,
 | 
|
| 100 | 
-                                         request.skip_cache_lookup,
 | 
|
| 101 | 
-                                         message_queue)
 | 
|
| 99 | 
+  | 
|
| 100 | 
+            job_name = instance.execute(request.action_digest,
 | 
|
| 101 | 
+                                        request.skip_cache_lookup)
 | 
|
| 102 | 
+  | 
|
| 103 | 
+            operation_name = instance.register_operation_client(job_name,
 | 
|
| 104 | 
+                                                                peer, message_queue)
 | 
|
| 102 | 105 | 
 | 
| 103 | 106 | 
             context.add_callback(partial(self._rpc_termination_callback,
 | 
| 104 | 
-                                         peer, instance_name, operation.name, message_queue))
 | 
|
| 107 | 
+                                         peer, instance_name, operation_name))
 | 
|
| 105 | 108 | 
 | 
| 106 | 109 | 
             if self._is_instrumented:
 | 
| 107 | 110 | 
                 if peer not in self.__peers:
 | 
| ... | ... | @@ -110,16 +113,13 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): | 
| 110 | 113 | 
                 else:
 | 
| 111 | 114 | 
                     self.__peers[peer] += 1
 | 
| 112 | 115 | 
 | 
| 113 | 
-            instanced_op_name = "{}/{}".format(instance_name, operation.name)
 | 
|
| 116 | 
+            operation_full_name = "{}/{}".format(instance_name, operation_name)
 | 
|
| 114 | 117 | 
 | 
| 115 | 
-            self.__logger.info("Operation name: [%s]", instanced_op_name)
 | 
|
| 118 | 
+            self.__logger.info("Operation name: [%s]", operation_full_name)
 | 
|
| 116 | 119 | 
 | 
| 117 | 
-            for operation in instance.stream_operation_updates(message_queue,
 | 
|
| 118 | 
-                                                               operation.name):
 | 
|
| 119 | 
-                op = operations_pb2.Operation()
 | 
|
| 120 | 
-                op.CopyFrom(operation)
 | 
|
| 121 | 
-                op.name = instanced_op_name
 | 
|
| 122 | 
-                yield op
 | 
|
| 120 | 
+            for operation in instance.stream_operation_updates(message_queue):
 | 
|
| 121 | 
+                operation.name = operation_full_name
 | 
|
| 122 | 
+                yield operation
 | 
|
| 123 | 123 | 
 | 
| 124 | 124 | 
         except InvalidArgumentError as e:
 | 
| 125 | 125 | 
             self.__logger.error(e)
 | 
| ... | ... | @@ -157,9 +157,11 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): | 
| 157 | 157 | 
         try:
 | 
| 158 | 158 | 
             instance = self._get_instance(instance_name)
 | 
| 159 | 159 | 
 | 
| 160 | 
-            instance.register_message_client(operation_name, message_queue)
 | 
|
| 160 | 
+            operation_name = instance.register_operation_client(operation_name,
 | 
|
| 161 | 
+                                                                peer, message_queue)
 | 
|
| 162 | 
+  | 
|
| 161 | 163 | 
             context.add_callback(partial(self._rpc_termination_callback,
 | 
| 162 | 
-                                         peer, instance_name, operation_name, message_queue))
 | 
|
| 164 | 
+                                         peer, instance_name, operation_name))
 | 
|
| 163 | 165 | 
 | 
| 164 | 166 | 
             if self._is_instrumented:
 | 
| 165 | 167 | 
                 if peer not in self.__peers:
 | 
| ... | ... | @@ -168,12 +170,11 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): | 
| 168 | 170 | 
                 else:
 | 
| 169 | 171 | 
                     self.__peers[peer] += 1
 | 
| 170 | 172 | 
 | 
| 171 | 
-            for operation in instance.stream_operation_updates(message_queue,
 | 
|
| 172 | 
-                                                               operation_name):
 | 
|
| 173 | 
-                op = operations_pb2.Operation()
 | 
|
| 174 | 
-                op.CopyFrom(operation)
 | 
|
| 175 | 
-                op.name = request.name
 | 
|
| 176 | 
-                yield op
 | 
|
| 173 | 
+            operation_full_name = "{}/{}".format(instance_name, operation_name)
 | 
|
| 174 | 
+  | 
|
| 175 | 
+            for operation in instance.stream_operation_updates(message_queue):
 | 
|
| 176 | 
+                operation.name = operation_full_name
 | 
|
| 177 | 
+                yield operation
 | 
|
| 177 | 178 | 
 | 
| 178 | 179 | 
         except InvalidArgumentError as e:
 | 
| 179 | 180 | 
             self.__logger.error(e)
 | 
| ... | ... | @@ -208,10 +209,10 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): | 
| 208 | 209 | 
 | 
| 209 | 210 | 
     # --- Private API ---
 | 
| 210 | 211 | 
 | 
| 211 | 
-    def _rpc_termination_callback(self, peer, instance_name, job_name, message_queue):
 | 
|
| 212 | 
+    def _rpc_termination_callback(self, peer, instance_name, operation_name):
 | 
|
| 212 | 213 | 
         instance = self._get_instance(instance_name)
 | 
| 213 | 214 | 
 | 
| 214 | 
-        instance.unregister_message_client(job_name, message_queue)
 | 
|
| 215 | 
+        instance.unregister_operation_client(operation_name, peer)
 | 
|
| 215 | 216 | 
 | 
| 216 | 217 | 
         if self._is_instrumented:
 | 
| 217 | 218 | 
             if self.__peers[peer] > 1:
 | 
| ... | ... | @@ -20,7 +20,7 @@ import uuid | 
| 20 | 20 | 
 from google.protobuf import duration_pb2, timestamp_pb2
 | 
| 21 | 21 | 
 | 
| 22 | 22 | 
 from buildgrid._enums import LeaseState, OperationStage
 | 
| 23 | 
-from buildgrid._exceptions import CancelledError
 | 
|
| 23 | 
+from buildgrid._exceptions import CancelledError, NotFoundError
 | 
|
| 24 | 24 | 
 from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
 | 
| 25 | 25 | 
 from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
 | 
| 26 | 26 | 
 from buildgrid._protos.google.longrunning import operations_pb2
 | 
| ... | ... | @@ -29,35 +29,46 @@ from buildgrid._protos.google.rpc import code_pb2 | 
| 29 | 29 | 
 | 
| 30 | 30 | 
 class Job:
 | 
| 31 | 31 | 
 | 
| 32 | 
-    def __init__(self, action, action_digest):
 | 
|
| 32 | 
+    def __init__(self, action, action_digest, priority=0):
 | 
|
| 33 | 33 | 
         self.__logger = logging.getLogger(__name__)
 | 
| 34 | 34 | 
 | 
| 35 | 35 | 
         self._name = str(uuid.uuid4())
 | 
| 36 | 
+        self._priority = priority
 | 
|
| 36 | 37 | 
         self._action = remote_execution_pb2.Action()
 | 
| 37 | 
-        self._operation = operations_pb2.Operation()
 | 
|
| 38 | 38 | 
         self._lease = None
 | 
| 39 | 39 | 
 | 
| 40 | 40 | 
         self.__execute_response = None
 | 
| 41 | 41 | 
         self.__operation_metadata = remote_execution_pb2.ExecuteOperationMetadata()
 | 
| 42 | 
+        self.__operations_by_name = {}
 | 
|
| 43 | 
+        self.__operations_by_peer = {}
 | 
|
| 42 | 44 | 
 | 
| 43 | 45 | 
         self.__queued_timestamp = timestamp_pb2.Timestamp()
 | 
| 44 | 46 | 
         self.__queued_time_duration = duration_pb2.Duration()
 | 
| 45 | 47 | 
         self.__worker_start_timestamp = timestamp_pb2.Timestamp()
 | 
| 46 | 48 | 
         self.__worker_completed_timestamp = timestamp_pb2.Timestamp()
 | 
| 47 | 49 | 
 | 
| 48 | 
-        self.__operation_cancelled = False
 | 
|
| 50 | 
+        self.__operations_message_queues = {}
 | 
|
| 51 | 
+        self.__operations_cancelled = set()
 | 
|
| 49 | 52 | 
         self.__lease_cancelled = False
 | 
| 53 | 
+        self.__job_cancelled = False
 | 
|
| 50 | 54 | 
 | 
| 51 | 55 | 
         self.__operation_metadata.action_digest.CopyFrom(action_digest)
 | 
| 52 | 56 | 
         self.__operation_metadata.stage = OperationStage.UNKNOWN.value
 | 
| 53 | 57 | 
 | 
| 54 | 58 | 
         self._action.CopyFrom(action)
 | 
| 55 | 59 | 
         self._do_not_cache = self._action.do_not_cache
 | 
| 56 | 
-        self._operation_update_queues = []
 | 
|
| 57 | 
-        self._operation.name = self._name
 | 
|
| 58 | 
-        self._operation.done = False
 | 
|
| 59 | 60 | 
         self._n_tries = 0
 | 
| 60 | 61 | 
 | 
| 62 | 
+        self._done = False
 | 
|
| 63 | 
+  | 
|
| 64 | 
+    def __eq__(self, other):
 | 
|
| 65 | 
+        if isinstance(other, Job):
 | 
|
| 66 | 
+            return self.name == other.name
 | 
|
| 67 | 
+        return False
 | 
|
| 68 | 
+  | 
|
| 69 | 
+    def __ne__(self, other):
 | 
|
| 70 | 
+        return not self.__eq__(other)
 | 
|
| 71 | 
+  | 
|
| 61 | 72 | 
     # --- Public API ---
 | 
| 62 | 73 | 
 | 
| 63 | 74 | 
     @property
 | 
| ... | ... | @@ -65,12 +76,18 @@ class Job: | 
| 65 | 76 | 
         return self._name
 | 
| 66 | 77 | 
 | 
| 67 | 78 | 
     @property
 | 
| 68 | 
-    def do_not_cache(self):
 | 
|
| 69 | 
-        return self._do_not_cache
 | 
|
| 79 | 
+    def priority(self):
 | 
|
| 80 | 
+        return self._priority
 | 
|
| 70 | 81 | 
 | 
| 71 | 82 | 
     @property
 | 
| 72 | 
-    def action(self):
 | 
|
| 73 | 
-        return self._action
 | 
|
| 83 | 
+    def done(self):
 | 
|
| 84 | 
+        return self._done
 | 
|
| 85 | 
+  | 
|
| 86 | 
+    # --- Public API: REAPI ---
 | 
|
| 87 | 
+  | 
|
| 88 | 
+    @property
 | 
|
| 89 | 
+    def do_not_cache(self):
 | 
|
| 90 | 
+        return self._do_not_cache
 | 
|
| 74 | 91 | 
 | 
| 75 | 92 | 
     @property
 | 
| 76 | 93 | 
     def action_digest(self):
 | 
| ... | ... | @@ -84,19 +101,176 @@ class Job: | 
| 84 | 101 | 
             return None
 | 
| 85 | 102 | 
 | 
| 86 | 103 | 
     @property
 | 
| 87 | 
-    def holds_cached_action_result(self):
 | 
|
| 104 | 
+    def holds_cached_result(self):
 | 
|
| 88 | 105 | 
         if self.__execute_response is not None:
 | 
| 89 | 106 | 
             return self.__execute_response.cached_result
 | 
| 90 | 107 | 
         else:
 | 
| 91 | 108 | 
             return False
 | 
| 92 | 109 | 
 | 
| 93 | 
-    @property
 | 
|
| 94 | 
-    def operation(self):
 | 
|
| 95 | 
-        return self._operation
 | 
|
| 110 | 
+    def set_cached_result(self, action_result):
 | 
|
| 111 | 
+        """Allows specifying an action result form the action cache for the job.
 | 
|
| 112 | 
+  | 
|
| 113 | 
+        Note:
 | 
|
| 114 | 
+            This won't trigger any :class:`Operation` stage transition.
 | 
|
| 115 | 
+  | 
|
| 116 | 
+        Args:
 | 
|
| 117 | 
+            action_result (ActionResult): The result from cache.
 | 
|
| 118 | 
+        """
 | 
|
| 119 | 
+        self.__execute_response = remote_execution_pb2.ExecuteResponse()
 | 
|
| 120 | 
+        self.__execute_response.result.CopyFrom(action_result)
 | 
|
| 121 | 
+        self.__execute_response.cached_result = True
 | 
|
| 96 | 122 | 
 | 
| 97 | 123 | 
     @property
 | 
| 98 | 
-    def operation_stage(self):
 | 
|
| 99 | 
-        return OperationStage(self.__operation_metadata.state)
 | 
|
| 124 | 
+    def n_clients(self):
 | 
|
| 125 | 
+        return len(self.__operations_message_queues)
 | 
|
| 126 | 
+  | 
|
| 127 | 
+    def register_operation_client(self, peer, message_queue):
 | 
|
| 128 | 
+        """Subscribes to the job's :class:`Operation` stage changes.
 | 
|
| 129 | 
+  | 
|
| 130 | 
+        Args:
 | 
|
| 131 | 
+            peer (str): a unique string identifying the client.
 | 
|
| 132 | 
+            message_queue (queue.Queue): the event queue to register.
 | 
|
| 133 | 
+  | 
|
| 134 | 
+        Returns:
 | 
|
| 135 | 
+            str: The name of the subscribed :class:`Operation`.
 | 
|
| 136 | 
+        """
 | 
|
| 137 | 
+        if peer in self.__operations_by_peer:
 | 
|
| 138 | 
+            operation = self.__operations_by_peer[peer]
 | 
|
| 139 | 
+        else:
 | 
|
| 140 | 
+            operation = self.create_operation(peer)
 | 
|
| 141 | 
+  | 
|
| 142 | 
+        self.__operations_message_queues[peer] = message_queue
 | 
|
| 143 | 
+  | 
|
| 144 | 
+        self._send_operations_updates(peers=[peer])
 | 
|
| 145 | 
+  | 
|
| 146 | 
+        return operation.name
 | 
|
| 147 | 
+  | 
|
| 148 | 
+    def unregister_operation_client(self, peer):
 | 
|
| 149 | 
+        """Unsubscribes to the job's :class:`Operation` stage change.
 | 
|
| 150 | 
+  | 
|
| 151 | 
+        Args:
 | 
|
| 152 | 
+            peer (str): a unique string identifying the client.
 | 
|
| 153 | 
+        """
 | 
|
| 154 | 
+        if peer in self.__operations_message_queues:
 | 
|
| 155 | 
+            del self.__operations_message_queues[peer]
 | 
|
| 156 | 
+  | 
|
| 157 | 
+        # Drop the operation if nobody is watching it anymore:
 | 
|
| 158 | 
+        if peer in self.__operations_by_peer:
 | 
|
| 159 | 
+            operation = self.__operations_by_peer[peer]
 | 
|
| 160 | 
+  | 
|
| 161 | 
+            if operation not in self.__operations_by_peer.values():
 | 
|
| 162 | 
+                del self.__operations_by_name[operation.name]
 | 
|
| 163 | 
+  | 
|
| 164 | 
+            del self.__operations_by_peer[peer]
 | 
|
| 165 | 
+  | 
|
| 166 | 
+    def create_operation(self, peer):
 | 
|
| 167 | 
+        """Generates a new :class:`Operation` for `peer`.
 | 
|
| 168 | 
+  | 
|
| 169 | 
+        Args:
 | 
|
| 170 | 
+            peer (str): a unique string identifying the client.
 | 
|
| 171 | 
+        """
 | 
|
| 172 | 
+        if peer in self.__operations_by_peer:
 | 
|
| 173 | 
+            return self.__operations_by_peer[peer]
 | 
|
| 174 | 
+  | 
|
| 175 | 
+        new_operation = operations_pb2.Operation()
 | 
|
| 176 | 
+        # Copy state from first existing and non cancelled operation:
 | 
|
| 177 | 
+        for operation in self.__operations_by_name.values():
 | 
|
| 178 | 
+            if operation.name not in self.__operations_cancelled:
 | 
|
| 179 | 
+                new_operation.CopyFrom(operation)
 | 
|
| 180 | 
+                break
 | 
|
| 181 | 
+  | 
|
| 182 | 
+        new_operation.name = str(uuid.uuid4())
 | 
|
| 183 | 
+  | 
|
| 184 | 
+        self.__operations_by_name[new_operation.name] = new_operation
 | 
|
| 185 | 
+        self.__operations_by_peer[peer] = new_operation
 | 
|
| 186 | 
+  | 
|
| 187 | 
+        return new_operation
 | 
|
| 188 | 
+  | 
|
| 189 | 
+    def list_operations(self):
 | 
|
| 190 | 
+        """Lists the :class:`Operation` related to a job.
 | 
|
| 191 | 
+  | 
|
| 192 | 
+        Returns:
 | 
|
| 193 | 
+            list: A list of :class:`Operation` names.
 | 
|
| 194 | 
+        """
 | 
|
| 195 | 
+        return list(self.__operations_by_name.keys())
 | 
|
| 196 | 
+  | 
|
| 197 | 
+    def get_operation(self, operation_name):
 | 
|
| 198 | 
+        """Returns a copy of the the job's :class:`Operation`.
 | 
|
| 199 | 
+  | 
|
| 200 | 
+        Args:
 | 
|
| 201 | 
+            operation_name (str): the operation's name.
 | 
|
| 202 | 
+  | 
|
| 203 | 
+        Raises:
 | 
|
| 204 | 
+            NotFoundError: If no operation with `operation_name` exists.
 | 
|
| 205 | 
+        """
 | 
|
| 206 | 
+        try:
 | 
|
| 207 | 
+            operation = self.__operations_by_name[operation_name]
 | 
|
| 208 | 
+  | 
|
| 209 | 
+        except KeyError:
 | 
|
| 210 | 
+            raise NotFoundError("Operation name does not exist: [{}]"
 | 
|
| 211 | 
+                                .format(operation_name))
 | 
|
| 212 | 
+  | 
|
| 213 | 
+        return self._copy_operation(operation)
 | 
|
| 214 | 
+  | 
|
| 215 | 
+    def update_operation_stage(self, stage):
 | 
|
| 216 | 
+        """Operates a stage transition for the job's :class:`Operation`.
 | 
|
| 217 | 
+  | 
|
| 218 | 
+        Args:
 | 
|
| 219 | 
+            stage (OperationStage): the operation stage to transition to.
 | 
|
| 220 | 
+        """
 | 
|
| 221 | 
+        if stage.value == self.__operation_metadata.stage:
 | 
|
| 222 | 
+            return
 | 
|
| 223 | 
+  | 
|
| 224 | 
+        self.__operation_metadata.stage = stage.value
 | 
|
| 225 | 
+  | 
|
| 226 | 
+        if self.__operation_metadata.stage == OperationStage.QUEUED.value:
 | 
|
| 227 | 
+            if self.__queued_timestamp.ByteSize() == 0:
 | 
|
| 228 | 
+                self.__queued_timestamp.GetCurrentTime()
 | 
|
| 229 | 
+            self._n_tries += 1
 | 
|
| 230 | 
+  | 
|
| 231 | 
+        elif self.__operation_metadata.stage == OperationStage.EXECUTING.value:
 | 
|
| 232 | 
+            queue_in, queue_out = self.__queued_timestamp.ToDatetime(), datetime.now()
 | 
|
| 233 | 
+            self.__queued_time_duration.FromTimedelta(queue_out - queue_in)
 | 
|
| 234 | 
+  | 
|
| 235 | 
+        elif self.__operation_metadata.stage == OperationStage.COMPLETED.value:
 | 
|
| 236 | 
+            self._done = True
 | 
|
| 237 | 
+  | 
|
| 238 | 
+        self._send_operations_updates()
 | 
|
| 239 | 
+  | 
|
| 240 | 
+    def cancel_operation(self, peer):
 | 
|
| 241 | 
+        """Triggers a job's :class:`Operation` cancellation.
 | 
|
| 242 | 
+  | 
|
| 243 | 
+        This may cancel any job's :class:`Lease` that may have been issued.
 | 
|
| 244 | 
+  | 
|
| 245 | 
+        Args:
 | 
|
| 246 | 
+            peer (str): a unique string identifying the client.
 | 
|
| 247 | 
+        """
 | 
|
| 248 | 
+  | 
|
| 249 | 
+        operation_names, peers = set(), set()
 | 
|
| 250 | 
+        if peer in self.__operations_by_peer:
 | 
|
| 251 | 
+            operation_names.add(self.__operations_by_peer[peer].name)
 | 
|
| 252 | 
+            peers.add(peer)
 | 
|
| 253 | 
+  | 
|
| 254 | 
+        else:
 | 
|
| 255 | 
+            operation_names.update(self.__operations_by_name.keys())
 | 
|
| 256 | 
+            peers.update(self.__operations_by_peer.keys())
 | 
|
| 257 | 
+  | 
|
| 258 | 
+        operations_cancelled = operation_names - self.__operations_cancelled
 | 
|
| 259 | 
+        if not operations_cancelled:
 | 
|
| 260 | 
+            return
 | 
|
| 261 | 
+  | 
|
| 262 | 
+        self.__operations_cancelled.update(operations_cancelled)
 | 
|
| 263 | 
+  | 
|
| 264 | 
+        operation_names = set(self.__operations_by_name.keys())
 | 
|
| 265 | 
+        # Job is cancelled if all the operation are:
 | 
|
| 266 | 
+        self.__job_cancelled = bool(operation_names - self.__operations_cancelled)
 | 
|
| 267 | 
+  | 
|
| 268 | 
+        if self.__job_cancelled and self._lease is not None:
 | 
|
| 269 | 
+            self.cancel_lease()
 | 
|
| 270 | 
+  | 
|
| 271 | 
+        self._send_operations_updates(peers=peers, notify_cancelled=True)
 | 
|
| 272 | 
+  | 
|
| 273 | 
+    # --- Public API: RWAPI ---
 | 
|
| 100 | 274 | 
 | 
| 101 | 275 | 
     @property
 | 
| 102 | 276 | 
     def lease(self):
 | 
| ... | ... | @@ -117,45 +291,15 @@ class Job: | 
| 117 | 291 | 
     def n_tries(self):
 | 
| 118 | 292 | 
         return self._n_tries
 | 
| 119 | 293 | 
 | 
| 120 | 
-    @property
 | 
|
| 121 | 
-    def n_clients(self):
 | 
|
| 122 | 
-        return len(self._operation_update_queues)
 | 
|
| 123 | 
-  | 
|
| 124 | 
-    def register_client(self, queue):
 | 
|
| 125 | 
-        """Subscribes to the job's :class:`Operation` stage change events.
 | 
|
| 126 | 
-  | 
|
| 127 | 
-        Queues this :object:`Job` instance.
 | 
|
| 128 | 
-  | 
|
| 129 | 
-        Args:
 | 
|
| 130 | 
-            queue (queue.Queue): the event queue to register.
 | 
|
| 131 | 
-        """
 | 
|
| 132 | 
-        self._operation_update_queues.append(queue)
 | 
|
| 133 | 
-        queue.put(self)
 | 
|
| 134 | 
-  | 
|
| 135 | 
-    def unregister_client(self, queue):
 | 
|
| 136 | 
-        """Unsubscribes to the job's :class:`Operation` stage change events.
 | 
|
| 137 | 
-  | 
|
| 138 | 
-        Args:
 | 
|
| 139 | 
-            queue (queue.Queue): the event queue to unregister.
 | 
|
| 140 | 
-        """
 | 
|
| 141 | 
-        self._operation_update_queues.remove(queue)
 | 
|
| 142 | 
-  | 
|
| 143 | 
-    def set_cached_result(self, action_result):
 | 
|
| 144 | 
-        """Allows specifying an action result form the action cache for the job.
 | 
|
| 145 | 
-        """
 | 
|
| 146 | 
-        self.__execute_response = remote_execution_pb2.ExecuteResponse()
 | 
|
| 147 | 
-        self.__execute_response.result.CopyFrom(action_result)
 | 
|
| 148 | 
-        self.__execute_response.cached_result = True
 | 
|
| 149 | 
-  | 
|
| 150 | 294 | 
     def create_lease(self):
 | 
| 151 | 295 | 
         """Emits a new :class:`Lease` for the job.
 | 
| 152 | 296 | 
 | 
| 153 | 297 | 
         Only one :class:`Lease` can be emitted for a given job. This method
 | 
| 154 | 
-        should only be used once, any furhter calls are ignored.
 | 
|
| 298 | 
+        should only be used once, any further calls are ignored.
 | 
|
| 155 | 299 | 
         """
 | 
| 156 | 
-        if self.__operation_cancelled:
 | 
|
| 157 | 
-            return None
 | 
|
| 158 | 
-        elif self._lease is not None:
 | 
|
| 300 | 
+        if self._lease is not None:
 | 
|
| 301 | 
+            return self._lease
 | 
|
| 302 | 
+        elif self.__job_cancelled:
 | 
|
| 159 | 303 | 
             return None
 | 
| 160 | 304 | 
 | 
| 161 | 305 | 
         self._lease = bots_pb2.Lease()
 | 
| ... | ... | @@ -166,14 +310,14 @@ class Job: | 
| 166 | 310 | 
         return self._lease
 | 
| 167 | 311 | 
 | 
| 168 | 312 | 
     def update_lease_state(self, state, status=None, result=None):
 | 
| 169 | 
-        """Operates a state transition for the job's current :class:Lease.
 | 
|
| 313 | 
+        """Operates a state transition for the job's current :class:`Lease`.
 | 
|
| 170 | 314 | 
 | 
| 171 | 315 | 
         Args:
 | 
| 172 | 316 | 
             state (LeaseState): the lease state to transition to.
 | 
| 173 | 
-            status (google.rpc.Status): the lease execution status, only
 | 
|
| 174 | 
-                required if `state` is `COMPLETED`.
 | 
|
| 175 | 
-            result (google.protobuf.Any): the lease execution result, only
 | 
|
| 176 | 
-                required if `state` is `COMPLETED`.
 | 
|
| 317 | 
+            status (google.rpc.Status, optional): the lease execution status,
 | 
|
| 318 | 
+                only required if `state` is `COMPLETED`.
 | 
|
| 319 | 
+            result (google.protobuf.Any, optional): the lease execution result,
 | 
|
| 320 | 
+                only required if `state` is `COMPLETED`.
 | 
|
| 177 | 321 | 
         """
 | 
| 178 | 322 | 
         if state.value == self._lease.state:
 | 
| 179 | 323 | 
             return
 | 
| ... | ... | @@ -214,72 +358,74 @@ class Job: | 
| 214 | 358 | 
             self.__execute_response.status.CopyFrom(status)
 | 
| 215 | 359 | 
 | 
| 216 | 360 | 
     def cancel_lease(self):
 | 
| 217 | 
-        """Triggers a job's :class:Lease cancellation.
 | 
|
| 361 | 
+        """Triggers a job's :class:`Lease` cancellation.
 | 
|
| 218 | 362 | 
 | 
| 219 | 
-        This will not cancel the job's :class:Operation.
 | 
|
| 363 | 
+        Note:
 | 
|
| 364 | 
+            This will not cancel the job's :class:`Operation`.
 | 
|
| 220 | 365 | 
         """
 | 
| 221 | 366 | 
         self.__lease_cancelled = True
 | 
| 222 | 367 | 
         if self._lease is not None:
 | 
| 223 | 368 | 
             self.update_lease_state(LeaseState.CANCELLED)
 | 
| 224 | 369 | 
 | 
| 225 | 
-    def update_operation_stage(self, stage):
 | 
|
| 226 | 
-        """Operates a stage transition for the job's :class:Operation.
 | 
|
| 227 | 
-  | 
|
| 228 | 
-        Args:
 | 
|
| 229 | 
-            stage (OperationStage): the operation stage to transition to.
 | 
|
| 230 | 
-        """
 | 
|
| 231 | 
-        if stage.value == self.__operation_metadata.stage:
 | 
|
| 232 | 
-            return
 | 
|
| 370 | 
+    # --- Public API: Monitoring ---
 | 
|
| 233 | 371 | 
 | 
| 234 | 
-        self.__operation_metadata.stage = stage.value
 | 
|
| 372 | 
+    def query_queue_time(self):
 | 
|
| 373 | 
+        return self.__queued_time_duration.ToTimedelta()
 | 
|
| 235 | 374 | 
 | 
| 236 | 
-        if self.__operation_metadata.stage == OperationStage.QUEUED.value:
 | 
|
| 237 | 
-            if self.__queued_timestamp.ByteSize() == 0:
 | 
|
| 238 | 
-                self.__queued_timestamp.GetCurrentTime()
 | 
|
| 239 | 
-            self._n_tries += 1
 | 
|
| 375 | 
+    def query_n_retries(self):
 | 
|
| 376 | 
+        return self._n_tries - 1 if self._n_tries > 0 else 0
 | 
|
| 240 | 377 | 
 | 
| 241 | 
-        elif self.__operation_metadata.stage == OperationStage.EXECUTING.value:
 | 
|
| 242 | 
-            queue_in, queue_out = self.__queued_timestamp.ToDatetime(), datetime.now()
 | 
|
| 243 | 
-            self.__queued_time_duration.FromTimedelta(queue_out - queue_in)
 | 
|
| 378 | 
+    # --- Private API ---
 | 
|
| 244 | 379 | 
 | 
| 245 | 
-        elif self.__operation_metadata.stage == OperationStage.COMPLETED.value:
 | 
|
| 246 | 
-            if self.__execute_response is not None:
 | 
|
| 247 | 
-                self._operation.response.Pack(self.__execute_response)
 | 
|
| 248 | 
-            self._operation.done = True
 | 
|
| 380 | 
+    def _copy_operation(self, operation):
 | 
|
| 381 | 
+        """Simply duplicates a given :class:`Lease` object."""
 | 
|
| 382 | 
+        new_operation = operations_pb2.Operation()
 | 
|
| 249 | 383 | 
 | 
| 250 | 
-        self._operation.metadata.Pack(self.__operation_metadata)
 | 
|
| 384 | 
+        new_operation.CopyFrom(operation)
 | 
|
| 251 | 385 | 
 | 
| 252 | 
-        for queue in self._operation_update_queues:
 | 
|
| 253 | 
-            queue.put(self)
 | 
|
| 386 | 
+        return new_operation
 | 
|
| 254 | 387 | 
 | 
| 255 | 
-    def check_operation_status(self):
 | 
|
| 256 | 
-        """Reports errors on unexpected job's :class:Operation state.
 | 
|
| 388 | 
+    def _send_operations_updates(self, peers=None, notify_cancelled=False):
 | 
|
| 389 | 
+        """Sends :class:`Operation` stage change messages to watchers."""
 | 
|
| 390 | 
+        for operation in self.__operations_by_name.values():
 | 
|
| 391 | 
+            if operation.name not in self.__operations_cancelled:
 | 
|
| 392 | 
+                operation_metadata = self.__operation_metadata
 | 
|
| 393 | 
+                execute_response = self.__execute_response
 | 
|
| 257 | 394 | 
 | 
| 258 | 
-        Raises:
 | 
|
| 259 | 
-            CancelledError: if the job's :class:Operation was cancelled.
 | 
|
| 260 | 
-        """
 | 
|
| 261 | 
-        if self.__operation_cancelled:
 | 
|
| 262 | 
-            raise CancelledError(self.__execute_response.status.message)
 | 
|
| 395 | 
+                operation_done = self._done
 | 
|
| 263 | 396 | 
 | 
| 264 | 
-    def cancel_operation(self):
 | 
|
| 265 | 
-        """Triggers a job's :class:Operation cancellation.
 | 
|
| 397 | 
+            else:
 | 
|
| 398 | 
+                operation_metadata = remote_execution_pb2.ExecuteOperationMetadata()
 | 
|
| 399 | 
+                operation_metadata.CopyFrom(self.__operation_metadata)
 | 
|
| 400 | 
+                operation_metadata.stage = OperationStage.COMPLETED.value
 | 
|
| 266 | 401 | 
 | 
| 267 | 
-        This will also cancel any job's :class:Lease that may have been issued.
 | 
|
| 268 | 
-        """
 | 
|
| 269 | 
-        self.__operation_cancelled = True
 | 
|
| 270 | 
-        if self._lease is not None:
 | 
|
| 271 | 
-            self.cancel_lease()
 | 
|
| 402 | 
+                execute_response = remote_execution_pb2.ExecuteResponse()
 | 
|
| 403 | 
+                if self.__execute_response is not None:
 | 
|
| 404 | 
+                    execute_response.CopyFrom(self.__execute_response)
 | 
|
| 405 | 
+                execute_response.status.code = code_pb2.CANCELLED
 | 
|
| 406 | 
+                execute_response.status.message = "Operation cancelled by client."
 | 
|
| 272 | 407 | 
 | 
| 273 | 
-        self.__execute_response = remote_execution_pb2.ExecuteResponse()
 | 
|
| 274 | 
-        self.__execute_response.status.code = code_pb2.CANCELLED
 | 
|
| 275 | 
-        self.__execute_response.status.message = "Operation cancelled by client."
 | 
|
| 408 | 
+                operation_done = True
 | 
|
| 276 | 409 | 
 | 
| 277 | 
-        self.update_operation_stage(OperationStage.COMPLETED)
 | 
|
| 410 | 
+            if execute_response is not None:
 | 
|
| 411 | 
+                operation.response.Pack(execute_response)
 | 
|
| 412 | 
+            operation.metadata.Pack(operation_metadata)
 | 
|
| 413 | 
+            operation.done = operation_done
 | 
|
| 278 | 414 | 
 | 
| 279 | 
-    # --- Public API: Monitoring ---
 | 
|
| 415 | 
+        for peer, message_queue in self.__operations_message_queues.items():
 | 
|
| 416 | 
+            if peer not in self.__operations_by_peer:
 | 
|
| 417 | 
+                continue
 | 
|
| 418 | 
+            elif peers and peer not in peers:
 | 
|
| 419 | 
+                continue
 | 
|
| 280 | 420 | 
 | 
| 281 | 
-    def query_queue_time(self):
 | 
|
| 282 | 
-        return self.__queued_time_duration.ToTimedelta()
 | 
|
| 421 | 
+            operation = self.__operations_by_peer[peer]
 | 
|
| 422 | 
+            # Messages are pairs of (Exception, Operation,):
 | 
|
| 423 | 
+            if not notify_cancelled and operation.name in self.__operations_cancelled:
 | 
|
| 424 | 
+                continue
 | 
|
| 425 | 
+            elif operation.name not in self.__operations_cancelled:
 | 
|
| 426 | 
+                message = (None, self._copy_operation(operation),)
 | 
|
| 427 | 
+            else:
 | 
|
| 428 | 
+                message = (CancelledError("Operation has been cancelled"),
 | 
|
| 429 | 
+                           self._copy_operation(operation),)
 | 
|
| 283 | 430 | 
 | 
| 284 | 
-    def query_n_retries(self):
 | 
|
| 285 | 
-        return self._n_tries - 1 if self._n_tries > 0 else 0
 | 
|
| 431 | 
+            message_queue.put(message)
 | 
| ... | ... | @@ -21,7 +21,7 @@ An instance of the LongRunningOperations Service. | 
| 21 | 21 | 
 | 
| 22 | 22 | 
 import logging
 | 
| 23 | 23 | 
 | 
| 24 | 
-from buildgrid._exceptions import InvalidArgumentError
 | 
|
| 24 | 
+from buildgrid._exceptions import InvalidArgumentError, NotFoundError
 | 
|
| 25 | 25 | 
 from buildgrid._protos.google.longrunning import operations_pb2
 | 
| 26 | 26 | 
 | 
| 27 | 27 | 
 | 
| ... | ... | @@ -39,62 +39,43 @@ class OperationsInstance: | 
| 39 | 39 | 
     def register_instance_with_server(self, instance_name, server):
 | 
| 40 | 40 | 
         server.add_operations_instance(self, instance_name)
 | 
| 41 | 41 | 
 | 
| 42 | 
-    def get_operation(self, name):
 | 
|
| 43 | 
-        job = self._scheduler.jobs.get(name)
 | 
|
| 42 | 
+    def get_operation(self, job_name):
 | 
|
| 43 | 
+        try:
 | 
|
| 44 | 
+            operation = self._scheduler.get_job_operation(job_name)
 | 
|
| 44 | 45 | 
 | 
| 45 | 
-        if job is None:
 | 
|
| 46 | 
-            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
 | 
|
| 46 | 
+        except NotFoundError:
 | 
|
| 47 | 
+            raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))
 | 
|
| 47 | 48 | 
 | 
| 48 | 
-        else:
 | 
|
| 49 | 
-            return job.operation
 | 
|
| 49 | 
+        return operation
 | 
|
| 50 | 50 | 
 | 
| 51 | 51 | 
     def list_operations(self, list_filter, page_size, page_token):
 | 
| 52 | 52 | 
         # TODO: Pages
 | 
| 53 | 53 | 
         # Spec says number of pages and length of a page are optional
 | 
| 54 | 54 | 
         response = operations_pb2.ListOperationsResponse()
 | 
| 55 | 
+  | 
|
| 56 | 
+        operation_names = [operation_name for job_name in
 | 
|
| 57 | 
+                           self._scheduler.list_current_jobs() for operation_name in
 | 
|
| 58 | 
+                           self._scheduler.list_job_operations(job_name)]
 | 
|
| 59 | 
+  | 
|
| 55 | 60 | 
         operations = []
 | 
| 56 | 
-        for job in self._scheduler.list_jobs():
 | 
|
| 57 | 
-            op = operations_pb2.Operation()
 | 
|
| 58 | 
-            op.CopyFrom(job.operation)
 | 
|
| 59 | 
-            operations.append(op)
 | 
|
| 61 | 
+        for operation_name in operation_names:
 | 
|
| 62 | 
+            operation = self._scheduler.get_job_operation(operation_name)
 | 
|
| 63 | 
+            operations.append(operation)
 | 
|
| 60 | 64 | 
 | 
| 61 | 65 | 
         response.operations.extend(operations)
 | 
| 62 | 66 | 
 | 
| 63 | 67 | 
         return response
 | 
| 64 | 68 | 
 | 
| 65 | 
-    def delete_operation(self, name):
 | 
|
| 66 | 
-        try:
 | 
|
| 67 | 
-            self._scheduler.jobs.pop(name)
 | 
|
| 68 | 
-  | 
|
| 69 | 
-        except KeyError:
 | 
|
| 70 | 
-            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
 | 
|
| 71 | 
-  | 
|
| 72 | 
-    def cancel_operation(self, name):
 | 
|
| 69 | 
+    def delete_operation(self, job_name):
 | 
|
| 73 | 70 | 
         try:
 | 
| 74 | 
-            self._scheduler.cancel_job_operation(name)
 | 
|
| 71 | 
+            self._scheduler.delete_job_operation(job_name)
 | 
|
| 75 | 72 | 
 | 
| 76 | 
-        except KeyError:
 | 
|
| 77 | 
-            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
 | 
|
| 73 | 
+        except NotFoundError:
 | 
|
| 74 | 
+            raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))
 | 
|
| 78 | 75 | 
 | 
| 79 | 
-    def register_message_client(self, name, queue):
 | 
|
| 76 | 
+    def cancel_operation(self, job_name):
 | 
|
| 80 | 77 | 
         try:
 | 
| 81 | 
-            self._scheduler.register_client(name, queue)
 | 
|
| 82 | 
-  | 
|
| 83 | 
-        except KeyError:
 | 
|
| 84 | 
-            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
 | 
|
| 85 | 
-  | 
|
| 86 | 
-    def unregister_message_client(self, name, queue):
 | 
|
| 87 | 
-        try:
 | 
|
| 88 | 
-            self._scheduler.unregister_client(name, queue)
 | 
|
| 89 | 
-  | 
|
| 90 | 
-        except KeyError:
 | 
|
| 91 | 
-            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
 | 
|
| 92 | 
-  | 
|
| 93 | 
-    def stream_operation_updates(self, message_queue, operation_name):
 | 
|
| 94 | 
-        job = message_queue.get()
 | 
|
| 95 | 
-        while not job.operation.done:
 | 
|
| 96 | 
-            yield job.operation
 | 
|
| 97 | 
-            job = message_queue.get()
 | 
|
| 98 | 
-            job.check_operation_status()
 | 
|
| 78 | 
+            self._scheduler.cancel_job_operation(job_name)
 | 
|
| 99 | 79 | 
 | 
| 100 | 
-        yield job.operation
 | 
|
| 80 | 
+        except NotFoundError:
 | 
|
| 81 | 
+            raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))
 | 
| ... | ... | @@ -25,6 +25,7 @@ import logging | 
| 25 | 25 | 
 | 
| 26 | 26 | 
 from buildgrid._enums import LeaseState, OperationStage
 | 
| 27 | 27 | 
 from buildgrid._exceptions import NotFoundError
 | 
| 28 | 
+from buildgrid.server.job import Job
 | 
|
| 28 | 29 | 
 | 
| 29 | 30 | 
 | 
| 30 | 31 | 
 class Scheduler:
 | 
| ... | ... | @@ -42,8 +43,12 @@ class Scheduler: | 
| 42 | 43 | 
         self.__retries_count = 0
 | 
| 43 | 44 | 
 | 
| 44 | 45 | 
         self._action_cache = action_cache
 | 
| 45 | 
-        self.jobs = {}
 | 
|
| 46 | 
-        self.queue = deque()
 | 
|
| 46 | 
+  | 
|
| 47 | 
+        self.__jobs_by_action = {}
 | 
|
| 48 | 
+        self.__jobs_by_operation = {}
 | 
|
| 49 | 
+        self.__jobs_by_name = {}
 | 
|
| 50 | 
+  | 
|
| 51 | 
+        self.__queue = deque()
 | 
|
| 47 | 52 | 
 | 
| 48 | 53 | 
         self._is_instrumented = monitor
 | 
| 49 | 54 | 
 | 
| ... | ... | @@ -52,39 +57,132 @@ class Scheduler: | 
| 52 | 57 | 
 | 
| 53 | 58 | 
     # --- Public API ---
 | 
| 54 | 59 | 
 | 
| 55 | 
-    def register_client(self, job_name, queue):
 | 
|
| 56 | 
-        job = self.jobs[job_name]
 | 
|
| 60 | 
+    def list_current_jobs(self):
 | 
|
| 61 | 
+        """Returns a list of the :class:`Job` names currently managed."""
 | 
|
| 62 | 
+        return self.__jobs_by_name.keys()
 | 
|
| 57 | 63 | 
 | 
| 58 | 
-        job.register_client(queue)
 | 
|
| 64 | 
+    def list_job_operations(self, job_name):
 | 
|
| 65 | 
+        """Returns a list of :class:`Operation` names for a :class:`Job`."""
 | 
|
| 66 | 
+        if job_name in self.__jobs_by_name:
 | 
|
| 67 | 
+            return self.__jobs_by_name[job_name].list_operations()
 | 
|
| 68 | 
+        else:
 | 
|
| 69 | 
+            return []
 | 
|
| 59 | 70 | 
 | 
| 60 | 
-    def unregister_client(self, job_name, queue):
 | 
|
| 61 | 
-        job = self.jobs[job_name]
 | 
|
| 71 | 
+    # --- Public API: REAPI ---
 | 
|
| 62 | 72 | 
 | 
| 63 | 
-        job.unregister_client(queue)
 | 
|
| 73 | 
+    def register_job_operation_client(self, operation_name, peer, message_queue):
 | 
|
| 74 | 
+        """Subscribes to one of the job's :class:`Operation` stage changes.
 | 
|
| 64 | 75 | 
 | 
| 65 | 
-        if not job.n_clients and job.operation.done:
 | 
|
| 66 | 
-            del self.jobs[job_name]
 | 
|
| 76 | 
+        Args:
 | 
|
| 77 | 
+            operation_name (str): name of the operation to subscribe to.
 | 
|
| 78 | 
+            peer (str): a unique string identifying the client.
 | 
|
| 79 | 
+            message_queue (queue.Queue): the event queue to register.
 | 
|
| 67 | 80 | 
 | 
| 68 | 
-            if self._is_instrumented:
 | 
|
| 69 | 
-                self.__operations_by_stage[OperationStage.CACHE_CHECK].discard(job_name)
 | 
|
| 70 | 
-                self.__operations_by_stage[OperationStage.QUEUED].discard(job_name)
 | 
|
| 71 | 
-                self.__operations_by_stage[OperationStage.EXECUTING].discard(job_name)
 | 
|
| 72 | 
-                self.__operations_by_stage[OperationStage.COMPLETED].discard(job_name)
 | 
|
| 81 | 
+        Returns:
 | 
|
| 82 | 
+            str: The name of the subscribed :class:`Operation`.
 | 
|
| 83 | 
+  | 
|
| 84 | 
+        Raises:
 | 
|
| 85 | 
+            NotFoundError: If no operation with `operation_name` exists.
 | 
|
| 86 | 
+        """
 | 
|
| 87 | 
+        if operation_name in self.__jobs_by_operation:
 | 
|
| 88 | 
+            job = self.__jobs_by_operation[operation_name]
 | 
|
| 89 | 
+  | 
|
| 90 | 
+        elif operation_name in self.__jobs_by_name:
 | 
|
| 91 | 
+            job = self.__jobs_by_name[operation_name]
 | 
|
| 92 | 
+  | 
|
| 93 | 
+        else:
 | 
|
| 94 | 
+            raise NotFoundError("Operation name does not exist: [{}]"
 | 
|
| 95 | 
+                                .format(operation_name))
 | 
|
| 96 | 
+  | 
|
| 97 | 
+        operation_name = job.register_operation_client(peer, message_queue)
 | 
|
| 98 | 
+  | 
|
| 99 | 
+        self.__jobs_by_operation[operation_name] = job
 | 
|
| 100 | 
+  | 
|
| 101 | 
+        return operation_name
 | 
|
| 102 | 
+  | 
|
| 103 | 
+    def unregister_job_operation_client(self, operation_name, peer):
 | 
|
| 104 | 
+        """Unsubscribes to one of the job's :class:`Operation` stage change.
 | 
|
| 105 | 
+  | 
|
| 106 | 
+        Args:
 | 
|
| 107 | 
+            operation_name (str): name of the operation to unsubscribe from.
 | 
|
| 108 | 
+            peer (str): a unique string identifying the client.
 | 
|
| 109 | 
+  | 
|
| 110 | 
+        Raises:
 | 
|
| 111 | 
+            NotFoundError: If no operation with `operation_name` exists.
 | 
|
| 112 | 
+        """
 | 
|
| 113 | 
+        if operation_name in self.__jobs_by_operation:
 | 
|
| 114 | 
+            job = self.__jobs_by_operation[operation_name]
 | 
|
| 115 | 
+  | 
|
| 116 | 
+        elif operation_name in self.__jobs_by_name:
 | 
|
| 117 | 
+            job = self.__jobs_by_name[operation_name]
 | 
|
| 118 | 
+  | 
|
| 119 | 
+        else:
 | 
|
| 120 | 
+            raise NotFoundError("Operation name does not exist: [{}]"
 | 
|
| 121 | 
+                                .format(operation_name))
 | 
|
| 122 | 
+  | 
|
| 123 | 
+        if operation_name in self.__jobs_by_operation:
 | 
|
| 124 | 
+            del self.__jobs_by_operation[operation_name]
 | 
|
| 125 | 
+  | 
|
| 126 | 
+        job.unregister_operation_client(peer)
 | 
|
| 127 | 
+  | 
|
| 128 | 
+        if job.n_clients == 0 and job.done:
 | 
|
| 129 | 
+            self._delete_job(job.name)
 | 
|
| 130 | 
+  | 
|
| 131 | 
+    def queue_job_operation(self, action, action_digest, priority=0, skip_cache_lookup=False):
 | 
|
| 132 | 
+        """Inserts a newly created job into the execution queue.
 | 
|
| 133 | 
+  | 
|
| 134 | 
+        Warning:
 | 
|
| 135 | 
+            Priority is handle like a POSIX ``nice`` values: a higher value
 | 
|
| 136 | 
+            means a low priority, 0 being default priority.
 | 
|
| 73 | 137 | 
 | 
| 74 | 
-                self.__leases_by_state[LeaseState.PENDING].discard(job_name)
 | 
|
| 75 | 
-                self.__leases_by_state[LeaseState.ACTIVE].discard(job_name)
 | 
|
| 76 | 
-                self.__leases_by_state[LeaseState.COMPLETED].discard(job_name)
 | 
|
| 138 | 
+        Args:
 | 
|
| 139 | 
+            action (Action): the given action to queue for execution.
 | 
|
| 140 | 
+            action_digest (Digest): the digest of the given action.
 | 
|
| 141 | 
+            priority (int): the execution job's priority.
 | 
|
| 142 | 
+            skip_cache_lookup (bool): whether or not to look for pre-computed
 | 
|
| 143 | 
+                result for the given action.
 | 
|
| 144 | 
+  | 
|
| 145 | 
+        Returns:
 | 
|
| 146 | 
+            str: the newly created operation's name.
 | 
|
| 147 | 
+        """
 | 
|
| 148 | 
+        def __queue_job(jobs_queue, new_job):
 | 
|
| 149 | 
+            index = 0
 | 
|
| 150 | 
+            for queued_job in reversed(jobs_queue):
 | 
|
| 151 | 
+                if new_job.priority < queued_job.priority:
 | 
|
| 152 | 
+                    index += 1
 | 
|
| 153 | 
+                else:
 | 
|
| 154 | 
+                    break
 | 
|
| 155 | 
+  | 
|
| 156 | 
+            index = len(jobs_queue) - index
 | 
|
| 157 | 
+  | 
|
| 158 | 
+            jobs_queue.insert(index, new_job)
 | 
|
| 159 | 
+  | 
|
| 160 | 
+        if action_digest.hash in self.__jobs_by_action:
 | 
|
| 161 | 
+            job = self.__jobs_by_action[action_digest.hash]
 | 
|
| 162 | 
+  | 
|
| 163 | 
+            # Reschedule if priority is now greater:
 | 
|
| 164 | 
+            if priority < job.priority:
 | 
|
| 165 | 
+                job.priority = priority
 | 
|
| 166 | 
+  | 
|
| 167 | 
+                if job in self.__queue:
 | 
|
| 168 | 
+                    self.__queue.remove(job)
 | 
|
| 169 | 
+                    __queue_job(self.__queue, job)
 | 
|
| 77 | 170 | 
 | 
| 78 | 
-    def queue_job(self, job, skip_cache_lookup=False):
 | 
|
| 79 | 
-        self.jobs[job.name] = job
 | 
|
| 171 | 
+            return job.name
 | 
|
| 172 | 
+  | 
|
| 173 | 
+        job = Job(action, action_digest, priority=priority)
 | 
|
| 174 | 
+  | 
|
| 175 | 
+        self.__jobs_by_action[job.action_digest.hash] = job
 | 
|
| 176 | 
+        self.__jobs_by_name[job.name] = job
 | 
|
| 80 | 177 | 
 | 
| 81 | 178 | 
         operation_stage = None
 | 
| 82 | 179 | 
         if self._action_cache is not None and not skip_cache_lookup:
 | 
| 83 | 180 | 
             try:
 | 
| 84 | 181 | 
                 action_result = self._action_cache.get_action_result(job.action_digest)
 | 
| 182 | 
+  | 
|
| 85 | 183 | 
             except NotFoundError:
 | 
| 86 | 184 | 
                 operation_stage = OperationStage.QUEUED
 | 
| 87 | 
-                self.queue.append(job)
 | 
|
| 185 | 
+                __queue_job(self.__queue, job)
 | 
|
| 88 | 186 | 
 | 
| 89 | 187 | 
             else:
 | 
| 90 | 188 | 
                 job.set_cached_result(action_result)
 | 
| ... | ... | @@ -95,28 +193,68 @@ class Scheduler: | 
| 95 | 193 | 
 | 
| 96 | 194 | 
         else:
 | 
| 97 | 195 | 
             operation_stage = OperationStage.QUEUED
 | 
| 98 | 
-            self.queue.append(job)
 | 
|
| 196 | 
+            __queue_job(self.__queue, job)
 | 
|
| 99 | 197 | 
 | 
| 100 | 198 | 
         self._update_job_operation_stage(job.name, operation_stage)
 | 
| 101 | 199 | 
 | 
| 102 | 
-    def retry_job(self, job_name):
 | 
|
| 103 | 
-        job = self.jobs[job_name]
 | 
|
| 200 | 
+        return job.name
 | 
|
| 104 | 201 | 
 | 
| 105 | 
-        operation_stage = None
 | 
|
| 106 | 
-        if job.n_tries >= self.MAX_N_TRIES:
 | 
|
| 107 | 
-            # TODO: Decide what to do with these jobs
 | 
|
| 108 | 
-            operation_stage = OperationStage.COMPLETED
 | 
|
| 109 | 
-            # TODO: Mark these jobs as done
 | 
|
| 202 | 
+    def get_job_operation(self, operation_name):
 | 
|
| 203 | 
+        """Retrieves a job's :class:`Operation` by name.
 | 
|
| 110 | 204 | 
 | 
| 111 | 
-        else:
 | 
|
| 112 | 
-            operation_stage = OperationStage.QUEUED
 | 
|
| 113 | 
-            job.update_lease_state(LeaseState.PENDING)
 | 
|
| 114 | 
-            self.queue.append(job)
 | 
|
| 205 | 
+        Args:
 | 
|
| 206 | 
+            operation_name (str): name of the operation to query.
 | 
|
| 115 | 207 | 
 | 
| 116 | 
-        self._update_job_operation_stage(job_name, operation_stage)
 | 
|
| 208 | 
+        Raises:
 | 
|
| 209 | 
+            NotFoundError: If no operation with `operation_name` exists.
 | 
|
| 210 | 
+        """
 | 
|
| 211 | 
+        try:
 | 
|
| 212 | 
+            job = self.__jobs_by_operation[operation_name]
 | 
|
| 117 | 213 | 
 | 
| 118 | 
-    def list_jobs(self):
 | 
|
| 119 | 
-        return self.jobs.values()
 | 
|
| 214 | 
+        except KeyError:
 | 
|
| 215 | 
+            raise NotFoundError("Operation name does not exist: [{}]"
 | 
|
| 216 | 
+                                .format(operation_name))
 | 
|
| 217 | 
+  | 
|
| 218 | 
+        return job.get_operation(operation_name)
 | 
|
| 219 | 
+  | 
|
| 220 | 
+    def cancel_job_operation(self, operation_name):
 | 
|
| 221 | 
+        """"Cancels a job's :class:`Operation` by name.
 | 
|
| 222 | 
+  | 
|
| 223 | 
+        Args:
 | 
|
| 224 | 
+            operation_name (str): name of the operation to cancel.
 | 
|
| 225 | 
+  | 
|
| 226 | 
+        Raises:
 | 
|
| 227 | 
+            NotFoundError: If no operation with `operation_name` exists.
 | 
|
| 228 | 
+        """
 | 
|
| 229 | 
+        try:
 | 
|
| 230 | 
+            job = self.__jobs_by_operation[operation_name]
 | 
|
| 231 | 
+  | 
|
| 232 | 
+        except KeyError:
 | 
|
| 233 | 
+            raise NotFoundError("Operation name does not exist: [{}]"
 | 
|
| 234 | 
+                                .format(operation_name))
 | 
|
| 235 | 
+  | 
|
| 236 | 
+        job.cancel_operation(operation_name)
 | 
|
| 237 | 
+  | 
|
| 238 | 
+    def delete_job_operation(self, operation_name):
 | 
|
| 239 | 
+        """"Removes a job.
 | 
|
| 240 | 
+  | 
|
| 241 | 
+        Args:
 | 
|
| 242 | 
+            operation_name (str): name of the operation to delete.
 | 
|
| 243 | 
+  | 
|
| 244 | 
+        Raises:
 | 
|
| 245 | 
+            NotFoundError: If no operation with `operation_name` exists.
 | 
|
| 246 | 
+        """
 | 
|
| 247 | 
+        try:
 | 
|
| 248 | 
+            job = self.__jobs_by_name[operation_name]
 | 
|
| 249 | 
+  | 
|
| 250 | 
+        except KeyError:
 | 
|
| 251 | 
+            raise NotFoundError("Operation name does not exist: [{}]"
 | 
|
| 252 | 
+                                .format(operation_name))
 | 
|
| 253 | 
+  | 
|
| 254 | 
+        if job.n_clients == 0 and (job.done or job.lease is None):
 | 
|
| 255 | 
+            self._delete_job(job.name)
 | 
|
| 256 | 
+  | 
|
| 257 | 
+    # --- Public API: RWAPI ---
 | 
|
| 120 | 258 | 
 | 
| 121 | 259 | 
     def request_job_leases(self, worker_capabilities):
 | 
| 122 | 260 | 
         """Generates a list of the highest priority leases to be run.
 | 
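The *_job_operation methods added above look operations up by name and raise NotFoundError instead of letting a bare KeyError escape. A minimal caller-side sketch, assuming a fully configured scheduler instance and an operation name obtained from a previously registered client; only the method names and the exception type come from this patch, and the import path is assumed to match the one used elsewhere in buildgrid:

    from buildgrid._exceptions import NotFoundError

    operation_name = 'name-of-a-registered-operation'  # placeholder value

    try:
        operation = scheduler.get_job_operation(operation_name)
        if not operation.done:
            scheduler.cancel_job_operation(operation_name)

    except NotFoundError:
        operation = None  # unknown names now surface as NotFoundError, not KeyError
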
| ... | ... | @@ -126,10 +264,10 @@ class Scheduler: | 
| 126 | 264 | 
                 worker properties, configuration and state at the time of the
 | 
| 127 | 265 | 
                 request.
 | 
| 128 | 266 | 
         """
 | 
| 129 | 
-        if not self.queue:
 | 
|
| 267 | 
+        if not self.__queue:
 | 
|
| 130 | 268 | 
             return []
 | 
| 131 | 269 | 
 | 
| 132 | 
-        job = self.queue.popleft()
 | 
|
| 270 | 
+        job = self.__queue.popleft()
 | 
|
| 133 | 271 | 
 | 
| 134 | 272 | 
         lease = job.lease
 | 
| 135 | 273 | 
 | 
| ... | ... | @@ -142,18 +280,25 @@ class Scheduler: | 
| 142 | 280 | 
 | 
| 143 | 281 | 
         return None
 | 
| 144 | 282 | 
 | 
| 145 | 
-    def update_job_lease(self, lease):
 | 
|
| 283 | 
+    def update_job_lease_state(self, job_name, lease):
 | 
|
| 146 | 284 | 
         """Requests a state transition for a job's current :class:Lease.
 | 
| 147 | 285 | 
 | 
| 286 | 
+        Note:
 | 
|
| 287 | 
+            This may trigger a job's :class:`Operation` stage transition.
 | 
|
| 288 | 
+  | 
|
| 148 | 289 | 
         Args:
 | 
| 149 | 290 | 
             job_name (str): name of the job to query.
 | 
| 150 | 
-            lease_state (LeaseState): the lease state to transition to.
 | 
|
| 151 | 
-            lease_status (google.rpc.Status): the lease execution status, only
 | 
|
| 152 | 
-                required if `lease_state` is `COMPLETED`.
 | 
|
| 153 | 
-            lease_result (google.protobuf.Any): the lease execution result, only
 | 
|
| 154 | 
-                required if `lease_state` is `COMPLETED`.
 | 
|
| 291 | 
+            lease (Lease): the lease holding the new state.
 | 
|
| 292 | 
+  | 
|
| 293 | 
+        Raises:
 | 
|
| 294 | 
+            NotFoundError: If no job with `job_name` exists.
 | 
|
| 155 | 295 | 
         """
 | 
| 156 | 
-        job = self.jobs[lease.id]
 | 
|
| 296 | 
+        try:
 | 
|
| 297 | 
+            job = self.__jobs_by_name[job_name]
 | 
|
| 298 | 
+  | 
|
| 299 | 
+        except KeyError:
 | 
|
| 300 | 
+            raise NotFoundError("Job name does not exist: [{}]".format(job_name))
 | 
|
| 301 | 
+  | 
|
| 157 | 302 | 
         lease_state = LeaseState(lease.state)
 | 
| 158 | 303 | 
 | 
| 159 | 304 | 
         operation_stage = None
 | 
| ... | ... | @@ -189,29 +334,72 @@ class Scheduler: | 
| 189 | 334 | 
                 self.__leases_by_state[LeaseState.ACTIVE].discard(lease.id)
 | 
| 190 | 335 | 
                 self.__leases_by_state[LeaseState.COMPLETED].add(lease.id)
 | 
| 191 | 336 | 
 | 
| 192 | 
-        self._update_job_operation_stage(lease.id, operation_stage)
 | 
|
| 337 | 
+        self._update_job_operation_stage(job_name, operation_stage)
 | 
|
| 338 | 
+  | 
|
| 339 | 
+    def retry_job_lease(self, job_name):
 | 
|
| 340 | 
+        """Re-queues a job on lease execution failure.
 | 
|
| 341 | 
+  | 
|
| 342 | 
+        Note:
 | 
|
| 343 | 
+            This may trigger a job's :class:`Operation` stage transition.
 | 
|
| 344 | 
+  | 
|
| 345 | 
+        Args:
 | 
|
| 346 | 
+            job_name (str): name of the job to retry.
 | 
|
| 347 | 
+  | 
|
| 348 | 
+        Raises:
 | 
|
| 349 | 
+            NotFoundError: If no job with `job_name` exists.
 | 
|
| 350 | 
+        """
 | 
|
| 351 | 
+        try:
 | 
|
| 352 | 
+            job = self.__jobs_by_name[job_name]
 | 
|
| 353 | 
+  | 
|
| 354 | 
+        except KeyError:
 | 
|
| 355 | 
+            raise NotFoundError("Job name does not exist: [{}]".format(job_name))
 | 
|
| 356 | 
+  | 
|
| 357 | 
+        operation_stage = None
 | 
|
| 358 | 
+        if job.n_tries >= self.MAX_N_TRIES:
 | 
|
| 359 | 
+            # TODO: Decide what to do with these jobs
 | 
|
| 360 | 
+            operation_stage = OperationStage.COMPLETED
 | 
|
| 361 | 
+            # TODO: Mark these jobs as done
 | 
|
| 362 | 
+  | 
|
| 363 | 
+        else:
 | 
|
| 364 | 
+            operation_stage = OperationStage.QUEUED
 | 
|
| 365 | 
+            job.update_lease_state(LeaseState.PENDING)
 | 
|
| 366 | 
+            self.__queue.append(job)
 | 
|
| 367 | 
+  | 
|
| 368 | 
+        self._update_job_operation_stage(job_name, operation_stage)
 | 
|
| 193 | 369 | 
 | 
| 194 | 370 | 
     def get_job_lease(self, job_name):
 | 
| 195 | 
-        """Returns the lease associated to job, if any have been emitted yet."""
 | 
|
| 196 | 
-        return self.jobs[job_name].lease
 | 
|
| 371 | 
+        """Returns the lease associated to job, if any have been emitted yet.
 | 
|
| 197 | 372 | 
 | 
| 198 | 
-    def get_job_lease_cancelled(self, job_name):
 | 
|
| 199 | 
-        """Returns true if the lease is cancelled"""
 | 
|
| 200 | 
-        return self.jobs[job_name].lease_cancelled
 | 
|
| 373 | 
+        Args:
 | 
|
| 374 | 
+            job_name (str): name of the job to query.
 | 
|
| 375 | 
+  | 
|
| 376 | 
+        Raises:
 | 
|
| 377 | 
+            NotFoundError: If no job with `job_name` exists.
 | 
|
| 378 | 
+        """
 | 
|
| 379 | 
+        try:
 | 
|
| 380 | 
+            job = self.__jobs_by_name[job_name]
 | 
|
| 201 | 381 | 
 | 
| 202 | 
-    def get_job_operation(self, job_name):
 | 
|
| 203 | 
-        """Returns the operation associated to job."""
 | 
|
| 204 | 
-        return self.jobs[job_name].operation
 | 
|
| 382 | 
+        except KeyError:
 | 
|
| 383 | 
+            raise NotFoundError("Job name does not exist: [{}]".format(job_name))
 | 
|
| 205 | 384 | 
 | 
| 206 | 
-    def cancel_job_operation(self, job_name):
 | 
|
| 207 | 
-        """"Cancels the underlying operation of a given job.
 | 
|
| 385 | 
+        return job.lease
 | 
|
| 208 | 386 | 
 | 
| 209 | 
-        This will also cancel any job's lease that may have been issued.
 | 
|
| 387 | 
+    def get_job_lease_cancelled(self, job_name):
 | 
|
| 388 | 
+        """Returns true if the lease is cancelled.
 | 
|
| 210 | 389 | 
 | 
| 211 | 390 | 
         Args:
 | 
| 212 | 
-            job_name (str): name of the job holding the operation to cancel.
 | 
|
| 391 | 
+            job_name (str): name of the job to query.
 | 
|
| 392 | 
+  | 
|
| 393 | 
+        Raises:
 | 
|
| 394 | 
+            NotFoundError: If no job with `job_name` exists.
 | 
|
| 213 | 395 | 
         """
 | 
| 214 | 
-        self.jobs[job_name].cancel_operation()
 | 
|
| 396 | 
+        try:
 | 
|
| 397 | 
+            job = self.__jobs_by_name[job_name]
 | 
|
| 398 | 
+  | 
|
| 399 | 
+        except KeyError:
 | 
|
| 400 | 
+            raise NotFoundError("Job name does not exist: [{}]".format(job_name))
 | 
|
| 401 | 
+  | 
|
| 402 | 
+        return job.lease_cancelled
 | 
|
| 215 | 403 | 
 | 
| 216 | 404 | 
     # --- Public API: Monitoring ---
 | 
| 217 | 405 | 
 | 
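On the RWAPI side the flow now is: pop a lease with request_job_leases(), report state changes with update_job_lease_state(), and fall back to retry_job_lease() when a worker fails. A hedged sketch of a bots-service loop, assuming scheduler and worker_capabilities are provided by the caller and that lease.id still carries the job name, as it did in the pre-change code:

    from buildgrid._exceptions import NotFoundError

    for lease in scheduler.request_job_leases(worker_capabilities) or []:
        job_name = lease.id  # lease ids matched job names in the pre-change code

        try:
            if scheduler.get_job_lease_cancelled(job_name):
                continue  # the client cancelled the operation: drop the lease

            # Report the state the worker put on the lease; this may also move
            # the job's operations to a new stage:
            scheduler.update_job_lease_state(job_name, lease)

        except NotFoundError:
            continue  # the job was deleted in the meantime

    # On a worker-side failure the service would instead re-queue the work,
    # bounded by Scheduler.MAX_N_TRIES:
    #     scheduler.retry_job_lease(job_name)
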
| ... | ... | @@ -261,11 +449,11 @@ class Scheduler: | 
| 261 | 449 | 
             self.__build_metadata_queues.append(message_queue)
 | 
| 262 | 450 | 
 | 
| 263 | 451 | 
     def query_n_jobs(self):
 | 
| 264 | 
-        return len(self.jobs)
 | 
|
| 452 | 
+        return len(self.__jobs_by_name)
 | 
|
| 265 | 453 | 
 | 
| 266 | 454 | 
     def query_n_operations(self):
 | 
| 267 | 455 | 
         # For now n_operations == n_jobs:
 | 
| 268 | 
-        return len(self.jobs)
 | 
|
| 456 | 
+        return len(self.__jobs_by_operation)
 | 
|
| 269 | 457 | 
 | 
| 270 | 458 | 
     def query_n_operations_by_stage(self, operation_stage):
 | 
| 271 | 459 | 
         try:
 | 
| ... | ... | @@ -276,7 +464,7 @@ class Scheduler: | 
| 276 | 464 | 
         return 0
 | 
| 277 | 465 | 
 | 
| 278 | 466 | 
     def query_n_leases(self):
 | 
| 279 | 
-        return len(self.jobs)
 | 
|
| 467 | 
+        return len(self.__jobs_by_name)
 | 
|
| 280 | 468 | 
 | 
| 281 | 469 | 
     def query_n_leases_by_state(self, lease_state):
 | 
| 282 | 470 | 
         try:
 | 
| ... | ... | @@ -296,6 +484,23 @@ class Scheduler: | 
| 296 | 484 | 
 | 
| 297 | 485 | 
     # --- Private API ---
 | 
| 298 | 486 | 
 | 
| 487 | 
+    def _delete_job(self, job_name):
 | 
|
| 488 | 
+        """Drops an entry from the internal list of jobs."""
 | 
|
| 489 | 
+        job = self.__jobs_by_name[job_name]
 | 
|
| 490 | 
+  | 
|
| 491 | 
+        del self.__jobs_by_action[job.action_digest.hash]
 | 
|
| 492 | 
+        del self.__jobs_by_name[job.name]
 | 
|
| 493 | 
+  | 
|
| 494 | 
+        if self._is_instrumented:
 | 
|
| 495 | 
+            self.__operations_by_stage[OperationStage.CACHE_CHECK].discard(job.name)
 | 
|
| 496 | 
+            self.__operations_by_stage[OperationStage.QUEUED].discard(job.name)
 | 
|
| 497 | 
+            self.__operations_by_stage[OperationStage.EXECUTING].discard(job.name)
 | 
|
| 498 | 
+            self.__operations_by_stage[OperationStage.COMPLETED].discard(job.name)
 | 
|
| 499 | 
+  | 
|
| 500 | 
+            self.__leases_by_state[LeaseState.PENDING].discard(job.name)
 | 
|
| 501 | 
+            self.__leases_by_state[LeaseState.ACTIVE].discard(job.name)
 | 
|
| 502 | 
+            self.__leases_by_state[LeaseState.COMPLETED].discard(job.name)
 | 
|
| 503 | 
+  | 
|
| 299 | 504 | 
     def _update_job_operation_stage(self, job_name, operation_stage):
 | 
| 300 | 505 | 
         """Requests a stage transition for the job's :class:Operations.
 | 
| 301 | 506 | 
 | 
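_delete_job() cleans the monitoring indices with set.discard() rather than remove(). A tiny sketch of why, assuming the indices are plain sets keyed by stage, which is consistent with the .add()/.discard() calls visible in this diff:

    from buildgrid._enums import OperationStage

    operations_by_stage = {stage: set() for stage in OperationStage}
    operations_by_stage[OperationStage.QUEUED].add('some-job-name')

    # A job only ever sits in one stage bucket, so discarding it from every
    # bucket is a harmless no-op for the buckets it was never added to:
    for stage in OperationStage:
        operations_by_stage[stage].discard('some-job-name')  # never raises KeyError
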
| ... | ... | @@ -303,7 +508,7 @@ class Scheduler: | 
| 303 | 508 | 
             job_name (str): name of the job to query.
 | 
| 304 | 509 | 
             operation_stage (OperationStage): the stage to transition to.
 | 
| 305 | 510 | 
         """
 | 
| 306 | 
-        job = self.jobs[job_name]
 | 
|
| 511 | 
+        job = self.__jobs_by_name[job_name]
 | 
|
| 307 | 512 | 
 | 
| 308 | 513 | 
         if operation_stage == OperationStage.CACHE_CHECK:
 | 
| 309 | 514 | 
             job.update_operation_stage(OperationStage.CACHE_CHECK)
 | 
| ... | ... | @@ -352,7 +557,7 @@ class Scheduler: | 
| 352 | 557 | 
 | 
| 353 | 558 | 
                 self.__queue_time_average = average_order, average_time
 | 
| 354 | 559 | 
 | 
| 355 | 
-                if not job.holds_cached_action_result:
 | 
|
| 560 | 
+                if not job.holds_cached_result:
 | 
|
| 356 | 561 | 
                     execution_metadata = job.action_result.execution_metadata
 | 
| 357 | 562 | 
                     context_metadata = {'job-is': job.name}
 | 
| 358 | 563 | 
 | 
| ... | ... | @@ -182,3 +182,11 @@ texinfo_documents = [ | 
| 182 | 182 | 
      author, 'BuildGrid', 'One line description of project.',
 | 
| 183 | 183 | 
      'Miscellaneous'),
 | 
| 184 | 184 | 
 ]
 | 
| 185 | 
+  | 
|
| 186 | 
+# -- Options for the autodoc extension ----------------------------------------
 | 
|
| 187 | 
+  | 
|
| 188 | 
+# This value selects how automatically documented members are sorted:
 | 
|
| 189 | 
+# alphabetically (value 'alphabetical'), by member type (value 'groupwise') or
 | 
|
| 190 | 
+# by source order (value 'bysource'). The default is alphabetical.
 | 
|
| 191 | 
+autodoc_member_order = 'bysource'
 | 
|
| 192 | 
+  | 
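autodoc_member_order only has an effect when the autodoc extension is active; a sketch of the relevant conf.py context, where the extensions entry is an assumption and only the last line is part of this change:

    # docs/source/conf.py (sketch)
    extensions = [
        'sphinx.ext.autodoc',  # assumed to already be enabled for the API docs
    ]

    # Document members in the order they appear in the source files instead
    # of sorting them alphabetically:
    autodoc_member_order = 'bysource'
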
| ... | ... | @@ -25,7 +25,6 @@ import pytest | 
| 25 | 25 | 
 | 
| 26 | 26 | 
 from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
 | 
| 27 | 27 | 
 from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
 | 
| 28 | 
-from buildgrid.server import job
 | 
|
| 29 | 28 | 
 from buildgrid.server.controller import ExecutionController
 | 
| 30 | 29 | 
 from buildgrid.server.job import LeaseState
 | 
| 31 | 30 | 
 from buildgrid.server.bots import service
 | 
| ... | ... | @@ -159,7 +158,8 @@ def test_post_bot_event_temp(context, instance): | 
| 159 | 158 | 
 def _inject_work(scheduler, action=None, action_digest=None):
 | 
| 160 | 159 | 
     if not action:
 | 
| 161 | 160 | 
         action = remote_execution_pb2.Action()
 | 
| 161 | 
+  | 
|
| 162 | 162 | 
     if not action_digest:
 | 
| 163 | 163 | 
         action_digest = remote_execution_pb2.Digest()
 | 
| 164 | 
-    j = job.Job(action, action_digest)
 | 
|
| 165 | 
-    scheduler.queue_job(j, True)
 | 
|
| 164 | 
+  | 
|
| 165 | 
+    scheduler.queue_job_operation(action, action_digest, skip_cache_lookup=True)
 | 
| ... | ... | @@ -20,11 +20,11 @@ | 
| 20 | 20 | 
 import uuid
 | 
| 21 | 21 | 
 from unittest import mock
 | 
| 22 | 22 | 
 | 
| 23 | 
-from google.protobuf import any_pb2
 | 
|
| 24 | 23 | 
 import grpc
 | 
| 25 | 24 | 
 from grpc._server import _Context
 | 
| 26 | 25 | 
 import pytest
 | 
| 27 | 26 | 
 | 
| 27 | 
+from buildgrid._enums import OperationStage
 | 
|
| 28 | 28 | 
 from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
 | 
| 29 | 29 | 
 from buildgrid._protos.google.longrunning import operations_pb2
 | 
| 30 | 30 | 
 | 
| ... | ... | @@ -82,7 +82,7 @@ def test_execute(skip_cache_lookup, instance, context): | 
| 82 | 82 | 
     assert isinstance(result, operations_pb2.Operation)
 | 
| 83 | 83 | 
     metadata = remote_execution_pb2.ExecuteOperationMetadata()
 | 
| 84 | 84 | 
     result.metadata.Unpack(metadata)
 | 
| 85 | 
-    assert metadata.stage == job.OperationStage.QUEUED.value
 | 
|
| 85 | 
+    assert metadata.stage == OperationStage.QUEUED.value
 | 
|
| 86 | 86 | 
     operation_uuid = result.name.split('/')[-1]
 | 
| 87 | 87 | 
     assert uuid.UUID(operation_uuid, version=4)
 | 
| 88 | 88 | 
     assert result.done is False
 | 
| ... | ... | @@ -106,18 +106,14 @@ def test_no_action_digest_in_storage(instance, context): | 
| 106 | 106 | 
 | 
| 107 | 107 | 
 | 
| 108 | 108 | 
 def test_wait_execution(instance, controller, context):
 | 
| 109 | 
-    j = job.Job(action, action_digest)
 | 
|
| 110 | 
-    j._operation.done = True
 | 
|
| 109 | 
+    job_name = controller.execution_instance._scheduler.queue_job_operation(action,
 | 
|
| 110 | 
+                                                                            action_digest,
 | 
|
| 111 | 
+                                                                            skip_cache_lookup=True)
 | 
|
| 111 | 112 | 
 | 
| 112 | 
-    request = remote_execution_pb2.WaitExecutionRequest(name=j.name)
 | 
|
| 113 | 
+    controller.execution_instance._scheduler._update_job_operation_stage(job_name,
 | 
|
| 114 | 
+                                                                         OperationStage.COMPLETED)
 | 
|
| 113 | 115 | 
 | 
| 114 | 
-    controller.execution_instance._scheduler.jobs[j.name] = j
 | 
|
| 115 | 
-  | 
|
| 116 | 
-    action_result_any = any_pb2.Any()
 | 
|
| 117 | 
-    action_result = remote_execution_pb2.ActionResult()
 | 
|
| 118 | 
-    action_result_any.Pack(action_result)
 | 
|
| 119 | 
-  | 
|
| 120 | 
-    j.update_operation_stage(job.OperationStage.COMPLETED)
 | 
|
| 116 | 
+    request = remote_execution_pb2.WaitExecutionRequest(name=job_name)
 | 
|
| 121 | 117 | 
 | 
| 122 | 118 | 
     response = instance.WaitExecution(request, context)
 | 
| 123 | 119 | 
 | 
| ... | ... | @@ -127,7 +123,6 @@ def test_wait_execution(instance, controller, context): | 
| 127 | 123 | 
     metadata = remote_execution_pb2.ExecuteOperationMetadata()
 | 
| 128 | 124 | 
     result.metadata.Unpack(metadata)
 | 
| 129 | 125 | 
     assert metadata.stage == job.OperationStage.COMPLETED.value
 | 
| 130 | 
-    assert uuid.UUID(result.name, version=4)
 | 
|
| 131 | 126 | 
     assert result.done is True
 | 
| 132 | 127 | 
 | 
| 133 | 128 | 
 | 
| ... | ... | @@ -17,6 +17,7 @@ | 
| 17 | 17 | 
 | 
| 18 | 18 | 
 # pylint: disable=redefined-outer-name
 | 
| 19 | 19 | 
 | 
| 20 | 
+import queue
 | 
|
| 20 | 21 | 
 from unittest import mock
 | 
| 21 | 22 | 
 | 
| 22 | 23 | 
 from google.protobuf import any_pb2
 | 
| ... | ... | @@ -86,8 +87,13 @@ def blank_instance(controller): | 
| 86 | 87 | 
 | 
| 87 | 88 | 
 # Queue an execution, get operation corresponding to that request
 | 
| 88 | 89 | 
 def test_get_operation(instance, controller, execute_request, context):
 | 
| 89 | 
-    response_execute = controller.execution_instance.execute(execute_request.action_digest,
 | 
|
| 90 | 
-                                                             execute_request.skip_cache_lookup)
 | 
|
| 90 | 
+    job_name = controller.execution_instance.execute(execute_request.action_digest,
 | 
|
| 91 | 
+                                                     execute_request.skip_cache_lookup)
 | 
|
| 92 | 
+  | 
|
| 93 | 
+    message_queue = queue.Queue()
 | 
|
| 94 | 
+    operation_name = controller.execution_instance.register_operation_client(job_name,
 | 
|
| 95 | 
+                                                                             context.peer(),
 | 
|
| 96 | 
+                                                                             message_queue)
 | 
|
| 91 | 97 | 
 | 
| 92 | 98 | 
     request = operations_pb2.GetOperationRequest()
 | 
| 93 | 99 | 
 | 
| ... | ... | @@ -95,25 +101,28 @@ def test_get_operation(instance, controller, execute_request, context): | 
| 95 | 101 | 
     # we're manually creating the instance here, it doesn't get a name.
 | 
| 96 | 102 | 
     # Therefore we need to manually add the instance name to the operation
 | 
| 97 | 103 | 
     # name in the GetOperation request.
 | 
| 98 | 
-    request.name = "{}/{}".format(instance_name, response_execute.name)
 | 
|
| 104 | 
+    request.name = "{}/{}".format(instance_name, operation_name)
 | 
|
| 99 | 105 | 
 | 
| 100 | 106 | 
     response = instance.GetOperation(request, context)
 | 
| 101 | 
-    assert response.name == "{}/{}".format(instance_name, response_execute.name)
 | 
|
| 102 | 
-    assert response.done == response_execute.done
 | 
|
| 107 | 
+    assert response.name == "{}/{}".format(instance_name, operation_name)
 | 
|
| 103 | 108 | 
 | 
| 104 | 109 | 
 | 
| 105 | 110 | 
 # Queue an execution, get operation corresponding to that request
 | 
| 106 | 111 | 
 def test_get_operation_blank(blank_instance, controller, execute_request, context):
 | 
| 107 | 
-    response_execute = controller.execution_instance.execute(execute_request.action_digest,
 | 
|
| 108 | 
-                                                             execute_request.skip_cache_lookup)
 | 
|
| 112 | 
+    job_name = controller.execution_instance.execute(execute_request.action_digest,
 | 
|
| 113 | 
+                                                     execute_request.skip_cache_lookup)
 | 
|
| 114 | 
+  | 
|
| 115 | 
+    message_queue = queue.Queue()
 | 
|
| 116 | 
+    operation_name = controller.execution_instance.register_operation_client(job_name,
 | 
|
| 117 | 
+                                                                             context.peer(),
 | 
|
| 118 | 
+                                                                             message_queue)
 | 
|
| 109 | 119 | 
 | 
| 110 | 120 | 
     request = operations_pb2.GetOperationRequest()
 | 
| 111 | 121 | 
 | 
| 112 | 
-    request.name = response_execute.name
 | 
|
| 122 | 
+    request.name = operation_name
 | 
|
| 113 | 123 | 
 | 
| 114 | 124 | 
     response = blank_instance.GetOperation(request, context)
 | 
| 115 | 
-    assert response.name == response_execute.name
 | 
|
| 116 | 
-    assert response.done == response_execute.done
 | 
|
| 125 | 
+    assert response.name == operation_name
 | 
|
| 117 | 126 | 
 | 
| 118 | 127 | 
 | 
| 119 | 128 | 
 def test_get_operation_fail(instance, context):
 | 
| ... | ... | @@ -133,25 +142,35 @@ def test_get_operation_instance_fail(instance, context): | 
| 133 | 142 | 
 | 
| 134 | 143 | 
 | 
| 135 | 144 | 
 def test_list_operations(instance, controller, execute_request, context):
 | 
| 136 | 
-    response_execute = controller.execution_instance.execute(execute_request.action_digest,
 | 
|
| 137 | 
-                                                             execute_request.skip_cache_lookup)
 | 
|
| 145 | 
+    job_name = controller.execution_instance.execute(execute_request.action_digest,
 | 
|
| 146 | 
+                                                     execute_request.skip_cache_lookup)
 | 
|
| 147 | 
+  | 
|
| 148 | 
+    message_queue = queue.Queue()
 | 
|
| 149 | 
+    operation_name = controller.execution_instance.register_operation_client(job_name,
 | 
|
| 150 | 
+                                                                             context.peer(),
 | 
|
| 151 | 
+                                                                             message_queue)
 | 
|
| 138 | 152 | 
 | 
| 139 | 153 | 
     request = operations_pb2.ListOperationsRequest(name=instance_name)
 | 
| 140 | 154 | 
     response = instance.ListOperations(request, context)
 | 
| 141 | 155 | 
 | 
| 142 | 156 | 
     names = response.operations[0].name.split('/')
 | 
| 143 | 157 | 
     assert names[0] == instance_name
 | 
| 144 | 
-    assert names[1] == response_execute.name
 | 
|
| 158 | 
+    assert names[1] == operation_name
 | 
|
| 145 | 159 | 
 | 
| 146 | 160 | 
 | 
| 147 | 161 | 
 def test_list_operations_blank(blank_instance, controller, execute_request, context):
 | 
| 148 | 
-    response_execute = controller.execution_instance.execute(execute_request.action_digest,
 | 
|
| 149 | 
-                                                             execute_request.skip_cache_lookup)
 | 
|
| 162 | 
+    job_name = controller.execution_instance.execute(execute_request.action_digest,
 | 
|
| 163 | 
+                                                     execute_request.skip_cache_lookup)
 | 
|
| 164 | 
+  | 
|
| 165 | 
+    message_queue = queue.Queue()
 | 
|
| 166 | 
+    operation_name = controller.execution_instance.register_operation_client(job_name,
 | 
|
| 167 | 
+                                                                             context.peer(),
 | 
|
| 168 | 
+                                                                             message_queue)
 | 
|
| 150 | 169 | 
 | 
| 151 | 170 | 
     request = operations_pb2.ListOperationsRequest(name='')
 | 
| 152 | 171 | 
     response = blank_instance.ListOperations(request, context)
 | 
| 153 | 172 | 
 | 
| 154 | 
-    assert response.operations[0].name.split('/')[-1] == response_execute.name
 | 
|
| 173 | 
+    assert response.operations[0].name.split('/')[-1] == operation_name
 | 
|
| 155 | 174 | 
 | 
| 156 | 175 | 
 | 
| 157 | 176 | 
 def test_list_operations_instance_fail(instance, controller, execute_request, context):
 | 
| ... | ... | @@ -174,14 +193,19 @@ def test_list_operations_empty(instance, context): | 
| 174 | 193 | 
 | 
| 175 | 194 | 
 # Send execution off, delete, try to find operation should fail
 | 
| 176 | 195 | 
 def test_delete_operation(instance, controller, execute_request, context):
 | 
| 177 | 
-    response_execute = controller.execution_instance.execute(execute_request.action_digest,
 | 
|
| 178 | 
-                                                             execute_request.skip_cache_lookup)
 | 
|
| 196 | 
+    job_name = controller.execution_instance.execute(execute_request.action_digest,
 | 
|
| 197 | 
+                                                     execute_request.skip_cache_lookup)
 | 
|
| 198 | 
+  | 
|
| 199 | 
+    message_queue = queue.Queue()
 | 
|
| 200 | 
+    operation_name = controller.execution_instance.register_operation_client(job_name,
 | 
|
| 201 | 
+                                                                             context.peer(),
 | 
|
| 202 | 
+                                                                             message_queue)
 | 
|
| 179 | 203 | 
 | 
| 180 | 204 | 
     request = operations_pb2.DeleteOperationRequest()
 | 
| 181 | 
-    request.name = response_execute.name
 | 
|
| 205 | 
+    request.name = operation_name
 | 
|
| 182 | 206 | 
     instance.DeleteOperation(request, context)
 | 
| 183 | 207 | 
 | 
| 184 | 
-    request_name = "{}/{}".format(instance_name, response_execute.name)
 | 
|
| 208 | 
+    request_name = "{}/{}".format(instance_name, operation_name)
 | 
|
| 185 | 209 | 
 | 
| 186 | 210 | 
     with pytest.raises(InvalidArgumentError):
 | 
| 187 | 211 | 
         controller.operations_instance.get_operation(request_name)
 | 
| ... | ... | @@ -189,17 +213,11 @@ def test_delete_operation(instance, controller, execute_request, context): | 
| 189 | 213 | 
 | 
| 190 | 214 | 
 # Send execution off, delete, try to find operation should fail
 | 
| 191 | 215 | 
 def test_delete_operation_blank(blank_instance, controller, execute_request, context):
 | 
| 192 | 
-    response_execute = controller.execution_instance.execute(execute_request.action_digest,
 | 
|
| 193 | 
-                                                             execute_request.skip_cache_lookup)
 | 
|
| 194 | 
-  | 
|
| 195 | 216 | 
     request = operations_pb2.DeleteOperationRequest()
 | 
| 196 | 
-    request.name = response_execute.name
 | 
|
| 217 | 
+    request.name = "runner"
 | 
|
| 197 | 218 | 
     blank_instance.DeleteOperation(request, context)
 | 
| 198 | 219 | 
 | 
| 199 | 
-    request_name = response_execute.name
 | 
|
| 200 | 
-  | 
|
| 201 | 
-    with pytest.raises(InvalidArgumentError):
 | 
|
| 202 | 
-        controller.operations_instance.get_operation(request_name)
 | 
|
| 220 | 
+    context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
 | 
|
| 203 | 221 | 
 | 
| 204 | 222 | 
 | 
| 205 | 223 | 
 def test_delete_operation_fail(instance, context):
 | 
| ... | ... | @@ -211,11 +229,16 @@ def test_delete_operation_fail(instance, context): | 
| 211 | 229 | 
 | 
| 212 | 230 | 
 | 
| 213 | 231 | 
 def test_cancel_operation(instance, controller, execute_request, context):
 | 
| 214 | 
-    response_execute = controller.execution_instance.execute(execute_request.action_digest,
 | 
|
| 215 | 
-                                                             execute_request.skip_cache_lookup)
 | 
|
| 232 | 
+    job_name = controller.execution_instance.execute(execute_request.action_digest,
 | 
|
| 233 | 
+                                                     execute_request.skip_cache_lookup)
 | 
|
| 234 | 
+  | 
|
| 235 | 
+    message_queue = queue.Queue()
 | 
|
| 236 | 
+    operation_name = controller.execution_instance.register_operation_client(job_name,
 | 
|
| 237 | 
+                                                                             context.peer(),
 | 
|
| 238 | 
+                                                                             message_queue)
 | 
|
| 216 | 239 | 
 | 
| 217 | 240 | 
     request = operations_pb2.CancelOperationRequest()
 | 
| 218 | 
-    request.name = "{}/{}".format(instance_name, response_execute.name)
 | 
|
| 241 | 
+    request.name = "{}/{}".format(instance_name, operation_name)
 | 
|
| 219 | 242 | 
 | 
| 220 | 243 | 
     instance.CancelOperation(request, context)
 | 
| 221 | 244 | 
 | 
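The execute() + register_operation_client() + queue.Queue() boilerplate now repeats across most of these tests; a hypothetical pytest fixture, not part of this patch, could factor it out along these lines, reusing only calls that appear in the diff:

    import queue

    import pytest

    @pytest.fixture
    def registered_operation(controller, execute_request, context):
        # Hypothetical helper, not in the patch: queue a job and register an
        # operation client for it, returning the operation name the service
        # tests assert against.
        job_name = controller.execution_instance.execute(
            execute_request.action_digest, execute_request.skip_cache_lookup)

        message_queue = queue.Queue()
        return controller.execution_instance.register_operation_client(
            job_name, context.peer(), message_queue)
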
| ... | ... | @@ -238,7 +261,7 @@ def test_cancel_operation_blank(blank_instance, context): | 
| 238 | 261 | 
     context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
 | 
| 239 | 262 | 
 | 
| 240 | 263 | 
 | 
| 241 | 
-def test_cancel_operation_instance_fail(instance, context):
 | 
|
| 264 | 
+def test_cancel_operation__fail(instance, context):
 | 
|
| 242 | 265 | 
     request = operations_pb2.CancelOperationRequest()
 | 
| 243 | 266 | 
     instance.CancelOperation(request, context)
 | 
| 244 | 267 | 
 | 
