Valentin David pushed to branch valentindavid/cache_server_fill_up at BuildStream / buildstream
Commits:
-
516e990e
by ctolentino8 at 2018-10-31T11:36:46Z
-
b8a37a63
by Tristan Van Berkom at 2018-11-01T10:16:25Z
-
b27b592a
by Benjamin Schubert at 2018-11-01T10:49:57Z
-
89ace5d7
by Benjamin Schubert at 2018-11-01T11:16:36Z
-
6e19a26a
by Valentin David at 2018-11-01T11:45:59Z
-
d3db3ddf
by Valentin David at 2018-11-01T11:45:59Z
-
fe367a39
by Valentin David at 2018-11-01T11:45:59Z
-
4e7c9d0a
by Valentin David at 2018-11-01T11:46:22Z
-
040d0597
by Valentin David at 2018-11-01T11:46:22Z
-
029831e4
by Valentin David at 2018-11-01T11:46:22Z
-
18f321e8
by Valentin David at 2018-11-01T11:46:22Z
-
bf4b43d8
by Valentin David at 2018-11-01T11:46:22Z
-
6ec5ad2f
by Valentin David at 2018-11-01T11:46:22Z
-
af86172c
by Valentin David at 2018-11-01T11:46:22Z
-
77a9fd1c
by Valentin David at 2018-11-01T11:46:22Z
7 changed files:
- buildstream/_artifactcache/cascache.py
- buildstream/_artifactcache/casserver.py
- buildstream/_frontend/app.py
- setup.py
- tests/frontend/init.py
- tests/frontend/push.py
- tests/testutils/artifactshare.py
Changes:
... | ... | @@ -27,6 +27,7 @@ import stat |
27 | 27 |
import tempfile
|
28 | 28 |
import uuid
|
29 | 29 |
import errno
|
30 |
+import contextlib
|
|
30 | 31 |
from urllib.parse import urlparse
|
31 | 32 |
|
32 | 33 |
import grpc
|
... | ... | @@ -49,6 +50,13 @@ from . import ArtifactCache |
49 | 50 |
_MAX_PAYLOAD_BYTES = 1024 * 1024
|
50 | 51 |
|
51 | 52 |
|
53 |
+class BlobNotFound(ArtifactError):
|
|
54 |
+ |
|
55 |
+ def __init__(self, blob, msg):
|
|
56 |
+ self.blob = blob
|
|
57 |
+ super().__init__(msg)
|
|
58 |
+ |
|
59 |
+ |
|
52 | 60 |
# A CASCache manages artifacts in a CAS repository as specified in the
|
53 | 61 |
# Remote Execution API.
|
54 | 62 |
#
|
... | ... | @@ -264,6 +272,10 @@ class CASCache(ArtifactCache): |
264 | 272 |
element.info("Remote ({}) does not have {} cached".format(
|
265 | 273 |
remote.spec.url, element._get_brief_display_key()
|
266 | 274 |
))
|
275 |
+ except BlobNotFound as e:
|
|
276 |
+ element.info("Remote ({}) does not have {} cached (blob {} missing)".format(
|
|
277 |
+ remote.spec.url, element._get_brief_display_key(), e.blob
|
|
278 |
+ ))
|
|
267 | 279 |
|
268 | 280 |
return False
|
269 | 281 |
|
... | ... | @@ -452,13 +464,14 @@ class CASCache(ArtifactCache): |
452 | 464 |
# digest (Digest): An optional Digest object to populate
|
453 | 465 |
# path (str): Path to file to add
|
454 | 466 |
# buffer (bytes): Byte buffer to add
|
467 |
+ # link_directly (bool): Whether file given by path can be linked
|
|
455 | 468 |
#
|
456 | 469 |
# Returns:
|
457 | 470 |
# (Digest): The digest of the added object
|
458 | 471 |
#
|
459 | 472 |
# Either `path` or `buffer` must be passed, but not both.
|
460 | 473 |
#
|
461 |
- def add_object(self, *, digest=None, path=None, buffer=None):
|
|
474 |
+ def add_object(self, *, digest=None, path=None, buffer=None, link_directly=False):
|
|
462 | 475 |
# Exactly one of the two parameters has to be specified
|
463 | 476 |
assert (path is None) != (buffer is None)
|
464 | 477 |
|
... | ... | @@ -468,28 +481,34 @@ class CASCache(ArtifactCache): |
468 | 481 |
try:
|
469 | 482 |
h = hashlib.sha256()
|
470 | 483 |
# Always write out new file to avoid corruption if input file is modified
|
471 |
- with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
|
|
472 |
- # Set mode bits to 0644
|
|
473 |
- os.chmod(out.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
|
|
474 |
- |
|
475 |
- if path:
|
|
476 |
- with open(path, 'rb') as f:
|
|
477 |
- for chunk in iter(lambda: f.read(4096), b""):
|
|
478 |
- h.update(chunk)
|
|
479 |
- out.write(chunk)
|
|
484 |
+ with contextlib.ExitStack() as stack:
|
|
485 |
+ if path is not None and link_directly:
|
|
486 |
+ tmp = stack.enter_context(open(path, 'rb'))
|
|
487 |
+ for chunk in iter(lambda: tmp.read(4096), b""):
|
|
488 |
+ h.update(chunk)
|
|
480 | 489 |
else:
|
481 |
- h.update(buffer)
|
|
482 |
- out.write(buffer)
|
|
490 |
+ tmp = stack.enter_context(tempfile.NamedTemporaryFile(dir=self.tmpdir))
|
|
491 |
+ # Set mode bits to 0644
|
|
492 |
+ os.chmod(tmp.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
|
|
483 | 493 |
|
484 |
- out.flush()
|
|
494 |
+ if path:
|
|
495 |
+ with open(path, 'rb') as f:
|
|
496 |
+ for chunk in iter(lambda: f.read(4096), b""):
|
|
497 |
+ h.update(chunk)
|
|
498 |
+ tmp.write(chunk)
|
|
499 |
+ else:
|
|
500 |
+ h.update(buffer)
|
|
501 |
+ tmp.write(buffer)
|
|
502 |
+ |
|
503 |
+ tmp.flush()
|
|
485 | 504 |
|
486 | 505 |
digest.hash = h.hexdigest()
|
487 |
- digest.size_bytes = os.fstat(out.fileno()).st_size
|
|
506 |
+ digest.size_bytes = os.fstat(tmp.fileno()).st_size
|
|
488 | 507 |
|
489 | 508 |
# Place file at final location
|
490 | 509 |
objpath = self.objpath(digest)
|
491 | 510 |
os.makedirs(os.path.dirname(objpath), exist_ok=True)
|
492 |
- os.link(out.name, objpath)
|
|
511 |
+ os.link(tmp.name, objpath)
|
|
493 | 512 |
|
494 | 513 |
except FileExistsError as e:
|
495 | 514 |
# We can ignore the failed link() if the object is already in the repo.
|
... | ... | @@ -549,30 +568,40 @@ class CASCache(ArtifactCache): |
549 | 568 |
def calculate_cache_size(self):
|
550 | 569 |
return utils._get_dir_size(self.casdir)
|
551 | 570 |
|
552 |
- # list_artifacts():
|
|
571 |
+ # list_objects():
|
|
553 | 572 |
#
|
554 |
- # List cached artifacts in Least Recently Modified (LRM) order.
|
|
573 |
+ # List cached objects in Least Recently Modified (LRM) order.
|
|
555 | 574 |
#
|
556 | 575 |
# Returns:
|
557 |
- # (list) - A list of refs in LRM order
|
|
576 |
+ # (list) - A list of objects and timestamps in LRM order
|
|
558 | 577 |
#
|
559 |
- def list_artifacts(self):
|
|
560 |
- # string of: /path/to/repo/refs/heads
|
|
561 |
- ref_heads = os.path.join(self.casdir, 'refs', 'heads')
|
|
562 |
- |
|
563 |
- refs = []
|
|
578 |
+ def list_objects(self):
|
|
579 |
+ objs = []
|
|
564 | 580 |
mtimes = []
|
565 | 581 |
|
566 |
- for root, _, files in os.walk(ref_heads):
|
|
582 |
+ for root, _, files in os.walk(os.path.join(self.casdir, 'objects')):
|
|
567 | 583 |
for filename in files:
|
568 |
- ref_path = os.path.join(root, filename)
|
|
569 |
- refs.append(os.path.relpath(ref_path, ref_heads))
|
|
570 |
- # Obtain the mtime (the time a file was last modified)
|
|
571 |
- mtimes.append(os.path.getmtime(ref_path))
|
|
584 |
+ obj_path = os.path.join(root, filename)
|
|
585 |
+ try:
|
|
586 |
+ mtimes.append(os.path.getmtime(obj_path))
|
|
587 |
+ except FileNotFoundError:
|
|
588 |
+ pass
|
|
589 |
+ else:
|
|
590 |
+ objs.append(obj_path)
|
|
572 | 591 |
|
573 | 592 |
# NOTE: Sorted will sort from earliest to latest, thus the
|
574 | 593 |
# first element of this list will be the file modified earliest.
|
575 |
- return [ref for _, ref in sorted(zip(mtimes, refs))]
|
|
594 |
+ return sorted(zip(mtimes, objs))
|
|
595 |
+ |
|
596 |
+ def clean_up_refs_until(self, time):
|
|
597 |
+ ref_heads = os.path.join(self.casdir, 'refs', 'heads')
|
|
598 |
+ |
|
599 |
+ for root, _, files in os.walk(ref_heads):
|
|
600 |
+ for filename in files:
|
|
601 |
+ ref_path = os.path.join(root, filename)
|
|
602 |
+ # Obtain the mtime (the time a file was last modified)
|
|
603 |
+ if os.path.getmtime(ref_path) < time:
|
|
604 |
+ os.unlink(ref_path)
|
|
576 | 605 |
|
577 | 606 |
# remove():
|
578 | 607 |
#
|
... | ... | @@ -625,7 +654,12 @@ class CASCache(ArtifactCache): |
625 | 654 |
#
|
626 | 655 |
# Prune unreachable objects from the repo.
|
627 | 656 |
#
|
628 |
- def prune(self):
|
|
657 |
+ # Args:
|
|
658 |
+ # keep_after (int|None): timestamp after which unreachable objects
|
|
659 |
+ # are kept. None if no unreachable object
|
|
660 |
+ # should be kept.
|
|
661 |
+ #
|
|
662 |
+ def prune(self, keep_after=None):
|
|
629 | 663 |
ref_heads = os.path.join(self.casdir, 'refs', 'heads')
|
630 | 664 |
|
631 | 665 |
pruned = 0
|
... | ... | @@ -646,11 +680,19 @@ class CASCache(ArtifactCache): |
646 | 680 |
objhash = os.path.basename(root) + filename
|
647 | 681 |
if objhash not in reachable:
|
648 | 682 |
obj_path = os.path.join(root, filename)
|
683 |
+ if keep_after:
|
|
684 |
+ st = os.stat(obj_path)
|
|
685 |
+ if st.st_mtime >= keep_after:
|
|
686 |
+ continue
|
|
649 | 687 |
pruned += os.stat(obj_path).st_size
|
650 | 688 |
os.unlink(obj_path)
|
651 | 689 |
|
652 | 690 |
return pruned
|
653 | 691 |
|
692 |
+ def update_tree_mtime(self, tree):
|
|
693 |
+ reachable = set()
|
|
694 |
+ self._reachable_refs_dir(reachable, tree, update_mtime=True)
|
|
695 |
+ |
|
654 | 696 |
################################################
|
655 | 697 |
# Local Private Methods #
|
656 | 698 |
################################################
|
... | ... | @@ -795,7 +837,7 @@ class CASCache(ArtifactCache): |
795 | 837 |
a += 1
|
796 | 838 |
b += 1
|
797 | 839 |
|
798 |
- def _reachable_refs_dir(self, reachable, tree):
|
|
840 |
+ def _reachable_refs_dir(self, reachable, tree, update_mtime=False):
|
|
799 | 841 |
if tree.hash in reachable:
|
800 | 842 |
return
|
801 | 843 |
|
... | ... | @@ -807,10 +849,14 @@ class CASCache(ArtifactCache): |
807 | 849 |
directory.ParseFromString(f.read())
|
808 | 850 |
|
809 | 851 |
for filenode in directory.files:
|
852 |
+ if update_mtime:
|
|
853 |
+ os.utime(self.objpath(filenode.digest))
|
|
810 | 854 |
reachable.add(filenode.digest.hash)
|
811 | 855 |
|
812 | 856 |
for dirnode in directory.directories:
|
813 |
- self._reachable_refs_dir(reachable, dirnode.digest)
|
|
857 |
+ if update_mtime:
|
|
858 |
+ os.utime(self.objpath(dirnode.digest))
|
|
859 |
+ self._reachable_refs_dir(reachable, dirnode.digest, update_mtime=update_mtime)
|
|
814 | 860 |
|
815 | 861 |
def _initialize_remote(self, remote_spec, q):
|
816 | 862 |
try:
|
... | ... | @@ -887,7 +933,7 @@ class CASCache(ArtifactCache): |
887 | 933 |
with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
|
888 | 934 |
self._fetch_blob(remote, digest, f)
|
889 | 935 |
|
890 |
- added_digest = self.add_object(path=f.name)
|
|
936 |
+ added_digest = self.add_object(path=f.name, link_directly=True)
|
|
891 | 937 |
assert added_digest.hash == digest.hash
|
892 | 938 |
|
893 | 939 |
return objpath
|
... | ... | @@ -898,7 +944,7 @@ class CASCache(ArtifactCache): |
898 | 944 |
f.write(data)
|
899 | 945 |
f.flush()
|
900 | 946 |
|
901 |
- added_digest = self.add_object(path=f.name)
|
|
947 |
+ added_digest = self.add_object(path=f.name, link_directly=True)
|
|
902 | 948 |
assert added_digest.hash == digest.hash
|
903 | 949 |
|
904 | 950 |
# Helper function for _fetch_directory().
|
... | ... | @@ -1202,6 +1248,9 @@ class _CASBatchRead(): |
1202 | 1248 |
batch_response = self._remote.cas.BatchReadBlobs(self._request)
|
1203 | 1249 |
|
1204 | 1250 |
for response in batch_response.responses:
|
1251 |
+ if response.status.code == code_pb2.NOT_FOUND:
|
|
1252 |
+ raise BlobNotFound(response.digest.hash, "Failed to download blob {}: {}".format(
|
|
1253 |
+ response.digest.hash, response.status.code))
|
|
1205 | 1254 |
if response.status.code != code_pb2.OK:
|
1206 | 1255 |
raise ArtifactError("Failed to download blob {}: {}".format(
|
1207 | 1256 |
response.digest.hash, response.status.code))
|
... | ... | @@ -24,6 +24,10 @@ import signal |
24 | 24 |
import sys
|
25 | 25 |
import tempfile
|
26 | 26 |
import uuid
|
27 |
+import time
|
|
28 |
+import errno
|
|
29 |
+import ctypes
|
|
30 |
+import threading
|
|
27 | 31 |
|
28 | 32 |
import click
|
29 | 33 |
import grpc
|
... | ... | @@ -31,6 +35,7 @@ import grpc |
31 | 35 |
from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
|
32 | 36 |
from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
|
33 | 37 |
from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc
|
38 |
+from .._protos.google.rpc import code_pb2
|
|
34 | 39 |
|
35 | 40 |
from .._exceptions import ArtifactError
|
36 | 41 |
from .._context import Context
|
... | ... | @@ -40,6 +45,10 @@ from .._context import Context |
40 | 45 |
# Limit payload to 1 MiB to leave sufficient headroom for metadata.
|
41 | 46 |
_MAX_PAYLOAD_BYTES = 1024 * 1024
|
42 | 47 |
|
48 |
+# The minimum age in seconds for objects before they can be cleaned
|
|
49 |
+# up.
|
|
50 |
+_OBJECT_MIN_AGE = 6 * 60 * 60
|
|
51 |
+ |
|
43 | 52 |
|
44 | 53 |
# Trying to push an artifact that is too large
|
45 | 54 |
class ArtifactTooLargeException(Exception):
|
... | ... | @@ -54,7 +63,7 @@ class ArtifactTooLargeException(Exception): |
54 | 63 |
# repo (str): Path to CAS repository
|
55 | 64 |
# enable_push (bool): Whether to allow blob uploads and artifact updates
|
56 | 65 |
#
|
57 |
-def create_server(repo, *, enable_push):
|
|
66 |
+def create_server(repo, max_head_size, min_head_size, *, enable_push):
|
|
58 | 67 |
context = Context()
|
59 | 68 |
context.artifactdir = os.path.abspath(repo)
|
60 | 69 |
|
... | ... | @@ -64,11 +73,13 @@ def create_server(repo, *, enable_push): |
64 | 73 |
max_workers = (os.cpu_count() or 1) * 5
|
65 | 74 |
server = grpc.server(futures.ThreadPoolExecutor(max_workers))
|
66 | 75 |
|
76 |
+ cache_cleaner = _CacheCleaner(artifactcache, max_head_size, min_head_size)
|
|
77 |
+ |
|
67 | 78 |
bytestream_pb2_grpc.add_ByteStreamServicer_to_server(
|
68 |
- _ByteStreamServicer(artifactcache, enable_push=enable_push), server)
|
|
79 |
+ _ByteStreamServicer(artifactcache, cache_cleaner, enable_push=enable_push), server)
|
|
69 | 80 |
|
70 | 81 |
remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
|
71 |
- _ContentAddressableStorageServicer(artifactcache, enable_push=enable_push), server)
|
|
82 |
+ _ContentAddressableStorageServicer(artifactcache, cache_cleaner, enable_push=enable_push), server)
|
|
72 | 83 |
|
73 | 84 |
remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
|
74 | 85 |
_CapabilitiesServicer(), server)
|
... | ... | @@ -86,9 +97,16 @@ def create_server(repo, *, enable_push): |
86 | 97 |
@click.option('--client-certs', help="Public client certificates for TLS (PEM-encoded)")
|
87 | 98 |
@click.option('--enable-push', default=False, is_flag=True,
|
88 | 99 |
help="Allow clients to upload blobs and update artifact cache")
|
100 |
+@click.option('--head-room-min', type=click.INT,
|
|
101 |
+ help="Disk head room minimum in bytes",
|
|
102 |
+ default=2e9)
|
|
103 |
+@click.option('--head-room-max', type=click.INT,
|
|
104 |
+ help="Disk head room maximum in bytes",
|
|
105 |
+ default=10e9)
|
|
89 | 106 |
@click.argument('repo')
|
90 |
-def server_main(repo, port, server_key, server_cert, client_certs, enable_push):
|
|
91 |
- server = create_server(repo, enable_push=enable_push)
|
|
107 |
+def server_main(repo, port, server_key, server_cert, client_certs, enable_push,
|
|
108 |
+ head_room_min, head_room_max):
|
|
109 |
+ server = create_server(repo, head_room_max, head_room_min, enable_push=enable_push)
|
|
92 | 110 |
|
93 | 111 |
use_tls = bool(server_key)
|
94 | 112 |
|
... | ... | @@ -129,11 +147,43 @@ def server_main(repo, port, server_key, server_cert, client_certs, enable_push): |
129 | 147 |
server.stop(0)
|
130 | 148 |
|
131 | 149 |
|
150 |
+class _FallocateCall:
|
|
151 |
+ |
|
152 |
+ FALLOC_FL_KEEP_SIZE = 1
|
|
153 |
+ FALLOC_FL_PUNCH_HOLE = 2
|
|
154 |
+ FALLOC_FL_NO_HIDE_STALE = 4
|
|
155 |
+ FALLOC_FL_COLLAPSE_RANGE = 8
|
|
156 |
+ FALLOC_FL_ZERO_RANGE = 16
|
|
157 |
+ FALLOC_FL_INSERT_RANGE = 32
|
|
158 |
+ FALLOC_FL_UNSHARE_RANGE = 64
|
|
159 |
+ |
|
160 |
+ def __init__(self):
|
|
161 |
+ self.libc = ctypes.CDLL("libc.so.6", use_errno=True)
|
|
162 |
+ try:
|
|
163 |
+ self.fallocate64 = self.libc.fallocate64
|
|
164 |
+ except AttributeError:
|
|
165 |
+ self.fallocate = self.libc.fallocate
|
|
166 |
+ |
|
167 |
+ def __call__(self, fd, mode, offset, length):
|
|
168 |
+ if hasattr(self, 'fallocate64'):
|
|
169 |
+ ret = self.fallocate64(ctypes.c_int(fd), ctypes.c_int(mode),
|
|
170 |
+ ctypes.c_int64(offset), ctypes.c_int64(length))
|
|
171 |
+ else:
|
|
172 |
+ ret = self.fallocate(ctypes.c_int(fd), ctypes.c_int(mode),
|
|
173 |
+ ctypes.c_int(offset), ctypes.c_int(length))
|
|
174 |
+ if ret == -1:
|
|
175 |
+ errno = ctypes.get_errno()
|
|
176 |
+ raise OSError(errno, os.strerror(errno))
|
|
177 |
+ return ret
|
|
178 |
+ |
|
179 |
+ |
|
132 | 180 |
class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
|
133 |
- def __init__(self, cas, *, enable_push):
|
|
181 |
+ def __init__(self, cas, cache_cleaner, *, enable_push):
|
|
134 | 182 |
super().__init__()
|
135 | 183 |
self.cas = cas
|
136 | 184 |
self.enable_push = enable_push
|
185 |
+ self.fallocate = _FallocateCall()
|
|
186 |
+ self.cache_cleaner = cache_cleaner
|
|
137 | 187 |
|
138 | 188 |
def Read(self, request, context):
|
139 | 189 |
resource_name = request.resource_name
|
... | ... | @@ -191,25 +241,44 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer): |
191 | 241 |
context.set_code(grpc.StatusCode.NOT_FOUND)
|
192 | 242 |
return response
|
193 | 243 |
|
194 |
- try:
|
|
195 |
- _clean_up_cache(self.cas, client_digest.size_bytes)
|
|
196 |
- except ArtifactTooLargeException as e:
|
|
197 |
- context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
|
|
198 |
- context.set_details(str(e))
|
|
199 |
- return response
|
|
244 |
+ while True:
|
|
245 |
+ if client_digest.size_bytes == 0:
|
|
246 |
+ break
|
|
247 |
+ try:
|
|
248 |
+ self.cache_cleaner.clean_up(client_digest.size_bytes)
|
|
249 |
+ except ArtifactTooLargeException as e:
|
|
250 |
+ context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
|
|
251 |
+ context.set_details(str(e))
|
|
252 |
+ return response
|
|
253 |
+ |
|
254 |
+ try:
|
|
255 |
+ self.fallocate(out.fileno(), 0, 0, client_digest.size_bytes)
|
|
256 |
+ break
|
|
257 |
+ except OSError as e:
|
|
258 |
+ # Multiple upload can happen in the same time
|
|
259 |
+ if e.errno != errno.ENOSPC:
|
|
260 |
+ raise
|
|
261 |
+ |
|
200 | 262 |
elif request.resource_name:
|
201 | 263 |
# If it is set on subsequent calls, it **must** match the value of the first request.
|
202 | 264 |
if request.resource_name != resource_name:
|
203 | 265 |
context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
|
204 | 266 |
return response
|
267 |
+ |
|
268 |
+ if (offset + len(request.data)) > client_digest.size_bytes:
|
|
269 |
+ context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
|
|
270 |
+ return response
|
|
271 |
+ |
|
205 | 272 |
out.write(request.data)
|
273 |
+ |
|
206 | 274 |
offset += len(request.data)
|
275 |
+ |
|
207 | 276 |
if request.finish_write:
|
208 | 277 |
if client_digest.size_bytes != offset:
|
209 | 278 |
context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
|
210 | 279 |
return response
|
211 | 280 |
out.flush()
|
212 |
- digest = self.cas.add_object(path=out.name)
|
|
281 |
+ digest = self.cas.add_object(path=out.name, link_directly=True)
|
|
213 | 282 |
if digest.hash != client_digest.hash:
|
214 | 283 |
context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
|
215 | 284 |
return response
|
... | ... | @@ -222,18 +291,26 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer): |
222 | 291 |
|
223 | 292 |
|
224 | 293 |
class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
|
225 |
- def __init__(self, cas, *, enable_push):
|
|
294 |
+ def __init__(self, cas, cache_cleaner, *, enable_push):
|
|
226 | 295 |
super().__init__()
|
227 | 296 |
self.cas = cas
|
228 | 297 |
self.enable_push = enable_push
|
298 |
+ self.cache_cleaner = cache_cleaner
|
|
229 | 299 |
|
230 | 300 |
def FindMissingBlobs(self, request, context):
|
231 | 301 |
response = remote_execution_pb2.FindMissingBlobsResponse()
|
232 | 302 |
for digest in request.blob_digests:
|
233 |
- if not _has_object(self.cas, digest):
|
|
234 |
- d = response.missing_blob_digests.add()
|
|
235 |
- d.hash = digest.hash
|
|
236 |
- d.size_bytes = digest.size_bytes
|
|
303 |
+ objpath = self.cas.objpath(digest)
|
|
304 |
+ try:
|
|
305 |
+ os.utime(objpath)
|
|
306 |
+ except OSError as e:
|
|
307 |
+ if e.errno != errno.ENOENT:
|
|
308 |
+ raise
|
|
309 |
+ else:
|
|
310 |
+ d = response.missing_blob_digests.add()
|
|
311 |
+ d.hash = digest.hash
|
|
312 |
+ d.size_bytes = digest.size_bytes
|
|
313 |
+ |
|
237 | 314 |
return response
|
238 | 315 |
|
239 | 316 |
def BatchReadBlobs(self, request, context):
|
... | ... | @@ -252,12 +329,12 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres |
252 | 329 |
try:
|
253 | 330 |
with open(self.cas.objpath(digest), 'rb') as f:
|
254 | 331 |
if os.fstat(f.fileno()).st_size != digest.size_bytes:
|
255 |
- blob_response.status.code = grpc.StatusCode.NOT_FOUND
|
|
332 |
+ blob_response.status.code = code_pb2.NOT_FOUND
|
|
256 | 333 |
continue
|
257 | 334 |
|
258 | 335 |
blob_response.data = f.read(digest.size_bytes)
|
259 | 336 |
except FileNotFoundError:
|
260 |
- blob_response.status.code = grpc.StatusCode.NOT_FOUND
|
|
337 |
+ blob_response.status.code = code_pb2.NOT_FOUND
|
|
261 | 338 |
|
262 | 339 |
return response
|
263 | 340 |
|
... | ... | @@ -287,7 +364,7 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres |
287 | 364 |
continue
|
288 | 365 |
|
289 | 366 |
try:
|
290 |
- _clean_up_cache(self.cas, digest.size_bytes)
|
|
367 |
+ self.cache_cleaner.clean_up(digest.size_bytes)
|
|
291 | 368 |
|
292 | 369 |
with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
|
293 | 370 |
out.write(blob_request.data)
|
... | ... | @@ -330,6 +407,12 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer): |
330 | 407 |
|
331 | 408 |
try:
|
332 | 409 |
tree = self.cas.resolve_ref(request.key, update_mtime=True)
|
410 |
+ try:
|
|
411 |
+ self.cas.update_tree_mtime(tree)
|
|
412 |
+ except FileNotFoundError:
|
|
413 |
+ self.cas.remove(request.key, defer_prune=True)
|
|
414 |
+ context.set_code(grpc.StatusCode.NOT_FOUND)
|
|
415 |
+ return response
|
|
333 | 416 |
|
334 | 417 |
response.digest.hash = tree.hash
|
335 | 418 |
response.digest.size_bytes = tree.size_bytes
|
... | ... | @@ -402,60 +485,80 @@ def _digest_from_upload_resource_name(resource_name): |
402 | 485 |
return None
|
403 | 486 |
|
404 | 487 |
|
405 |
-def _has_object(cas, digest):
|
|
406 |
- objpath = cas.objpath(digest)
|
|
407 |
- return os.path.exists(objpath)
|
|
488 |
+class _CacheCleaner:
|
|
408 | 489 |
|
490 |
+ __cleanup_cache_lock = threading.Lock()
|
|
409 | 491 |
|
410 |
-# _clean_up_cache()
|
|
411 |
-#
|
|
412 |
-# Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
|
|
413 |
-# is enough space for the incoming artifact
|
|
414 |
-#
|
|
415 |
-# Args:
|
|
416 |
-# cas: CASCache object
|
|
417 |
-# object_size: The size of the object being received in bytes
|
|
418 |
-#
|
|
419 |
-# Returns:
|
|
420 |
-# int: The total bytes removed on the filesystem
|
|
421 |
-#
|
|
422 |
-def _clean_up_cache(cas, object_size):
|
|
423 |
- # Determine the available disk space, in bytes, of the file system
|
|
424 |
- # which mounts the repo
|
|
425 |
- stats = os.statvfs(cas.casdir)
|
|
426 |
- buffer_ = int(2e9) # Add a 2 GB buffer
|
|
427 |
- free_disk_space = (stats.f_bfree * stats.f_bsize) - buffer_
|
|
428 |
- total_disk_space = (stats.f_blocks * stats.f_bsize) - buffer_
|
|
429 |
- |
|
430 |
- if object_size > total_disk_space:
|
|
431 |
- raise ArtifactTooLargeException("Artifact of size: {} is too large for "
|
|
432 |
- "the filesystem which mounts the remote "
|
|
433 |
- "cache".format(object_size))
|
|
434 |
- |
|
435 |
- if object_size <= free_disk_space:
|
|
436 |
- # No need to clean up
|
|
437 |
- return 0
|
|
438 |
- |
|
439 |
- # obtain a list of LRP artifacts
|
|
440 |
- LRP_artifacts = cas.list_artifacts()
|
|
441 |
- |
|
442 |
- removed_size = 0 # in bytes
|
|
443 |
- while object_size - removed_size > free_disk_space:
|
|
444 |
- try:
|
|
445 |
- to_remove = LRP_artifacts.pop(0) # The first element in the list is the LRP artifact
|
|
446 |
- except IndexError:
|
|
447 |
- # This exception is caught if there are no more artifacts in the list
|
|
448 |
- # LRP_artifacts. This means the the artifact is too large for the filesystem
|
|
449 |
- # so we abort the process
|
|
450 |
- raise ArtifactTooLargeException("Artifact of size {} is too large for "
|
|
451 |
- "the filesystem which mounts the remote "
|
|
452 |
- "cache".format(object_size))
|
|
492 |
+ def __init__(self, cas, max_head_size, min_head_size = int(2e9)):
|
|
493 |
+ self.__cas = cas
|
|
494 |
+ self.__max_head_size = max_head_size
|
|
495 |
+ self.__min_head_size = min_head_size
|
|
453 | 496 |
|
454 |
- removed_size += cas.remove(to_remove, defer_prune=False)
|
|
497 |
+ def __has_space(self, object_size):
|
|
498 |
+ stats = os.statvfs(self.__cas.casdir)
|
|
499 |
+ free_disk_space = (stats.f_bavail * stats.f_bsize) - self.__min_head_size
|
|
500 |
+ total_disk_space = (stats.f_blocks * stats.f_bsize) - self.__min_head_size
|
|
455 | 501 |
|
456 |
- if removed_size > 0:
|
|
457 |
- logging.info("Successfully removed {} bytes from the cache".format(removed_size))
|
|
458 |
- else:
|
|
459 |
- logging.info("No artifacts were removed from the cache.")
|
|
502 |
+ if object_size > total_disk_space:
|
|
503 |
+ raise ArtifactTooLargeException("Artifact of size: {} is too large for "
|
|
504 |
+ "the filesystem which mounts the remote "
|
|
505 |
+ "cache".format(object_size))
|
|
460 | 506 |
|
461 |
- return removed_size
|
|
507 |
+ return object_size <= free_disk_space
|
|
508 |
+ |
|
509 |
+ # _clean_up_cache()
|
|
510 |
+ #
|
|
511 |
+ # Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
|
|
512 |
+ # is enough space for the incoming artifact
|
|
513 |
+ #
|
|
514 |
+ # Args:
|
|
515 |
+ # object_size: The size of the object being received in bytes
|
|
516 |
+ #
|
|
517 |
+ # Returns:
|
|
518 |
+ # int: The total bytes removed on the filesystem
|
|
519 |
+ #
|
|
520 |
+ def clean_up(self, object_size):
|
|
521 |
+ if self.__has_space(object_size):
|
|
522 |
+ return 0
|
|
523 |
+ |
|
524 |
+ with _CacheCleaner.__cleanup_cache_lock:
|
|
525 |
+ if self.__has_space(object_size):
|
|
526 |
+ # Another thread has done the cleanup for us
|
|
527 |
+ return 0
|
|
528 |
+ |
|
529 |
+ stats = os.statvfs(self.__cas.casdir)
|
|
530 |
+ target_disk_space = (stats.f_bavail * stats.f_bsize) - self.__max_head_size
|
|
531 |
+ |
|
532 |
+ # obtain a list of LRP artifacts
|
|
533 |
+ LRP_objects = self.__cas.list_objects()
|
|
534 |
+ |
|
535 |
+ removed_size = 0 # in bytes
|
|
536 |
+ |
|
537 |
+ last_mtime = 0
|
|
538 |
+ |
|
539 |
+ while object_size - removed_size > target_disk_space:
|
|
540 |
+ try:
|
|
541 |
+ last_mtime, to_remove = LRP_objects.pop(0) # The first element in the list is the LRP artifact
|
|
542 |
+ except IndexError:
|
|
543 |
+ # This exception is caught if there are no more artifacts in the list
|
|
544 |
+ # LRP_artifacts. This means the the artifact is too large for the filesystem
|
|
545 |
+ # so we abort the process
|
|
546 |
+ raise ArtifactTooLargeException("Artifact of size {} is too large for "
|
|
547 |
+ "the filesystem which mounts the remote "
|
|
548 |
+ "cache".format(object_size))
|
|
549 |
+ |
|
550 |
+ try:
|
|
551 |
+ size = os.stat(to_remove).st_size
|
|
552 |
+ os.unlink(to_remove)
|
|
553 |
+ removed_size += size
|
|
554 |
+ except FileNotFoundError:
|
|
555 |
+ pass
|
|
556 |
+ |
|
557 |
+ self.__cas.clean_up_refs_until(last_mtime)
|
|
558 |
+ |
|
559 |
+ if removed_size > 0:
|
|
560 |
+ logging.info("Successfully removed {} bytes from the cache".format(removed_size))
|
|
561 |
+ else:
|
|
562 |
+ logging.info("No artifacts were removed from the cache.")
|
|
563 |
+ |
|
564 |
+ return removed_size
|
... | ... | @@ -305,7 +305,6 @@ class App(): |
305 | 305 |
directory = self._main_options['directory']
|
306 | 306 |
directory = os.path.abspath(directory)
|
307 | 307 |
project_path = os.path.join(directory, 'project.conf')
|
308 |
- elements_path = os.path.join(directory, element_path)
|
|
309 | 308 |
|
310 | 309 |
try:
|
311 | 310 |
# Abort if the project.conf already exists, unless `--force` was specified in `bst init`
|
... | ... | @@ -335,6 +334,7 @@ class App(): |
335 | 334 |
raise AppError("Error creating project directory {}: {}".format(directory, e)) from e
|
336 | 335 |
|
337 | 336 |
# Create the elements sub-directory if it doesnt exist
|
337 |
+ elements_path = os.path.join(directory, element_path)
|
|
338 | 338 |
try:
|
339 | 339 |
os.makedirs(elements_path, exist_ok=True)
|
340 | 340 |
except IOError as e:
|
... | ... | @@ -39,6 +39,7 @@ if sys.version_info[0] != REQUIRED_PYTHON_MAJOR or sys.version_info[1] < REQUIRE |
39 | 39 |
try:
|
40 | 40 |
from setuptools import setup, find_packages, Command
|
41 | 41 |
from setuptools.command.easy_install import ScriptWriter
|
42 |
+ from setuptools.command.test import test as TestCommand
|
|
42 | 43 |
except ImportError:
|
43 | 44 |
print("BuildStream requires setuptools in order to build. Install it using"
|
44 | 45 |
" your package manager (usually python3-setuptools) or via pip (pip3"
|
... | ... | @@ -219,9 +220,48 @@ class BuildGRPC(Command): |
219 | 220 |
f.write(code)
|
220 | 221 |
|
221 | 222 |
|
223 |
+#####################################################
|
|
224 |
+# Pytest command #
|
|
225 |
+#####################################################
|
|
226 |
+class PyTest(TestCommand):
|
|
227 |
+ """Defines a pytest command class to run tests from setup.py"""
|
|
228 |
+ |
|
229 |
+ user_options = TestCommand.user_options + [
|
|
230 |
+ ("addopts=", None, "Arguments to pass to pytest"),
|
|
231 |
+ ('index-url=''build_grpc': BuildGRPC,
|
|
264 |
+ 'pytest': PyTest,
|
|
225 | 265 |
}
|
226 | 266 |
cmdclass.update(versioneer.get_cmdclass())
|
227 | 267 |
return cmdclass
|
... | ... | @@ -305,6 +345,5 @@ setup(name='BuildStream', |
305 | 345 |
'grpcio >= 1.10',
|
306 | 346 |
],
|
307 | 347 |
entry_points=bst_install_entry_points,
|
308 |
- setup_requires=['pytest-runner'],
|
|
309 | 348 |
tests_require=dev_requires,
|
310 | 349 |
zip_safe=False)
|
... | ... | @@ -3,6 +3,7 @@ import pytest |
3 | 3 |
from tests.testutils import cli
|
4 | 4 |
|
5 | 5 |
from buildstream import _yaml
|
6 |
+from buildstream._frontend.app import App
|
|
6 | 7 |
from buildstream._exceptions import ErrorDomain, LoadErrorReason
|
7 | 8 |
from buildstream._versions import BST_FORMAT_VERSION
|
8 | 9 |
|
... | ... | @@ -98,3 +99,34 @@ def test_bad_element_path(cli, tmpdir, element_path): |
98 | 99 |
'init', '--project-name', 'foo', '--element-path', element_path
|
99 | 100 |
])
|
100 | 101 |
result.assert_main_error(ErrorDomain.APP, 'invalid-element-path')
|
102 |
+ |
|
103 |
+ |
|
104 |
+@pytest.mark.parametrize("element_path", [('foo'), ('foo/bar')])
|
|
105 |
+def test_element_path_interactive(cli, tmp_path, monkeypatch, element_path):
|
|
106 |
+ project = tmp_path
|
|
107 |
+ project_conf_path = project.joinpath('project.conf')
|
|
108 |
+ |
|
109 |
+ class DummyInteractiveApp(App):
|
|
110 |
+ def __init__(self, *args, **kwargs):
|
|
111 |
+ super().__init__(*args, **kwargs)
|
|
112 |
+ self.interactive = True
|
|
113 |
+ |
|
114 |
+ @classmethod
|
|
115 |
+ def create(cls, *args, **kwargs):
|
|
116 |
+ return DummyInteractiveApp(*args, **kwargs)
|
|
117 |
+ |
|
118 |
+ def _init_project_interactive(self, *args, **kwargs):
|
|
119 |
+ return ('project_name', '0', element_path)
|
|
120 |
+ |
|
121 |
+ monkeypatch.setattr(App, 'create', DummyInteractiveApp.create)
|
|
122 |
+ |
|
123 |
+ result = cli.run(project=str(project), args=['init'])
|
|
124 |
+ result.assert_success()
|
|
125 |
+ |
|
126 |
+ full_element_path = project.joinpath(element_path)
|
|
127 |
+ assert full_element_path.exists()
|
|
128 |
+ |
|
129 |
+ project_conf = _yaml.load(str(project_conf_path))
|
|
130 |
+ assert project_conf['name'] == 'project_name'
|
|
131 |
+ assert project_conf['format-version'] == '0'
|
|
132 |
+ assert project_conf['element-path'] == element_path
|
... | ... | @@ -253,6 +253,8 @@ def test_artifact_expires(cli, datafiles, tmpdir): |
253 | 253 |
assert cli.get_element_state(project, 'element2.bst') == 'cached'
|
254 | 254 |
assert_shared(cli, share, project, 'element2.bst')
|
255 | 255 |
|
256 |
+ share.make_all_objects_older()
|
|
257 |
+ |
|
256 | 258 |
# Create and build another element of 5 MB (This will exceed the free disk space available)
|
257 | 259 |
create_element_size('element3.bst', project, element_path, [], int(5e6))
|
258 | 260 |
result = cli.run(project=project, args=['build', 'element3.bst'])
|
... | ... | @@ -350,6 +352,7 @@ def test_recently_pulled_artifact_does_not_expire(cli, datafiles, tmpdir): |
350 | 352 |
assert cli.get_element_state(project, 'element1.bst') == 'cached'
|
351 | 353 |
|
352 | 354 |
wait_for_cache_granularity()
|
355 |
+ share.make_all_objects_older()
|
|
353 | 356 |
|
354 | 357 |
# Create and build the element3 (of 5 MB)
|
355 | 358 |
create_element_size('element3.bst', project, element_path, [], int(5e6))
|
... | ... | @@ -138,6 +138,15 @@ class ArtifactShare(): |
138 | 138 |
except ArtifactError:
|
139 | 139 |
return False
|
140 | 140 |
|
141 |
+ def make_all_objects_older(self):
|
|
142 |
+ for root, dirs, files in os.walk(os.path.join(self.cas.casdir, 'objects')):
|
|
143 |
+ for name in files:
|
|
144 |
+ fullname = os.path.join(root, name)
|
|
145 |
+ st = os.stat(fullname)
|
|
146 |
+ mtime = st.st_mtime - 6 * 60 * 60
|
|
147 |
+ atime = st.st_atime - 6 * 60 * 60
|
|
148 |
+ os.utime(fullname, times=(atime, mtime))
|
|
149 |
+ |
|
141 | 150 |
# close():
|
142 | 151 |
#
|
143 | 152 |
# Remove the artifact share.
|