Qinusty pushed to branch Qinusty/531-fetch-retries-on-terminate at BuildStream / buildstream
Commits:
-
8b84a51a
by Jürg Billeter at 2018-08-15T13:02:32Z
-
1202ef8a
by Jürg Billeter at 2018-08-15T13:02:32Z
-
6a9d737e
by Jürg Billeter at 2018-08-15T13:02:32Z
-
00762442
by Jürg Billeter at 2018-08-15T14:13:08Z
-
24a35ccf
by Josh Smith at 2018-08-15T14:21:09Z
-
a9874620
by Josh Smith at 2018-08-15T14:21:09Z
6 changed files:
- buildstream/_artifactcache/cascache.py
- buildstream/_artifactcache/casserver.py
- buildstream/_scheduler/jobs/job.py
- + tests/frontend/interruptions.py
- + tests/frontend/interruptions/elements/remote.bst
- + tests/frontend/interruptions/project.conf
Changes:
... | ... | @@ -24,6 +24,7 @@ import os |
24 | 24 |
import signal
|
25 | 25 |
import stat
|
26 | 26 |
import tempfile
|
27 |
+import uuid
|
|
27 | 28 |
from urllib.parse import urlparse
|
28 | 29 |
|
29 | 30 |
import grpc
|
... | ... | @@ -309,8 +310,11 @@ class CASCache(ArtifactCache): |
309 | 310 |
# Upload any blobs missing on the server
|
310 | 311 |
skipped_remote = False
|
311 | 312 |
for digest in missing_blobs.values():
|
313 |
+ uuid_ = uuid.uuid4()
|
|
314 |
+ resource_name = '/'.join(['uploads', str(uuid_), 'blobs',
|
|
315 |
+ digest.hash, str(digest.size_bytes)])
|
|
316 |
+ |
|
312 | 317 |
def request_stream():
|
313 |
- resource_name = os.path.join(digest.hash, str(digest.size_bytes))
|
|
314 | 318 |
with open(self.objpath(digest), 'rb') as f:
|
315 | 319 |
assert os.fstat(f.fileno()).st_size == digest.size_bytes
|
316 | 320 |
offset = 0
|
... | ... | @@ -747,7 +751,7 @@ class CASCache(ArtifactCache): |
747 | 751 |
yield from self._required_blobs(dirnode.digest)
|
748 | 752 |
|
749 | 753 |
def _fetch_blob(self, remote, digest, out):
|
750 |
- resource_name = os.path.join(digest.hash, str(digest.size_bytes))
|
|
754 |
+ resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)])
|
|
751 | 755 |
request = bytestream_pb2.ReadRequest()
|
752 | 756 |
request.resource_name = resource_name
|
753 | 757 |
request.read_offset = 0
|
... | ... | @@ -23,6 +23,7 @@ import os |
23 | 23 |
import signal
|
24 | 24 |
import sys
|
25 | 25 |
import tempfile
|
26 |
+import uuid
|
|
26 | 27 |
|
27 | 28 |
import click
|
28 | 29 |
import grpc
|
... | ... | @@ -130,12 +131,21 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer): |
130 | 131 |
|
131 | 132 |
def Read(self, request, context):
|
132 | 133 |
resource_name = request.resource_name
|
133 |
- client_digest = _digest_from_resource_name(resource_name)
|
|
134 |
- assert request.read_offset <= client_digest.size_bytes
|
|
134 |
+ client_digest = _digest_from_download_resource_name(resource_name)
|
|
135 |
+ if client_digest is None:
|
|
136 |
+ context.set_code(grpc.StatusCode.NOT_FOUND)
|
|
137 |
+ return
|
|
138 |
+ |
|
139 |
+ if request.read_offset > client_digest.size_bytes:
|
|
140 |
+ context.set_code(grpc.StatusCode.OUT_OF_RANGE)
|
|
141 |
+ return
|
|
135 | 142 |
|
136 | 143 |
try:
|
137 | 144 |
with open(self.cas.objpath(client_digest), 'rb') as f:
|
138 |
- assert os.fstat(f.fileno()).st_size == client_digest.size_bytes
|
|
145 |
+ if os.fstat(f.fileno()).st_size != client_digest.size_bytes:
|
|
146 |
+ context.set_code(grpc.StatusCode.NOT_FOUND)
|
|
147 |
+ return
|
|
148 |
+ |
|
139 | 149 |
if request.read_offset > 0:
|
140 | 150 |
f.seek(request.read_offset)
|
141 | 151 |
|
... | ... | @@ -163,12 +173,18 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer): |
163 | 173 |
resource_name = None
|
164 | 174 |
with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
|
165 | 175 |
for request in request_iterator:
|
166 |
- assert not finished
|
|
167 |
- assert request.write_offset == offset
|
|
176 |
+ if finished or request.write_offset != offset:
|
|
177 |
+ context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
|
|
178 |
+ return response
|
|
179 |
+ |
|
168 | 180 |
if resource_name is None:
|
169 | 181 |
# First request
|
170 | 182 |
resource_name = request.resource_name
|
171 |
- client_digest = _digest_from_resource_name(resource_name)
|
|
183 |
+ client_digest = _digest_from_upload_resource_name(resource_name)
|
|
184 |
+ if client_digest is None:
|
|
185 |
+ context.set_code(grpc.StatusCode.NOT_FOUND)
|
|
186 |
+ return response
|
|
187 |
+ |
|
172 | 188 |
try:
|
173 | 189 |
_clean_up_cache(self.cas, client_digest.size_bytes)
|
174 | 190 |
except ArtifactTooLargeException as e:
|
... | ... | @@ -177,14 +193,20 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer): |
177 | 193 |
return response
|
178 | 194 |
elif request.resource_name:
|
179 | 195 |
# If it is set on subsequent calls, it **must** match the value of the first request.
|
180 |
- assert request.resource_name == resource_name
|
|
196 |
+ if request.resource_name != resource_name:
|
|
197 |
+ context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
|
|
198 |
+ return response
|
|
181 | 199 |
out.write(request.data)
|
182 | 200 |
offset += len(request.data)
|
183 | 201 |
if request.finish_write:
|
184 |
- assert client_digest.size_bytes == offset
|
|
202 |
+ if client_digest.size_bytes != offset:
|
|
203 |
+ context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
|
|
204 |
+ return response
|
|
185 | 205 |
out.flush()
|
186 | 206 |
digest = self.cas.add_object(path=out.name)
|
187 |
- assert digest.hash == client_digest.hash
|
|
207 |
+ if digest.hash != client_digest.hash:
|
|
208 |
+ context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
|
|
209 |
+ return response
|
|
188 | 210 |
finished = True
|
189 | 211 |
|
190 | 212 |
assert finished
|
... | ... | @@ -247,13 +269,48 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer): |
247 | 269 |
return response
|
248 | 270 |
|
249 | 271 |
|
250 |
-def _digest_from_resource_name(resource_name):
|
|
272 |
+def _digest_from_download_resource_name(resource_name):
|
|
273 |
+ parts = resource_name.split('/')
|
|
274 |
+ |
|
275 |
+ # Accept requests from non-conforming BuildStream 1.1.x clients
|
|
276 |
+ if len(parts) == 2:
|
|
277 |
+ parts.insert(0, 'blobs')
|
|
278 |
+ |
|
279 |
+ if len(parts) != 3 or parts[0] != 'blobs':
|
|
280 |
+ return None
|
|
281 |
+ |
|
282 |
+ try:
|
|
283 |
+ digest = remote_execution_pb2.Digest()
|
|
284 |
+ digest.hash = parts[1]
|
|
285 |
+ digest.size_bytes = int(parts[2])
|
|
286 |
+ return digest
|
|
287 |
+ except ValueError:
|
|
288 |
+ return None
|
|
289 |
+ |
|
290 |
+ |
|
291 |
+def _digest_from_upload_resource_name(resource_name):
|
|
251 | 292 |
parts = resource_name.split('/')
|
252 |
- assert len(parts) == 2
|
|
253 |
- digest = remote_execution_pb2.Digest()
|
|
254 |
- digest.hash = parts[0]
|
|
255 |
- digest.size_bytes = int(parts[1])
|
|
256 |
- return digest
|
|
293 |
+ |
|
294 |
+ # Accept requests from non-conforming BuildStream 1.1.x clients
|
|
295 |
+ if len(parts) == 2:
|
|
296 |
+ parts.insert(0, 'uploads')
|
|
297 |
+ parts.insert(1, str(uuid.uuid4()))
|
|
298 |
+ parts.insert(2, 'blobs')
|
|
299 |
+ |
|
300 |
+ if len(parts) < 5 or parts[0] != 'uploads' or parts[2] != 'blobs':
|
|
301 |
+ return None
|
|
302 |
+ |
|
303 |
+ try:
|
|
304 |
+ uuid_ = uuid.UUID(hex=parts[1])
|
|
305 |
+ if uuid_.version != 4:
|
|
306 |
+ return None
|
|
307 |
+ |
|
308 |
+ digest = remote_execution_pb2.Digest()
|
|
309 |
+ digest.hash = parts[3]
|
|
310 |
+ digest.size_bytes = int(parts[4])
|
|
311 |
+ return digest
|
|
312 |
+ except ValueError:
|
|
313 |
+ return None
|
|
257 | 314 |
|
258 | 315 |
|
259 | 316 |
def _has_object(cas, digest):
|
... | ... | @@ -250,7 +250,7 @@ class Job(): |
250 | 250 |
#
|
251 | 251 |
def resume(self, silent=False):
|
252 | 252 |
if self._suspended:
|
253 |
- if not silent:
|
|
253 |
+ if not silent and not self._scheduler.terminated:
|
|
254 | 254 |
self.message(MessageType.STATUS,
|
255 | 255 |
"{} resuming".format(self.action_name))
|
256 | 256 |
|
... | ... | @@ -549,7 +549,7 @@ class Job(): |
549 | 549 |
#
|
550 | 550 |
self._retry_flag = returncode not in (RC_OK, RC_PERM_FAIL)
|
551 | 551 |
|
552 |
- if self._retry_flag and (self._tries <= self._max_retries):
|
|
552 |
+ if self._retry_flag and (self._tries <= self._max_retries) and not self._scheduler.terminated:
|
|
553 | 553 |
self.spawn()
|
554 | 554 |
return
|
555 | 555 |
|
1 |
+import pytest
|
|
2 |
+import os
|
|
3 |
+import signal
|
|
4 |
+import re
|
|
5 |
+import time
|
|
6 |
+import multiprocessing as mp
|
|
7 |
+from multiprocessing import Process
|
|
8 |
+from multiprocessing.queues import Queue
|
|
9 |
+ |
|
10 |
+from buildstream import _yaml
|
|
11 |
+from tests.testutils import cli
|
|
12 |
+ |
|
13 |
+ |
|
14 |
+DATA_DIR = os.path.join(
|
|
15 |
+ os.path.dirname(os.path.realpath(__file__)),
|
|
16 |
+ "interruptions",
|
|
17 |
+)
|
|
18 |
+ |
|
19 |
+ |
|
20 |
+def cli_run_in_process(cli, path, args):
|
|
21 |
+ def do_run(cli, path, args, queue):
|
|
22 |
+ result = cli.run(project=path, args=args)
|
|
23 |
+ queue.put(result.output)
|
|
24 |
+ queue.put(result.stderr)
|
|
25 |
+ |
|
26 |
+ queue = mp.Queue()
|
|
27 |
+ p = mp.Process(target=do_run, args=[cli, path, args, queue])
|
|
28 |
+ p.start()
|
|
29 |
+ return queue, p
|
|
30 |
+ |
|
31 |
+ |
|
32 |
+@pytest.mark.datafiles(DATA_DIR)
|
|
33 |
+def test_interrupt_fetch(cli, datafiles):
|
|
34 |
+ path = os.path.join(datafiles.dirname, datafiles.basename)
|
|
35 |
+ |
|
36 |
+ queue, proc = cli_run_in_process(cli, path, args=['--on-error', 'terminate',
|
|
37 |
+ 'fetch', 'remote.bst'])
|
|
38 |
+ time.sleep(1)
|
|
39 |
+ os.kill(proc.pid, signal.SIGINT)
|
|
40 |
+ |
|
41 |
+ # 3 second timeout on each queue read
|
|
42 |
+ try:
|
|
43 |
+ output = queue.get(timeout=3)
|
|
44 |
+ stderr = queue.get(timeout=3)
|
|
45 |
+ except mp.queues.Empty:
|
|
46 |
+ assert False, 'Fetch failed to terminate'
|
|
47 |
+ assert output is not None
|
|
48 |
+ |
|
49 |
+ matches = re.findall('FAILURE Fetch', stderr)
|
|
50 |
+ assert len(matches), "Unexpected success"
|
|
51 |
+ |
|
52 |
+ matches = re.findall(r'STATUS\s*Fetch terminating', stderr)
|
|
53 |
+ assert len(matches) != 0, "Fetch failed to terminate"
|
|
54 |
+ assert len(matches) == 1, "Fetch attempted to terminate more than once"
|
1 |
+kind: import
|
|
2 |
+ |
|
3 |
+sources:
|
|
4 |
+ - kind: git
|
|
5 |
+ track: master
|
|
6 |
+ ref: 03527d5afd967a94f95f2ee8f035236b387dd335
|
|
7 |
+ url: https://gitlab.com/BuildStream/buildstream.git
|
|
\ No newline at end of file |
1 |
+name: interruptions
|
|
2 |
+element-path: elements
|
|
\ No newline at end of file |