Raoul Hidalgo Charman pushed to branch raoul/802-refactor-artifactcache at BuildStream / buildstream
Commits:
- 211ad14b
  by Raoul Hidalgo Charman at 2019-01-02T14:47:13Z
4 changed files:
- buildstream/_artifactcache.py
- buildstream/_cas/cascache.py
- buildstream/_cas/casremote.py
- tests/artifactcache/pull.py
Changes:
--- a/buildstream/_artifactcache.py
+++ b/buildstream/_artifactcache.py
@@ -694,15 +694,26 @@ class ArtifactCache():
     #
     # Args:
     #     project (Project): The current project
-    #     digest (Digest): The digest of the tree
+    #     tree_digest (Digest): The digest of the tree
     #
-    def pull_tree(self, project, digest):
+    def pull_tree(self, project, tree_digest):
         for remote in self._remotes[project]:
-            digest = self.cas.pull_tree(remote, digest)
+            try:
+                for blob_digest in remote.yield_tree_digests(tree_digest):
+                    if self.cas.check_blob(blob_digest):
+                        continue
+                    remote.request_blob(blob_digest)
+                    for blob_file in remote.get_blobs():
+                        self.cas.add_object(path=blob_file.name, link_directly=True)

-            if digest:
-                # no need to pull from additional remotes
-                return digest
+                # Get the last batch
+                for blob_file in remote.get_blobs(request_batch=True):
+                    self.cas.add_object(path=blob_file.name, link_directly=True)
+
+            except BlobNotFound:
+                continue
+            else:
+                return tree_digest

         return None

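For readers following the refactor: the new pull path above reduces to a check / request / store loop against the remote, with a final flush of any partially filled batch. The snippet below is a minimal, self-contained sketch of that control flow only; FakeRemote and FakeCAS are hypothetical stand-ins, not BuildStream's CASRemote or CASCache, and the digests are plain sha256 hex strings.

# Minimal, self-contained sketch of the pull loop that ArtifactCache.pull_tree()
# now uses: check locally, request missing blobs, store finished downloads, then
# flush the last (partial) batch.  FakeRemote/FakeCAS are hypothetical stand-ins,
# not BuildStream's real CASRemote/CASCache.
import hashlib
import tempfile


class BlobNotFound(Exception):
    pass


class FakeCAS:
    def __init__(self):
        self._objects = set()                       # digests we hold locally

    def check_blob(self, digest):
        return digest in self._objects

    def add_object(self, *, path, link_directly=True):
        with open(path, 'rb') as f:
            self._objects.add(hashlib.sha256(f.read()).hexdigest())


class FakeRemote:
    def __init__(self, blobs):
        self._blobs = blobs                         # digest -> bytes held remotely
        self._pending = []

    def yield_tree_digests(self, tree_digest):
        # Pretend every stored blob belongs to the requested tree.
        yield from self._blobs

    def request_blob(self, digest):
        if digest not in self._blobs:
            raise BlobNotFound(digest)
        self._pending.append(digest)

    def get_blobs(self, request_batch=False):
        # Yield completed downloads as temporary files.  In the real remote,
        # request_batch=True would also flush a partially filled batch request.
        while self._pending:
            f = tempfile.NamedTemporaryFile(delete=False)
            f.write(self._blobs[self._pending.pop(0)])
            f.flush()
            yield f


def pull_tree_sketch(remote, cas, tree_digest):
    try:
        for blob_digest in remote.yield_tree_digests(tree_digest):
            if cas.check_blob(blob_digest):
                continue
            remote.request_blob(blob_digest)
            for blob_file in remote.get_blobs():
                cas.add_object(path=blob_file.name, link_directly=True)
        # Get the last batch
        for blob_file in remote.get_blobs(request_batch=True):
            cas.add_object(path=blob_file.name, link_directly=True)
    except BlobNotFound:
        return None
    return tree_digest


if __name__ == '__main__':
    data = [b'hello', b'world']
    remote = FakeRemote({hashlib.sha256(d).hexdigest(): d for d in data})
    cas = FakeCAS()
    assert pull_tree_sketch(remote, cas, 'some-tree-digest') == 'some-tree-digest'

The apparent intent of the refactor is to keep this loop in ArtifactCache and expose only blob-level primitives on the remote, so the same remote code can serve both ref pulls and tree pulls.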
--- a/buildstream/_cas/cascache.py
+++ b/buildstream/_cas/cascache.py
@@ -183,29 +183,6 @@ class CASCache():

         return modified, removed, added

-    # pull_tree():
-    #
-    # Pull a single Tree rather than a ref.
-    # Does not update local refs.
-    #
-    # Args:
-    #     remote (CASRemote): The remote to pull from
-    #     digest (Digest): The digest of the tree
-    #
-    def pull_tree(self, remote, digest):
-        try:
-            remote.init()
-
-            digest = self._fetch_tree(remote, digest)
-
-            return digest
-
-        except grpc.RpcError as e:
-            if e.code() != grpc.StatusCode.NOT_FOUND:
-                raise
-
-            return None
-
     # link_ref():
     #
     # Add an alias for an existing ref.
@@ -771,29 +748,6 @@ class CASCache():

         return objpath

-    def _fetch_tree(self, remote, digest):
-        # download but do not store the Tree object
-        with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
-            remote._fetch_blob(digest, out)
-
-            tree = remote_execution_pb2.Tree()
-
-            with open(out.name, 'rb') as f:
-                tree.ParseFromString(f.read())
-
-            tree.children.extend([tree.root])
-            for directory in tree.children:
-                for filenode in directory.files:
-                    self._ensure_blob(remote, filenode.digest)
-
-                # place directory blob only in final location when we've downloaded
-                # all referenced blobs to avoid dangling references in the repository
-                dirbuffer = directory.SerializeToString()
-                dirdigest = self.add_object(buffer=dirbuffer)
-                assert dirdigest.size_bytes == len(dirbuffer)
-
-        return dirdigest
-
     def _send_directory(self, remote, digest, u_uid=uuid.uuid4()):
         required_blobs = self._required_blobs(digest)

--- a/buildstream/_cas/casremote.py
+++ b/buildstream/_cas/casremote.py
@@ -281,19 +281,29 @@ class CASRemote():
         else:
             return None

+    def get_tree_blob(self, tree_digest):
+        self.init()
+        f = tempfile.NamedTemporaryFile(dir=self.tmpdir)
+        self._fetch_blob(tree_digest, f)
+
+        tree = remote_execution_pb2.Tree()
+        with open(f.name, 'rb') as tmp:
+            tree.ParseFromString(tmp.read())
+
+        return tree
+
     # yield_directory_digests():
     #
     # Iterate over blobs digests starting from a root digest
     #
     # Args:
-    #     root_digest (str): The root_digest to get a tree of
+    #     root_digest (digest): The root_digest to get a tree of
     #     progress (callable): The progress callback, if any
     #     subdir (str): The optional specific subdir to pull
     #     excluded_subdirs (list): The optional list of subdirs to not pull
     #
     # Returns:
-    #     (iter): True if pull was successful, False if ref was not available
-    #
+    #     (iter digests): recursively iterates over digests contained in root directory
     def yield_directory_digests(self, root_digest, *, progress=None, subdir=None, excluded_subdirs=[]):
         self.init()

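The new get_tree_blob() above follows the same round trip the removed CASCache._fetch_tree() used: stream the Tree blob into a NamedTemporaryFile, then reopen the file by name and ParseFromString() its bytes (reopening reads from the start without having to rewind the write handle). Below is a hedged sketch of that round trip; the proto import path is assumed from BuildStream's vendored REAPI bindings, and a plain write() stands in for the gRPC download performed by _fetch_blob().

# Sketch of the fetch-into-tempfile / reparse round trip used by get_tree_blob().
# The import path is BuildStream's vendored copy of the REAPI protos; a plain
# write() stands in for CASRemote._fetch_blob(), which streams the blob over gRPC.
import tempfile

from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2


def fetch_tree_roundtrip(serialized_tree, tmpdir):
    f = tempfile.NamedTemporaryFile(dir=tmpdir)
    f.write(serialized_tree)       # _fetch_blob(tree_digest, f) would do this part
    f.flush()                      # make sure the bytes hit the file on disk

    tree = remote_execution_pb2.Tree()
    with open(f.name, 'rb') as tmp:
        # Reopen by name, as get_tree_blob() does, so we read from offset zero.
        tree.ParseFromString(tmp.read())

    return tree


if __name__ == '__main__':
    original = remote_execution_pb2.Tree()
    original.root.files.add(name='hello.txt',
                            digest=remote_execution_pb2.Digest(hash='abc', size_bytes=5))
    roundtripped = fetch_tree_roundtrip(original.SerializeToString(), tmpdir='.')
    assert roundtripped == original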
@@ -301,6 +311,37 @@ class CASRemote():
         # Fetch artifact, excluded_subdirs determined in pullqueue
         yield from self._yield_directory_digests(root_digest, excluded_subdirs=excluded_subdirs)

+    # yield_tree_digests():
+    #
+    # Fetches a tree file from digests and then iterates over child digests
+    #
+    # Args:
+    #     tree_digest (digest): tree digest
+    #
+    # Returns:
+    #     (iter digests): iterates over digests in tree message
+    def yield_tree_digests(self, tree_digest):
+        self.init()
+
+        # get tree file
+        f = tempfile.NamedTemporaryFile(dir=self.tmpdir)
+        self._fetch_blob(tree_digest, f)
+        tree = remote_execution_pb2.Tree()
+        tree.ParseFromString(f.read())
+
+        tree.children.extend([tree.root])
+        for directory in tree.children:
+            for filenode in directory.files:
+                yield filenode.digest
+
+            # add the directory to downloaded tmp files to be added
+            f2 = tempfile.NamedTemporaryFile(dir=self.tmpdir)
+            f2.write(directory.SerializeToString())
+            self.__tmp_downloads.append(f2)
+
+        # Add the tree directory to downloads right at the end
+        self.__tmp_downloads.append(f)
+
     # request_blob():
     #
     # Request blob, triggering download depending via bytestream or cas
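yield_tree_digests() walks the Tree message the same way the removed _fetch_tree() did: it folds tree.root into tree.children, yields the digest of every FileNode, and keeps each re-serialized directory aside so it is only committed to the local CAS after its blobs have arrived. A small sketch of just the walk, again assuming BuildStream's vendored REAPI protos:

# Sketch of the Tree walk inside yield_tree_digests(): the root directory is
# appended to children and walked like any other directory, and only FileNode
# digests are yielded (directories are kept aside as serialized blobs).
from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2


def iter_tree_file_digests(tree):
    tree.children.extend([tree.root])
    for directory in tree.children:
        for filenode in directory.files:
            yield filenode.digest


if __name__ == '__main__':
    tree = remote_execution_pb2.Tree()
    tree.root.files.add(name='a.txt', digest=remote_execution_pb2.Digest(hash='aa', size_bytes=1))
    child = tree.children.add()
    child.files.add(name='b.txt', digest=remote_execution_pb2.Digest(hash='bb', size_bytes=1))
    assert sorted(d.hash for d in iter_tree_file_digests(tree)) == ['aa', 'bb']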
--- a/tests/artifactcache/pull.py
+++ b/tests/artifactcache/pull.py
@@ -110,7 +110,7 @@ def test_pull(cli, tmpdir, datafiles):
     # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
     process = multiprocessing.Process(target=_queue_wrapper,
                                       args=(_test_pull, queue, user_config_file, project_dir,
-                                            artifact_dir, 'target.bst', element_key))
+                                            artifact_dir, tmpdir, 'target.bst', element_key))

     try:
         # Keep SIGINT blocked in the child process
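As the gRPC fork-support link above explains, these tests run all gRPC-using code in a separate multiprocessing.Process and report results back over a Queue via _queue_wrapper. A reduced sketch of that harness pattern follows; _fake_worker is a stand-in for _test_pull(), the wrapper signature is inferred from how it is invoked in the diff (the queue becomes the target's last argument), and the real tests additionally keep SIGINT blocked while the child starts.

# Reduced sketch of the test harness pattern used in tests/artifactcache/pull.py:
# run gRPC-using code in a child process and pass results back over a Queue.
# _fake_worker stands in for _test_pull(); the wrapper signature is inferred
# from the call sites shown in the diff.
import multiprocessing


def _queue_wrapper(target, queue, *args):
    # The target receives the queue as its last argument, mirroring _test_pull().
    target(*args, queue)


def _fake_worker(element_name, element_key, queue):
    queue.put((element_name, element_key))


if __name__ == '__main__':
    queue = multiprocessing.Queue()
    process = multiprocessing.Process(target=_queue_wrapper,
                                      args=(_fake_worker, queue, 'target.bst', 'cafecafe'))
    process.start()
    result = queue.get()
    process.join()
    assert result == ('target.bst', 'cafecafe')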
@@ -127,13 +127,14 @@ def test_pull(cli, tmpdir, datafiles):
     assert cas.contains(element, element_key)


-def _test_pull(user_config_file, project_dir, artifact_dir,
+def _test_pull(user_config_file, project_dir, artifact_dir, tmpdir,
                element_name, element_key, queue):
     # Fake minimal context
     context = Context()
     context.load(config=user_config_file)
     context.artifactdir = artifact_dir
     context.set_message_handler(message_handler)
+    context.tmpdir = tmpdir

     # Load the project manually
     project = Project(project_dir, context)
@@ -218,7 +219,7 @@ def test_pull_tree(cli, tmpdir, datafiles):
     # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
     process = multiprocessing.Process(target=_queue_wrapper,
                                       args=(_test_push_tree, queue, user_config_file, project_dir,
-                                            artifact_dir, artifact_digest))
+                                            artifact_dir, tmpdir, artifact_digest))

     try:
         # Keep SIGINT blocked in the child process
@@ -246,7 +247,7 @@ def test_pull_tree(cli, tmpdir, datafiles):
     # Use subprocess to avoid creation of gRPC threads in main BuildStream process
     process = multiprocessing.Process(target=_queue_wrapper,
                                       args=(_test_pull_tree, queue, user_config_file, project_dir,
-                                            artifact_dir, tree_digest))
+                                            artifact_dir, tmpdir, tree_digest))

     try:
         # Keep SIGINT blocked in the child process
@@ -265,15 +266,18 @@ def test_pull_tree(cli, tmpdir, datafiles):
                                                    size_bytes=directory_size)

     # Ensure the entire Tree structure has been pulled
+    print(cas.objpath(directory_digest))
     assert os.path.exists(cas.objpath(directory_digest))


-def _test_push_tree(user_config_file, project_dir, artifact_dir, artifact_digest, queue):
+def _test_push_tree(user_config_file, project_dir, artifact_dir, tmpdir,
+                    artifact_digest, queue):
     # Fake minimal context
     context = Context()
     context.load(config=user_config_file)
     context.artifactdir = artifact_dir
     context.set_message_handler(message_handler)
+    context.tmpdir = tmpdir

     # Load the project manually
     project = Project(project_dir, context)
@@ -304,12 +308,14 @@ def _test_push_tree(user_config_file, project_dir, artifact_dir, artifact_digest
         queue.put("No remote configured")


-def _test_pull_tree(user_config_file, project_dir, artifact_dir, artifact_digest, queue):
+def _test_pull_tree(user_config_file, project_dir, artifact_dir, tmpdir,
+                    artifact_digest, queue):
     # Fake minimal context
     context = Context()
     context.load(config=user_config_file)
     context.artifactdir = artifact_dir
     context.set_message_handler(message_handler)
+    context.tmpdir = tmpdir

     # Load the project manually
     project = Project(project_dir, context)
