Martin Blanchard pushed to branch mablanch/83-executed-action-metadata at BuildGrid / buildgrid
Commits:
- 070e1136 by Martin Blanchard at 2018-10-05T11:35:43Z
- 92564790 by Martin Blanchard at 2018-10-05T11:35:49Z
5 changed files:
- buildgrid/_app/bots/buildbox.py
- buildgrid/_app/bots/host.py
- buildgrid/server/job.py
- buildgrid/server/scheduler.py
- tests/integration/execution_service.py
Changes:
... | ... | @@ -17,8 +17,6 @@ import os |
17 | 17 |
import subprocess
|
18 | 18 |
import tempfile
|
19 | 19 |
|
20 |
-from google.protobuf import any_pb2
|
|
21 |
- |
|
22 | 20 |
from buildgrid.client.cas import download, upload
|
23 | 21 |
from buildgrid._exceptions import BotError
|
24 | 22 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
... | ... | @@ -29,13 +27,14 @@ from buildgrid.utils import read_file, write_file |
29 | 27 |
def work_buildbox(context, lease):
|
30 | 28 |
"""Executes a lease for a build action, using buildbox.
|
31 | 29 |
"""
|
32 |
- |
|
33 | 30 |
local_cas_directory = context.local_cas
|
34 | 31 |
# instance_name = context.parent
|
35 | 32 |
logger = context.logger
|
36 | 33 |
|
37 | 34 |
action_digest = remote_execution_pb2.Digest()
|
35 |
+ |
|
38 | 36 |
lease.payload.Unpack(action_digest)
|
37 |
+ lease.result.Clear()
|
|
39 | 38 |
|
40 | 39 |
with download(context.cas_channel) as downloader:
|
41 | 40 |
action = downloader.get_message(action_digest,
|
... | ... | @@ -131,10 +130,7 @@ def work_buildbox(context, lease): |
131 | 130 |
|
132 | 131 |
action_result.output_directories.extend([output_directory])
|
133 | 132 |
|
134 |
- action_result_any = any_pb2.Any()
|
|
135 |
- action_result_any.Pack(action_result)
|
|
136 |
- |
|
137 |
- lease.result.CopyFrom(action_result_any)
|
|
133 |
+ lease.result.Pack(action_result)
|
|
138 | 134 |
|
139 | 135 |
return lease
|
140 | 136 |
|
... | ... | @@ -17,8 +17,6 @@ import os |
17 | 17 |
import subprocess
|
18 | 18 |
import tempfile
|
19 | 19 |
|
20 |
-from google.protobuf import any_pb2
|
|
21 |
- |
|
22 | 20 |
from buildgrid.client.cas import download, upload
|
23 | 21 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
24 | 22 |
from buildgrid.utils import output_file_maker, output_directory_maker
|
... | ... | @@ -27,12 +25,13 @@ from buildgrid.utils import output_file_maker, output_directory_maker |
27 | 25 |
def work_host_tools(context, lease):
|
28 | 26 |
"""Executes a lease for a build action, using host tools.
|
29 | 27 |
"""
|
30 |
- |
|
31 | 28 |
instance_name = context.parent
|
32 | 29 |
logger = context.logger
|
33 | 30 |
|
34 | 31 |
action_digest = remote_execution_pb2.Digest()
|
32 |
+ |
|
35 | 33 |
lease.payload.Unpack(action_digest)
|
34 |
+ lease.result.Clear()
|
|
36 | 35 |
|
37 | 36 |
with tempfile.TemporaryDirectory() as temp_directory:
|
38 | 37 |
with download(context.cas_channel, instance=instance_name) as downloader:
|
... | ... | @@ -122,9 +121,6 @@ def work_host_tools(context, lease): |
122 | 121 |
|
123 | 122 |
action_result.output_directories.extend(output_directories)
|
124 | 123 |
|
125 |
- action_result_any = any_pb2.Any()
|
|
126 |
- action_result_any.Pack(action_result)
|
|
127 |
- |
|
128 |
- lease.result.CopyFrom(action_result_any)
|
|
124 |
+ lease.result.Pack(action_result)
|
|
129 | 125 |
|
130 | 126 |
return lease
|
... | ... | @@ -19,57 +19,33 @@ import logging |
19 | 19 |
import uuid
|
20 | 20 |
from enum import Enum
|
21 | 21 |
|
22 |
-from google.protobuf import any_pb2
|
|
23 |
- |
|
24 | 22 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
25 | 23 |
from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
|
26 | 24 |
from buildgrid._protos.google.longrunning import operations_pb2
|
27 | 25 |
|
28 | 26 |
|
29 |
-class ExecuteStage(Enum):
|
|
27 |
+class OperationStage(Enum):
|
|
28 |
+ # Initially unknown stage.
|
|
30 | 29 |
UNKNOWN = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('UNKNOWN')
|
31 |
- |
|
32 | 30 |
# Checking the result against the cache.
|
33 | 31 |
CACHE_CHECK = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('CACHE_CHECK')
|
34 |
- |
|
35 | 32 |
# Currently idle, awaiting a free machine to execute.
|
36 | 33 |
QUEUED = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('QUEUED')
|
37 |
- |
|
38 | 34 |
# Currently being executed by a worker.
|
39 | 35 |
EXECUTING = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('EXECUTING')
|
40 |
- |
|
41 | 36 |
# Finished execution.
|
42 | 37 |
COMPLETED = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('COMPLETED')
|
43 | 38 |
|
44 | 39 |
|
45 |
-class BotStatus(Enum):
|
|
46 |
- BOT_STATUS_UNSPECIFIED = bots_pb2.BotStatus.Value('BOT_STATUS_UNSPECIFIED')
|
|
47 |
- |
|
48 |
- # The bot is healthy, and will accept leases as normal.
|
|
49 |
- OK = bots_pb2.BotStatus.Value('OK')
|
|
50 |
- |
|
51 |
- # The bot is unhealthy and will not accept new leases.
|
|
52 |
- UNHEALTHY = bots_pb2.BotStatus.Value('UNHEALTHY')
|
|
53 |
- |
|
54 |
- # The bot has been asked to reboot the host.
|
|
55 |
- HOST_REBOOTING = bots_pb2.BotStatus.Value('HOST_REBOOTING')
|
|
56 |
- |
|
57 |
- # The bot has been asked to shut down.
|
|
58 |
- BOT_TERMINATING = bots_pb2.BotStatus.Value('BOT_TERMINATING')
|
|
59 |
- |
|
60 |
- |
|
61 | 40 |
class LeaseState(Enum):
|
41 |
+ # Initially unknown state.
|
|
62 | 42 |
LEASE_STATE_UNSPECIFIED = bots_pb2.LeaseState.Value('LEASE_STATE_UNSPECIFIED')
|
63 |
- |
|
64 | 43 |
# The server expects the bot to accept this lease.
|
65 | 44 |
PENDING = bots_pb2.LeaseState.Value('PENDING')
|
66 |
- |
|
67 | 45 |
# The bot has accepted this lease.
|
68 | 46 |
ACTIVE = bots_pb2.LeaseState.Value('ACTIVE')
|
69 |
- |
|
70 | 47 |
# The bot is no longer leased.
|
71 | 48 |
COMPLETED = bots_pb2.LeaseState.Value('COMPLETED')
|
72 |
- |
|
73 | 49 |
# The bot should immediately release all resources associated with the lease.
|
74 | 50 |
CANCELLED = bots_pb2.LeaseState.Value('CANCELLED')
|
75 | 51 |
|
... | ... | @@ -85,7 +61,7 @@ class Job: |
85 | 61 |
|
86 | 62 |
self._action_digest = action_digest
|
87 | 63 |
self._do_not_cache = do_not_cache
|
88 |
- self._execute_stage = ExecuteStage.UNKNOWN
|
|
64 |
+ self._execute_stage = OperationStage.UNKNOWN
|
|
89 | 65 |
self._name = str(uuid.uuid4())
|
90 | 66 |
self._operation = operations_pb2.Operation(name=self._name)
|
91 | 67 |
self._operation_update_queues = []
|
... | ... | @@ -118,7 +94,7 @@ class Job: |
118 | 94 |
self._operation_update_queues.remove(queue)
|
119 | 95 |
|
120 | 96 |
def get_operation(self):
|
121 |
- self._operation.metadata.CopyFrom(self._pack_any(self.get_operation_meta()))
|
|
97 |
+ self._operation.metadata.Pack(self.get_operation_meta())
|
|
122 | 98 |
if self.result is not None:
|
123 | 99 |
self._operation.done = True
|
124 | 100 |
response = remote_execution_pb2.ExecuteResponse(result=self.result,
|
... | ... | @@ -127,7 +103,7 @@ class Job: |
127 | 103 |
if not self.result_cached:
|
128 | 104 |
response.status.CopyFrom(self.lease.status)
|
129 | 105 |
|
130 |
- self._operation.response.CopyFrom(self._pack_any(response))
|
|
106 |
+ self._operation.response.Pack(response)
|
|
131 | 107 |
|
132 | 108 |
return self._operation
|
133 | 109 |
|
... | ... | @@ -139,11 +115,9 @@ class Job: |
139 | 115 |
return meta
|
140 | 116 |
|
141 | 117 |
def create_lease(self):
|
142 |
- action_digest = self._pack_any(self._action_digest)
|
|
118 |
+ lease = bots_pb2.Lease(id=self.name, state=LeaseState.PENDING.value)
|
|
119 |
+ lease.payload.Pack(self._action_digest)
|
|
143 | 120 |
|
144 |
- lease = bots_pb2.Lease(id=self.name,
|
|
145 |
- payload=action_digest,
|
|
146 |
- state=LeaseState.PENDING.value)
|
|
147 | 121 |
self.lease = lease
|
148 | 122 |
return lease
|
149 | 123 |
|
... | ... | @@ -154,8 +128,3 @@ class Job: |
154 | 128 |
self._execute_stage = stage
|
155 | 129 |
for queue in self._operation_update_queues:
|
156 | 130 |
queue.put(self.get_operation())
|
157 |
- |
|
158 |
- def _pack_any(self, pack):
|
|
159 |
- some_any = any_pb2.Any()
|
|
160 |
- some_any.Pack(pack)
|
|
161 |
- return some_any
|
... | ... | @@ -27,7 +27,7 @@ from buildgrid._exceptions import NotFoundError |
27 | 27 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
28 | 28 |
from buildgrid._protos.google.longrunning import operations_pb2
|
29 | 29 |
|
30 |
-from .job import ExecuteStage, LeaseState
|
|
30 |
+from .job import OperationStage, LeaseState
|
|
31 | 31 |
|
32 | 32 |
|
33 | 33 |
class Scheduler:
|
... | ... | @@ -55,26 +55,26 @@ class Scheduler: |
55 | 55 |
cached_result = self._action_cache.get_action_result(job.action_digest)
|
56 | 56 |
except NotFoundError:
|
57 | 57 |
self.queue.append(job)
|
58 |
- job.update_execute_stage(ExecuteStage.QUEUED)
|
|
58 |
+ job.update_execute_stage(OperationStage.QUEUED)
|
|
59 | 59 |
|
60 | 60 |
else:
|
61 | 61 |
job.result = cached_result
|
62 | 62 |
job.result_cached = True
|
63 |
- job.update_execute_stage(ExecuteStage.COMPLETED)
|
|
63 |
+ job.update_execute_stage(OperationStage.COMPLETED)
|
|
64 | 64 |
|
65 | 65 |
else:
|
66 | 66 |
self.queue.append(job)
|
67 |
- job.update_execute_stage(ExecuteStage.QUEUED)
|
|
67 |
+ job.update_execute_stage(OperationStage.QUEUED)
|
|
68 | 68 |
|
69 | 69 |
def retry_job(self, name):
|
70 | 70 |
if name in self.jobs:
|
71 | 71 |
job = self.jobs[name]
|
72 | 72 |
if job.n_tries >= self.MAX_N_TRIES:
|
73 | 73 |
# TODO: Decide what to do with these jobs
|
74 |
- job.update_execute_stage(ExecuteStage.COMPLETED)
|
|
74 |
+ job.update_execute_stage(OperationStage.COMPLETED)
|
|
75 | 75 |
# TODO: Mark these jobs as done
|
76 | 76 |
else:
|
77 |
- job.update_execute_stage(ExecuteStage.QUEUED)
|
|
77 |
+ job.update_execute_stage(OperationStage.QUEUED)
|
|
78 | 78 |
job.n_tries += 1
|
79 | 79 |
self.queue.appendleft(job)
|
80 | 80 |
|
... | ... | @@ -87,7 +87,7 @@ class Scheduler: |
87 | 87 |
if not job.do_not_cache and self._action_cache is not None:
|
88 | 88 |
if not job.lease.status.code:
|
89 | 89 |
self._action_cache.update_action_result(job.action_digest, action_result)
|
90 |
- job.update_execute_stage(ExecuteStage.COMPLETED)
|
|
90 |
+ job.update_execute_stage(OperationStage.COMPLETED)
|
|
91 | 91 |
|
92 | 92 |
def get_operations(self):
|
93 | 93 |
response = operations_pb2.ListOperationsResponse()
|
... | ... | @@ -111,7 +111,7 @@ class Scheduler: |
111 | 111 |
def create_lease(self):
|
112 | 112 |
if self.queue:
|
113 | 113 |
job = self.queue.popleft()
|
114 |
- job.update_execute_stage(ExecuteStage.EXECUTING)
|
|
114 |
+ job.update_execute_stage(OperationStage.EXECUTING)
|
|
115 | 115 |
job.create_lease()
|
116 | 116 |
job.lease.state = LeaseState.PENDING.value
|
117 | 117 |
return job.lease
|
... | ... | @@ -82,7 +82,7 @@ def test_execute(skip_cache_lookup, instance, context): |
82 | 82 |
assert isinstance(result, operations_pb2.Operation)
|
83 | 83 |
metadata = remote_execution_pb2.ExecuteOperationMetadata()
|
84 | 84 |
result.metadata.Unpack(metadata)
|
85 |
- assert metadata.stage == job.ExecuteStage.QUEUED.value
|
|
85 |
+ assert metadata.stage == job.OperationStage.QUEUED.value
|
|
86 | 86 |
assert uuid.UUID(result.name, version=4)
|
87 | 87 |
assert result.done is False
|
88 | 88 |
|
... | ... | @@ -116,7 +116,7 @@ def test_wait_execution(instance, controller, context): |
116 | 116 |
action_result = remote_execution_pb2.ActionResult()
|
117 | 117 |
action_result_any.Pack(action_result)
|
118 | 118 |
|
119 |
- j.update_execute_stage(job.ExecuteStage.COMPLETED)
|
|
119 |
+ j.update_execute_stage(job.OperationStage.COMPLETED)
|
|
120 | 120 |
|
121 | 121 |
response = instance.WaitExecution(request, context)
|
122 | 122 |
|
... | ... | @@ -125,7 +125,7 @@ def test_wait_execution(instance, controller, context): |
125 | 125 |
assert isinstance(result, operations_pb2.Operation)
|
126 | 126 |
metadata = remote_execution_pb2.ExecuteOperationMetadata()
|
127 | 127 |
result.metadata.Unpack(metadata)
|
128 |
- assert metadata.stage == job.ExecuteStage.COMPLETED.value
|
|
128 |
+ assert metadata.stage == job.OperationStage.COMPLETED.value
|
|
129 | 129 |
assert uuid.UUID(result.name, version=4)
|
130 | 130 |
assert result.done is True
|
131 | 131 |
|