Tom Pollard pushed to branch tpollard/774 at BuildStream / buildstream
Commits:
-
26cdee08
by Valentin David at 2018-11-28T14:29:52Z
-
58ca298f
by Valentin David at 2018-11-28T14:29:52Z
-
227fa26d
by Valentin David at 2018-11-28T14:29:52Z
-
5ef19a0b
by Valentin David at 2018-11-28T14:29:52Z
-
8d2946ff
by Valentin David at 2018-11-28T14:29:52Z
-
b587953a
by Valentin David at 2018-11-28T14:29:52Z
-
a64f667d
by Valentin David at 2018-11-28T14:29:52Z
-
353b90dd
by Valentin David at 2018-11-28T14:29:52Z
-
ba9afa98
by Valentin David at 2018-11-28T14:29:52Z
-
9a458402
by Valentin David at 2018-11-29T08:47:40Z
-
47b7a9ba
by Jim MacArthur at 2018-11-29T09:58:48Z
-
4a8d0565
by Jim MacArthur at 2018-11-29T10:42:31Z
-
2e78e0d1
by Valentin David at 2018-11-29T13:18:17Z
-
a6144100
by Valentin David at 2018-11-29T13:18:17Z
-
f6c184f5
by Valentin David at 2018-11-29T16:15:56Z
-
4967e0d2
by Tom Pollard at 2018-11-29T17:06:26Z
18 changed files:
- .gitlab-ci.yml
- buildstream/_artifactcache/cascache.py
- buildstream/_artifactcache/casserver.py
- buildstream/_frontend/cli.py
- buildstream/_stream.py
- buildstream/element.py
- buildstream/plugins/sources/_downloadablefilesource.py
- buildstream/sandbox/_sandboxremote.py
- dev-requirements.txt
- tests/frontend/push.py
- tests/integration/pullbuildtrees.py
- tests/sources/remote.py
- tests/sources/tar.py
- tests/sources/zip.py
- tests/testutils/artifactshare.py
- + tests/testutils/file_server.py
- + tests/testutils/ftp_server.py
- + tests/testutils/http_server.py
Changes:
| 1 |
-image: buildstream/testsuite-debian:9-master-119-552f5fc6
|
|
| 1 |
+image: buildstream/testsuite-debian:9-master-123-7ce6581b
|
|
| 2 | 2 |
|
| 3 | 3 |
cache:
|
| 4 | 4 |
key: "$CI_JOB_NAME-"
|
| ... | ... | @@ -140,7 +140,7 @@ tests-unix: |
| 140 | 140 |
|
| 141 | 141 |
tests-fedora-missing-deps:
|
| 142 | 142 |
# Ensure that tests behave nicely while missing bwrap and ostree
|
| 143 |
- image: buildstream/testsuite-fedora:28-master-119-552f5fc6
|
|
| 143 |
+ image: buildstream/testsuite-fedora:28-master-123-7ce6581b
|
|
| 144 | 144 |
<<: *tests
|
| 145 | 145 |
|
| 146 | 146 |
script:
|
| ... | ... | @@ -25,6 +25,7 @@ import os |
| 25 | 25 |
import stat
|
| 26 | 26 |
import tempfile
|
| 27 | 27 |
import uuid
|
| 28 |
+import contextlib
|
|
| 28 | 29 |
from urllib.parse import urlparse
|
| 29 | 30 |
|
| 30 | 31 |
import grpc
|
| ... | ... | @@ -88,6 +89,13 @@ class CASRemoteSpec(namedtuple('CASRemoteSpec', 'url push server_cert client_key |
| 88 | 89 |
CASRemoteSpec.__new__.__defaults__ = (None, None, None)
|
| 89 | 90 |
|
| 90 | 91 |
|
| 92 |
+class BlobNotFound(CASError):
|
|
| 93 |
+ |
|
| 94 |
+ def __init__(self, blob, msg):
|
|
| 95 |
+ self.blob = blob
|
|
| 96 |
+ super().__init__(msg)
|
|
| 97 |
+ |
|
| 98 |
+ |
|
| 91 | 99 |
# A CASCache manages a CAS repository as specified in the Remote Execution API.
|
| 92 | 100 |
#
|
| 93 | 101 |
# Args:
|
| ... | ... | @@ -299,6 +307,8 @@ class CASCache(): |
| 299 | 307 |
raise CASError("Failed to pull ref {}: {}".format(ref, e)) from e
|
| 300 | 308 |
else:
|
| 301 | 309 |
return False
|
| 310 |
+ except BlobNotFound as e:
|
|
| 311 |
+ return False
|
|
| 302 | 312 |
|
| 303 | 313 |
# pull_tree():
|
| 304 | 314 |
#
|
| ... | ... | @@ -471,13 +481,14 @@ class CASCache(): |
| 471 | 481 |
# digest (Digest): An optional Digest object to populate
|
| 472 | 482 |
# path (str): Path to file to add
|
| 473 | 483 |
# buffer (bytes): Byte buffer to add
|
| 484 |
+ # link_directly (bool): Whether file given by path can be linked
|
|
| 474 | 485 |
#
|
| 475 | 486 |
# Returns:
|
| 476 | 487 |
# (Digest): The digest of the added object
|
| 477 | 488 |
#
|
| 478 | 489 |
# Either `path` or `buffer` must be passed, but not both.
|
| 479 | 490 |
#
|
| 480 |
- def add_object(self, *, digest=None, path=None, buffer=None):
|
|
| 491 |
+ def add_object(self, *, digest=None, path=None, buffer=None, link_directly=False):
|
|
| 481 | 492 |
# Exactly one of the two parameters has to be specified
|
| 482 | 493 |
assert (path is None) != (buffer is None)
|
| 483 | 494 |
|
| ... | ... | @@ -487,28 +498,34 @@ class CASCache(): |
| 487 | 498 |
try:
|
| 488 | 499 |
h = hashlib.sha256()
|
| 489 | 500 |
# Always write out new file to avoid corruption if input file is modified
|
| 490 |
- with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
|
|
| 491 |
- # Set mode bits to 0644
|
|
| 492 |
- os.chmod(out.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
|
|
| 493 |
- |
|
| 494 |
- if path:
|
|
| 495 |
- with open(path, 'rb') as f:
|
|
| 496 |
- for chunk in iter(lambda: f.read(4096), b""):
|
|
| 497 |
- h.update(chunk)
|
|
| 498 |
- out.write(chunk)
|
|
| 501 |
+ with contextlib.ExitStack() as stack:
|
|
| 502 |
+ if path is not None and link_directly:
|
|
| 503 |
+ tmp = stack.enter_context(open(path, 'rb'))
|
|
| 504 |
+ for chunk in iter(lambda: tmp.read(4096), b""):
|
|
| 505 |
+ h.update(chunk)
|
|
| 499 | 506 |
else:
|
| 500 |
- h.update(buffer)
|
|
| 501 |
- out.write(buffer)
|
|
| 507 |
+ tmp = stack.enter_context(tempfile.NamedTemporaryFile(dir=self.tmpdir))
|
|
| 508 |
+ # Set mode bits to 0644
|
|
| 509 |
+ os.chmod(tmp.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
|
|
| 502 | 510 |
|
| 503 |
- out.flush()
|
|
| 511 |
+ if path:
|
|
| 512 |
+ with open(path, 'rb') as f:
|
|
| 513 |
+ for chunk in iter(lambda: f.read(4096), b""):
|
|
| 514 |
+ h.update(chunk)
|
|
| 515 |
+ tmp.write(chunk)
|
|
| 516 |
+ else:
|
|
| 517 |
+ h.update(buffer)
|
|
| 518 |
+ tmp.write(buffer)
|
|
| 519 |
+ |
|
| 520 |
+ tmp.flush()
|
|
| 504 | 521 |
|
| 505 | 522 |
digest.hash = h.hexdigest()
|
| 506 |
- digest.size_bytes = os.fstat(out.fileno()).st_size
|
|
| 523 |
+ digest.size_bytes = os.fstat(tmp.fileno()).st_size
|
|
| 507 | 524 |
|
| 508 | 525 |
# Place file at final location
|
| 509 | 526 |
objpath = self.objpath(digest)
|
| 510 | 527 |
os.makedirs(os.path.dirname(objpath), exist_ok=True)
|
| 511 |
- os.link(out.name, objpath)
|
|
| 528 |
+ os.link(tmp.name, objpath)
|
|
| 512 | 529 |
|
| 513 | 530 |
except FileExistsError as e:
|
| 514 | 531 |
# We can ignore the failed link() if the object is already in the repo.
|
| ... | ... | @@ -606,6 +623,41 @@ class CASCache(): |
| 606 | 623 |
# first ref of this list will be the file modified earliest.
|
| 607 | 624 |
return [ref for _, ref in sorted(zip(mtimes, refs))]
|
| 608 | 625 |
|
| 626 |
+ # list_objects():
|
|
| 627 |
+ #
|
|
| 628 |
+ # List cached objects in Least Recently Modified (LRM) order.
|
|
| 629 |
+ #
|
|
| 630 |
+ # Returns:
|
|
| 631 |
+ # (list) - A list of objects and timestamps in LRM order
|
|
| 632 |
+ #
|
|
| 633 |
+ def list_objects(self):
|
|
| 634 |
+ objs = []
|
|
| 635 |
+ mtimes = []
|
|
| 636 |
+ |
|
| 637 |
+ for root, _, files in os.walk(os.path.join(self.casdir, 'objects')):
|
|
| 638 |
+ for filename in files:
|
|
| 639 |
+ obj_path = os.path.join(root, filename)
|
|
| 640 |
+ try:
|
|
| 641 |
+ mtimes.append(os.path.getmtime(obj_path))
|
|
| 642 |
+ except FileNotFoundError:
|
|
| 643 |
+ pass
|
|
| 644 |
+ else:
|
|
| 645 |
+ objs.append(obj_path)
|
|
| 646 |
+ |
|
| 647 |
+ # NOTE: Sorted will sort from earliest to latest, thus the
|
|
| 648 |
+ # first element of this list will be the file modified earliest.
|
|
| 649 |
+ return sorted(zip(mtimes, objs))
|
|
| 650 |
+ |
|
| 651 |
+ def clean_up_refs_until(self, time):
|
|
| 652 |
+ ref_heads = os.path.join(self.casdir, 'refs', 'heads')
|
|
| 653 |
+ |
|
| 654 |
+ for root, _, files in os.walk(ref_heads):
|
|
| 655 |
+ for filename in files:
|
|
| 656 |
+ ref_path = os.path.join(root, filename)
|
|
| 657 |
+ # Obtain the mtime (the time a file was last modified)
|
|
| 658 |
+ if os.path.getmtime(ref_path) < time:
|
|
| 659 |
+ os.unlink(ref_path)
|
|
| 660 |
+ |
|
| 609 | 661 |
# remove():
|
| 610 | 662 |
#
|
| 611 | 663 |
# Removes the given symbolic ref from the repo.
|
| ... | ... | @@ -665,6 +717,10 @@ class CASCache(): |
| 665 | 717 |
|
| 666 | 718 |
return pruned
|
| 667 | 719 |
|
| 720 |
+ def update_tree_mtime(self, tree):
|
|
| 721 |
+ reachable = set()
|
|
| 722 |
+ self._reachable_refs_dir(reachable, tree, update_mtime=True)
|
|
| 723 |
+ |
|
| 668 | 724 |
################################################
|
| 669 | 725 |
# Local Private Methods #
|
| 670 | 726 |
################################################
|
| ... | ... | @@ -811,10 +867,13 @@ class CASCache(): |
| 811 | 867 |
a += 1
|
| 812 | 868 |
b += 1
|
| 813 | 869 |
|
| 814 |
- def _reachable_refs_dir(self, reachable, tree):
|
|
| 870 |
+ def _reachable_refs_dir(self, reachable, tree, update_mtime=False):
|
|
| 815 | 871 |
if tree.hash in reachable:
|
| 816 | 872 |
return
|
| 817 | 873 |
|
| 874 |
+ if update_mtime:
|
|
| 875 |
+ os.utime(self.objpath(tree))
|
|
| 876 |
+ |
|
| 818 | 877 |
reachable.add(tree.hash)
|
| 819 | 878 |
|
| 820 | 879 |
directory = remote_execution_pb2.Directory()
|
| ... | ... | @@ -823,10 +882,12 @@ class CASCache(): |
| 823 | 882 |
directory.ParseFromString(f.read())
|
| 824 | 883 |
|
| 825 | 884 |
for filenode in directory.files:
|
| 885 |
+ if update_mtime:
|
|
| 886 |
+ os.utime(self.objpath(filenode.digest))
|
|
| 826 | 887 |
reachable.add(filenode.digest.hash)
|
| 827 | 888 |
|
| 828 | 889 |
for dirnode in directory.directories:
|
| 829 |
- self._reachable_refs_dir(reachable, dirnode.digest)
|
|
| 890 |
+ self._reachable_refs_dir(reachable, dirnode.digest, update_mtime=update_mtime)
|
|
| 830 | 891 |
|
| 831 | 892 |
def _required_blobs(self, directory_digest):
|
| 832 | 893 |
# parse directory, and recursively add blobs
|
| ... | ... | @@ -880,7 +941,7 @@ class CASCache(): |
| 880 | 941 |
with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
|
| 881 | 942 |
self._fetch_blob(remote, digest, f)
|
| 882 | 943 |
|
| 883 |
- added_digest = self.add_object(path=f.name)
|
|
| 944 |
+ added_digest = self.add_object(path=f.name, link_directly=True)
|
|
| 884 | 945 |
assert added_digest.hash == digest.hash
|
| 885 | 946 |
|
| 886 | 947 |
return objpath
|
| ... | ... | @@ -891,7 +952,7 @@ class CASCache(): |
| 891 | 952 |
f.write(data)
|
| 892 | 953 |
f.flush()
|
| 893 | 954 |
|
| 894 |
- added_digest = self.add_object(path=f.name)
|
|
| 955 |
+ added_digest = self.add_object(path=f.name, link_directly=True)
|
|
| 895 | 956 |
assert added_digest.hash == digest.hash
|
| 896 | 957 |
|
| 897 | 958 |
# Helper function for _fetch_directory().
|
| ... | ... | @@ -1203,6 +1264,9 @@ class _CASBatchRead(): |
| 1203 | 1264 |
batch_response = self._remote.cas.BatchReadBlobs(self._request)
|
| 1204 | 1265 |
|
| 1205 | 1266 |
for response in batch_response.responses:
|
| 1267 |
+ if response.status.code == code_pb2.NOT_FOUND:
|
|
| 1268 |
+ raise BlobNotFound(response.digest.hash, "Failed to download blob {}: {}".format(
|
|
| 1269 |
+ response.digest.hash, response.status.code))
|
|
| 1206 | 1270 |
if response.status.code != code_pb2.OK:
|
| 1207 | 1271 |
raise CASError("Failed to download blob {}: {}".format(
|
| 1208 | 1272 |
response.digest.hash, response.status.code))
|
| ... | ... | @@ -24,6 +24,8 @@ import signal |
| 24 | 24 |
import sys
|
| 25 | 25 |
import tempfile
|
| 26 | 26 |
import uuid
|
| 27 |
+import errno
|
|
| 28 |
+import threading
|
|
| 27 | 29 |
|
| 28 | 30 |
import click
|
| 29 | 31 |
import grpc
|
| ... | ... | @@ -31,6 +33,7 @@ import grpc |
| 31 | 33 |
from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
|
| 32 | 34 |
from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
|
| 33 | 35 |
from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc
|
| 36 |
+from .._protos.google.rpc import code_pb2
|
|
| 34 | 37 |
|
| 35 | 38 |
from .._exceptions import CASError
|
| 36 | 39 |
|
| ... | ... | @@ -55,18 +58,22 @@ class ArtifactTooLargeException(Exception): |
| 55 | 58 |
# repo (str): Path to CAS repository
|
| 56 | 59 |
# enable_push (bool): Whether to allow blob uploads and artifact updates
|
| 57 | 60 |
#
|
| 58 |
-def create_server(repo, *, enable_push):
|
|
| 61 |
+def create_server(repo, *, enable_push,
|
|
| 62 |
+ max_head_size=int(10e9),
|
|
| 63 |
+ min_head_size=int(2e9)):
|
|
| 59 | 64 |
cas = CASCache(os.path.abspath(repo))
|
| 60 | 65 |
|
| 61 | 66 |
# Use max_workers default from Python 3.5+
|
| 62 | 67 |
max_workers = (os.cpu_count() or 1) * 5
|
| 63 | 68 |
server = grpc.server(futures.ThreadPoolExecutor(max_workers))
|
| 64 | 69 |
|
| 70 |
+ cache_cleaner = _CacheCleaner(cas, max_head_size, min_head_size)
|
|
| 71 |
+ |
|
| 65 | 72 |
bytestream_pb2_grpc.add_ByteStreamServicer_to_server(
|
| 66 |
- _ByteStreamServicer(cas, enable_push=enable_push), server)
|
|
| 73 |
+ _ByteStreamServicer(cas, cache_cleaner, enable_push=enable_push), server)
|
|
| 67 | 74 |
|
| 68 | 75 |
remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
|
| 69 |
- _ContentAddressableStorageServicer(cas, enable_push=enable_push), server)
|
|
| 76 |
+ _ContentAddressableStorageServicer(cas, cache_cleaner, enable_push=enable_push), server)
|
|
| 70 | 77 |
|
| 71 | 78 |
remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
|
| 72 | 79 |
_CapabilitiesServicer(), server)
|
| ... | ... | @@ -84,9 +91,19 @@ def create_server(repo, *, enable_push): |
| 84 | 91 |
@click.option('--client-certs', help="Public client certificates for TLS (PEM-encoded)")
|
| 85 | 92 |
@click.option('--enable-push', default=False, is_flag=True,
|
| 86 | 93 |
help="Allow clients to upload blobs and update artifact cache")
|
| 94 |
+@click.option('--head-room-min', type=click.INT,
|
|
| 95 |
+ help="Disk head room minimum in bytes",
|
|
| 96 |
+ default=2e9)
|
|
| 97 |
+@click.option('--head-room-max', type=click.INT,
|
|
| 98 |
+ help="Disk head room maximum in bytes",
|
|
| 99 |
+ default=10e9)
|
|
| 87 | 100 |
@click.argument('repo')
|
| 88 |
-def server_main(repo, port, server_key, server_cert, client_certs, enable_push):
|
|
| 89 |
- server = create_server(repo, enable_push=enable_push)
|
|
| 101 |
+def server_main(repo, port, server_key, server_cert, client_certs, enable_push,
|
|
| 102 |
+ head_room_min, head_room_max):
|
|
| 103 |
+ server = create_server(repo,
|
|
| 104 |
+ max_head_size=head_room_max,
|
|
| 105 |
+ min_head_size=head_room_min,
|
|
| 106 |
+ enable_push=enable_push)
|
|
| 90 | 107 |
|
| 91 | 108 |
use_tls = bool(server_key)
|
| 92 | 109 |
|
| ... | ... | @@ -128,10 +145,11 @@ def server_main(repo, port, server_key, server_cert, client_certs, enable_push): |
| 128 | 145 |
|
| 129 | 146 |
|
| 130 | 147 |
class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
|
| 131 |
- def __init__(self, cas, *, enable_push):
|
|
| 148 |
+ def __init__(self, cas, cache_cleaner, *, enable_push):
|
|
| 132 | 149 |
super().__init__()
|
| 133 | 150 |
self.cas = cas
|
| 134 | 151 |
self.enable_push = enable_push
|
| 152 |
+ self.cache_cleaner = cache_cleaner
|
|
| 135 | 153 |
|
| 136 | 154 |
def Read(self, request, context):
|
| 137 | 155 |
resource_name = request.resource_name
|
| ... | ... | @@ -189,17 +207,34 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer): |
| 189 | 207 |
context.set_code(grpc.StatusCode.NOT_FOUND)
|
| 190 | 208 |
return response
|
| 191 | 209 |
|
| 192 |
- try:
|
|
| 193 |
- _clean_up_cache(self.cas, client_digest.size_bytes)
|
|
| 194 |
- except ArtifactTooLargeException as e:
|
|
| 195 |
- context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
|
|
| 196 |
- context.set_details(str(e))
|
|
| 197 |
- return response
|
|
| 210 |
+ while True:
|
|
| 211 |
+ if client_digest.size_bytes == 0:
|
|
| 212 |
+ break
|
|
| 213 |
+ try:
|
|
| 214 |
+ self.cache_cleaner.clean_up(client_digest.size_bytes)
|
|
| 215 |
+ except ArtifactTooLargeException as e:
|
|
| 216 |
+ context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
|
|
| 217 |
+ context.set_details(str(e))
|
|
| 218 |
+ return response
|
|
| 219 |
+ |
|
| 220 |
+ try:
|
|
| 221 |
+ os.posix_fallocate(out.fileno(), 0, client_digest.size_bytes)
|
|
| 222 |
+ break
|
|
| 223 |
+ except OSError as e:
|
|
| 224 |
+ # Multiple upload can happen in the same time
|
|
| 225 |
+ if e.errno != errno.ENOSPC:
|
|
| 226 |
+ raise
|
|
| 227 |
+ |
|
| 198 | 228 |
elif request.resource_name:
|
| 199 | 229 |
# If it is set on subsequent calls, it **must** match the value of the first request.
|
| 200 | 230 |
if request.resource_name != resource_name:
|
| 201 | 231 |
context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
|
| 202 | 232 |
return response
|
| 233 |
+ |
|
| 234 |
+ if (offset + len(request.data)) > client_digest.size_bytes:
|
|
| 235 |
+ context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
|
|
| 236 |
+ return response
|
|
| 237 |
+ |
|
| 203 | 238 |
out.write(request.data)
|
| 204 | 239 |
offset += len(request.data)
|
| 205 | 240 |
if request.finish_write:
|
| ... | ... | @@ -207,7 +242,7 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer): |
| 207 | 242 |
context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
|
| 208 | 243 |
return response
|
| 209 | 244 |
out.flush()
|
| 210 |
- digest = self.cas.add_object(path=out.name)
|
|
| 245 |
+ digest = self.cas.add_object(path=out.name, link_directly=True)
|
|
| 211 | 246 |
if digest.hash != client_digest.hash:
|
| 212 | 247 |
context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
|
| 213 | 248 |
return response
|
| ... | ... | @@ -220,18 +255,26 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer): |
| 220 | 255 |
|
| 221 | 256 |
|
| 222 | 257 |
class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
|
| 223 |
- def __init__(self, cas, *, enable_push):
|
|
| 258 |
+ def __init__(self, cas, cache_cleaner, *, enable_push):
|
|
| 224 | 259 |
super().__init__()
|
| 225 | 260 |
self.cas = cas
|
| 226 | 261 |
self.enable_push = enable_push
|
| 262 |
+ self.cache_cleaner = cache_cleaner
|
|
| 227 | 263 |
|
| 228 | 264 |
def FindMissingBlobs(self, request, context):
|
| 229 | 265 |
response = remote_execution_pb2.FindMissingBlobsResponse()
|
| 230 | 266 |
for digest in request.blob_digests:
|
| 231 |
- if not _has_object(self.cas, digest):
|
|
| 232 |
- d = response.missing_blob_digests.add()
|
|
| 233 |
- d.hash = digest.hash
|
|
| 234 |
- d.size_bytes = digest.size_bytes
|
|
| 267 |
+ objpath = self.cas.objpath(digest)
|
|
| 268 |
+ try:
|
|
| 269 |
+ os.utime(objpath)
|
|
| 270 |
+ except OSError as e:
|
|
| 271 |
+ if e.errno != errno.ENOENT:
|
|
| 272 |
+ raise
|
|
| 273 |
+ else:
|
|
| 274 |
+ d = response.missing_blob_digests.add()
|
|
| 275 |
+ d.hash = digest.hash
|
|
| 276 |
+ d.size_bytes = digest.size_bytes
|
|
| 277 |
+ |
|
| 235 | 278 |
return response
|
| 236 | 279 |
|
| 237 | 280 |
def BatchReadBlobs(self, request, context):
|
| ... | ... | @@ -250,12 +293,12 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres |
| 250 | 293 |
try:
|
| 251 | 294 |
with open(self.cas.objpath(digest), 'rb') as f:
|
| 252 | 295 |
if os.fstat(f.fileno()).st_size != digest.size_bytes:
|
| 253 |
- blob_response.status.code = grpc.StatusCode.NOT_FOUND
|
|
| 296 |
+ blob_response.status.code = code_pb2.NOT_FOUND
|
|
| 254 | 297 |
continue
|
| 255 | 298 |
|
| 256 | 299 |
blob_response.data = f.read(digest.size_bytes)
|
| 257 | 300 |
except FileNotFoundError:
|
| 258 |
- blob_response.status.code = grpc.StatusCode.NOT_FOUND
|
|
| 301 |
+ blob_response.status.code = code_pb2.NOT_FOUND
|
|
| 259 | 302 |
|
| 260 | 303 |
return response
|
| 261 | 304 |
|
| ... | ... | @@ -285,7 +328,7 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres |
| 285 | 328 |
continue
|
| 286 | 329 |
|
| 287 | 330 |
try:
|
| 288 |
- _clean_up_cache(self.cas, digest.size_bytes)
|
|
| 331 |
+ self.cache_cleaner.clean_up(digest.size_bytes)
|
|
| 289 | 332 |
|
| 290 | 333 |
with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
|
| 291 | 334 |
out.write(blob_request.data)
|
| ... | ... | @@ -328,6 +371,12 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer): |
| 328 | 371 |
|
| 329 | 372 |
try:
|
| 330 | 373 |
tree = self.cas.resolve_ref(request.key, update_mtime=True)
|
| 374 |
+ try:
|
|
| 375 |
+ self.cas.update_tree_mtime(tree)
|
|
| 376 |
+ except FileNotFoundError:
|
|
| 377 |
+ self.cas.remove(request.key, defer_prune=True)
|
|
| 378 |
+ context.set_code(grpc.StatusCode.NOT_FOUND)
|
|
| 379 |
+ return response
|
|
| 331 | 380 |
|
| 332 | 381 |
response.digest.hash = tree.hash
|
| 333 | 382 |
response.digest.size_bytes = tree.size_bytes
|
| ... | ... | @@ -400,60 +449,79 @@ def _digest_from_upload_resource_name(resource_name): |
| 400 | 449 |
return None
|
| 401 | 450 |
|
| 402 | 451 |
|
| 403 |
-def _has_object(cas, digest):
|
|
| 404 |
- objpath = cas.objpath(digest)
|
|
| 405 |
- return os.path.exists(objpath)
|
|
| 452 |
+class _CacheCleaner:
|
|
| 406 | 453 |
|
| 454 |
+ __cleanup_cache_lock = threading.Lock()
|
|
| 407 | 455 |
|
| 408 |
-# _clean_up_cache()
|
|
| 409 |
-#
|
|
| 410 |
-# Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
|
|
| 411 |
-# is enough space for the incoming artifact
|
|
| 412 |
-#
|
|
| 413 |
-# Args:
|
|
| 414 |
-# cas: CASCache object
|
|
| 415 |
-# object_size: The size of the object being received in bytes
|
|
| 416 |
-#
|
|
| 417 |
-# Returns:
|
|
| 418 |
-# int: The total bytes removed on the filesystem
|
|
| 419 |
-#
|
|
| 420 |
-def _clean_up_cache(cas, object_size):
|
|
| 421 |
- # Determine the available disk space, in bytes, of the file system
|
|
| 422 |
- # which mounts the repo
|
|
| 423 |
- stats = os.statvfs(cas.casdir)
|
|
| 424 |
- buffer_ = int(2e9) # Add a 2 GB buffer
|
|
| 425 |
- free_disk_space = (stats.f_bfree * stats.f_bsize) - buffer_
|
|
| 426 |
- total_disk_space = (stats.f_blocks * stats.f_bsize) - buffer_
|
|
| 427 |
- |
|
| 428 |
- if object_size > total_disk_space:
|
|
| 429 |
- raise ArtifactTooLargeException("Artifact of size: {} is too large for "
|
|
| 430 |
- "the filesystem which mounts the remote "
|
|
| 431 |
- "cache".format(object_size))
|
|
| 432 |
- |
|
| 433 |
- if object_size <= free_disk_space:
|
|
| 434 |
- # No need to clean up
|
|
| 435 |
- return 0
|
|
| 436 |
- |
|
| 437 |
- # obtain a list of LRP artifacts
|
|
| 438 |
- LRP_artifacts = cas.list_refs()
|
|
| 439 |
- |
|
| 440 |
- removed_size = 0 # in bytes
|
|
| 441 |
- while object_size - removed_size > free_disk_space:
|
|
| 442 |
- try:
|
|
| 443 |
- to_remove = LRP_artifacts.pop(0) # The first element in the list is the LRP artifact
|
|
| 444 |
- except IndexError:
|
|
| 445 |
- # This exception is caught if there are no more artifacts in the list
|
|
| 446 |
- # LRP_artifacts. This means the the artifact is too large for the filesystem
|
|
| 447 |
- # so we abort the process
|
|
| 448 |
- raise ArtifactTooLargeException("Artifact of size {} is too large for "
|
|
| 449 |
- "the filesystem which mounts the remote "
|
|
| 450 |
- "cache".format(object_size))
|
|
| 456 |
+ def __init__(self, cas, max_head_size, min_head_size=int(2e9)):
|
|
| 457 |
+ self.__cas = cas
|
|
| 458 |
+ self.__max_head_size = max_head_size
|
|
| 459 |
+ self.__min_head_size = min_head_size
|
|
| 451 | 460 |
|
| 452 |
- removed_size += cas.remove(to_remove, defer_prune=False)
|
|
| 461 |
+ def __has_space(self, object_size):
|
|
| 462 |
+ stats = os.statvfs(self.__cas.casdir)
|
|
| 463 |
+ free_disk_space = (stats.f_bavail * stats.f_bsize) - self.__min_head_size
|
|
| 464 |
+ total_disk_space = (stats.f_blocks * stats.f_bsize) - self.__min_head_size
|
|
| 453 | 465 |
|
| 454 |
- if removed_size > 0:
|
|
| 455 |
- logging.info("Successfully removed {} bytes from the cache".format(removed_size))
|
|
| 456 |
- else:
|
|
| 457 |
- logging.info("No artifacts were removed from the cache.")
|
|
| 466 |
+ if object_size > total_disk_space:
|
|
| 467 |
+ raise ArtifactTooLargeException("Artifact of size: {} is too large for "
|
|
| 468 |
+ "the filesystem which mounts the remote "
|
|
| 469 |
+ "cache".format(object_size))
|
|
| 458 | 470 |
|
| 459 |
- return removed_size
|
|
| 471 |
+ return object_size <= free_disk_space
|
|
| 472 |
+ |
|
| 473 |
+ # _clean_up_cache()
|
|
| 474 |
+ #
|
|
| 475 |
+ # Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
|
|
| 476 |
+ # is enough space for the incoming artifact
|
|
| 477 |
+ #
|
|
| 478 |
+ # Args:
|
|
| 479 |
+ # object_size: The size of the object being received in bytes
|
|
| 480 |
+ #
|
|
| 481 |
+ # Returns:
|
|
| 482 |
+ # int: The total bytes removed on the filesystem
|
|
| 483 |
+ #
|
|
| 484 |
+ def clean_up(self, object_size):
|
|
| 485 |
+ if self.__has_space(object_size):
|
|
| 486 |
+ return 0
|
|
| 487 |
+ |
|
| 488 |
+ with _CacheCleaner.__cleanup_cache_lock:
|
|
| 489 |
+ if self.__has_space(object_size):
|
|
| 490 |
+ # Another thread has done the cleanup for us
|
|
| 491 |
+ return 0
|
|
| 492 |
+ |
|
| 493 |
+ stats = os.statvfs(self.__cas.casdir)
|
|
| 494 |
+ target_disk_space = (stats.f_bavail * stats.f_bsize) - self.__max_head_size
|
|
| 495 |
+ |
|
| 496 |
+ # obtain a list of LRP artifacts
|
|
| 497 |
+ LRP_objects = self.__cas.list_objects()
|
|
| 498 |
+ |
|
| 499 |
+ removed_size = 0 # in bytes
|
|
| 500 |
+ last_mtime = 0
|
|
| 501 |
+ |
|
| 502 |
+ while object_size - removed_size > target_disk_space:
|
|
| 503 |
+ try:
|
|
| 504 |
+ last_mtime, to_remove = LRP_objects.pop(0) # The first element in the list is the LRP artifact
|
|
| 505 |
+ except IndexError:
|
|
| 506 |
+ # This exception is caught if there are no more artifacts in the list
|
|
| 507 |
+ # LRP_artifacts. This means the the artifact is too large for the filesystem
|
|
| 508 |
+ # so we abort the process
|
|
| 509 |
+ raise ArtifactTooLargeException("Artifact of size {} is too large for "
|
|
| 510 |
+ "the filesystem which mounts the remote "
|
|
| 511 |
+ "cache".format(object_size))
|
|
| 512 |
+ |
|
| 513 |
+ try:
|
|
| 514 |
+ size = os.stat(to_remove).st_size
|
|
| 515 |
+ os.unlink(to_remove)
|
|
| 516 |
+ removed_size += size
|
|
| 517 |
+ except FileNotFoundError:
|
|
| 518 |
+ pass
|
|
| 519 |
+ |
|
| 520 |
+ self.__cas.clean_up_refs_until(last_mtime)
|
|
| 521 |
+ |
|
| 522 |
+ if removed_size > 0:
|
|
| 523 |
+ logging.info("Successfully removed {} bytes from the cache".format(removed_size))
|
|
| 524 |
+ else:
|
|
| 525 |
+ logging.info("No artifacts were removed from the cache.")
|
|
| 526 |
+ |
|
| 527 |
+ return removed_size
|
| ... | ... | @@ -469,6 +469,10 @@ def push(app, elements, deps, remote): |
| 469 | 469 |
The default destination is the highest priority configured cache. You can
|
| 470 | 470 |
override this by passing a different cache URL with the `--remote` flag.
|
| 471 | 471 |
|
| 472 |
+ If bst has been configured to include build trees on artifact pulls,
|
|
| 473 |
+ an attempt will be made to pull any required build trees to avoid the
|
|
| 474 |
+ skipping of partial artifacts being pushed.
|
|
| 475 |
+ |
|
| 472 | 476 |
Specify `--deps` to control which artifacts to push:
|
| 473 | 477 |
|
| 474 | 478 |
\b
|
| ... | ... | @@ -327,6 +327,10 @@ class Stream(): |
| 327 | 327 |
# If `remote` specified as None, then regular configuration will be used
|
| 328 | 328 |
# to determine where to push artifacts to.
|
| 329 | 329 |
#
|
| 330 |
+ # If any of the given targets are missing their expected buildtree artifact,
|
|
| 331 |
+ # a pull queue will be created if user context and availavble remotes allow for
|
|
| 332 |
+ # attempting to fetch them.
|
|
| 333 |
+ #
|
|
| 330 | 334 |
def push(self, targets, *,
|
| 331 | 335 |
selection=PipelineSelection.NONE,
|
| 332 | 336 |
remote=None):
|
| ... | ... | @@ -345,8 +349,17 @@ class Stream(): |
| 345 | 349 |
raise StreamError("No artifact caches available for pushing artifacts")
|
| 346 | 350 |
|
| 347 | 351 |
self._pipeline.assert_consistent(elements)
|
| 348 |
- self._add_queue(PushQueue(self._scheduler))
|
|
| 349 |
- self._enqueue_plan(elements)
|
|
| 352 |
+ |
|
| 353 |
+ # Check if we require a pull queue, with given artifact state and context
|
|
| 354 |
+ require_buildtrees = self._buildtree_pull_required(elements)
|
|
| 355 |
+ if require_buildtrees:
|
|
| 356 |
+ self._message(MessageType.INFO, "Attempting to fetch missing artifact buildtrees")
|
|
| 357 |
+ self._add_queue(PullQueue(self._scheduler))
|
|
| 358 |
+ self._enqueue_plan(require_buildtrees)
|
|
| 359 |
+ |
|
| 360 |
+ push_queue = PushQueue(self._scheduler)
|
|
| 361 |
+ self._add_queue(push_queue)
|
|
| 362 |
+ self._enqueue_plan(elements, queue=push_queue)
|
|
| 350 | 363 |
self._run()
|
| 351 | 364 |
|
| 352 | 365 |
# checkout()
|
| ... | ... | @@ -1237,3 +1250,26 @@ class Stream(): |
| 1237 | 1250 |
parts.append(element.normal_name)
|
| 1238 | 1251 |
|
| 1239 | 1252 |
return os.path.join(directory, *reversed(parts))
|
| 1253 |
+ |
|
| 1254 |
+ # _buildtree_pull_required()
|
|
| 1255 |
+ #
|
|
| 1256 |
+ # Check if current task, given config, requires element buildtree artifact
|
|
| 1257 |
+ #
|
|
| 1258 |
+ # Args:
|
|
| 1259 |
+ # elements (list): elements to check if buildtrees are required
|
|
| 1260 |
+ #
|
|
| 1261 |
+ # Returns:
|
|
| 1262 |
+ # (list): elements requiring buildtrees
|
|
| 1263 |
+ #
|
|
| 1264 |
+ def _buildtree_pull_required(self, elements):
|
|
| 1265 |
+ required_list = []
|
|
| 1266 |
+ |
|
| 1267 |
+ # If context is set to not pull buildtrees, or no fetch remotes, return empty list
|
|
| 1268 |
+ if not (self._context.pull_buildtrees or self._artifacts.has_fetch_remotes()):
|
|
| 1269 |
+ return required_list
|
|
| 1270 |
+ |
|
| 1271 |
+ for element in elements:
|
|
| 1272 |
+ if not element._cached_buildtree():
|
|
| 1273 |
+ required_list.append(element)
|
|
| 1274 |
+ |
|
| 1275 |
+ return required_list
|
| ... | ... | @@ -1998,6 +1998,17 @@ class Element(Plugin): |
| 1998 | 1998 |
def _get_source_element(self):
|
| 1999 | 1999 |
return self
|
| 2000 | 2000 |
|
| 2001 |
+ # _cached_buildtree()
|
|
| 2002 |
+ #
|
|
| 2003 |
+ # Check if the element has an expected cached buildtree artifact
|
|
| 2004 |
+ #
|
|
| 2005 |
+ # Returns:
|
|
| 2006 |
+ # (bool): True if artifact cached with buildtree, False if
|
|
| 2007 |
+ # element not cached or missing expected buildtree
|
|
| 2008 |
+ #
|
|
| 2009 |
+ def _cached_buildtree(self):
|
|
| 2010 |
+ return self.__cached_buildtree()
|
|
| 2011 |
+ |
|
| 2001 | 2012 |
#############################################################
|
| 2002 | 2013 |
# Private Local Methods #
|
| 2003 | 2014 |
#############################################################
|
| ... | ... | @@ -2777,10 +2788,10 @@ class Element(Plugin): |
| 2777 | 2788 |
|
| 2778 | 2789 |
if not self._cached():
|
| 2779 | 2790 |
return False
|
| 2780 |
- elif context.get_strict():
|
|
| 2781 |
- if not self.__artifacts.contains_subdir_artifact(self, self.__strict_cache_key, 'buildtree'):
|
|
| 2782 |
- return False
|
|
| 2783 |
- elif not self.__artifacts.contains_subdir_artifact(self, self.__weak_cache_key, 'buildtree'):
|
|
| 2791 |
+ |
|
| 2792 |
+ key_strength = _KeyStrength.STRONG if context.get_strict() else _KeyStrength.WEAK
|
|
| 2793 |
+ if not self.__artifacts.contains_subdir_artifact(self, self._get_cache_key(strength=key_strength),
|
|
| 2794 |
+ 'buildtree'):
|
|
| 2784 | 2795 |
return False
|
| 2785 | 2796 |
|
| 2786 | 2797 |
return True
|
| ... | ... | @@ -5,16 +5,77 @@ import urllib.request |
| 5 | 5 |
import urllib.error
|
| 6 | 6 |
import contextlib
|
| 7 | 7 |
import shutil
|
| 8 |
+import netrc
|
|
| 8 | 9 |
|
| 9 | 10 |
from buildstream import Source, SourceError, Consistency
|
| 10 | 11 |
from buildstream import utils
|
| 11 | 12 |
|
| 12 | 13 |
|
| 14 |
+class _NetrcFTPOpener(urllib.request.FTPHandler):
|
|
| 15 |
+ |
|
| 16 |
+ def __init__(self, netrc_config):
|
|
| 17 |
+ self.netrc = netrc_config
|
|
| 18 |
+ |
|
| 19 |
+ def _split(self, netloc):
|
|
| 20 |
+ userpass, hostport = urllib.parse.splituser(netloc)
|
|
| 21 |
+ host, port = urllib.parse.splitport(hostport)
|
|
| 22 |
+ if userpass:
|
|
| 23 |
+ user, passwd = urllib.parse.splitpasswd(userpass)
|
|
| 24 |
+ else:
|
|
| 25 |
+ user = None
|
|
| 26 |
+ passwd = None
|
|
| 27 |
+ return host, port, user, passwd
|
|
| 28 |
+ |
|
| 29 |
+ def _unsplit(self, host, port, user, passwd):
|
|
| 30 |
+ if port:
|
|
| 31 |
+ host = '{}:{}'.format(host, port)
|
|
| 32 |
+ if user:
|
|
| 33 |
+ if passwd:
|
|
| 34 |
+ user = '{}:{}'.format(user, passwd)
|
|
| 35 |
+ host = '{}@{}'.format(user, host)
|
|
| 36 |
+ |
|
| 37 |
+ return host
|
|
| 38 |
+ |
|
| 39 |
+ def ftp_open(self, req):
|
|
| 40 |
+ host, port, user, passwd = self._split(req.host)
|
|
| 41 |
+ |
|
| 42 |
+ if user is None and self.netrc:
|
|
| 43 |
+ entry = self.netrc.authenticators(host)
|
|
| 44 |
+ if entry:
|
|
| 45 |
+ user, _, passwd = entry
|
|
| 46 |
+ |
|
| 47 |
+ req.host = self._unsplit(host, port, user, passwd)
|
|
| 48 |
+ |
|
| 49 |
+ return super().ftp_open(req)
|
|
| 50 |
+ |
|
| 51 |
+ |
|
| 52 |
+class _NetrcPasswordManager:
|
|
| 53 |
+ |
|
| 54 |
+ def __init__(self, netrc_config):
|
|
| 55 |
+ self.netrc = netrc_config
|
|
| 56 |
+ |
|
| 57 |
+ def add_password(self, realm, uri, user, passwd):
|
|
| 58 |
+ pass
|
|
| 59 |
+ |
|
| 60 |
+ def find_user_password(self, realm, authuri):
|
|
| 61 |
+ if not self.netrc:
|
|
| 62 |
+ return None, None
|
|
| 63 |
+ parts = urllib.parse.urlsplit(authuri)
|
|
| 64 |
+ entry = self.netrc.authenticators(parts.hostname)
|
|
| 65 |
+ if not entry:
|
|
| 66 |
+ return None, None
|
|
| 67 |
+ else:
|
|
| 68 |
+ login, _, password = entry
|
|
| 69 |
+ return login, password
|
|
| 70 |
+ |
|
| 71 |
+ |
|
| 13 | 72 |
class DownloadableFileSource(Source):
|
| 14 | 73 |
# pylint: disable=attribute-defined-outside-init
|
| 15 | 74 |
|
| 16 | 75 |
COMMON_CONFIG_KEYS = Source.COMMON_CONFIG_KEYS + ['url', 'ref', 'etag']
|
| 17 | 76 |
|
| 77 |
+ __urlopener = None
|
|
| 78 |
+ |
|
| 18 | 79 |
def configure(self, node):
|
| 19 | 80 |
self.original_url = self.node_get_member(node, str, 'url')
|
| 20 | 81 |
self.ref = self.node_get_member(node, str, 'ref', None)
|
| ... | ... | @@ -118,7 +179,8 @@ class DownloadableFileSource(Source): |
| 118 | 179 |
if etag and self.get_consistency() == Consistency.CACHED:
|
| 119 | 180 |
request.add_header('If-None-Match', etag)
|
| 120 | 181 |
|
| 121 |
- with contextlib.closing(urllib.request.urlopen(request)) as response:
|
|
| 182 |
+ opener = self.__get_urlopener()
|
|
| 183 |
+ with contextlib.closing(opener.open(request)) as response:
|
|
| 122 | 184 |
info = response.info()
|
| 123 | 185 |
|
| 124 | 186 |
etag = info['ETag'] if 'ETag' in info else None
|
| ... | ... | @@ -164,3 +226,19 @@ class DownloadableFileSource(Source): |
| 164 | 226 |
|
| 165 | 227 |
def _get_mirror_file(self, sha=None):
|
| 166 | 228 |
return os.path.join(self._get_mirror_dir(), sha or self.ref)
|
| 229 |
+ |
|
| 230 |
+ def __get_urlopener(self):
|
|
| 231 |
+ if not DownloadableFileSource.__urlopener:
|
|
| 232 |
+ try:
|
|
| 233 |
+ netrc_config = netrc.netrc()
|
|
| 234 |
+ except FileNotFoundError:
|
|
| 235 |
+ DownloadableFileSource.__urlopener = urllib.request.build_opener()
|
|
| 236 |
+ except netrc.NetrcParseError as e:
|
|
| 237 |
+ self.warn('{}: While reading .netrc: {}'.format(self, e))
|
|
| 238 |
+ return urllib.request.build_opener()
|
|
| 239 |
+ else:
|
|
| 240 |
+ netrc_pw_mgr = _NetrcPasswordManager(netrc_config)
|
|
| 241 |
+ http_auth = urllib.request.HTTPBasicAuthHandler(netrc_pw_mgr)
|
|
| 242 |
+ ftp_handler = _NetrcFTPOpener(netrc_config)
|
|
| 243 |
+ DownloadableFileSource.__urlopener = urllib.request.build_opener(http_auth, ftp_handler)
|
|
| 244 |
+ return DownloadableFileSource.__urlopener
|
| ... | ... | @@ -139,8 +139,7 @@ class SandboxRemote(Sandbox): |
| 139 | 139 |
|
| 140 | 140 |
# Upload the Command message to the remote CAS server
|
| 141 | 141 |
command_digest = cascache.push_message(casremote, remote_command)
|
| 142 |
- if not command_digest or not cascache.verify_digest_on_remote(casremote, command_digest):
|
|
| 143 |
- raise SandboxError("Failed pushing build command to remote CAS.")
|
|
| 142 |
+ |
|
| 144 | 143 |
# Create and send the action.
|
| 145 | 144 |
action = remote_execution_pb2.Action(command_digest=command_digest,
|
| 146 | 145 |
input_root_digest=input_root_digest,
|
| ... | ... | @@ -149,8 +148,6 @@ class SandboxRemote(Sandbox): |
| 149 | 148 |
|
| 150 | 149 |
# Upload the Action message to the remote CAS server
|
| 151 | 150 |
action_digest = cascache.push_message(casremote, action)
|
| 152 |
- if not action_digest or not cascache.verify_digest_on_remote(casremote, action_digest):
|
|
| 153 |
- raise SandboxError("Failed pushing build action to remote CAS.")
|
|
| 154 | 151 |
|
| 155 | 152 |
# Next, try to create a communication channel to the BuildGrid server.
|
| 156 | 153 |
url = urlparse(self.exec_url)
|
| ... | ... | @@ -299,15 +296,11 @@ class SandboxRemote(Sandbox): |
| 299 | 296 |
|
| 300 | 297 |
casremote = CASRemote(self.storage_remote_spec)
|
| 301 | 298 |
# Now, push that key (without necessarily needing a ref) to the remote.
|
| 302 |
- |
|
| 303 | 299 |
try:
|
| 304 | 300 |
cascache.push_directory(casremote, upload_vdir)
|
| 305 | 301 |
except grpc.RpcError as e:
|
| 306 | 302 |
raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e
|
| 307 | 303 |
|
| 308 |
- if not cascache.verify_digest_on_remote(casremote, upload_vdir.ref):
|
|
| 309 |
- raise SandboxError("Failed to verify that source has been pushed to the remote artifact cache.")
|
|
| 310 |
- |
|
| 311 | 304 |
# Now transmit the command to execute
|
| 312 | 305 |
operation = self.run_remote_command(command, upload_vdir.ref, cwd, env)
|
| 313 | 306 |
|
| ... | ... | @@ -9,3 +9,4 @@ pytest-pep8 |
| 9 | 9 |
pytest-pylint
|
| 10 | 10 |
pytest-xdist
|
| 11 | 11 |
pytest-timeout
|
| 12 |
+pyftpdlib
|
| ... | ... | @@ -230,6 +230,8 @@ def test_artifact_expires(cli, datafiles, tmpdir): |
| 230 | 230 |
# Create an artifact share (remote artifact cache) in the tmpdir/artifactshare
|
| 231 | 231 |
# Mock a file system with 12 MB free disk space
|
| 232 | 232 |
with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare'),
|
| 233 |
+ min_head_size=int(2e9),
|
|
| 234 |
+ max_head_size=int(2e9),
|
|
| 233 | 235 |
total_space=int(10e9), free_space=(int(12e6) + int(2e9))) as share:
|
| 234 | 236 |
|
| 235 | 237 |
# Configure bst to push to the cache
|
| ... | ... | @@ -313,6 +315,8 @@ def test_recently_pulled_artifact_does_not_expire(cli, datafiles, tmpdir): |
| 313 | 315 |
# Create an artifact share (remote cache) in tmpdir/artifactshare
|
| 314 | 316 |
# Mock a file system with 12 MB free disk space
|
| 315 | 317 |
with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare'),
|
| 318 |
+ min_head_size=int(2e9),
|
|
| 319 |
+ max_head_size=int(2e9),
|
|
| 316 | 320 |
total_space=int(10e9), free_space=(int(12e6) + int(2e9))) as share:
|
| 317 | 321 |
|
| 318 | 322 |
# Configure bst to push to the cache
|
| ... | ... | @@ -38,7 +38,8 @@ def test_pullbuildtrees(cli, tmpdir, datafiles, integration_cache): |
| 38 | 38 |
|
| 39 | 39 |
# Create artifact shares for pull & push testing
|
| 40 | 40 |
with create_artifact_share(os.path.join(str(tmpdir), 'share1')) as share1,\
|
| 41 |
- create_artifact_share(os.path.join(str(tmpdir), 'share2')) as share2:
|
|
| 41 |
+ create_artifact_share(os.path.join(str(tmpdir), 'share2')) as share2,\
|
|
| 42 |
+ create_artifact_share(os.path.join(str(tmpdir), 'share3')) as share3:
|
|
| 42 | 43 |
cli.configure({
|
| 43 | 44 |
'artifacts': {'url': share1.repo, 'push': True},
|
| 44 | 45 |
'artifactdir': os.path.join(str(tmpdir), 'artifacts')
|
| ... | ... | @@ -123,6 +124,32 @@ def test_pullbuildtrees(cli, tmpdir, datafiles, integration_cache): |
| 123 | 124 |
assert share2.has_artifact('test', element_name, cli.get_element_key(project, element_name))
|
| 124 | 125 |
default_state(cli, tmpdir, share1)
|
| 125 | 126 |
|
| 127 |
+ # Assert that bst push will automatically attempt to pull a missing buildtree
|
|
| 128 |
+ # if pull-buildtrees is set, however as share3 is the only defined remote and is empty,
|
|
| 129 |
+ # assert that no element artifact buildtrees are pulled (no available remote buildtree) and thus the
|
|
| 130 |
+ # artifact cannot be pushed.
|
|
| 131 |
+ result = cli.run(project=project, args=['pull', element_name])
|
|
| 132 |
+ assert element_name in result.get_pulled_elements()
|
|
| 133 |
+ cli.configure({'artifacts': {'url': share3.repo, 'push': True}})
|
|
| 134 |
+ result = cli.run(project=project, args=['--pull-buildtrees', 'push', element_name])
|
|
| 135 |
+ assert "Attempting to fetch missing artifact buildtrees" in result.stderr
|
|
| 136 |
+ assert element_name not in result.get_pulled_elements()
|
|
| 137 |
+ assert not os.path.isdir(buildtreedir)
|
|
| 138 |
+ assert element_name not in result.get_pushed_elements()
|
|
| 139 |
+ assert not share3.has_artifact('test', element_name, cli.get_element_key(project, element_name))
|
|
| 140 |
+ |
|
| 141 |
+ # Assert that if we add an extra remote that has the buildtree artfact cached, bst push will
|
|
| 142 |
+ # automatically attempt to pull it and will be successful, leading to the full artifact being pushed
|
|
| 143 |
+ # to the empty share3. This gives the ability to attempt push currently partial artifacts to a remote,
|
|
| 144 |
+ # without exlipictly requiring a bst pull.
|
|
| 145 |
+ cli.configure({'artifacts': [{'url': share1.repo, 'push': False}, {'url': share3.repo, 'push': True}]})
|
|
| 146 |
+ result = cli.run(project=project, args=['--pull-buildtrees', 'push', element_name])
|
|
| 147 |
+ assert "Attempting to fetch missing artifact buildtrees" in result.stderr
|
|
| 148 |
+ assert element_name in result.get_pulled_elements()
|
|
| 149 |
+ assert os.path.isdir(buildtreedir)
|
|
| 150 |
+ assert element_name in result.get_pushed_elements()
|
|
| 151 |
+ assert share3.has_artifact('test', element_name, cli.get_element_key(project, element_name))
|
|
| 152 |
+ |
|
| 126 | 153 |
|
| 127 | 154 |
# Ensure that only valid pull-buildtrees boolean options make it through the loading
|
| 128 | 155 |
# process.
|
| ... | ... | @@ -5,6 +5,7 @@ import pytest |
| 5 | 5 |
from buildstream._exceptions import ErrorDomain
|
| 6 | 6 |
from buildstream import _yaml
|
| 7 | 7 |
from tests.testutils import cli
|
| 8 |
+from tests.testutils.file_server import create_file_server
|
|
| 8 | 9 |
|
| 9 | 10 |
DATA_DIR = os.path.join(
|
| 10 | 11 |
os.path.dirname(os.path.realpath(__file__)),
|
| ... | ... | @@ -22,6 +23,16 @@ def generate_project(project_dir, tmpdir): |
| 22 | 23 |
}, project_file)
|
| 23 | 24 |
|
| 24 | 25 |
|
| 26 |
+def generate_project_file_server(server, project_dir):
|
|
| 27 |
+ project_file = os.path.join(project_dir, "project.conf")
|
|
| 28 |
+ _yaml.dump({
|
|
| 29 |
+ 'name': 'foo',
|
|
| 30 |
+ 'aliases': {
|
|
| 31 |
+ 'tmpdir': server.base_url()
|
|
| 32 |
+ }
|
|
| 33 |
+ }, project_file)
|
|
| 34 |
+ |
|
| 35 |
+ |
|
| 25 | 36 |
# Test that without ref, consistency is set appropriately.
|
| 26 | 37 |
@pytest.mark.datafiles(os.path.join(DATA_DIR, 'no-ref'))
|
| 27 | 38 |
def test_no_ref(cli, tmpdir, datafiles):
|
| ... | ... | @@ -164,3 +175,35 @@ def test_executable(cli, tmpdir, datafiles): |
| 164 | 175 |
assert (mode & stat.S_IEXEC)
|
| 165 | 176 |
# Assert executable by anyone
|
| 166 | 177 |
assert(mode & (stat.S_IEXEC | stat.S_IXGRP | stat.S_IXOTH))
|
| 178 |
+ |
|
| 179 |
+ |
|
| 180 |
+@pytest.mark.parametrize('server_type', ('FTP', 'HTTP'))
|
|
| 181 |
+@pytest.mark.datafiles(os.path.join(DATA_DIR, 'single-file'))
|
|
| 182 |
+def test_use_netrc(cli, datafiles, server_type, tmpdir):
|
|
| 183 |
+ fake_home = os.path.join(str(tmpdir), 'fake_home')
|
|
| 184 |
+ os.makedirs(fake_home, exist_ok=True)
|
|
| 185 |
+ project = str(datafiles)
|
|
| 186 |
+ checkoutdir = os.path.join(str(tmpdir), 'checkout')
|
|
| 187 |
+ |
|
| 188 |
+ os.environ['HOME'] = fake_home
|
|
| 189 |
+ with open(os.path.join(fake_home, '.netrc'), 'wb') as f:
|
|
| 190 |
+ os.fchmod(f.fileno(), 0o700)
|
|
| 191 |
+ f.write(b'machine 127.0.0.1\n')
|
|
| 192 |
+ f.write(b'login testuser\n')
|
|
| 193 |
+ f.write(b'password 12345\n')
|
|
| 194 |
+ |
|
| 195 |
+ with create_file_server(server_type) as server:
|
|
| 196 |
+ server.add_user('testuser', '12345', project)
|
|
| 197 |
+ generate_project_file_server(server, project)
|
|
| 198 |
+ |
|
| 199 |
+ server.start()
|
|
| 200 |
+ |
|
| 201 |
+ result = cli.run(project=project, args=['fetch', 'target.bst'])
|
|
| 202 |
+ result.assert_success()
|
|
| 203 |
+ result = cli.run(project=project, args=['build', 'target.bst'])
|
|
| 204 |
+ result.assert_success()
|
|
| 205 |
+ result = cli.run(project=project, args=['checkout', 'target.bst', checkoutdir])
|
|
| 206 |
+ result.assert_success()
|
|
| 207 |
+ |
|
| 208 |
+ checkout_file = os.path.join(checkoutdir, 'file')
|
|
| 209 |
+ assert(os.path.exists(checkout_file))
|
| ... | ... | @@ -3,11 +3,13 @@ import pytest |
| 3 | 3 |
import tarfile
|
| 4 | 4 |
import tempfile
|
| 5 | 5 |
import subprocess
|
| 6 |
+import urllib.parse
|
|
| 6 | 7 |
from shutil import copyfile, rmtree
|
| 7 | 8 |
|
| 8 | 9 |
from buildstream._exceptions import ErrorDomain
|
| 9 | 10 |
from buildstream import _yaml
|
| 10 | 11 |
from tests.testutils import cli
|
| 12 |
+from tests.testutils.file_server import create_file_server
|
|
| 11 | 13 |
from tests.testutils.site import HAVE_LZIP
|
| 12 | 14 |
from . import list_dir_contents
|
| 13 | 15 |
|
| ... | ... | @@ -49,6 +51,16 @@ def generate_project(project_dir, tmpdir): |
| 49 | 51 |
}, project_file)
|
| 50 | 52 |
|
| 51 | 53 |
|
| 54 |
+def generate_project_file_server(base_url, project_dir):
|
|
| 55 |
+ project_file = os.path.join(project_dir, "project.conf")
|
|
| 56 |
+ _yaml.dump({
|
|
| 57 |
+ 'name': 'foo',
|
|
| 58 |
+ 'aliases': {
|
|
| 59 |
+ 'tmpdir': base_url
|
|
| 60 |
+ }
|
|
| 61 |
+ }, project_file)
|
|
| 62 |
+ |
|
| 63 |
+ |
|
| 52 | 64 |
# Test that without ref, consistency is set appropriately.
|
| 53 | 65 |
@pytest.mark.datafiles(os.path.join(DATA_DIR, 'no-ref'))
|
| 54 | 66 |
def test_no_ref(cli, tmpdir, datafiles):
|
| ... | ... | @@ -302,3 +314,77 @@ def test_read_only_dir(cli, tmpdir, datafiles): |
| 302 | 314 |
else:
|
| 303 | 315 |
os.remove(path)
|
| 304 | 316 |
rmtree(str(tmpdir), onerror=make_dir_writable)
|
| 317 |
+ |
|
| 318 |
+ |
|
| 319 |
+@pytest.mark.parametrize('server_type', ('FTP', 'HTTP'))
|
|
| 320 |
+@pytest.mark.datafiles(os.path.join(DATA_DIR, 'fetch'))
|
|
| 321 |
+def test_use_netrc(cli, datafiles, server_type, tmpdir):
|
|
| 322 |
+ file_server_files = os.path.join(str(tmpdir), 'file_server')
|
|
| 323 |
+ fake_home = os.path.join(str(tmpdir), 'fake_home')
|
|
| 324 |
+ os.makedirs(file_server_files, exist_ok=True)
|
|
| 325 |
+ os.makedirs(fake_home, exist_ok=True)
|
|
| 326 |
+ project = str(datafiles)
|
|
| 327 |
+ checkoutdir = os.path.join(str(tmpdir), 'checkout')
|
|
| 328 |
+ |
|
| 329 |
+ os.environ['HOME'] = fake_home
|
|
| 330 |
+ with open(os.path.join(fake_home, '.netrc'), 'wb') as f:
|
|
| 331 |
+ os.fchmod(f.fileno(), 0o700)
|
|
| 332 |
+ f.write(b'machine 127.0.0.1\n')
|
|
| 333 |
+ f.write(b'login testuser\n')
|
|
| 334 |
+ f.write(b'password 12345\n')
|
|
| 335 |
+ |
|
| 336 |
+ with create_file_server(server_type) as server:
|
|
| 337 |
+ server.add_user('testuser', '12345', file_server_files)
|
|
| 338 |
+ generate_project_file_server(server.base_url(), project)
|
|
| 339 |
+ |
|
| 340 |
+ src_tar = os.path.join(file_server_files, 'a.tar.gz')
|
|
| 341 |
+ _assemble_tar(os.path.join(str(datafiles), 'content'), 'a', src_tar)
|
|
| 342 |
+ |
|
| 343 |
+ server.start()
|
|
| 344 |
+ |
|
| 345 |
+ result = cli.run(project=project, args=['track', 'target.bst'])
|
|
| 346 |
+ result.assert_success()
|
|
| 347 |
+ result = cli.run(project=project, args=['fetch', 'target.bst'])
|
|
| 348 |
+ result.assert_success()
|
|
| 349 |
+ result = cli.run(project=project, args=['build', 'target.bst'])
|
|
| 350 |
+ result.assert_success()
|
|
| 351 |
+ result = cli.run(project=project, args=['checkout', 'target.bst', checkoutdir])
|
|
| 352 |
+ result.assert_success()
|
|
| 353 |
+ |
|
| 354 |
+ original_dir = os.path.join(str(datafiles), 'content', 'a')
|
|
| 355 |
+ original_contents = list_dir_contents(original_dir)
|
|
| 356 |
+ checkout_contents = list_dir_contents(checkoutdir)
|
|
| 357 |
+ assert(checkout_contents == original_contents)
|
|
| 358 |
+ |
|
| 359 |
+ |
|
| 360 |
+@pytest.mark.parametrize('server_type', ('FTP', 'HTTP'))
|
|
| 361 |
+@pytest.mark.datafiles(os.path.join(DATA_DIR, 'fetch'))
|
|
| 362 |
+def test_netrc_already_specified_user(cli, datafiles, server_type, tmpdir):
|
|
| 363 |
+ file_server_files = os.path.join(str(tmpdir), 'file_server')
|
|
| 364 |
+ fake_home = os.path.join(str(tmpdir), 'fake_home')
|
|
| 365 |
+ os.makedirs(file_server_files, exist_ok=True)
|
|
| 366 |
+ os.makedirs(fake_home, exist_ok=True)
|
|
| 367 |
+ project = str(datafiles)
|
|
| 368 |
+ checkoutdir = os.path.join(str(tmpdir), 'checkout')
|
|
| 369 |
+ |
|
| 370 |
+ os.environ['HOME'] = fake_home
|
|
| 371 |
+ with open(os.path.join(fake_home, '.netrc'), 'wb') as f:
|
|
| 372 |
+ os.fchmod(f.fileno(), 0o700)
|
|
| 373 |
+ f.write(b'machine 127.0.0.1\n')
|
|
| 374 |
+ f.write(b'login testuser\n')
|
|
| 375 |
+ f.write(b'password 12345\n')
|
|
| 376 |
+ |
|
| 377 |
+ with create_file_server(server_type) as server:
|
|
| 378 |
+ server.add_user('otheruser', '12345', file_server_files)
|
|
| 379 |
+ parts = urllib.parse.urlsplit(server.base_url())
|
|
| 380 |
+ base_url = urllib.parse.urlunsplit([parts[0]] + ['otheruser@{}'.format(parts[1])] + list(parts[2:]))
|
|
| 381 |
+ generate_project_file_server(base_url, project)
|
|
| 382 |
+ |
|
| 383 |
+ src_tar = os.path.join(file_server_files, 'a.tar.gz')
|
|
| 384 |
+ _assemble_tar(os.path.join(str(datafiles), 'content'), 'a', src_tar)
|
|
| 385 |
+ |
|
| 386 |
+ server.start()
|
|
| 387 |
+ |
|
| 388 |
+ result = cli.run(project=project, args=['track', 'target.bst'])
|
|
| 389 |
+ result.assert_main_error(ErrorDomain.STREAM, None)
|
|
| 390 |
+ result.assert_task_error(ErrorDomain.SOURCE, None)
|
| ... | ... | @@ -5,6 +5,7 @@ import zipfile |
| 5 | 5 |
from buildstream._exceptions import ErrorDomain
|
| 6 | 6 |
from buildstream import _yaml
|
| 7 | 7 |
from tests.testutils import cli
|
| 8 |
+from tests.testutils.file_server import create_file_server
|
|
| 8 | 9 |
from . import list_dir_contents
|
| 9 | 10 |
|
| 10 | 11 |
DATA_DIR = os.path.join(
|
| ... | ... | @@ -35,6 +36,16 @@ def generate_project(project_dir, tmpdir): |
| 35 | 36 |
}, project_file)
|
| 36 | 37 |
|
| 37 | 38 |
|
| 39 |
+def generate_project_file_server(server, project_dir):
|
|
| 40 |
+ project_file = os.path.join(project_dir, "project.conf")
|
|
| 41 |
+ _yaml.dump({
|
|
| 42 |
+ 'name': 'foo',
|
|
| 43 |
+ 'aliases': {
|
|
| 44 |
+ 'tmpdir': server.base_url()
|
|
| 45 |
+ }
|
|
| 46 |
+ }, project_file)
|
|
| 47 |
+ |
|
| 48 |
+ |
|
| 38 | 49 |
# Test that without ref, consistency is set appropriately.
|
| 39 | 50 |
@pytest.mark.datafiles(os.path.join(DATA_DIR, 'no-ref'))
|
| 40 | 51 |
def test_no_ref(cli, tmpdir, datafiles):
|
| ... | ... | @@ -176,3 +187,44 @@ def test_stage_explicit_basedir(cli, tmpdir, datafiles): |
| 176 | 187 |
original_contents = list_dir_contents(original_dir)
|
| 177 | 188 |
checkout_contents = list_dir_contents(checkoutdir)
|
| 178 | 189 |
assert(checkout_contents == original_contents)
|
| 190 |
+ |
|
| 191 |
+ |
|
| 192 |
+@pytest.mark.parametrize('server_type', ('FTP', 'HTTP'))
|
|
| 193 |
+@pytest.mark.datafiles(os.path.join(DATA_DIR, 'fetch'))
|
|
| 194 |
+def test_use_netrc(cli, datafiles, server_type, tmpdir):
|
|
| 195 |
+ file_server_files = os.path.join(str(tmpdir), 'file_server')
|
|
| 196 |
+ fake_home = os.path.join(str(tmpdir), 'fake_home')
|
|
| 197 |
+ os.makedirs(file_server_files, exist_ok=True)
|
|
| 198 |
+ os.makedirs(fake_home, exist_ok=True)
|
|
| 199 |
+ project = str(datafiles)
|
|
| 200 |
+ checkoutdir = os.path.join(str(tmpdir), 'checkout')
|
|
| 201 |
+ |
|
| 202 |
+ os.environ['HOME'] = fake_home
|
|
| 203 |
+ with open(os.path.join(fake_home, '.netrc'), 'wb') as f:
|
|
| 204 |
+ os.fchmod(f.fileno(), 0o700)
|
|
| 205 |
+ f.write(b'machine 127.0.0.1\n')
|
|
| 206 |
+ f.write(b'login testuser\n')
|
|
| 207 |
+ f.write(b'password 12345\n')
|
|
| 208 |
+ |
|
| 209 |
+ with create_file_server(server_type) as server:
|
|
| 210 |
+ server.add_user('testuser', '12345', file_server_files)
|
|
| 211 |
+ generate_project_file_server(server, project)
|
|
| 212 |
+ |
|
| 213 |
+ src_zip = os.path.join(file_server_files, 'a.zip')
|
|
| 214 |
+ _assemble_zip(os.path.join(str(datafiles), 'content'), src_zip)
|
|
| 215 |
+ |
|
| 216 |
+ server.start()
|
|
| 217 |
+ |
|
| 218 |
+ result = cli.run(project=project, args=['track', 'target.bst'])
|
|
| 219 |
+ result.assert_success()
|
|
| 220 |
+ result = cli.run(project=project, args=['fetch', 'target.bst'])
|
|
| 221 |
+ result.assert_success()
|
|
| 222 |
+ result = cli.run(project=project, args=['build', 'target.bst'])
|
|
| 223 |
+ result.assert_success()
|
|
| 224 |
+ result = cli.run(project=project, args=['checkout', 'target.bst', checkoutdir])
|
|
| 225 |
+ result.assert_success()
|
|
| 226 |
+ |
|
| 227 |
+ original_dir = os.path.join(str(datafiles), 'content', 'a')
|
|
| 228 |
+ original_contents = list_dir_contents(original_dir)
|
|
| 229 |
+ checkout_contents = list_dir_contents(checkoutdir)
|
|
| 230 |
+ assert(checkout_contents == original_contents)
|
| ... | ... | @@ -29,7 +29,11 @@ from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution |
| 29 | 29 |
#
|
| 30 | 30 |
class ArtifactShare():
|
| 31 | 31 |
|
| 32 |
- def __init__(self, directory, *, total_space=None, free_space=None):
|
|
| 32 |
+ def __init__(self, directory, *,
|
|
| 33 |
+ total_space=None,
|
|
| 34 |
+ free_space=None,
|
|
| 35 |
+ min_head_size=int(2e9),
|
|
| 36 |
+ max_head_size=int(10e9)):
|
|
| 33 | 37 |
|
| 34 | 38 |
# The working directory for the artifact share (in case it
|
| 35 | 39 |
# needs to do something outside of its backend's storage folder).
|
| ... | ... | @@ -50,6 +54,9 @@ class ArtifactShare(): |
| 50 | 54 |
self.total_space = total_space
|
| 51 | 55 |
self.free_space = free_space
|
| 52 | 56 |
|
| 57 |
+ self.max_head_size = max_head_size
|
|
| 58 |
+ self.min_head_size = min_head_size
|
|
| 59 |
+ |
|
| 53 | 60 |
q = Queue()
|
| 54 | 61 |
|
| 55 | 62 |
self.process = Process(target=self.run, args=(q,))
|
| ... | ... | @@ -74,7 +81,10 @@ class ArtifactShare(): |
| 74 | 81 |
self.free_space = self.total_space
|
| 75 | 82 |
os.statvfs = self._mock_statvfs
|
| 76 | 83 |
|
| 77 |
- server = create_server(self.repodir, enable_push=True)
|
|
| 84 |
+ server = create_server(self.repodir,
|
|
| 85 |
+ max_head_size=self.max_head_size,
|
|
| 86 |
+ min_head_size=self.min_head_size,
|
|
| 87 |
+ enable_push=True)
|
|
| 78 | 88 |
port = server.add_insecure_port('localhost:0')
|
| 79 | 89 |
|
| 80 | 90 |
server.start()
|
| ... | ... | @@ -136,6 +146,15 @@ class ArtifactShare(): |
| 136 | 146 |
|
| 137 | 147 |
try:
|
| 138 | 148 |
tree = self.cas.resolve_ref(artifact_key)
|
| 149 |
+ reachable = set()
|
|
| 150 |
+ try:
|
|
| 151 |
+ self.cas._reachable_refs_dir(reachable, tree, update_mtime=False)
|
|
| 152 |
+ except FileNotFoundError:
|
|
| 153 |
+ return None
|
|
| 154 |
+ for digest in reachable:
|
|
| 155 |
+ object_name = os.path.join(self.cas.casdir, 'objects', digest[:2], digest[2:])
|
|
| 156 |
+ if not os.path.exists(object_name):
|
|
| 157 |
+ return None
|
|
| 139 | 158 |
return tree
|
| 140 | 159 |
except CASError:
|
| 141 | 160 |
return None
|
| ... | ... | @@ -167,8 +186,11 @@ class ArtifactShare(): |
| 167 | 186 |
# Create an ArtifactShare for use in a test case
|
| 168 | 187 |
#
|
| 169 | 188 |
@contextmanager
|
| 170 |
-def create_artifact_share(directory, *, total_space=None, free_space=None):
|
|
| 171 |
- share = ArtifactShare(directory, total_space=total_space, free_space=free_space)
|
|
| 189 |
+def create_artifact_share(directory, *, total_space=None, free_space=None,
|
|
| 190 |
+ min_head_size=int(2e9),
|
|
| 191 |
+ max_head_size=int(10e9)):
|
|
| 192 |
+ share = ArtifactShare(directory, total_space=total_space, free_space=free_space,
|
|
| 193 |
+ min_head_size=min_head_size, max_head_size=max_head_size)
|
|
| 172 | 194 |
try:
|
| 173 | 195 |
yield share
|
| 174 | 196 |
finally:
|
| 1 |
+from contextlib import contextmanager
|
|
| 2 |
+ |
|
| 3 |
+from .ftp_server import SimpleFtpServer
|
|
| 4 |
+from .http_server import SimpleHttpServer
|
|
| 5 |
+ |
|
| 6 |
+ |
|
| 7 |
+@contextmanager
|
|
| 8 |
+def create_file_server(file_server_type):
|
|
| 9 |
+ if file_server_type == 'FTP':
|
|
| 10 |
+ server = SimpleFtpServer()
|
|
| 11 |
+ elif file_server_type == 'HTTP':
|
|
| 12 |
+ server = SimpleHttpServer()
|
|
| 13 |
+ else:
|
|
| 14 |
+ assert False
|
|
| 15 |
+ |
|
| 16 |
+ try:
|
|
| 17 |
+ yield server
|
|
| 18 |
+ finally:
|
|
| 19 |
+ server.stop()
|
| 1 |
+import multiprocessing
|
|
| 2 |
+ |
|
| 3 |
+from pyftpdlib.authorizers import DummyAuthorizer
|
|
| 4 |
+from pyftpdlib.handlers import FTPHandler
|
|
| 5 |
+from pyftpdlib.servers import FTPServer
|
|
| 6 |
+ |
|
| 7 |
+ |
|
| 8 |
+class SimpleFtpServer(multiprocessing.Process):
|
|
| 9 |
+ def __init__(self):
|
|
| 10 |
+ super().__init__()
|
|
| 11 |
+ self.authorizer = DummyAuthorizer()
|
|
| 12 |
+ handler = FTPHandler
|
|
| 13 |
+ handler.authorizer = self.authorizer
|
|
| 14 |
+ self.server = FTPServer(('127.0.0.1', 0), handler)
|
|
| 15 |
+ |
|
| 16 |
+ def run(self):
|
|
| 17 |
+ self.server.serve_forever()
|
|
| 18 |
+ |
|
| 19 |
+ def stop(self):
|
|
| 20 |
+ self.server.close_all()
|
|
| 21 |
+ self.server.close()
|
|
| 22 |
+ self.terminate()
|
|
| 23 |
+ self.join()
|
|
| 24 |
+ |
|
| 25 |
+ def allow_anonymous(self, cwd):
|
|
| 26 |
+ self.authorizer.add_anonymous(cwd)
|
|
| 27 |
+ |
|
| 28 |
+ def add_user(self, user, password, cwd):
|
|
| 29 |
+ self.authorizer.add_user(user, password, cwd, perm='elradfmwMT')
|
|
| 30 |
+ |
|
| 31 |
+ def base_url(self):
|
|
| 32 |
+ return 'ftp://127.0.0.1:{}'.format(self.server.address[1])
|
| 1 |
+import multiprocessing
|
|
| 2 |
+import os
|
|
| 3 |
+import posixpath
|
|
| 4 |
+import html
|
|
| 5 |
+import threading
|
|
| 6 |
+import base64
|
|
| 7 |
+from http.server import SimpleHTTPRequestHandler, HTTPServer, HTTPStatus
|
|
| 8 |
+ |
|
| 9 |
+ |
|
| 10 |
+class Unauthorized(Exception):
|
|
| 11 |
+ pass
|
|
| 12 |
+ |
|
| 13 |
+ |
|
| 14 |
+class RequestHandler(SimpleHTTPRequestHandler):
|
|
| 15 |
+ |
|
| 16 |
+ def get_root_dir(self):
|
|
| 17 |
+ authorization = self.headers.get('authorization')
|
|
| 18 |
+ if not authorization:
|
|
| 19 |
+ if not self.server.anonymous_dir:
|
|
| 20 |
+ raise Unauthorized('unauthorized')
|
|
| 21 |
+ return self.server.anonymous_dir
|
|
| 22 |
+ else:
|
|
| 23 |
+ authorization = authorization.split()
|
|
| 24 |
+ if len(authorization) != 2 or authorization[0].lower() != 'basic':
|
|
| 25 |
+ raise Unauthorized('unauthorized')
|
|
| 26 |
+ try:
|
|
| 27 |
+ decoded = base64.decodebytes(authorization[1].encode('ascii'))
|
|
| 28 |
+ user, password = decoded.decode('ascii').split(':')
|
|
| 29 |
+ expected_password, directory = self.server.users[user]
|
|
| 30 |
+ if password == expected_password:
|
|
| 31 |
+ return directory
|
|
| 32 |
+ except:
|
|
| 33 |
+ raise Unauthorized('unauthorized')
|
|
| 34 |
+ return None
|
|
| 35 |
+ |
|
| 36 |
+ def unauthorized(self):
|
|
| 37 |
+ shortmsg, longmsg = self.responses[HTTPStatus.UNAUTHORIZED]
|
|
| 38 |
+ self.send_response(HTTPStatus.UNAUTHORIZED, shortmsg)
|
|
| 39 |
+ self.send_header('Connection', 'close')
|
|
| 40 |
+ |
|
| 41 |
+ content = (self.error_message_format % {
|
|
| 42 |
+ 'code': HTTPStatus.UNAUTHORIZED,
|
|
| 43 |
+ 'message': html.escape(longmsg, quote=False),
|
|
| 44 |
+ 'explain': html.escape(longmsg, quote=False)
|
|
| 45 |
+ })
|
|
| 46 |
+ body = content.encode('UTF-8', 'replace')
|
|
| 47 |
+ self.send_header('Content-Type', self.error_content_type)
|
|
| 48 |
+ self.send_header('Content-Length', str(len(body)))
|
|
| 49 |
+ self.send_header('WWW-Authenticate', 'Basic realm="{}"'.format(self.server.realm))
|
|
| 50 |
+ self.end_headers()
|
|
| 51 |
+ self.end_headers()
|
|
| 52 |
+ |
|
| 53 |
+ if self.command != 'HEAD' and body:
|
|
| 54 |
+ self.wfile.write(body)
|
|
| 55 |
+ |
|
| 56 |
+ def do_GET(self):
|
|
| 57 |
+ try:
|
|
| 58 |
+ super().do_GET()
|
|
| 59 |
+ except Unauthorized:
|
|
| 60 |
+ self.unauthorized()
|
|
| 61 |
+ |
|
| 62 |
+ def do_HEAD(self):
|
|
| 63 |
+ try:
|
|
| 64 |
+ super().do_HEAD()
|
|
| 65 |
+ except Unauthorized:
|
|
| 66 |
+ self.unauthorized()
|
|
| 67 |
+ |
|
| 68 |
+ def translate_path(self, path):
|
|
| 69 |
+ path = path.split('?', 1)[0]
|
|
| 70 |
+ path = path.split('#', 1)[0]
|
|
| 71 |
+ path = posixpath.normpath(path)
|
|
| 72 |
+ assert(posixpath.isabs(path))
|
|
| 73 |
+ path = posixpath.relpath(path, '/')
|
|
| 74 |
+ return os.path.join(self.get_root_dir(), path)
|
|
| 75 |
+ |
|
| 76 |
+ |
|
| 77 |
+class AuthHTTPServer(HTTPServer):
|
|
| 78 |
+ def __init__(self, *args, **kwargs):
|
|
| 79 |
+ self.users = {}
|
|
| 80 |
+ self.anonymous_dir = None
|
|
| 81 |
+ self.realm = 'Realm'
|
|
| 82 |
+ super().__init__(*args, **kwargs)
|
|
| 83 |
+ |
|
| 84 |
+ |
|
| 85 |
+class SimpleHttpServer(multiprocessing.Process):
|
|
| 86 |
+ def __init__(self):
|
|
| 87 |
+ self.__stop = multiprocessing.Queue()
|
|
| 88 |
+ super().__init__()
|
|
| 89 |
+ self.server = AuthHTTPServer(('127.0.0.1', 0), RequestHandler)
|
|
| 90 |
+ self.started = False
|
|
| 91 |
+ |
|
| 92 |
+ def start(self):
|
|
| 93 |
+ self.started = True
|
|
| 94 |
+ super().start()
|
|
| 95 |
+ |
|
| 96 |
+ def run(self):
|
|
| 97 |
+ t = threading.Thread(target=self.server.serve_forever)
|
|
| 98 |
+ t.start()
|
|
| 99 |
+ self.__stop.get()
|
|
| 100 |
+ self.server.shutdown()
|
|
| 101 |
+ t.join()
|
|
| 102 |
+ |
|
| 103 |
+ def stop(self):
|
|
| 104 |
+ if not self.started:
|
|
| 105 |
+ return
|
|
| 106 |
+ self.__stop.put(None)
|
|
| 107 |
+ self.terminate()
|
|
| 108 |
+ self.join()
|
|
| 109 |
+ |
|
| 110 |
+ def allow_anonymous(self, cwd):
|
|
| 111 |
+ self.server.anonymous_dir = cwd
|
|
| 112 |
+ |
|
| 113 |
+ def add_user(self, user, password, cwd):
|
|
| 114 |
+ self.server.users[user] = (password, cwd)
|
|
| 115 |
+ |
|
| 116 |
+ def base_url(self):
|
|
| 117 |
+ return 'http://127.0.0.1:{}'.format(self.server.server_port)
|
