Martin Blanchard pushed to branch mablanch/83-executed-action-metadata at BuildGrid / buildgrid
Commits:
-
fff0e133
by Martin Blanchard at 2018-10-25T08:23:06Z
-
bba10d36
by Martin Blanchard at 2018-10-25T08:24:20Z
-
d1849bc1
by Martin Blanchard at 2018-10-25T08:24:20Z
3 changed files:
Changes:
... | ... | @@ -17,16 +17,32 @@ import random |
17 | 17 |
import time
|
18 | 18 |
|
19 | 19 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
20 |
+from buildgrid.utils import get_hostname
|
|
20 | 21 |
|
21 | 22 |
|
22 | 23 |
def work_dummy(context, lease):
|
23 | 24 |
""" Just returns lease after some random time
|
24 | 25 |
"""
|
26 |
+ action_result = remote_execution_pb2.ActionResult()
|
|
27 |
+ |
|
25 | 28 |
lease.result.Clear()
|
26 | 29 |
|
27 |
- time.sleep(random.randint(1, 5))
|
|
30 |
+ action_result.execution_metadata.worker = get_hostname()
|
|
28 | 31 |
|
29 |
- action_result = remote_execution_pb2.ActionResult()
|
|
32 |
+ # Simulate the input-downloading phase:
|
|
33 |
+ action_result.execution_metadata.input_fetch_start_timestamp.GetCurrentTime()
|
|
34 |
+ time.sleep(random.random())
|
|
35 |
+ action_result.execution_metadata.input_fetch_completed_timestamp.GetCurrentTime()
|
|
36 |
+ |
|
37 |
+ # Simulate the execution phase:
|
|
38 |
+ action_result.execution_metadata.execution_start_timestamp.GetCurrentTime()
|
|
39 |
+ time.sleep(random.random())
|
|
40 |
+ action_result.execution_metadata.execution_completed_timestamp.GetCurrentTime()
|
|
41 |
+ |
|
42 |
+ # Simulate the output-uploading phase:
|
|
43 |
+ action_result.execution_metadata.output_upload_start_timestamp.GetCurrentTime()
|
|
44 |
+ time.sleep(random.random())
|
|
45 |
+ action_result.execution_metadata.output_upload_completed_timestamp.GetCurrentTime()
|
|
30 | 46 |
|
31 | 47 |
lease.result.Pack(action_result)
|
32 | 48 |
|
... | ... | @@ -19,7 +19,7 @@ import tempfile |
19 | 19 |
|
20 | 20 |
from buildgrid.client.cas import download, upload
|
21 | 21 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
22 |
-from buildgrid.utils import output_file_maker, output_directory_maker
|
|
22 |
+from buildgrid.utils import get_hostname, output_file_maker, output_directory_maker
|
|
23 | 23 |
|
24 | 24 |
|
25 | 25 |
def work_host_tools(context, lease):
|
... | ... | @@ -29,10 +29,13 @@ def work_host_tools(context, lease): |
29 | 29 |
logger = context.logger
|
30 | 30 |
|
31 | 31 |
action_digest = remote_execution_pb2.Digest()
|
32 |
+ action_result = remote_execution_pb2.ActionResult()
|
|
32 | 33 |
|
33 | 34 |
lease.payload.Unpack(action_digest)
|
34 | 35 |
lease.result.Clear()
|
35 | 36 |
|
37 |
+ action_result.execution_metadata.worker = get_hostname()
|
|
38 |
+ |
|
36 | 39 |
with tempfile.TemporaryDirectory() as temp_directory:
|
37 | 40 |
with download(context.cas_channel, instance=instance_name) as downloader:
|
38 | 41 |
action = downloader.get_message(action_digest,
|
... | ... | @@ -43,8 +46,12 @@ def work_host_tools(context, lease): |
43 | 46 |
command = downloader.get_message(action.command_digest,
|
44 | 47 |
remote_execution_pb2.Command())
|
45 | 48 |
|
49 |
+ action_result.execution_metadata.input_fetch_start_timestamp.GetCurrentTime()
|
|
50 |
+ |
|
46 | 51 |
downloader.download_directory(action.input_root_digest, temp_directory)
|
47 | 52 |
|
53 |
+ action_result.execution_metadata.input_fetch_completed_timestamp.GetCurrentTime()
|
|
54 |
+ |
|
48 | 55 |
environment = os.environ.copy()
|
49 | 56 |
for variable in command.environment_variables:
|
50 | 57 |
if variable.name not in ['PATH', 'PWD']:
|
... | ... | @@ -70,6 +77,8 @@ def work_host_tools(context, lease): |
70 | 77 |
|
71 | 78 |
logger.debug(' '.join(command_line))
|
72 | 79 |
|
80 |
+ action_result.execution_metadata.execution_start_timestamp.GetCurrentTime()
|
|
81 |
+ |
|
73 | 82 |
process = subprocess.Popen(command_line,
|
74 | 83 |
cwd=working_directory,
|
75 | 84 |
env=environment,
|
... | ... | @@ -80,7 +89,8 @@ def work_host_tools(context, lease): |
80 | 89 |
stdout, stderr = process.communicate()
|
81 | 90 |
returncode = process.returncode
|
82 | 91 |
|
83 |
- action_result = remote_execution_pb2.ActionResult()
|
|
92 |
+ action_result.execution_metadata.execution_completed_timestamp.GetCurrentTime()
|
|
93 |
+ |
|
84 | 94 |
# TODO: Upload to CAS or output RAW
|
85 | 95 |
# For now, just pass raw
|
86 | 96 |
# https://gitlab.com/BuildGrid/buildgrid/issues/90
|
... | ... | @@ -92,6 +102,8 @@ def work_host_tools(context, lease): |
92 | 102 |
logger.debug("Command stdout: [{}]".format(stdout))
|
93 | 103 |
logger.debug("Command exit code: [{}]".format(returncode))
|
94 | 104 |
|
105 |
+ action_result.execution_metadata.output_upload_start_timestamp.GetCurrentTime()
|
|
106 |
+ |
|
95 | 107 |
with upload(context.cas_channel, instance=instance_name) as uploader:
|
96 | 108 |
output_files, output_directories = [], []
|
97 | 109 |
|
... | ... | @@ -121,6 +133,8 @@ def work_host_tools(context, lease): |
121 | 133 |
|
122 | 134 |
action_result.output_directories.extend(output_directories)
|
123 | 135 |
|
136 |
+ action_result.execution_metadata.output_upload_completed_timestamp.GetCurrentTime()
|
|
137 |
+ |
|
124 | 138 |
lease.result.Pack(action_result)
|
125 | 139 |
|
126 | 140 |
return lease
|
... | ... | @@ -25,10 +25,12 @@ from urllib.parse import urlparse |
25 | 25 |
import sys
|
26 | 26 |
|
27 | 27 |
import click
|
28 |
+from google.protobuf import json_format
|
|
28 | 29 |
import grpc
|
29 | 30 |
|
30 | 31 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
|
31 | 32 |
from buildgrid._protos.google.longrunning import operations_pb2, operations_pb2_grpc
|
33 |
+from buildgrid._protos.google.rpc import code_pb2
|
|
32 | 34 |
|
33 | 35 |
from ..cli import pass_context
|
34 | 36 |
|
... | ... | @@ -65,45 +67,125 @@ def cli(context, remote, instance_name, client_key, client_cert, server_cert): |
65 | 67 |
context.logger.debug("Starting for remote {}".format(context.remote))
|
66 | 68 |
|
67 | 69 |
|
70 |
+def _print_operation_status(operation, print_details=False):
|
|
71 |
+ metadata = remote_execution_pb2.ExecuteOperationMetadata()
|
|
72 |
+ # The metadata is expected to be an ExecuteOperationMetadata message:
|
|
73 |
+ assert operation.metadata.Is(metadata.DESCRIPTOR)
|
|
74 |
+ operation.metadata.Unpack(metadata)
|
|
75 |
+ |
|
76 |
+ stage_name = remote_execution_pb2.ExecuteOperationMetadata.Stage.Name(
|
|
77 |
+ metadata.stage).upper()
|
|
78 |
+ |
|
79 |
+ if not operation.done:
|
|
80 |
+ if stage_name == 'CACHE_CHECK':
|
|
81 |
+ click.echo('CacheCheck: {}: Querying action-cache (stage={})'
|
|
82 |
+ .format(operation.name, metadata.stage))
|
|
83 |
+ elif stage_name == 'QUEUED':
|
|
84 |
+ click.echo('Queued: {}: Waiting for execution (stage={})'
|
|
85 |
+ .format(operation.name, metadata.stage))
|
|
86 |
+ elif stage_name == 'EXECUTING':
|
|
87 |
+ click.echo('Executing: {}: Currently running (stage={})'
|
|
88 |
+ .format(operation.name, metadata.stage))
|
|
89 |
+ else:
|
|
90 |
+ click.echo('Error: {}: In an invalid state (stage={})'
|
|
91 |
+ .format(operation.name, metadata.stage), err=True)
|
|
92 |
+ return
|
|
93 |
+ |
|
94 |
+ response = remote_execution_pb2.ExecuteResponse()
|
|
95 |
+ # The response is expected to be an ExecuteResponse message:
|
|
96 |
+ assert operation.response.Is(response.DESCRIPTOR)
|
|
97 |
+ operation.response.Unpack(response)
|
|
98 |
+ |
|
99 |
+ if response.status.code != code_pb2.OK:
|
|
100 |
+ click.echo('Failure: {}: {} (code={})'
|
|
101 |
+ .format(operation.name, response.status.message, response.status.code))
|
|
102 |
+ else:
|
|
103 |
+ if response.result.exit_code != 0:
|
|
104 |
+ click.echo('Success: {}: Completed with failure (stage={}, exit_code={})'
|
|
105 |
+ .format(operation.name, metadata.stage, response.result.exit_code))
|
|
106 |
+ else:
|
|
107 |
+ click.echo('Success: {}: Completed successfully (stage={}, exit_code={})'
|
|
108 |
+ .format(operation.name, metadata.stage, response.result.exit_code))
|
|
109 |
+ |
|
110 |
+ if print_details:
|
|
111 |
+ metadata = response.result.execution_metadata
|
|
112 |
+ click.echo(' worker={}'.format(metadata.worker))
|
|
113 |
+ |
|
114 |
+ queued = metadata.queued_timestamp.ToDatetime()
|
|
115 |
+ click.echo(' queued_at={}'.format(queued))
|
|
116 |
+ |
|
117 |
+ worker_start = metadata.worker_start_timestamp.ToDatetime()
|
|
118 |
+ worker_completed = metadata.worker_completed_timestamp.ToDatetime()
|
|
119 |
+ click.echo(' work_duration={}'.format(worker_completed - worker_start))
|
|
120 |
+ |
|
121 |
+ fetch_start = metadata.input_fetch_start_timestamp.ToDatetime()
|
|
122 |
+ fetch_completed = metadata.input_fetch_completed_timestamp.ToDatetime()
|
|
123 |
+ click.echo(' fetch_duration={}'.format(fetch_completed - fetch_start))
|
|
124 |
+ |
|
125 |
+ execution_start = metadata.execution_start_timestamp.ToDatetime()
|
|
126 |
+ execution_completed = metadata.execution_completed_timestamp.ToDatetime()
|
|
127 |
+ click.echo(' execution_duration={}'.format(execution_completed - execution_start))
|
|
128 |
+ |
|
129 |
+ upload_start = metadata.output_upload_start_timestamp.ToDatetime()
|
|
130 |
+ upload_completed = metadata.output_upload_completed_timestamp.ToDatetime()
|
|
131 |
+ click.echo(' upload_duration={}'.format(upload_completed - upload_start))
|
|
132 |
+ |
|
133 |
+ click.echo(' total_duration={}'.format(worker_completed - queued))
|
|
134 |
+ |
|
135 |
+ |
|
68 | 136 |
@cli.command('status', short_help="Get the status of an operation.")
|
69 | 137 |
@click.argument('operation-name', nargs=1, type=click.STRING, required=True)
|
138 |
+@click.option('--json', is_flag=True, show_default=True,
|
|
139 |
+ help="Print operations status in JSON format.")
|
|
70 | 140 |
@pass_context
|
71 |
-def status(context, operation_name):
|
|
72 |
- context.logger.info("Getting operation status...")
|
|
141 |
+def status(context, operation_name, json):
|
|
73 | 142 |
stub = operations_pb2_grpc.OperationsStub(context.channel)
|
74 |
- |
|
75 | 143 |
request = operations_pb2.GetOperationRequest(name=operation_name)
|
76 | 144 |
|
77 |
- response = stub.GetOperation(request)
|
|
78 |
- context.logger.info(response)
|
|
145 |
+ operation = stub.GetOperation(request)
|
|
146 |
+ |
|
147 |
+ if not json:
|
|
148 |
+ _print_operation_status(operation, print_details=True)
|
|
149 |
+ else:
|
|
150 |
+ click.echo(json_format.MessageToJson(operation))
|
|
79 | 151 |
|
80 | 152 |
|
81 | 153 |
@cli.command('list', short_help="List operations.")
|
154 |
+@click.option('--json', is_flag=True, show_default=True,
|
|
155 |
+ help="Print operations list in JSON format.")
|
|
82 | 156 |
@pass_context
|
83 |
-def lists(context):
|
|
84 |
- context.logger.info("Getting list of operations")
|
|
157 |
+def lists(context, json):
|
|
85 | 158 |
stub = operations_pb2_grpc.OperationsStub(context.channel)
|
86 |
- |
|
87 | 159 |
request = operations_pb2.ListOperationsRequest(name=context.instance_name)
|
88 | 160 |
|
89 | 161 |
response = stub.ListOperations(request)
|
90 | 162 |
|
91 | 163 |
if not response.operations:
|
92 |
- context.logger.warning("No operations to list")
|
|
164 |
+ click.echo('Error: No operations to list', err=True)
|
|
93 | 165 |
return
|
94 | 166 |
|
95 |
- for op in response.operations:
|
|
96 |
- context.logger.info(op)
|
|
167 |
+ for operation in response.operations:
|
|
168 |
+ if not json:
|
|
169 |
+ _print_operation_status(operation)
|
|
170 |
+ else:
|
|
171 |
+ click.echo(json_format.MessageToJson(operation))
|
|
97 | 172 |
|
98 | 173 |
|
99 | 174 |
@cli.command('wait', short_help="Streams an operation until it is complete.")
|
100 | 175 |
@click.argument('operation-name', nargs=1, type=click.STRING, required=True)
|
176 |
+@click.option('--json', is_flag=True, show_default=True,
|
|
177 |
+ help="Print operations statuses in JSON format.")
|
|
101 | 178 |
@pass_context
|
102 |
-def wait(context, operation_name):
|
|
179 |
+def wait(context, operation_name, json):
|
|
103 | 180 |
stub = remote_execution_pb2_grpc.ExecutionStub(context.channel)
|
104 | 181 |
request = remote_execution_pb2.WaitExecutionRequest(name=operation_name)
|
105 | 182 |
|
106 |
- response = stub.WaitExecution(request)
|
|
183 |
+ operation_iterator = stub.WaitExecution(request)
|
|
107 | 184 |
|
108 |
- for stream in response:
|
|
109 |
- context.logger.info(stream)
|
|
185 |
+ for operation in operation_iterator:
|
|
186 |
+ if not json and operation.done:
|
|
187 |
+ _print_operation_status(operation, print_details=True)
|
|
188 |
+ elif not json:
|
|
189 |
+ _print_operation_status(operation)
|
|
190 |
+ else:
|
|
191 |
+ click.echo(json_format.MessageToJson(operation))
|