Martin Blanchard pushed to branch mablanch/117-job-scheduler-refactoring at BuildGrid / buildgrid
Commits:
- 5e8eb6a9 by Laurence Urhegyi at 2018-10-15T18:39:59Z
- cee31b4a by Laurence Urhegyi at 2018-10-15T18:55:20Z
- 07d66e27 by Martin Blanchard at 2018-10-17T10:27:59Z
- d6865d83 by Martin Blanchard at 2018-10-17T10:27:59Z
- 7793c2c7 by Martin Blanchard at 2018-10-17T10:27:59Z
- d6981855 by Martin Blanchard at 2018-10-17T10:28:09Z
- 3933e904 by Martin Blanchard at 2018-10-17T10:34:35Z
11 changed files:
- CONTRIBUTING.rst
- buildgrid/_app/bots/buildbox.py
- buildgrid/_app/bots/host.py
- buildgrid/server/bots/instance.py
- buildgrid/server/execution/instance.py
- buildgrid/server/job.py
- buildgrid/server/operations/instance.py
- buildgrid/server/scheduler.py
- tests/integration/bots_service.py
- tests/integration/execution_service.py
- tests/integration/operations_service.py
Changes:
| ... | ... | @@ -17,9 +17,13 @@ discuss with us before submitting anything, as we may well have some important
| 17 | 17 |  context which will could help guide your efforts.
| 18 | 18 |
| 19 | 19 |  Any major feature additions should be raised first as a proposal on the
| 20 | -`BuildGrid mailing list`_ to be discussed, and then eventually followed up with
| 21 | -an issue on GitLab. We recommend that you propose the feature in advance of
| 22 | -commencing work.
| 20 | +BuildGrid mailing list to be discussed between the core contributors. Once
| 21 | +this discussion has taken place and there is agreement on how to proceed,
| 22 | +it should be followed by with a Gitlab issue being raised which summarizes
| 23 | +the tasks required.
| 24 | +
| 25 | +We strongly recommend that you propose the feature in advance of
| 26 | +commencing any work.
| 23 | 27 |
| 24 | 28 |  The author of any patch is expected to take ownership of that code and is to
| 25 | 29 |  support it for a reasonable time-frame. This means addressing any unforeseen
| ... | ... | @@ -237,21 +241,49 @@ following goals:
| 237 | 241 |    for the viewer to digest.
| 238 | 242 |  - Ensure that we keep it simple and easy to contribute to the project.
| 239 | 243 |
| 240 | -We are currenlty using the following GitLab features:
| 244 | +Explanation of how the project is currenlty using some GitLab features:
| 241 | 245 |
| 242 | 246 |  - `Milestones`_: we have seen them used in the same way as `Epics`_ in other
| 243 | -  projects. BuildGrid milestones must be time-line based, can overlap and we can
| 244 | -  be working towards multiple milestones at any one time. They allow us to group
| 245 | -  together all sub tasks into an overall aim. See our `BuildGrid milestones`_.
| 246 | -- `Labels`_: allow us to filter tickets in useful ways. They do complexity and
| 247 | -  effort as they grow in number and usage, though, so the general approach is
| 248 | -  to have the minimum possible. See our `BuildGrid labels`_.
| 247 | +  projects and are trying not to do that here. Instead we are going to
| 248 | +  use milestones to denote development cycles (ie, two week 'sprints'). See the
| 249 | +  `BuildGrid milestones`_.
| 250 | +- `Labels`_: allow us to filter tickets (ie, 'issues' in gitlab terminology)
| 251 | +  in useful ways. They add complexity and effort as they grow in number, so the
| 252 | +  general approach is to have the minimum possible but
| 253 | +  ensure we use them consistently. See the `BuildGrid labels`_.
| 249 | 254 |  - `Boards`_: allow us to visualise and manage issues and labels in a simple way.
| 250 | -  For now, we are only utilising one boards. Issues start life in the
| 251 | -  ``Backlog`` column by default, and we move them into ``ToDo`` when they are
| 252 | -  coming up in the next few weeks. ``Doing`` is only for when an item is
| 253 | -  currently being worked on. Moving an issue from column to column automatically
| 254 | -  adjust the tagged labels. See our `BuildGrid boards`_.
| 255 | +  Issues start life in the ``Backlog`` column by default, and we move them into
| 256 | +  ``ToDo`` when we aim to complete them in the current development cycle.
| 257 | +  ``Doing`` is only for when an item is currently being worked on. When on the
| 258 | +  Board view, dragging and dropping an issue from column to column automatically
| 259 | +  adjusts the relevant labels. See the `BuildGrid boards`_.
| 260 | +
| 261 | +
| 262 | +Guidelines for using GitLab features when working on this project:
| 263 | +
| 264 | +- When raising an issue, please:
| 265 | +
| 266 | +  - check to see if there already is an issue to cover this task (if not then
| 267 | +    raise a new one)
| 268 | +  - assign the appropriate label or labels (tip: the vast majority of issues
| 269 | +    raised will be either an enhancement or a bug)
| 270 | +
| 271 | +- If you plan to work on an issue, please:
| 272 | +
| 273 | +  - self-assign the ticket
| 274 | +  - ensure it's captured in the current sprint (ie, Gitlab milestone)
| 275 | +  - ensure the ticket is in the ``ToDo`` column of the board if you aim to
| 276 | +    complete in the current sprint but aren't yet working on it, and
| 277 | +    the ``Doing`` column if you are working on it currently.
| 278 | +
| 279 | +- Please note that Gitlab issues are for either 'tasks' or 'bugs' - ie not for
| 280 | +  long discussions (where the mailing list is a better choice) or for ranting,
| 281 | +  for example.
| 282 | +
| 283 | +The above may seem like a lot to take in, but please don't worry about getting
| 284 | +it right the first few times. The worst that can happen is that you'll get a
| 285 | +friendly message from a current contributor who explains the process. We welcome
| 286 | +and value all contributions to the project!
| 255 | 287 |
| 256 | 288 |  .. _Milestones: https://docs.gitlab.com/ee/user/project/milestones
| 257 | 289 |  .. _Epics: https://docs.gitlab.com/ee/user/group/epics
| ... | ... | @@ -17,8 +17,6 @@ import os
| 17 | 17 |  import subprocess
| 18 | 18 |  import tempfile
| 19 | 19 |
| 20 | -from google.protobuf import any_pb2
| 21 | -
| 22 | 20 |  from buildgrid.client.cas import download, upload
| 23 | 21 |  from buildgrid._exceptions import BotError
| 24 | 22 |  from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
| ... | ... | @@ -29,13 +27,14 @@ from buildgrid.utils import read_file, write_file
| 29 | 27 |  def work_buildbox(context, lease):
| 30 | 28 |      """Executes a lease for a build action, using buildbox.
| 31 | 29 |      """
| 32 | -
| 33 | 30 |      local_cas_directory = context.local_cas
| 34 | 31 |      # instance_name = context.parent
| 35 | 32 |      logger = context.logger
| 36 | 33 |
| 37 | 34 |      action_digest = remote_execution_pb2.Digest()
| 35 | +
| 38 | 36 |      lease.payload.Unpack(action_digest)
| 37 | +    lease.result.Clear()
| 39 | 38 |
| 40 | 39 |      with download(context.cas_channel) as downloader:
| 41 | 40 |          action = downloader.get_message(action_digest,
| ... | ... | @@ -131,10 +130,7 @@ def work_buildbox(context, lease):
| 131 | 130 |
| 132 | 131 |              action_result.output_directories.extend([output_directory])
| 133 | 132 |
| 134 | -            action_result_any = any_pb2.Any()
| 135 | -            action_result_any.Pack(action_result)
| 136 | -
| 137 | -            lease.result.CopyFrom(action_result_any)
| 133 | +            lease.result.Pack(action_result)
| 138 | 134 |
| 139 | 135 |      return lease
| 140 | 136 |
| ... | ... | @@ -17,8 +17,6 @@ import os
| 17 | 17 |  import subprocess
| 18 | 18 |  import tempfile
| 19 | 19 |
| 20 | -from google.protobuf import any_pb2
| 21 | -
| 22 | 20 |  from buildgrid.client.cas import download, upload
| 23 | 21 |  from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
| 24 | 22 |  from buildgrid.utils import output_file_maker, output_directory_maker
| ... | ... | @@ -27,12 +25,13 @@ from buildgrid.utils import output_file_maker, output_directory_maker
| 27 | 25 |  def work_host_tools(context, lease):
| 28 | 26 |      """Executes a lease for a build action, using host tools.
| 29 | 27 |      """
| 30 | -
| 31 | 28 |      instance_name = context.parent
| 32 | 29 |      logger = context.logger
| 33 | 30 |
| 34 | 31 |      action_digest = remote_execution_pb2.Digest()
| 32 | +
| 35 | 33 |      lease.payload.Unpack(action_digest)
| 34 | +    lease.result.Clear()
| 36 | 35 |
| 37 | 36 |      with tempfile.TemporaryDirectory() as temp_directory:
| 38 | 37 |          with download(context.cas_channel, instance=instance_name) as downloader:
| ... | ... | @@ -122,9 +121,6 @@ def work_host_tools(context, lease):
| 122 | 121 |
| 123 | 122 |              action_result.output_directories.extend(output_directories)
| 124 | 123 |
| 125 | -        action_result_any = any_pb2.Any()
| 126 | -        action_result_any.Pack(action_result)
| 127 | -
| 128 | -        lease.result.CopyFrom(action_result_any)
| 124 | +        lease.result.Pack(action_result)
| 129 | 125 |
| 130 | 126 |      return lease
| ... | ... | @@ -109,7 +109,7 @@ class BotsInterface:
| 109 | 109 |          if server_state == LeaseState.PENDING:
| 110 | 110 |
| 111 | 111 |              if client_state == LeaseState.ACTIVE:
| 112 | -                self._scheduler.update_job_lease_state(client_lease.id, client_lease.state)
| 112 | +                self._scheduler.update_job_lease_state(client_lease.id, client_state)
| 113 | 113 |              elif client_state == LeaseState.COMPLETED:
| 114 | 114 |                  # TODO: Lease was rejected
| 115 | 115 |                  raise NotImplementedError("'Not Accepted' is unsupported")
| ... | ... | @@ -122,8 +122,7 @@ class BotsInterface:
| 122 | 122 |                  pass
| 123 | 123 |
| 124 | 124 |              elif client_state == LeaseState.COMPLETED:
| 125 | -                self._scheduler.update_job_lease_state(client_lease.id, client_lease.state)
| 126 | -                self._scheduler.job_complete(client_lease.id, client_lease.result, client_lease.status)
| 125 | +                self._scheduler.update_job_lease_state(client_lease.id, client_state, lease_status=client_lease.status, lease_result=client_lease.result)
| 127 | 126 |                  return None
| 128 | 127 |
| 129 | 128 |              else:
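The two `BotsInterface` hunks above show only part of the lease reconciliation. The new code forwards the bot-reported `client_state` directly. On completion, it passes `lease_status` and `lease_result` in the same `update_job_lease_state` call that replaced `job_complete`. The stdlib-only sketch below models just the branches visible in the diff. It assumes the second hunk sits under a branch where the server already holds the lease as `ACTIVE`, since the enclosing condition is outside the diff. `reconcile_lease`, its return tuple and the enum values are hypothetical stand-ins, not BuildGrid API.

```python
from enum import Enum


class LeaseState(Enum):
    # Hypothetical placeholder values standing in for bots_pb2.LeaseState.
    LEASE_STATE_UNSPECIFIED = 0
    PENDING = 1
    ACTIVE = 2
    COMPLETED = 3
    CANCELLED = 4


def reconcile_lease(server_state, client_state, status=None, result=None):
    """Return the scheduler call a bot update would trigger, or None.

    Hypothetical model of the branches visible in the diff: instead of
    calling a scheduler, it returns a (method_name, args, kwargs) tuple.
    """
    if server_state == LeaseState.PENDING:
        if client_state == LeaseState.ACTIVE:
            # The bot accepted the lease: forward the state it reported.
            return ("update_job_lease_state", (client_state,), {})
        if client_state == LeaseState.COMPLETED:
            # Completing a lease that was never accepted counts as a rejection.
            raise NotImplementedError("'Not Accepted' is unsupported")
    elif server_state == LeaseState.ACTIVE:  # Assumed enclosing branch.
        if client_state == LeaseState.COMPLETED:
            # Status and result now travel with the state change itself.
            return ("update_job_lease_state", (client_state,),
                    {"lease_status": status, "lease_result": result})
    return None
```

Folding `job_complete` into `update_job_lease_state` means a completed lease produces a single scheduler call, so the state change and the result cannot be applied separately.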
| ... | ... | @@ -51,9 +51,9 @@ class ExecutionInstance:
| 51 | 51 |          job = Job(action_digest, action.do_not_cache, message_queue)
| 52 | 52 |          self.logger.info("Operation name: [{}]".format(job.name))
| 53 | 53 |
| 54 | -        self._scheduler.append_job(job, skip_cache_lookup)
| 54 | +        self._scheduler.queue_job(job, skip_cache_lookup)
| 55 | 55 |
| 56 | -        return job.get_operation()
| 56 | +        return job.operation
| 57 | 57 |
| 58 | 58 |      def register_message_client(self, name, queue):
| 59 | 59 |          try:
| ... | ... | @@ -19,57 +19,33 @@ import logging
| 19 | 19 |  import uuid
| 20 | 20 |  from enum import Enum
| 21 | 21 |
| 22 | -from google.protobuf import any_pb2
| 23 | -
| 24 | 22 |  from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
| 25 | 23 |  from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
| 26 | 24 |  from buildgrid._protos.google.longrunning import operations_pb2
| 27 | 25 |
| 28 | 26 |
| 29 | -class ExecuteStage(Enum):
| 27 | +class OperationStage(Enum):
| 28 | +    # Initially unknown stage.
| 30 | 29 |      UNKNOWN = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('UNKNOWN')
| 31 | -
| 32 | 30 |      # Checking the result against the cache.
| 33 | 31 |      CACHE_CHECK = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('CACHE_CHECK')
| 34 | -
| 35 | 32 |      # Currently idle, awaiting a free machine to execute.
| 36 | 33 |      QUEUED = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('QUEUED')
| 37 | -
| 38 | 34 |      # Currently being executed by a worker.
| 39 | 35 |      EXECUTING = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('EXECUTING')
| 40 | -
| 41 | 36 |      # Finished execution.
| 42 | 37 |      COMPLETED = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('COMPLETED')
| 43 | 38 |
| 44 | 39 |
| 45 | -class BotStatus(Enum):
| 46 | -    BOT_STATUS_UNSPECIFIED = bots_pb2.BotStatus.Value('BOT_STATUS_UNSPECIFIED')
| 47 | -
| 48 | -    # The bot is healthy, and will accept leases as normal.
| 49 | -    OK = bots_pb2.BotStatus.Value('OK')
| 50 | -
| 51 | -    # The bot is unhealthy and will not accept new leases.
| 52 | -    UNHEALTHY = bots_pb2.BotStatus.Value('UNHEALTHY')
| 53 | -
| 54 | -    # The bot has been asked to reboot the host.
| 55 | -    HOST_REBOOTING = bots_pb2.BotStatus.Value('HOST_REBOOTING')
| 56 | -
| 57 | -    # The bot has been asked to shut down.
| 58 | -    BOT_TERMINATING = bots_pb2.BotStatus.Value('BOT_TERMINATING')
| 59 | -
| 60 | -
| 61 | 40 |  class LeaseState(Enum):
| 41 | +    # Initially unknown state.
| 62 | 42 |      LEASE_STATE_UNSPECIFIED = bots_pb2.LeaseState.Value('LEASE_STATE_UNSPECIFIED')
| 63 | -
| 64 | 43 |      # The server expects the bot to accept this lease.
| 65 | 44 |      PENDING = bots_pb2.LeaseState.Value('PENDING')
| 66 | -
| 67 | 45 |      # The bot has accepted this lease.
| 68 | 46 |      ACTIVE = bots_pb2.LeaseState.Value('ACTIVE')
| 69 | -
| 70 | 47 |      # The bot is no longer leased.
| 71 | 48 |      COMPLETED = bots_pb2.LeaseState.Value('COMPLETED')
| 72 | -
| 73 | 49 |      # The bot should immediately release all resources associated with the lease.
| 74 | 50 |      CANCELLED = bots_pb2.LeaseState.Value('CANCELLED')
| 75 | 51 |
| ... | ... | @@ -77,18 +53,22 @@ class LeaseState(Enum):
| 77 | 53 |  class Job:
| 78 | 54 |
| 79 | 55 |      def __init__(self, action_digest, do_not_cache=False, message_queue=None):
| 80 | -        self.lease = None
| 81 | 56 |          self.logger = logging.getLogger(__name__)
| 82 | -        self.n_tries = 0
| 83 | -        self.result = None
| 84 | -        self.result_cached = False
| 85 | 57 |
| 86 | -        self._action_digest = action_digest
| 87 | -        self._do_not_cache = do_not_cache
| 88 | -        self._execute_stage = ExecuteStage.UNKNOWN
| 89 | 58 |          self._name = str(uuid.uuid4())
| 90 | -        self._operation = operations_pb2.Operation(name=self._name)
| 59 | +        self._do_not_cache = do_not_cache
| 60 | +        self._operation = operations_pb2.Operation()
| 61 | +        self._lease = None
| 62 | +        self._n_tries = 0
| 63 | +
| 64 | +        self.__operation_metadata = remote_execution_pb2.ExecuteOperationMetadata()
| 65 | +
| 66 | +        self.__operation_metadata.action_digest.CopyFrom(action_digest)
| 67 | +        self.__operation_metadata.stage = OperationStage.UNKNOWN.value
| 68 | +
| 91 | 69 |          self._operation_update_queues = []
| 70 | +        self._operation.name = self._name
| 71 | +        self._operation.done = False
| 92 | 72 |
| 93 | 73 |          if message_queue is not None:
| 94 | 74 |              self.register_client(message_queue)
| ... | ... | @@ -99,63 +79,75 @@ class Job:
| 99 | 79 |
| 100 | 80 |      @property
| 101 | 81 |      def action_digest(self):
| 102 | -        return self._action_digest
| 82 | +        return self.__operation_metadata.action_digest
| 103 | 83 |
| 104 | 84 |      @property
| 105 | 85 |      def do_not_cache(self):
| 106 | 86 |          return self._do_not_cache
| 107 | 87 |
| 108 | -    def check_job_finished(self):
| 109 | -        if not self._operation_update_queues:
| 110 | -            return self._operation.done
| 111 | -        return False
| 88 | +    @property
| 89 | +    def operation(self):
| 90 | +        return self._operation
| 91 | +
| 92 | +    @property
| 93 | +    def operation_stage(self):
| 94 | +        return OperationStage(self.__operation_metadata.state)
| 95 | +
| 96 | +    @property
| 97 | +    def lease(self):
| 98 | +        return self._lease
| 99 | +
| 100 | +    @property
| 101 | +    def lease_state(self):
| 102 | +        if self._lease is not None:
| 103 | +            return LeaseState(self._lease.state)
| 104 | +        else:
| 105 | +            return None
| 106 | +
| 107 | +    @property
| 108 | +    def n_tries(self):
| 109 | +        return self._n_tries
| 112 | 110 |
| 113 | 111 |      def register_client(self, queue):
| 114 | 112 |          self._operation_update_queues.append(queue)
| 115 | -        queue.put(self.get_operation())
| 113 | +        queue.put(self._operation)
| 116 | 114 |
| 117 | 115 |      def unregister_client(self, queue):
| 118 | 116 |          self._operation_update_queues.remove(queue)
| 119 | 117 |
| 120 | -    def get_operation(self):
| 121 | -        self._operation.metadata.CopyFrom(self._pack_any(self.get_operation_meta()))
| 122 | -        if self.result is not None:
| 123 | -            self._operation.done = True
| 124 | -            response = remote_execution_pb2.ExecuteResponse(result=self.result,
| 125 | -                                                            cached_result=self.result_cached)
| 118 | +    def create_lease(self):
| 119 | +        self._lease = bots_pb2.Lease()
| 120 | +        self._lease.id = self._name
| 121 | +        self._lease.payload.Pack(self.__operation_metadata.action_digest)
| 122 | +        self._lease.state = LeaseState.PENDING.value
| 126 | 123 |
| 127 | -            if not self.result_cached:
| 128 | -                response.status.CopyFrom(self.lease.status)
| 124 | +        return self._lease
| 129 | 125 |
| 130 | -            self._operation.response.CopyFrom(self._pack_any(response))
| 126 | +    def update_lease_state(self, state, status=None, result=None):
| 127 | +        if state.value == self._lease.state:
| 128 | +            return
| 131 | 129 |
| 132 | -        return self._operation
| 130 | +        self._lease.state = state.value
| 133 | 131 |
| 134 | -    def get_operation_meta(self):
| 135 | -        meta = remote_execution_pb2.ExecuteOperationMetadata()
| 136 | -        meta.stage = self._execute_stage.value
| 137 | -        meta.action_digest.CopyFrom(self._action_digest)
| 132 | +        if self._lease.state == LeaseState.COMPLETED.value:
| 133 | +            response = remote_execution_pb2.ExecuteResponse()
| 134 | +            response.result.CopyFrom(result)
| 135 | +            response.cached_result = False
| 136 | +            response.status.CopyFrom(status)
| 138 | 137 |
| 139 | -        return meta
| 138 | +            self._operation.response.Pack(response)
| 139 | +            self._operation.done = True
| 140 | 140 |
| 141 | -    def create_lease(self):
| 142 | -        action_digest = self._pack_any(self._action_digest)
| 141 | +    def update_operation_stage(self, stage):
| 142 | +        if stage.value == self.__operation_metadata.stage:
| 143 | +            return
| 143 | 144 |
| 144 | -        lease = bots_pb2.Lease(id=self.name,
| 145 | -                               payload=action_digest,
| 146 | -                               state=LeaseState.PENDING.value)
| 147 | -        self.lease = lease
| 148 | -        return lease
| 145 | +        self.__operation_metadata.stage = stage.value
| 149 | 146 |
| 150 | -    def get_operations(self):
| 151 | -        return operations_pb2.ListOperationsResponse(operations=[self.get_operation()])
| 147 | +        if self.__operation_metadata.stage == OperationStage.QUEUED.value:
| 148 | +            self._n_tries += 1
| 152 | 149 |
| 153 | -    def update_execute_stage(self, stage):
| 154 | -        self._execute_stage = stage
| 155 | -        for queue in self._operation_update_queues:
| 156 | -            queue.put(self.get_operation())
| 150 | +        self._operation.metadata.Pack(self.__operation_metadata)
| 157 | 151 |
| 158 | -    def _pack_any(self, pack):
| 159 | -        some_any = any_pb2.Any()
| 160 | -        some_any.Pack(pack)
| 161 | -        return some_any
| 152 | +        for queue in self._operation_update_queues:
| 153 | +            queue.put(self._operation)
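Taken together, the `job.py` hunks move all per-job bookkeeping into `Job`. `update_operation_stage` ignores repeated stages, counts every transition to `QUEUED` as a try, repacks the metadata and pushes the operation to every registered queue. `update_lease_state` fills in the response and sets `done` once the lease reaches `COMPLETED`.

Below is a stdlib-only sketch of that flow under stated assumptions. `JobModel` is hypothetical, and plain dicts stand in for the `Operation`, `Lease` and `ExecuteResponse` protobufs. Only the `OperationStage` numbering is taken from the REAPI `ExecuteOperationMetadata.Stage` enum; the `LeaseState` values are placeholders. The sketch reads the current stage back from the same `stage` field that the setter writes.

```python
import queue
import uuid
from enum import Enum


class OperationStage(Enum):
    # Same numbering as ExecuteOperationMetadata.Stage in the REAPI.
    UNKNOWN = 0
    CACHE_CHECK = 1
    QUEUED = 2
    EXECUTING = 3
    COMPLETED = 4


class LeaseState(Enum):
    # Hypothetical placeholder values standing in for bots_pb2.LeaseState.
    PENDING = 1
    ACTIVE = 2
    COMPLETED = 3


class JobModel:
    """Hypothetical, protobuf-free model of the refactored Job."""

    def __init__(self, action_digest):
        self.name = str(uuid.uuid4())
        self.n_tries = 0
        self.lease = None
        # Dicts in place of the ExecuteOperationMetadata and Operation messages.
        self.metadata = {"action_digest": action_digest,
                         "stage": OperationStage.UNKNOWN.value}
        self.operation = {"name": self.name, "done": False,
                          "metadata": None, "response": None}
        self._queues = []

    @property
    def operation_stage(self):
        # Read back the same "stage" field that update_operation_stage() writes.
        return OperationStage(self.metadata["stage"])

    def register_client(self, client_queue):
        self._queues.append(client_queue)
        client_queue.put(self.operation)

    def create_lease(self):
        self.lease = {"id": self.name,
                      "payload": self.metadata["action_digest"],
                      "state": LeaseState.PENDING}
        return self.lease

    def update_lease_state(self, state, status=None, result=None):
        if state == self.lease["state"]:
            return  # Repeated report: nothing changes.
        self.lease["state"] = state
        if state == LeaseState.COMPLETED:
            # The response comes from what the bot reported, so it is never cached.
            self.operation["response"] = {"result": result,
                                          "cached_result": False,
                                          "status": status}
            self.operation["done"] = True

    def update_operation_stage(self, stage):
        if stage.value == self.metadata["stage"]:
            return  # Repeated stage: no notification is sent.
        self.metadata["stage"] = stage.value
        if stage == OperationStage.QUEUED:
            self.n_tries += 1  # Every (re)queueing counts as one try.
        self.operation["metadata"] = dict(self.metadata)
        for client_queue in self._queues:
            client_queue.put(self.operation)
```

As in the hunk, `update_lease_state` does not notify clients itself. Subscribers learn about completion only from the `COMPLETED` stage update that the scheduler sends afterwards.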
| ... | ... | @@ -22,6 +22,7 @@ An instance of the LongRunningOperations Service.
| 22 | 22 |  import logging
| 23 | 23 |
| 24 | 24 |  from buildgrid._exceptions import InvalidArgumentError
| 25 | +from buildgrid._protos.google.longrunning import operations_pb2
| 25 | 26 |
| 26 | 27 |
| 27 | 28 |  class OperationsInstance:
| ... | ... | @@ -45,7 +46,10 @@ class OperationsInstance:
| 45 | 46 |      def list_operations(self, list_filter, page_size, page_token):
| 46 | 47 |          # TODO: Pages
| 47 | 48 |          # Spec says number of pages and length of a page are optional
| 48 | -        return self._scheduler.get_operations()
| 49 | +        response = operations_pb2.ListOperationsResponse()
| 50 | +        response.operations.extend(self._scheduler.list_operations())
| 51 | +
| 52 | +        return response
| 49 | 53 |
| 50 | 54 |      def delete_operation(self, name):
| 51 | 55 |          try:
| ... | ... | @@ -25,9 +25,8 @@ from collections import deque
| 25 | 25 |
| 26 | 26 |  from buildgrid._exceptions import NotFoundError
| 27 | 27 |  from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
| 28 | -from buildgrid._protos.google.longrunning import operations_pb2
| 29 | 28 |
| 30 | -from .job import ExecuteStage, LeaseState
| 29 | +from .job import OperationStage, LeaseState
| 31 | 30 |
| 32 | 31 |
| 33 | 32 |  class Scheduler:
| ... | ... | @@ -39,80 +38,70 @@ class Scheduler:
| 39 | 38 |          self.jobs = {}
| 40 | 39 |          self.queue = deque()
| 41 | 40 |
| 42 | -    def register_client(self, name, queue):
| 43 | -        self.jobs[name].register_client(queue)
| 41 | +    def register_client(self, job_name, queue):
| 42 | +        self.jobs[job_name].register_client(queue)
| 44 | 43 |
| 45 | -    def unregister_client(self, name, queue):
| 46 | -        job = self.jobs[name]
| 47 | -        job.unregister_client(queue)
| 48 | -        if job.check_job_finished():
| 49 | -            del self.jobs[name]
| 44 | +    def unregister_client(self, job_name, queue):
| 45 | +        self.jobs[job_name].unregister_client(queue)
| 50 | 46 |
| 51 | -    def append_job(self, job, skip_cache_lookup=False):
| 47 | +        if self.jobs[job_name].operation.done:
| 48 | +            del self.jobs[job_name]
| 49 | +
| 50 | +    def queue_job(self, job, skip_cache_lookup=False):
| 52 | 51 |          self.jobs[job.name] = job
| 52 | +
| 53 | 53 |          if self._action_cache is not None and not skip_cache_lookup:
| 54 | +            job.update_operation_stage(OperationStage.CACHE_CHECK)
| 55 | +
| 54 | 56 |              try:
| 55 | 57 |                  cached_result = self._action_cache.get_action_result(job.action_digest)
| 56 | 58 |              except NotFoundError:
| 57 | 59 |                  self.queue.append(job)
| 58 | -                job.update_execute_stage(ExecuteStage.QUEUED)
| 59 | -
| 60 | +                job.update_operation_stage(OperationStage.QUEUED)
| 60 | 61 |              else:
| 61 | -                job.result = cached_result
| 62 | -                job.result_cached = True
| 63 | -                job.update_execute_stage(ExecuteStage.COMPLETED)
| 62 | +                job.update_operation_stage(OperationStage.COMPLETED)
| 64 | 63 |
| 65 | 64 |          else:
| 66 | 65 |              self.queue.append(job)
| 67 | -            job.update_execute_stage(ExecuteStage.QUEUED)
| 66 | +            job.update_operation_stage(OperationStage.QUEUED)
| 68 | 67 |
| 69 | -    def retry_job(self, name):
| 70 | -        if name in self.jobs:
| 71 | -            job = self.jobs[name]
| 68 | +    def retry_job(self, job_name):
| 69 | +        if job_name in self.jobs:
| 70 | +            job = self.jobs[job_name]
| 72 | 71 |              if job.n_tries >= self.MAX_N_TRIES:
| 73 | 72 |                  # TODO: Decide what to do with these jobs
| 74 | -                job.update_execute_stage(ExecuteStage.COMPLETED)
| 73 | +                job.update_operation_stage(OperationStage.COMPLETED)
| 75 | 74 |                  # TODO: Mark these jobs as done
| 76 | 75 |              else:
| 77 | -                job.update_execute_stage(ExecuteStage.QUEUED)
| 78 | -                job.n_tries += 1
| 76 | +                job.update_operation_stage(OperationStage.QUEUED)
| 79 | 77 |                  self.queue.appendleft(job)
| 80 | 78 |
| 81 | -    def job_complete(self, name, result, status):
| 82 | -        job = self.jobs[name]
| 83 | -        job.lease.status.CopyFrom(status)
| 84 | -        action_result = remote_execution_pb2.ActionResult()
| 85 | -        result.Unpack(action_result)
| 86 | -        job.result = action_result
| 87 | -        if not job.do_not_cache and self._action_cache is not None:
| 88 | -            if not job.lease.status.code:
| 89 | -                self._action_cache.update_action_result(job.action_digest, action_result)
| 90 | -        job.update_execute_stage(ExecuteStage.COMPLETED)
| 91 | -
| 92 | -    def get_operations(self):
| 93 | -        response = operations_pb2.ListOperationsResponse()
| 94 | -        for v in self.jobs.values():
| 95 | -            response.operations.extend([v.get_operation()])
| 96 | -        return response
| 97 | -
| 98 | -    def update_job_lease_state(self, name, state):
| 99 | -        job = self.jobs[name]
| 100 | -        job.lease.state = state
| 79 | +    def list_operations(self):
| 80 | +        return [job.operation for job in self.jobs.values()]
| 81 | +
| 82 | +    def update_job_lease_state(self, job_name, lease_state, lease_status=None, lease_result=None):
| 83 | +        job = self.jobs[job_name]
| 84 | +        if lease_state != LeaseState.COMPLETED:
| 85 | +            job.update_lease_state(lease_state)
| 86 | +        else:
| 87 | +            action_result = remote_execution_pb2.ActionResult()
| 88 | +            lease_result.Unpack(action_result)
| 89 | +
| 90 | +            job.update_lease_state(lease_state, status=lease_status, result=action_result)
| 91 | +
| 92 | +            if not job.do_not_cache and self._action_cache is not None:
| 93 | +                if not job.lease.status.code:
| 94 | +                    self._action_cache.update_action_result(job.action_digest, action_result)
| 95 | +
| 96 | +            job.update_operation_stage(OperationStage.COMPLETED)
| 101 | 97 |
| 102 | 98 |      def get_job_lease(self, name):
| 103 | 99 |          return self.jobs[name].lease
| 104 | 100 |
| 105 | -    def cancel_session(self, name):
| 106 | -        job = self.jobs[name]
| 107 | -        state = job.lease.state
| 108 | -        if state in (LeaseState.PENDING.value, LeaseState.ACTIVE.value):
| 109 | -            self.retry_job(name)
| 110 | -
| 111 | 101 |      def create_lease(self):
| 112 | 102 |          if self.queue:
| 113 | 103 |              job = self.queue.popleft()
| 114 | -            job.update_execute_stage(ExecuteStage.EXECUTING)
| 104 | +            job.update_operation_stage(OperationStage.EXECUTING)
| 115 | 105 |              job.create_lease()
| 116 | -            job.lease.state = LeaseState.PENDING.value
| 117 | 106 |              return job.lease
| 118 | 107 |          return None
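The `scheduler.py` hunk makes three changes:

- It renames `append_job` to `queue_job`.
- It drops the explicit `n_tries += 1` from `retry_job`, because the job now counts tries itself whenever it re-enters `QUEUED`.
- It moves the job to a `CACHE_CHECK` stage before the action-cache lookup.

A stdlib-only sketch of that queueing flow follows. `SchedulerModel`, `StubJob`, the dict used as an action cache and the `max_n_tries` parameter are hypothetical; in particular, the real `MAX_N_TRIES` value is defined outside the diff. As in the hunk, a cache hit only moves the job to `COMPLETED`, and the sketch does not attach the cached result to anything.

```python
from collections import deque
from enum import Enum


class Stage(Enum):
    # Subset of ExecuteOperationMetadata.Stage used by the scheduler.
    CACHE_CHECK = 1
    QUEUED = 2
    EXECUTING = 3
    COMPLETED = 4


class StubJob:
    """Hypothetical job stub: tracks its stage and counts queueing attempts."""

    def __init__(self, name, action_digest):
        self.name = name
        self.action_digest = action_digest
        self.stage = None
        self.n_tries = 0

    def update_operation_stage(self, stage):
        if stage == self.stage:
            return
        self.stage = stage
        if stage == Stage.QUEUED:
            self.n_tries += 1


class SchedulerModel:
    """Hypothetical model of queue_job / retry_job / create_lease."""

    def __init__(self, action_cache=None, max_n_tries=5):
        self._action_cache = action_cache  # dict: action digest -> result
        self.max_n_tries = max_n_tries     # stands in for MAX_N_TRIES
        self.jobs = {}
        self.queue = deque()

    def queue_job(self, job, skip_cache_lookup=False):
        self.jobs[job.name] = job
        if self._action_cache is not None and not skip_cache_lookup:
            job.update_operation_stage(Stage.CACHE_CHECK)
            if job.action_digest in self._action_cache:
                job.update_operation_stage(Stage.COMPLETED)  # Cache hit.
                return
        self.queue.append(job)
        job.update_operation_stage(Stage.QUEUED)

    def retry_job(self, job_name):
        job = self.jobs.get(job_name)
        if job is None:
            return
        if job.n_tries >= self.max_n_tries:
            job.update_operation_stage(Stage.COMPLETED)  # Give up.
        else:
            job.update_operation_stage(Stage.QUEUED)  # Bumps n_tries.
            self.queue.appendleft(job)  # Retries go to the front.

    def create_lease(self):
        if not self.queue:
            return None
        job = self.queue.popleft()
        job.update_operation_stage(Stage.EXECUTING)
        return job.name  # Stands in for job.create_lease().
```

Because `n_tries` now grows only on the transition into `QUEUED`, the first queueing already counts as a try. With `max_n_tries=3`, a job is leased three times before `retry_job` gives up on it.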
| ... | ... | @@ -283,4 +283,4 @@ def _inject_work(scheduler, action_digest=None):
| 283 | 283 |      if not action_digest:
| 284 | 284 |          action_digest = remote_execution_pb2.Digest()
| 285 | 285 |      j = job.Job(action_digest, False)
| 286 | -    scheduler.append_job(j, True)
| 286 | +    scheduler.queue_job(j, True)
| ... | ... | @@ -82,7 +82,7 @@ def test_execute(skip_cache_lookup, instance, context):
| 82 | 82 |      assert isinstance(result, operations_pb2.Operation)
| 83 | 83 |      metadata = remote_execution_pb2.ExecuteOperationMetadata()
| 84 | 84 |      result.metadata.Unpack(metadata)
| 85 | -    assert metadata.stage == job.ExecuteStage.QUEUED.value
| 85 | +    assert metadata.stage == job.OperationStage.QUEUED.value
| 86 | 86 |      assert uuid.UUID(result.name, version=4)
| 87 | 87 |      assert result.done is False
| 88 | 88 |
| ... | ... | @@ -116,7 +116,7 @@ def test_wait_execution(instance, controller, context):
| 116 | 116 |      action_result = remote_execution_pb2.ActionResult()
| 117 | 117 |      action_result_any.Pack(action_result)
| 118 | 118 |
| 119 | -    j.update_execute_stage(job.ExecuteStage.COMPLETED)
| 119 | +    j.update_operation_stage(job.OperationStage.COMPLETED)
| 120 | 120 |
| 121 | 121 |      response = instance.WaitExecution(request, context)
| 122 | 122 |
| ... | ... | @@ -125,7 +125,7 @@ def test_wait_execution(instance, controller, context):
| 125 | 125 |      assert isinstance(result, operations_pb2.Operation)
| 126 | 126 |      metadata = remote_execution_pb2.ExecuteOperationMetadata()
| 127 | 127 |      result.metadata.Unpack(metadata)
| 128 | -    assert metadata.stage == job.ExecuteStage.COMPLETED.value
| 128 | +    assert metadata.stage == job.OperationStage.COMPLETED.value
| 129 | 129 |      assert uuid.UUID(result.name, version=4)
| 130 | 130 |      assert result.done is True
| 131 | 131 |
| ... | ... | @@ -30,6 +30,7 @@ from buildgrid._protos.google.longrunning import operations_pb2
| 30 | 30 |  from buildgrid._protos.google.rpc import status_pb2
| 31 | 31 |  from buildgrid.server.cas.storage import lru_memory_cache
| 32 | 32 |  from buildgrid.server.controller import ExecutionController
| 33 | +from buildgrid.server.job import OperationStage
| 33 | 34 |  from buildgrid.server.operations import service
| 34 | 35 |  from buildgrid.server.operations.service import OperationsService
| 35 | 36 |  from buildgrid.utils import create_digest
| ... | ... | @@ -131,9 +132,10 @@ def test_list_operations_with_result(instance, controller, execute_request, cont
| 131 | 132 |      action_result.output_files.extend([output_file])
| 132 | 133 |
| 133 | 134 |      controller.operations_instance._scheduler.jobs[response_execute.name].create_lease()
| 134 | -    controller.operations_instance._scheduler.job_complete(response_execute.name,
| 135 | -                                                           _pack_any(action_result),
| 136 | -                                                           status_pb2.Status())
| 135 | +    controller.operations_instance._scheduler.update_job_lease_state(response_execute.name,
| 136 | +                                                                     OperationStage.COMPLETED,
| 137 | +                                                                     lease_status=status_pb2.Status(),
| 138 | +                                                                     lease_result=_pack_any(action_result))
| 137 | 139 |
| 138 | 140 |      request = operations_pb2.ListOperationsRequest(name=instance_name)
| 139 | 141 |      response = instance.ListOperations(request, context)
