Valentin David pushed to branch valentindavid/cache_server_fill_up-1.2 at BuildStream / buildstream
Commits:
- 34e81ae1 by Tiago Gomes at 2018-09-30T06:33:34Z
- b842658c by Tiago Gomes at 2018-09-30T06:33:46Z
- e3ff069e by Tiago Gomes at 2018-09-30T06:33:49Z
- 9bca9183 by Tiago Gomes at 2018-09-30T06:34:21Z
- 2d025076 by Jürg Billeter at 2018-09-30T06:39:57Z
- 01831afe by Jürg Billeter at 2018-09-30T06:41:13Z
- 1b7245da by Jürg Billeter at 2018-09-30T06:41:18Z
- fd46a9f9 by Jürg Billeter at 2018-09-30T06:41:25Z
- 764b7517 by Jürg Billeter at 2018-10-01T15:42:08Z
- a009dcbe by Tristan Van Berkom at 2018-10-02T11:51:19Z
- 7da5104b by Tristan Van Berkom at 2018-10-02T13:18:54Z
- 6e820362 by Tristan Van Berkom at 2018-10-03T07:35:03Z
- 9568824f by Jim MacArthur at 2018-10-03T07:35:51Z
- 4a67e4e3 by Jürg Billeter at 2018-10-03T07:35:51Z
- f585b233 by Jürg Billeter at 2018-10-03T07:35:51Z
- 244e3c7c by Tristan Van Berkom at 2018-10-03T08:05:47Z
- 3f4587ab by Tristan Van Berkom at 2018-10-03T09:36:34Z
- a33fd160 by Tristan Van Berkom at 2018-10-03T10:00:50Z
- e80f435a by Tristan Van Berkom at 2018-10-03T12:06:46Z
- 013a8ad4 by Tristan Van Berkom at 2018-10-03T12:37:24Z
- 262e789f by Tristan Van Berkom at 2018-10-03T13:03:52Z
- 10d69988 by Tristan Van Berkom at 2018-10-03T13:03:56Z
- c02a1ae8 by Tristan Van Berkom at 2018-10-03T13:04:03Z
- eb92e8e9 by Tristan Van Berkom at 2018-10-03T13:44:31Z
- 26d48cc9 by Valentin David at 2018-10-04T14:55:13Z
- 96f09d48 by Valentin David at 2018-10-04T15:16:46Z
- 10abe77f by Tristan Van Berkom at 2018-10-05T07:01:57Z
- f0fc54d4 by Valentin David at 2018-10-17T15:19:21Z
- e66e32ae by Javier Jardón at 2018-10-17T15:43:16Z
- 405d6879 by Valentin David at 2018-10-25T11:55:00Z
- 0a522cad by Valentin David at 2018-10-25T11:55:00Z
- 5ebd4138 by Valentin David at 2018-10-25T11:55:00Z
- c4dc49e1 by Valentin David at 2018-10-25T11:55:00Z
- f0e5ac6f by Valentin David at 2018-10-25T11:55:00Z
- e7dad590 by Valentin David at 2018-10-25T11:55:00Z
- 4be0f7dc by Valentin David at 2018-10-25T11:59:17Z
17 changed files:
- NEWS
- buildstream/_artifactcache/artifactcache.py
- buildstream/_artifactcache/cascache.py
- buildstream/_artifactcache/casserver.py
- buildstream/_scheduler/jobs/job.py
- buildstream/_scheduler/queues/buildqueue.py
- buildstream/_scheduler/queues/queue.py
- buildstream/_scheduler/scheduler.py
- buildstream/plugins/sources/git.py
- buildstream/source.py
- buildstream/utils.py
- doc/examples/autotools/project.conf
- doc/examples/integration-commands/project.conf
- doc/examples/running-commands/project.conf
- tests/frontend/push.py
- tests/integration/project/project.conf
- tests/testutils/artifactshare.py
Changes:
1 |
+=================
|
|
2 |
+buildstream 1.2.3
|
|
3 |
+=================
|
|
4 |
+ |
|
5 |
+ o Fixed an unhandled exception when cleaning up a build sandbox (#153)
|
|
6 |
+ |
|
7 |
+ o Fixed race condition when calculating cache size and committing artifacts
|
|
8 |
+ |
|
9 |
+ o Fixed regression where terminating with `^C` results in a double user interrogation (#693)
|
|
10 |
+ |
|
11 |
+ o Fixed regression in summary when builds are terminated (#479)
|
|
12 |
+ |
|
13 |
+ o Fixed regression where irrelevant status messages appear from git sources
|
|
14 |
+ |
|
15 |
+ o Improved performance of artifact uploads by batching file transfers (#676/#677)
|
|
16 |
+ |
|
17 |
+ o Fixed performance of artifact downloads by batching file transfers (#554)
|
|
18 |
+ |
|
19 |
+ o Fixed checks for paths which escape the project directory (#673)
|
|
20 |
+ |
|
1 | 21 |
=================
|
2 | 22 |
buildstream 1.2.2
|
3 | 23 |
=================
|
... | ... | @@ -277,7 +277,7 @@ class ArtifactCache(): |
277 | 277 |
"Please increase the cache-quota in {}."
|
278 | 278 |
.format(self.context.config_origin or default_conf))
|
279 | 279 |
|
280 |
- if self.get_quota_exceeded():
|
|
280 |
+ if self.has_quota_exceeded():
|
|
281 | 281 |
raise ArtifactError("Cache too full. Aborting.",
|
282 | 282 |
detail=detail,
|
283 | 283 |
reason="cache-too-full")
|
... | ... | @@ -364,14 +364,14 @@ class ArtifactCache(): |
364 | 364 |
self._cache_size = cache_size
|
365 | 365 |
self._write_cache_size(self._cache_size)
|
366 | 366 |
|
367 |
- # get_quota_exceeded()
|
|
367 |
+ # has_quota_exceeded()
|
|
368 | 368 |
#
|
369 | 369 |
# Checks if the current artifact cache size exceeds the quota.
|
370 | 370 |
#
|
371 | 371 |
# Returns:
|
372 | 372 |
# (bool): True if the quota is exceeded
|
373 | 373 |
#
|
374 |
- def get_quota_exceeded(self):
|
|
374 |
+ def has_quota_exceeded(self):
|
|
375 | 375 |
return self.get_cache_size() > self._cache_quota
|
376 | 376 |
|
377 | 377 |
################################################
|
... | ... | @@ -26,6 +26,7 @@ import stat |
26 | 26 |
import tempfile
|
27 | 27 |
import uuid
|
28 | 28 |
import errno
|
29 |
+import contextlib
|
|
29 | 30 |
from urllib.parse import urlparse
|
30 | 31 |
|
31 | 32 |
import grpc
|
... | ... | @@ -43,6 +44,11 @@ from .._exceptions import ArtifactError |
43 | 44 |
from . import ArtifactCache
|
44 | 45 |
|
45 | 46 |
|
47 |
+# The default limit for gRPC messages is 4 MiB.
|
|
48 |
+# Limit payload to 1 MiB to leave sufficient headroom for metadata.
|
|
49 |
+_MAX_PAYLOAD_BYTES = 1024 * 1024
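
To illustrate what this cap means in practice (a sketch only, not part of this diff; the file path is hypothetical), a blob gets streamed in payload-sized pieces so that each message stays well below gRPC's 4 MiB default:

    import os

    _MAX_PAYLOAD_BYTES = 1024 * 1024

    def read_chunks(path):
        # Yield the file in payload-sized pieces; each piece becomes the
        # data field of a single request, leaving headroom for the other
        # message fields.
        remaining = os.path.getsize(path)
        with open(path, 'rb') as f:
            while remaining > 0:
                chunk = f.read(min(remaining, _MAX_PAYLOAD_BYTES))
                remaining -= len(chunk)
                yield chunk
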
|
|
50 |
+ |
|
51 |
+ |
|
46 | 52 |
# A CASCache manages artifacts in a CAS repository as specified in the
|
47 | 53 |
# Remote Execution API.
|
48 | 54 |
#
|
... | ... | @@ -76,6 +82,7 @@ class CASCache(ArtifactCache): |
76 | 82 |
################################################
|
77 | 83 |
# Implementation of abstract methods #
|
78 | 84 |
################################################
|
85 |
+ |
|
79 | 86 |
def contains(self, element, key):
|
80 | 87 |
refpath = self._refpath(self.get_artifact_fullname(element, key))
|
81 | 88 |
|
... | ... | @@ -115,7 +122,7 @@ class CASCache(ArtifactCache): |
115 | 122 |
def commit(self, element, content, keys):
|
116 | 123 |
refs = [self.get_artifact_fullname(element, key) for key in keys]
|
117 | 124 |
|
118 |
- tree = self._create_tree(content)
|
|
125 |
+ tree = self._commit_directory(content)
|
|
119 | 126 |
|
120 | 127 |
for ref in refs:
|
121 | 128 |
self.set_ref(ref, tree)
|
... | ... | @@ -151,6 +158,7 @@ class CASCache(ArtifactCache): |
151 | 158 |
q = multiprocessing.Queue()
|
152 | 159 |
for remote_spec in remote_specs:
|
153 | 160 |
# Use subprocess to avoid creation of gRPC threads in main BuildStream process
|
161 |
+ # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
|
|
154 | 162 |
p = multiprocessing.Process(target=self._initialize_remote, args=(remote_spec, q))
|
155 | 163 |
|
156 | 164 |
try:
|
... | ... | @@ -263,109 +271,69 @@ class CASCache(ArtifactCache): |
263 | 271 |
|
264 | 272 |
self.set_ref(newref, tree)
|
265 | 273 |
|
274 |
+ def _push_refs_to_remote(self, refs, remote):
|
|
275 |
+ skipped_remote = True
|
|
276 |
+ try:
|
|
277 |
+ for ref in refs:
|
|
278 |
+ tree = self.resolve_ref(ref)
|
|
279 |
+ |
|
280 |
+ # Check whether ref is already on the server in which case
|
|
281 |
+ # there is no need to push the artifact
|
|
282 |
+ try:
|
|
283 |
+ request = buildstream_pb2.GetReferenceRequest()
|
|
284 |
+ request.key = ref
|
|
285 |
+ response = remote.ref_storage.GetReference(request)
|
|
286 |
+ |
|
287 |
+ if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
|
|
288 |
+ # ref is already on the server with the same tree
|
|
289 |
+ continue
|
|
290 |
+ |
|
291 |
+ except grpc.RpcError as e:
|
|
292 |
+ if e.code() != grpc.StatusCode.NOT_FOUND:
|
|
293 |
+ # Intentionally re-raise RpcError for outer except block.
|
|
294 |
+ raise
|
|
295 |
+ |
|
296 |
+ self._send_directory(remote, tree)
|
|
297 |
+ |
|
298 |
+ request = buildstream_pb2.UpdateReferenceRequest()
|
|
299 |
+ request.keys.append(ref)
|
|
300 |
+ request.digest.hash = tree.hash
|
|
301 |
+ request.digest.size_bytes = tree.size_bytes
|
|
302 |
+ remote.ref_storage.UpdateReference(request)
|
|
303 |
+ |
|
304 |
+ skipped_remote = False
|
|
305 |
+ except grpc.RpcError as e:
|
|
306 |
+ if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
|
|
307 |
+ raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
|
|
308 |
+ |
|
309 |
+ return not skipped_remote
|
|
310 |
+ |
|
266 | 311 |
def push(self, element, keys):
|
267 |
- refs = [self.get_artifact_fullname(element, key) for key in keys]
|
|
312 |
+ |
|
313 |
+ refs = [self.get_artifact_fullname(element, key) for key in list(keys)]
|
|
268 | 314 |
|
269 | 315 |
project = element._get_project()
|
270 | 316 |
|
271 | 317 |
push_remotes = [r for r in self._remotes[project] if r.spec.push]
|
272 | 318 |
|
273 | 319 |
pushed = False
|
274 |
- display_key = element._get_brief_display_key()
|
|
320 |
+ |
|
275 | 321 |
for remote in push_remotes:
|
276 | 322 |
remote.init()
|
277 |
- skipped_remote = True
|
|
323 |
+ display_key = element._get_brief_display_key()
|
|
278 | 324 |
element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url))
|
279 | 325 |
|
280 |
- try:
|
|
281 |
- for ref in refs:
|
|
282 |
- tree = self.resolve_ref(ref)
|
|
283 |
- |
|
284 |
- # Check whether ref is already on the server in which case
|
|
285 |
- # there is no need to push the artifact
|
|
286 |
- try:
|
|
287 |
- request = buildstream_pb2.GetReferenceRequest()
|
|
288 |
- request.key = ref
|
|
289 |
- response = remote.ref_storage.GetReference(request)
|
|
290 |
- |
|
291 |
- if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
|
|
292 |
- # ref is already on the server with the same tree
|
|
293 |
- continue
|
|
294 |
- |
|
295 |
- except grpc.RpcError as e:
|
|
296 |
- if e.code() != grpc.StatusCode.NOT_FOUND:
|
|
297 |
- # Intentionally re-raise RpcError for outer except block.
|
|
298 |
- raise
|
|
299 |
- |
|
300 |
- missing_blobs = {}
|
|
301 |
- required_blobs = self._required_blobs(tree)
|
|
302 |
- |
|
303 |
- # Limit size of FindMissingBlobs request
|
|
304 |
- for required_blobs_group in _grouper(required_blobs, 512):
|
|
305 |
- request = remote_execution_pb2.FindMissingBlobsRequest()
|
|
306 |
- |
|
307 |
- for required_digest in required_blobs_group:
|
|
308 |
- d = request.blob_digests.add()
|
|
309 |
- d.hash = required_digest.hash
|
|
310 |
- d.size_bytes = required_digest.size_bytes
|
|
311 |
- |
|
312 |
- response = remote.cas.FindMissingBlobs(request)
|
|
313 |
- for digest in response.missing_blob_digests:
|
|
314 |
- d = remote_execution_pb2.Digest()
|
|
315 |
- d.hash = digest.hash
|
|
316 |
- d.size_bytes = digest.size_bytes
|
|
317 |
- missing_blobs[d.hash] = d
|
|
318 |
- |
|
319 |
- # Upload any blobs missing on the server
|
|
320 |
- skipped_remote = False
|
|
321 |
- for digest in missing_blobs.values():
|
|
322 |
- uuid_ = uuid.uuid4()
|
|
323 |
- resource_name = '/'.join(['uploads', str(uuid_), 'blobs',
|
|
324 |
- digest.hash, str(digest.size_bytes)])
|
|
325 |
- |
|
326 |
- def request_stream(resname):
|
|
327 |
- with open(self.objpath(digest), 'rb') as f:
|
|
328 |
- assert os.fstat(f.fileno()).st_size == digest.size_bytes
|
|
329 |
- offset = 0
|
|
330 |
- finished = False
|
|
331 |
- remaining = digest.size_bytes
|
|
332 |
- while not finished:
|
|
333 |
- chunk_size = min(remaining, 64 * 1024)
|
|
334 |
- remaining -= chunk_size
|
|
335 |
- |
|
336 |
- request = bytestream_pb2.WriteRequest()
|
|
337 |
- request.write_offset = offset
|
|
338 |
- # max. 64 kB chunks
|
|
339 |
- request.data = f.read(chunk_size)
|
|
340 |
- request.resource_name = resname
|
|
341 |
- request.finish_write = remaining <= 0
|
|
342 |
- yield request
|
|
343 |
- offset += chunk_size
|
|
344 |
- finished = request.finish_write
|
|
345 |
- response = remote.bytestream.Write(request_stream(resource_name))
|
|
346 |
- |
|
347 |
- request = buildstream_pb2.UpdateReferenceRequest()
|
|
348 |
- request.keys.append(ref)
|
|
349 |
- request.digest.hash = tree.hash
|
|
350 |
- request.digest.size_bytes = tree.size_bytes
|
|
351 |
- remote.ref_storage.UpdateReference(request)
|
|
352 |
- |
|
353 |
- pushed = True
|
|
354 |
- |
|
355 |
- if not skipped_remote:
|
|
356 |
- element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
|
|
357 |
- |
|
358 |
- except grpc.RpcError as e:
|
|
359 |
- if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
|
|
360 |
- raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
|
|
361 |
- |
|
362 |
- if skipped_remote:
|
|
326 |
+ if self._push_refs_to_remote(refs, remote):
|
|
327 |
+ element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
|
|
328 |
+ pushed = True
|
|
329 |
+ else:
|
|
363 | 330 |
self.context.message(Message(
|
364 | 331 |
None,
|
365 | 332 |
MessageType.INFO,
|
366 | 333 |
"Remote ({}) already has {} cached".format(
|
367 | 334 |
remote.spec.url, element._get_brief_display_key())
|
368 | 335 |
))
|
336 |
+ |
|
369 | 337 |
return pushed
|
370 | 338 |
|
371 | 339 |
################################################
|
... | ... | @@ -393,13 +361,14 @@ class CASCache(ArtifactCache): |
393 | 361 |
# digest (Digest): An optional Digest object to populate
|
394 | 362 |
# path (str): Path to file to add
|
395 | 363 |
# buffer (bytes): Byte buffer to add
|
364 |
+ # link_directly (bool): Whether file given by path can be linked
|
|
396 | 365 |
#
|
397 | 366 |
# Returns:
|
398 | 367 |
# (Digest): The digest of the added object
|
399 | 368 |
#
|
400 | 369 |
# Either `path` or `buffer` must be passed, but not both.
|
401 | 370 |
#
|
402 |
- def add_object(self, *, digest=None, path=None, buffer=None):
|
|
371 |
+ def add_object(self, *, digest=None, path=None, buffer=None, link_directly=False):
|
|
403 | 372 |
# Exactly one of the two parameters has to be specified
|
404 | 373 |
assert (path is None) != (buffer is None)
|
405 | 374 |
|
... | ... | @@ -409,28 +378,34 @@ class CASCache(ArtifactCache): |
409 | 378 |
try:
|
410 | 379 |
h = hashlib.sha256()
|
411 | 380 |
# Always write out new file to avoid corruption if input file is modified
|
412 |
- with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
|
|
413 |
- # Set mode bits to 0644
|
|
414 |
- os.chmod(out.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
|
|
415 |
- |
|
416 |
- if path:
|
|
417 |
- with open(path, 'rb') as f:
|
|
418 |
- for chunk in iter(lambda: f.read(4096), b""):
|
|
419 |
- h.update(chunk)
|
|
420 |
- out.write(chunk)
|
|
381 |
+ with contextlib.ExitStack() as stack:
|
|
382 |
+ if path is not None and link_directly:
|
|
383 |
+ tmp = stack.enter_context(open(path, 'rb'))
|
|
384 |
+ for chunk in iter(lambda: tmp.read(4096), b""):
|
|
385 |
+ h.update(chunk)
|
|
421 | 386 |
else:
|
422 |
- h.update(buffer)
|
|
423 |
- out.write(buffer)
|
|
387 |
+ tmp = stack.enter_context(tempfile.NamedTemporaryFile(dir=self.tmpdir))
|
|
388 |
+ # Set mode bits to 0644
|
|
389 |
+ os.chmod(tmp.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
|
|
390 |
+ |
|
391 |
+ if path:
|
|
392 |
+ with open(path, 'rb') as f:
|
|
393 |
+ for chunk in iter(lambda: f.read(4096), b""):
|
|
394 |
+ h.update(chunk)
|
|
395 |
+ tmp.write(chunk)
|
|
396 |
+ else:
|
|
397 |
+ h.update(buffer)
|
|
398 |
+ tmp.write(buffer)
|
|
424 | 399 |
|
425 |
- out.flush()
|
|
400 |
+ tmp.flush()
|
|
426 | 401 |
|
427 | 402 |
digest.hash = h.hexdigest()
|
428 |
- digest.size_bytes = os.fstat(out.fileno()).st_size
|
|
403 |
+ digest.size_bytes = os.fstat(tmp.fileno()).st_size
|
|
429 | 404 |
|
430 | 405 |
# Place file at final location
|
431 | 406 |
objpath = self.objpath(digest)
|
432 | 407 |
os.makedirs(os.path.dirname(objpath), exist_ok=True)
|
433 |
- os.link(out.name, objpath)
|
|
408 |
+ os.link(tmp.name, objpath)
|
|
434 | 409 |
|
435 | 410 |
except FileExistsError as e:
|
436 | 411 |
# We can ignore the failed link() if the object is already in the repo.
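
A hypothetical caller of the extended add_object() (cas and the paths below are made up for illustration): the default behaviour copies the input into a temporary file before linking, so later changes to the input cannot corrupt the stored object, while link_directly=True hashes the input in place and hard-links that very file into the object store, which is only safe for files that already live on the cache filesystem and will not be modified again, such as a freshly received upload.

    # Default: contents are copied into a temporary file first.
    digest = cas.add_object(path='/path/to/downloaded-file')

    # link_directly: hash in place and hard-link the file itself into the
    # object store (it must be on the same filesystem as the cache).
    digest = cas.add_object(path=os.path.join(cas.tmpdir, 'incoming-blob'),
                            link_directly=True)
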
|
... | ... | @@ -451,7 +426,7 @@ class CASCache(ArtifactCache): |
451 | 426 |
def set_ref(self, ref, tree):
|
452 | 427 |
refpath = self._refpath(ref)
|
453 | 428 |
os.makedirs(os.path.dirname(refpath), exist_ok=True)
|
454 |
- with utils.save_file_atomic(refpath, 'wb') as f:
|
|
429 |
+ with utils.save_file_atomic(refpath, 'wb', tempdir=self.tmpdir) as f:
|
|
455 | 430 |
f.write(tree.SerializeToString())
|
456 | 431 |
|
457 | 432 |
# resolve_ref():
|
... | ... | @@ -565,7 +540,12 @@ class CASCache(ArtifactCache): |
565 | 540 |
#
|
566 | 541 |
# Prune unreachable objects from the repo.
|
567 | 542 |
#
|
568 |
- def prune(self):
|
|
543 |
+ # Args:
|
|
544 |
+ # keep_after (int|None): timestamp after which unreachable objects
|
|
545 |
+ # are kept. None if no unreachable object
|
|
546 |
+ # should be kept.
|
|
547 |
+ #
|
|
548 |
+ def prune(self, keep_after=None):
|
|
569 | 549 |
ref_heads = os.path.join(self.casdir, 'refs', 'heads')
|
570 | 550 |
|
571 | 551 |
pruned = 0
|
... | ... | @@ -586,6 +566,10 @@ class CASCache(ArtifactCache): |
586 | 566 |
objhash = os.path.basename(root) + filename
|
587 | 567 |
if objhash not in reachable:
|
588 | 568 |
obj_path = os.path.join(root, filename)
|
569 |
+ if keep_after:
|
|
570 |
+ st = os.stat(obj_path)
|
|
571 |
+ if st.st_mtime >= keep_after:
|
|
572 |
+ continue
|
|
589 | 573 |
pruned += os.stat(obj_path).st_size
|
590 | 574 |
os.unlink(obj_path)
|
591 | 575 |
|
... | ... | @@ -594,6 +578,7 @@ class CASCache(ArtifactCache): |
594 | 578 |
################################################
|
595 | 579 |
# Local Private Methods #
|
596 | 580 |
################################################
|
581 |
+ |
|
597 | 582 |
def _checkout(self, dest, tree):
|
598 | 583 |
os.makedirs(dest, exist_ok=True)
|
599 | 584 |
|
... | ... | @@ -623,7 +608,21 @@ class CASCache(ArtifactCache): |
623 | 608 |
def _refpath(self, ref):
|
624 | 609 |
return os.path.join(self.casdir, 'refs', 'heads', ref)
|
625 | 610 |
|
626 |
- def _create_tree(self, path, *, digest=None):
|
|
611 |
+ # _commit_directory():
|
|
612 |
+ #
|
|
613 |
+ # Adds local directory to content addressable store.
|
|
614 |
+ #
|
|
615 |
+ # Adds files, symbolic links and recursively other directories in
|
|
616 |
+ # a local directory to the content addressable store.
|
|
617 |
+ #
|
|
618 |
+ # Args:
|
|
619 |
+ # path (str): Path to the directory to add.
|
|
620 |
+ # dir_digest (Digest): An optional Digest object to use.
|
|
621 |
+ #
|
|
622 |
+ # Returns:
|
|
623 |
+ # (Digest): Digest object for the directory added.
|
|
624 |
+ #
|
|
625 |
+ def _commit_directory(self, path, *, dir_digest=None):
|
|
627 | 626 |
directory = remote_execution_pb2.Directory()
|
628 | 627 |
|
629 | 628 |
for name in sorted(os.listdir(path)):
|
... | ... | @@ -632,7 +631,7 @@ class CASCache(ArtifactCache): |
632 | 631 |
if stat.S_ISDIR(mode):
|
633 | 632 |
dirnode = directory.directories.add()
|
634 | 633 |
dirnode.name = name
|
635 |
- self._create_tree(full_path, digest=dirnode.digest)
|
|
634 |
+ self._commit_directory(full_path, dir_digest=dirnode.digest)
|
|
636 | 635 |
elif stat.S_ISREG(mode):
|
637 | 636 |
filenode = directory.files.add()
|
638 | 637 |
filenode.name = name
|
... | ... | @@ -645,7 +644,8 @@ class CASCache(ArtifactCache): |
645 | 644 |
else:
|
646 | 645 |
raise ArtifactError("Unsupported file type for {}".format(full_path))
|
647 | 646 |
|
648 |
- return self.add_object(digest=digest, buffer=directory.SerializeToString())
|
|
647 |
+ return self.add_object(digest=dir_digest,
|
|
648 |
+ buffer=directory.SerializeToString())
|
|
649 | 649 |
|
650 | 650 |
def _get_subdir(self, tree, subdir):
|
651 | 651 |
head, name = os.path.split(subdir)
|
... | ... | @@ -756,16 +756,16 @@ class CASCache(ArtifactCache): |
756 | 756 |
#
|
757 | 757 |
q.put(str(e))
|
758 | 758 |
|
759 |
- def _required_blobs(self, tree):
|
|
759 |
+ def _required_blobs(self, directory_digest):
|
|
760 | 760 |
# parse directory, and recursively add blobs
|
761 | 761 |
d = remote_execution_pb2.Digest()
|
762 |
- d.hash = tree.hash
|
|
763 |
- d.size_bytes = tree.size_bytes
|
|
762 |
+ d.hash = directory_digest.hash
|
|
763 |
+ d.size_bytes = directory_digest.size_bytes
|
|
764 | 764 |
yield d
|
765 | 765 |
|
766 | 766 |
directory = remote_execution_pb2.Directory()
|
767 | 767 |
|
768 |
- with open(self.objpath(tree), 'rb') as f:
|
|
768 |
+ with open(self.objpath(directory_digest), 'rb') as f:
|
|
769 | 769 |
directory.ParseFromString(f.read())
|
770 | 770 |
|
771 | 771 |
for filenode in directory.files:
|
... | ... | @@ -777,50 +777,203 @@ class CASCache(ArtifactCache): |
777 | 777 |
for dirnode in directory.directories:
|
778 | 778 |
yield from self._required_blobs(dirnode.digest)
|
779 | 779 |
|
780 |
- def _fetch_blob(self, remote, digest, out):
|
|
780 |
+ def _fetch_blob(self, remote, digest, stream):
|
|
781 | 781 |
resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)])
|
782 | 782 |
request = bytestream_pb2.ReadRequest()
|
783 | 783 |
request.resource_name = resource_name
|
784 | 784 |
request.read_offset = 0
|
785 | 785 |
for response in remote.bytestream.Read(request):
|
786 |
- out.write(response.data)
|
|
786 |
+ stream.write(response.data)
|
|
787 |
+ stream.flush()
|
|
787 | 788 |
|
788 |
- out.flush()
|
|
789 |
- assert digest.size_bytes == os.fstat(out.fileno()).st_size
|
|
789 |
+ assert digest.size_bytes == os.fstat(stream.fileno()).st_size
|
|
790 | 790 |
|
791 |
- def _fetch_directory(self, remote, tree):
|
|
792 |
- objpath = self.objpath(tree)
|
|
791 |
+ # _ensure_blob():
|
|
792 |
+ #
|
|
793 |
+ # Fetch and add blob if it's not already local.
|
|
794 |
+ #
|
|
795 |
+ # Args:
|
|
796 |
+ # remote (Remote): The remote to use.
|
|
797 |
+ # digest (Digest): Digest object for the blob to fetch.
|
|
798 |
+ #
|
|
799 |
+ # Returns:
|
|
800 |
+ # (str): The path of the object
|
|
801 |
+ #
|
|
802 |
+ def _ensure_blob(self, remote, digest):
|
|
803 |
+ objpath = self.objpath(digest)
|
|
793 | 804 |
if os.path.exists(objpath):
|
794 |
- # already in local cache
|
|
795 |
- return
|
|
805 |
+ # already in local repository
|
|
806 |
+ return objpath
|
|
796 | 807 |
|
797 |
- with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
|
|
798 |
- self._fetch_blob(remote, tree, out)
|
|
808 |
+ with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
|
|
809 |
+ self._fetch_blob(remote, digest, f)
|
|
799 | 810 |
|
800 |
- directory = remote_execution_pb2.Directory()
|
|
811 |
+ added_digest = self.add_object(path=f.name)
|
|
812 |
+ assert added_digest.hash == digest.hash
|
|
801 | 813 |
|
802 |
- with open(out.name, 'rb') as f:
|
|
803 |
- directory.ParseFromString(f.read())
|
|
814 |
+ return objpath
|
|
804 | 815 |
|
805 |
- for filenode in directory.files:
|
|
806 |
- fileobjpath = self.objpath(tree)
|
|
807 |
- if os.path.exists(fileobjpath):
|
|
808 |
- # already in local cache
|
|
809 |
- continue
|
|
816 |
+ def _batch_download_complete(self, batch):
|
|
817 |
+ for digest, data in batch.send():
|
|
818 |
+ with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
|
|
819 |
+ f.write(data)
|
|
820 |
+ f.flush()
|
|
821 |
+ |
|
822 |
+ added_digest = self.add_object(path=f.name)
|
|
823 |
+ assert added_digest.hash == digest.hash
|
|
810 | 824 |
|
811 |
- with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
|
|
812 |
- self._fetch_blob(remote, filenode.digest, f)
|
|
825 |
+ # Helper function for _fetch_directory().
|
|
826 |
+ def _fetch_directory_batch(self, remote, batch, fetch_queue, fetch_next_queue):
|
|
827 |
+ self._batch_download_complete(batch)
|
|
813 | 828 |
|
814 |
- digest = self.add_object(path=f.name)
|
|
815 |
- assert digest.hash == filenode.digest.hash
|
|
829 |
+ # All previously scheduled directories are now locally available,
|
|
830 |
+ # move them to the processing queue.
|
|
831 |
+ fetch_queue.extend(fetch_next_queue)
|
|
832 |
+ fetch_next_queue.clear()
|
|
833 |
+ return _CASBatchRead(remote)
|
|
834 |
+ |
|
835 |
+ # Helper function for _fetch_directory().
|
|
836 |
+ def _fetch_directory_node(self, remote, digest, batch, fetch_queue, fetch_next_queue, *, recursive=False):
|
|
837 |
+ in_local_cache = os.path.exists(self.objpath(digest))
|
|
838 |
+ |
|
839 |
+ if in_local_cache:
|
|
840 |
+ # Skip download, already in local cache.
|
|
841 |
+ pass
|
|
842 |
+ elif (digest.size_bytes >= remote.max_batch_total_size_bytes or
|
|
843 |
+ not remote.batch_read_supported):
|
|
844 |
+ # Too large for batch request, download in independent request.
|
|
845 |
+ self._ensure_blob(remote, digest)
|
|
846 |
+ in_local_cache = True
|
|
847 |
+ else:
|
|
848 |
+ if not batch.add(digest):
|
|
849 |
+ # Not enough space left in batch request.
|
|
850 |
+ # Complete pending batch first.
|
|
851 |
+ batch = self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
|
|
852 |
+ batch.add(digest)
|
|
853 |
+ |
|
854 |
+ if recursive:
|
|
855 |
+ if in_local_cache:
|
|
856 |
+ # Add directory to processing queue.
|
|
857 |
+ fetch_queue.append(digest)
|
|
858 |
+ else:
|
|
859 |
+ # Directory will be available after completing pending batch.
|
|
860 |
+ # Add directory to deferred processing queue.
|
|
861 |
+ fetch_next_queue.append(digest)
|
|
862 |
+ |
|
863 |
+ return batch
|
|
864 |
+ |
|
865 |
+ # _fetch_directory():
|
|
866 |
+ #
|
|
867 |
+ # Fetches remote directory and adds it to content addressable store.
|
|
868 |
+ #
|
|
869 |
+ # Fetches files, symbolic links and recursively other directories in
|
|
870 |
+ # the remote directory and adds them to the content addressable
|
|
871 |
+ # store.
|
|
872 |
+ #
|
|
873 |
+ # Args:
|
|
874 |
+ # remote (Remote): The remote to use.
|
|
875 |
+ # dir_digest (Digest): Digest object for the directory to fetch.
|
|
876 |
+ #
|
|
877 |
+ def _fetch_directory(self, remote, dir_digest):
|
|
878 |
+ fetch_queue = [dir_digest]
|
|
879 |
+ fetch_next_queue = []
|
|
880 |
+ batch = _CASBatchRead(remote)
|
|
881 |
+ |
|
882 |
+ while len(fetch_queue) + len(fetch_next_queue) > 0:
|
|
883 |
+ if len(fetch_queue) == 0:
|
|
884 |
+ batch = self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
|
|
885 |
+ |
|
886 |
+ dir_digest = fetch_queue.pop(0)
|
|
887 |
+ |
|
888 |
+ objpath = self._ensure_blob(remote, dir_digest)
|
|
889 |
+ |
|
890 |
+ directory = remote_execution_pb2.Directory()
|
|
891 |
+ with open(objpath, 'rb') as f:
|
|
892 |
+ directory.ParseFromString(f.read())
|
|
816 | 893 |
|
817 | 894 |
for dirnode in directory.directories:
|
818 |
- self._fetch_directory(remote, dirnode.digest)
|
|
895 |
+ batch = self._fetch_directory_node(remote, dirnode.digest, batch,
|
|
896 |
+ fetch_queue, fetch_next_queue, recursive=True)
|
|
819 | 897 |
|
820 |
- # place directory blob only in final location when we've downloaded
|
|
821 |
- # all referenced blobs to avoid dangling references in the repository
|
|
822 |
- digest = self.add_object(path=out.name)
|
|
823 |
- assert digest.hash == tree.hash
|
|
898 |
+ for filenode in directory.files:
|
|
899 |
+ batch = self._fetch_directory_node(remote, filenode.digest, batch,
|
|
900 |
+ fetch_queue, fetch_next_queue)
|
|
901 |
+ |
|
902 |
+ # Fetch final batch
|
|
903 |
+ self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
|
|
904 |
+ |
|
905 |
+ def _send_blob(self, remote, digest, stream, u_uid=uuid.uuid4()):
|
|
906 |
+ resource_name = '/'.join(['uploads', str(u_uid), 'blobs',
|
|
907 |
+ digest.hash, str(digest.size_bytes)])
|
|
908 |
+ |
|
909 |
+ def request_stream(resname, instream):
|
|
910 |
+ offset = 0
|
|
911 |
+ finished = False
|
|
912 |
+ remaining = digest.size_bytes
|
|
913 |
+ while not finished:
|
|
914 |
+ chunk_size = min(remaining, _MAX_PAYLOAD_BYTES)
|
|
915 |
+ remaining -= chunk_size
|
|
916 |
+ |
|
917 |
+ request = bytestream_pb2.WriteRequest()
|
|
918 |
+ request.write_offset = offset
|
|
919 |
+ # max. _MAX_PAYLOAD_BYTES chunks
|
|
920 |
+ request.data = instream.read(chunk_size)
|
|
921 |
+ request.resource_name = resname
|
|
922 |
+ request.finish_write = remaining <= 0
|
|
923 |
+ |
|
924 |
+ yield request
|
|
925 |
+ |
|
926 |
+ offset += chunk_size
|
|
927 |
+ finished = request.finish_write
|
|
928 |
+ |
|
929 |
+ response = remote.bytestream.Write(request_stream(resource_name, stream))
|
|
930 |
+ |
|
931 |
+ assert response.committed_size == digest.size_bytes
|
|
932 |
+ |
|
933 |
+ def _send_directory(self, remote, digest, u_uid=uuid.uuid4()):
|
|
934 |
+ required_blobs = self._required_blobs(digest)
|
|
935 |
+ |
|
936 |
+ missing_blobs = dict()
|
|
937 |
+ # Limit size of FindMissingBlobs request
|
|
938 |
+ for required_blobs_group in _grouper(required_blobs, 512):
|
|
939 |
+ request = remote_execution_pb2.FindMissingBlobsRequest()
|
|
940 |
+ |
|
941 |
+ for required_digest in required_blobs_group:
|
|
942 |
+ d = request.blob_digests.add()
|
|
943 |
+ d.hash = required_digest.hash
|
|
944 |
+ d.size_bytes = required_digest.size_bytes
|
|
945 |
+ |
|
946 |
+ response = remote.cas.FindMissingBlobs(request)
|
|
947 |
+ for missing_digest in response.missing_blob_digests:
|
|
948 |
+ d = remote_execution_pb2.Digest()
|
|
949 |
+ d.hash = missing_digest.hash
|
|
950 |
+ d.size_bytes = missing_digest.size_bytes
|
|
951 |
+ missing_blobs[d.hash] = d
|
|
952 |
+ |
|
953 |
+ # Upload any blobs missing on the server
|
|
954 |
+ self._send_blobs(remote, missing_blobs.values(), u_uid)
|
|
955 |
+ |
|
956 |
+ def _send_blobs(self, remote, digests, u_uid=uuid.uuid4()):
|
|
957 |
+ batch = _CASBatchUpdate(remote)
|
|
958 |
+ |
|
959 |
+ for digest in digests:
|
|
960 |
+ with open(self.objpath(digest), 'rb') as f:
|
|
961 |
+ assert os.fstat(f.fileno()).st_size == digest.size_bytes
|
|
962 |
+ |
|
963 |
+ if (digest.size_bytes >= remote.max_batch_total_size_bytes or
|
|
964 |
+ not remote.batch_update_supported):
|
|
965 |
+ # Too large for batch request, upload in independent request.
|
|
966 |
+ self._send_blob(remote, digest, f, u_uid=u_uid)
|
|
967 |
+ else:
|
|
968 |
+ if not batch.add(digest, f):
|
|
969 |
+ # Not enough space left in batch request.
|
|
970 |
+ # Complete pending batch first.
|
|
971 |
+ batch.send()
|
|
972 |
+ batch = _CASBatchUpdate(remote)
|
|
973 |
+ batch.add(digest, f)
|
|
974 |
+ |
|
975 |
+ # Send final batch
|
|
976 |
+ batch.send()
|
|
824 | 977 |
|
825 | 978 |
|
826 | 979 |
# Represents a single remote CAS cache.
|
... | ... | @@ -870,11 +1023,129 @@ class _CASRemote(): |
870 | 1023 |
|
871 | 1024 |
self.bytestream = bytestream_pb2_grpc.ByteStreamStub(self.channel)
|
872 | 1025 |
self.cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel)
|
1026 |
+ self.capabilities = remote_execution_pb2_grpc.CapabilitiesStub(self.channel)
|
|
873 | 1027 |
self.ref_storage = buildstream_pb2_grpc.ReferenceStorageStub(self.channel)
|
874 | 1028 |
|
1029 |
+ self.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES
|
|
1030 |
+ try:
|
|
1031 |
+ request = remote_execution_pb2.GetCapabilitiesRequest()
|
|
1032 |
+ response = self.capabilities.GetCapabilities(request)
|
|
1033 |
+ server_max_batch_total_size_bytes = response.cache_capabilities.max_batch_total_size_bytes
|
|
1034 |
+ if 0 < server_max_batch_total_size_bytes < self.max_batch_total_size_bytes:
|
|
1035 |
+ self.max_batch_total_size_bytes = server_max_batch_total_size_bytes
|
|
1036 |
+ except grpc.RpcError as e:
|
|
1037 |
+ # Simply use the defaults for servers that don't implement GetCapabilities()
|
|
1038 |
+ if e.code() != grpc.StatusCode.UNIMPLEMENTED:
|
|
1039 |
+ raise
|
|
1040 |
+ |
|
1041 |
+ # Check whether the server supports BatchReadBlobs()
|
|
1042 |
+ self.batch_read_supported = False
|
|
1043 |
+ try:
|
|
1044 |
+ request = remote_execution_pb2.BatchReadBlobsRequest()
|
|
1045 |
+ response = self.cas.BatchReadBlobs(request)
|
|
1046 |
+ self.batch_read_supported = True
|
|
1047 |
+ except grpc.RpcError as e:
|
|
1048 |
+ if e.code() != grpc.StatusCode.UNIMPLEMENTED:
|
|
1049 |
+ raise
|
|
1050 |
+ |
|
1051 |
+ # Check whether the server supports BatchUpdateBlobs()
|
|
1052 |
+ self.batch_update_supported = False
|
|
1053 |
+ try:
|
|
1054 |
+ request = remote_execution_pb2.BatchUpdateBlobsRequest()
|
|
1055 |
+ response = self.cas.BatchUpdateBlobs(request)
|
|
1056 |
+ self.batch_update_supported = True
|
|
1057 |
+ except grpc.RpcError as e:
|
|
1058 |
+ if (e.code() != grpc.StatusCode.UNIMPLEMENTED and
|
|
1059 |
+ e.code() != grpc.StatusCode.PERMISSION_DENIED):
|
|
1060 |
+ raise
|
|
1061 |
+ |
|
875 | 1062 |
self._initialized = True
|
876 | 1063 |
|
877 | 1064 |
|
1065 |
+# Represents a batch of blobs queued for fetching.
|
|
1066 |
+#
|
|
1067 |
+class _CASBatchRead():
|
|
1068 |
+ def __init__(self, remote):
|
|
1069 |
+ self._remote = remote
|
|
1070 |
+ self._max_total_size_bytes = remote.max_batch_total_size_bytes
|
|
1071 |
+ self._request = remote_execution_pb2.BatchReadBlobsRequest()
|
|
1072 |
+ self._size = 0
|
|
1073 |
+ self._sent = False
|
|
1074 |
+ |
|
1075 |
+ def add(self, digest):
|
|
1076 |
+ assert not self._sent
|
|
1077 |
+ |
|
1078 |
+ new_batch_size = self._size + digest.size_bytes
|
|
1079 |
+ if new_batch_size > self._max_total_size_bytes:
|
|
1080 |
+ # Not enough space left in current batch
|
|
1081 |
+ return False
|
|
1082 |
+ |
|
1083 |
+ request_digest = self._request.digests.add()
|
|
1084 |
+ request_digest.hash = digest.hash
|
|
1085 |
+ request_digest.size_bytes = digest.size_bytes
|
|
1086 |
+ self._size = new_batch_size
|
|
1087 |
+ return True
|
|
1088 |
+ |
|
1089 |
+ def send(self):
|
|
1090 |
+ assert not self._sent
|
|
1091 |
+ self._sent = True
|
|
1092 |
+ |
|
1093 |
+ if len(self._request.digests) == 0:
|
|
1094 |
+ return
|
|
1095 |
+ |
|
1096 |
+ batch_response = self._remote.cas.BatchReadBlobs(self._request)
|
|
1097 |
+ |
|
1098 |
+ for response in batch_response.responses:
|
|
1099 |
+ if response.status.code != grpc.StatusCode.OK.value[0]:
|
|
1100 |
+ raise ArtifactError("Failed to download blob {}: {}".format(
|
|
1101 |
+ response.digest.hash, response.status.code))
|
|
1102 |
+ if response.digest.size_bytes != len(response.data):
|
|
1103 |
+ raise ArtifactError("Failed to download blob {}: expected {} bytes, received {} bytes".format(
|
|
1104 |
+ response.digest.hash, response.digest.size_bytes, len(response.data)))
|
|
1105 |
+ |
|
1106 |
+ yield (response.digest, response.data)
|
|
1107 |
+ |
|
1108 |
+ |
|
1109 |
+# Represents a batch of blobs queued for upload.
|
|
1110 |
+#
|
|
1111 |
+class _CASBatchUpdate():
|
|
1112 |
+ def __init__(self, remote):
|
|
1113 |
+ self._remote = remote
|
|
1114 |
+ self._max_total_size_bytes = remote.max_batch_total_size_bytes
|
|
1115 |
+ self._request = remote_execution_pb2.BatchUpdateBlobsRequest()
|
|
1116 |
+ self._size = 0
|
|
1117 |
+ self._sent = False
|
|
1118 |
+ |
|
1119 |
+ def add(self, digest, stream):
|
|
1120 |
+ assert not self._sent
|
|
1121 |
+ |
|
1122 |
+ new_batch_size = self._size + digest.size_bytes
|
|
1123 |
+ if new_batch_size > self._max_total_size_bytes:
|
|
1124 |
+ # Not enough space left in current batch
|
|
1125 |
+ return False
|
|
1126 |
+ |
|
1127 |
+ blob_request = self._request.requests.add()
|
|
1128 |
+ blob_request.digest.hash = digest.hash
|
|
1129 |
+ blob_request.digest.size_bytes = digest.size_bytes
|
|
1130 |
+ blob_request.data = stream.read(digest.size_bytes)
|
|
1131 |
+ self._size = new_batch_size
|
|
1132 |
+ return True
|
|
1133 |
+ |
|
1134 |
+ def send(self):
|
|
1135 |
+ assert not self._sent
|
|
1136 |
+ self._sent = True
|
|
1137 |
+ |
|
1138 |
+ if len(self._request.requests) == 0:
|
|
1139 |
+ return
|
|
1140 |
+ |
|
1141 |
+ batch_response = self._remote.cas.BatchUpdateBlobs(self._request)
|
|
1142 |
+ |
|
1143 |
+ for response in batch_response.responses:
|
|
1144 |
+ if response.status.code != grpc.StatusCode.OK.value[0]:
|
|
1145 |
+ raise ArtifactError("Failed to upload blob {}: {}".format(
|
|
1146 |
+ response.digest.hash, response.status.code))
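
Both batch helpers follow the same protocol: add() keeps accepting digests until the next one would push the request past max_batch_total_size_bytes, and send() flushes whatever has been collected. A hypothetical download loop (handle_blob and digests are placeholders; blobs that are individually too large are assumed to be fetched in separate requests, as _fetch_directory_node() does above):

    batch = _CASBatchRead(remote)

    for digest in digests:
        if not batch.add(digest):
            # The batch is full: flush it, then start a new one.
            for received_digest, data in batch.send():
                handle_blob(received_digest, data)
            batch = _CASBatchRead(remote)
            batch.add(digest)

    # Flush the remainder.
    for received_digest, data in batch.send():
        handle_blob(received_digest, data)
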
|
|
1147 |
+ |
|
1148 |
+ |
|
878 | 1149 |
def _grouper(iterable, n):
|
879 | 1150 |
while True:
|
880 | 1151 |
try:
|
... | ... | @@ -24,6 +24,10 @@ import signal |
24 | 24 |
import sys
|
25 | 25 |
import tempfile
|
26 | 26 |
import uuid
|
27 |
+import time
|
|
28 |
+import errno
|
|
29 |
+import ctypes
|
|
30 |
+import faulthandler
|
|
27 | 31 |
|
28 | 32 |
import click
|
29 | 33 |
import grpc
|
... | ... | @@ -38,8 +42,13 @@ from .._context import Context |
38 | 42 |
from .cascache import CASCache
|
39 | 43 |
|
40 | 44 |
|
41 |
-# The default limit for gRPC messages is 4 MiB
|
|
42 |
-_MAX_BATCH_TOTAL_SIZE_BYTES = 4 * 1024 * 1024
|
|
45 |
+# The default limit for gRPC messages is 4 MiB.
|
|
46 |
+# Limit payload to 1 MiB to leave sufficient headroom for metadata.
|
|
47 |
+_MAX_PAYLOAD_BYTES = 1024 * 1024
|
|
48 |
+ |
|
49 |
+# The minimum age in seconds for objects before they can be cleaned
|
|
50 |
+# up.
|
|
51 |
+_OBJECT_MIN_AGE = 6 * 60 * 60
|
|
43 | 52 |
|
44 | 53 |
|
45 | 54 |
# Trying to push an artifact that is too large
|
... | ... | @@ -69,7 +78,7 @@ def create_server(repo, *, enable_push): |
69 | 78 |
_ByteStreamServicer(artifactcache, enable_push=enable_push), server)
|
70 | 79 |
|
71 | 80 |
remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
|
72 |
- _ContentAddressableStorageServicer(artifactcache), server)
|
|
81 |
+ _ContentAddressableStorageServicer(artifactcache, enable_push=enable_push), server)
|
|
73 | 82 |
|
74 | 83 |
remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
|
75 | 84 |
_CapabilitiesServicer(), server)
|
... | ... | @@ -89,6 +98,8 @@ def create_server(repo, *, enable_push): |
89 | 98 |
help="Allow clients to upload blobs and update artifact cache")
|
90 | 99 |
@click.argument('repo')
|
91 | 100 |
def server_main(repo, port, server_key, server_cert, client_certs, enable_push):
|
101 |
+ faulthandler.register(signal.SIGUSR1, all_threads=True)
|
|
102 |
+ |
|
92 | 103 |
server = create_server(repo, enable_push=enable_push)
|
93 | 104 |
|
94 | 105 |
use_tls = bool(server_key)
|
... | ... | @@ -130,11 +141,43 @@ def server_main(repo, port, server_key, server_cert, client_certs, enable_push): |
130 | 141 |
server.stop(0)
|
131 | 142 |
|
132 | 143 |
|
144 |
+class _FallocateCall:
|
|
145 |
+ |
|
146 |
+ FALLOC_FL_KEEP_SIZE = 1
|
|
147 |
+ FALLOC_FL_PUNCH_HOLE = 2
|
|
148 |
+ FALLOC_FL_NO_HIDE_STALE = 4
|
|
149 |
+ FALLOC_FL_COLLAPSE_RANGE = 8
|
|
150 |
+ FALLOC_FL_ZERO_RANGE = 16
|
|
151 |
+ FALLOC_FL_INSERT_RANGE = 32
|
|
152 |
+ FALLOC_FL_UNSHARE_RANGE = 64
|
|
153 |
+ |
|
154 |
+ def __init__(self):
|
|
155 |
+ self.libc = ctypes.CDLL("libc.so.6", use_errno=True)
|
|
156 |
+ try:
|
|
157 |
+ self.fallocate64 = self.libc.fallocate64
|
|
158 |
+ except AttributeError:
|
|
159 |
+ self.fallocate = self.libc.fallocate
|
|
160 |
+ |
|
161 |
+ def __call__(self, fd, mode, offset, length):
|
|
162 |
+ if hasattr(self, 'fallocate64'):
|
|
163 |
+ print(fd, mode, offset, length)
|
|
164 |
+ ret = self.fallocate64(ctypes.c_int(fd), ctypes.c_int(mode),
|
|
165 |
+ ctypes.c_int64(offset), ctypes.c_int64(length))
|
|
166 |
+ else:
|
|
167 |
+ ret = self.fallocate(ctypes.c_int(fd), ctypes.c_int(mode),
|
|
168 |
+ ctypes.c_int(offset), ctypes.c_int(length))
|
|
169 |
+ if ret == -1:
|
|
170 |
+ errno = ctypes.get_errno()
|
|
171 |
+ raise OSError(errno, os.strerror(errno))
|
|
172 |
+ return ret
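
For reference, POSIX systems expose the same space-reservation primitive through the standard library as os.posix_fallocate(); a minimal sketch of the effect the servicer relies on below (the size is made up), without going through ctypes:

    import errno
    import os
    import tempfile

    expected_size = 5 * 1024 * 1024   # hypothetical size of the incoming blob

    with tempfile.NamedTemporaryFile() as out:
        try:
            # Reserve the full size up front, so a concurrent upload cannot
            # consume the space between the quota check and the write.
            os.posix_fallocate(out.fileno(), 0, expected_size)
        except OSError as e:
            if e.errno != errno.ENOSPC:
                raise
            # ENOSPC: make room in the cache and try again.

The ctypes wrapper above presumably exists to reach fallocate64 and the FALLOC_FL_* flags directly; the sketch only shows the plain reservation case.
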
|
|
173 |
+ |
|
174 |
+ |
|
133 | 175 |
class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
|
134 | 176 |
def __init__(self, cas, *, enable_push):
|
135 | 177 |
super().__init__()
|
136 | 178 |
self.cas = cas
|
137 | 179 |
self.enable_push = enable_push
|
180 |
+ self.fallocate = _FallocateCall()
|
|
138 | 181 |
|
139 | 182 |
def Read(self, request, context):
|
140 | 183 |
resource_name = request.resource_name
|
... | ... | @@ -158,7 +201,7 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer): |
158 | 201 |
|
159 | 202 |
remaining = client_digest.size_bytes - request.read_offset
|
160 | 203 |
while remaining > 0:
|
161 |
- chunk_size = min(remaining, 64 * 1024)
|
|
204 |
+ chunk_size = min(remaining, _MAX_PAYLOAD_BYTES)
|
|
162 | 205 |
remaining -= chunk_size
|
163 | 206 |
|
164 | 207 |
response = bytestream_pb2.ReadResponse()
|
... | ... | @@ -192,25 +235,44 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer): |
192 | 235 |
context.set_code(grpc.StatusCode.NOT_FOUND)
|
193 | 236 |
return response
|
194 | 237 |
|
195 |
- try:
|
|
196 |
- _clean_up_cache(self.cas, client_digest.size_bytes)
|
|
197 |
- except ArtifactTooLargeException as e:
|
|
198 |
- context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
|
|
199 |
- context.set_details(str(e))
|
|
200 |
- return response
|
|
238 |
+ while True:
|
|
239 |
+ if client_digest.size_bytes == 0:
|
|
240 |
+ break
|
|
241 |
+ try:
|
|
242 |
+ _clean_up_cache(self.cas, client_digest.size_bytes)
|
|
243 |
+ except ArtifactTooLargeException as e:
|
|
244 |
+ context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
|
|
245 |
+ context.set_details(str(e))
|
|
246 |
+ return response
|
|
247 |
+ |
|
248 |
+ try:
|
|
249 |
+ self.fallocate(out.fileno(), 0, 0, client_digest.size_bytes)
|
|
250 |
+ break
|
|
251 |
+ except OSError as e:
|
|
252 |
+ # Multiple uploads can happen at the same time
|
|
253 |
+ if e.errno != errno.ENOSPC:
|
|
254 |
+ raise
|
|
255 |
+ |
|
201 | 256 |
elif request.resource_name:
|
202 | 257 |
# If it is set on subsequent calls, it **must** match the value of the first request.
|
203 | 258 |
if request.resource_name != resource_name:
|
204 | 259 |
context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
|
205 | 260 |
return response
|
261 |
+ |
|
262 |
+ if (offset + len(request.data)) > client_digest.size_bytes:
|
|
263 |
+ context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
|
|
264 |
+ return response
|
|
265 |
+ |
|
206 | 266 |
out.write(request.data)
|
267 |
+ |
|
207 | 268 |
offset += len(request.data)
|
269 |
+ |
|
208 | 270 |
if request.finish_write:
|
209 | 271 |
if client_digest.size_bytes != offset:
|
210 | 272 |
context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
|
211 | 273 |
return response
|
212 | 274 |
out.flush()
|
213 |
- digest = self.cas.add_object(path=out.name)
|
|
275 |
+ digest = self.cas.add_object(path=out.name, link_directly=True)
|
|
214 | 276 |
if digest.hash != client_digest.hash:
|
215 | 277 |
context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
|
216 | 278 |
return response
|
... | ... | @@ -223,17 +285,25 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer): |
223 | 285 |
|
224 | 286 |
|
225 | 287 |
class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
|
226 |
- def __init__(self, cas):
|
|
288 |
+ def __init__(self, cas, *, enable_push):
|
|
227 | 289 |
super().__init__()
|
228 | 290 |
self.cas = cas
|
291 |
+ self.enable_push = enable_push
|
|
229 | 292 |
|
230 | 293 |
def FindMissingBlobs(self, request, context):
|
231 | 294 |
response = remote_execution_pb2.FindMissingBlobsResponse()
|
232 | 295 |
for digest in request.blob_digests:
|
233 |
- if not _has_object(self.cas, digest):
|
|
234 |
- d = response.missing_blob_digests.add()
|
|
235 |
- d.hash = digest.hash
|
|
236 |
- d.size_bytes = digest.size_bytes
|
|
296 |
+ objpath = self.cas.objpath(digest)
|
|
297 |
+ try:
|
|
298 |
+ os.utime(objpath)
|
|
299 |
+ except OSError as e:
|
|
300 |
+ if e.errno != errno.ENOENT:
|
|
301 |
+ raise
|
|
302 |
+ else:
|
|
303 |
+ d = response.missing_blob_digests.add()
|
|
304 |
+ d.hash = digest.hash
|
|
305 |
+ d.size_bytes = digest.size_bytes
|
|
306 |
+ |
|
237 | 307 |
return response
|
238 | 308 |
|
239 | 309 |
def BatchReadBlobs(self, request, context):
|
... | ... | @@ -242,7 +312,7 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres |
242 | 312 |
|
243 | 313 |
for digest in request.digests:
|
244 | 314 |
batch_size += digest.size_bytes
|
245 |
- if batch_size > _MAX_BATCH_TOTAL_SIZE_BYTES:
|
|
315 |
+ if batch_size > _MAX_PAYLOAD_BYTES:
|
|
246 | 316 |
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
247 | 317 |
return response
|
248 | 318 |
|
... | ... | @@ -261,6 +331,46 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres |
261 | 331 |
|
262 | 332 |
return response
|
263 | 333 |
|
334 |
+ def BatchUpdateBlobs(self, request, context):
|
|
335 |
+ response = remote_execution_pb2.BatchUpdateBlobsResponse()
|
|
336 |
+ |
|
337 |
+ if not self.enable_push:
|
|
338 |
+ context.set_code(grpc.StatusCode.PERMISSION_DENIED)
|
|
339 |
+ return response
|
|
340 |
+ |
|
341 |
+ batch_size = 0
|
|
342 |
+ |
|
343 |
+ for blob_request in request.requests:
|
|
344 |
+ digest = blob_request.digest
|
|
345 |
+ |
|
346 |
+ batch_size += digest.size_bytes
|
|
347 |
+ if batch_size > _MAX_PAYLOAD_BYTES:
|
|
348 |
+ context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
|
349 |
+ return response
|
|
350 |
+ |
|
351 |
+ blob_response = response.responses.add()
|
|
352 |
+ blob_response.digest.hash = digest.hash
|
|
353 |
+ blob_response.digest.size_bytes = digest.size_bytes
|
|
354 |
+ |
|
355 |
+ if len(blob_request.data) != digest.size_bytes:
|
|
356 |
+ blob_response.status.code = grpc.StatusCode.FAILED_PRECONDITION
|
|
357 |
+ continue
|
|
358 |
+ |
|
359 |
+ try:
|
|
360 |
+ _clean_up_cache(self.cas, digest.size_bytes)
|
|
361 |
+ |
|
362 |
+ with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
|
|
363 |
+ out.write(blob_request.data)
|
|
364 |
+ out.flush()
|
|
365 |
+ server_digest = self.cas.add_object(path=out.name)
|
|
366 |
+ if server_digest.hash != digest.hash:
|
|
367 |
+ blob_response.status.code = grpc.StatusCode.FAILED_PRECONDITION
|
|
368 |
+ |
|
369 |
+ except ArtifactTooLargeException:
|
|
370 |
+ blob_response.status.code = grpc.StatusCode.RESOURCE_EXHAUSTED
|
|
371 |
+ |
|
372 |
+ return response
|
|
373 |
+ |
|
264 | 374 |
|
265 | 375 |
class _CapabilitiesServicer(remote_execution_pb2_grpc.CapabilitiesServicer):
|
266 | 376 |
def GetCapabilities(self, request, context):
|
... | ... | @@ -269,7 +379,7 @@ class _CapabilitiesServicer(remote_execution_pb2_grpc.CapabilitiesServicer): |
269 | 379 |
cache_capabilities = response.cache_capabilities
|
270 | 380 |
cache_capabilities.digest_function.append(remote_execution_pb2.SHA256)
|
271 | 381 |
cache_capabilities.action_cache_update_capabilities.update_enabled = False
|
272 |
- cache_capabilities.max_batch_total_size_bytes = _MAX_BATCH_TOTAL_SIZE_BYTES
|
|
382 |
+ cache_capabilities.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES
|
|
273 | 383 |
cache_capabilities.symlink_absolute_path_strategy = remote_execution_pb2.CacheCapabilities.ALLOWED
|
274 | 384 |
|
275 | 385 |
response.deprecated_api_version.major = 2
|
... | ... | @@ -362,11 +472,6 @@ def _digest_from_upload_resource_name(resource_name): |
362 | 472 |
return None
|
363 | 473 |
|
364 | 474 |
|
365 |
-def _has_object(cas, digest):
|
|
366 |
- objpath = cas.objpath(digest)
|
|
367 |
- return os.path.exists(objpath)
|
|
368 |
- |
|
369 |
- |
|
370 | 475 |
# _clean_up_cache()
|
371 | 476 |
#
|
372 | 477 |
# Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
|
... | ... | @@ -399,7 +504,14 @@ def _clean_up_cache(cas, object_size): |
399 | 504 |
# obtain a list of LRP artifacts
|
400 | 505 |
LRP_artifacts = cas.list_artifacts()
|
401 | 506 |
|
507 |
+ keep_after = time.time() - _OBJECT_MIN_AGE
|
|
508 |
+ |
|
402 | 509 |
removed_size = 0 # in bytes
|
510 |
+ if object_size - removed_size > free_disk_space:
|
|
511 |
+ # First we try to see if some unreferenced objects became old
|
|
512 |
+ # enough to be removed.
|
|
513 |
+ removed_size += cas.prune(keep_after=keep_after)
|
|
514 |
+ |
|
403 | 515 |
while object_size - removed_size > free_disk_space:
|
404 | 516 |
try:
|
405 | 517 |
to_remove = LRP_artifacts.pop(0) # The first element in the list is the LRP artifact
|
... | ... | @@ -411,7 +523,8 @@ def _clean_up_cache(cas, object_size): |
411 | 523 |
"the filesystem which mounts the remote "
|
412 | 524 |
"cache".format(object_size))
|
413 | 525 |
|
414 |
- removed_size += cas.remove(to_remove, defer_prune=False)
|
|
526 |
+ cas.remove(to_remove, defer_prune=True)
|
|
527 |
+ removed_size += cas.prune(keep_after=keep_after)
|
|
415 | 528 |
|
416 | 529 |
if removed_size > 0:
|
417 | 530 |
logging.info("Successfully removed {} bytes from the cache".format(removed_size))
|
... | ... | @@ -119,6 +119,8 @@ class Job(): |
119 | 119 |
self._result = None # Return value of child action in the parent
|
120 | 120 |
self._tries = 0 # Try count, for retryable jobs
|
121 | 121 |
self._skipped_flag = False # Indicate whether the job was skipped.
|
122 |
+ self._terminated = False # Whether this job has been explicitly terminated
|
|
123 |
+ |
|
122 | 124 |
# If False, a retry will not be attempted regardless of whether _tries is less than _max_retries.
|
123 | 125 |
#
|
124 | 126 |
self._retry_flag = True
|
... | ... | @@ -188,6 +190,8 @@ class Job(): |
188 | 190 |
# Terminate the process using multiprocessing API pathway
|
189 | 191 |
self._process.terminate()
|
190 | 192 |
|
193 |
+ self._terminated = True
|
|
194 |
+ |
|
191 | 195 |
# terminate_wait()
|
192 | 196 |
#
|
193 | 197 |
# Wait for terminated jobs to complete
|
... | ... | @@ -271,18 +275,22 @@ class Job(): |
271 | 275 |
# running the integration commands).
|
272 | 276 |
#
|
273 | 277 |
# Args:
|
274 |
- # (int): The plugin identifier for this task
|
|
278 |
+ # task_id (int): The plugin identifier for this task
|
|
275 | 279 |
#
|
276 | 280 |
def set_task_id(self, task_id):
|
277 | 281 |
self._task_id = task_id
|
278 | 282 |
|
279 | 283 |
# skipped
|
280 | 284 |
#
|
285 |
+ # This will evaluate to True if the job was skipped
|
|
286 |
+ # during processing, or if it was forcefully terminated.
|
|
287 |
+ #
|
|
281 | 288 |
# Returns:
|
282 |
- # bool: True if the job was skipped while processing.
|
|
289 |
+ # (bool): Whether the job should appear as skipped
|
|
290 |
+ #
|
|
283 | 291 |
@property
|
284 | 292 |
def skipped(self):
|
285 |
- return self._skipped_flag
|
|
293 |
+ return self._skipped_flag or self._terminated
|
|
286 | 294 |
|
287 | 295 |
#######################################################
|
288 | 296 |
# Abstract Methods #
|
... | ... | @@ -65,7 +65,7 @@ class BuildQueue(Queue): |
65 | 65 |
# If the estimated size outgrows the quota, ask the scheduler
|
66 | 66 |
# to queue a job to actually check the real cache size.
|
67 | 67 |
#
|
68 |
- if artifacts.get_quota_exceeded():
|
|
68 |
+ if artifacts.has_quota_exceeded():
|
|
69 | 69 |
self._scheduler.check_cache_size()
|
70 | 70 |
|
71 | 71 |
def done(self, job, element, result, success):
|
... | ... | @@ -325,15 +325,22 @@ class Queue(): |
325 | 325 |
detail=traceback.format_exc())
|
326 | 326 |
self.failed_elements.append(element)
|
327 | 327 |
else:
|
328 |
- |
|
329 |
- # No exception occured, handle the success/failure state in the normal way
|
|
330 | 328 |
#
|
329 |
+ # No exception occurred in post processing
|
|
330 |
+ #
|
|
331 |
+ |
|
332 |
+ # Only place in the output done queue if the job
|
|
333 |
+ # was considered successful
|
|
331 | 334 |
if success:
|
332 | 335 |
self._done_queue.append(job)
|
333 |
- if not job.skipped:
|
|
334 |
- self.processed_elements.append(element)
|
|
335 |
- else:
|
|
336 |
- self.skipped_elements.append(element)
|
|
336 |
+ |
|
337 |
+ # A Job can be skipped whether or not it has failed,
|
|
338 |
+ # we want to only bookkeep them as processed or failed
|
|
339 |
+ # if they are not skipped.
|
|
340 |
+ if job.skipped:
|
|
341 |
+ self.skipped_elements.append(element)
|
|
342 |
+ elif success:
|
|
343 |
+ self.processed_elements.append(element)
|
|
337 | 344 |
else:
|
338 | 345 |
self.failed_elements.append(element)
|
339 | 346 |
|
... | ... | @@ -349,7 +349,7 @@ class Scheduler(): |
349 | 349 |
platform = Platform.get_platform()
|
350 | 350 |
artifacts = platform.artifactcache
|
351 | 351 |
|
352 |
- if not artifacts.get_quota_exceeded():
|
|
352 |
+ if not artifacts.has_quota_exceeded():
|
|
353 | 353 |
return
|
354 | 354 |
|
355 | 355 |
job = CleanupJob(self, 'cleanup', 'cleanup/cleanup',
|
... | ... | @@ -387,6 +387,15 @@ class Scheduler(): |
387 | 387 |
# A loop registered event callback for keyboard interrupts
|
388 | 388 |
#
|
389 | 389 |
def _interrupt_event(self):
|
390 |
+ |
|
391 |
+ # FIXME: This should not be needed, but for some reason we receive an
|
|
392 |
+ # additional SIGINT event when the user hits ^C a second time
|
|
393 |
+ # to inform us that they really intend to terminate; even though
|
|
394 |
+ # we have disconnected our handlers at this time.
|
|
395 |
+ #
|
|
396 |
+ if self.terminated:
|
|
397 |
+ return
|
|
398 |
+ |
|
390 | 399 |
# Leave this to the frontend to decide, if no
|
391 | 400 |
# interrupt callback was specified, then just terminate.
|
392 | 401 |
if self._interrupt_callback:
|
... | ... | @@ -164,10 +164,18 @@ class GitMirror(SourceFetcher): |
164 | 164 |
cwd=self.mirror)
|
165 | 165 |
|
166 | 166 |
def fetch(self, alias_override=None):
|
167 |
- self.ensure(alias_override)
|
|
168 |
- if not self.has_ref():
|
|
169 |
- self._fetch(alias_override)
|
|
170 |
- self.assert_ref()
|
|
167 |
+ # Resolve the URL for the message
|
|
168 |
+ resolved_url = self.source.translate_url(self.url,
|
|
169 |
+ alias_override=alias_override,
|
|
170 |
+ primary=self.primary)
|
|
171 |
+ |
|
172 |
+ with self.source.timed_activity("Fetching from {}"
|
|
173 |
+ .format(resolved_url),
|
|
174 |
+ silent_nested=True):
|
|
175 |
+ self.ensure(alias_override)
|
|
176 |
+ if not self.has_ref():
|
|
177 |
+ self._fetch(alias_override)
|
|
178 |
+ self.assert_ref()
|
|
171 | 179 |
|
172 | 180 |
def has_ref(self):
|
173 | 181 |
if not self.ref:
|
... | ... | @@ -585,28 +585,48 @@ class Source(Plugin): |
585 | 585 |
#
|
586 | 586 |
def _fetch(self):
|
587 | 587 |
project = self._get_project()
|
588 |
- source_fetchers = self.get_source_fetchers()
|
|
588 |
+ context = self._get_context()
|
|
589 |
+ |
|
590 |
+ # Silence the STATUS messages which might happen as a result
|
|
591 |
+ # of checking the source fetchers.
|
|
592 |
+ with context.silence():
|
|
593 |
+ source_fetchers = self.get_source_fetchers()
|
|
589 | 594 |
|
590 | 595 |
# Use the source fetchers if they are provided
|
591 | 596 |
#
|
592 | 597 |
if source_fetchers:
|
593 |
- for fetcher in source_fetchers:
|
|
594 |
- alias = fetcher._get_alias()
|
|
595 |
- for uri in project.get_alias_uris(alias, first_pass=self.__first_pass):
|
|
596 |
- try:
|
|
597 |
- fetcher.fetch(uri)
|
|
598 |
- # FIXME: Need to consider temporary vs. permanent failures,
|
|
599 |
- # and how this works with retries.
|
|
600 |
- except BstError as e:
|
|
601 |
- last_error = e
|
|
602 |
- continue
|
|
603 |
- |
|
604 |
- # No error, we're done with this fetcher
|
|
605 |
- break
|
|
606 | 598 |
|
607 |
- else:
|
|
608 |
- # No break occurred, raise the last detected error
|
|
609 |
- raise last_error
|
|
599 |
+ # Use a contorted loop here, this is to allow us to
|
|
600 |
+ # silence the messages which can result from consuming
|
|
601 |
+ # the items of source_fetchers, if it happens to be a generator.
|
|
602 |
+ #
|
|
603 |
+ source_fetchers = iter(source_fetchers)
|
|
604 |
+ try:
|
|
605 |
+ |
|
606 |
+ while True:
|
|
607 |
+ |
|
608 |
+ with context.silence():
|
|
609 |
+ fetcher = next(source_fetchers)
|
|
610 |
+ |
|
611 |
+ alias = fetcher._get_alias()
|
|
612 |
+ for uri in project.get_alias_uris(alias, first_pass=self.__first_pass):
|
|
613 |
+ try:
|
|
614 |
+ fetcher.fetch(uri)
|
|
615 |
+ # FIXME: Need to consider temporary vs. permanent failures,
|
|
616 |
+ # and how this works with retries.
|
|
617 |
+ except BstError as e:
|
|
618 |
+ last_error = e
|
|
619 |
+ continue
|
|
620 |
+ |
|
621 |
+ # No error, we're done with this fetcher
|
|
622 |
+ break
|
|
623 |
+ |
|
624 |
+ else:
|
|
625 |
+ # No break occurred, raise the last detected error
|
|
626 |
+ raise last_error
|
|
627 |
+ |
|
628 |
+ except StopIteration:
|
|
629 |
+ pass
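
The shape of that loop is easier to see with the fetch details stripped away; a self-contained sketch of the same idea, where silencer() and the printed items stand in for context.silence() and the real source fetchers:

    import contextlib

    @contextlib.contextmanager
    def silencer():
        yield                        # stand-in for context.silence()

    def produce_items():
        yield from range(3)          # stand-in for get_source_fetchers()

    items = iter(produce_items())
    try:
        while True:
            with silencer():
                item = next(items)   # advancing the generator runs silenced
            print(item)              # handling the item runs with messages enabled
    except StopIteration:
        pass
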
|
|
610 | 630 |
|
611 | 631 |
# Default codepath is to reinstantiate the Source
|
612 | 632 |
#
|
... | ... | @@ -496,7 +496,7 @@ def get_bst_version(): |
496 | 496 |
|
497 | 497 |
@contextmanager
|
498 | 498 |
def save_file_atomic(filename, mode='w', *, buffering=-1, encoding=None,
|
499 |
- errors=None, newline=None, closefd=True, opener=None):
|
|
499 |
+ errors=None, newline=None, closefd=True, opener=None, tempdir=None):
|
|
500 | 500 |
"""Save a file with a temporary name and rename it into place when ready.
|
501 | 501 |
|
502 | 502 |
This is a context manager which is meant for saving data to files.
|
... | ... | @@ -523,8 +523,9 @@ def save_file_atomic(filename, mode='w', *, buffering=-1, encoding=None, |
523 | 523 |
# https://bugs.python.org/issue8604
|
524 | 524 |
|
525 | 525 |
assert os.path.isabs(filename), "The utils.save_file_atomic() parameter ``filename`` must be an absolute path"
|
526 |
- dirname = os.path.dirname(filename)
|
|
527 |
- fd, tempname = tempfile.mkstemp(dir=dirname)
|
|
526 |
+ if tempdir is None:
|
|
527 |
+ tempdir = os.path.dirname(filename)
|
|
528 |
+ fd, tempname = tempfile.mkstemp(dir=tempdir)
|
|
528 | 529 |
os.close(fd)
|
529 | 530 |
|
530 | 531 |
f = open(tempname, mode=mode, buffering=buffering, encoding=encoding,
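
A likely motivation for the new tempdir argument (the diff does not spell it out): the final step of save_file_atomic() is a rename, which is only atomic, and only cheap, when the temporary file and the destination live on the same filesystem, and placing the temporary in the CAS tmpdir also keeps half-written files out of the refs directory. A standalone sketch of the pattern, with made-up paths:

    import os
    import tempfile

    def save_atomic(data, filename, tempdir=None):
        # Write to a temporary file on the same filesystem as the target,
        # then move it into place with a single atomic rename.
        if tempdir is None:
            tempdir = os.path.dirname(filename)
        fd, tempname = tempfile.mkstemp(dir=tempdir)
        try:
            with os.fdopen(fd, 'wb') as f:
                f.write(data)
            os.replace(tempname, filename)
        except BaseException:
            os.unlink(tempname)
            raise
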
|
... | ... | @@ -556,6 +557,9 @@ def save_file_atomic(filename, mode='w', *, buffering=-1, encoding=None, |
556 | 557 |
#
|
557 | 558 |
# Get the disk usage of a given directory in bytes.
|
558 | 559 |
#
|
560 |
+# This function assumes that files do not inadvertantly
|
|
561 |
+# disappear while this function is running.
|
|
562 |
+#
|
|
559 | 563 |
# Arguments:
|
560 | 564 |
# (str) The path whose size to check.
|
561 | 565 |
#
|
... | ... | @@ -675,7 +679,7 @@ def _force_rmtree(rootpath, **kwargs): |
675 | 679 |
|
676 | 680 |
try:
|
677 | 681 |
shutil.rmtree(rootpath, **kwargs)
|
678 |
- except shutil.Error as e:
|
|
682 |
+ except OSError as e:
|
|
679 | 683 |
raise UtilError("Failed to remove cache directory '{}': {}"
|
680 | 684 |
.format(rootpath, e))
|
681 | 685 |
|
... | ... | @@ -9,5 +9,5 @@ element-path: elements |
9 | 9 |
|
10 | 10 |
# Define some aliases for the tarballs we download
|
11 | 11 |
aliases:
|
12 |
- alpine: https://gnome7.codethink.co.uk/tarballs/
|
|
12 |
+ alpine: https://bst-integration-test-images.ams3.cdn.digitaloceanspaces.com/
|
|
13 | 13 |
gnu: http://ftpmirror.gnu.org/gnu/automake/
|
... | ... | @@ -9,4 +9,4 @@ element-path: elements |
9 | 9 |
|
10 | 10 |
# Define an alias for our alpine tarball
|
11 | 11 |
aliases:
|
12 |
- alpine: https://gnome7.codethink.co.uk/tarballs/
|
|
12 |
+ alpine: https://bst-integration-test-images.ams3.cdn.digitaloceanspaces.com/
|
... | ... | @@ -9,4 +9,4 @@ element-path: elements |
9 | 9 |
|
10 | 10 |
# Define an alias for our alpine tarball
|
11 | 11 |
aliases:
|
12 |
- alpine: https://gnome7.codethink.co.uk/tarballs/
|
|
12 |
+ alpine: https://bst-integration-test-images.ams3.cdn.digitaloceanspaces.com/
|
... | ... | @@ -231,6 +231,8 @@ def test_artifact_expires(cli, datafiles, tmpdir): |
231 | 231 |
assert cli.get_element_state(project, 'element2.bst') == 'cached'
|
232 | 232 |
assert_shared(cli, share, project, 'element2.bst')
|
233 | 233 |
|
234 |
+ share.make_all_objects_older()
|
|
235 |
+ |
|
234 | 236 |
# Create and build another element of 5 MB (This will exceed the free disk space available)
|
235 | 237 |
create_element_size('element3.bst', project, element_path, [], int(5e6))
|
236 | 238 |
result = cli.run(project=project, args=['build', 'element3.bst'])
|
... | ... | @@ -327,6 +329,8 @@ def test_recently_pulled_artifact_does_not_expire(cli, datafiles, tmpdir): |
327 | 329 |
# Ensure element1 is cached locally
|
328 | 330 |
assert cli.get_element_state(project, 'element1.bst') == 'cached'
|
329 | 331 |
|
332 |
+ share.make_all_objects_older()
|
|
333 |
+ |
|
330 | 334 |
# Create and build the element3 (of 5 MB)
|
331 | 335 |
create_element_size('element3.bst', project, element_path, [], int(5e6))
|
332 | 336 |
result = cli.run(project=project, args=['build', 'element3.bst'])
|
... | ... | @@ -2,7 +2,7 @@ |
2 | 2 |
name: test
|
3 | 3 |
element-path: elements
|
4 | 4 |
aliases:
|
5 |
- alpine: https://gnome7.codethink.co.uk/tarballs/
|
|
5 |
+ alpine: https://bst-integration-test-images.ams3.cdn.digitaloceanspaces.com/
|
|
6 | 6 |
project_dir: file://{project_dir}
|
7 | 7 |
options:
|
8 | 8 |
linux:
|
... | ... | @@ -122,6 +122,15 @@ class ArtifactShare(): |
122 | 122 |
except ArtifactError:
|
123 | 123 |
return False
|
124 | 124 |
|
125 |
+ def make_all_objects_older(self):
|
|
126 |
+ for root, dirs, files in os.walk(os.path.join(self.cas.casdir, 'objects')):
|
|
127 |
+ for name in files:
|
|
128 |
+ fullname = os.path.join(root, name)
|
|
129 |
+ st = os.stat(fullname)
|
|
130 |
+ mtime = st.st_mtime - 6 * 60 * 60
|
|
131 |
+ atime = st.st_atime - 6 * 60 * 60
|
|
132 |
+ os.utime(fullname, times=(atime, mtime))
|
|
133 |
+ |
|
125 | 134 |
# close():
|
126 | 135 |
#
|
127 | 136 |
# Remove the artifact share.
|