Santiago Gil pushed to branch santigl/104-platform-matching at BuildGrid / buildgrid
Commits:
- 2c9952e5 by Santiago Gil at 2019-02-12T14:35:31Z
7 changed files:
- buildgrid/server/bots/instance.py
- buildgrid/server/execution/instance.py
- buildgrid/server/job.py
- buildgrid/server/scheduler.py
- tests/integration/bots_service.py
- tests/integration/execution_service.py
- tests/integration/operations_service.py
Changes:
| ... | ... | @@ -50,7 +50,6 @@ class BotsInterface: | 
| 50 | 50 |          register with the service, the old one should be closed along
 | 
| 51 | 51 |          with all its jobs.
 | 
| 52 | 52 |          """
 | 
| 53 | - | |
| 54 | 53 |          bot_id = bot_session.bot_id
 | 
| 55 | 54 |  | 
| 56 | 55 |          if bot_id == "":
 | 
| ... | ... | @@ -100,10 +99,25 @@ class BotsInterface: | 
| 100 | 99 |          return bot_session
 | 
| 101 | 100 |  | 
| 102 | 101 |      def _request_leases(self, bot_session):
 | 
| 103 | -        # TODO: Send worker capabilities to the scheduler!
 | |
| 104 | 102 |          # Only send one lease at a time currently.
 | 
| 105 | 103 |          if not bot_session.leases:
 | 
| 106 | -            leases = self._scheduler.request_job_leases({})
 | |
| 104 | +            worker_capabilities = {}
 | |
| 105 | + | |
| 106 | +            # TODO? Fail if there are no devices in the worker?
 | |
| 107 | +            if bot_session.worker.devices:
 | |
| 108 | +                # According to the spec:
 | |
| 109 | +                #   "The first device in the worker is the "primary device" -
 | |
| 110 | +                #   that is, the device running a bot and which is
 | |
| 111 | +                #   responsible for actually executing commands."
 | |
| 112 | +                primary_device = bot_session.worker.devices[0]
 | |
| 113 | + | |
| 114 | +                for device_property in primary_device.properties:
 | |
| 115 | +                    if device_property.key not in worker_capabilities:
 | |
| 116 | +                        worker_capabilities[device_property.key] = set()
 | |
| 117 | +                    worker_capabilities[device_property.key].add(device_property.value)
 | |
| 118 | + | |
| 119 | +            leases = self._scheduler.request_job_leases(worker_capabilities)
 | |
| 120 | + | |
| 107 | 121 |              if leases:
 | 
| 108 | 122 |                  for lease in leases:
 | 
| 109 | 123 |                      self._assigned_leases[bot_session.name].add(lease.id)
 | 
| ... | ... | @@ -22,7 +22,7 @@ An instance of the Remote Execution Service. | 
| 22 | 22 |  import logging
 | 
| 23 | 23 |  | 
| 24 | 24 |  from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError, NotFoundError
 | 
| 25 | -from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action
 | |
| 25 | +from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action, Command
 | |
| 26 | 26 |  from buildgrid.utils import get_hash_type
 | 
| 27 | 27 |  | 
| 28 | 28 |  | 
| ... | ... | @@ -50,11 +50,22 @@ class ExecutionInstance: | 
| 50 | 50 |          this action.
 | 
| 51 | 51 |          """
 | 
| 52 | 52 |          action = self._storage.get_message(action_digest, Action)
 | 
| 53 | - | |
| 54 | 53 |          if not action:
 | 
| 55 | 54 |              raise FailedPreconditionError("Could not get action from storage.")
 | 
| 56 | 55 |  | 
| 56 | +        command = self._storage.get_message(action.command_digest, Command)
 | |
| 57 | + | |
| 58 | +        if not command:
 | |
| 59 | +            raise FailedPreconditionError("Could not get command from storage.")
 | |
| 60 | + | |
| 61 | +        platform_requirements = {}
 | |
| 62 | +        for platform_property in command.platform.properties:
 | |
| 63 | +            if platform_property.name not in platform_requirements:
 | |
| 64 | +                platform_requirements[platform_property.name] = set()
 | |
| 65 | +            platform_requirements[platform_property.name].add(platform_property.value)
 | |
| 66 | + | |
| 57 | 67 |          return self._scheduler.queue_job_action(action, action_digest,
 | 
| 68 | +                                                platform_requirements,
 | |
| 58 | 69 |                                                  skip_cache_lookup=skip_cache_lookup)
 | 
| 59 | 70 |  | 
| 60 | 71 |      def register_job_peer(self, job_name, peer, message_queue):
 | 
| ... | ... | @@ -29,7 +29,7 @@ from buildgrid._protos.google.rpc import code_pb2 | 
| 29 | 29 |  | 
| 30 | 30 |  class Job:
 | 
| 31 | 31 |  | 
| 32 | -    def __init__(self, action, action_digest, priority=0):
 | |
| 32 | +    def __init__(self, action, action_digest, platform_requirements, priority=0):
 | |
| 33 | 33 |          self.__logger = logging.getLogger(__name__)
 | 
| 34 | 34 |  | 
| 35 | 35 |          self._name = str(uuid.uuid4())
 | 
| ... | ... | @@ -59,6 +59,8 @@ class Job: | 
| 59 | 59 |          self._do_not_cache = self._action.do_not_cache
 | 
| 60 | 60 |          self._n_tries = 0
 | 
| 61 | 61 |  | 
| 62 | +        self._platform_requirements = platform_requirements
 | |
| 63 | + | |
| 62 | 64 |          self._done = False
 | 
| 63 | 65 |  | 
| 64 | 66 |      def __lt__(self, other):
 | 
| ... | ... | @@ -113,6 +115,10 @@ class Job: | 
| 113 | 115 |  | 
| 114 | 116 |      # --- Public API: REAPI ---
 | 
| 115 | 117 |  | 
@property
def platform_requirements(self):
    """Return the job's platform requirements.

    A mapping from platform property name to the set of values accepted
    for it, exactly as supplied when the job was created.
    """
    requirements = self._platform_requirements
    return requirements
| 121 | + | |
| 116 | 122 |      @property
 | 
| 117 | 123 |      def do_not_cache(self):
 | 
| 118 | 124 |          return self._do_not_cache
 | 
| ... | ... | @@ -145,7 +145,8 @@ class Scheduler: | 
| 145 | 145 |          if not job.n_peers and job.done and not job.lease:
 | 
| 146 | 146 |              self._delete_job(job.name)
 | 
| 147 | 147 |  | 
| 148 | -    def queue_job_action(self, action, action_digest, priority=0, skip_cache_lookup=False):
 | |
| 148 | +    def queue_job_action(self, action, action_digest, platform_requirements={},
 | |
| 149 | +                         priority=0, skip_cache_lookup=False):
 | |
| 149 | 150 |          """Inserts a newly created job into the execution queue.
 | 
| 150 | 151 |  | 
| 151 | 152 |          Warning:
 | 
| ... | ... | @@ -155,6 +156,9 @@ class Scheduler: | 
| 155 | 156 |          Args:
 | 
| 156 | 157 |              action (Action): the given action to queue for execution.
 | 
| 157 | 158 |              action_digest (Digest): the digest of the given action.
 | 
| 159 | +            platform_requirements (dict(set)): platform attributes that a worker
 | |
| 160 | +                must satisfy in order to be assigned the job. (Each key can
 | |
| 161 | +                have multiple values.)
 | |
| 158 | 162 |              priority (int): the execution job's priority.
 | 
| 159 | 163 |              skip_cache_lookup (bool): whether or not to look for pre-computed
 | 
| 160 | 164 |                  result for the given action.
 | 
| ... | ... | @@ -178,7 +182,8 @@ class Scheduler: | 
| 178 | 182 |  | 
| 179 | 183 |                  return job.name
 | 
| 180 | 184 |  | 
| 181 | -        job = Job(action, action_digest, priority=priority)
 | |
| 185 | +        job = Job(action, action_digest, platform_requirements,
 | |
| 186 | +                  priority=priority)
 | |
| 182 | 187 |  | 
| 183 | 188 |          self.__logger.debug("Job created for action [%s]: [%s]",
 | 
| 184 | 189 |                              action_digest.hash[:8], job.name)
 | 
| ... | ... | @@ -271,28 +276,29 @@ class Scheduler: | 
| 271 | 276 |          """Generates a list of the highest priority leases to be run.
 | 
| 272 | 277 |  | 
| 273 | 278 |          Args:
 | 
| 274 | -            worker_capabilities (dict): a set of key-value pairs decribing the
 | |
| 279 | +            worker_capabilities (dict): a set of key-value pairs describing the
 | |
| 275 | 280 |                  worker properties, configuration and state at the time of the
 | 
| 276 | 281 |                  request.
 | 
| 277 | - | |
| 278 | -        Warning: Worker capabilities handling is not implemented at the moment!
 | |
| 279 | 282 |          """
 | 
| 280 | 283 |          if not self.__queue:
 | 
| 281 | 284 |              return []
 | 
| 282 | 285 |  | 
| 283 | -        # TODO: Try to match worker_capabilities with jobs properties.
 | |
| 284 | -        job = self.__queue.pop()
 | |
| 286 | +        # Looking for the first job that could be assigned to the worker...
 | |
| 287 | +        for job_index, job in enumerate(self.__queue):
 | |
| 288 | +            if self._worker_is_capable(worker_capabilities, job):
 | |
| 289 | +                self.__logger.info("Job scheduled to run: [%s]", job.name)
 | |
| 285 | 290 |  | 
| 286 | -        self.__logger.info("Job scheduled to run: [%s]", job.name)
 | |
| 291 | +                lease = job.lease
 | |
| 287 | 292 |  | 
| 288 | -        lease = job.lease
 | |
| 293 | +                if not lease:
 | |
| 294 | +                    # For now, one lease at a time:
 | |
| 295 | +                    lease = job.create_lease()
 | |
| 289 | 296 |  | 
| 290 | -        if not lease:
 | |
| 291 | -            # For now, one lease at a time:
 | |
| 292 | -            lease = job.create_lease()
 | |
| 297 | +                if lease:
 | |
| 298 | +                    del self.__queue[job_index]
 | |
| 299 | +                    return [lease]
 | |
| 293 | 300 |  | 
| 294 | -        if lease:
 | |
| 295 | -            return [lease]
 | |
| 301 | +                return None
 | |
| 296 | 302 |  | 
| 297 | 303 |          return None
 | 
| 298 | 304 |  | 
| ... | ... | @@ -622,3 +628,28 @@ class Scheduler: | 
| 622 | 628 |  | 
| 623 | 629 |                      for message_queue in self.__build_metadata_queues:
 | 
| 624 | 630 |                          message_queue.put(message)
 | 
| 631 | + | |
def _worker_is_capable(self, worker_capabilities, job):
    """Return whether a worker is suitable to run the given job.

    Args:
        worker_capabilities (dict(set)): worker property name -> set of
            values the worker provides (the bots interface builds this from
            the primary device's properties).
        job (Job): the queued job; its ``platform_requirements`` maps a
            property name to the set of accepted values.

    Returns:
        bool: ``True`` if the job has no requirements, or if for every
        checked property the job constrains, the worker offers at least
        one of the accepted values.
    """
    # TODO: Replace this with the logic defined in the Platform msg. standard.
    job_requirements = job.platform_requirements

    # Jobs without requirements can run on any worker.
    if not job_requirements:
        return True

    # For now we'll only check OS and ISA properties; any other requested
    # property is ignored.
    for property_name in ('os', 'isa'):
        required_values = job_requirements.get(property_name)
        if not required_values:
            continue

        # Default to an empty *set* (not a list) so the intersection is
        # always defined, even when the worker does not report this
        # property at all. Coerce both sides so list inputs also work.
        offered_values = worker_capabilities.get(property_name, set())
        if not (set(required_values) & set(offered_values)):
            return False

    return True
| ... | ... | @@ -153,11 +153,27 @@ def test_post_bot_event_temp(context, instance): | 
| 153 | 153 |      context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
 | 
| 154 | 154 |  | 
| 155 | 155 |  | 
| 156 | -def _inject_work(scheduler, action=None, action_digest=None):
 | |
def test_unmet_platform_requirements(bot_session, context, instance):
    """A job requiring an OS the bot does not advertise gets no lease.

    NOTE(review): assumes the ``bot_session`` fixture does not report an
    'os' property of 'wonderful-os' -- confirm against the fixture.
    """
    request = bots_pb2.CreateBotSessionRequest(parent='',
                                               bot_session=bot_session)

    action_digest = remote_execution_pb2.Digest(hash='gaff')
    # Use a set literal: ``set('wonderful-os')`` would be the set of the
    # string's individual characters, not the single OS name.
    _inject_work(instance._instances[""]._scheduler,
                 action_digest=action_digest,
                 platform_requirements={'os': {'wonderful-os'}})

    response = instance.CreateBotSession(request, context)

    assert len(response.leases) == 0
| 169 | + | |
def _inject_work(scheduler, action=None, action_digest=None,
                 platform_requirements=None):
    """Queue a job on ``scheduler`` for tests, skipping the cache lookup.

    Args:
        scheduler: the scheduler to queue the job on.
        action (Action): the action to run; defaults to an empty Action.
        action_digest (Digest): the action's digest; defaults to an empty
            Digest.
        platform_requirements (dict(set)): property name -> accepted values;
            defaults to no requirements.
    """
    if not action:
        action = remote_execution_pb2.Action()

    if not action_digest:
        action_digest = remote_execution_pb2.Digest()

    # Build a fresh dict per call rather than forwarding ``None`` to a
    # parameter that the scheduler documents as a dict of sets.
    if platform_requirements is None:
        platform_requirements = {}

    scheduler.queue_job_action(action, action_digest, platform_requirements,
                               skip_cache_lookup=True)
| ... | ... | @@ -37,7 +37,12 @@ from buildgrid.server.execution.service import ExecutionService | 
| 37 | 37 |  | 
| 38 | 38 |  | 
| 39 | 39 |  server = mock.create_autospec(grpc.server)
 | 
| 40 | -action = remote_execution_pb2.Action(do_not_cache=True)
 | |
| 40 | + | |
| 41 | +command = remote_execution_pb2.Command()
 | |
| 42 | +command_digest = create_digest(command.SerializeToString())
 | |
| 43 | + | |
| 44 | +action = remote_execution_pb2.Action(command_digest=command_digest,
 | |
| 45 | +                                     do_not_cache=True)
 | |
| 41 | 46 |  action_digest = create_digest(action.SerializeToString())
 | 
| 42 | 47 |  | 
| 43 | 48 |  | 
| ... | ... | @@ -50,7 +55,13 @@ def context(): | 
| 50 | 55 |  @pytest.fixture(params=["action-cache", "no-action-cache"])
 | 
| 51 | 56 |  def controller(request):
 | 
| 52 | 57 |      storage = lru_memory_cache.LRUMemoryCache(1024 * 1024)
 | 
| 58 | + | |
| 59 | +    write_session = storage.begin_write(command_digest)
 | |
| 60 | +    write_session.write(command.SerializeToString())
 | |
| 61 | +    storage.commit_write(command_digest, write_session)
 | |
| 62 | + | |
| 53 | 63 |      write_session = storage.begin_write(action_digest)
 | 
| 64 | +    write_session.write(action.SerializeToString())
 | |
| 54 | 65 |      storage.commit_write(action_digest, write_session)
 | 
| 55 | 66 |  | 
| 56 | 67 |      if request.param == "action-cache":
 | 
| ... | ... | @@ -36,7 +36,12 @@ from buildgrid.utils import create_digest | 
| 36 | 36 |  | 
| 37 | 37 |  server = mock.create_autospec(grpc.server)
 | 
| 38 | 38 |  instance_name = "blade"
 | 
| 39 | -action = remote_execution_pb2.Action(do_not_cache=True)
 | |
| 39 | + | |
| 40 | +command = remote_execution_pb2.Command()
 | |
| 41 | +command_digest = create_digest(command.SerializeToString())
 | |
| 42 | + | |
| 43 | +action = remote_execution_pb2.Action(command_digest=command_digest,
 | |
| 44 | +                                     do_not_cache=True)
 | |
| 40 | 45 |  action_digest = create_digest(action.SerializeToString())
 | 
| 41 | 46 |  | 
| 42 | 47 |  | 
| ... | ... | @@ -57,7 +62,13 @@ def execute_request(): | 
| 57 | 62 |  @pytest.fixture
 | 
| 58 | 63 |  def controller():
 | 
| 59 | 64 |      storage = lru_memory_cache.LRUMemoryCache(1024 * 1024)
 | 
| 65 | + | |
| 66 | +    write_session = storage.begin_write(command_digest)
 | |
| 67 | +    write_session.write(command.SerializeToString())
 | |
| 68 | +    storage.commit_write(command_digest, write_session)
 | |
| 69 | + | |
| 60 | 70 |      write_session = storage.begin_write(action_digest)
 | 
| 71 | +    write_session.write(action.SerializeToString())
 | |
| 61 | 72 |      storage.commit_write(action_digest, write_session)
 | 
| 62 | 73 |  | 
| 63 | 74 |      yield ExecutionController(None, storage)
 | 
