finnball pushed to branch finn/update-execution-api at BuildGrid / buildgrid
Commits:
-
260ecd2b
by finn at 2018-08-01T08:34:17Z
4 changed files:
- app/commands/cmd_execute.py
- buildgrid/server/execution/execution_instance.py
- buildgrid/server/execution/execution_service.py
- tests/integration/execution_service.py
Changes:
... | ... | @@ -53,7 +53,6 @@ def cli(context, host, port): |
53 | 53 |
@pass_context
|
54 | 54 |
def request(context, number, instance_name, wait_for_completion):
|
55 | 55 |
action_digest = remote_execution_pb2.Digest()
|
56 |
- action_digest.hash = 'zhora'
|
|
57 | 56 |
|
58 | 57 |
context.logger.info("Sending execution request...\n")
|
59 | 58 |
stub = remote_execution_pb2_grpc.ExecutionStub(context.channel)
|
... | ... | @@ -61,24 +60,16 @@ def request(context, number, instance_name, wait_for_completion): |
61 | 60 |
request = remote_execution_pb2.ExecuteRequest(instance_name = instance_name,
|
62 | 61 |
action_digest = action_digest,
|
63 | 62 |
skip_cache_lookup = True)
|
63 |
+ responses = []
|
|
64 | 64 |
for i in range(0, number):
|
65 |
- response = stub.Execute(request)
|
|
66 |
- for r in response:
|
|
67 |
- context.logger.info(r)
|
|
68 |
- |
|
69 |
- try:
|
|
70 |
- while wait_for_completion:
|
|
71 |
- request = operations_pb2.ListOperationsRequest()
|
|
72 |
- context.logger.debug('Querying to see if jobs are complete.')
|
|
73 |
- stub = operations_pb2_grpc.OperationsStub(context.channel)
|
|
74 |
- response = stub.ListOperations(request)
|
|
75 |
- if all(operation.done for operation in response.operations):
|
|
76 |
- context.logger.info('Jobs complete')
|
|
77 |
- break
|
|
78 |
- time.sleep(1)
|
|
79 |
- |
|
80 |
- except KeyboardInterrupt:
|
|
81 |
- pass
|
|
65 |
+ responses.append(stub.Execute(request))
|
|
66 |
+ |
|
67 |
+ for response in responses:
|
|
68 |
+ if wait_for_completion:
|
|
69 |
+ for stream in response:
|
|
70 |
+ context.logger.info(stream)
|
|
71 |
+ else:
|
|
72 |
+ context.logger.info(next(response))
|
|
82 | 73 |
|
83 | 74 |
@cli.command('status', short_help='Get the status of an operation')
|
84 | 75 |
@click.argument('operation-name')
|
... | ... | @@ -108,3 +99,15 @@ def list_operations(context): |
108 | 99 |
|
109 | 100 |
for op in response.operations:
|
110 | 101 |
context.logger.info(op)
|
102 |
+ |
|
103 |
+@cli.command('wait', short_help='Streams an operation until it is complete')
|
|
104 |
+@click.argument('operation-name')
|
|
105 |
+@pass_context
|
|
106 |
+def wait_execution(context, operation_name):
|
|
107 |
+ stub = remote_execution_pb2_grpc.ExecutionStub(context.channel)
|
|
108 |
+ request = remote_execution_pb2.WaitExecutionRequest(name=operation_name)
|
|
109 |
+ |
|
110 |
+ response = stub.WaitExecution(request)
|
|
111 |
+ |
|
112 |
+ for stream in response:
|
|
113 |
+ context.logger.info(stream)
|
... | ... | @@ -50,7 +50,6 @@ class ExecutionInstance(): |
50 | 50 |
return job.get_operation()
|
51 | 51 |
|
52 | 52 |
def get_operation(self, name):
|
53 |
- self.logger.debug("Getting operation: {}".format(name))
|
|
54 | 53 |
operation = self._scheduler.jobs.get(name)
|
55 | 54 |
if operation is None:
|
56 | 55 |
raise InvalidArgumentError("Operation name does not exist: {}".format(name))
|
... | ... | @@ -60,11 +59,9 @@ class ExecutionInstance(): |
60 | 59 |
def list_operations(self, name, list_filter, page_size, page_token):
|
61 | 60 |
# TODO: Pages
|
62 | 61 |
# Spec says number of pages and length of a page are optional
|
63 |
- self.logger.debug("Listing operations")
|
|
64 | 62 |
return self._scheduler.get_operations()
|
65 | 63 |
|
66 | 64 |
def delete_operation(self, name):
|
67 |
- self.logger.debug("Deleting operation {}".format(name))
|
|
68 | 65 |
try:
|
69 | 66 |
self._scheduler.jobs.pop(name)
|
70 | 67 |
except KeyError:
|
... | ... | @@ -22,8 +22,10 @@ ExecutionService |
22 | 22 |
Serves remote execution requests.
|
23 | 23 |
"""
|
24 | 24 |
|
25 |
+import copy
|
|
25 | 26 |
import grpc
|
26 | 27 |
import logging
|
28 |
+import time
|
|
27 | 29 |
|
28 | 30 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
|
29 | 31 |
from buildgrid._protos.google.longrunning import operations_pb2_grpc, operations_pb2
|
... | ... | @@ -40,17 +42,36 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): |
40 | 42 |
# Ignore request.instance_name for now
|
41 | 43 |
# Have only one instance
|
42 | 44 |
try:
|
43 |
- yield self._instance.execute(request.action_digest,
|
|
44 |
- request.skip_cache_lookup)
|
|
45 |
+ operation = self._instance.execute(request.action_digest,
|
|
46 |
+ request.skip_cache_lookup)
|
|
47 |
+ |
|
48 |
+ yield from self._stream_operation_updates(operation.name)
|
|
45 | 49 |
|
46 | 50 |
except InvalidArgumentError as e:
|
47 | 51 |
self.logger.error(e)
|
48 | 52 |
context.set_details(str(e))
|
49 | 53 |
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
50 |
- yield operations_pb2.Operation()
|
|
51 | 54 |
|
52 | 55 |
except NotImplementedError as e:
|
53 | 56 |
self.logger.error(e)
|
54 | 57 |
context.set_details(str(e))
|
55 | 58 |
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
|
56 |
- yield operations_pb2.Operation()
|
|
59 |
+ |
|
60 |
+ def WaitExecution(self, request, context):
|
|
61 |
+ try:
|
|
62 |
+ yield from self._stream_operation_updates(request.name)
|
|
63 |
+ |
|
64 |
+ except InvalidArgumentError as e:
|
|
65 |
+ self.logger.error(e)
|
|
66 |
+ context.set_details(str(e))
|
|
67 |
+ context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
|
68 |
+ |
|
69 |
+ def _stream_operation_updates(self, name):
|
|
70 |
+ stream_previous = None
|
|
71 |
+ while True:
|
|
72 |
+ stream = self._instance.get_operation(name)
|
|
73 |
+ if stream != stream_previous:
|
|
74 |
+ yield stream
|
|
75 |
+ if stream.done == True: break
|
|
76 |
+ stream_previous = copy.deepcopy(stream)
|
|
77 |
+ time.sleep(1)
|
... | ... | @@ -56,15 +56,30 @@ def test_execute(skip_cache_lookup, instance, context): |
56 | 56 |
action_digest = action_digest,
|
57 | 57 |
skip_cache_lookup = skip_cache_lookup)
|
58 | 58 |
response = instance.Execute(request, context)
|
59 |
- |
|
60 |
- for result in response:
|
|
59 |
+ if skip_cache_lookup is False:
|
|
60 |
+ [r for r in response]
|
|
61 |
+ context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
|
|
62 |
+ else:
|
|
63 |
+ result = next(response)
|
|
61 | 64 |
assert isinstance(result, operations_pb2.Operation)
|
65 |
+ metadata = remote_execution_pb2.ExecuteOperationMetadata()
|
|
66 |
+ result.metadata.Unpack(metadata)
|
|
67 |
+ assert metadata.stage == job.ExecuteStage.QUEUED.value
|
|
68 |
+ assert uuid.UUID(result.name, version=4)
|
|
69 |
+ assert result.done is False
|
|
70 |
+ |
|
71 |
+def test_wait_execution(instance, context):
|
|
72 |
+ action_digest = remote_execution_pb2.Digest()
|
|
73 |
+ action_digest.hash = 'zhora'
|
|
74 |
+ |
|
75 |
+ execution_request = remote_execution_pb2.ExecuteRequest(instance_name = '',
|
|
76 |
+ action_digest = action_digest,
|
|
77 |
+ skip_cache_lookup = True)
|
|
78 |
+ execution_response = next(instance.Execute(execution_request, context))
|
|
79 |
+ |
|
80 |
+ |
|
81 |
+ request = remote_execution_pb2.WaitExecutionRequest(name=execution_response.name)
|
|
82 |
+ |
|
83 |
+ response = next(instance.WaitExecution(request, context))
|
|
62 | 84 |
|
63 |
- if skip_cache_lookup is False:
|
|
64 |
- context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
|
|
65 |
- else:
|
|
66 |
- metadata = remote_execution_pb2.ExecuteOperationMetadata()
|
|
67 |
- result.metadata.Unpack(metadata)
|
|
68 |
- assert metadata.stage == job.ExecuteStage.QUEUED.value
|
|
69 |
- assert uuid.UUID(result.name, version=4)
|
|
70 |
- assert result.done is False
|
|
85 |
+ assert response == execution_response
|