Valentin David pushed to branch valentindavid/cache_server_fill_up at BuildStream / buildstream
Commits:
-
4cfabce8
by Angelos Evripiotis at 2018-11-01T11:35:02Z
-
48860aac
by Tristan Van Berkom at 2018-11-01T12:01:04Z
-
d868b409
by Daniel Silverstone at 2018-11-01T13:40:24Z
-
7f79b9ce
by Tristan Van Berkom at 2018-11-01T14:25:57Z
-
a3036492
by Valentin David at 2018-11-02T10:14:01Z
-
6e35ad93
by Valentin David at 2018-11-02T10:14:01Z
-
5c589e47
by Valentin David at 2018-11-02T10:14:01Z
-
ac066763
by Valentin David at 2018-11-02T10:18:58Z
-
cd018d7c
by Valentin David at 2018-11-02T10:18:58Z
-
afc00580
by Valentin David at 2018-11-02T10:18:58Z
-
4dd0121c
by Valentin David at 2018-11-02T10:18:58Z
-
66ff4426
by Valentin David at 2018-11-02T10:18:58Z
-
724b9fb1
by Valentin David at 2018-11-02T10:18:58Z
-
121f07b7
by Valentin David at 2018-11-02T10:18:58Z
-
db8b48ee
by Valentin David at 2018-11-02T10:18:58Z
8 changed files:
- NEWS
- buildstream/_artifactcache/cascache.py
- buildstream/_artifactcache/casserver.py
- buildstream/_versions.py
- buildstream/_yaml.py
- buildstream/plugins/elements/manual.yaml
- tests/frontend/push.py
- tests/testutils/artifactshare.py
Changes:
| ... | ... | @@ -2,6 +2,12 @@ |
| 2 | 2 |
buildstream 1.3.1
|
| 3 | 3 |
=================
|
| 4 | 4 |
|
| 5 |
+ o BREAKING CHANGE: The 'manual' element lost its default 'MAKEFLAGS' and 'V'
|
|
| 6 |
+ environment variables. There is already a 'make' element with the same
|
|
| 7 |
+ variables. Note that this is a breaking change: it will require users to
|
|
| 8 |
+ make changes to their .bst files if they are expecting these environment
|
|
| 9 |
+ variables to be set.
|
|
| 10 |
+ |
|
| 5 | 11 |
o Failed builds are included in the cache as well.
|
| 6 | 12 |
`bst checkout` will provide anything in `%{install-root}`.
|
| 7 | 13 |
A build including cached failures will cause any dependent elements
|
| ... | ... | @@ -27,6 +27,7 @@ import stat |
| 27 | 27 |
import tempfile
|
| 28 | 28 |
import uuid
|
| 29 | 29 |
import errno
|
| 30 |
+import contextlib
|
|
| 30 | 31 |
from urllib.parse import urlparse
|
| 31 | 32 |
|
| 32 | 33 |
import grpc
|
| ... | ... | @@ -49,6 +50,13 @@ from . import ArtifactCache |
| 49 | 50 |
_MAX_PAYLOAD_BYTES = 1024 * 1024
|
| 50 | 51 |
|
| 51 | 52 |
|
| 53 |
# BlobNotFound
#
# Error raised when a blob requested from a remote is reported as
# missing.
#
# Args:
#    blob: Identifier of the missing blob, kept on the exception as
#          `blob` so that handlers can report it
#    msg (str): Message forwarded to the ArtifactError constructor
#
class BlobNotFound(ArtifactError):

    def __init__(self, blob, msg):
        # Record the blob before delegating, exactly one attribute is added
        # on top of what ArtifactError provides.
        self.blob = blob
        super().__init__(msg)
|
|
| 58 |
+ |
|
| 59 |
+ |
|
| 52 | 60 |
# A CASCache manages artifacts in a CAS repository as specified in the
|
| 53 | 61 |
# Remote Execution API.
|
| 54 | 62 |
#
|
| ... | ... | @@ -264,6 +272,10 @@ class CASCache(ArtifactCache): |
| 264 | 272 |
element.info("Remote ({}) does not have {} cached".format(
|
| 265 | 273 |
remote.spec.url, element._get_brief_display_key()
|
| 266 | 274 |
))
|
| 275 |
+ except BlobNotFound as e:
|
|
| 276 |
+ element.info("Remote ({}) does not have {} cached (blob {} missing)".format(
|
|
| 277 |
+ remote.spec.url, element._get_brief_display_key(), e.blob
|
|
| 278 |
+ ))
|
|
| 267 | 279 |
|
| 268 | 280 |
return False
|
| 269 | 281 |
|
| ... | ... | @@ -452,13 +464,14 @@ class CASCache(ArtifactCache): |
| 452 | 464 |
# digest (Digest): An optional Digest object to populate
|
| 453 | 465 |
# path (str): Path to file to add
|
| 454 | 466 |
# buffer (bytes): Byte buffer to add
|
| 467 |
+ # link_directly (bool): Whether file given by path can be linked
|
|
| 455 | 468 |
#
|
| 456 | 469 |
# Returns:
|
| 457 | 470 |
# (Digest): The digest of the added object
|
| 458 | 471 |
#
|
| 459 | 472 |
# Either `path` or `buffer` must be passed, but not both.
|
| 460 | 473 |
#
|
| 461 |
- def add_object(self, *, digest=None, path=None, buffer=None):
|
|
| 474 |
+ def add_object(self, *, digest=None, path=None, buffer=None, link_directly=False):
|
|
| 462 | 475 |
# Exactly one of the two parameters has to be specified
|
| 463 | 476 |
assert (path is None) != (buffer is None)
|
| 464 | 477 |
|
| ... | ... | @@ -468,28 +481,34 @@ class CASCache(ArtifactCache): |
| 468 | 481 |
try:
|
| 469 | 482 |
h = hashlib.sha256()
|
| 470 | 483 |
# Always write out new file to avoid corruption if input file is modified
|
| 471 |
- with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
|
|
| 472 |
- # Set mode bits to 0644
|
|
| 473 |
- os.chmod(out.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
|
|
| 474 |
- |
|
| 475 |
- if path:
|
|
| 476 |
- with open(path, 'rb') as f:
|
|
| 477 |
- for chunk in iter(lambda: f.read(4096), b""):
|
|
| 478 |
- h.update(chunk)
|
|
| 479 |
- out.write(chunk)
|
|
| 484 |
+ with contextlib.ExitStack() as stack:
|
|
| 485 |
+ if path is not None and link_directly:
|
|
| 486 |
+ tmp = stack.enter_context(open(path, 'rb'))
|
|
| 487 |
+ for chunk in iter(lambda: tmp.read(4096), b""):
|
|
| 488 |
+ h.update(chunk)
|
|
| 480 | 489 |
else:
|
| 481 |
- h.update(buffer)
|
|
| 482 |
- out.write(buffer)
|
|
| 490 |
+ tmp = stack.enter_context(tempfile.NamedTemporaryFile(dir=self.tmpdir))
|
|
| 491 |
+ # Set mode bits to 0644
|
|
| 492 |
+ os.chmod(tmp.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
|
|
| 483 | 493 |
|
| 484 |
- out.flush()
|
|
| 494 |
+ if path:
|
|
| 495 |
+ with open(path, 'rb') as f:
|
|
| 496 |
+ for chunk in iter(lambda: f.read(4096), b""):
|
|
| 497 |
+ h.update(chunk)
|
|
| 498 |
+ tmp.write(chunk)
|
|
| 499 |
+ else:
|
|
| 500 |
+ h.update(buffer)
|
|
| 501 |
+ tmp.write(buffer)
|
|
| 502 |
+ |
|
| 503 |
+ tmp.flush()
|
|
| 485 | 504 |
|
| 486 | 505 |
digest.hash = h.hexdigest()
|
| 487 |
- digest.size_bytes = os.fstat(out.fileno()).st_size
|
|
| 506 |
+ digest.size_bytes = os.fstat(tmp.fileno()).st_size
|
|
| 488 | 507 |
|
| 489 | 508 |
# Place file at final location
|
| 490 | 509 |
objpath = self.objpath(digest)
|
| 491 | 510 |
os.makedirs(os.path.dirname(objpath), exist_ok=True)
|
| 492 |
- os.link(out.name, objpath)
|
|
| 511 |
+ os.link(tmp.name, objpath)
|
|
| 493 | 512 |
|
| 494 | 513 |
except FileExistsError as e:
|
| 495 | 514 |
# We can ignore the failed link() if the object is already in the repo.
|
| ... | ... | @@ -574,6 +593,41 @@ class CASCache(ArtifactCache): |
| 574 | 593 |
# first element of this list will be the file modified earliest.
|
| 575 | 594 |
return [ref for _, ref in sorted(zip(mtimes, refs))]
|
| 576 | 595 |
|
| 596 |
+ # list_objects():
|
|
| 597 |
+ #
|
|
| 598 |
+ # List cached objects in Least Recently Modified (LRM) order.
|
|
| 599 |
+ #
|
|
| 600 |
+ # Returns:
|
|
| 601 |
+ # (list) - A list of objects and timestamps in LRM order
|
|
| 602 |
+ #
|
|
| 603 |
def list_objects(self):
    # Gather one (mtime, path) pair per file found below the 'objects'
    # directory of the CAS.
    objects_dir = os.path.join(self.casdir, 'objects')
    entries = []

    for dirpath, _, filenames in os.walk(objects_dir):
        for name in filenames:
            candidate = os.path.join(dirpath, name)
            try:
                stamp = os.path.getmtime(candidate)
            except FileNotFoundError:
                # The file disappeared between the walk and the stat;
                # leave it out of the result.
                continue
            entries.append((stamp, candidate))

    # NOTE: Tuples compare on their first item first, so sorting puts the
    #       file modified earliest at the front of the list.
    entries.sort()
    return entries
|
|
| 620 |
+ |
|
| 621 |
# clean_up_refs_until():
#
# Remove every ref file below 'refs/heads' whose modification time is
# strictly older than the given timestamp.
#
# Args:
#    time (float): Timestamp in seconds since the epoch; refs last
#                  modified before it are removed
#
def clean_up_refs_until(self, time):
    ref_heads = os.path.join(self.casdir, 'refs', 'heads')

    for root, _, files in os.walk(ref_heads):
        for filename in files:
            ref_path = os.path.join(root, filename)
            try:
                # Obtain the mtime (the time a file was last modified)
                if os.path.getmtime(ref_path) < time:
                    os.unlink(ref_path)
            except FileNotFoundError:
                # The ref went away between the walk and the stat/unlink
                # (list_objects() tolerates the same situation); there is
                # nothing left to remove.
                pass
|
|
| 630 |
+ |
|
| 577 | 631 |
# remove():
|
| 578 | 632 |
#
|
| 579 | 633 |
# Removes the given symbolic ref from the repo.
|
| ... | ... | @@ -625,7 +679,12 @@ class CASCache(ArtifactCache): |
| 625 | 679 |
#
|
| 626 | 680 |
# Prune unreachable objects from the repo.
|
| 627 | 681 |
#
|
| 628 |
- def prune(self):
|
|
| 682 |
+ # Args:
|
|
| 683 |
+ # keep_after (int|None): timestamp after which unreachable objects
|
|
| 684 |
+ # are kept. None if no unreachable object
|
|
| 685 |
+ # should be kept.
|
|
| 686 |
+ #
|
|
| 687 |
+ def prune(self, keep_after=None):
|
|
| 629 | 688 |
ref_heads = os.path.join(self.casdir, 'refs', 'heads')
|
| 630 | 689 |
|
| 631 | 690 |
pruned = 0
|
| ... | ... | @@ -646,11 +705,19 @@ class CASCache(ArtifactCache): |
| 646 | 705 |
objhash = os.path.basename(root) + filename
|
| 647 | 706 |
if objhash not in reachable:
|
| 648 | 707 |
obj_path = os.path.join(root, filename)
|
| 708 |
+ if keep_after:
|
|
| 709 |
+ st = os.stat(obj_path)
|
|
| 710 |
+ if st.st_mtime >= keep_after:
|
|
| 711 |
+ continue
|
|
| 649 | 712 |
pruned += os.stat(obj_path).st_size
|
| 650 | 713 |
os.unlink(obj_path)
|
| 651 | 714 |
|
| 652 | 715 |
return pruned
|
| 653 | 716 |
|
| 717 |
# update_tree_mtime():
#
# Run the reachability walk over the given tree with mtime updating
# switched on. The set of reachable hashes collected by the walk is
# discarded.
#
# Args:
#    tree: Digest of the root directory to walk
#
def update_tree_mtime(self, tree):
    self._reachable_refs_dir(set(), tree, update_mtime=True)
|
|
| 720 |
+ |
|
| 654 | 721 |
################################################
|
| 655 | 722 |
# Local Private Methods #
|
| 656 | 723 |
################################################
|
| ... | ... | @@ -795,7 +862,7 @@ class CASCache(ArtifactCache): |
| 795 | 862 |
a += 1
|
| 796 | 863 |
b += 1
|
| 797 | 864 |
|
| 798 |
- def _reachable_refs_dir(self, reachable, tree):
|
|
| 865 |
+ def _reachable_refs_dir(self, reachable, tree, update_mtime=False):
|
|
| 799 | 866 |
if tree.hash in reachable:
|
| 800 | 867 |
return
|
| 801 | 868 |
|
| ... | ... | @@ -807,10 +874,14 @@ class CASCache(ArtifactCache): |
| 807 | 874 |
directory.ParseFromString(f.read())
|
| 808 | 875 |
|
| 809 | 876 |
for filenode in directory.files:
|
| 877 |
+ if update_mtime:
|
|
| 878 |
+ os.utime(self.objpath(filenode.digest))
|
|
| 810 | 879 |
reachable.add(filenode.digest.hash)
|
| 811 | 880 |
|
| 812 | 881 |
for dirnode in directory.directories:
|
| 813 |
- self._reachable_refs_dir(reachable, dirnode.digest)
|
|
| 882 |
+ if update_mtime:
|
|
| 883 |
+ os.utime(self.objpath(dirnode.digest))
|
|
| 884 |
+ self._reachable_refs_dir(reachable, dirnode.digest, update_mtime=update_mtime)
|
|
| 814 | 885 |
|
| 815 | 886 |
def _initialize_remote(self, remote_spec, q):
|
| 816 | 887 |
try:
|
| ... | ... | @@ -887,7 +958,7 @@ class CASCache(ArtifactCache): |
| 887 | 958 |
with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
|
| 888 | 959 |
self._fetch_blob(remote, digest, f)
|
| 889 | 960 |
|
| 890 |
- added_digest = self.add_object(path=f.name)
|
|
| 961 |
+ added_digest = self.add_object(path=f.name, link_directly=True)
|
|
| 891 | 962 |
assert added_digest.hash == digest.hash
|
| 892 | 963 |
|
| 893 | 964 |
return objpath
|
| ... | ... | @@ -898,7 +969,7 @@ class CASCache(ArtifactCache): |
| 898 | 969 |
f.write(data)
|
| 899 | 970 |
f.flush()
|
| 900 | 971 |
|
| 901 |
- added_digest = self.add_object(path=f.name)
|
|
| 972 |
+ added_digest = self.add_object(path=f.name, link_directly=True)
|
|
| 902 | 973 |
assert added_digest.hash == digest.hash
|
| 903 | 974 |
|
| 904 | 975 |
# Helper function for _fetch_directory().
|
| ... | ... | @@ -1202,6 +1273,9 @@ class _CASBatchRead(): |
| 1202 | 1273 |
batch_response = self._remote.cas.BatchReadBlobs(self._request)
|
| 1203 | 1274 |
|
| 1204 | 1275 |
for response in batch_response.responses:
|
| 1276 |
+ if response.status.code == code_pb2.NOT_FOUND:
|
|
| 1277 |
+ raise BlobNotFound(response.digest.hash, "Failed to download blob {}: {}".format(
|
|
| 1278 |
+ response.digest.hash, response.status.code))
|
|
| 1205 | 1279 |
if response.status.code != code_pb2.OK:
|
| 1206 | 1280 |
raise ArtifactError("Failed to download blob {}: {}".format(
|
| 1207 | 1281 |
response.digest.hash, response.status.code))
|
| ... | ... | @@ -24,6 +24,9 @@ import signal |
| 24 | 24 |
import sys
|
| 25 | 25 |
import tempfile
|
| 26 | 26 |
import uuid
|
| 27 |
+import errno
|
|
| 28 |
+import ctypes
|
|
| 29 |
+import threading
|
|
| 27 | 30 |
|
| 28 | 31 |
import click
|
| 29 | 32 |
import grpc
|
| ... | ... | @@ -31,6 +34,7 @@ import grpc |
| 31 | 34 |
from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
|
| 32 | 35 |
from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
|
| 33 | 36 |
from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc
|
| 37 |
+from .._protos.google.rpc import code_pb2
|
|
| 34 | 38 |
|
| 35 | 39 |
from .._exceptions import ArtifactError
|
| 36 | 40 |
from .._context import Context
|
| ... | ... | @@ -40,6 +44,10 @@ from .._context import Context |
| 40 | 44 |
# Limit payload to 1 MiB to leave sufficient headroom for metadata.
|
| 41 | 45 |
_MAX_PAYLOAD_BYTES = 1024 * 1024
|
| 42 | 46 |
|
| 47 |
+# The minimum age in seconds for objects before they can be cleaned
|
|
| 48 |
+# up.
|
|
| 49 |
+_OBJECT_MIN_AGE = 6 * 60 * 60
|
|
| 50 |
+ |
|
| 43 | 51 |
|
| 44 | 52 |
# Trying to push an artifact that is too large
|
| 45 | 53 |
class ArtifactTooLargeException(Exception):
|
| ... | ... | @@ -54,7 +62,7 @@ class ArtifactTooLargeException(Exception): |
| 54 | 62 |
# repo (str): Path to CAS repository
|
| 55 | 63 |
# enable_push (bool): Whether to allow blob uploads and artifact updates
|
| 56 | 64 |
#
|
| 57 |
-def create_server(repo, *, enable_push):
|
|
| 65 |
+def create_server(repo, max_head_size, min_head_size, *, enable_push):
|
|
| 58 | 66 |
context = Context()
|
| 59 | 67 |
context.artifactdir = os.path.abspath(repo)
|
| 60 | 68 |
|
| ... | ... | @@ -64,11 +72,13 @@ def create_server(repo, *, enable_push): |
| 64 | 72 |
max_workers = (os.cpu_count() or 1) * 5
|
| 65 | 73 |
server = grpc.server(futures.ThreadPoolExecutor(max_workers))
|
| 66 | 74 |
|
| 75 |
+ cache_cleaner = _CacheCleaner(artifactcache, max_head_size, min_head_size)
|
|
| 76 |
+ |
|
| 67 | 77 |
bytestream_pb2_grpc.add_ByteStreamServicer_to_server(
|
| 68 |
- _ByteStreamServicer(artifactcache, enable_push=enable_push), server)
|
|
| 78 |
+ _ByteStreamServicer(artifactcache, cache_cleaner, enable_push=enable_push), server)
|
|
| 69 | 79 |
|
| 70 | 80 |
remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
|
| 71 |
- _ContentAddressableStorageServicer(artifactcache, enable_push=enable_push), server)
|
|
| 81 |
+ _ContentAddressableStorageServicer(artifactcache, cache_cleaner, enable_push=enable_push), server)
|
|
| 72 | 82 |
|
| 73 | 83 |
remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
|
| 74 | 84 |
_CapabilitiesServicer(), server)
|
| ... | ... | @@ -86,9 +96,16 @@ def create_server(repo, *, enable_push): |
| 86 | 96 |
@click.option('--client-certs', help="Public client certificates for TLS (PEM-encoded)")
|
| 87 | 97 |
@click.option('--enable-push', default=False, is_flag=True,
|
| 88 | 98 |
help="Allow clients to upload blobs and update artifact cache")
|
| 99 |
+@click.option('--head-room-min', type=click.INT,
|
|
| 100 |
+ help="Disk head room minimum in bytes",
|
|
| 101 |
+ default=2e9)
|
|
| 102 |
+@click.option('--head-room-max', type=click.INT,
|
|
| 103 |
+ help="Disk head room maximum in bytes",
|
|
| 104 |
+ default=10e9)
|
|
| 89 | 105 |
@click.argument('repo')
|
| 90 |
-def server_main(repo, port, server_key, server_cert, client_certs, enable_push):
|
|
| 91 |
- server = create_server(repo, enable_push=enable_push)
|
|
| 106 |
+def server_main(repo, port, server_key, server_cert, client_certs, enable_push,
|
|
| 107 |
+ head_room_min, head_room_max):
|
|
| 108 |
+ server = create_server(repo, head_room_max, head_room_min, enable_push=enable_push)
|
|
| 92 | 109 |
|
| 93 | 110 |
use_tls = bool(server_key)
|
| 94 | 111 |
|
| ... | ... | @@ -129,11 +146,43 @@ def server_main(repo, port, server_key, server_cert, client_certs, enable_push): |
| 129 | 146 |
server.stop(0)
|
| 130 | 147 |
|
| 131 | 148 |
|
| 149 |
+class _FallocateCall:
|
|
| 150 |
+ |
|
| 151 |
+ FALLOC_FL_KEEP_SIZE = 1
|
|
| 152 |
+ FALLOC_FL_PUNCH_HOLE = 2
|
|
| 153 |
+ FALLOC_FL_NO_HIDE_STALE = 4
|
|
| 154 |
+ FALLOC_FL_COLLAPSE_RANGE = 8
|
|
| 155 |
+ FALLOC_FL_ZERO_RANGE = 16
|
|
| 156 |
+ FALLOC_FL_INSERT_RANGE = 32
|
|
| 157 |
+ FALLOC_FL_UNSHARE_RANGE = 64
|
|
| 158 |
+ |
|
| 159 |
+ def __init__(self):
|
|
| 160 |
+ self.libc = ctypes.CDLL("libc.so.6", use_errno=True)
|
|
| 161 |
+ try:
|
|
| 162 |
+ self.fallocate64 = self.libc.fallocate64
|
|
| 163 |
+ except AttributeError:
|
|
| 164 |
+ self.fallocate = self.libc.fallocate
|
|
| 165 |
+ |
|
| 166 |
+ def __call__(self, fd, mode, offset, length):
|
|
| 167 |
+ if hasattr(self, 'fallocate64'):
|
|
| 168 |
+ ret = self.fallocate64(ctypes.c_int(fd), ctypes.c_int(mode),
|
|
| 169 |
+ ctypes.c_int64(offset), ctypes.c_int64(length))
|
|
| 170 |
+ else:
|
|
| 171 |
+ ret = self.fallocate(ctypes.c_int(fd), ctypes.c_int(mode),
|
|
| 172 |
+ ctypes.c_int(offset), ctypes.c_int(length))
|
|
| 173 |
+ if ret == -1:
|
|
| 174 |
+ err = ctypes.get_errno()
|
|
| 175 |
+ raise OSError(errno, os.strerror(err))
|
|
| 176 |
+ return ret
|
|
| 177 |
+ |
|
| 178 |
+ |
|
| 132 | 179 |
class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
|
| 133 |
- def __init__(self, cas, *, enable_push):
|
|
| 180 |
+ def __init__(self, cas, cache_cleaner, *, enable_push):
|
|
| 134 | 181 |
super().__init__()
|
| 135 | 182 |
self.cas = cas
|
| 136 | 183 |
self.enable_push = enable_push
|
| 184 |
+ self.fallocate = _FallocateCall()
|
|
| 185 |
+ self.cache_cleaner = cache_cleaner
|
|
| 137 | 186 |
|
| 138 | 187 |
def Read(self, request, context):
|
| 139 | 188 |
resource_name = request.resource_name
|
| ... | ... | @@ -191,25 +240,44 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer): |
| 191 | 240 |
context.set_code(grpc.StatusCode.NOT_FOUND)
|
| 192 | 241 |
return response
|
| 193 | 242 |
|
| 194 |
- try:
|
|
| 195 |
- _clean_up_cache(self.cas, client_digest.size_bytes)
|
|
| 196 |
- except ArtifactTooLargeException as e:
|
|
| 197 |
- context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
|
|
| 198 |
- context.set_details(str(e))
|
|
| 199 |
- return response
|
|
| 243 |
+ while True:
|
|
| 244 |
+ if client_digest.size_bytes == 0:
|
|
| 245 |
+ break
|
|
| 246 |
+ try:
|
|
| 247 |
+ self.cache_cleaner.clean_up(client_digest.size_bytes)
|
|
| 248 |
+ except ArtifactTooLargeException as e:
|
|
| 249 |
+ context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
|
|
| 250 |
+ context.set_details(str(e))
|
|
| 251 |
+ return response
|
|
| 252 |
+ |
|
| 253 |
+ try:
|
|
| 254 |
+ self.fallocate(out.fileno(), 0, 0, client_digest.size_bytes)
|
|
| 255 |
+ break
|
|
| 256 |
+ except OSError as e:
|
|
| 257 |
+ # Multiple uploads can happen at the same time
|
|
| 258 |
+ if e.errno != errno.ENOSPC:
|
|
| 259 |
+ raise
|
|
| 260 |
+ |
|
| 200 | 261 |
elif request.resource_name:
|
| 201 | 262 |
# If it is set on subsequent calls, it **must** match the value of the first request.
|
| 202 | 263 |
if request.resource_name != resource_name:
|
| 203 | 264 |
context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
|
| 204 | 265 |
return response
|
| 266 |
+ |
|
| 267 |
+ if (offset + len(request.data)) > client_digest.size_bytes:
|
|
| 268 |
+ context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
|
|
| 269 |
+ return response
|
|
| 270 |
+ |
|
| 205 | 271 |
out.write(request.data)
|
| 272 |
+ |
|
| 206 | 273 |
offset += len(request.data)
|
| 274 |
+ |
|
| 207 | 275 |
if request.finish_write:
|
| 208 | 276 |
if client_digest.size_bytes != offset:
|
| 209 | 277 |
context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
|
| 210 | 278 |
return response
|
| 211 | 279 |
out.flush()
|
| 212 |
- digest = self.cas.add_object(path=out.name)
|
|
| 280 |
+ digest = self.cas.add_object(path=out.name, link_directly=True)
|
|
| 213 | 281 |
if digest.hash != client_digest.hash:
|
| 214 | 282 |
context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
|
| 215 | 283 |
return response
|
| ... | ... | @@ -222,18 +290,26 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer): |
| 222 | 290 |
|
| 223 | 291 |
|
| 224 | 292 |
class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
|
| 225 |
- def __init__(self, cas, *, enable_push):
|
|
| 293 |
+ def __init__(self, cas, cache_cleaner, *, enable_push):
|
|
| 226 | 294 |
super().__init__()
|
| 227 | 295 |
self.cas = cas
|
| 228 | 296 |
self.enable_push = enable_push
|
| 297 |
+ self.cache_cleaner = cache_cleaner
|
|
| 229 | 298 |
|
| 230 | 299 |
def FindMissingBlobs(self, request, context):
    # Answer with the digests from the request that have no object file
    # in the CAS. Touching each object with os.utime() serves as the
    # existence check and, for objects that are present, refreshes their
    # modification time.
    response = remote_execution_pb2.FindMissingBlobsResponse()

    for digest in request.blob_digests:
        objpath = self.cas.objpath(digest)
        try:
            os.utime(objpath)
        except FileNotFoundError:
            # ENOENT: the object is not in the cache, report it. Any
            # other OSError propagates to the caller.
            missing = response.missing_blob_digests.add()
            missing.hash = digest.hash
            missing.size_bytes = digest.size_bytes

    return response
|
| 238 | 314 |
|
| 239 | 315 |
def BatchReadBlobs(self, request, context):
|
| ... | ... | @@ -252,12 +328,12 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres |
| 252 | 328 |
try:
|
| 253 | 329 |
with open(self.cas.objpath(digest), 'rb') as f:
|
| 254 | 330 |
if os.fstat(f.fileno()).st_size != digest.size_bytes:
|
| 255 |
- blob_response.status.code = grpc.StatusCode.NOT_FOUND
|
|
| 331 |
+ blob_response.status.code = code_pb2.NOT_FOUND
|
|
| 256 | 332 |
continue
|
| 257 | 333 |
|
| 258 | 334 |
blob_response.data = f.read(digest.size_bytes)
|
| 259 | 335 |
except FileNotFoundError:
|
| 260 |
- blob_response.status.code = grpc.StatusCode.NOT_FOUND
|
|
| 336 |
+ blob_response.status.code = code_pb2.NOT_FOUND
|
|
| 261 | 337 |
|
| 262 | 338 |
return response
|
| 263 | 339 |
|
| ... | ... | @@ -287,7 +363,7 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres |
| 287 | 363 |
continue
|
| 288 | 364 |
|
| 289 | 365 |
try:
|
| 290 |
- _clean_up_cache(self.cas, digest.size_bytes)
|
|
| 366 |
+ self.cache_cleaner.clean_up(digest.size_bytes)
|
|
| 291 | 367 |
|
| 292 | 368 |
with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
|
| 293 | 369 |
out.write(blob_request.data)
|
| ... | ... | @@ -330,6 +406,12 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer): |
| 330 | 406 |
|
| 331 | 407 |
try:
|
| 332 | 408 |
tree = self.cas.resolve_ref(request.key, update_mtime=True)
|
| 409 |
+ try:
|
|
| 410 |
+ self.cas.update_tree_mtime(tree)
|
|
| 411 |
+ except FileNotFoundError:
|
|
| 412 |
+ self.cas.remove(request.key, defer_prune=True)
|
|
| 413 |
+ context.set_code(grpc.StatusCode.NOT_FOUND)
|
|
| 414 |
+ return response
|
|
| 333 | 415 |
|
| 334 | 416 |
response.digest.hash = tree.hash
|
| 335 | 417 |
response.digest.size_bytes = tree.size_bytes
|
| ... | ... | @@ -402,60 +484,80 @@ def _digest_from_upload_resource_name(resource_name): |
| 402 | 484 |
return None
|
| 403 | 485 |
|
| 404 | 486 |
|
| 405 |
-def _has_object(cas, digest):
|
|
| 406 |
- objpath = cas.objpath(digest)
|
|
| 407 |
- return os.path.exists(objpath)
|
|
| 487 |
+class _CacheCleaner:
|
|
| 408 | 488 |
|
| 489 |
+ __cleanup_cache_lock = threading.Lock()
|
|
| 409 | 490 |
|
| 410 |
-# _clean_up_cache()
|
|
| 411 |
-#
|
|
| 412 |
-# Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
|
|
| 413 |
-# is enough space for the incoming artifact
|
|
| 414 |
-#
|
|
| 415 |
-# Args:
|
|
| 416 |
-# cas: CASCache object
|
|
| 417 |
-# object_size: The size of the object being received in bytes
|
|
| 418 |
-#
|
|
| 419 |
-# Returns:
|
|
| 420 |
-# int: The total bytes removed on the filesystem
|
|
| 421 |
-#
|
|
| 422 |
-def _clean_up_cache(cas, object_size):
|
|
| 423 |
- # Determine the available disk space, in bytes, of the file system
|
|
| 424 |
- # which mounts the repo
|
|
| 425 |
- stats = os.statvfs(cas.casdir)
|
|
| 426 |
- buffer_ = int(2e9) # Add a 2 GB buffer
|
|
| 427 |
- free_disk_space = (stats.f_bfree * stats.f_bsize) - buffer_
|
|
| 428 |
- total_disk_space = (stats.f_blocks * stats.f_bsize) - buffer_
|
|
| 429 |
- |
|
| 430 |
- if object_size > total_disk_space:
|
|
| 431 |
- raise ArtifactTooLargeException("Artifact of size: {} is too large for "
|
|
| 432 |
- "the filesystem which mounts the remote "
|
|
| 433 |
- "cache".format(object_size))
|
|
| 434 |
- |
|
| 435 |
- if object_size <= free_disk_space:
|
|
| 436 |
- # No need to clean up
|
|
| 437 |
- return 0
|
|
| 438 |
- |
|
| 439 |
- # obtain a list of LRP artifacts
|
|
| 440 |
- LRP_artifacts = cas.list_artifacts()
|
|
| 441 |
- |
|
| 442 |
- removed_size = 0 # in bytes
|
|
| 443 |
- while object_size - removed_size > free_disk_space:
|
|
| 444 |
- try:
|
|
| 445 |
- to_remove = LRP_artifacts.pop(0) # The first element in the list is the LRP artifact
|
|
| 446 |
- except IndexError:
|
|
| 447 |
- # This exception is caught if there are no more artifacts in the list
|
|
| 448 |
- # LRP_artifacts. This means the the artifact is too large for the filesystem
|
|
| 449 |
- # so we abort the process
|
|
| 450 |
- raise ArtifactTooLargeException("Artifact of size {} is too large for "
|
|
| 451 |
- "the filesystem which mounts the remote "
|
|
| 452 |
- "cache".format(object_size))
|
|
| 491 |
+ def __init__(self, cas, max_head_size, min_head_size=int(2e9)):
|
|
| 492 |
+ self.__cas = cas
|
|
| 493 |
+ self.__max_head_size = max_head_size
|
|
| 494 |
+ self.__min_head_size = min_head_size
|
|
| 453 | 495 |
|
| 454 |
- removed_size += cas.remove(to_remove, defer_prune=False)
|
|
| 496 |
+ def __has_space(self, object_size):
|
|
| 497 |
+ stats = os.statvfs(self.__cas.casdir)
|
|
| 498 |
+ free_disk_space = (stats.f_bavail * stats.f_bsize) - self.__min_head_size
|
|
| 499 |
+ total_disk_space = (stats.f_blocks * stats.f_bsize) - self.__min_head_size
|
|
| 455 | 500 |
|
| 456 |
- if removed_size > 0:
|
|
| 457 |
- logging.info("Successfully removed {} bytes from the cache".format(removed_size))
|
|
| 458 |
- else:
|
|
| 459 |
- logging.info("No artifacts were removed from the cache.")
|
|
| 501 |
+ if object_size > total_disk_space:
|
|
| 502 |
+ raise ArtifactTooLargeException("Artifact of size: {} is too large for "
|
|
| 503 |
+ "the filesystem which mounts the remote "
|
|
| 504 |
+ "cache".format(object_size))
|
|
| 460 | 505 |
|
| 461 |
- return removed_size
|
|
| 506 |
+ return object_size <= free_disk_space
|
|
| 507 |
+ |
|
| 508 |
+ # _clean_up_cache()
|
|
| 509 |
+ #
|
|
| 510 |
+ # Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
|
|
| 511 |
+ # is enough space for the incoming artifact
|
|
| 512 |
+ #
|
|
| 513 |
+ # Args:
|
|
| 514 |
+ # object_size: The size of the object being received in bytes
|
|
| 515 |
+ #
|
|
| 516 |
+ # Returns:
|
|
| 517 |
+ # int: The total bytes removed on the filesystem
|
|
| 518 |
+ #
|
|
| 519 |
+ def clean_up(self, object_size):
|
|
| 520 |
+ if self.__has_space(object_size):
|
|
| 521 |
+ return 0
|
|
| 522 |
+ |
|
| 523 |
+ with _CacheCleaner.__cleanup_cache_lock:
|
|
| 524 |
+ if self.__has_space(object_size):
|
|
| 525 |
+ # Another thread has done the cleanup for us
|
|
| 526 |
+ return 0
|
|
| 527 |
+ |
|
| 528 |
+ stats = os.statvfs(self.__cas.casdir)
|
|
| 529 |
+ target_disk_space = (stats.f_bavail * stats.f_bsize) - self.__max_head_size
|
|
| 530 |
+ |
|
| 531 |
+ # obtain a list of LRP artifacts
|
|
| 532 |
+ LRP_objects = self.__cas.list_objects()
|
|
| 533 |
+ |
|
| 534 |
+ removed_size = 0 # in bytes
|
|
| 535 |
+ |
|
| 536 |
+ last_mtime = 0
|
|
| 537 |
+ |
|
| 538 |
+ while object_size - removed_size > target_disk_space:
|
|
| 539 |
+ try:
|
|
| 540 |
+ last_mtime, to_remove = LRP_objects.pop(0) # The first element in the list is the LRP artifact
|
|
| 541 |
+ except IndexError:
|
|
| 542 |
+ # This exception is caught if there are no more artifacts in the list
|
|
| 543 |
+ # LRP_artifacts. This means the the artifact is too large for the filesystem
|
|
| 544 |
+ # so we abort the process
|
|
| 545 |
+ raise ArtifactTooLargeException("Artifact of size {} is too large for "
|
|
| 546 |
+ "the filesystem which mounts the remote "
|
|
| 547 |
+ "cache".format(object_size))
|
|
| 548 |
+ |
|
| 549 |
+ try:
|
|
| 550 |
+ size = os.stat(to_remove).st_size
|
|
| 551 |
+ os.unlink(to_remove)
|
|
| 552 |
+ removed_size += size
|
|
| 553 |
+ except FileNotFoundError:
|
|
| 554 |
+ pass
|
|
| 555 |
+ |
|
| 556 |
+ self.__cas.clean_up_refs_until(last_mtime)
|
|
| 557 |
+ |
|
| 558 |
+ if removed_size > 0:
|
|
| 559 |
+ logging.info("Successfully removed {} bytes from the cache".format(removed_size))
|
|
| 560 |
+ else:
|
|
| 561 |
+ logging.info("No artifacts were removed from the cache.")
|
|
| 562 |
+ |
|
| 563 |
+ return removed_size
|
| ... | ... | @@ -23,7 +23,7 @@ |
| 23 | 23 |
# This version is bumped whenever enhancements are made
|
| 24 | 24 |
# to the `project.conf` format or the core element format.
|
| 25 | 25 |
#
|
| 26 |
-BST_FORMAT_VERSION = 17
|
|
| 26 |
+BST_FORMAT_VERSION = 18
|
|
| 27 | 27 |
|
| 28 | 28 |
|
| 29 | 29 |
# The base BuildStream artifact version
|
| ... | ... | @@ -1049,6 +1049,12 @@ class ChainMap(collections.ChainMap): |
| 1049 | 1049 |
for key in clearable:
|
| 1050 | 1050 |
del self[key]
|
| 1051 | 1051 |
|
| 1052 |
def get(self, key, default=None):
    # Resolve the key through item access and fall back to the default
    # when the lookup raises KeyError.
    try:
        found = self[key]
    except KeyError:
        found = default
    return found
|
|
| 1057 |
+ |
|
| 1052 | 1058 |
|
| 1053 | 1059 |
def node_chain_copy(source):
|
| 1054 | 1060 |
copy = ChainMap({}, source)
|
| 1 |
-# No variables added for the manual element by default, set
|
|
| 2 |
-# this if you plan to use make, and the sources cannot handle
|
|
| 3 |
-# parallelization.
|
|
| 4 |
-#
|
|
| 5 |
-# variables:
|
|
| 6 |
-#
|
|
| 7 |
-# notparallel: True
|
|
| 8 |
- |
|
| 9 | 1 |
# Manual build element does not provide any default
|
| 10 | 2 |
# build commands
|
| 11 | 3 |
config:
|
| ... | ... | @@ -28,14 +20,3 @@ config: |
| 28 | 20 |
strip-commands:
|
| 29 | 21 |
- |
|
| 30 | 22 |
%{strip-binaries}
|
| 31 |
- |
|
| 32 |
-# Use max-jobs CPUs for building and enable verbosity
|
|
| 33 |
-environment:
|
|
| 34 |
- MAKEFLAGS: -j%{max-jobs}
|
|
| 35 |
- V: 1
|
|
| 36 |
- |
|
| 37 |
-# And dont consider MAKEFLAGS or V as something which may
|
|
| 38 |
-# affect build output.
|
|
| 39 |
-environment-nocache:
|
|
| 40 |
-- MAKEFLAGS
|
|
| 41 |
-- V
|
| ... | ... | @@ -253,6 +253,8 @@ def test_artifact_expires(cli, datafiles, tmpdir): |
| 253 | 253 |
assert cli.get_element_state(project, 'element2.bst') == 'cached'
|
| 254 | 254 |
assert_shared(cli, share, project, 'element2.bst')
|
| 255 | 255 |
|
| 256 |
+ share.make_all_objects_older()
|
|
| 257 |
+ |
|
| 256 | 258 |
# Create and build another element of 5 MB (This will exceed the free disk space available)
|
| 257 | 259 |
create_element_size('element3.bst', project, element_path, [], int(5e6))
|
| 258 | 260 |
result = cli.run(project=project, args=['build', 'element3.bst'])
|
| ... | ... | @@ -350,6 +352,7 @@ def test_recently_pulled_artifact_does_not_expire(cli, datafiles, tmpdir): |
| 350 | 352 |
assert cli.get_element_state(project, 'element1.bst') == 'cached'
|
| 351 | 353 |
|
| 352 | 354 |
wait_for_cache_granularity()
|
| 355 |
+ share.make_all_objects_older()
|
|
| 353 | 356 |
|
| 354 | 357 |
# Create and build the element3 (of 5 MB)
|
| 355 | 358 |
create_element_size('element3.bst', project, element_path, [], int(5e6))
|
| ... | ... | @@ -138,6 +138,15 @@ class ArtifactShare(): |
| 138 | 138 |
except ArtifactError:
|
| 139 | 139 |
return False
|
| 140 | 140 |
|
| 141 |
def make_all_objects_older(self):
    # Move the access and modification times of every file below the
    # 'objects' directory of the share's CAS six hours into the past.
    shift = 6 * 60 * 60
    objects_dir = os.path.join(self.cas.casdir, 'objects')

    for dirpath, _, filenames in os.walk(objects_dir):
        for filename in filenames:
            target = os.path.join(dirpath, filename)
            info = os.stat(target)
            os.utime(target, times=(info.st_atime - shift, info.st_mtime - shift))
|
|
| 149 |
+ |
|
| 141 | 150 |
# close():
|
| 142 | 151 |
#
|
| 143 | 152 |
# Remove the artifact share.
|
