finnball pushed to branch finn/update-execution-api at BuildGrid / buildgrid
Commits:
-
d5483e72
by finn at 2018-07-31T14:44:33Z
2 changed files:
Changes:
... | ... | @@ -61,24 +61,16 @@ def request(context, number, instance_name, wait_for_completion): |
61 | 61 |
request = remote_execution_pb2.ExecuteRequest(instance_name = instance_name,
|
62 | 62 |
action_digest = action_digest,
|
63 | 63 |
skip_cache_lookup = True)
|
64 |
+ responses = []
|
|
64 | 65 |
for i in range(0, number):
|
65 |
- response = stub.Execute(request)
|
|
66 |
- for r in response:
|
|
67 |
- context.logger.info(r)
|
|
68 |
- |
|
69 |
- try:
|
|
70 |
- while wait_for_completion:
|
|
71 |
- request = operations_pb2.ListOperationsRequest()
|
|
72 |
- context.logger.debug('Querying to see if jobs are complete.')
|
|
73 |
- stub = operations_pb2_grpc.OperationsStub(context.channel)
|
|
74 |
- response = stub.ListOperations(request)
|
|
75 |
- if all(operation.done for operation in response.operations):
|
|
76 |
- context.logger.info('Jobs complete')
|
|
77 |
- break
|
|
78 |
- time.sleep(1)
|
|
79 |
- |
|
80 |
- except KeyboardInterrupt:
|
|
81 |
- pass
|
|
66 |
+ responses.append(stub.Execute(request))
|
|
67 |
+ |
|
68 |
+ for response in responses:
|
|
69 |
+ if wait_for_completion:
|
|
70 |
+ for stream in response:
|
|
71 |
+ context.logger.info(stream)
|
|
72 |
+ else:
|
|
73 |
+ context.logger.info(next(response))
|
|
82 | 74 |
|
83 | 75 |
@cli.command('status', short_help='Get the status of an operation')
|
84 | 76 |
@click.argument('operation-name')
|
... | ... | @@ -108,3 +100,15 @@ def list_operations(context): |
108 | 100 |
|
109 | 101 |
for op in response.operations:
|
110 | 102 |
context.logger.info(op)
|
103 |
+ |
|
104 |
+@cli.command('wait', short_help='Streams an operation until it is complete')
|
|
105 |
+@click.argument('operation-name')
|
|
106 |
+@pass_context
|
|
107 |
+def wait_execution(context, operation_name):
|
|
108 |
+ stub = remote_execution_pb2_grpc.ExecutionStub(context.channel)
|
|
109 |
+ request = remote_execution_pb2.WaitExecutionRequest(name=operation_name)
|
|
110 |
+ |
|
111 |
+ response = stub.WaitExecution(request)
|
|
112 |
+ |
|
113 |
+ for stream in response:
|
|
114 |
+ context.logger.info(stream)
|
... | ... | @@ -22,6 +22,7 @@ ExecutionService |
22 | 22 |
Serves remote execution requests.
|
23 | 23 |
"""
|
24 | 24 |
|
25 |
+import copy
|
|
25 | 26 |
import grpc
|
26 | 27 |
import logging
|
27 | 28 |
|
... | ... | @@ -40,8 +41,16 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): |
40 | 41 |
# Ignore request.instance_name for now
|
41 | 42 |
# Have only one instance
|
42 | 43 |
try:
|
43 |
- yield self._instance.execute(request.action_digest,
|
|
44 |
- request.skip_cache_lookup)
|
|
44 |
+ operation = self._instance.execute(request.action_digest,
|
|
45 |
+ request.skip_cache_lookup)
|
|
46 |
+ |
|
47 |
+ stream_previous = None
|
|
48 |
+ while True:
|
|
49 |
+ stream = self._instance.get_operation(operation.name)
|
|
50 |
+ if stream != stream_previous:
|
|
51 |
+ yield stream
|
|
52 |
+ if stream.done == True: break
|
|
53 |
+ stream_previous = copy.deepcopy(stream)
|
|
45 | 54 |
|
46 | 55 |
except InvalidArgumentError as e:
|
47 | 56 |
self.logger.error(e)
|
... | ... | @@ -54,3 +63,19 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): |
54 | 63 |
context.set_details(str(e))
|
55 | 64 |
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
|
56 | 65 |
yield operations_pb2.Operation()
|
66 |
+ |
|
67 |
+ def WaitExecution(self, request, context):
|
|
68 |
+ try:
|
|
69 |
+ stream_previous = None
|
|
70 |
+ while True:
|
|
71 |
+ stream = self._instance.get_operation(request.name)
|
|
72 |
+ if stream != stream_previous:
|
|
73 |
+ yield stream
|
|
74 |
+ if stream.done == True: break
|
|
75 |
+ stream_previous = copy.deepcopy(stream)
|
|
76 |
+ |
|
77 |
+ except InvalidArgumentError as e:
|
|
78 |
+ self.logger.error(e)
|
|
79 |
+ context.set_details(str(e))
|
|
80 |
+ context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
|
81 |
+ yield operations_pb2.Operation()
|