[Notes] [Git][BuildGrid/buildgrid][mablanch/83-executed-action-metadata] 2 commits: Renaming ExecuteStage to OperationStage for clarity



Title: GitLab

Martin Blanchard pushed to branch mablanch/83-executed-action-metadata at BuildGrid / buildgrid

Commits:

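5 changed files:

  • buildgrid/_app/bots/buildbox.py
  • buildgrid/_app/bots/host.py
  • buildgrid/server/job.py
  • buildgrid/server/scheduler.py
  • tests/integration/execution_service.py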

Changes:

  • buildgrid/_app/bots/buildbox.py
    ... ... @@ -17,8 +17,6 @@ import os
    17 17
     import subprocess
    
    18 18
     import tempfile
    
    19 19
     
    
    20
    -from google.protobuf import any_pb2
    
    21
    -
    
    22 20
     from buildgrid.client.cas import download, upload
    
    23 21
     from buildgrid._exceptions import BotError
    
    24 22
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    ... ... @@ -29,13 +27,14 @@ from buildgrid.utils import read_file, write_file
    29 27
     def work_buildbox(context, lease):
    
    30 28
         """Executes a lease for a build action, using buildbox.
    
    31 29
         """
    
    32
    -
    
    33 30
         local_cas_directory = context.local_cas
    
    34 31
         # instance_name = context.parent
    
    35 32
         logger = context.logger
    
    36 33
     
    
    37 34
         action_digest = remote_execution_pb2.Digest()
    
    35
    +
    
    38 36
         lease.payload.Unpack(action_digest)
    
    37
    +    lease.result.Clear()
    
    39 38
     
    
    40 39
         with download(context.cas_channel) as downloader:
    
    41 40
             action = downloader.get_message(action_digest,
    
    ... ... @@ -131,10 +130,7 @@ def work_buildbox(context, lease):
    131 130
     
    
    132 131
                 action_result.output_directories.extend([output_directory])
    
    133 132
     
    
    134
    -            action_result_any = any_pb2.Any()
    
    135
    -            action_result_any.Pack(action_result)
    
    136
    -
    
    137
    -            lease.result.CopyFrom(action_result_any)
    
    133
    +            lease.result.Pack(action_result)
    
    138 134
     
    
    139 135
         return lease
    
    140 136
     
    

  • buildgrid/_app/bots/host.py
    ... ... @@ -17,8 +17,6 @@ import os
    17 17
     import subprocess
    
    18 18
     import tempfile
    
    19 19
     
    
    20
    -from google.protobuf import any_pb2
    
    21
    -
    
    22 20
     from buildgrid.client.cas import download, upload
    
    23 21
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    24 22
     from buildgrid.utils import output_file_maker, output_directory_maker
    
    ... ... @@ -27,12 +25,13 @@ from buildgrid.utils import output_file_maker, output_directory_maker
    27 25
     def work_host_tools(context, lease):
    
    28 26
         """Executes a lease for a build action, using host tools.
    
    29 27
         """
    
    30
    -
    
    31 28
         instance_name = context.parent
    
    32 29
         logger = context.logger
    
    33 30
     
    
    34 31
         action_digest = remote_execution_pb2.Digest()
    
    32
    +
    
    35 33
         lease.payload.Unpack(action_digest)
    
    34
    +    lease.result.Clear()
    
    36 35
     
    
    37 36
         with tempfile.TemporaryDirectory() as temp_directory:
    
    38 37
             with download(context.cas_channel, instance=instance_name) as downloader:
    
    ... ... @@ -122,9 +121,6 @@ def work_host_tools(context, lease):
    122 121
     
    
    123 122
                 action_result.output_directories.extend(output_directories)
    
    124 123
     
    
    125
    -        action_result_any = any_pb2.Any()
    
    126
    -        action_result_any.Pack(action_result)
    
    127
    -
    
    128
    -        lease.result.CopyFrom(action_result_any)
    
    124
    +        lease.result.Pack(action_result)
    
    129 125
     
    
    130 126
         return lease

  • buildgrid/server/job.py
    ... ... @@ -19,57 +19,33 @@ import logging
    19 19
     import uuid
    
    20 20
     from enum import Enum
    
    21 21
     
    
    22
    -from google.protobuf import any_pb2
    
    23
    -
    
    24 22
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    25 23
     from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
    
    26 24
     from buildgrid._protos.google.longrunning import operations_pb2
    
    27 25
     
    
    28 26
     
    
    29
    -class ExecuteStage(Enum):
    
    27
    +class OperationStage(Enum):
    
    28
    +    # Initially unknown stage.
    
    30 29
         UNKNOWN = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('UNKNOWN')
    
    31
    -
    
    32 30
         # Checking the result against the cache.
    
    33 31
         CACHE_CHECK = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('CACHE_CHECK')
    
    34
    -
    
    35 32
         # Currently idle, awaiting a free machine to execute.
    
    36 33
         QUEUED = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('QUEUED')
    
    37
    -
    
    38 34
         # Currently being executed by a worker.
    
    39 35
         EXECUTING = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('EXECUTING')
    
    40
    -
    
    41 36
         # Finished execution.
    
    42 37
         COMPLETED = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('COMPLETED')
    
    43 38
     
    
    44 39
     
    
    45
    -class BotStatus(Enum):
    
    46
    -    BOT_STATUS_UNSPECIFIED = bots_pb2.BotStatus.Value('BOT_STATUS_UNSPECIFIED')
    
    47
    -
    
    48
    -    # The bot is healthy, and will accept leases as normal.
    
    49
    -    OK = bots_pb2.BotStatus.Value('OK')
    
    50
    -
    
    51
    -    # The bot is unhealthy and will not accept new leases.
    
    52
    -    UNHEALTHY = bots_pb2.BotStatus.Value('UNHEALTHY')
    
    53
    -
    
    54
    -    # The bot has been asked to reboot the host.
    
    55
    -    HOST_REBOOTING = bots_pb2.BotStatus.Value('HOST_REBOOTING')
    
    56
    -
    
    57
    -    # The bot has been asked to shut down.
    
    58
    -    BOT_TERMINATING = bots_pb2.BotStatus.Value('BOT_TERMINATING')
    
    59
    -
    
    60
    -
    
    61 40
     class LeaseState(Enum):
    
    41
    +    # Initially unknown state.
    
    62 42
         LEASE_STATE_UNSPECIFIED = bots_pb2.LeaseState.Value('LEASE_STATE_UNSPECIFIED')
    
    63
    -
    
    64 43
         # The server expects the bot to accept this lease.
    
    65 44
         PENDING = bots_pb2.LeaseState.Value('PENDING')
    
    66
    -
    
    67 45
         # The bot has accepted this lease.
    
    68 46
         ACTIVE = bots_pb2.LeaseState.Value('ACTIVE')
    
    69
    -
    
    70 47
         # The bot is no longer leased.
    
    71 48
         COMPLETED = bots_pb2.LeaseState.Value('COMPLETED')
    
    72
    -
    
    73 49
         # The bot should immediately release all resources associated with the lease.
    
    74 50
         CANCELLED = bots_pb2.LeaseState.Value('CANCELLED')
    
    75 51
     
    
    ... ... @@ -85,7 +61,7 @@ class Job:
    85 61
     
    
    86 62
             self._action_digest = action_digest
    
    87 63
             self._do_not_cache = do_not_cache
    
    88
    -        self._execute_stage = ExecuteStage.UNKNOWN
    
    64
    +        self._execute_stage = OperationStage.UNKNOWN
    
    89 65
             self._name = str(uuid.uuid4())
    
    90 66
             self._operation = operations_pb2.Operation(name=self._name)
    
    91 67
             self._operation_update_queues = []
    
    ... ... @@ -118,7 +94,7 @@ class Job:
    118 94
             self._operation_update_queues.remove(queue)
    
    119 95
     
    
    120 96
         def get_operation(self):
    
    121
    -        self._operation.metadata.CopyFrom(self._pack_any(self.get_operation_meta()))
    
    97
    +        self._operation.metadata.Pack(self.get_operation_meta())
    
    122 98
             if self.result is not None:
    
    123 99
                 self._operation.done = True
    
    124 100
                 response = remote_execution_pb2.ExecuteResponse(result=self.result,
    
    ... ... @@ -127,7 +103,7 @@ class Job:
    127 103
                 if not self.result_cached:
    
    128 104
                     response.status.CopyFrom(self.lease.status)
    
    129 105
     
    
    130
    -            self._operation.response.CopyFrom(self._pack_any(response))
    
    106
    +            self._operation.response.Pack(response)
    
    131 107
     
    
    132 108
             return self._operation
    
    133 109
     
    
    ... ... @@ -139,11 +115,9 @@ class Job:
    139 115
             return meta
    
    140 116
     
    
    141 117
         def create_lease(self):
    
    142
    -        action_digest = self._pack_any(self._action_digest)
    
    118
    +        lease = bots_pb2.Lease(id=self.name, state=LeaseState.PENDING.value)
    
    119
    +        lease.payload.Pack(self._action_digest)
    
    143 120
     
    
    144
    -        lease = bots_pb2.Lease(id=self.name,
    
    145
    -                               payload=action_digest,
    
    146
    -                               state=LeaseState.PENDING.value)
    
    147 121
             self.lease = lease
    
    148 122
             return lease
    
    149 123
     
    
    ... ... @@ -154,8 +128,3 @@ class Job:
    154 128
             self._execute_stage = stage
    
    155 129
             for queue in self._operation_update_queues:
    
    156 130
                 queue.put(self.get_operation())
    157
    -
    
    158
    -    def _pack_any(self, pack):
    
    159
    -        some_any = any_pb2.Any()
    
    160
    -        some_any.Pack(pack)
    
    161
    -        return some_any

  • buildgrid/server/scheduler.py
    ... ... @@ -27,7 +27,7 @@ from buildgrid._exceptions import NotFoundError
    27 27
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    28 28
     from buildgrid._protos.google.longrunning import operations_pb2
    
    29 29
     
    
    30
    -from .job import ExecuteStage, LeaseState
    
    30
    +from .job import OperationStage, LeaseState
    
    31 31
     
    
    32 32
     
    
    33 33
     class Scheduler:
    
    ... ... @@ -55,26 +55,26 @@ class Scheduler:
    55 55
                     cached_result = self._action_cache.get_action_result(job.action_digest)
    
    56 56
                 except NotFoundError:
    
    57 57
                     self.queue.append(job)
    
    58
    -                job.update_execute_stage(ExecuteStage.QUEUED)
    
    58
    +                job.update_execute_stage(OperationStage.QUEUED)
    
    59 59
     
    
    60 60
                 else:
    
    61 61
                     job.result = cached_result
    
    62 62
                     job.result_cached = True
    
    63
    -                job.update_execute_stage(ExecuteStage.COMPLETED)
    
    63
    +                job.update_execute_stage(OperationStage.COMPLETED)
    
    64 64
     
    
    65 65
             else:
    
    66 66
                 self.queue.append(job)
    
    67
    -            job.update_execute_stage(ExecuteStage.QUEUED)
    
    67
    +            job.update_execute_stage(OperationStage.QUEUED)
    
    68 68
     
    
    69 69
         def retry_job(self, name):
    
    70 70
             if name in self.jobs:
    
    71 71
                 job = self.jobs[name]
    
    72 72
                 if job.n_tries >= self.MAX_N_TRIES:
    
    73 73
                     # TODO: Decide what to do with these jobs
    
    74
    -                job.update_execute_stage(ExecuteStage.COMPLETED)
    
    74
    +                job.update_execute_stage(OperationStage.COMPLETED)
    
    75 75
                     # TODO: Mark these jobs as done
    
    76 76
                 else:
    
    77
    -                job.update_execute_stage(ExecuteStage.QUEUED)
    
    77
    +                job.update_execute_stage(OperationStage.QUEUED)
    
    78 78
                     job.n_tries += 1
    
    79 79
                     self.queue.appendleft(job)
    
    80 80
     
    
    ... ... @@ -87,7 +87,7 @@ class Scheduler:
    87 87
             if not job.do_not_cache and self._action_cache is not None:
    
    88 88
                 if not job.lease.status.code:
    
    89 89
                     self._action_cache.update_action_result(job.action_digest, action_result)
    
    90
    -        job.update_execute_stage(ExecuteStage.COMPLETED)
    
    90
    +        job.update_execute_stage(OperationStage.COMPLETED)
    
    91 91
     
    
    92 92
         def get_operations(self):
    
    93 93
             response = operations_pb2.ListOperationsResponse()
    
    ... ... @@ -111,7 +111,7 @@ class Scheduler:
    111 111
         def create_lease(self):
    
    112 112
             if self.queue:
    
    113 113
                 job = self.queue.popleft()
    
    114
    -            job.update_execute_stage(ExecuteStage.EXECUTING)
    
    114
    +            job.update_execute_stage(OperationStage.EXECUTING)
    
    115 115
                 job.create_lease()
    
    116 116
                 job.lease.state = LeaseState.PENDING.value
    
    117 117
                 return job.lease
    

  • tests/integration/execution_service.py
    ... ... @@ -82,7 +82,7 @@ def test_execute(skip_cache_lookup, instance, context):
    82 82
         assert isinstance(result, operations_pb2.Operation)
    
    83 83
         metadata = remote_execution_pb2.ExecuteOperationMetadata()
    
    84 84
         result.metadata.Unpack(metadata)
    
    85
    -    assert metadata.stage == job.ExecuteStage.QUEUED.value
    
    85
    +    assert metadata.stage == job.OperationStage.QUEUED.value
    
    86 86
         assert uuid.UUID(result.name, version=4)
    
    87 87
         assert result.done is False
    
    88 88
     
    
    ... ... @@ -116,7 +116,7 @@ def test_wait_execution(instance, controller, context):
    116 116
         action_result = remote_execution_pb2.ActionResult()
    
    117 117
         action_result_any.Pack(action_result)
    
    118 118
     
    
    119
    -    j.update_execute_stage(job.ExecuteStage.COMPLETED)
    
    119
    +    j.update_execute_stage(job.OperationStage.COMPLETED)
    
    120 120
     
    
    121 121
         response = instance.WaitExecution(request, context)
    
    122 122
     
    
    ... ... @@ -125,7 +125,7 @@ def test_wait_execution(instance, controller, context):
    125 125
         assert isinstance(result, operations_pb2.Operation)
    
    126 126
         metadata = remote_execution_pb2.ExecuteOperationMetadata()
    
    127 127
         result.metadata.Unpack(metadata)
    
    128
    -    assert metadata.stage == job.ExecuteStage.COMPLETED.value
    
    128
    +    assert metadata.stage == job.OperationStage.COMPLETED.value
    
    129 129
         assert uuid.UUID(result.name, version=4)
    
    130 130
         assert result.done is True
    
    131 131
     
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]