[Notes] [Git][BuildGrid/buildgrid][santigl/104-platform-matching] Scheduler: match OS and ISA between job and worker




Santiago Gil pushed to branch santigl/104-platform-matching at BuildGrid / buildgrid

Commits:

7 changed files:

Changes:

  • buildgrid/server/bots/instance.py

    @@ -50,7 +50,6 @@ class BotsInterface:
             register with the service, the old one should be closed along
             with all its jobs.
             """
    -
             bot_id = bot_session.bot_id

             if bot_id == "":
    @@ -100,10 +99,25 @@ class BotsInterface:
             return bot_session

         def _request_leases(self, bot_session):
    -        # TODO: Send worker capabilities to the scheduler!
             # Only send one lease at a time currently.
             if not bot_session.leases:
    -            leases = self._scheduler.request_job_leases({})
    +            worker_capabilities = {}
    +
    +            # TODO? Fail if there are no devices in the worker?
    +            if bot_session.worker.devices:
    +                # According to the spec:
    +                #   "The first device in the worker is the "primary device" -
    +                #   that is, the device running a bot and which is
    +                #   responsible for actually executing commands."
    +                primary_device = bot_session.worker.devices[0]
    +
    +                for device_property in primary_device.properties:
    +                    if device_property.key not in worker_capabilities:
    +                        worker_capabilities[device_property.key] = set()
    +                    worker_capabilities[device_property.key].add(device_property.value)
    +
    +            leases = self._scheduler.request_job_leases(worker_capabilities)
    +
                 if leases:
                     for lease in leases:
                         self._assigned_leases[bot_session.name].add(lease.id)

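A note on the capability gathering above: the primary device (the first one in the BotSession worker) is flattened into a dict mapping each property key to the set of advertised values. Below is a minimal standalone sketch of that flattening, with namedtuples standing in for the worker protobuf messages and illustrative 'os'/'isa' values, not BuildGrid code itself:

    # Sketch of the capability flattening done in _request_leases() above,
    # using plain Python stand-ins for the Device/Property messages.
    from collections import namedtuple

    Property = namedtuple('Property', ['key', 'value'])
    Device = namedtuple('Device', ['properties'])

    def gather_capabilities(devices):
        """Maps each property key of the primary (first) device to a set of values."""
        worker_capabilities = {}
        if devices:
            primary_device = devices[0]
            for device_property in primary_device.properties:
                worker_capabilities.setdefault(device_property.key, set()).add(device_property.value)
        return worker_capabilities

    # Example: a worker advertising one OS and two ISAs.
    devices = [Device(properties=[Property('os', 'linux'),
                                  Property('isa', 'x86-64'),
                                  Property('isa', 'x86-32')])]
    assert gather_capabilities(devices) == {'os': {'linux'},
                                            'isa': {'x86-64', 'x86-32'}}
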
  • buildgrid/server/execution/instance.py

    @@ -22,7 +22,7 @@ An instance of the Remote Execution Service.
     import logging

     from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError, NotFoundError
    -from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action
    +from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action, Command
     from buildgrid.utils import get_hash_type

    @@ -50,11 +50,22 @@ class ExecutionInstance:
             this action.
             """
             action = self._storage.get_message(action_digest, Action)
    -
             if not action:
                 raise FailedPreconditionError("Could not get action from storage.")

    +        command = self._storage.get_message(action.command_digest, Command)
    +
    +        if not command:
    +            raise FailedPreconditionError("Could not get command from storage.")
    +
    +        platform_requirements = {}
    +        for platform_property in command.platform.properties:
    +            if platform_property.name not in platform_requirements:
    +                platform_requirements[platform_property.name] = set()
    +            platform_requirements[platform_property.name].add(platform_property.value)
    +
             return self._scheduler.queue_job_action(action, action_digest,
    +                                                platform_requirements=platform_requirements,
                                                     skip_cache_lookup=skip_cache_lookup)

         def register_job_peer(self, job_name, peer, message_queue):

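On the execution side, the new code fetches the Command behind the Action and turns its Platform properties (name/value pairs) into the platform_requirements dict handed to the scheduler. For context, a hypothetical client-side snippet that would produce such properties is sketched below; it assumes a BuildGrid checkout on the path for the _protos import, and the arguments and 'os'/'isa' values are only illustrative:

    # Hypothetical client-side sketch: declaring platform requirements on a
    # Command so that ExecutionInstance.execute() above extracts
    # {'os': {'linux'}, 'isa': {'x86-64'}} as the job's requirements.
    from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    from buildgrid.utils import create_digest

    command = remote_execution_pb2.Command(arguments=['gcc', '-c', 'hello.c'])
    command.platform.properties.add(name='os', value='linux')
    command.platform.properties.add(name='isa', value='x86-64')

    command_digest = create_digest(command.SerializeToString())
    action = remote_execution_pb2.Action(command_digest=command_digest)
    action_digest = create_digest(action.SerializeToString())
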
  • buildgrid/server/job.py

    @@ -29,7 +29,7 @@ from buildgrid._protos.google.rpc import code_pb2

     class Job:

    -    def __init__(self, action, action_digest, priority=0):
    +    def __init__(self, action, action_digest, platform_requirements=None, priority=0):
             self.__logger = logging.getLogger(__name__)

             self._name = str(uuid.uuid4())
    @@ -59,6 +59,9 @@ class Job:
             self._do_not_cache = self._action.do_not_cache
             self._n_tries = 0

    +        self._platform_requirements = platform_requirements \
    +            if platform_requirements else dict()
    +
             self._done = False

         def __lt__(self, other):
    @@ -113,6 +116,10 @@ class Job:

         # --- Public API: REAPI ---

    +    @property
    +    def platform_requirements(self):
    +        return self._platform_requirements
    +
         @property
         def do_not_cache(self):
             return self._do_not_cache

  • buildgrid/server/scheduler.py

    @@ -145,7 +145,8 @@ class Scheduler:
             if not job.n_peers and job.done and not job.lease:
                 self._delete_job(job.name)

    -    def queue_job_action(self, action, action_digest, priority=0, skip_cache_lookup=False):
    +    def queue_job_action(self, action, action_digest, platform_requirements=None,
    +                         priority=0, skip_cache_lookup=False):
             """Inserts a newly created job into the execution queue.

             Warning:
    @@ -155,6 +156,9 @@ class Scheduler:
             Args:
                 action (Action): the given action to queue for execution.
                 action_digest (Digest): the digest of the given action.
    +            platform_requirements (dict(set)): platform attributes that a worker
    +                must satisfy in order to be assigned the job. (Each key can
    +                have multiple values.)
                 priority (int): the execution job's priority.
                 skip_cache_lookup (bool): whether or not to look for pre-computed
                     result for the given action.
    @@ -178,7 +182,9 @@ class Scheduler:

                     return job.name

    -        job = Job(action, action_digest, priority=priority)
    +        job = Job(action, action_digest,
    +                  platform_requirements=platform_requirements,
    +                  priority=priority)

             self.__logger.debug("Job created for action [%s]: [%s]",
                                 action_digest.hash[:8], job.name)
    @@ -271,28 +277,29 @@ class Scheduler:
             """Generates a list of the highest priority leases to be run.

             Args:
    -            worker_capabilities (dict): a set of key-value pairs decribing the
    +            worker_capabilities (dict): a set of key-value pairs describing the
                     worker properties, configuration and state at the time of the
                     request.
    -
    -        Warning: Worker capabilities handling is not implemented at the moment!
             """
             if not self.__queue:
                 return []

    -        # TODO: Try to match worker_capabilities with jobs properties.
    -        job = self.__queue.pop()
    +        # Looking for the first job that could be assigned to the worker...
    +        for job_index, job in enumerate(self.__queue):
    +            if self._worker_is_capable(worker_capabilities, job):
    +                self.__logger.info("Job scheduled to run: [%s]", job.name)

    -        self.__logger.info("Job scheduled to run: [%s]", job.name)
    +                lease = job.lease

    -        lease = job.lease
    +                if not lease:
    +                    # For now, one lease at a time:
    +                    lease = job.create_lease()

    -        if not lease:
    -            # For now, one lease at a time:
    -            lease = job.create_lease()
    +                if lease:
    +                    del self.__queue[job_index]
    +                    return [lease]

    -        if lease:
    -            return [lease]
    +                return None

             return None

    @@ -622,3 +629,28 @@ class Scheduler:

                         for message_queue in self.__build_metadata_queues:
                             message_queue.put(message)
    +
    +    def _worker_is_capable(self, worker_capabilities, job):
    +        """Returns whether the worker is suitable to run the job."""
    +        # TODO: Replace this with the logic defined in the Platform msg. standard.
    +
    +        job_requirements = job.platform_requirements
    +        # For now we'll only check OS and ISA properties.
    +
    +        if not job_requirements:
    +            return True
    +
    +        # OS:
    +        worker_oses = worker_capabilities.get('os', set())
    +        job_oses = job_requirements.get('os', set())
    +        if job_oses and not (job_oses & worker_oses):
    +            return False
    +
    +        # ISAs:
    +        worker_isas = worker_capabilities.get('isa', [])
    +        job_isas = job_requirements.get('isa', None)
    +
    +        if job_isas and not (job_isas & worker_isas):
    +            return False
    +
    +        return True

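The matching rule in _worker_is_capable() is plain set intersection on the 'os' and 'isa' keys: a job with no platform requirements runs anywhere, and a requirement is satisfied as soon as one of its values overlaps with what the worker advertises. A standalone restatement of that rule with a couple of worked examples (not BuildGrid code, just the same logic with illustrative values):

    # Standalone restatement of the OS/ISA matching introduced above.
    def worker_is_capable(worker_capabilities, job_requirements):
        if not job_requirements:
            return True  # No requirements: any worker will do.

        for key in ('os', 'isa'):
            required = job_requirements.get(key, set())
            offered = worker_capabilities.get(key, set())
            if required and not (required & offered):
                return False  # Requirement present but no common value.
        return True

    worker = {'os': {'linux'}, 'isa': {'x86-64'}}
    assert worker_is_capable(worker, {})                        # nothing required
    assert worker_is_capable(worker, {'os': {'linux'}})         # OS overlaps
    assert not worker_is_capable(worker, {'os': {'aix'}})       # no common OS
    assert not worker_is_capable(worker, {'isa': {'aarch64'}})  # no common ISA
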
  • tests/integration/bots_service.py

    @@ -153,11 +153,27 @@ def test_post_bot_event_temp(context, instance):
         context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)


    -def _inject_work(scheduler, action=None, action_digest=None):
    +def test_unmet_platform_requirements(bot_session, context, instance):
    +    request = bots_pb2.CreateBotSessionRequest(parent='',
    +                                               bot_session=bot_session)
    +
    +    action_digest = remote_execution_pb2.Digest(hash='gaff')
    +    _inject_work(instance._instances[""]._scheduler,
    +                 action_digest=action_digest,
    +                 platform_requirements={'os': set('wonderful-os')})
    +
    +    response = instance.CreateBotSession(request, context)
    +
    +    assert len(response.leases) == 0
    +
    +
    +def _inject_work(scheduler, action=None, action_digest=None,
    +                 platform_requirements=None):
         if not action:
             action = remote_execution_pb2.Action()

         if not action_digest:
             action_digest = remote_execution_pb2.Digest()

    -    scheduler.queue_job_action(action, action_digest, skip_cache_lookup=True)
    +    scheduler.queue_job_action(action, action_digest, platform_requirements,
    +                               skip_cache_lookup=True)

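The new test covers the negative path: the queued job demands an 'os' the default bot session does not advertise, so CreateBotSession hands back no leases. A hypothetical positive-path companion (the test_met_platform_requirements name, and the assumption that a job with no requirements is leased to any worker, are illustrative and not part of this push) could mirror it like this:

    def test_met_platform_requirements(bot_session, context, instance):
        # Hypothetical companion test: a job with no platform requirements
        # should be leased to any worker that asks for work.
        request = bots_pb2.CreateBotSessionRequest(parent='',
                                                   bot_session=bot_session)

        action_digest = remote_execution_pb2.Digest(hash='gaff')
        _inject_work(instance._instances[""]._scheduler,
                     action_digest=action_digest)

        response = instance.CreateBotSession(request, context)

        assert len(response.leases) == 1
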
  • tests/integration/execution_service.py

    @@ -37,7 +37,12 @@ from buildgrid.server.execution.service import ExecutionService


     server = mock.create_autospec(grpc.server)
    -action = remote_execution_pb2.Action(do_not_cache=True)
    +
    +command = remote_execution_pb2.Command()
    +command_digest = create_digest(command.SerializeToString())
    +
    +action = remote_execution_pb2.Action(command_digest=command_digest,
    +                                     do_not_cache=True)
     action_digest = create_digest(action.SerializeToString())

    @@ -50,7 +55,13 @@ def context():
     @pytest.fixture(params=["action-cache", "no-action-cache"])
     def controller(request):
         storage = lru_memory_cache.LRUMemoryCache(1024 * 1024)
    +
    +    write_session = storage.begin_write(command_digest)
    +    write_session.write(command.SerializeToString())
    +    storage.commit_write(command_digest, write_session)
    +
         write_session = storage.begin_write(action_digest)
    +    write_session.write(action.SerializeToString())
         storage.commit_write(action_digest, write_session)

         if request.param == "action-cache":
    

  • tests/integration/operations_service.py

    @@ -36,7 +36,12 @@ from buildgrid.utils import create_digest

     server = mock.create_autospec(grpc.server)
     instance_name = "blade"
    -action = remote_execution_pb2.Action(do_not_cache=True)
    +
    +command = remote_execution_pb2.Command()
    +command_digest = create_digest(command.SerializeToString())
    +
    +action = remote_execution_pb2.Action(command_digest=command_digest,
    +                                     do_not_cache=True)
     action_digest = create_digest(action.SerializeToString())

    @@ -57,7 +62,13 @@ def execute_request():
     @pytest.fixture
     def controller():
         storage = lru_memory_cache.LRUMemoryCache(1024 * 1024)
    +
    +    write_session = storage.begin_write(command_digest)
    +    write_session.write(command.SerializeToString())
    +    storage.commit_write(command_digest, write_session)
    +
         write_session = storage.begin_write(action_digest)
    +    write_session.write(action.SerializeToString())
         storage.commit_write(action_digest, write_session)

         yield ExecutionController(None, storage)
    


