... |
... |
@@ -21,16 +21,16 @@ import grpc |
21
|
21
|
from google.protobuf import any_pb2
|
22
|
22
|
|
23
|
23
|
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
24
|
|
-from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
|
25
|
|
-from buildgrid.utils import read_file
|
|
24
|
+from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
|
|
25
|
+from buildgrid.utils import read_file, parse_to_pb2_from_fetch
|
26
|
26
|
|
27
|
27
|
|
28
|
28
|
def work_buildbox(context, lease):
|
29
|
29
|
logger = context.logger
|
30
|
30
|
|
31
|
|
- action_any = lease.payload
|
32
|
|
- action = remote_execution_pb2.Action()
|
33
|
|
- action_any.Unpack(action)
|
|
31
|
+ action_digest_any = lease.payload
|
|
32
|
+ action_digest = remote_execution_pb2.Digest()
|
|
33
|
+ action_digest_any.Unpack(action_digest)
|
34
|
34
|
|
35
|
35
|
cert_server = read_file(context.server_cert)
|
36
|
36
|
cert_client = read_file(context.client_cert)
|
... |
... |
@@ -45,38 +45,57 @@ def work_buildbox(context, lease): |
45
|
45
|
|
46
|
46
|
stub = bytestream_pb2_grpc.ByteStreamStub(channel)
|
47
|
47
|
|
48
|
|
- remote_command = _buildstream_fetch_command(context.local_cas, stub, action.command_digest)
|
|
48
|
+ action = remote_execution_pb2.Action()
|
|
49
|
+ parse_to_pb2_from_fetch(action, stub, action_digest)
|
|
50
|
+
|
|
51
|
+ casdir = context.local_cas
|
|
52
|
+ remote_command = remote_execution_pb2.Command()
|
|
53
|
+ parse_to_pb2_from_fetch(remote_command, stub, action.command_digest)
|
|
54
|
+
|
49
|
55
|
environment = dict((x.name, x.value) for x in remote_command.environment_variables)
|
50
|
56
|
logger.debug("command hash: {}".format(action.command_digest.hash))
|
51
|
57
|
logger.debug("vdir hash: {}".format(action.input_root_digest.hash))
|
52
|
58
|
logger.debug("\n{}".format(' '.join(remote_command.arguments)))
|
53
|
59
|
|
54
|
|
- command = ['buildbox',
|
55
|
|
- '--remote={}'.format('https://{}:{}'.format(context.remote, context.port)),
|
56
|
|
- '--server-cert={}'.format(context.server_cert),
|
57
|
|
- '--client-key={}'.format(context.client_key),
|
58
|
|
- '--client-cert={}'.format(context.client_cert),
|
59
|
|
- '--local={}'.format(context.local_cas),
|
60
|
|
- '--chdir={}'.format(environment['PWD']),
|
61
|
|
- context.fuse_dir]
|
62
|
|
-
|
63
|
|
- command.extend(remote_command.arguments)
|
64
|
|
-
|
65
|
|
- logger.debug(' '.join(command))
|
66
|
|
- logger.debug("Input root digest:\n{}".format(action.input_root_digest))
|
67
|
|
- logger.info("Launching process")
|
68
|
|
-
|
69
|
|
- proc = subprocess.Popen(command,
|
70
|
|
- stdin=subprocess.PIPE,
|
71
|
|
- stdout=subprocess.PIPE)
|
72
|
|
- std_send = action.input_root_digest.SerializeToString()
|
73
|
|
- std_out, _ = proc.communicate(std_send)
|
74
|
|
-
|
75
|
|
- output_root_digest = remote_execution_pb2.Digest()
|
76
|
|
- output_root_digest.ParseFromString(std_out)
|
77
|
|
- logger.debug("Output root digest: {}".format(output_root_digest))
|
78
|
|
-
|
79
|
|
- output_file = remote_execution_pb2.OutputDirectory(tree_digest=output_root_digest)
|
|
60
|
+ # Input hash must be written to disk for buildbox.
|
|
61
|
+ os.makedirs(os.path.join(casdir, 'tmp'), exist_ok=True)
|
|
62
|
+ with tempfile.NamedTemporaryFile(dir=os.path.join(casdir, 'tmp')) as input_digest_file:
|
|
63
|
+ with open(input_digest_file.name, 'wb') as f:
|
|
64
|
+ f.write(action.input_root_digest.SerializeToString())
|
|
65
|
+ f.flush()
|
|
66
|
+
|
|
67
|
+ with tempfile.NamedTemporaryFile(dir=os.path.join(casdir, 'tmp')) as output_digest_file:
|
|
68
|
+ command = ['buildbox',
|
|
69
|
+ '--remote={}'.format('https://{}:{}'.format(context.remote, context.port)),
|
|
70
|
+ '--server-cert={}'.format(context.server_cert),
|
|
71
|
+ '--client-key={}'.format(context.client_key),
|
|
72
|
+ '--client-cert={}'.format(context.client_cert),
|
|
73
|
+ '--input-digest={}'.format(input_digest_file.name),
|
|
74
|
+ '--output-digest={}'.format(output_digest_file.name),
|
|
75
|
+ '--local={}'.format(casdir)]
|
|
76
|
+ if 'PWD' in environment and environment['PWD']:
|
|
77
|
+ command.append('--chdir={}'.format(environment['PWD']))
|
|
78
|
+
|
|
79
|
+ command.append(context.fuse_dir)
|
|
80
|
+ command.extend(remote_command.arguments)
|
|
81
|
+
|
|
82
|
+ logger.debug(' '.join(command))
|
|
83
|
+ logger.debug("Input root digest:\n{}".format(action.input_root_digest))
|
|
84
|
+ logger.info("Launching process")
|
|
85
|
+
|
|
86
|
+ proc = subprocess.Popen(command,
|
|
87
|
+ stdin=subprocess.PIPE,
|
|
88
|
+ stdout=subprocess.PIPE)
|
|
89
|
+ proc.communicate()
|
|
90
|
+
|
|
91
|
+ output_root_digest = remote_execution_pb2.Digest()
|
|
92
|
+ with open(output_digest_file.name, 'rb') as f:
|
|
93
|
+ output_root_digest.ParseFromString(f.read())
|
|
94
|
+ logger.debug("Output root digest: {}".format(output_root_digest))
|
|
95
|
+
|
|
96
|
+ if len(output_root_digest.hash) < 64:
|
|
97
|
+ logger.warning("Buildbox command failed - no output root digest present.")
|
|
98
|
+ output_file = remote_execution_pb2.OutputDirectory(tree_digest=output_root_digest)
|
80
|
99
|
|
81
|
100
|
action_result = remote_execution_pb2.ActionResult()
|
82
|
101
|
action_result.output_directories.extend([output_file])
|
... |
... |
@@ -87,33 +106,3 @@ def work_buildbox(context, lease): |
87
|
106
|
lease.result.CopyFrom(action_result_any)
|
88
|
107
|
|
89
|
108
|
return lease
|
90
|
|
-
|
91
|
|
-
|
92
|
|
-def _buildstream_fetch_blob(remote, digest, out):
|
93
|
|
- resource_name = os.path.join(digest.hash, str(digest.size_bytes))
|
94
|
|
- request = bytestream_pb2.ReadRequest()
|
95
|
|
- request.resource_name = resource_name
|
96
|
|
- request.read_offset = 0
|
97
|
|
- for response in remote.Read(request):
|
98
|
|
- out.write(response.data)
|
99
|
|
-
|
100
|
|
- out.flush()
|
101
|
|
- assert digest.size_bytes == os.fstat(out.fileno()).st_size
|
102
|
|
-
|
103
|
|
-
|
104
|
|
-def _buildstream_fetch_command(casdir, remote, digest):
|
105
|
|
- with tempfile.NamedTemporaryFile(dir=os.path.join(casdir, 'tmp')) as out:
|
106
|
|
- _buildstream_fetch_blob(remote, digest, out)
|
107
|
|
- remote_command = remote_execution_pb2.Command()
|
108
|
|
- with open(out.name, 'rb') as f:
|
109
|
|
- remote_command.ParseFromString(f.read())
|
110
|
|
- return remote_command
|
111
|
|
-
|
112
|
|
-
|
113
|
|
-def _buildstream_fetch_action(casdir, remote, digest):
|
114
|
|
- with tempfile.NamedTemporaryFile(dir=os.path.join(casdir, 'tmp')) as out:
|
115
|
|
- _buildstream_fetch_blob(remote, digest, out)
|
116
|
|
- remote_action = remote_execution_pb2.Action()
|
117
|
|
- with open(out.name, 'rb') as f:
|
118
|
|
- remote_action.ParseFromString(f.read())
|
119
|
|
- return remote_action
|