Martin Blanchard pushed to branch mablanch/117-job-scheduler-refactoring at BuildGrid / buildgrid
Commits:
- 
76b8a6a0
by Martin Blanchard at 2018-10-22T08:58:37Z
- 
86766852
by Martin Blanchard at 2018-10-22T08:58:47Z
- 
c7e0326b
by Martin Blanchard at 2018-10-22T08:58:51Z
- 
390ca144
by Martin Blanchard at 2018-10-22T08:58:54Z
- 
f3170977
by Martin Blanchard at 2018-10-22T08:58:57Z
- 
efb086e7
by Martin Blanchard at 2018-10-22T08:59:02Z
- 
2c006309
by Martin Blanchard at 2018-10-22T08:59:05Z
11 changed files:
- buildgrid/_app/bots/buildbox.py
- buildgrid/_app/bots/dummy.py
- buildgrid/_app/bots/host.py
- buildgrid/server/bots/instance.py
- buildgrid/server/execution/instance.py
- buildgrid/server/job.py
- buildgrid/server/operations/instance.py
- buildgrid/server/scheduler.py
- tests/integration/bots_service.py
- tests/integration/execution_service.py
- tests/integration/operations_service.py
Changes:
| ... | ... | @@ -17,8 +17,6 @@ import os | 
| 17 | 17 |  import subprocess
 | 
| 18 | 18 |  import tempfile
 | 
| 19 | 19 |  | 
| 20 | -from google.protobuf import any_pb2
 | |
| 21 | - | |
| 22 | 20 |  from buildgrid.client.cas import download, upload
 | 
| 23 | 21 |  from buildgrid._exceptions import BotError
 | 
| 24 | 22 |  from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
 | 
| ... | ... | @@ -29,13 +27,14 @@ from buildgrid.utils import read_file, write_file | 
| 29 | 27 |  def work_buildbox(context, lease):
 | 
| 30 | 28 |      """Executes a lease for a build action, using buildbox.
 | 
| 31 | 29 |      """
 | 
| 32 | - | |
| 33 | 30 |      local_cas_directory = context.local_cas
 | 
| 34 | 31 |      # instance_name = context.parent
 | 
| 35 | 32 |      logger = context.logger
 | 
| 36 | 33 |  | 
| 37 | 34 |      action_digest = remote_execution_pb2.Digest()
 | 
| 35 | + | |
| 38 | 36 |      lease.payload.Unpack(action_digest)
 | 
| 37 | +    lease.result.Clear()
 | |
| 39 | 38 |  | 
| 40 | 39 |      with download(context.cas_channel) as downloader:
 | 
| 41 | 40 |          action = downloader.get_message(action_digest,
 | 
| ... | ... | @@ -131,10 +130,7 @@ def work_buildbox(context, lease): | 
| 131 | 130 |  | 
| 132 | 131 |              action_result.output_directories.extend([output_directory])
 | 
| 133 | 132 |  | 
| 134 | -            action_result_any = any_pb2.Any()
 | |
| 135 | -            action_result_any.Pack(action_result)
 | |
| 136 | - | |
| 137 | -            lease.result.CopyFrom(action_result_any)
 | |
| 133 | +            lease.result.Pack(action_result)
 | |
| 138 | 134 |  | 
| 139 | 135 |      return lease
 | 
| 140 | 136 |  | 
| ... | ... | @@ -16,9 +16,18 @@ | 
| 16 | 16 |  import random
 | 
| 17 | 17 |  import time
 | 
| 18 | 18 |  | 
| 19 | +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
 | |
| 20 | + | |
| 19 | 21 |  | 
| 20 | 22 |  def work_dummy(context, lease):
 | 
| 21 | 23 |      """ Just returns lease after some random time
 | 
| 22 | 24 |      """
 | 
| 25 | +    lease.result.Clear()
 | |
| 26 | + | |
| 23 | 27 |      time.sleep(random.randint(1, 5))
 | 
| 28 | + | |
| 29 | +    action_result = remote_execution_pb2.ActionResult()
 | |
| 30 | + | |
| 31 | +    lease.result.Pack(action_result)
 | |
| 32 | + | |
| 24 | 33 |      return lease | 
| ... | ... | @@ -17,8 +17,6 @@ import os | 
| 17 | 17 |  import subprocess
 | 
| 18 | 18 |  import tempfile
 | 
| 19 | 19 |  | 
| 20 | -from google.protobuf import any_pb2
 | |
| 21 | - | |
| 22 | 20 |  from buildgrid.client.cas import download, upload
 | 
| 23 | 21 |  from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
 | 
| 24 | 22 |  from buildgrid.utils import output_file_maker, output_directory_maker
 | 
| ... | ... | @@ -27,12 +25,13 @@ from buildgrid.utils import output_file_maker, output_directory_maker | 
| 27 | 25 |  def work_host_tools(context, lease):
 | 
| 28 | 26 |      """Executes a lease for a build action, using host tools.
 | 
| 29 | 27 |      """
 | 
| 30 | - | |
| 31 | 28 |      instance_name = context.parent
 | 
| 32 | 29 |      logger = context.logger
 | 
| 33 | 30 |  | 
| 34 | 31 |      action_digest = remote_execution_pb2.Digest()
 | 
| 32 | + | |
| 35 | 33 |      lease.payload.Unpack(action_digest)
 | 
| 34 | +    lease.result.Clear()
 | |
| 36 | 35 |  | 
| 37 | 36 |      with tempfile.TemporaryDirectory() as temp_directory:
 | 
| 38 | 37 |          with download(context.cas_channel, instance=instance_name) as downloader:
 | 
| ... | ... | @@ -122,9 +121,6 @@ def work_host_tools(context, lease): | 
| 122 | 121 |  | 
| 123 | 122 |              action_result.output_directories.extend(output_directories)
 | 
| 124 | 123 |  | 
| 125 | -        action_result_any = any_pb2.Any()
 | |
| 126 | -        action_result_any.Pack(action_result)
 | |
| 127 | - | |
| 128 | -        lease.result.CopyFrom(action_result_any)
 | |
| 124 | +        lease.result.Pack(action_result)
 | |
| 129 | 125 |  | 
| 130 | 126 |      return lease | 
| ... | ... | @@ -66,10 +66,10 @@ class BotsInterface: | 
| 66 | 66 |          self._bot_sessions[name] = bot_session
 | 
| 67 | 67 |          self.logger.info("Created bot session name=[{}] with bot_id=[{}]".format(name, bot_id))
 | 
| 68 | 68 |  | 
| 69 | -        # For now, one lease at a time.
 | |
| 70 | -        lease = self._scheduler.create_lease()
 | |
| 71 | -        if lease:
 | |
| 72 | -            bot_session.leases.extend([lease])
 | |
| 69 | +        # TODO: Send worker capabilities to the scheduler!
 | |
| 70 | +        leases = self._scheduler.request_job_leases({})
 | |
| 71 | +        if leases:
 | |
| 72 | +            bot_session.leases.extend(leases)
 | |
| 73 | 73 |  | 
| 74 | 74 |          return bot_session
 | 
| 75 | 75 |  | 
| ... | ... | @@ -85,11 +85,11 @@ class BotsInterface: | 
| 85 | 85 |          del bot_session.leases[:]
 | 
| 86 | 86 |          bot_session.leases.extend(leases)
 | 
| 87 | 87 |  | 
| 88 | -        # For now, one lease at a time
 | |
| 88 | +        # TODO: Send worker capabilities to the scheduler!
 | |
| 89 | 89 |          if not bot_session.leases:
 | 
| 90 | -            lease = self._scheduler.create_lease()
 | |
| 91 | -            if lease:
 | |
| 92 | -                bot_session.leases.extend([lease])
 | |
| 90 | +            leases = self._scheduler.request_job_leases({})
 | |
| 91 | +            if leases:
 | |
| 92 | +                bot_session.leases.extend(leases)
 | |
| 93 | 93 |  | 
| 94 | 94 |          self._bot_sessions[name] = bot_session
 | 
| 95 | 95 |          return bot_session
 | 
| ... | ... | @@ -109,7 +109,8 @@ class BotsInterface: | 
| 109 | 109 |          if server_state == LeaseState.PENDING:
 | 
| 110 | 110 |  | 
| 111 | 111 |              if client_state == LeaseState.ACTIVE:
 | 
| 112 | -                self._scheduler.update_job_lease_state(client_lease.id, client_lease.state)
 | |
| 112 | +                self._scheduler.update_job_lease_state(client_lease.id,
 | |
| 113 | +                                                       LeaseState.ACTIVE)
 | |
| 113 | 114 |              elif client_state == LeaseState.COMPLETED:
 | 
| 114 | 115 |                  # TODO: Lease was rejected
 | 
| 115 | 116 |                  raise NotImplementedError("'Not Accepted' is unsupported")
 | 
| ... | ... | @@ -122,8 +123,10 @@ class BotsInterface: | 
| 122 | 123 |                  pass
 | 
| 123 | 124 |  | 
| 124 | 125 |              elif client_state == LeaseState.COMPLETED:
 | 
| 125 | -                self._scheduler.update_job_lease_state(client_lease.id, client_lease.state)
 | |
| 126 | -                self._scheduler.job_complete(client_lease.id, client_lease.result, client_lease.status)
 | |
| 126 | +                self._scheduler.update_job_lease_state(client_lease.id,
 | |
| 127 | +                                                       LeaseState.COMPLETED,
 | |
| 128 | +                                                       lease_status=client_lease.status,
 | |
| 129 | +                                                       lease_result=client_lease.result)
 | |
| 127 | 130 |                  return None
 | 
| 128 | 131 |  | 
| 129 | 132 |              else:
 | 
| ... | ... | @@ -48,12 +48,15 @@ class ExecutionInstance: | 
| 48 | 48 |          if not action:
 | 
| 49 | 49 |              raise FailedPreconditionError("Could not get action from storage.")
 | 
| 50 | 50 |  | 
| 51 | -        job = Job(action_digest, action.do_not_cache, message_queue)
 | |
| 51 | +        job = Job(action, action_digest)
 | |
| 52 | +        if message_queue is not None:
 | |
| 53 | +            job.register_client(message_queue)
 | |
| 54 | + | |
| 52 | 55 |          self.logger.info("Operation name: [{}]".format(job.name))
 | 
| 53 | 56 |  | 
| 54 | -        self._scheduler.append_job(job, skip_cache_lookup)
 | |
| 57 | +        self._scheduler.queue_job(job, skip_cache_lookup)
 | |
| 55 | 58 |  | 
| 56 | -        return job.get_operation()
 | |
| 59 | +        return job.operation
 | |
| 57 | 60 |  | 
| 58 | 61 |      def register_message_client(self, name, queue):
 | 
| 59 | 62 |          try:
 | 
| ... | ... | @@ -11,151 +11,211 @@ | 
| 11 | 11 |  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
| 12 | 12 |  # See the License for the specific language governing permissions and
 | 
| 13 | 13 |  # limitations under the License.
 | 
| 14 | -#
 | |
| 15 | -# Authors:
 | |
| 16 | -#        Finn Ball <finn ball codethink co uk>
 | |
| 14 | + | |
| 17 | 15 |  | 
| 18 | 16 |  import logging
 | 
| 19 | 17 |  import uuid
 | 
| 20 | 18 |  from enum import Enum
 | 
| 21 | 19 |  | 
| 22 | -from google.protobuf import any_pb2
 | |
| 23 | - | |
| 24 | 20 |  from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
 | 
| 25 | 21 |  from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
 | 
| 26 | 22 |  from buildgrid._protos.google.longrunning import operations_pb2
 | 
| 27 | 23 |  | 
| 28 | 24 |  | 
| 29 | -class ExecuteStage(Enum):
 | |
| 25 | +class OperationStage(Enum):
 | |
| 26 | +    # Initially unknown stage.
 | |
| 30 | 27 |      UNKNOWN = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('UNKNOWN')
 | 
| 31 | - | |
| 32 | 28 |      # Checking the result against the cache.
 | 
| 33 | 29 |      CACHE_CHECK = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('CACHE_CHECK')
 | 
| 34 | - | |
| 35 | 30 |      # Currently idle, awaiting a free machine to execute.
 | 
| 36 | 31 |      QUEUED = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('QUEUED')
 | 
| 37 | - | |
| 38 | 32 |      # Currently being executed by a worker.
 | 
| 39 | 33 |      EXECUTING = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('EXECUTING')
 | 
| 40 | - | |
| 41 | 34 |      # Finished execution.
 | 
| 42 | 35 |      COMPLETED = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('COMPLETED')
 | 
| 43 | 36 |  | 
| 44 | 37 |  | 
| 45 | -class BotStatus(Enum):
 | |
| 46 | -    BOT_STATUS_UNSPECIFIED = bots_pb2.BotStatus.Value('BOT_STATUS_UNSPECIFIED')
 | |
| 47 | - | |
| 48 | -    # The bot is healthy, and will accept leases as normal.
 | |
| 49 | -    OK = bots_pb2.BotStatus.Value('OK')
 | |
| 50 | - | |
| 51 | -    # The bot is unhealthy and will not accept new leases.
 | |
| 52 | -    UNHEALTHY = bots_pb2.BotStatus.Value('UNHEALTHY')
 | |
| 53 | - | |
| 54 | -    # The bot has been asked to reboot the host.
 | |
| 55 | -    HOST_REBOOTING = bots_pb2.BotStatus.Value('HOST_REBOOTING')
 | |
| 56 | - | |
| 57 | -    # The bot has been asked to shut down.
 | |
| 58 | -    BOT_TERMINATING = bots_pb2.BotStatus.Value('BOT_TERMINATING')
 | |
| 59 | - | |
| 60 | - | |
| 61 | 38 |  class LeaseState(Enum):
 | 
| 39 | +    # Initially unknown state.
 | |
| 62 | 40 |      LEASE_STATE_UNSPECIFIED = bots_pb2.LeaseState.Value('LEASE_STATE_UNSPECIFIED')
 | 
| 63 | - | |
| 64 | 41 |      # The server expects the bot to accept this lease.
 | 
| 65 | 42 |      PENDING = bots_pb2.LeaseState.Value('PENDING')
 | 
| 66 | - | |
| 67 | 43 |      # The bot has accepted this lease.
 | 
| 68 | 44 |      ACTIVE = bots_pb2.LeaseState.Value('ACTIVE')
 | 
| 69 | - | |
| 70 | 45 |      # The bot is no longer leased.
 | 
| 71 | 46 |      COMPLETED = bots_pb2.LeaseState.Value('COMPLETED')
 | 
| 72 | - | |
| 73 | 47 |      # The bot should immediately release all resources associated with the lease.
 | 
| 74 | 48 |      CANCELLED = bots_pb2.LeaseState.Value('CANCELLED')
 | 
| 75 | 49 |  | 
| 76 | 50 |  | 
| 77 | 51 |  class Job:
 | 
| 78 | 52 |  | 
| 79 | -    def __init__(self, action_digest, do_not_cache=False, message_queue=None):
 | |
| 80 | -        self.lease = None
 | |
| 53 | +    def __init__(self, action, action_digest):
 | |
| 81 | 54 |          self.logger = logging.getLogger(__name__)
 | 
| 82 | -        self.n_tries = 0
 | |
| 83 | -        self.result = None
 | |
| 84 | -        self.result_cached = False
 | |
| 85 | 55 |  | 
| 86 | -        self._action_digest = action_digest
 | |
| 87 | -        self._do_not_cache = do_not_cache
 | |
| 88 | -        self._execute_stage = ExecuteStage.UNKNOWN
 | |
| 89 | 56 |          self._name = str(uuid.uuid4())
 | 
| 90 | -        self._operation = operations_pb2.Operation(name=self._name)
 | |
| 91 | -        self._operation_update_queues = []
 | |
| 57 | +        self._action = remote_execution_pb2.Action()
 | |
| 58 | +        self._operation = operations_pb2.Operation()
 | |
| 59 | +        self._lease = None
 | |
| 60 | + | |
| 61 | +        self.__execute_response = None
 | |
| 62 | +        self.__operation_metadata = remote_execution_pb2.ExecuteOperationMetadata()
 | |
| 92 | 63 |  | 
| 93 | -        if message_queue is not None:
 | |
| 94 | -            self.register_client(message_queue)
 | |
| 64 | +        self.__operation_metadata.action_digest.CopyFrom(action_digest)
 | |
| 65 | +        self.__operation_metadata.stage = OperationStage.UNKNOWN.value
 | |
| 66 | + | |
| 67 | +        self._action.CopyFrom(action)
 | |
| 68 | +        self._do_not_cache = self._action.do_not_cache
 | |
| 69 | +        self._operation_update_queues = []
 | |
| 70 | +        self._operation.name = self._name
 | |
| 71 | +        self._operation.done = False
 | |
| 72 | +        self._n_tries = 0
 | |
| 95 | 73 |  | 
| 96 | 74 |      @property
 | 
| 97 | 75 |      def name(self):
 | 
| 98 | 76 |          return self._name
 | 
| 99 | 77 |  | 
| 78 | +    @property
 | |
| 79 | +    def do_not_cache(self):
 | |
| 80 | +        return self._do_not_cache
 | |
| 81 | + | |
| 82 | +    @property
 | |
| 83 | +    def action(self):
 | |
| 84 | +        return self._action
 | |
| 85 | + | |
| 100 | 86 |      @property
 | 
| 101 | 87 |      def action_digest(self):
 | 
| 102 | -        return self._action_digest
 | |
| 88 | +        return self.__operation_metadata.action_digest
 | |
| 103 | 89 |  | 
| 104 | 90 |      @property
 | 
| 105 | -    def do_not_cache(self):
 | |
| 106 | -        return self._do_not_cache
 | |
| 91 | +    def action_result(self):
 | |
| 92 | +        if self.__execute_response is not None:
 | |
| 93 | +            return self.__execute_response.result
 | |
| 94 | +        else:
 | |
| 95 | +            return None
 | |
| 107 | 96 |  | 
| 108 | -    def check_job_finished(self):
 | |
| 109 | -        if not self._operation_update_queues:
 | |
| 110 | -            return self._operation.done
 | |
| 111 | -        return False
 | |
| 97 | +    @property
 | |
| 98 | +    def operation(self):
 | |
| 99 | +        return self._operation
 | |
| 100 | + | |
| 101 | +    @property
 | |
| 102 | +    def operation_stage(self):
 | |
| 103 | +        return OperationStage(self.__operation_metadata.state)
 | |
| 104 | + | |
| 105 | +    @property
 | |
| 106 | +    def lease(self):
 | |
| 107 | +        return self._lease
 | |
| 108 | + | |
| 109 | +    @property
 | |
| 110 | +    def lease_state(self):
 | |
| 111 | +        if self._lease is not None:
 | |
| 112 | +            return LeaseState(self._lease.state)
 | |
| 113 | +        else:
 | |
| 114 | +            return None
 | |
| 115 | + | |
| 116 | +    @property
 | |
| 117 | +    def n_tries(self):
 | |
| 118 | +        return self._n_tries
 | |
| 119 | + | |
| 120 | +    @property
 | |
| 121 | +    def n_clients(self):
 | |
| 122 | +        return len(self._operation_update_queues)
 | |
| 112 | 123 |  | 
| 113 | 124 |      def register_client(self, queue):
 | 
| 125 | +        """Subscribes to the job's :class:`Operation` stage change events.
 | |
| 126 | + | |
| 127 | +        Args:
 | |
| 128 | +            queue (queue.Queue): the event queue to register.
 | |
| 129 | +        """
 | |
| 114 | 130 |          self._operation_update_queues.append(queue)
 | 
| 115 | -        queue.put(self.get_operation())
 | |
| 131 | +        queue.put(self._operation)
 | |
| 116 | 132 |  | 
| 117 | 133 |      def unregister_client(self, queue):
 | 
| 134 | +        """Unsubscribes from the job's :class:`Operation` stage change events.
 | |
| 135 | + | |
| 136 | +        Args:
 | |
| 137 | +            queue (queue.Queue): the event queue to unregister.
 | |
| 138 | +        """
 | |
| 118 | 139 |          self._operation_update_queues.remove(queue)
 | 
| 119 | 140 |  | 
| 120 | -    def get_operation(self):
 | |
| 121 | -        self._operation.metadata.CopyFrom(self._pack_any(self.get_operation_meta()))
 | |
| 122 | -        if self.result is not None:
 | |
| 123 | -            self._operation.done = True
 | |
| 124 | -            response = remote_execution_pb2.ExecuteResponse(result=self.result,
 | |
| 125 | -                                                            cached_result=self.result_cached)
 | |
| 141 | +    def set_cached_result(self, action_result):
 | |
| 142 | +        """Allows specifying an action result from the action cache for the job.
 | |
| 143 | +        """
 | |
| 144 | +        self.__execute_response = remote_execution_pb2.ExecuteResponse()
 | |
| 145 | +        self.__execute_response.result.CopyFrom(action_result)
 | |
| 146 | +        self.__execute_response.cached_result = True
 | |
| 126 | 147 |  | 
| 127 | -            if not self.result_cached:
 | |
| 128 | -                response.status.CopyFrom(self.lease.status)
 | |
| 148 | +    def create_lease(self):
 | |
| 149 | +        """Emits a new :class:`Lease` for the job.
 | |
| 129 | 150 |  | 
| 130 | -            self._operation.response.CopyFrom(self._pack_any(response))
 | |
| 151 | +        Only one :class:`Lease` can be emitted for a given job. This method
 | |
| 152 | +        should only be used once, any further calls are ignored.
 | |
| 153 | +        """
 | |
| 154 | +        if self._lease is not None:
 | |
| 155 | +            return None
 | |
| 131 | 156 |  | 
| 132 | -        return self._operation
 | |
| 157 | +        self._lease = bots_pb2.Lease()
 | |
| 158 | +        self._lease.id = self._name
 | |
| 159 | +        self._lease.payload.Pack(self.__operation_metadata.action_digest)
 | |
| 160 | +        self._lease.state = LeaseState.PENDING.value
 | |
| 133 | 161 |  | 
| 134 | -    def get_operation_meta(self):
 | |
| 135 | -        meta = remote_execution_pb2.ExecuteOperationMetadata()
 | |
| 136 | -        meta.stage = self._execute_stage.value
 | |
| 137 | -        meta.action_digest.CopyFrom(self._action_digest)
 | |
| 162 | +        return self._lease
 | |
| 138 | 163 |  | 
| 139 | -        return meta
 | |
| 164 | +    def update_lease_state(self, state, status=None, result=None):
 | |
| 165 | +        """Operates a state transition for the job's current :class:`Lease`.
 | |
| 140 | 166 |  | 
| 141 | -    def create_lease(self):
 | |
| 142 | -        action_digest = self._pack_any(self._action_digest)
 | |
| 167 | +        Args:
 | |
| 168 | +            state (LeaseState): the lease state to transition to.
 | |
| 169 | +            status (google.rpc.Status): the lease execution status, only
 | |
| 170 | +                required if `state` is `COMPLETED`.
 | |
| 171 | +            result (google.protobuf.Any): the lease execution result, only
 | |
| 172 | +                required if `state` is `COMPLETED`.
 | |
| 173 | +        """
 | |
| 174 | +        if state.value == self._lease.state:
 | |
| 175 | +            return
 | |
| 143 | 176 |  | 
| 144 | -        lease = bots_pb2.Lease(id=self.name,
 | |
| 145 | -                               payload=action_digest,
 | |
| 146 | -                               state=LeaseState.PENDING.value)
 | |
| 147 | -        self.lease = lease
 | |
| 148 | -        return lease
 | |
| 177 | +        self._lease.state = state.value
 | |
| 149 | 178 |  | 
| 150 | -    def get_operations(self):
 | |
| 151 | -        return operations_pb2.ListOperationsResponse(operations=[self.get_operation()])
 | |
| 179 | +        if self._lease.state == LeaseState.PENDING.value:
 | |
| 180 | +            self._lease.status.Clear()
 | |
| 181 | +            self._lease.result.Clear()
 | |
| 152 | 182 |  | 
| 153 | -    def update_execute_stage(self, stage):
 | |
| 154 | -        self._execute_stage = stage
 | |
| 155 | -        for queue in self._operation_update_queues:
 | |
| 156 | -            queue.put(self.get_operation())
 | |
| 183 | +        elif self._lease.state == LeaseState.COMPLETED.value:
 | |
| 184 | +            action_result = remote_execution_pb2.ActionResult()
 | |
| 185 | + | |
| 186 | +            # TODO: Make a distinction between build and bot failures!
 | |
| 187 | +            if status.code != 0:
 | |
| 188 | +                self._do_not_cache = True
 | |
| 189 | + | |
| 190 | +            if result is not None:
 | |
| 191 | +                assert result.Is(action_result.DESCRIPTOR)
 | |
| 192 | +                result.Unpack(action_result)
 | |
| 193 | + | |
| 194 | +            self.__execute_response = remote_execution_pb2.ExecuteResponse()
 | |
| 195 | +            self.__execute_response.result.CopyFrom(action_result)
 | |
| 196 | +            self.__execute_response.cached_result = False
 | |
| 197 | +            self.__execute_response.status.CopyFrom(status)
 | |
| 198 | + | |
| 199 | +    def update_operation_stage(self, stage):
 | |
| 200 | +        """Operates a stage transition for the job's :class:`Operation`.
 | |
| 157 | 201 |  | 
| 158 | -    def _pack_any(self, pack):
 | |
| 159 | -        some_any = any_pb2.Any()
 | |
| 160 | -        some_any.Pack(pack)
 | |
| 161 | -        return some_any | |
| 202 | +        Args:
 | |
| 203 | +            stage (OperationStage): the operation stage to transition to.
 | |
| 204 | +        """
 | |
| 205 | +        if stage.value == self.__operation_metadata.stage:
 | |
| 206 | +            return
 | |
| 207 | + | |
| 208 | +        self.__operation_metadata.stage = stage.value
 | |
| 209 | + | |
| 210 | +        if self.__operation_metadata.stage == OperationStage.QUEUED.value:
 | |
| 211 | +            self._n_tries += 1
 | |
| 212 | + | |
| 213 | +        elif self.__operation_metadata.stage == OperationStage.COMPLETED.value:
 | |
| 214 | +            if self.__execute_response is not None:
 | |
| 215 | +                self._operation.response.Pack(self.__execute_response)
 | |
| 216 | +            self._operation.done = True
 | |
| 217 | + | |
| 218 | +        self._operation.metadata.Pack(self.__operation_metadata)
 | |
| 219 | + | |
| 220 | +        for queue in self._operation_update_queues:
 | |
| 221 | +            queue.put(self._operation) | 
| ... | ... | @@ -22,6 +22,7 @@ An instance of the LongRunningOperations Service. | 
| 22 | 22 |  import logging
 | 
| 23 | 23 |  | 
| 24 | 24 |  from buildgrid._exceptions import InvalidArgumentError
 | 
| 25 | +from buildgrid._protos.google.longrunning import operations_pb2
 | |
| 25 | 26 |  | 
| 26 | 27 |  | 
| 27 | 28 |  class OperationsInstance:
 | 
| ... | ... | @@ -34,18 +35,21 @@ class OperationsInstance: | 
| 34 | 35 |          server.add_operations_instance(self, instance_name)
 | 
| 35 | 36 |  | 
| 36 | 37 |      def get_operation(self, name):
 | 
| 37 | -        operation = self._scheduler.jobs.get(name)
 | |
| 38 | +        job = self._scheduler.jobs.get(name)
 | |
| 38 | 39 |  | 
| 39 | -        if operation is None:
 | |
| 40 | +        if job is None:
 | |
| 40 | 41 |              raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
 | 
| 41 | 42 |  | 
| 42 | 43 |          else:
 | 
| 43 | -            return operation.get_operation()
 | |
| 44 | +            return job.operation
 | |
| 44 | 45 |  | 
| 45 | 46 |      def list_operations(self, list_filter, page_size, page_token):
 | 
| 46 | 47 |          # TODO: Pages
 | 
| 47 | 48 |          # Spec says number of pages and length of a page are optional
 | 
| 48 | -        return self._scheduler.get_operations()
 | |
| 49 | +        response = operations_pb2.ListOperationsResponse()
 | |
| 50 | +        response.operations.extend([job.operation for job in self._scheduler.list_jobs()])
 | |
| 51 | + | |
| 52 | +        return response
 | |
| 49 | 53 |  | 
| 50 | 54 |      def delete_operation(self, name):
 | 
| 51 | 55 |          try:
 | 
| ... | ... | @@ -11,9 +11,7 @@ | 
| 11 | 11 |  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
| 12 | 12 |  # See the License for the specific language governing permissions and
 | 
| 13 | 13 |  # limitations under the License.
 | 
| 14 | -#
 | |
| 15 | -# Authors:
 | |
| 16 | -#        Finn Ball <finn ball codethink co uk>
 | |
| 14 | + | |
| 17 | 15 |  | 
| 18 | 16 |  """
 | 
| 19 | 17 |  Scheduler
 | 
| ... | ... | @@ -24,10 +22,8 @@ Schedules jobs. | 
| 24 | 22 |  from collections import deque
 | 
| 25 | 23 |  | 
| 26 | 24 |  from buildgrid._exceptions import NotFoundError
 | 
| 27 | -from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
 | |
| 28 | -from buildgrid._protos.google.longrunning import operations_pb2
 | |
| 29 | 25 |  | 
| 30 | -from .job import ExecuteStage, LeaseState
 | |
| 26 | +from .job import OperationStage, LeaseState
 | |
| 31 | 27 |  | 
| 32 | 28 |  | 
| 33 | 29 |  class Scheduler:
 | 
| ... | ... | @@ -39,80 +35,96 @@ class Scheduler: | 
| 39 | 35 |          self.jobs = {}
 | 
| 40 | 36 |          self.queue = deque()
 | 
| 41 | 37 |  | 
| 42 | -    def register_client(self, name, queue):
 | |
| 43 | -        self.jobs[name].register_client(queue)
 | |
| 38 | +    def register_client(self, job_name, queue):
 | |
| 39 | +        self.jobs[job_name].register_client(queue)
 | |
| 40 | + | |
| 41 | +    def unregister_client(self, job_name, queue):
 | |
| 42 | +        self.jobs[job_name].unregister_client(queue)
 | |
| 44 | 43 |  | 
| 45 | -    def unregister_client(self, name, queue):
 | |
| 46 | -        job = self.jobs[name]
 | |
| 47 | -        job.unregister_client(queue)
 | |
| 48 | -        if job.check_job_finished():
 | |
| 49 | -            del self.jobs[name]
 | |
| 44 | +        if self.jobs[job_name].n_clients == 0:
 | |
| 45 | +            del self.jobs[job_name]
 | |
| 50 | 46 |  | 
| 51 | -    def append_job(self, job, skip_cache_lookup=False):
 | |
| 47 | +    def queue_job(self, job, skip_cache_lookup=False):
 | |
| 52 | 48 |          self.jobs[job.name] = job
 | 
| 49 | + | |
| 50 | +        operation_stage = None
 | |
| 53 | 51 |          if self._action_cache is not None and not skip_cache_lookup:
 | 
| 54 | 52 |              try:
 | 
| 55 | -                cached_result = self._action_cache.get_action_result(job.action_digest)
 | |
| 53 | +                action_result = self._action_cache.get_action_result(job.action_digest)
 | |
| 56 | 54 |              except NotFoundError:
 | 
| 55 | +                operation_stage = OperationStage.QUEUED
 | |
| 57 | 56 |                  self.queue.append(job)
 | 
| 58 | -                job.update_execute_stage(ExecuteStage.QUEUED)
 | |
| 59 | 57 |  | 
| 60 | 58 |              else:
 | 
| 61 | -                job.result = cached_result
 | |
| 62 | -                job.result_cached = True
 | |
| 63 | -                job.update_execute_stage(ExecuteStage.COMPLETED)
 | |
| 59 | +                job.set_cached_result(action_result)
 | |
| 60 | +                operation_stage = OperationStage.COMPLETED
 | |
| 64 | 61 |  | 
| 65 | 62 |          else:
 | 
| 63 | +            operation_stage = OperationStage.QUEUED
 | |
| 66 | 64 |              self.queue.append(job)
 | 
| 67 | -            job.update_execute_stage(ExecuteStage.QUEUED)
 | |
| 68 | 65 |  | 
| 69 | -    def retry_job(self, name):
 | |
| 70 | -        if name in self.jobs:
 | |
| 71 | -            job = self.jobs[name]
 | |
| 66 | +        job.update_operation_stage(operation_stage)
 | |
| 67 | + | |
| 68 | +    def retry_job(self, job_name):
 | |
| 69 | +        if job_name in self.jobs:
 | |
| 70 | +            job = self.jobs[job_name]
 | |
| 72 | 71 |              if job.n_tries >= self.MAX_N_TRIES:
 | 
| 73 | 72 |                  # TODO: Decide what to do with these jobs
 | 
| 74 | -                job.update_execute_stage(ExecuteStage.COMPLETED)
 | |
| 73 | +                job.update_operation_stage(OperationStage.COMPLETED)
 | |
| 75 | 74 |                  # TODO: Mark these jobs as done
 | 
| 76 | 75 |              else:
 | 
| 77 | -                job.update_execute_stage(ExecuteStage.QUEUED)
 | |
| 78 | -                job.n_tries += 1
 | |
| 76 | +                job.update_operation_stage(OperationStage.QUEUED)
 | |
| 79 | 77 |                  self.queue.appendleft(job)
 | 
| 80 | 78 |  | 
| 81 | -    def job_complete(self, name, result, status):
 | |
| 82 | -        job = self.jobs[name]
 | |
| 83 | -        job.lease.status.CopyFrom(status)
 | |
| 84 | -        action_result = remote_execution_pb2.ActionResult()
 | |
| 85 | -        result.Unpack(action_result)
 | |
| 86 | -        job.result = action_result
 | |
| 87 | -        if not job.do_not_cache and self._action_cache is not None:
 | |
| 88 | -            if not job.lease.status.code:
 | |
| 89 | -                self._action_cache.update_action_result(job.action_digest, action_result)
 | |
| 90 | -        job.update_execute_stage(ExecuteStage.COMPLETED)
 | |
| 91 | - | |
| 92 | -    def get_operations(self):
 | |
| 93 | -        response = operations_pb2.ListOperationsResponse()
 | |
| 94 | -        for v in self.jobs.values():
 | |
| 95 | -            response.operations.extend([v.get_operation()])
 | |
| 96 | -        return response
 | |
| 97 | - | |
| 98 | -    def update_job_lease_state(self, name, state):
 | |
| 99 | -        job = self.jobs[name]
 | |
| 100 | -        job.lease.state = state
 | |
| 101 | - | |
| 102 | -    def get_job_lease(self, name):
 | |
| 103 | -        return self.jobs[name].lease
 | |
| 104 | - | |
| 105 | -    def cancel_session(self, name):
 | |
| 106 | -        job = self.jobs[name]
 | |
| 107 | -        state = job.lease.state
 | |
| 108 | -        if state in (LeaseState.PENDING.value, LeaseState.ACTIVE.value):
 | |
| 109 | -            self.retry_job(name)
 | |
| 110 | - | |
| 111 | -    def create_lease(self):
 | |
| 112 | -        if self.queue:
 | |
| 113 | -            job = self.queue.popleft()
 | |
| 114 | -            job.update_execute_stage(ExecuteStage.EXECUTING)
 | |
| 115 | -            job.create_lease()
 | |
| 116 | -            job.lease.state = LeaseState.PENDING.value
 | |
| 117 | -            return job.lease
 | |
| 118 | -        return None | |
| 79 | +    def list_jobs(self):
 | |
| 80 | +        return self.jobs.values()
 | |
| 81 | + | |
| 82 | +    def request_job_leases(self, worker_capabilities):
 | |
| 83 | +        """Generates a list of the highest priority leases to be run.
 | |
| 84 | + | |
| 85 | +        Args:
 | |
| 86 | +            worker_capabilities (dict): a set of key-value pairs describing the
 | |
| 87 | +                worker properties, configuration and state at the time of the
 | |
| 88 | +                request.
 | |
| 89 | +        """
 | |
| 90 | +        if not self.queue:
 | |
| 91 | +            return []
 | |
| 92 | + | |
| 93 | +        job = self.queue.popleft()
 | |
| 94 | +        # For now, one lease at a time:
 | |
| 95 | +        lease = job.create_lease()
 | |
| 96 | + | |
| 97 | +        return [lease]
 | |
| 98 | + | |
| 99 | +    def update_job_lease_state(self, job_name, lease_state, lease_status=None, lease_result=None):
 | |
| 100 | +        """Requests a state transition for a job's current :class:`Lease`.
 | |
| 101 | + | |
| 102 | +        Args:
 | |
| 103 | +            job_name (str): name of the job to query.
 | |
| 104 | +            lease_state (LeaseState): the lease state to transition to.
 | |
| 105 | +            lease_status (google.rpc.Status): the lease execution status, only
 | |
| 106 | +                required if `lease_state` is `COMPLETED`.
 | |
| 107 | +            lease_result (google.protobuf.Any): the lease execution result, only
 | |
| 108 | +                required if `lease_state` is `COMPLETED`.
 | |
| 109 | +        """
 | |
| 110 | +        job = self.jobs[job_name]
 | |
| 111 | + | |
| 112 | +        if lease_state != LeaseState.COMPLETED:
 | |
| 113 | +            job.update_lease_state(lease_state)
 | |
| 114 | + | |
| 115 | +        else:
 | |
| 116 | +            job.update_lease_state(lease_state,
 | |
| 117 | +                                   status=lease_status, result=lease_result)
 | |
| 118 | + | |
| 119 | +            if self._action_cache is not None and not job.do_not_cache:
 | |
| 120 | +                self._action_cache.update_action_result(job.action_digest, job.action_result)
 | |
| 121 | + | |
| 122 | +            job.update_operation_stage(OperationStage.COMPLETED)
 | |
| 123 | + | |
| 124 | +    def get_job_lease(self, job_name):
 | |
| 125 | +        """Returns the lease associated with the job, if one has been emitted yet."""
 | |
| 126 | +        return self.jobs[job_name].lease
 | |
| 127 | + | |
| 128 | +    def get_job_operation(self, job_name):
 | |
| 129 | +        """Returns the operation associated to job."""
 | |
| 130 | +        return self.jobs[job_name].operation | 
| ... | ... | @@ -137,7 +137,7 @@ def test_update_leases_with_work(bot_session, context, instance): | 
| 137 | 137 |                                                 bot_session=bot_session)
 | 
| 138 | 138 |  | 
| 139 | 139 |      action_digest = remote_execution_pb2.Digest(hash='gaff')
 | 
| 140 | -    _inject_work(instance._instances[""]._scheduler, action_digest)
 | |
| 140 | +    _inject_work(instance._instances[""]._scheduler, action_digest=action_digest)
 | |
| 141 | 141 |  | 
| 142 | 142 |      response = instance.CreateBotSession(request, context)
 | 
| 143 | 143 |  | 
| ... | ... | @@ -159,7 +159,7 @@ def test_update_leases_work_complete(bot_session, context, instance): | 
| 159 | 159 |  | 
| 160 | 160 |      # Inject work
 | 
| 161 | 161 |      action_digest = remote_execution_pb2.Digest(hash='gaff')
 | 
| 162 | -    _inject_work(instance._instances[""]._scheduler, action_digest)
 | |
| 162 | +    _inject_work(instance._instances[""]._scheduler, action_digest=action_digest)
 | |
| 163 | 163 |  | 
| 164 | 164 |      request = bots_pb2.UpdateBotSessionRequest(name=response.name,
 | 
| 165 | 165 |                                                 bot_session=response)
 | 
| ... | ... | @@ -174,6 +174,7 @@ def test_update_leases_work_complete(bot_session, context, instance): | 
| 174 | 174 |      response = copy.deepcopy(instance.UpdateBotSession(request, context))
 | 
| 175 | 175 |  | 
| 176 | 176 |      response.leases[0].state = LeaseState.COMPLETED.value
 | 
| 177 | +    response.leases[0].result.Pack(remote_execution_pb2.ActionResult())
 | |
| 177 | 178 |  | 
| 178 | 179 |      request = bots_pb2.UpdateBotSessionRequest(name=response.name,
 | 
| 179 | 180 |                                                 bot_session=response)
 | 
| ... | ... | @@ -187,7 +188,7 @@ def test_work_rejected_by_bot(bot_session, context, instance): | 
| 187 | 188 |                                                 bot_session=bot_session)
 | 
| 188 | 189 |      # Inject work
 | 
| 189 | 190 |      action_digest = remote_execution_pb2.Digest(hash='gaff')
 | 
| 190 | -    _inject_work(instance._instances[""]._scheduler, action_digest)
 | |
| 191 | +    _inject_work(instance._instances[""]._scheduler, action_digest=action_digest)
 | |
| 191 | 192 |  | 
| 192 | 193 |      # Simulated the severed binding between client and server
 | 
| 193 | 194 |      response = copy.deepcopy(instance.CreateBotSession(request, context))
 | 
| ... | ... | @@ -209,7 +210,7 @@ def test_work_out_of_sync_from_pending(state, bot_session, context, instance): | 
| 209 | 210 |                                                 bot_session=bot_session)
 | 
| 210 | 211 |      # Inject work
 | 
| 211 | 212 |      action_digest = remote_execution_pb2.Digest(hash='gaff')
 | 
| 212 | -    _inject_work(instance._instances[""]._scheduler, action_digest)
 | |
| 213 | +    _inject_work(instance._instances[""]._scheduler, action_digest=action_digest)
 | |
| 213 | 214 |  | 
| 214 | 215 |      # Simulated the severed binding between client and server
 | 
| 215 | 216 |      response = copy.deepcopy(instance.CreateBotSession(request, context))
 | 
| ... | ... | @@ -230,7 +231,7 @@ def test_work_out_of_sync_from_active(state, bot_session, context, instance): | 
| 230 | 231 |                                                 bot_session=bot_session)
 | 
| 231 | 232 |      # Inject work
 | 
| 232 | 233 |      action_digest = remote_execution_pb2.Digest(hash='gaff')
 | 
| 233 | -    _inject_work(instance._instances[""]._scheduler, action_digest)
 | |
| 234 | +    _inject_work(instance._instances[""]._scheduler, action_digest=action_digest)
 | |
| 234 | 235 |  | 
| 235 | 236 |      # Simulated the severed binding between client and server
 | 
| 236 | 237 |      response = copy.deepcopy(instance.CreateBotSession(request, context))
 | 
| ... | ... | @@ -257,7 +258,7 @@ def test_work_active_to_active(bot_session, context, instance): | 
| 257 | 258 |                                                 bot_session=bot_session)
 | 
| 258 | 259 |      # Inject work
 | 
| 259 | 260 |      action_digest = remote_execution_pb2.Digest(hash='gaff')
 | 
| 260 | -    _inject_work(instance._instances[""]._scheduler, action_digest)
 | |
| 261 | +    _inject_work(instance._instances[""]._scheduler, action_digest=action_digest)
 | |
| 261 | 262 |  | 
| 262 | 263 |      # Simulated the severed binding between client and server
 | 
| 263 | 264 |      response = copy.deepcopy(instance.CreateBotSession(request, context))
 | 
| ... | ... | @@ -279,8 +280,10 @@ def test_post_bot_event_temp(context, instance): | 
| 279 | 280 |      context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
 | 
| 280 | 281 |  | 
| 281 | 282 |  | 
| 282 | -def _inject_work(scheduler, action_digest=None):
 | |
| 283 | +def _inject_work(scheduler, action=None, action_digest=None):
 | |
| 284 | +    if not action:
 | |
| 285 | +        action = remote_execution_pb2.Action()
 | |
| 283 | 286 |      if not action_digest:
 | 
| 284 | 287 |          action_digest = remote_execution_pb2.Digest()
 | 
| 285 | -    j = job.Job(action_digest, False)
 | |
| 286 | -    scheduler.append_job(j, True) | |
| 288 | +    j = job.Job(action, action_digest)
 | |
| 289 | +    scheduler.queue_job(j, True) | 
| ... | ... | @@ -82,7 +82,7 @@ def test_execute(skip_cache_lookup, instance, context): | 
| 82 | 82 |      assert isinstance(result, operations_pb2.Operation)
 | 
| 83 | 83 |      metadata = remote_execution_pb2.ExecuteOperationMetadata()
 | 
| 84 | 84 |      result.metadata.Unpack(metadata)
 | 
| 85 | -    assert metadata.stage == job.ExecuteStage.QUEUED.value
 | |
| 85 | +    assert metadata.stage == job.OperationStage.QUEUED.value
 | |
| 86 | 86 |      assert uuid.UUID(result.name, version=4)
 | 
| 87 | 87 |      assert result.done is False
 | 
| 88 | 88 |  | 
| ... | ... | @@ -105,7 +105,7 @@ def test_no_action_digest_in_storage(instance, context): | 
| 105 | 105 |  | 
| 106 | 106 |  | 
| 107 | 107 |  def test_wait_execution(instance, controller, context):
 | 
| 108 | -    j = job.Job(action_digest, None)
 | |
| 108 | +    j = job.Job(action, action_digest)
 | |
| 109 | 109 |      j._operation.done = True
 | 
| 110 | 110 |  | 
| 111 | 111 |      request = remote_execution_pb2.WaitExecutionRequest(name="{}/{}".format('', j.name))
 | 
| ... | ... | @@ -116,7 +116,7 @@ def test_wait_execution(instance, controller, context): | 
| 116 | 116 |      action_result = remote_execution_pb2.ActionResult()
 | 
| 117 | 117 |      action_result_any.Pack(action_result)
 | 
| 118 | 118 |  | 
| 119 | -    j.update_execute_stage(job.ExecuteStage.COMPLETED)
 | |
| 119 | +    j.update_operation_stage(job.OperationStage.COMPLETED)
 | |
| 120 | 120 |  | 
| 121 | 121 |      response = instance.WaitExecution(request, context)
 | 
| 122 | 122 |  | 
| ... | ... | @@ -125,7 +125,7 @@ def test_wait_execution(instance, controller, context): | 
| 125 | 125 |      assert isinstance(result, operations_pb2.Operation)
 | 
| 126 | 126 |      metadata = remote_execution_pb2.ExecuteOperationMetadata()
 | 
| 127 | 127 |      result.metadata.Unpack(metadata)
 | 
| 128 | -    assert metadata.stage == job.ExecuteStage.COMPLETED.value
 | |
| 128 | +    assert metadata.stage == job.OperationStage.COMPLETED.value
 | |
| 129 | 129 |      assert uuid.UUID(result.name, version=4)
 | 
| 130 | 130 |      assert result.done is True
 | 
| 131 | 131 |  | 
| ... | ... | @@ -30,6 +30,7 @@ from buildgrid._protos.google.longrunning import operations_pb2 | 
| 30 | 30 |  from buildgrid._protos.google.rpc import status_pb2
 | 
| 31 | 31 |  from buildgrid.server.cas.storage import lru_memory_cache
 | 
| 32 | 32 |  from buildgrid.server.controller import ExecutionController
 | 
| 33 | +from buildgrid.server.job import LeaseState
 | |
| 33 | 34 |  from buildgrid.server.operations import service
 | 
| 34 | 35 |  from buildgrid.server.operations.service import OperationsService
 | 
| 35 | 36 |  from buildgrid.utils import create_digest
 | 
| ... | ... | @@ -131,9 +132,10 @@ def test_list_operations_with_result(instance, controller, execute_request, cont | 
| 131 | 132 |      action_result.output_files.extend([output_file])
 | 
| 132 | 133 |  | 
| 133 | 134 |      controller.operations_instance._scheduler.jobs[response_execute.name].create_lease()
 | 
| 134 | -    controller.operations_instance._scheduler.job_complete(response_execute.name,
 | |
| 135 | -                                                           _pack_any(action_result),
 | |
| 136 | -                                                           status_pb2.Status())
 | |
| 135 | +    controller.operations_instance._scheduler.update_job_lease_state(response_execute.name,
 | |
| 136 | +                                                                     LeaseState.COMPLETED,
 | |
| 137 | +                                                                     lease_status=status_pb2.Status(),
 | |
| 138 | +                                                                     lease_result=_pack_any(action_result))
 | |
| 137 | 139 |  | 
| 138 | 140 |      request = operations_pb2.ListOperationsRequest(name=instance_name)
 | 
| 139 | 141 |      response = instance.ListOperations(request, context)
 | 
