[Notes] [Git][BuildGrid/buildgrid][mablanch/117-job-scheduler-refactoring] job.py: Hold the original Action object



Title: GitLab

Martin Blanchard pushed to branch mablanch/117-job-scheduler-refactoring at BuildGrid / buildgrid

Commits:

5 changed files:

Changes:

  • buildgrid/server/execution/instance.py
    ... ... @@ -48,7 +48,10 @@ class ExecutionInstance:
    48 48
             if not action:
    
    49 49
                 raise FailedPreconditionError("Could not get action from storage.")
    
    50 50
     
    
    51
    -        job = Job(action_digest, action.do_not_cache, message_queue)
    
    51
    +        job = Job(action, action_digest)
    
    52
    +        if message_queue is not None:
    
    53
    +            job.register_client(message_queue)
    
    54
    +
    
    52 55
             self.logger.info("Operation name: [{}]".format(job.name))
    
    53 56
     
    
    54 57
             self._scheduler.queue_job(job, skip_cache_lookup)
    

  • buildgrid/server/job.py
    ... ... @@ -50,11 +50,12 @@ class LeaseState(Enum):
    50 50
     
    
    51 51
     class Job:
    
    52 52
     
    
    53
    -    def __init__(self, action_digest, do_not_cache=False, message_queue=None):
    
    53
    +    def __init__(self, action, action_digest):
    
    54 54
             self.logger = logging.getLogger(__name__)
    
    55 55
     
    
    56 56
             self._name = str(uuid.uuid4())
    
    57
    -        self._do_not_cache = do_not_cache
    
    57
    +        self._do_not_cache = False
    
    58
    +        self._action = remote_execution_pb2.Action()
    
    58 59
             self._operation = operations_pb2.Operation()
    
    59 60
             self._lease = None
    
    60 61
             self._n_tries = 0
    
    ... ... @@ -65,13 +66,12 @@ class Job:
    65 66
             self.__operation_metadata.action_digest.CopyFrom(action_digest)
    
    66 67
             self.__operation_metadata.stage = OperationStage.UNKNOWN.value
    
    67 68
     
    
    69
    +        self._action.CopyFrom(action)
    
    70
    +        self._do_not_cache = self._action.do_not_cache
    
    68 71
             self._operation_update_queues = []
    
    69 72
             self._operation.name = self._name
    
    70 73
             self._operation.done = False
    
    71 74
     
    
    72
    -        if message_queue is not None:
    
    73
    -            self.register_client(message_queue)
    
    74
    -
    
    75 75
         @property
    
    76 76
         def name(self):
    
    77 77
             return self._name
    
    ... ... @@ -80,6 +80,10 @@ class Job:
    80 80
         def do_not_cache(self):
    
    81 81
             return self._do_not_cache
    
    82 82
     
    
    83
    +    @property
    
    84
    +    def action(self):
    
    85
    +        return self._action
    
    86
    +
    
    83 87
         @property
    
    84 88
         def action_digest(self):
    
    85 89
             return self.__operation_metadata.action_digest
    
    ... ... @@ -157,6 +161,10 @@ class Job:
    157 161
             if self._lease.state == LeaseState.COMPLETED.value:
    
    158 162
                 action_result = remote_execution_pb2.ActionResult()
    
    159 163
     
    
    164
    +            # TODO: Make a distinction between build and bot failures!
    
    165
    +            if status.code != 0:
    
    166
    +                self._do_not_cache = True
    
    167
    +
    
    160 168
                 if result is not None:
    
    161 169
                     assert result.Is(action_result.DESCRIPTOR)
    
    162 170
                     result.Unpack(action_result)
    

  • buildgrid/server/scheduler.py
    ... ... @@ -86,9 +86,8 @@ class Scheduler:
    86 86
             else:
    
    87 87
                 job.update_lease_state(lease_state, status=lease_status, result=lease_result)
    
    88 88
     
    
    89
    -            if not job.do_not_cache and self._action_cache is not None:
    
    90
    -                if not job.lease.status.code:
    
    91
    -                    self._action_cache.update_action_result(job.action_digest, job.action_result)
    
    89
    +            if self._action_cache is not None and not job.do_not_cache:
    
    90
    +                self._action_cache.update_action_result(job.action_digest, job.action_result)
    
    92 91
     
    
    93 92
                 job.update_operation_stage(OperationStage.COMPLETED)
    
    94 93
     
    

  • tests/integration/bots_service.py
    ... ... @@ -137,7 +137,7 @@ def test_update_leases_with_work(bot_session, context, instance):
    137 137
                                                    bot_session=bot_session)
    
    138 138
     
    
    139 139
         action_digest = remote_execution_pb2.Digest(hash='gaff')
    
    140
    -    _inject_work(instance._instances[""]._scheduler, action_digest)
    
    140
    +    _inject_work(instance._instances[""]._scheduler, action_digest=action_digest)
    
    141 141
     
    
    142 142
         response = instance.CreateBotSession(request, context)
    
    143 143
     
    
    ... ... @@ -159,7 +159,7 @@ def test_update_leases_work_complete(bot_session, context, instance):
    159 159
     
    
    160 160
         # Inject work
    
    161 161
         action_digest = remote_execution_pb2.Digest(hash='gaff')
    
    162
    -    _inject_work(instance._instances[""]._scheduler, action_digest)
    
    162
    +    _inject_work(instance._instances[""]._scheduler, action_digest=action_digest)
    
    163 163
     
    
    164 164
         request = bots_pb2.UpdateBotSessionRequest(name=response.name,
    
    165 165
                                                    bot_session=response)
    
    ... ... @@ -188,7 +188,7 @@ def test_work_rejected_by_bot(bot_session, context, instance):
    188 188
                                                    bot_session=bot_session)
    
    189 189
         # Inject work
    
    190 190
         action_digest = remote_execution_pb2.Digest(hash='gaff')
    
    191
    -    _inject_work(instance._instances[""]._scheduler, action_digest)
    
    191
    +    _inject_work(instance._instances[""]._scheduler, action_digest=action_digest)
    
    192 192
     
    
    193 193
         # Simulated the severed binding between client and server
    
    194 194
         response = copy.deepcopy(instance.CreateBotSession(request, context))
    
    ... ... @@ -210,7 +210,7 @@ def test_work_out_of_sync_from_pending(state, bot_session, context, instance):
    210 210
                                                    bot_session=bot_session)
    
    211 211
         # Inject work
    
    212 212
         action_digest = remote_execution_pb2.Digest(hash='gaff')
    
    213
    -    _inject_work(instance._instances[""]._scheduler, action_digest)
    
    213
    +    _inject_work(instance._instances[""]._scheduler, action_digest=action_digest)
    
    214 214
     
    
    215 215
         # Simulated the severed binding between client and server
    
    216 216
         response = copy.deepcopy(instance.CreateBotSession(request, context))
    
    ... ... @@ -231,7 +231,7 @@ def test_work_out_of_sync_from_active(state, bot_session, context, instance):
    231 231
                                                    bot_session=bot_session)
    
    232 232
         # Inject work
    
    233 233
         action_digest = remote_execution_pb2.Digest(hash='gaff')
    
    234
    -    _inject_work(instance._instances[""]._scheduler, action_digest)
    
    234
    +    _inject_work(instance._instances[""]._scheduler, action_digest=action_digest)
    
    235 235
     
    
    236 236
         # Simulated the severed binding between client and server
    
    237 237
         response = copy.deepcopy(instance.CreateBotSession(request, context))
    
    ... ... @@ -258,7 +258,7 @@ def test_work_active_to_active(bot_session, context, instance):
    258 258
                                                    bot_session=bot_session)
    
    259 259
         # Inject work
    
    260 260
         action_digest = remote_execution_pb2.Digest(hash='gaff')
    
    261
    -    _inject_work(instance._instances[""]._scheduler, action_digest)
    
    261
    +    _inject_work(instance._instances[""]._scheduler, action_digest=action_digest)
    
    262 262
     
    
    263 263
         # Simulated the severed binding between client and server
    
    264 264
         response = copy.deepcopy(instance.CreateBotSession(request, context))
    
    ... ... @@ -280,8 +280,10 @@ def test_post_bot_event_temp(context, instance):
    280 280
         context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
    
    281 281
     
    
    282 282
     
    
    283
    -def _inject_work(scheduler, action_digest=None):
    
    283
    +def _inject_work(scheduler, action=None, action_digest=None):
    
    284
    +    if not action:
    
    285
    +        action = remote_execution_pb2.Action()
    
    284 286
         if not action_digest:
    
    285 287
             action_digest = remote_execution_pb2.Digest()
    
    286
    -    j = job.Job(action_digest, False)
    
    288
    +    j = job.Job(action, action_digest)
    
    287 289
         scheduler.queue_job(j, True)

  • tests/integration/execution_service.py
    ... ... @@ -105,7 +105,7 @@ def test_no_action_digest_in_storage(instance, context):
    105 105
     
    
    106 106
     
    
    107 107
     def test_wait_execution(instance, controller, context):
    
    108
    -    j = job.Job(action_digest, None)
    
    108
    +    j = job.Job(action, action_digest)
    
    109 109
         j._operation.done = True
    
    110 110
     
    
    111 111
         request = remote_execution_pb2.WaitExecutionRequest(name="{}/{}".format('', j.name))
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]