Jürg Billeter pushed to branch juerg/cas-batch at BuildStream / buildstream
Commits:
-
68ef69e4
by Tristan Van Berkom at 2018-09-21T05:20:46Z
-
662c729f
by Tristan Van Berkom at 2018-09-21T05:59:30Z
-
461a0588
by Jim MacArthur at 2018-09-21T10:53:11Z
-
aa9caaac
by Jim MacArthur at 2018-09-21T10:53:11Z
-
2aae68c7
by Jim MacArthur at 2018-09-21T10:53:11Z
-
ca1bb72c
by Jim MacArthur at 2018-09-21T10:53:11Z
-
55c93a82
by Jim MacArthur at 2018-09-21T11:26:55Z
-
e209beb0
by Chandan Singh at 2018-09-21T13:10:08Z
-
0b000518
by Chandan Singh at 2018-09-21T13:56:55Z
-
ef26043a
by Chandan Singh at 2018-09-21T17:14:16Z
-
1b2aed40
by Chandan Singh at 2018-09-21T17:40:11Z
-
c3176024
by Jürg Billeter at 2018-09-24T08:55:47Z
-
dd2fb1aa
by Jürg Billeter at 2018-09-24T09:04:18Z
-
d457f14e
by Jürg Billeter at 2018-09-24T09:08:48Z
-
7966dfa0
by Jürg Billeter at 2018-09-24T09:08:48Z
11 changed files:
- buildstream/_artifactcache/cascache.py
- buildstream/_artifactcache/casserver.py
- buildstream/_stream.py
- buildstream/element.py
- buildstream/sandbox/_sandboxremote.py
- buildstream/source.py
- setup.py
- tests/artifactcache/push.py
- + tests/frontend/project/elements/source-bundle/source-bundle-hello.bst
- + tests/frontend/project/files/source-bundle/llamas.txt
- + tests/frontend/source_bundle.py
Changes:
... | ... | @@ -44,6 +44,11 @@ from .._exceptions import ArtifactError |
44 | 44 |
from . import ArtifactCache
|
45 | 45 |
|
46 | 46 |
|
47 |
+# The default limit for gRPC messages is 4 MiB.
|
|
48 |
+# Limit payload to 1 MiB to leave sufficient headroom for metadata.
|
|
49 |
+_MAX_PAYLOAD_BYTES = 1024 * 1024
|
|
50 |
+ |
|
51 |
+ |
|
47 | 52 |
# A CASCache manages artifacts in a CAS repository as specified in the
|
48 | 53 |
# Remote Execution API.
|
49 | 54 |
#
|
... | ... | @@ -348,19 +353,29 @@ class CASCache(ArtifactCache): |
348 | 353 |
return pushed
|
349 | 354 |
|
350 | 355 |
def push_directory(self, project, directory):
|
356 |
+ """ Push the given virtual directory to all remotes.
|
|
357 |
+ |
|
358 |
+ Args:
|
|
359 |
+ project (Project): The current project
|
|
360 |
+ directory (Directory): A virtual directory object to push.
|
|
361 |
+ |
|
362 |
+ Raises: ArtifactError if no push remotes are configured.
|
|
363 |
+ """
|
|
351 | 364 |
|
352 | 365 |
push_remotes = [r for r in self._remotes[project] if r.spec.push]
|
353 | 366 |
|
367 |
+ if not push_remotes:
|
|
368 |
+ raise ArtifactError("CASCache: push_directory was called, but no remote artifact " +
|
|
369 |
+ "servers are configured as push remotes.")
|
|
370 |
+ |
|
354 | 371 |
if directory.ref is None:
|
355 |
- return None
|
|
372 |
+ return
|
|
356 | 373 |
|
357 | 374 |
for remote in push_remotes:
|
358 | 375 |
remote.init()
|
359 | 376 |
|
360 | 377 |
self._send_directory(remote, directory.ref)
|
361 | 378 |
|
362 |
- return directory.ref
|
|
363 |
- |
|
364 | 379 |
def push_message(self, project, message):
|
365 | 380 |
|
366 | 381 |
push_remotes = [r for r in self._remotes[project] if r.spec.push]
|
... | ... | @@ -844,6 +859,40 @@ class CASCache(ArtifactCache): |
844 | 859 |
|
845 | 860 |
assert digest.size_bytes == os.fstat(stream.fileno()).st_size
|
846 | 861 |
|
862 |
+ # _ensure_blob():
|
|
863 |
+ #
|
|
864 |
+ # Fetch and add blob if it's not already local.
|
|
865 |
+ #
|
|
866 |
+ # Args:
|
|
867 |
+ # remote (Remote): The remote to use.
|
|
868 |
+ # digest (Digest): Digest object for the blob to fetch.
|
|
869 |
+ #
|
|
870 |
+ # Returns:
|
|
871 |
+ # (str): The path of the object
|
|
872 |
+ #
|
|
873 |
+ def _ensure_blob(self, remote, digest):
|
|
874 |
+ objpath = self.objpath(digest)
|
|
875 |
+ if os.path.exists(objpath):
|
|
876 |
+ # already in local repository
|
|
877 |
+ return objpath
|
|
878 |
+ |
|
879 |
+ with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
|
|
880 |
+ self._fetch_blob(remote, digest, f)
|
|
881 |
+ |
|
882 |
+ added_digest = self.add_object(path=f.name)
|
|
883 |
+ assert added_digest.hash == digest.hash
|
|
884 |
+ |
|
885 |
+ return objpath
|
|
886 |
+ |
|
887 |
+ def _batch_download_complete(self, batch):
|
|
888 |
+ for digest, data in batch.send():
|
|
889 |
+ with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
|
|
890 |
+ f.write(data)
|
|
891 |
+ f.flush()
|
|
892 |
+ |
|
893 |
+ added_digest = self.add_object(path=f.name)
|
|
894 |
+ assert added_digest.hash == digest.hash
|
|
895 |
+ |
|
847 | 896 |
# _fetch_directory():
|
848 | 897 |
#
|
849 | 898 |
# Fetches remote directory and adds it to content addressable store.
|
... | ... | @@ -857,39 +906,73 @@ class CASCache(ArtifactCache): |
857 | 906 |
# dir_digest (Digest): Digest object for the directory to fetch.
|
858 | 907 |
#
|
859 | 908 |
def _fetch_directory(self, remote, dir_digest):
|
860 |
- objpath = self.objpath(dir_digest)
|
|
861 |
- if os.path.exists(objpath):
|
|
862 |
- # already in local cache
|
|
863 |
- return
|
|
909 |
+ fetch_queue = [dir_digest]
|
|
910 |
+ fetch_next_queue = []
|
|
911 |
+ batch = _CASBatchRead(remote)
|
|
864 | 912 |
|
865 |
- with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
|
|
866 |
- self._fetch_blob(remote, dir_digest, out)
|
|
913 |
+ def fetch_batch(batch):
|
|
914 |
+ self._batch_download_complete(batch)
|
|
867 | 915 |
|
868 |
- directory = remote_execution_pb2.Directory()
|
|
916 |
+ # All previously scheduled directories are now locally available,
|
|
917 |
+ # move them to the processing queue.
|
|
918 |
+ fetch_queue.extend(fetch_next_queue)
|
|
919 |
+ fetch_next_queue.clear()
|
|
920 |
+ return _CASBatchRead(remote)
|
|
869 | 921 |
|
870 |
- with open(out.name, 'rb') as f:
|
|
922 |
+ while len(fetch_queue) + len(fetch_next_queue) > 0:
|
|
923 |
+ if len(fetch_queue) == 0:
|
|
924 |
+ batch = fetch_batch(batch)
|
|
925 |
+ |
|
926 |
+ dir_digest = fetch_queue.pop(0)
|
|
927 |
+ |
|
928 |
+ objpath = self._ensure_blob(remote, dir_digest)
|
|
929 |
+ |
|
930 |
+ directory = remote_execution_pb2.Directory()
|
|
931 |
+ with open(objpath, 'rb') as f:
|
|
871 | 932 |
directory.ParseFromString(f.read())
|
872 | 933 |
|
873 |
- for filenode in directory.files:
|
|
874 |
- fileobjpath = self.objpath(filenode.digest)
|
|
875 |
- if os.path.exists(fileobjpath):
|
|
876 |
- # already in local cache
|
|
934 |
+ for dirnode in directory.directories:
|
|
935 |
+ if os.path.exists(self.objpath(dirnode.digest)):
|
|
936 |
+ # Skip download, already in local cache.
|
|
937 |
+ # Add directory to processing queue.
|
|
938 |
+ fetch_queue.append(dirnode.digest)
|
|
877 | 939 |
continue
|
878 | 940 |
|
879 |
- with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
|
|
880 |
- self._fetch_blob(remote, filenode.digest, f)
|
|
941 |
+ if (dirnode.digest.size_bytes >= remote.max_batch_total_size_bytes or
|
|
942 |
+ not remote.batch_read_supported):
|
|
943 |
+ # Too large for batch request, download in independent request.
|
|
944 |
+ self._ensure_blob(remote, dirnode.digest)
|
|
945 |
+ # Add directory to processing queue.
|
|
946 |
+ fetch_queue.append(dirnode.digest)
|
|
947 |
+ else:
|
|
948 |
+ if not batch.add(dirnode.digest):
|
|
949 |
+ # Not enough space left in batch request.
|
|
950 |
+ # Complete pending batch first.
|
|
951 |
+ batch = fetch_batch(batch)
|
|
952 |
+ batch.add(dirnode.digest)
|
|
881 | 953 |
|
882 |
- digest = self.add_object(path=f.name)
|
|
883 |
- assert digest.hash == filenode.digest.hash
|
|
954 |
+ # Directory will be available after completing pending batch.
|
|
955 |
+ # Add directory to deferred processing queue.
|
|
956 |
+ fetch_next_queue.append(dirnode.digest)
|
|
884 | 957 |
|
885 |
- for dirnode in directory.directories:
|
|
886 |
- self._fetch_directory(remote, dirnode.digest)
|
|
958 |
+ for filenode in directory.files:
|
|
959 |
+ if os.path.exists(self.objpath(filenode.digest)):
|
|
960 |
+ # Skip download, already in local cache.
|
|
961 |
+ continue
|
|
962 |
+ |
|
963 |
+ if (filenode.digest.size_bytes >= remote.max_batch_total_size_bytes or
|
|
964 |
+ not remote.batch_read_supported):
|
|
965 |
+ # Too large for batch request, download in independent request.
|
|
966 |
+ self._ensure_blob(remote, filenode.digest)
|
|
967 |
+ else:
|
|
968 |
+ if not batch.add(filenode.digest):
|
|
969 |
+ # Not enough space left in batch request.
|
|
970 |
+ # Complete pending batch first.
|
|
971 |
+ batch = fetch_batch(batch)
|
|
972 |
+ batch.add(filenode.digest)
|
|
887 | 973 |
|
888 |
- # Place directory blob only in final location when we've
|
|
889 |
- # downloaded all referenced blobs to avoid dangling
|
|
890 |
- # references in the repository.
|
|
891 |
- digest = self.add_object(path=out.name)
|
|
892 |
- assert digest.hash == dir_digest.hash
|
|
974 |
+ # Fetch final batch
|
|
975 |
+ fetch_batch(batch)
|
|
893 | 976 |
|
894 | 977 |
def _fetch_tree(self, remote, digest):
|
895 | 978 |
# download but do not store the Tree object
|
... | ... | @@ -904,16 +987,7 @@ class CASCache(ArtifactCache): |
904 | 987 |
tree.children.extend([tree.root])
|
905 | 988 |
for directory in tree.children:
|
906 | 989 |
for filenode in directory.files:
|
907 |
- fileobjpath = self.objpath(filenode.digest)
|
|
908 |
- if os.path.exists(fileobjpath):
|
|
909 |
- # already in local cache
|
|
910 |
- continue
|
|
911 |
- |
|
912 |
- with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
|
|
913 |
- self._fetch_blob(remote, filenode.digest, f)
|
|
914 |
- |
|
915 |
- added_digest = self.add_object(path=f.name)
|
|
916 |
- assert added_digest.hash == filenode.digest.hash
|
|
990 |
+ self._ensure_blob(remote, filenode.digest)
|
|
917 | 991 |
|
918 | 992 |
# place directory blob only in final location when we've downloaded
|
919 | 993 |
# all referenced blobs to avoid dangling references in the repository
|
... | ... | @@ -932,12 +1006,12 @@ class CASCache(ArtifactCache): |
932 | 1006 |
finished = False
|
933 | 1007 |
remaining = digest.size_bytes
|
934 | 1008 |
while not finished:
|
935 |
- chunk_size = min(remaining, 64 * 1024)
|
|
1009 |
+ chunk_size = min(remaining, _MAX_PAYLOAD_BYTES)
|
|
936 | 1010 |
remaining -= chunk_size
|
937 | 1011 |
|
938 | 1012 |
request = bytestream_pb2.WriteRequest()
|
939 | 1013 |
request.write_offset = offset
|
940 |
- # max. 64 kB chunks
|
|
1014 |
+ # max. _MAX_PAYLOAD_BYTES chunks
|
|
941 | 1015 |
request.data = instream.read(chunk_size)
|
942 | 1016 |
request.resource_name = resname
|
943 | 1017 |
request.finish_write = remaining <= 0
|
... | ... | @@ -1025,11 +1099,78 @@ class _CASRemote(): |
1025 | 1099 |
|
1026 | 1100 |
self.bytestream = bytestream_pb2_grpc.ByteStreamStub(self.channel)
|
1027 | 1101 |
self.cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel)
|
1102 |
+ self.capabilities = remote_execution_pb2_grpc.CapabilitiesStub(self.channel)
|
|
1028 | 1103 |
self.ref_storage = buildstream_pb2_grpc.ReferenceStorageStub(self.channel)
|
1029 | 1104 |
|
1105 |
+ self.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES
|
|
1106 |
+ try:
|
|
1107 |
+ request = remote_execution_pb2.GetCapabilitiesRequest()
|
|
1108 |
+ response = self.capabilities.GetCapabilities(request)
|
|
1109 |
+ server_max_batch_total_size_bytes = response.cache_capabilities.max_batch_total_size_bytes
|
|
1110 |
+ if 0 < server_max_batch_total_size_bytes < self.max_batch_total_size_bytes:
|
|
1111 |
+ self.max_batch_total_size_bytes = server_max_batch_total_size_bytes
|
|
1112 |
+ except grpc.RpcError as e:
|
|
1113 |
+ # Simply use the defaults for servers that don't implement GetCapabilities()
|
|
1114 |
+ if e.code() != grpc.StatusCode.UNIMPLEMENTED:
|
|
1115 |
+ raise
|
|
1116 |
+ |
|
1117 |
+ # Check whether the server supports BatchReadBlobs()
|
|
1118 |
+ self.batch_read_supported = False
|
|
1119 |
+ try:
|
|
1120 |
+ request = remote_execution_pb2.BatchReadBlobsRequest()
|
|
1121 |
+ response = self.cas.BatchReadBlobs(request)
|
|
1122 |
+ self.batch_read_supported = True
|
|
1123 |
+ except grpc.RpcError as e:
|
|
1124 |
+ if e.code() != grpc.StatusCode.UNIMPLEMENTED:
|
|
1125 |
+ raise
|
|
1126 |
+ |
|
1030 | 1127 |
self._initialized = True
|
1031 | 1128 |
|
1032 | 1129 |
|
1130 |
+# Represents a batch of blobs queued for fetching.
|
|
1131 |
+#
|
|
1132 |
+class _CASBatchRead():
|
|
1133 |
+ def __init__(self, remote):
|
|
1134 |
+ self._remote = remote
|
|
1135 |
+ self._max_total_size_bytes = remote.max_batch_total_size_bytes
|
|
1136 |
+ self._request = remote_execution_pb2.BatchReadBlobsRequest()
|
|
1137 |
+ self._size = 0
|
|
1138 |
+ self._sent = False
|
|
1139 |
+ |
|
1140 |
+ def add(self, digest):
|
|
1141 |
+ assert not self._sent
|
|
1142 |
+ |
|
1143 |
+ new_batch_size = self._size + digest.size_bytes
|
|
1144 |
+ if new_batch_size > self._max_total_size_bytes:
|
|
1145 |
+ # Not enough space left in current batch
|
|
1146 |
+ return False
|
|
1147 |
+ |
|
1148 |
+ request_digest = self._request.digests.add()
|
|
1149 |
+ request_digest.hash = digest.hash
|
|
1150 |
+ request_digest.size_bytes = digest.size_bytes
|
|
1151 |
+ self._size = new_batch_size
|
|
1152 |
+ return True
|
|
1153 |
+ |
|
1154 |
+ def send(self):
|
|
1155 |
+ assert not self._sent
|
|
1156 |
+ self._sent = True
|
|
1157 |
+ |
|
1158 |
+ if len(self._request.digests) == 0:
|
|
1159 |
+ return
|
|
1160 |
+ |
|
1161 |
+ batch_response = self._remote.cas.BatchReadBlobs(self._request)
|
|
1162 |
+ |
|
1163 |
+ for response in batch_response.responses:
|
|
1164 |
+ if response.status.code != grpc.StatusCode.OK.value[0]:
|
|
1165 |
+ raise ArtifactError("Failed to download blob {}: {}".format(
|
|
1166 |
+ response.digest.hash, response.status.code))
|
|
1167 |
+ if response.digest.size_bytes != len(response.data):
|
|
1168 |
+ raise ArtifactError("Failed to download blob {}: expected {} bytes, received {} bytes".format(
|
|
1169 |
+ response.digest.hash, response.digest.size_bytes, len(response.data)))
|
|
1170 |
+ |
|
1171 |
+ yield (response.digest, response.data)
|
|
1172 |
+ |
|
1173 |
+ |
|
1033 | 1174 |
def _grouper(iterable, n):
|
1034 | 1175 |
while True:
|
1035 | 1176 |
try:
|
... | ... | @@ -38,8 +38,9 @@ from .._context import Context |
38 | 38 |
from .cascache import CASCache
|
39 | 39 |
|
40 | 40 |
|
41 |
-# The default limit for gRPC messages is 4 MiB
|
|
42 |
-_MAX_BATCH_TOTAL_SIZE_BYTES = 4 * 1024 * 1024
|
|
41 |
+# The default limit for gRPC messages is 4 MiB.
|
|
42 |
+# Limit payload to 1 MiB to leave sufficient headroom for metadata.
|
|
43 |
+_MAX_PAYLOAD_BYTES = 1024 * 1024
|
|
43 | 44 |
|
44 | 45 |
|
45 | 46 |
# Trying to push an artifact that is too large
|
... | ... | @@ -158,7 +159,7 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer): |
158 | 159 |
|
159 | 160 |
remaining = client_digest.size_bytes - request.read_offset
|
160 | 161 |
while remaining > 0:
|
161 |
- chunk_size = min(remaining, 64 * 1024)
|
|
162 |
+ chunk_size = min(remaining, _MAX_PAYLOAD_BYTES)
|
|
162 | 163 |
remaining -= chunk_size
|
163 | 164 |
|
164 | 165 |
response = bytestream_pb2.ReadResponse()
|
... | ... | @@ -242,7 +243,7 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres |
242 | 243 |
|
243 | 244 |
for digest in request.digests:
|
244 | 245 |
batch_size += digest.size_bytes
|
245 |
- if batch_size > _MAX_BATCH_TOTAL_SIZE_BYTES:
|
|
246 |
+ if batch_size > _MAX_PAYLOAD_BYTES:
|
|
246 | 247 |
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
247 | 248 |
return response
|
248 | 249 |
|
... | ... | @@ -269,7 +270,7 @@ class _CapabilitiesServicer(remote_execution_pb2_grpc.CapabilitiesServicer): |
269 | 270 |
cache_capabilities = response.cache_capabilities
|
270 | 271 |
cache_capabilities.digest_function.append(remote_execution_pb2.SHA256)
|
271 | 272 |
cache_capabilities.action_cache_update_capabilities.update_enabled = False
|
272 |
- cache_capabilities.max_batch_total_size_bytes = _MAX_BATCH_TOTAL_SIZE_BYTES
|
|
273 |
+ cache_capabilities.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES
|
|
273 | 274 |
cache_capabilities.symlink_absolute_path_strategy = remote_execution_pb2.CacheCapabilities.ALLOWED
|
274 | 275 |
|
275 | 276 |
response.deprecated_api_version.major = 2
|
... | ... | @@ -703,6 +703,7 @@ class Stream(): |
703 | 703 |
|
704 | 704 |
# Create a temporary directory to build the source tree in
|
705 | 705 |
builddir = self._context.builddir
|
706 |
+ os.makedirs(builddir, exist_ok=True)
|
|
706 | 707 |
prefix = "{}-".format(target.normal_name)
|
707 | 708 |
|
708 | 709 |
with TemporaryDirectory(prefix=prefix, dir=builddir) as tempdir:
|
... | ... | @@ -1085,6 +1086,7 @@ class Stream(): |
1085 | 1086 |
for element in elements:
|
1086 | 1087 |
source_dir = os.path.join(directory, "source")
|
1087 | 1088 |
element_source_dir = os.path.join(source_dir, element.normal_name)
|
1089 |
+ os.makedirs(element_source_dir)
|
|
1088 | 1090 |
|
1089 | 1091 |
element._stage_sources_at(element_source_dir)
|
1090 | 1092 |
|
... | ... | @@ -2137,14 +2137,11 @@ class Element(Plugin): |
2137 | 2137 |
project = self._get_project()
|
2138 | 2138 |
platform = Platform.get_platform()
|
2139 | 2139 |
|
2140 |
- if self.__remote_execution_url and self.BST_VIRTUAL_DIRECTORY:
|
|
2141 |
- if not self.__artifacts.has_push_remotes(element=self):
|
|
2142 |
- # Give an early warning if remote execution will not work
|
|
2143 |
- raise ElementError("Artifact {} is configured to use remote execution but has no push remotes. "
|
|
2144 |
- .format(self.name) +
|
|
2145 |
- "The remote artifact server(s) may not be correctly configured or contactable.")
|
|
2146 |
- |
|
2147 |
- self.info("Using a remote sandbox for artifact {}".format(self.name))
|
|
2140 |
+ if (directory is not None and
|
|
2141 |
+ self.__remote_execution_url and
|
|
2142 |
+ self.BST_VIRTUAL_DIRECTORY):
|
|
2143 |
+ |
|
2144 |
+ self.info("Using a remote sandbox for artifact {} with directory '{}'".format(self.name, directory))
|
|
2148 | 2145 |
|
2149 | 2146 |
sandbox = SandboxRemote(context, project,
|
2150 | 2147 |
directory,
|
... | ... | @@ -173,8 +173,8 @@ class SandboxRemote(Sandbox): |
173 | 173 |
platform = Platform.get_platform()
|
174 | 174 |
cascache = platform.artifactcache
|
175 | 175 |
# Now, push that key (without necessarily needing a ref) to the remote.
|
176 |
- vdir_digest = cascache.push_directory(self._get_project(), upload_vdir)
|
|
177 |
- if not vdir_digest or not cascache.verify_digest_pushed(self._get_project(), vdir_digest):
|
|
176 |
+ cascache.push_directory(self._get_project(), upload_vdir)
|
|
177 |
+ if not cascache.verify_digest_pushed(self._get_project(), upload_vdir.ref):
|
|
178 | 178 |
raise SandboxError("Failed to verify that source has been pushed to the remote artifact cache.")
|
179 | 179 |
|
180 | 180 |
# Set up environment and working directory
|
... | ... | @@ -930,6 +930,38 @@ class Source(Plugin): |
930 | 930 |
# Local Private Methods #
|
931 | 931 |
#############################################################
|
932 | 932 |
|
933 |
+ # __clone_for_uri()
|
|
934 |
+ #
|
|
935 |
+ # Clone the source with an alternative URI setup for the alias
|
|
936 |
+ # which this source uses.
|
|
937 |
+ #
|
|
938 |
+ # This is used for iteration over source mirrors.
|
|
939 |
+ #
|
|
940 |
+ # Args:
|
|
941 |
+ # uri (str): The alternative URI for this source's alias
|
|
942 |
+ #
|
|
943 |
+ # Returns:
|
|
944 |
+ # (Source): A new clone of this Source, with the specified URI
|
|
945 |
+ # as the value of the alias this Source has marked as
|
|
946 |
+ # primary with either mark_download_url() or
|
|
947 |
+ # translate_url().
|
|
948 |
+ #
|
|
949 |
+ def __clone_for_uri(self, uri):
|
|
950 |
+ project = self._get_project()
|
|
951 |
+ context = self._get_context()
|
|
952 |
+ alias = self._get_alias()
|
|
953 |
+ source_kind = type(self)
|
|
954 |
+ |
|
955 |
+ clone = source_kind(context, project, self.__meta, alias_override=(alias, uri))
|
|
956 |
+ |
|
957 |
+ # Do the necessary post instantiation routines here
|
|
958 |
+ #
|
|
959 |
+ clone._preflight()
|
|
960 |
+ clone._load_ref()
|
|
961 |
+ clone._update_state()
|
|
962 |
+ |
|
963 |
+ return clone
|
|
964 |
+ |
|
933 | 965 |
# Tries to call fetch for every mirror, stopping once it succeeds
|
934 | 966 |
def __do_fetch(self, **kwargs):
|
935 | 967 |
project = self._get_project()
|
... | ... | @@ -968,12 +1000,8 @@ class Source(Plugin): |
968 | 1000 |
self.fetch(**kwargs)
|
969 | 1001 |
return
|
970 | 1002 |
|
971 |
- context = self._get_context()
|
|
972 |
- source_kind = type(self)
|
|
973 | 1003 |
for uri in project.get_alias_uris(alias, first_pass=self.__first_pass):
|
974 |
- new_source = source_kind(context, project, self.__meta,
|
|
975 |
- alias_override=(alias, uri))
|
|
976 |
- new_source._preflight()
|
|
1004 |
+ new_source = self.__clone_for_uri(uri)
|
|
977 | 1005 |
try:
|
978 | 1006 |
new_source.fetch(**kwargs)
|
979 | 1007 |
# FIXME: Need to consider temporary vs. permanent failures,
|
... | ... | @@ -1006,9 +1034,7 @@ class Source(Plugin): |
1006 | 1034 |
# NOTE: We are assuming here that tracking only requires substituting the
|
1007 | 1035 |
# first alias used
|
1008 | 1036 |
for uri in reversed(project.get_alias_uris(alias, first_pass=self.__first_pass)):
|
1009 |
- new_source = source_kind(context, project, self.__meta,
|
|
1010 |
- alias_override=(alias, uri))
|
|
1011 |
- new_source._preflight()
|
|
1037 |
+ new_source = self.__clone_for_uri(uri)
|
|
1012 | 1038 |
try:
|
1013 | 1039 |
ref = new_source.track(**kwargs)
|
1014 | 1040 |
# FIXME: Need to consider temporary vs. permanent failures,
|
... | ... | @@ -264,8 +264,9 @@ setup(name='BuildStream', |
264 | 264 |
license='LGPL',
|
265 | 265 |
long_description=long_description,
|
266 | 266 |
long_description_content_type='text/x-rst; charset=UTF-8',
|
267 |
- url='https://gitlab.com/BuildStream/buildstream',
|
|
267 |
+ url='https://buildstream.build',
|
|
268 | 268 |
project_urls={
|
269 |
+ 'Source': 'https://gitlab.com/BuildStream/buildstream',
|
|
269 | 270 |
'Documentation': 'https://buildstream.gitlab.io/buildstream/',
|
270 | 271 |
'Tracker': 'https://gitlab.com/BuildStream/buildstream/issues',
|
271 | 272 |
'Mailing List': 'https://mail.gnome.org/mailman/listinfo/buildstream-list'
|
... | ... | @@ -228,9 +228,9 @@ def _test_push_directory(user_config_file, project_dir, artifact_dir, artifact_d |
228 | 228 |
directory = CasBasedDirectory(context, ref=artifact_digest)
|
229 | 229 |
|
230 | 230 |
# Push the CasBasedDirectory object
|
231 |
- directory_digest = cas.push_directory(project, directory)
|
|
231 |
+ cas.push_directory(project, directory)
|
|
232 | 232 |
|
233 |
- queue.put(directory_digest.hash)
|
|
233 |
+ queue.put(directory.ref.hash)
|
|
234 | 234 |
else:
|
235 | 235 |
queue.put("No remote configured")
|
236 | 236 |
|
1 |
+kind: import
|
|
2 |
+description: the kind of this element must implement generate_script() method
|
|
3 |
+ |
|
4 |
+sources:
|
|
5 |
+- kind: local
|
|
6 |
+ path: files/source-bundle
|
1 |
+llamas
|
1 |
+#
|
|
2 |
+# Copyright (C) 2018 Bloomberg Finance LP
|
|
3 |
+#
|
|
4 |
+# This program is free software; you can redistribute it and/or
|
|
5 |
+# modify it under the terms of the GNU Lesser General Public
|
|
6 |
+# License as published by the Free Software Foundation; either
|
|
7 |
+# version 2 of the License, or (at your option) any later version.
|
|
8 |
+#
|
|
9 |
+# This library is distributed in the hope that it will be useful,
|
|
10 |
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
11 |
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
12 |
+# Lesser General Public License for more details.
|
|
13 |
+#
|
|
14 |
+# You should have received a copy of the GNU Lesser General Public
|
|
15 |
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
|
|
16 |
+#
|
|
17 |
+# Authors: Chandan Singh <csingh43 bloomberg net>
|
|
18 |
+#
|
|
19 |
+ |
|
20 |
+import os
|
|
21 |
+import tarfile
|
|
22 |
+ |
|
23 |
+import pytest
|
|
24 |
+ |
|
25 |
+from tests.testutils import cli
|
|
26 |
+ |
|
27 |
+# Project directory
|
|
28 |
+DATA_DIR = os.path.join(
|
|
29 |
+ os.path.dirname(os.path.realpath(__file__)),
|
|
30 |
+ "project",
|
|
31 |
+)
|
|
32 |
+ |
|
33 |
+ |
|
34 |
+@pytest.mark.datafiles(DATA_DIR)
|
|
35 |
+def test_source_bundle(cli, tmpdir, datafiles):
|
|
36 |
+ project_path = os.path.join(datafiles.dirname, datafiles.basename)
|
|
37 |
+ element_name = 'source-bundle/source-bundle-hello.bst'
|
|
38 |
+ normal_name = 'source-bundle-source-bundle-hello'
|
|
39 |
+ |
|
40 |
+ # Verify that we can correctly produce a source-bundle
|
|
41 |
+ args = ['source-bundle', element_name, '--directory', str(tmpdir)]
|
|
42 |
+ result = cli.run(project=project_path, args=args)
|
|
43 |
+ result.assert_success()
|
|
44 |
+ |
|
45 |
+ # Verify that the source-bundle contains our sources and a build script
|
|
46 |
+ with tarfile.open(os.path.join(str(tmpdir), '{}.tar.gz'.format(normal_name))) as bundle:
|
|
47 |
+ assert os.path.join(normal_name, 'source', normal_name, 'llamas.txt') in bundle.getnames()
|
|
48 |
+ assert os.path.join(normal_name, 'build.sh') in bundle.getnames()
|