[Notes] [Git][BuildGrid/buildgrid][mablanch/117-job-scheduler-refactoring] 7 commits: Update CONTRIBUTING.rst




Martin Blanchard pushed to branch mablanch/117-job-scheduler-refactoring at BuildGrid / buildgrid

Commits:

11 changed files:

Changes:

  • CONTRIBUTING.rst
    ... ... @@ -17,9 +17,13 @@ discuss with us before submitting anything, as we may well have some important
    17 17
     context which will could help guide your efforts.
    
    18 18
     
    
    19 19
     Any major feature additions should be raised first as a proposal on the
    
    20
    -`BuildGrid mailing list`_ to be discussed, and then eventually followed up with
    
    21
    -an issue on GitLab. We recommend that you propose the feature in advance of
    
    22
    -commencing work.
    
    20
    +BuildGrid mailing list to be discussed between the core contributors. Once 
    
    21
    +this discussion has taken place and there is agreement on how to proceed, 
    
    22
    +it should be followed by a GitLab issue being raised which summarizes
    
    23
    +the tasks required.
    
    24
    +
    
    25
    +We strongly recommend that you propose the feature in advance of
    
    26
    +commencing any work.
    
    23 27
     
    
    24 28
     The author of any patch is expected to take ownership of that code and is to
    
    25 29
     support it for a reasonable time-frame. This means addressing any unforeseen
    
    ... ... @@ -237,21 +241,49 @@ following goals:
    237 241
       for the viewer to digest.
    
    238 242
     - Ensure that we keep it simple and easy to contribute to the project.
    
    239 243
     
    
    240
    -We are currenlty using the following GitLab features:
    
    244
    +Explanation of how the project is currently using some GitLab features:
    
    241 245
     
    
    242 246
     - `Milestones`_: we have seen them used in the same way as `Epics`_ in other
    
    243
    -  projects. BuildGrid milestones must be time-line based, can overlap and we can
    
    244
    -  be working towards multiple milestones at any one time. They allow us to group
    
    245
    -  together all sub tasks into an overall aim. See our `BuildGrid milestones`_.
    
    246
    -- `Labels`_: allow us to filter tickets in useful ways. They do complexity and
    
    247
    -  effort as they grow in number and usage, though, so the general approach is
    
    248
    -  to have the minimum possible. See our `BuildGrid labels`_.
    
    247
    +  projects and are trying not to do that here. Instead we are going to 
    
    248
    +  use milestones to denote development cycles (ie, two week 'sprints'). See the
    
    249
    +  `BuildGrid milestones`_.
    
    250
    +- `Labels`_: allow us to filter tickets (ie, 'issues' in gitlab terminology)
    
    251
    +  in useful ways. They add complexity and effort as they grow in number, so the
    
    252
    +  general approach is to have the minimum possible but 
    
    253
    +  ensure we use them consistently. See the `BuildGrid labels`_. 
    
    249 254
     - `Boards`_: allow us to visualise and manage issues and labels in a simple way.
    
    250
    -  For now, we are only utilising one boards. Issues start life in the
    
    251
    -  ``Backlog`` column by default, and we move them into ``ToDo`` when they are
    
    252
    -  coming up in the next few weeks. ``Doing`` is only for when an item is
    
    253
    -  currently being worked on. Moving an issue from column to column automatically
    
    254
    -  adjust the tagged labels. See our `BuildGrid boards`_.
    
    255
    +  Issues start life in the ``Backlog`` column by default, and we move them into
    
    256
    +  ``ToDo`` when we aim to complete them in the current development cycle.
    
    257
    +  ``Doing`` is only for when an item is currently being worked on. When on the
    
    258
    +  Board view, dragging and dropping an issue from column to column automatically
    
    259
    +  adjusts the relevant labels. See the `BuildGrid boards`_.
    
    260
    +  
    
    261
    +  
    
    262
    +Guidelines for using GitLab features when working on this project: 
    
    263
    +  
    
    264
    +- When raising an issue, please:
    
    265
    +   
    
    266
    +  - check to see if there already is an issue to cover this task (if not then 
    
    267
    +    raise a new one)
    
    268
    +  - assign the appropriate label or labels (tip: the vast majority of issues 
    
    269
    +    raised will be either an enhancement or a bug)
    
    270
    +    
    
    271
    +- If you plan to work on an issue, please:
    
    272
    +
    
    273
    +  - self-assign the ticket
    
    274
    +  - ensure it's captured in the current sprint (ie, Gitlab milestone)
    
    275
    +  - ensure the ticket is in the ``ToDo`` column of the board if you aim to 
    
    276
    +    complete in the current sprint but aren't yet working on it, and
    
    277
    +    the ``Doing`` column if you are working on it currently.
    
    278
    +
    
    279
    +- Please note that Gitlab issues are for either 'tasks' or 'bugs' - ie not for 
    
    280
    +  long discussions (where the mailing list is a better choice) or for ranting, 
    
    281
    +  for example.
    
    282
    +  
    
    283
    +The above may seem like a lot to take in, but please don't worry about getting 
    
    284
    +it right the first few times. The worst that can happen is that you'll get a 
    
    285
    +friendly message from a current contributor who explains the process. We welcome
    
    286
    +and value all contributions to the project!  
    
    255 287
     
    
    256 288
     .. _Milestones: https://docs.gitlab.com/ee/user/project/milestones
    
    257 289
     .. _Epics: https://docs.gitlab.com/ee/user/group/epics
    

  • buildgrid/_app/bots/buildbox.py
    ... ... @@ -17,8 +17,6 @@ import os
    17 17
     import subprocess
    
    18 18
     import tempfile
    
    19 19
     
    
    20
    -from google.protobuf import any_pb2
    
    21
    -
    
    22 20
     from buildgrid.client.cas import download, upload
    
    23 21
     from buildgrid._exceptions import BotError
    
    24 22
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    ... ... @@ -29,13 +27,14 @@ from buildgrid.utils import read_file, write_file
    29 27
     def work_buildbox(context, lease):
    
    30 28
         """Executes a lease for a build action, using buildbox.
    
    31 29
         """
    
    32
    -
    
    33 30
         local_cas_directory = context.local_cas
    
    34 31
         # instance_name = context.parent
    
    35 32
         logger = context.logger
    
    36 33
     
    
    37 34
         action_digest = remote_execution_pb2.Digest()
    
    35
    +
    
    38 36
         lease.payload.Unpack(action_digest)
    
    37
    +    lease.result.Clear()
    
    39 38
     
    
    40 39
         with download(context.cas_channel) as downloader:
    
    41 40
             action = downloader.get_message(action_digest,
    
    ... ... @@ -131,10 +130,7 @@ def work_buildbox(context, lease):
    131 130
     
    
    132 131
                 action_result.output_directories.extend([output_directory])
    
    133 132
     
    
    134
    -            action_result_any = any_pb2.Any()
    
    135
    -            action_result_any.Pack(action_result)
    
    136
    -
    
    137
    -            lease.result.CopyFrom(action_result_any)
    
    133
    +            lease.result.Pack(action_result)
    
    138 134
     
    
    139 135
         return lease
    
    140 136
     
    

  • buildgrid/_app/bots/host.py
    ... ... @@ -17,8 +17,6 @@ import os
    17 17
     import subprocess
    
    18 18
     import tempfile
    
    19 19
     
    
    20
    -from google.protobuf import any_pb2
    
    21
    -
    
    22 20
     from buildgrid.client.cas import download, upload
    
    23 21
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    24 22
     from buildgrid.utils import output_file_maker, output_directory_maker
    
    ... ... @@ -27,12 +25,13 @@ from buildgrid.utils import output_file_maker, output_directory_maker
    27 25
     def work_host_tools(context, lease):
    
    28 26
         """Executes a lease for a build action, using host tools.
    
    29 27
         """
    
    30
    -
    
    31 28
         instance_name = context.parent
    
    32 29
         logger = context.logger
    
    33 30
     
    
    34 31
         action_digest = remote_execution_pb2.Digest()
    
    32
    +
    
    35 33
         lease.payload.Unpack(action_digest)
    
    34
    +    lease.result.Clear()
    
    36 35
     
    
    37 36
         with tempfile.TemporaryDirectory() as temp_directory:
    
    38 37
             with download(context.cas_channel, instance=instance_name) as downloader:
    
    ... ... @@ -122,9 +121,6 @@ def work_host_tools(context, lease):
    122 121
     
    
    123 122
                 action_result.output_directories.extend(output_directories)
    
    124 123
     
    
    125
    -        action_result_any = any_pb2.Any()
    
    126
    -        action_result_any.Pack(action_result)
    
    127
    -
    
    128
    -        lease.result.CopyFrom(action_result_any)
    
    124
    +        lease.result.Pack(action_result)
    
    129 125
     
    
    130 126
         return lease

  • buildgrid/server/bots/instance.py
    ... ... @@ -109,7 +109,7 @@ class BotsInterface:
    109 109
             if server_state == LeaseState.PENDING:
    
    110 110
     
    
    111 111
                 if client_state == LeaseState.ACTIVE:
    
    112
    -                self._scheduler.update_job_lease_state(client_lease.id, client_lease.state)
    
    112
    +                self._scheduler.update_job_lease_state(client_lease.id, client_state)
    
    113 113
                 elif client_state == LeaseState.COMPLETED:
    
    114 114
                     # TODO: Lease was rejected
    
    115 115
                     raise NotImplementedError("'Not Accepted' is unsupported")
    
    ... ... @@ -122,8 +122,7 @@ class BotsInterface:
    122 122
                     pass
    
    123 123
     
    
    124 124
                 elif client_state == LeaseState.COMPLETED:
    
    125
    -                self._scheduler.update_job_lease_state(client_lease.id, client_lease.state)
    
    126
    -                self._scheduler.job_complete(client_lease.id, client_lease.result, client_lease.status)
    
    125
    +                self._scheduler.update_job_lease_state(client_lease.id, client_state, lease_status=client_lease.status, lease_result=client_lease.result)
    
    127 126
                     return None
    
    128 127
     
    
    129 128
                 else:
    

  • buildgrid/server/execution/instance.py
    ... ... @@ -51,9 +51,9 @@ class ExecutionInstance:
    51 51
             job = Job(action_digest, action.do_not_cache, message_queue)
    
    52 52
             self.logger.info("Operation name: [{}]".format(job.name))
    
    53 53
     
    
    54
    -        self._scheduler.append_job(job, skip_cache_lookup)
    
    54
    +        self._scheduler.queue_job(job, skip_cache_lookup)
    
    55 55
     
    
    56
    -        return job.get_operation()
    
    56
    +        return job.operation
    
    57 57
     
    
    58 58
         def register_message_client(self, name, queue):
    
    59 59
             try:
    

  • buildgrid/server/job.py
    ... ... @@ -19,57 +19,33 @@ import logging
    19 19
     import uuid
    
    20 20
     from enum import Enum
    
    21 21
     
    
    22
    -from google.protobuf import any_pb2
    
    23
    -
    
    24 22
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    25 23
     from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
    
    26 24
     from buildgrid._protos.google.longrunning import operations_pb2
    
    27 25
     
    
    28 26
     
    
    29
    -class ExecuteStage(Enum):
    
    27
    +class OperationStage(Enum):
    
    28
    +    # Initially unknown stage.
    
    30 29
         UNKNOWN = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('UNKNOWN')
    
    31
    -
    
    32 30
         # Checking the result against the cache.
    
    33 31
         CACHE_CHECK = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('CACHE_CHECK')
    
    34
    -
    
    35 32
         # Currently idle, awaiting a free machine to execute.
    
    36 33
         QUEUED = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('QUEUED')
    
    37
    -
    
    38 34
         # Currently being executed by a worker.
    
    39 35
         EXECUTING = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('EXECUTING')
    
    40
    -
    
    41 36
         # Finished execution.
    
    42 37
         COMPLETED = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('COMPLETED')
    
    43 38
     
    
    44 39
     
    
    45
    -class BotStatus(Enum):
    
    46
    -    BOT_STATUS_UNSPECIFIED = bots_pb2.BotStatus.Value('BOT_STATUS_UNSPECIFIED')
    
    47
    -
    
    48
    -    # The bot is healthy, and will accept leases as normal.
    
    49
    -    OK = bots_pb2.BotStatus.Value('OK')
    
    50
    -
    
    51
    -    # The bot is unhealthy and will not accept new leases.
    
    52
    -    UNHEALTHY = bots_pb2.BotStatus.Value('UNHEALTHY')
    
    53
    -
    
    54
    -    # The bot has been asked to reboot the host.
    
    55
    -    HOST_REBOOTING = bots_pb2.BotStatus.Value('HOST_REBOOTING')
    
    56
    -
    
    57
    -    # The bot has been asked to shut down.
    
    58
    -    BOT_TERMINATING = bots_pb2.BotStatus.Value('BOT_TERMINATING')
    
    59
    -
    
    60
    -
    
    61 40
     class LeaseState(Enum):
    
    41
    +    # Initially unknown state.
    
    62 42
         LEASE_STATE_UNSPECIFIED = bots_pb2.LeaseState.Value('LEASE_STATE_UNSPECIFIED')
    
    63
    -
    
    64 43
         # The server expects the bot to accept this lease.
    
    65 44
         PENDING = bots_pb2.LeaseState.Value('PENDING')
    
    66
    -
    
    67 45
         # The bot has accepted this lease.
    
    68 46
         ACTIVE = bots_pb2.LeaseState.Value('ACTIVE')
    
    69
    -
    
    70 47
         # The bot is no longer leased.
    
    71 48
         COMPLETED = bots_pb2.LeaseState.Value('COMPLETED')
    
    72
    -
    
    73 49
         # The bot should immediately release all resources associated with the lease.
    
    74 50
         CANCELLED = bots_pb2.LeaseState.Value('CANCELLED')
    
    75 51
     
    
    ... ... @@ -77,18 +53,22 @@ class LeaseState(Enum):
    77 53
     class Job:
    
    78 54
     
    
    79 55
         def __init__(self, action_digest, do_not_cache=False, message_queue=None):
    
    80
    -        self.lease = None
    
    81 56
             self.logger = logging.getLogger(__name__)
    
    82
    -        self.n_tries = 0
    
    83
    -        self.result = None
    
    84
    -        self.result_cached = False
    
    85 57
     
    
    86
    -        self._action_digest = action_digest
    
    87
    -        self._do_not_cache = do_not_cache
    
    88
    -        self._execute_stage = ExecuteStage.UNKNOWN
    
    89 58
             self._name = str(uuid.uuid4())
    
    90
    -        self._operation = operations_pb2.Operation(name=self._name)
    
    59
    +        self._do_not_cache = do_not_cache
    
    60
    +        self._operation = operations_pb2.Operation()
    
    61
    +        self._lease = None
    
    62
    +        self._n_tries = 0
    
    63
    +
    
    64
    +        self.__operation_metadata = remote_execution_pb2.ExecuteOperationMetadata()
    
    65
    +
    
    66
    +        self.__operation_metadata.action_digest.CopyFrom(action_digest)
    
    67
    +        self.__operation_metadata.stage = OperationStage.UNKNOWN.value
    
    68
    +
    
    91 69
             self._operation_update_queues = []
    
    70
    +        self._operation.name = self._name
    
    71
    +        self._operation.done = False
    
    92 72
     
    
    93 73
             if message_queue is not None:
    
    94 74
                 self.register_client(message_queue)
    
    ... ... @@ -99,63 +79,75 @@ class Job:
    99 79
     
    
    100 80
         @property
    
    101 81
         def action_digest(self):
    
    102
    -        return self._action_digest
    
    82
    +        return self.__operation_metadata.action_digest
    
    103 83
     
    
    104 84
         @property
    
    105 85
         def do_not_cache(self):
    
    106 86
             return self._do_not_cache
    
    107 87
     
    
    108
    -    def check_job_finished(self):
    
    109
    -        if not self._operation_update_queues:
    
    110
    -            return self._operation.done
    
    111
    -        return False
    
    88
    +    @property
    
    89
    +    def operation(self):
    
    90
    +        return self._operation
    
    91
    +
    
    92
    +    @property
    
    93
    +    def operation_stage(self):
    
    94
    +        return OperationStage(self.__operation_metadata.stage)
    
    95
    +
    
    96
    +    @property
    
    97
    +    def lease(self):
    
    98
    +        return self._lease
    
    99
    +
    
    100
    +    @property
    
    101
    +    def lease_state(self):
    
    102
    +        if self._lease is not None:
    
    103
    +            return LeaseState(self._lease.state)
    
    104
    +        else:
    
    105
    +            return None
    
    106
    +
    
    107
    +    @property
    
    108
    +    def n_tries(self):
    
    109
    +        return self._n_tries
    
    112 110
     
    
    113 111
         def register_client(self, queue):
    
    114 112
             self._operation_update_queues.append(queue)
    
    115
    -        queue.put(self.get_operation())
    
    113
    +        queue.put(self._operation)
    
    116 114
     
    
    117 115
         def unregister_client(self, queue):
    
    118 116
             self._operation_update_queues.remove(queue)
    
    119 117
     
    
    120
    -    def get_operation(self):
    
    121
    -        self._operation.metadata.CopyFrom(self._pack_any(self.get_operation_meta()))
    
    122
    -        if self.result is not None:
    
    123
    -            self._operation.done = True
    
    124
    -            response = remote_execution_pb2.ExecuteResponse(result=self.result,
    
    125
    -                                                            cached_result=self.result_cached)
    
    118
    +    def create_lease(self):
    
    119
    +        self._lease = bots_pb2.Lease()
    
    120
    +        self._lease.id = self._name
    
    121
    +        self._lease.payload.Pack(self.__operation_metadata.action_digest)
    
    122
    +        self._lease.state = LeaseState.PENDING.value
    
    126 123
     
    
    127
    -            if not self.result_cached:
    
    128
    -                response.status.CopyFrom(self.lease.status)
    
    124
    +        return self._lease
    
    129 125
     
    
    130
    -            self._operation.response.CopyFrom(self._pack_any(response))
    
    126
    +    def update_lease_state(self, state, status=None, result=None):
    
    127
    +        if state.value == self._lease.state:
    
    128
    +            return
    
    131 129
     
    
    132
    -        return self._operation
    
    130
    +        self._lease.state = state.value
    
    133 131
     
    
    134
    -    def get_operation_meta(self):
    
    135
    -        meta = remote_execution_pb2.ExecuteOperationMetadata()
    
    136
    -        meta.stage = self._execute_stage.value
    
    137
    -        meta.action_digest.CopyFrom(self._action_digest)
    
    132
    +        if self._lease.state == LeaseState.COMPLETED.value:
    
    133
    +            response = remote_execution_pb2.ExecuteResponse()
    
    134
    +            response.result.CopyFrom(result)
    
    135
    +            response.cached_result = False
    
    136
    +            response.status.CopyFrom(status)
    
    138 137
     
    
    139
    -        return meta
    
    138
    +            self._operation.response.Pack(response)
    
    139
    +            self._operation.done = True
    
    140 140
     
    
    141
    -    def create_lease(self):
    
    142
    -        action_digest = self._pack_any(self._action_digest)
    
    141
    +    def update_operation_stage(self, stage):
    
    142
    +        if stage.value == self.__operation_metadata.stage:
    
    143
    +            return
    
    143 144
     
    
    144
    -        lease = bots_pb2.Lease(id=self.name,
    
    145
    -                               payload=action_digest,
    
    146
    -                               state=LeaseState.PENDING.value)
    
    147
    -        self.lease = lease
    
    148
    -        return lease
    
    145
    +        self.__operation_metadata.stage = stage.value
    
    149 146
     
    
    150
    -    def get_operations(self):
    
    151
    -        return operations_pb2.ListOperationsResponse(operations=[self.get_operation()])
    
    147
    +        if self.__operation_metadata.stage == OperationStage.QUEUED.value:
    
    148
    +            self._n_tries += 1
    
    152 149
     
    
    153
    -    def update_execute_stage(self, stage):
    
    154
    -        self._execute_stage = stage
    
    155
    -        for queue in self._operation_update_queues:
    
    156
    -            queue.put(self.get_operation())
    
    150
    +        self._operation.metadata.Pack(self.__operation_metadata)
    
    157 151
     
    
    158
    -    def _pack_any(self, pack):
    
    159
    -        some_any = any_pb2.Any()
    
    160
    -        some_any.Pack(pack)
    
    161
    -        return some_any
    152
    +        for queue in self._operation_update_queues:
    
    153
    +            queue.put(self._operation)

  • buildgrid/server/operations/instance.py
    ... ... @@ -22,6 +22,7 @@ An instance of the LongRunningOperations Service.
    22 22
     import logging
    
    23 23
     
    
    24 24
     from buildgrid._exceptions import InvalidArgumentError
    
    25
    +from buildgrid._protos.google.longrunning import operations_pb2
    
    25 26
     
    
    26 27
     
    
    27 28
     class OperationsInstance:
    
    ... ... @@ -45,7 +46,10 @@ class OperationsInstance:
    45 46
         def list_operations(self, list_filter, page_size, page_token):
    
    46 47
             # TODO: Pages
    
    47 48
             # Spec says number of pages and length of a page are optional
    
    48
    -        return self._scheduler.get_operations()
    
    49
    +        response = operations_pb2.ListOperationsResponse()
    
    50
    +        response.operations.extend(self._scheduler.list_operations())
    
    51
    +
    
    52
    +        return response
    
    49 53
     
    
    50 54
         def delete_operation(self, name):
    
    51 55
             try:
    

  • buildgrid/server/scheduler.py
    ... ... @@ -25,9 +25,8 @@ from collections import deque
    25 25
     
    
    26 26
     from buildgrid._exceptions import NotFoundError
    
    27 27
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    28
    -from buildgrid._protos.google.longrunning import operations_pb2
    
    29 28
     
    
    30
    -from .job import ExecuteStage, LeaseState
    
    29
    +from .job import OperationStage, LeaseState
    
    31 30
     
    
    32 31
     
    
    33 32
     class Scheduler:
    
    ... ... @@ -39,80 +38,70 @@ class Scheduler:
    39 38
             self.jobs = {}
    
    40 39
             self.queue = deque()
    
    41 40
     
    
    42
    -    def register_client(self, name, queue):
    
    43
    -        self.jobs[name].register_client(queue)
    
    41
    +    def register_client(self, job_name, queue):
    
    42
    +        self.jobs[job_name].register_client(queue)
    
    44 43
     
    
    45
    -    def unregister_client(self, name, queue):
    
    46
    -        job = self.jobs[name]
    
    47
    -        job.unregister_client(queue)
    
    48
    -        if job.check_job_finished():
    
    49
    -            del self.jobs[name]
    
    44
    +    def unregister_client(self, job_name, queue):
    
    45
    +        self.jobs[job_name].unregister_client(queue)
    
    50 46
     
    
    51
    -    def append_job(self, job, skip_cache_lookup=False):
    
    47
    +        if self.jobs[job_name].operation.done:
    
    48
    +            del self.jobs[job_name]
    
    49
    +
    
    50
    +    def queue_job(self, job, skip_cache_lookup=False):
    
    52 51
             self.jobs[job.name] = job
    
    52
    +
    
    53 53
             if self._action_cache is not None and not skip_cache_lookup:
    
    54
    +            job.update_operation_stage(OperationStage.CACHE_CHECK)
    
    55
    +
    
    54 56
                 try:
    
    55 57
                     cached_result = self._action_cache.get_action_result(job.action_digest)
    
    56 58
                 except NotFoundError:
    
    57 59
                     self.queue.append(job)
    
    58
    -                job.update_execute_stage(ExecuteStage.QUEUED)
    
    59
    -
    
    60
    +                job.update_operation_stage(OperationStage.QUEUED)
    
    60 61
                 else:
    
    61
    -                job.result = cached_result
    
    62
    -                job.result_cached = True
    
    63
    -                job.update_execute_stage(ExecuteStage.COMPLETED)
    
    62
    +                job.update_operation_stage(OperationStage.COMPLETED)
    
    64 63
     
    
    65 64
             else:
    
    66 65
                 self.queue.append(job)
    
    67
    -            job.update_execute_stage(ExecuteStage.QUEUED)
    
    66
    +            job.update_operation_stage(OperationStage.QUEUED)
    
    68 67
     
    
    69
    -    def retry_job(self, name):
    
    70
    -        if name in self.jobs:
    
    71
    -            job = self.jobs[name]
    
    68
    +    def retry_job(self, job_name):
    
    69
    +        if job_name in self.jobs:
    
    70
    +            job = self.jobs[job_name]
    
    72 71
                 if job.n_tries >= self.MAX_N_TRIES:
    
    73 72
                     # TODO: Decide what to do with these jobs
    
    74
    -                job.update_execute_stage(ExecuteStage.COMPLETED)
    
    73
    +                job.update_operation_stage(OperationStage.COMPLETED)
    
    75 74
                     # TODO: Mark these jobs as done
    
    76 75
                 else:
    
    77
    -                job.update_execute_stage(ExecuteStage.QUEUED)
    
    78
    -                job.n_tries += 1
    
    76
    +                job.update_operation_stage(OperationStage.QUEUED)
    
    79 77
                     self.queue.appendleft(job)
    
    80 78
     
    
    81
    -    def job_complete(self, name, result, status):
    
    82
    -        job = self.jobs[name]
    
    83
    -        job.lease.status.CopyFrom(status)
    
    84
    -        action_result = remote_execution_pb2.ActionResult()
    
    85
    -        result.Unpack(action_result)
    
    86
    -        job.result = action_result
    
    87
    -        if not job.do_not_cache and self._action_cache is not None:
    
    88
    -            if not job.lease.status.code:
    
    89
    -                self._action_cache.update_action_result(job.action_digest, action_result)
    
    90
    -        job.update_execute_stage(ExecuteStage.COMPLETED)
    
    91
    -
    
    92
    -    def get_operations(self):
    
    93
    -        response = operations_pb2.ListOperationsResponse()
    
    94
    -        for v in self.jobs.values():
    
    95
    -            response.operations.extend([v.get_operation()])
    
    96
    -        return response
    
    97
    -
    
    98
    -    def update_job_lease_state(self, name, state):
    
    99
    -        job = self.jobs[name]
    
    100
    -        job.lease.state = state
    
    79
    +    def list_operations(self):
    
    80
    +        return [job.operation for job in self.jobs.values()]
    
    81
    +
    
    82
    +    def update_job_lease_state(self, job_name, lease_state, lease_status=None, lease_result=None):
    
    83
    +        job = self.jobs[job_name]
    
    84
    +        if lease_state != LeaseState.COMPLETED:
    
    85
    +            job.update_lease_state(lease_state)
    
    86
    +        else:
    
    87
    +            action_result = remote_execution_pb2.ActionResult()
    
    88
    +            lease_result.Unpack(action_result)
    
    89
    +
    
    90
    +            job.update_lease_state(lease_state, status=lease_status, result=action_result)
    
    91
    +
    
    92
    +            if not job.do_not_cache and self._action_cache is not None:
    
    93
    +                if not job.lease.status.code:
    
    94
    +                    self._action_cache.update_action_result(job.action_digest, action_result)
    
    95
    +
    
    96
    +            job.update_operation_stage(OperationStage.COMPLETED)
    
    101 97
     
    
    102 98
         def get_job_lease(self, name):
    
    103 99
             return self.jobs[name].lease
    
    104 100
     
    
    105
    -    def cancel_session(self, name):
    
    106
    -        job = self.jobs[name]
    
    107
    -        state = job.lease.state
    
    108
    -        if state in (LeaseState.PENDING.value, LeaseState.ACTIVE.value):
    
    109
    -            self.retry_job(name)
    
    110
    -
    
    111 101
         def create_lease(self):
    
    112 102
             if self.queue:
    
    113 103
                 job = self.queue.popleft()
    
    114
    -            job.update_execute_stage(ExecuteStage.EXECUTING)
    
    104
    +            job.update_operation_stage(OperationStage.EXECUTING)
    
    115 105
                 job.create_lease()
    
    116
    -            job.lease.state = LeaseState.PENDING.value
    
    117 106
                 return job.lease
    
    118 107
             return None
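
Note on the scheduler.py change: queue_job() marks the job as CACHE_CHECK before consulting the action cache, appends it to the back of the queue on a miss, and retry_job() puts a job back at the front unless it has used up its attempts. A minimal sketch of that flow follows, assuming a plain dict as the cache and dicts as jobs. SchedulerSketch, the MAX_N_TRIES value and the string stage names are hypothetical. Unlike the real Job, this sketch counts an attempt on every enqueue, even if the job was already queued.

```python
from collections import deque


class SchedulerSketch:
    MAX_N_TRIES = 5  # assumed value; the diff does not show the real constant

    def __init__(self, action_cache=None):
        self._action_cache = action_cache  # hypothetical: a plain dict or None
        self.jobs = {}
        self.queue = deque()

    def queue_job(self, name, digest, skip_cache_lookup=False):
        job = {"digest": digest, "stage": "UNKNOWN", "n_tries": 0}
        self.jobs[name] = job

        if self._action_cache is not None and not skip_cache_lookup:
            job["stage"] = "CACHE_CHECK"
            if digest in self._action_cache:
                # Cache hit: nothing to execute.
                job["stage"] = "COMPLETED"
                return job

        # Cache miss, no cache, or lookup skipped: wait for a worker.
        self._enqueue(job, front=False)
        return job

    def retry_job(self, name):
        job = self.jobs.get(name)
        if job is None:
            return
        if job["n_tries"] >= self.MAX_N_TRIES:
            job["stage"] = "COMPLETED"
        else:
            # Retried jobs jump the queue.
            self._enqueue(job, front=True)

    def _enqueue(self, job, front):
        job["stage"] = "QUEUED"
        job["n_tries"] += 1
        if front:
            self.queue.appendleft(job)
        else:
            self.queue.append(job)
```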

  • tests/integration/bots_service.py
    ... ... @@ -283,4 +283,4 @@ def _inject_work(scheduler, action_digest=None):
    283 283
         if not action_digest:
    
    284 284
             action_digest = remote_execution_pb2.Digest()
    
    285 285
         j = job.Job(action_digest, False)
    
    286
    -    scheduler.append_job(j, True)
    286
    +    scheduler.queue_job(j, True)

  • tests/integration/execution_service.py
    ... ... @@ -82,7 +82,7 @@ def test_execute(skip_cache_lookup, instance, context):
    82 82
         assert isinstance(result, operations_pb2.Operation)
    
    83 83
         metadata = remote_execution_pb2.ExecuteOperationMetadata()
    
    84 84
         result.metadata.Unpack(metadata)
    
    85
    -    assert metadata.stage == job.ExecuteStage.QUEUED.value
    
    85
    +    assert metadata.stage == job.OperationStage.QUEUED.value
    
    86 86
         assert uuid.UUID(result.name, version=4)
    
    87 87
         assert result.done is False
    
    88 88
     
    
    ... ... @@ -116,7 +116,7 @@ def test_wait_execution(instance, controller, context):
    116 116
         action_result = remote_execution_pb2.ActionResult()
    
    117 117
         action_result_any.Pack(action_result)
    
    118 118
     
    
    119
    -    j.update_execute_stage(job.ExecuteStage.COMPLETED)
    
    119
    +    j.update_operation_stage(job.OperationStage.COMPLETED)
    
    120 120
     
    
    121 121
         response = instance.WaitExecution(request, context)
    
    122 122
     
    
    ... ... @@ -125,7 +125,7 @@ def test_wait_execution(instance, controller, context):
    125 125
         assert isinstance(result, operations_pb2.Operation)
    
    126 126
         metadata = remote_execution_pb2.ExecuteOperationMetadata()
    
    127 127
         result.metadata.Unpack(metadata)
    
    128
    -    assert metadata.stage == job.ExecuteStage.COMPLETED.value
    
    128
    +    assert metadata.stage == job.OperationStage.COMPLETED.value
    
    129 129
         assert uuid.UUID(result.name, version=4)
    
    130 130
         assert result.done is True
    
    131 131
     
    

  • tests/integration/operations_service.py
    ... ... @@ -30,6 +30,7 @@ from buildgrid._protos.google.longrunning import operations_pb2
    30 30
     from buildgrid._protos.google.rpc import status_pb2
    
    31 31
     from buildgrid.server.cas.storage import lru_memory_cache
    
    32 32
     from buildgrid.server.controller import ExecutionController
    
    33
    +from buildgrid.server.job import OperationStage
    
    33 34
     from buildgrid.server.operations import service
    
    34 35
     from buildgrid.server.operations.service import OperationsService
    
    35 36
     from buildgrid.utils import create_digest
    
    ... ... @@ -131,9 +132,10 @@ def test_list_operations_with_result(instance, controller, execute_request, cont
    131 132
         action_result.output_files.extend([output_file])
    
    132 133
     
    
    133 134
         controller.operations_instance._scheduler.jobs[response_execute.name].create_lease()
    
    134
    -    controller.operations_instance._scheduler.job_complete(response_execute.name,
    
    135
    -                                                           _pack_any(action_result),
    
    136
    -                                                           status_pb2.Status())
    
    135
    +    controller.operations_instance._scheduler.update_job_lease_state(response_execute.name,
    
    136
    +                                                                     OperationStage.COMPLETED,
    
    137
    +                                                                     lease_status=status_pb2.Status(),
    
    138
    +                                                                     lease_result=_pack_any(action_result))
    
    137 139
     
    
    138 140
         request = operations_pb2.ListOperationsRequest(name=instance_name)
    
    139 141
         response = instance.ListOperations(request, context)
    


