[Notes] [Git][BuildGrid/buildgrid][mablanch/74-operation-cancelation] 14 commits: Update CONTRIBUTING.rst




Martin Blanchard pushed to branch mablanch/74-operation-cancelation at BuildGrid / buildgrid

Commits:

14 changed files:

Changes:

  • CONTRIBUTING.rst
    ... ... @@ -17,9 +17,13 @@ discuss with us before submitting anything, as we may well have some important
    17 17
     context which could help guide your efforts.
    
    18 18
     
    
    19 19
     Any major feature additions should be raised first as a proposal on the
    
    20
    -`BuildGrid mailing list`_ to be discussed, and then eventually followed up with
    
    21
    -an issue on GitLab. We recommend that you propose the feature in advance of
    
    22
    -commencing work.
    
    20
    +BuildGrid mailing list to be discussed between the core contributors. Once 
    
    21
    +this discussion has taken place and there is agreement on how to proceed, 
    
    22
    +it should be followed by a GitLab issue which summarizes 
    
    23
    +the tasks required.
    
    24
    +
    
    25
    +We strongly recommend that you propose the feature in advance of
    
    26
    +commencing any work.
    
    23 27
     
    
    24 28
     The author of any patch is expected to take ownership of that code and is to
    
    25 29
     support it for a reasonable time-frame. This means addressing any unforeseen
    
    ... ... @@ -237,21 +241,49 @@ following goals:
    237 241
       for the viewer to digest.
    
    238 242
     - Ensure that we keep it simple and easy to contribute to the project.
    
    239 243
     
    
    240
    -We are currenlty using the following GitLab features:
    
    244
    +Explanation of how the project is currently using some GitLab features:
    
    241 245
     
    
    242 246
     - `Milestones`_: we have seen them used in the same way as `Epics`_ in other
    
    243
    -  projects. BuildGrid milestones must be time-line based, can overlap and we can
    
    244
    -  be working towards multiple milestones at any one time. They allow us to group
    
    245
    -  together all sub tasks into an overall aim. See our `BuildGrid milestones`_.
    
    246
    -- `Labels`_: allow us to filter tickets in useful ways. They do complexity and
    
    247
    -  effort as they grow in number and usage, though, so the general approach is
    
    248
    -  to have the minimum possible. See our `BuildGrid labels`_.
    
    247
    +  projects and are trying not to do that here. Instead we are going to 
    
    248
    +  use milestones to denote development cycles (i.e., two-week 'sprints'). See the
    
    249
    +  `BuildGrid milestones`_.
    
    250
    +- `Labels`_: allow us to filter tickets (i.e., 'issues' in GitLab terminology)
    
    251
    +  in useful ways. They add complexity and effort as they grow in number, so the
    
    252
    +  general approach is to have the minimum possible but 
    
    253
    +  ensure we use them consistently. See the `BuildGrid labels`_. 
    
    249 254
     - `Boards`_: allow us to visualise and manage issues and labels in a simple way.
    
    250
    -  For now, we are only utilising one boards. Issues start life in the
    
    251
    -  ``Backlog`` column by default, and we move them into ``ToDo`` when they are
    
    252
    -  coming up in the next few weeks. ``Doing`` is only for when an item is
    
    253
    -  currently being worked on. Moving an issue from column to column automatically
    
    254
    -  adjust the tagged labels. See our `BuildGrid boards`_.
    
    255
    +  Issues start life in the ``Backlog`` column by default, and we move them into
    
    256
    +  ``ToDo`` when we aim to complete them in the current development cycle.
    
    257
    +  ``Doing`` is only for when an item is currently being worked on. When on the
    
    258
    +  Board view, dragging and dropping an issue from column to column automatically
    
    259
    +  adjusts the relevant labels. See the `BuildGrid boards`_.
    
    260
    +  
    
    261
    +  
    
    262
    +Guidelines for using GitLab features when working on this project: 
    
    263
    +  
    
    264
    +- When raising an issue, please:
    
    265
    +   
    
    266
    +  - check to see if there already is an issue to cover this task (if not then 
    
    267
    +    raise a new one)
    
    268
    +  - assign the appropriate label or labels (tip: the vast majority of issues 
    
    269
    +    raised will be either an enhancement or a bug)
    
    270
    +    
    
    271
    +- If you plan to work on an issue, please:
    
    272
    +
    
    273
    +  - self-assign the ticket
    
    274
    +  - ensure it's captured in the current sprint (i.e., GitLab milestone)
    
    275
    +  - ensure the ticket is in the ``ToDo`` column of the board if you aim to 
    
    276
    +    complete in the current sprint but aren't yet working on it, and
    
    277
    +    the ``Doing`` column if you are working on it currently.
    
    278
    +
    
    279
    +- Please note that GitLab issues are for either 'tasks' or 'bugs', i.e. not for 
    
    280
    +  long discussions (where the mailing list is a better choice) or for ranting, 
    
    281
    +  for example.
    
    282
    +  
    
    283
    +The above may seem like a lot to take in, but please don't worry about getting 
    
    284
    +it right the first few times. The worst that can happen is that you'll get a 
    
    285
    +friendly message from a current contributor who explains the process. We welcome
    
    286
    +and value all contributions to the project!  
    
    255 287
     
    
    256 288
     .. _Milestones: https://docs.gitlab.com/ee/user/project/milestones
    
    257 289
     .. _Epics: https://docs.gitlab.com/ee/user/group/epics
    

  • buildgrid/_app/bots/buildbox.py
    ... ... @@ -17,8 +17,6 @@ import os
    17 17
     import subprocess
    
    18 18
     import tempfile
    
    19 19
     
    
    20
    -from google.protobuf import any_pb2
    
    21
    -
    
    22 20
     from buildgrid.client.cas import download, upload
    
    23 21
     from buildgrid._exceptions import BotError
    
    24 22
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    ... ... @@ -29,13 +27,14 @@ from buildgrid.utils import read_file, write_file
    29 27
     def work_buildbox(context, lease):
    
    30 28
         """Executes a lease for a build action, using buildbox.
    
    31 29
         """
    
    32
    -
    
    33 30
         local_cas_directory = context.local_cas
    
    34 31
         # instance_name = context.parent
    
    35 32
         logger = context.logger
    
    36 33
     
    
    37 34
         action_digest = remote_execution_pb2.Digest()
    
    35
    +
    
    38 36
         lease.payload.Unpack(action_digest)
    
    37
    +    lease.result.Clear()
    
    39 38
     
    
    40 39
         with download(context.cas_channel) as downloader:
    
    41 40
             action = downloader.get_message(action_digest,
    
    ... ... @@ -131,10 +130,7 @@ def work_buildbox(context, lease):
    131 130
     
    
    132 131
                 action_result.output_directories.extend([output_directory])
    
    133 132
     
    
    134
    -            action_result_any = any_pb2.Any()
    
    135
    -            action_result_any.Pack(action_result)
    
    136
    -
    
    137
    -            lease.result.CopyFrom(action_result_any)
    
    133
    +            lease.result.Pack(action_result)
    
    138 134
     
    
    139 135
         return lease
    
    140 136
     
    

  • buildgrid/_app/bots/dummy.py
    ... ... @@ -16,9 +16,18 @@
    16 16
     import random
    
    17 17
     import time
    
    18 18
     
    
    19
    +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    20
    +
    
    19 21
     
    
    20 22
     def work_dummy(context, lease):
    
    21 23
         """ Just returns lease after some random time
    
    22 24
         """
    
    25
    +    lease.result.Clear()
    
    26
    +
    
    23 27
         time.sleep(random.randint(1, 5))
    
    28
    +
    
    29
    +    action_result = remote_execution_pb2.ActionResult()
    
    30
    +
    
    31
    +    lease.result.Pack(action_result)
    
    32
    +
    
    24 33
         return lease

  • buildgrid/_app/bots/host.py
    ... ... @@ -17,8 +17,6 @@ import os
    17 17
     import subprocess
    
    18 18
     import tempfile
    
    19 19
     
    
    20
    -from google.protobuf import any_pb2
    
    21
    -
    
    22 20
     from buildgrid.client.cas import download, upload
    
    23 21
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    24 22
     from buildgrid.utils import output_file_maker, output_directory_maker
    
    ... ... @@ -27,12 +25,13 @@ from buildgrid.utils import output_file_maker, output_directory_maker
    27 25
     def work_host_tools(context, lease):
    
    28 26
         """Executes a lease for a build action, using host tools.
    
    29 27
         """
    
    30
    -
    
    31 28
         instance_name = context.parent
    
    32 29
         logger = context.logger
    
    33 30
     
    
    34 31
         action_digest = remote_execution_pb2.Digest()
    
    32
    +
    
    35 33
         lease.payload.Unpack(action_digest)
    
    34
    +    lease.result.Clear()
    
    36 35
     
    
    37 36
         with tempfile.TemporaryDirectory() as temp_directory:
    
    38 37
             with download(context.cas_channel, instance=instance_name) as downloader:
    
    ... ... @@ -122,9 +121,6 @@ def work_host_tools(context, lease):
    122 121
     
    
    123 122
                 action_result.output_directories.extend(output_directories)
    
    124 123
     
    
    125
    -        action_result_any = any_pb2.Any()
    
    126
    -        action_result_any.Pack(action_result)
    
    127
    -
    
    128
    -        lease.result.CopyFrom(action_result_any)
    
    124
    +        lease.result.Pack(action_result)
    
    129 125
     
    
    130 126
         return lease

  • buildgrid/_exceptions.py
    ... ... @@ -52,6 +52,11 @@ class BotError(BgdError):
    52 52
             super().__init__(message, detail=detail, domain=ErrorDomain.BOT, reason=reason)
    
    53 53
     
    
    54 54
     
    
    55
    +class CancelledError(BgdError):
    
    56
    +    def __init__(self, message, detail=None, reason=None):
    
    57
    +        super().__init__(message, detail=detail, domain=ErrorDomain.SERVER, reason=reason)
    
    58
    +
    
    59
    +
    
    55 60
     class InvalidArgumentError(BgdError):
    
    56 61
         """A bad argument was passed, such as a name which doesn't exist."""
    
    57 62
         def __init__(self, message, detail=None, reason=None):
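
The new exception follows the same pattern as its neighbours. As a rough, self-contained sketch (the `ErrorDomain` values and the `BgdError` body below are assumptions made for illustration; only the class names and keyword arguments appear in the diff), it can be caught either specifically or through the shared base class:

```python
from enum import Enum


class ErrorDomain(Enum):
    """Hypothetical stand-in: only the member names appear in the diff."""
    SERVER = 1
    BOT = 2


class BgdError(Exception):
    """Hypothetical base class mirroring the keyword arguments used above."""

    def __init__(self, message, detail=None, domain=None, reason=None):
        super().__init__(message)
        self.detail = detail
        self.domain = domain
        self.reason = reason


class CancelledError(BgdError):
    """Signals that an operation was cancelled (server-side domain)."""

    def __init__(self, message, detail=None, reason=None):
        super().__init__(message, detail=detail, domain=ErrorDomain.SERVER, reason=reason)


def describe(error):
    """Render any BgdError-derived exception as 'DOMAIN: message'."""
    return "{}: {}".format(error.domain.name, error)
```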
    

  • buildgrid/server/bots/instance.py
    ... ... @@ -66,10 +66,10 @@ class BotsInterface:
    66 66
             self._bot_sessions[name] = bot_session
    
    67 67
             self.logger.info("Created bot session name=[{}] with bot_id=[{}]".format(name, bot_id))
    
    68 68
     
    
    69
    -        # For now, one lease at a time.
    
    70
    -        lease = self._scheduler.create_lease()
    
    71
    -        if lease:
    
    72
    -            bot_session.leases.extend([lease])
    
    69
    +        # TODO: Send worker capabilities to the scheduler!
    
    70
    +        leases = self._scheduler.request_job_leases({})
    
    71
    +        if leases:
    
    72
    +            bot_session.leases.extend(leases)
    
    73 73
     
    
    74 74
             return bot_session
    
    75 75
     
    
    ... ... @@ -85,11 +85,11 @@ class BotsInterface:
    85 85
             del bot_session.leases[:]
    
    86 86
             bot_session.leases.extend(leases)
    
    87 87
     
    
    88
    -        # For now, one lease at a time
    
    88
    +        # TODO: Send worker capabilities to the scheduler!
    
    89 89
             if not bot_session.leases:
    
    90
    -            lease = self._scheduler.create_lease()
    
    91
    -            if lease:
    
    92
    -                bot_session.leases.extend([lease])
    
    90
    +            leases = self._scheduler.request_job_leases({})
    
    91
    +            if leases:
    
    92
    +                bot_session.leases.extend(leases)
    
    93 93
     
    
    94 94
             self._bot_sessions[name] = bot_session
    
    95 95
             return bot_session
    
    ... ... @@ -109,7 +109,8 @@ class BotsInterface:
    109 109
             if server_state == LeaseState.PENDING:
    
    110 110
     
    
    111 111
                 if client_state == LeaseState.ACTIVE:
    
    112
    -                self._scheduler.update_job_lease_state(client_lease.id, client_lease.state)
    
    112
    +                self._scheduler.update_job_lease_state(client_lease.id,
    
    113
    +                                                       LeaseState.ACTIVE)
    
    113 114
                 elif client_state == LeaseState.COMPLETED:
    
    114 115
                     # TODO: Lease was rejected
    
    115 116
                     raise NotImplementedError("'Not Accepted' is unsupported")
    
    ... ... @@ -122,8 +123,10 @@ class BotsInterface:
    122 123
                     pass
    
    123 124
     
    
    124 125
                 elif client_state == LeaseState.COMPLETED:
    
    125
    -                self._scheduler.update_job_lease_state(client_lease.id, client_lease.state)
    
    126
    -                self._scheduler.job_complete(client_lease.id, client_lease.result, client_lease.status)
    
    126
    +                self._scheduler.update_job_lease_state(client_lease.id,
    
    127
    +                                                       LeaseState.COMPLETED,
    
    128
    +                                                       lease_status=client_lease.status,
    
    129
    +                                                       lease_result=client_lease.result)
    
    127 130
                     return None
    
    128 131
     
    
    129 132
                 else:
    

  • buildgrid/server/execution/instance.py
    ... ... @@ -21,8 +21,9 @@ An instance of the Remote Execution Service.
    21 21
     
    
    22 22
     import logging
    
    23 23
     
    
    24
    -from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError
    
    25
    -from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action
    
    24
    +from buildgrid._exceptions import CancelledError, FailedPreconditionError, InvalidArgumentError
    
    25
    +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    26
    +from buildgrid._protos.google.rpc import code_pb2
    
    26 27
     
    
    27 28
     from ..job import Job
    
    28 29
     
    
    ... ... @@ -43,17 +44,20 @@ class ExecutionInstance:
    43 44
             this action.
    
    44 45
             """
    
    45 46
     
    
    46
    -        action = self._storage.get_message(action_digest, Action)
    
    47
    +        action = self._storage.get_message(action_digest, remote_execution_pb2.Action)
    
    47 48
     
    
    48 49
             if not action:
    
    49 50
                 raise FailedPreconditionError("Could not get action from storage.")
    
    50 51
     
    
    51
    -        job = Job(action_digest, action.do_not_cache, message_queue)
    
    52
    +        job = Job(action, action_digest)
    
    53
    +        if message_queue is not None:
    
    54
    +            job.register_client(message_queue)
    
    55
    +
    
    52 56
             self.logger.info("Operation name: [{}]".format(job.name))
    
    53 57
     
    
    54
    -        self._scheduler.append_job(job, skip_cache_lookup)
    
    58
    +        self._scheduler.queue_job(job, skip_cache_lookup)
    
    55 59
     
    
    56
    -        return job.get_operation()
    
    60
    +        return job.operation
    
    57 61
     
    
    58 62
         def register_message_client(self, name, queue):
    
    59 63
             try:
    
    ... ... @@ -74,4 +78,11 @@ class ExecutionInstance:
    74 78
             while not operation.done:
    
    75 79
                 yield operation
    
    76 80
                 operation = message_queue.get()
    
    81
    +
    
    82
    +        response = remote_execution_pb2.ExecuteResponse()
    
    83
    +        operation.response.Unpack(response)
    
    84
    +
    
    85
    +        if response.status.code == code_pb2.CANCELLED:
    
    86
    +            raise CancelledError(response.status.message)
    
    87
    +
    
    77 88
             yield operation
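
The control flow added to the streaming generator can be sketched without protobuf. Everything below is a simplified assumption for illustration: `FakeOperation` stands in for the real `Operation` message, the status is stored directly on it instead of inside a packed `ExecuteResponse`, and the first update is read from the queue.

```python
import queue
from dataclasses import dataclass

CANCELLED = 1  # numeric value of google.rpc.Code.CANCELLED


class CancelledError(Exception):
    """Stand-in for buildgrid._exceptions.CancelledError."""


@dataclass
class FakeOperation:
    """Hypothetical stand-in for a long-running Operation message."""
    name: str
    done: bool = False
    status_code: int = 0
    status_message: str = ""


def stream_operation_updates(message_queue):
    """Yield updates until the operation is done; raise if it was cancelled."""
    operation = message_queue.get()
    while not operation.done:
        yield operation
        operation = message_queue.get()

    # The final update is inspected before being handed to the client.
    if operation.status_code == CANCELLED:
        raise CancelledError(operation.status_message)

    yield operation
```

With this shape, a cancelled operation never reaches the client as a normal final update; the caller sees the exception instead and can translate it into a transport-level status.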

  • buildgrid/server/execution/service.py
    ... ... @@ -26,7 +26,7 @@ from functools import partial
    26 26
     
    
    27 27
     import grpc
    
    28 28
     
    
    29
    -from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError
    
    29
    +from buildgrid._exceptions import CancelledError, FailedPreconditionError, InvalidArgumentError
    
    30 30
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
    
    31 31
     from buildgrid._protos.google.longrunning import operations_pb2
    
    32 32
     
    
    ... ... @@ -67,6 +67,12 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    67 67
                 context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
    
    68 68
                 yield operations_pb2.Operation()
    
    69 69
     
    
    70
    +        except CancelledError as e:
    
    71
    +            self.logger.error(e)
    
    72
    +            context.set_details(str(e))
    
    73
    +            context.set_code(grpc.StatusCode.CANCELLED)
    
    74
    +            yield operations_pb2.Operation()
    
    75
    +
    
    70 76
         def WaitExecution(self, request, context):
    
    71 77
             try:
    
    72 78
                 names = request.name.split("/")
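
The new `except CancelledError` branch can be illustrated with stand-ins for the gRPC pieces. This is only a sketch: `StatusCode` and `FakeContext` are hypothetical substitutes for `grpc.StatusCode` and the servicer context, and `execute` is a free function here, not the real service method.

```python
from enum import Enum


class StatusCode(Enum):
    """Hypothetical stand-in for the grpc.StatusCode members used here."""
    OK = "ok"
    CANCELLED = "cancelled"


class CancelledError(Exception):
    """Stand-in for buildgrid._exceptions.CancelledError."""


class FakeContext:
    """Hypothetical stand-in recording what the servicer context is told."""

    def __init__(self):
        self.code = StatusCode.OK
        self.details = ""

    def set_code(self, code):
        self.code = code

    def set_details(self, details):
        self.details = details


def execute(operation_stream, context, empty_operation=None):
    """Relay updates; on cancellation, flag the context and yield a placeholder."""
    try:
        yield from operation_stream
    except CancelledError as error:
        context.set_details(str(error))
        context.set_code(StatusCode.CANCELLED)
        yield empty_operation
```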
    

  • buildgrid/server/job.py
    ... ... @@ -11,151 +11,231 @@
    11 11
     # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12 12
     # See the License for the specific language governing permissions and
    
    13 13
     # limitations under the License.
    
    14
    -#
    
    15
    -# Authors:
    
    16
    -#        Finn Ball <finn ball codethink co uk>
    
    14
    +
    
    17 15
     
    
    18 16
     import logging
    
    19 17
     import uuid
    
    20 18
     from enum import Enum
    
    21 19
     
    
    22
    -from google.protobuf import any_pb2
    
    23
    -
    
    24 20
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    25 21
     from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
    
    26 22
     from buildgrid._protos.google.longrunning import operations_pb2
    
    23
    +from buildgrid._protos.google.rpc import code_pb2
    
    27 24
     
    
    28 25
     
    
    29
    -class ExecuteStage(Enum):
    
    26
    +class OperationStage(Enum):
    
    27
    +    # Initially unknown stage.
    
    30 28
         UNKNOWN = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('UNKNOWN')
    
    31
    -
    
    32 29
         # Checking the result against the cache.
    
    33 30
         CACHE_CHECK = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('CACHE_CHECK')
    
    34
    -
    
    35 31
         # Currently idle, awaiting a free machine to execute.
    
    36 32
         QUEUED = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('QUEUED')
    
    37
    -
    
    38 33
         # Currently being executed by a worker.
    
    39 34
         EXECUTING = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('EXECUTING')
    
    40
    -
    
    41 35
         # Finished execution.
    
    42 36
         COMPLETED = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('COMPLETED')
    
    43 37
     
    
    44 38
     
    
    45
    -class BotStatus(Enum):
    
    46
    -    BOT_STATUS_UNSPECIFIED = bots_pb2.BotStatus.Value('BOT_STATUS_UNSPECIFIED')
    
    47
    -
    
    48
    -    # The bot is healthy, and will accept leases as normal.
    
    49
    -    OK = bots_pb2.BotStatus.Value('OK')
    
    50
    -
    
    51
    -    # The bot is unhealthy and will not accept new leases.
    
    52
    -    UNHEALTHY = bots_pb2.BotStatus.Value('UNHEALTHY')
    
    53
    -
    
    54
    -    # The bot has been asked to reboot the host.
    
    55
    -    HOST_REBOOTING = bots_pb2.BotStatus.Value('HOST_REBOOTING')
    
    56
    -
    
    57
    -    # The bot has been asked to shut down.
    
    58
    -    BOT_TERMINATING = bots_pb2.BotStatus.Value('BOT_TERMINATING')
    
    59
    -
    
    60
    -
    
    61 39
     class LeaseState(Enum):
    
    40
    +    # Initially unknown state.
    
    62 41
         LEASE_STATE_UNSPECIFIED = bots_pb2.LeaseState.Value('LEASE_STATE_UNSPECIFIED')
    
    63
    -
    
    64 42
         # The server expects the bot to accept this lease.
    
    65 43
         PENDING = bots_pb2.LeaseState.Value('PENDING')
    
    66
    -
    
    67 44
         # The bot has accepted this lease.
    
    68 45
         ACTIVE = bots_pb2.LeaseState.Value('ACTIVE')
    
    69
    -
    
    70 46
         # The bot is no longer leased.
    
    71 47
         COMPLETED = bots_pb2.LeaseState.Value('COMPLETED')
    
    72
    -
    
    73 48
         # The bot should immediately release all resources associated with the lease.
    
    74 49
         CANCELLED = bots_pb2.LeaseState.Value('CANCELLED')
    
    75 50
     
    
    76 51
     
    
    77 52
     class Job:
    
    78 53
     
    
    79
    -    def __init__(self, action_digest, do_not_cache=False, message_queue=None):
    
    80
    -        self.lease = None
    
    54
    +    def __init__(self, action, action_digest):
    
    81 55
             self.logger = logging.getLogger(__name__)
    
    82
    -        self.n_tries = 0
    
    83
    -        self.result = None
    
    84
    -        self.result_cached = False
    
    85 56
     
    
    86
    -        self._action_digest = action_digest
    
    87
    -        self._do_not_cache = do_not_cache
    
    88
    -        self._execute_stage = ExecuteStage.UNKNOWN
    
    89 57
             self._name = str(uuid.uuid4())
    
    90
    -        self._operation = operations_pb2.Operation(name=self._name)
    
    91
    -        self._operation_update_queues = []
    
    58
    +        self._action = remote_execution_pb2.Action()
    
    59
    +        self._operation = operations_pb2.Operation()
    
    60
    +        self._lease = None
    
    61
    +
    
    62
    +        self.__execute_response = None
    
    63
    +        self.__operation_metadata = remote_execution_pb2.ExecuteOperationMetadata()
    
    64
    +        self.__operation_canceled = False
    
    65
    +
    
    66
    +        self.__operation_metadata.action_digest.CopyFrom(action_digest)
    
    67
    +        self.__operation_metadata.stage = OperationStage.UNKNOWN.value
    
    92 68
     
    
    93
    -        if message_queue is not None:
    
    94
    -            self.register_client(message_queue)
    
    69
    +        self._action.CopyFrom(action)
    
    70
    +        self._do_not_cache = self._action.do_not_cache
    
    71
    +        self._operation_update_queues = []
    
    72
    +        self._operation.name = self._name
    
    73
    +        self._operation.done = False
    
    74
    +        self._n_tries = 0
    
    95 75
     
    
    96 76
         @property
    
    97 77
         def name(self):
    
    98 78
             return self._name
    
    99 79
     
    
    80
    +    @property
    
    81
    +    def do_not_cache(self):
    
    82
    +        return self._do_not_cache
    
    83
    +
    
    84
    +    @property
    
    85
    +    def action(self):
    
    86
    +        return self._action
    
    87
    +
    
    100 88
         @property
    
    101 89
         def action_digest(self):
    
    102
    -        return self._action_digest
    
    90
    +        return self.__operation_metadata.action_digest
    
    103 91
     
    
    104 92
         @property
    
    105
    -    def do_not_cache(self):
    
    106
    -        return self._do_not_cache
    
    93
    +    def action_result(self):
    
    94
    +        if self.__execute_response is not None:
    
    95
    +            return self.__execute_response.result
    
    96
    +        else:
    
    97
    +            return None
    
    107 98
     
    
    108
    -    def check_job_finished(self):
    
    109
    -        if not self._operation_update_queues:
    
    110
    -            return self._operation.done
    
    111
    -        return False
    
    99
    +    @property
    
    100
    +    def operation(self):
    
    101
    +        return self._operation
    
    102
    +
    
    103
    +    @property
    
    104
    +    def operation_stage(self):
    
    105
    +        return OperationStage(self.__operation_metadata.stage)
    
    106
    +
    
    107
    +    @property
    
    108
    +    def lease(self):
    
    109
    +        return self._lease
    
    110
    +
    
    111
    +    @property
    
    112
    +    def lease_state(self):
    
    113
    +        if self._lease is not None:
    
    114
    +            return LeaseState(self._lease.state)
    
    115
    +        else:
    
    116
    +            return None
    
    117
    +
    
    118
    +    @property
    
    119
    +    def n_tries(self):
    
    120
    +        return self._n_tries
    
    121
    +
    
    122
    +    @property
    
    123
    +    def n_clients(self):
    
    124
    +        return len(self._operation_update_queues)
    
    112 125
     
    
    113 126
         def register_client(self, queue):
    
    127
    +        """Subscribes to the job's :class:`Operation` stage change events.
    
    128
    +
    
    129
    +        Args:
    
    130
    +            queue (queue.Queue): the event queue to register.
    
    131
    +        """
    
    114 132
             self._operation_update_queues.append(queue)
    
    115
    -        queue.put(self.get_operation())
    
    133
    +        queue.put(self._operation)
    
    116 134
     
    
    117 135
         def unregister_client(self, queue):
    
    136
    +        """Unsubscribes from the job's :class:`Operation` stage change events.
    
    137
    +
    
    138
    +        Args:
    
    139
    +            queue (queue.Queue): the event queue to unregister.
    
    140
    +        """
    
    118 141
             self._operation_update_queues.remove(queue)
    
    119 142
     
    
    120
    -    def get_operation(self):
    
    121
    -        self._operation.metadata.CopyFrom(self._pack_any(self.get_operation_meta()))
    
    122
    -        if self.result is not None:
    
    123
    -            self._operation.done = True
    
    124
    -            response = remote_execution_pb2.ExecuteResponse(result=self.result,
    
    125
    -                                                            cached_result=self.result_cached)
    
    143
    +    def set_cached_result(self, action_result):
    
    144
    +        """Allows specifying an action result from the action cache for the job.
    
    145
    +        """
    
    146
    +        self.__execute_response = remote_execution_pb2.ExecuteResponse()
    
    147
    +        self.__execute_response.result.CopyFrom(action_result)
    
    148
    +        self.__execute_response.cached_result = True
    
    126 149
     
    
    127
    -            if not self.result_cached:
    
    128
    -                response.status.CopyFrom(self.lease.status)
    
    150
    +    def create_lease(self):
    
    151
    +        """Emits a new :class:`Lease` for the job.
    
    129 152
     
    
    130
    -            self._operation.response.CopyFrom(self._pack_any(response))
    
    153
    +        Only one :class:`Lease` can be emitted for a given job. This method
    
    154
    +        should only be used once; any further calls are ignored.
    
    155
    +        """
    
    156
    +        if self._lease is not None:
    
    157
    +            return None
    
    131 158
     
    
    132
    -        return self._operation
    
    159
    +        self._lease = bots_pb2.Lease()
    
    160
    +        self._lease.id = self._name
    
    161
    +        self._lease.payload.Pack(self.__operation_metadata.action_digest)
    
    162
    +        self._lease.state = LeaseState.PENDING.value
    
    133 163
     
    
    134
    -    def get_operation_meta(self):
    
    135
    -        meta = remote_execution_pb2.ExecuteOperationMetadata()
    
    136
    -        meta.stage = self._execute_stage.value
    
    137
    -        meta.action_digest.CopyFrom(self._action_digest)
    
    164
    +        return self._lease
    
    138 165
     
    
    139
    -        return meta
    
    166
    +    def update_lease_state(self, state, status=None, result=None):
    
    167
    +        """Operates a state transition for the job's current :class:`Lease`.
    
    140 168
     
    
    141
    -    def create_lease(self):
    
    142
    -        action_digest = self._pack_any(self._action_digest)
    
    169
    +        Args:
    
    170
    +            state (LeaseState): the lease state to transition to.
    
    171
    +            status (google.rpc.Status): the lease execution status, only
    
    172
    +                required if `state` is `COMPLETED`.
    
    173
    +            result (google.protobuf.Any): the lease execution result, only
    
    174
    +                required if `state` is `COMPLETED`.
    
    175
    +        """
    
    176
    +        if state.value == self._lease.state:
    
    177
    +            return
    
    178
    +
    
    179
    +        self._lease.state = state.value
    
    143 180
     
    
    144
    -        lease = bots_pb2.Lease(id=self.name,
    
    145
    -                               payload=action_digest,
    
    146
    -                               state=LeaseState.PENDING.value)
    
    147
    -        self.lease = lease
    
    148
    -        return lease
    
    181
    +        if self._lease.state == LeaseState.PENDING.value:
    
    182
    +            self._lease.status.Clear()
    
    183
    +            self._lease.result.Clear()
    
    149 184
     
    
    150
    -    def get_operations(self):
    
    151
    -        return operations_pb2.ListOperationsResponse(operations=[self.get_operation()])
    
    185
    +        elif self._lease.state == LeaseState.COMPLETED.value:
    
    186
    +            action_result = remote_execution_pb2.ActionResult()
    
    187
    +
    
    188
    +            # TODO: Make a distinction between build and bot failures!
    
    189
    +            if status.code != 0:
    
    190
    +                self._do_not_cache = True
    
    191
    +
    
    192
    +            if result is not None:
    
    193
    +                assert result.Is(action_result.DESCRIPTOR)
    
    194
    +                result.Unpack(action_result)
    
    195
    +
    
    196
    +            self.__execute_response = remote_execution_pb2.ExecuteResponse()
    
    197
    +            self.__execute_response.result.CopyFrom(action_result)
    
    198
    +            self.__execute_response.cached_result = False
    
    199
    +            self.__execute_response.status.CopyFrom(status)
    
    200
    +
    
    201
    +    def cancel_lease(self):
    
    202
    +        if self._lease is not None:
    
    203
    +            self.update_lease_state(LeaseState.CANCELLED)
    
    204
    +
    
    205
    +    def update_operation_stage(self, stage):
    
    206
    +        """Operates a stage transition for the job's :class:Operation.
    
    207
    +
    
    208
    +        Args:
    
    209
    +            stage (OperationStage): the operation stage to transition to.
    
    210
    +        """
    
    211
    +        if stage.value == self.__operation_metadata.stage:
    
    212
    +            return
    
    213
    +
    
    214
    +        self.__operation_metadata.stage = stage.value
    
    215
    +
    
    216
    +        if self.__operation_metadata.stage == OperationStage.QUEUED.value:
    
    217
    +            self._n_tries += 1
    
    218
    +
    
    219
    +        elif self.__operation_metadata.stage == OperationStage.COMPLETED.value:
    
    220
    +            if self.__execute_response is not None:
    
    221
    +                self._operation.response.Pack(self.__execute_response)
    
    222
    +            self._operation.done = True
    
    223
    +
    
    224
    +        self._operation.metadata.Pack(self.__operation_metadata)
    
    152 225
     
    
    153
    -    def update_execute_stage(self, stage):
    
    154
    -        self._execute_stage = stage
    
    155 226
             for queue in self._operation_update_queues:
    
    156
    -            queue.put(self.get_operation())
    
    227
    +            queue.put(self._operation)
    
    228
    +
    
    229
    +    def cancel_operation(self):
    
    230
    +        self.__operation_canceled = True
    
    231
    +
    
    232
    +        self.cancel_lease()
    
    233
    +
    
    234
    +        response = remote_execution_pb2.ExecuteResponse()
    
    235
    +        response.status.code = code_pb2.CANCELLED
    
    236
    +        response.status.message = "The operation was cancelled by the caller."
    
    237
    +
    
    238
    +        self._operation.response.Pack(response)
    
    239
    +        self._operation.done = True
    
    157 240
     
    
    158
    -    def _pack_any(self, pack):
    
    159
    -        some_any = any_pb2.Any()
    
    160
    -        some_any.Pack(pack)
    
    161
    -        return some_any
    241
    +        self.update_operation_stage(OperationStage.COMPLETED)
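
Note on the `update_lease_state()` / `cancel_lease()` changes above: both rely on an idempotent transition, where requesting the state the lease is already in does nothing, and cancelling is a no-op until a lease has been issued. The following is only a minimal sketch of that pattern, not the BuildGrid implementation: plain Python objects stand in for the protobuf messages, `SketchLease` and `SketchJob` are hypothetical names, and the enum values are illustrative.

```python
from enum import Enum


class LeaseState(Enum):
    # Illustrative values only; the real enum is defined in buildgrid.server.job.
    PENDING = 1
    ACTIVE = 2
    COMPLETED = 4
    CANCELLED = 5


class SketchLease:
    """Hypothetical stand-in for the protobuf Lease message."""

    def __init__(self):
        self.state = LeaseState.PENDING.value
        self.status = None
        self.result = None


class SketchJob:
    """Hypothetical, protobuf-free model of a job's lease handling."""

    def __init__(self):
        self.lease = None
        self.transitions = 0  # counts effective transitions only

    def create_lease(self):
        self.lease = SketchLease()
        return self.lease

    def update_lease_state(self, state, status=None, result=None):
        # Requesting the current state again is a no-op.
        if state.value == self.lease.state:
            return

        self.lease.state = state.value
        self.transitions += 1

        if state is LeaseState.PENDING:
            # Going back to PENDING discards any earlier outcome.
            self.lease.status = None
            self.lease.result = None
        elif state is LeaseState.COMPLETED:
            # Simplification: the sketch just records the outcome on the lease.
            self.lease.status = status
            self.lease.result = result

    def cancel_lease(self):
        # Nothing to cancel until a lease has been issued.
        if self.lease is not None:
            self.update_lease_state(LeaseState.CANCELLED)


job = SketchJob()
job.cancel_lease()                          # safe: no lease yet
job.create_lease()
job.update_lease_state(LeaseState.ACTIVE)
job.update_lease_state(LeaseState.ACTIVE)   # ignored: already ACTIVE
job.cancel_lease()
```

In this sketch, repeated updates from a bot reporting the same state never trigger side effects twice, which is the property the early `return` in the diff appears to be protecting.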

  • buildgrid/server/operations/instance.py
    ... ... @@ -22,6 +22,7 @@ An instance of the LongRunningOperations Service.
    22 22
     import logging
    
    23 23
     
    
    24 24
     from buildgrid._exceptions import InvalidArgumentError
    
    25
    +from buildgrid._protos.google.longrunning import operations_pb2
    
    25 26
     
    
    26 27
     
    
    27 28
     class OperationsInstance:
    
    ... ... @@ -34,18 +35,21 @@ class OperationsInstance:
    34 35
             server.add_operations_instance(self, instance_name)
    
    35 36
     
    
    36 37
         def get_operation(self, name):
    
    37
    -        operation = self._scheduler.jobs.get(name)
    
    38
    +        job = self._scheduler.jobs.get(name)
    
    38 39
     
    
    39
    -        if operation is None:
    
    40
    +        if job is None:
    
    40 41
                 raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    41 42
     
    
    42 43
             else:
    
    43
    -            return operation.get_operation()
    
    44
    +            return job.operation
    
    44 45
     
    
    45 46
         def list_operations(self, list_filter, page_size, page_token):
    
    46 47
             # TODO: Pages
    
    47 48
             # Spec says number of pages and length of a page are optional
    
    48
    -        return self._scheduler.get_operations()
    
    49
    +        response = operations_pb2.ListOperationsResponse()
    
    50
    +        response.operations.extend([job.operation for job in self._scheduler.list_jobs()])
    
    51
    +
    
    52
    +        return response
    
    49 53
     
    
    50 54
         def delete_operation(self, name):
    
    51 55
             try:
    

  • buildgrid/server/scheduler.py
    ... ... @@ -11,9 +11,7 @@
    11 11
     # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12 12
     # See the License for the specific language governing permissions and
    
    13 13
     # limitations under the License.
    
    14
    -#
    
    15
    -# Authors:
    
    16
    -#        Finn Ball <finn ball codethink co uk>
    
    14
    +
    
    17 15
     
    
    18 16
     """
    
    19 17
     Scheduler
    
    ... ... @@ -24,10 +22,8 @@ Schedules jobs.
    24 22
     from collections import deque
    
    25 23
     
    
    26 24
     from buildgrid._exceptions import NotFoundError
    
    27
    -from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    28
    -from buildgrid._protos.google.longrunning import operations_pb2
    
    29 25
     
    
    30
    -from .job import ExecuteStage, LeaseState
    
    26
    +from .job import OperationStage, LeaseState
    
    31 27
     
    
    32 28
     
    
    33 29
     class Scheduler:
    
    ... ... @@ -39,80 +35,106 @@ class Scheduler:
    39 35
             self.jobs = {}
    
    40 36
             self.queue = deque()
    
    41 37
     
    
    42
    -    def register_client(self, name, queue):
    
    43
    -        self.jobs[name].register_client(queue)
    
    38
    +    def register_client(self, job_name, queue):
    
    39
    +        self.jobs[job_name].register_client(queue)
    
    40
    +
    
    41
    +    def unregister_client(self, job_name, queue):
    
    42
    +        self.jobs[job_name].unregister_client(queue)
    
    44 43
     
    
    45
    -    def unregister_client(self, name, queue):
    
    46
    -        job = self.jobs[name]
    
    47
    -        job.unregister_client(queue)
    
    48
    -        if job.check_job_finished():
    
    49
    -            del self.jobs[name]
    
    44
    +        if self.jobs[job_name].n_clients == 0:
    
    45
    +            del self.jobs[job_name]
    
    50 46
     
    
    51
    -    def append_job(self, job, skip_cache_lookup=False):
    
    47
    +    def queue_job(self, job, skip_cache_lookup=False):
    
    52 48
             self.jobs[job.name] = job
    
    49
    +
    
    50
    +        operation_stage = None
    
    53 51
             if self._action_cache is not None and not skip_cache_lookup:
    
    54 52
                 try:
    
    55
    -                cached_result = self._action_cache.get_action_result(job.action_digest)
    
    53
    +                action_result = self._action_cache.get_action_result(job.action_digest)
    
    56 54
                 except NotFoundError:
    
    55
    +                operation_stage = OperationStage.QUEUED
    
    57 56
                     self.queue.append(job)
    
    58
    -                job.update_execute_stage(ExecuteStage.QUEUED)
    
    59 57
     
    
    60 58
                 else:
    
    61
    -                job.result = cached_result
    
    62
    -                job.result_cached = True
    
    63
    -                job.update_execute_stage(ExecuteStage.COMPLETED)
    
    59
    +                job.set_cached_result(action_result)
    
    60
    +                operation_stage = OperationStage.COMPLETED
    
    64 61
     
    
    65 62
             else:
    
    63
    +            operation_stage = OperationStage.QUEUED
    
    66 64
                 self.queue.append(job)
    
    67
    -            job.update_execute_stage(ExecuteStage.QUEUED)
    
    68 65
     
    
    69
    -    def retry_job(self, name):
    
    70
    -        if name in self.jobs:
    
    71
    -            job = self.jobs[name]
    
    66
    +        job.update_operation_stage(operation_stage)
    
    67
    +
    
    68
    +    def retry_job(self, job_name):
    
    69
    +        if job_name in self.jobs:
    
    70
    +            job = self.jobs[job_name]
    
    72 71
                 if job.n_tries >= self.MAX_N_TRIES:
    
    73 72
                     # TODO: Decide what to do with these jobs
    
    74
    -                job.update_execute_stage(ExecuteStage.COMPLETED)
    
    73
    +                job.update_operation_stage(OperationStage.COMPLETED)
    
    75 74
                     # TODO: Mark these jobs as done
    
    76 75
                 else:
    
    77
    -                job.update_execute_stage(ExecuteStage.QUEUED)
    
    78
    -                job.n_tries += 1
    
    76
    +                job.update_operation_stage(OperationStage.QUEUED)
    
    79 77
                     self.queue.appendleft(job)
    
    80 78
     
    
    81
    -    def job_complete(self, name, result, status):
    
    82
    -        job = self.jobs[name]
    
    83
    -        job.lease.status.CopyFrom(status)
    
    84
    -        action_result = remote_execution_pb2.ActionResult()
    
    85
    -        result.Unpack(action_result)
    
    86
    -        job.result = action_result
    
    87
    -        if not job.do_not_cache and self._action_cache is not None:
    
    88
    -            if not job.lease.status.code:
    
    89
    -                self._action_cache.update_action_result(job.action_digest, action_result)
    
    90
    -        job.update_execute_stage(ExecuteStage.COMPLETED)
    
    91
    -
    
    92
    -    def get_operations(self):
    
    93
    -        response = operations_pb2.ListOperationsResponse()
    
    94
    -        for v in self.jobs.values():
    
    95
    -            response.operations.extend([v.get_operation()])
    
    96
    -        return response
    
    97
    -
    
    98
    -    def update_job_lease_state(self, name, state):
    
    99
    -        job = self.jobs[name]
    
    100
    -        job.lease.state = state
    
    101
    -
    
    102
    -    def get_job_lease(self, name):
    
    103
    -        return self.jobs[name].lease
    
    104
    -
    
    105
    -    def cancel_session(self, name):
    
    106
    -        job = self.jobs[name]
    
    107
    -        state = job.lease.state
    
    108
    -        if state in (LeaseState.PENDING.value, LeaseState.ACTIVE.value):
    
    109
    -            self.retry_job(name)
    
    110
    -
    
    111
    -    def create_lease(self):
    
    112
    -        if self.queue:
    
    113
    -            job = self.queue.popleft()
    
    114
    -            job.update_execute_stage(ExecuteStage.EXECUTING)
    
    115
    -            job.create_lease()
    
    116
    -            job.lease.state = LeaseState.PENDING.value
    
    117
    -            return job.lease
    
    118
    -        return None
    79
    +    def list_jobs(self):
    
    80
    +        return self.jobs.values()
    
    81
    +
    
    82
    +    def request_job_leases(self, worker_capabilities):
    
    83
    +        """Generates a list of the highest priority leases to be run.
    
    84
    +
    
    85
    +        Args:
    
    86
    +            worker_capabilities (dict): a set of key-value pairs describing the
    
    87
    +                worker properties, configuration and state at the time of the
    
    88
    +                request.
    
    89
    +        """
    
    90
    +        if not self.queue:
    
    91
    +            return []
    
    92
    +
    
    93
    +        job = self.queue.popleft()
    
    94
    +        # For now, one lease at a time:
    
    95
    +        lease = job.create_lease()
    
    96
    +
    
    97
    +        return [lease]
    
    98
    +
    
    99
    +    def update_job_lease_state(self, job_name, lease_state, lease_status=None, lease_result=None):
    
    100
    +        """Requests a state transition for a job's current :class:Lease.
    
    101
    +
    
    102
    +        Args:
    
    103
    +            job_name (str): name of the job to query.
    
    104
    +            lease_state (LeaseState): the lease state to transition to.
    
    105
    +            lease_status (google.rpc.Status): the lease execution status, only
    
    106
    +                required if `lease_state` is `COMPLETED`.
    
    107
    +            lease_result (google.protobuf.Any): the lease execution result, only
    
    108
    +                required if `lease_state` is `COMPLETED`.
    
    109
    +        """
    
    110
    +        job = self.jobs[job_name]
    
    111
    +
    
    112
    +        if lease_state != LeaseState.COMPLETED:
    
    113
    +            job.update_lease_state(lease_state)
    
    114
    +
    
    115
    +        else:
    
    116
    +            job.update_lease_state(lease_state,
    
    117
    +                                   status=lease_status, result=lease_result)
    
    118
    +
    
    119
    +            if self._action_cache is not None and not job.do_not_cache:
    
    120
    +                self._action_cache.update_action_result(job.action_digest, job.action_result)
    
    121
    +
    
    122
    +            job.update_operation_stage(OperationStage.COMPLETED)
    
    123
    +
    
    124
    +    def get_job_lease(self, job_name):
    
    125
    +        """Returns the lease associated to job, if any have been emitted yet."""
    
    126
    +        return self.jobs[job_name].lease
    
    127
    +
    
    128
    +    def get_job_operation(self, job_name):
    
    129
    +        """Returns the operation associated to job."""
    
    130
    +        return self.jobs[job_name].operation
    
    131
    +
    
    132
    +    def cancel_job_operation(self, job_name):
    
    133
    +        """Cancels the underlying operation of a given job.
    
    134
    +
    
    135
    +        This will also cancel any job's lease that may have been issued.
    
    136
    +
    
    137
    +        Args:
    
    138
    +            job_name (str): name of the job holding the operation to cancel.
    
    139
    +        """
    
    140
    +        self.jobs[job_name].cancel_operation()
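
Note on the reworked `queue_job()` above: the method now picks a single operation stage first and applies it once at the end, instead of calling the stage update in every branch. A minimal sketch of that decision flow follows, under stated assumptions: a plain `dict` stands in for the action cache, jobs are reduced to a name and a digest string, `SketchScheduler` is a hypothetical name, and the enum values are illustrative.

```python
from collections import deque
from enum import Enum


class OperationStage(Enum):
    # Illustrative values only; the real enum is defined in buildgrid.server.job.
    QUEUED = 2
    COMPLETED = 4


class SketchScheduler:
    """Hypothetical, simplified model of the queue_job() decision flow."""

    def __init__(self, action_cache=None):
        # Assumption: a dict mapping digest -> cached result, or None when
        # no action cache is configured.
        self.action_cache = action_cache
        self.queue = deque()
        self.stages = {}

    def queue_job(self, name, digest, skip_cache_lookup=False):
        if self.action_cache is not None and not skip_cache_lookup:
            if digest in self.action_cache:
                # Cache hit: the job is done without ever being queued.
                stage = OperationStage.COMPLETED
            else:
                # Cache miss: fall back to regular execution.
                stage = OperationStage.QUEUED
                self.queue.append(name)
        else:
            # No cache, or the caller asked to bypass it.
            stage = OperationStage.QUEUED
            self.queue.append(name)

        # Single place where the stage is applied.
        self.stages[name] = stage
        return stage


scheduler = SketchScheduler(action_cache={"gaff": "cached-result"})
hit = scheduler.queue_job("job-1", "gaff")
miss = scheduler.queue_job("job-2", "other")
forced = scheduler.queue_job("job-3", "gaff", skip_cache_lookup=True)
```

Here only `job-2` and `job-3` end up in the queue; `job-1` is served from the stand-in cache.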

  • tests/integration/bots_service.py
    ... ... @@ -137,7 +137,7 @@ def test_update_leases_with_work(bot_session, context, instance):
    137 137
                                                    bot_session=bot_session)
    
    138 138
     
    
    139 139
         action_digest = remote_execution_pb2.Digest(hash='gaff')
    
    140
    -    _inject_work(instance._instances[""]._scheduler, action_digest)
    
    140
    +    _inject_work(instance._instances[""]._scheduler, action_digest=action_digest)
    
    141 141
     
    
    142 142
         response = instance.CreateBotSession(request, context)
    
    143 143
     
    
    ... ... @@ -159,7 +159,7 @@ def test_update_leases_work_complete(bot_session, context, instance):
    159 159
     
    
    160 160
         # Inject work
    
    161 161
         action_digest = remote_execution_pb2.Digest(hash='gaff')
    
    162
    -    _inject_work(instance._instances[""]._scheduler, action_digest)
    
    162
    +    _inject_work(instance._instances[""]._scheduler, action_digest=action_digest)
    
    163 163
     
    
    164 164
         request = bots_pb2.UpdateBotSessionRequest(name=response.name,
    
    165 165
                                                    bot_session=response)
    
    ... ... @@ -174,6 +174,7 @@ def test_update_leases_work_complete(bot_session, context, instance):
    174 174
         response = copy.deepcopy(instance.UpdateBotSession(request, context))
    
    175 175
     
    
    176 176
         response.leases[0].state = LeaseState.COMPLETED.value
    
    177
    +    response.leases[0].result.Pack(remote_execution_pb2.ActionResult())
    
    177 178
     
    
    178 179
         request = bots_pb2.UpdateBotSessionRequest(name=response.name,
    
    179 180
                                                    bot_session=response)
    
    ... ... @@ -187,7 +188,7 @@ def test_work_rejected_by_bot(bot_session, context, instance):
    187 188
                                                    bot_session=bot_session)
    
    188 189
         # Inject work
    
    189 190
         action_digest = remote_execution_pb2.Digest(hash='gaff')
    
    190
    -    _inject_work(instance._instances[""]._scheduler, action_digest)
    
    191
    +    _inject_work(instance._instances[""]._scheduler, action_digest=action_digest)
    
    191 192
     
    
    192 193
         # Simulated the severed binding between client and server
    
    193 194
         response = copy.deepcopy(instance.CreateBotSession(request, context))
    
    ... ... @@ -209,7 +210,7 @@ def test_work_out_of_sync_from_pending(state, bot_session, context, instance):
    209 210
                                                    bot_session=bot_session)
    
    210 211
         # Inject work
    
    211 212
         action_digest = remote_execution_pb2.Digest(hash='gaff')
    
    212
    -    _inject_work(instance._instances[""]._scheduler, action_digest)
    
    213
    +    _inject_work(instance._instances[""]._scheduler, action_digest=action_digest)
    
    213 214
     
    
    214 215
         # Simulated the severed binding between client and server
    
    215 216
         response = copy.deepcopy(instance.CreateBotSession(request, context))
    
    ... ... @@ -230,7 +231,7 @@ def test_work_out_of_sync_from_active(state, bot_session, context, instance):
    230 231
                                                    bot_session=bot_session)
    
    231 232
         # Inject work
    
    232 233
         action_digest = remote_execution_pb2.Digest(hash='gaff')
    
    233
    -    _inject_work(instance._instances[""]._scheduler, action_digest)
    
    234
    +    _inject_work(instance._instances[""]._scheduler, action_digest=action_digest)
    
    234 235
     
    
    235 236
         # Simulated the severed binding between client and server
    
    236 237
         response = copy.deepcopy(instance.CreateBotSession(request, context))
    
    ... ... @@ -257,7 +258,7 @@ def test_work_active_to_active(bot_session, context, instance):
    257 258
                                                    bot_session=bot_session)
    
    258 259
         # Inject work
    
    259 260
         action_digest = remote_execution_pb2.Digest(hash='gaff')
    
    260
    -    _inject_work(instance._instances[""]._scheduler, action_digest)
    
    261
    +    _inject_work(instance._instances[""]._scheduler, action_digest=action_digest)
    
    261 262
     
    
    262 263
         # Simulated the severed binding between client and server
    
    263 264
         response = copy.deepcopy(instance.CreateBotSession(request, context))
    
    ... ... @@ -279,8 +280,10 @@ def test_post_bot_event_temp(context, instance):
    279 280
         context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
    
    280 281
     
    
    281 282
     
    
    282
    -def _inject_work(scheduler, action_digest=None):
    
    283
    +def _inject_work(scheduler, action=None, action_digest=None):
    
    284
    +    if not action:
    
    285
    +        action = remote_execution_pb2.Action()
    
    283 286
         if not action_digest:
    
    284 287
             action_digest = remote_execution_pb2.Digest()
    
    285
    -    j = job.Job(action_digest, False)
    
    286
    -    scheduler.append_job(j, True)
    288
    +    j = job.Job(action, action_digest)
    
    289
    +    scheduler.queue_job(j, True)

  • tests/integration/execution_service.py
    ... ... @@ -82,7 +82,7 @@ def test_execute(skip_cache_lookup, instance, context):
    82 82
         assert isinstance(result, operations_pb2.Operation)
    
    83 83
         metadata = remote_execution_pb2.ExecuteOperationMetadata()
    
    84 84
         result.metadata.Unpack(metadata)
    
    85
    -    assert metadata.stage == job.ExecuteStage.QUEUED.value
    
    85
    +    assert metadata.stage == job.OperationStage.QUEUED.value
    
    86 86
         assert uuid.UUID(result.name, version=4)
    
    87 87
         assert result.done is False
    
    88 88
     
    
    ... ... @@ -105,7 +105,7 @@ def test_no_action_digest_in_storage(instance, context):
    105 105
     
    
    106 106
     
    
    107 107
     def test_wait_execution(instance, controller, context):
    
    108
    -    j = job.Job(action_digest, None)
    
    108
    +    j = job.Job(action, action_digest)
    
    109 109
         j._operation.done = True
    
    110 110
     
    
    111 111
         request = remote_execution_pb2.WaitExecutionRequest(name="{}/{}".format('', j.name))
    
    ... ... @@ -116,7 +116,7 @@ def test_wait_execution(instance, controller, context):
    116 116
         action_result = remote_execution_pb2.ActionResult()
    
    117 117
         action_result_any.Pack(action_result)
    
    118 118
     
    
    119
    -    j.update_execute_stage(job.ExecuteStage.COMPLETED)
    
    119
    +    j.update_operation_stage(job.OperationStage.COMPLETED)
    
    120 120
     
    
    121 121
         response = instance.WaitExecution(request, context)
    
    122 122
     
    
    ... ... @@ -125,7 +125,7 @@ def test_wait_execution(instance, controller, context):
    125 125
         assert isinstance(result, operations_pb2.Operation)
    
    126 126
         metadata = remote_execution_pb2.ExecuteOperationMetadata()
    
    127 127
         result.metadata.Unpack(metadata)
    
    128
    -    assert metadata.stage == job.ExecuteStage.COMPLETED.value
    
    128
    +    assert metadata.stage == job.OperationStage.COMPLETED.value
    
    129 129
         assert uuid.UUID(result.name, version=4)
    
    130 130
         assert result.done is True
    
    131 131
     
    

  • tests/integration/operations_service.py
    ... ... @@ -30,6 +30,7 @@ from buildgrid._protos.google.longrunning import operations_pb2
    30 30
     from buildgrid._protos.google.rpc import status_pb2
    
    31 31
     from buildgrid.server.cas.storage import lru_memory_cache
    
    32 32
     from buildgrid.server.controller import ExecutionController
    
    33
    +from buildgrid.server.job import LeaseState
    
    33 34
     from buildgrid.server.operations import service
    
    34 35
     from buildgrid.server.operations.service import OperationsService
    
    35 36
     from buildgrid.utils import create_digest
    
    ... ... @@ -131,9 +132,10 @@ def test_list_operations_with_result(instance, controller, execute_request, cont
    131 132
         action_result.output_files.extend([output_file])
    
    132 133
     
    
    133 134
         controller.operations_instance._scheduler.jobs[response_execute.name].create_lease()
    
    134
    -    controller.operations_instance._scheduler.job_complete(response_execute.name,
    
    135
    -                                                           _pack_any(action_result),
    
    136
    -                                                           status_pb2.Status())
    
    135
    +    controller.operations_instance._scheduler.update_job_lease_state(response_execute.name,
    
    136
    +                                                                     LeaseState.COMPLETED,
    
    137
    +                                                                     lease_status=status_pb2.Status(),
    
    138
    +                                                                     lease_result=_pack_any(action_result))
    
    137 139
     
    
    138 140
         request = operations_pb2.ListOperationsRequest(name=instance_name)
    
    139 141
         response = instance.ListOperations(request, context)
    


