[Notes] [Git][BuildGrid/buildgrid][santigl/104-platform-matching] Add platform_attributes property to Job class



Title: GitLab

Santiago Gil pushed to branch santigl/104-platform-matching at BuildGrid / buildgrid

Commits:

5 changed files:

Changes:

  • buildgrid/server/execution/instance.py
    ... ... @@ -22,7 +22,7 @@ An instance of the Remote Execution Service.
    22 22
     import logging
    
    23 23
     
    
    24 24
     from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError, NotFoundError
    
    25
    -from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action
    
    25
    +from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action, Command
    
    26 26
     from buildgrid.utils import get_hash_type
    
    27 27
     
    
    28 28
     
    
    ... ... @@ -50,11 +50,21 @@ class ExecutionInstance:
    50 50
             this action.
    
    51 51
             """
    
    52 52
             action = self._storage.get_message(action_digest, Action)
    
    53
    -
    
    54 53
             if not action:
    
    55 54
                 raise FailedPreconditionError("Could not get action from storage.")
    
    56 55
     
    
    56
    +        platform_requirements = None
    
    57
    +        if action.command_digest.hash:
    
    58
    +            command = self._storage.get_message(action.command_digest, Command)
    
    59
    +
    
    60
    +            if not command:
    
    61
    +                raise FailedPreconditionError("Could not get command from storage.")
    
    62
    +
    
    63
    +            if command:
    
    64
    +                platform_requirements = command.platform
    
    65
    +
    
    57 66
             return self._scheduler.queue_job_action(action, action_digest,
    
    67
    +                                                platform_requirements,
    
    58 68
                                                     skip_cache_lookup=skip_cache_lookup)
    
    59 69
     
    
    60 70
         def register_job_peer(self, job_name, peer, message_queue):
    

  • buildgrid/server/job.py
    ... ... @@ -29,7 +29,7 @@ from buildgrid._protos.google.rpc import code_pb2
    29 29
     
    
    30 30
     class Job:
    
    31 31
     
    
    32
    -    def __init__(self, action, action_digest, priority=0):
    
    32
    +    def __init__(self, action, action_digest, platform_requirements, priority=0):
    
    33 33
             self.__logger = logging.getLogger(__name__)
    
    34 34
     
    
    35 35
             self._name = str(uuid.uuid4())
    
    ... ... @@ -59,6 +59,8 @@ class Job:
    59 59
             self._do_not_cache = self._action.do_not_cache
    
    60 60
             self._n_tries = 0
    
    61 61
     
    
    62
    +        self._platform_requirements = platform_requirements
    
    63
    +
    
    62 64
             self._done = False
    
    63 65
     
    
    64 66
         def __lt__(self, other):
    
    ... ... @@ -111,6 +113,10 @@ class Job:
    111 113
         def done(self):
    
    112 114
             return self._done
    
    113 115
     
    
    116
    +    @property
    
    117
    +    def platform_requirements(self):
    
    118
    +        return self._platform_requirements
    
    119
    +
    
    114 120
         # --- Public API: REAPI ---
    
    115 121
     
    
    116 122
         @property
    

  • buildgrid/server/scheduler.py
    ... ... @@ -145,7 +145,8 @@ class Scheduler:
    145 145
             if not job.n_peers and job.done and not job.lease:
    
    146 146
                 self._delete_job(job.name)
    
    147 147
     
    
    148
    -    def queue_job_action(self, action, action_digest, priority=0, skip_cache_lookup=False):
    
    148
    +    def queue_job_action(self, action, action_digest, platform_requirements,
    
    149
    +                         priority=0, skip_cache_lookup=False):
    
    149 150
             """Inserts a newly created job into the execution queue.
    
    150 151
     
    
    151 152
             Warning:
    
    ... ... @@ -155,6 +156,9 @@ class Scheduler:
    155 156
             Args:
    
    156 157
                 action (Action): the given action to queue for execution.
    
    157 158
                 action_digest (Digest): the digest of the given action.
    
    159
    +            platform_requirements (dict(list)): platform attributes that a worker
    
    160
    +                must satisfy in order to be assigned the job. (Each key can
    
    161
    +                have multiple values.)
    
    158 162
                 priority (int): the execution job's priority.
    
    159 163
                 skip_cache_lookup (bool): whether or not to look for pre-computed
    
    160 164
                     result for the given action.
    
    ... ... @@ -178,7 +182,8 @@ class Scheduler:
    178 182
     
    
    179 183
                     return job.name
    
    180 184
     
    
    181
    -        job = Job(action, action_digest, priority=priority)
    
    185
    +        job = Job(action, action_digest, platform_requirements,
    
    186
    +                  priority=priority)
    
    182 187
     
    
    183 188
             self.__logger.debug("Job created for action [%s]: [%s]",
    
    184 189
                                 action_digest.hash[:8], job.name)
    
    ... ... @@ -271,7 +276,7 @@ class Scheduler:
    271 276
             """Generates a list of the highest priority leases to be run.
    
    272 277
     
    
    273 278
             Args:
    
    274
    -            worker_capabilities (dict): a set of key-value pairs decribing the
    
    279
    +            worker_capabilities (dict): a set of key-value pairs describing the
    
    275 280
                     worker properties, configuration and state at the time of the
    
    276 281
                     request.
    
    277 282
     
    
    ... ... @@ -280,7 +285,7 @@ class Scheduler:
    280 285
             if not self.__queue:
    
    281 286
                 return []
    
    282 287
     
    
    283
    -        # TODO: Try to match worker_capabilities with jobs properties.
    
    288
    +        # TODO: Try to match worker_capabilities with jobs properties (job.platform_requirements)
    
    284 289
             job = self.__queue.pop()
    
    285 290
     
    
    286 291
             self.__logger.info("Job scheduled to run: [%s]", job.name)
    

  • tests/integration/bots_service.py
    ... ... @@ -153,11 +153,13 @@ def test_post_bot_event_temp(context, instance):
    153 153
         context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
    
    154 154
     
    
    155 155
     
    
    156
    -def _inject_work(scheduler, action=None, action_digest=None):
    
    156
    +def _inject_work(scheduler, action=None, action_digest=None,
    
    157
    +                 platform_requirements=None):
    
    157 158
         if not action:
    
    158 159
             action = remote_execution_pb2.Action()
    
    159 160
     
    
    160 161
         if not action_digest:
    
    161 162
             action_digest = remote_execution_pb2.Digest()
    
    162 163
     
    
    163
    -    scheduler.queue_job_action(action, action_digest, skip_cache_lookup=True)
    164
    +    scheduler.queue_job_action(action, action_digest, platform_requirements,
    
    165
    +                               skip_cache_lookup=True)

  • tests/integration/execution_service.py
    ... ... @@ -107,6 +107,7 @@ def test_no_action_digest_in_storage(instance, context):
    107 107
     def test_wait_execution(instance, controller, context):
    
    108 108
         job_name = controller.execution_instance._scheduler.queue_job_action(action,
    
    109 109
                                                                              action_digest,
    
    110
    +                                                                         platform_requirements={},
    
    110 111
                                                                              skip_cache_lookup=True)
    
    111 112
     
    
    112 113
         message_queue = queue.Queue()
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]