Raoul Hidalgo Charman pushed to branch raoul/802-refactor-artifactcache at BuildStream / buildstream
Commits:
-
9182b8f4
by Raoul Hidalgo Charman at 2019-01-02T11:34:46Z
3 changed files:
Changes:
... | ... | @@ -694,15 +694,26 @@ class ArtifactCache(): |
694 | 694 |
#
# Args:
#     project (Project): The current project
#     tree_digest (Digest): The digest of the Tree message to pull
#
# Returns:
#     (Digest): The digest of the root Directory of the pulled tree, or
#               None if none of the project's remotes could provide it.
#
def pull_tree(self, project, tree_digest):
    for remote in self._remotes[project]:
        try:
            tree = remote.get_tree_blob(tree_digest)

            # The root is not part of tree.children; visit it last so that
            # its digest is the one returned below.
            directories = list(tree.children)
            directories.append(tree.root)

            # Blobs already queued in the current batch are not yet in the
            # local CAS, so track them to avoid requesting them twice.
            requested = set()
            for directory in directories:
                for filenode in directory.files:
                    blob_digest = filenode.digest
                    key = (blob_digest.hash, blob_digest.size_bytes)
                    if key in requested or self.cas.check_blob(blob_digest):
                        continue
                    requested.add(key)
                    remote.request_blob(blob_digest)
                    for blob_file in remote.get_blobs():
                        self.cas.add_object(path=blob_file.name, link_directly=True)

            # Get the last batch
            for blob_file in remote.get_blobs(request_batch=True):
                self.cas.add_object(path=blob_file.name, link_directly=True)

        except BlobNotFound:
            # This remote is missing part of the tree; try the next one.
            continue

        # Store Directory objects only once every file blob they reference
        # is in the local CAS, so the repository never holds dangling
        # references. The last one stored is the root.
        root_digest = None
        for directory in directories:
            root_digest = self.cas.add_object(buffer=directory.SerializeToString())
        return root_digest

    return None
|
708 | 719 |
|
... | ... | @@ -183,29 +183,6 @@ class CASCache(): |
183 | 183 |
|
184 | 184 |
return modified, removed, added
|
185 | 185 |
|
186 |
- # pull_tree():
|
|
187 |
- #
|
|
188 |
- # Pull a single Tree rather than a ref.
|
|
189 |
- # Does not update local refs.
|
|
190 |
- #
|
|
191 |
- # Args:
|
|
192 |
- # remote (CASRemote): The remote to pull from
|
|
193 |
- # digest (Digest): The digest of the tree
|
|
194 |
- #
|
|
195 |
- def pull_tree(self, remote, digest):
|
|
196 |
- try:
|
|
197 |
- remote.init()
|
|
198 |
- |
|
199 |
- digest = self._fetch_tree(remote, digest)
|
|
200 |
- |
|
201 |
- return digest
|
|
202 |
- |
|
203 |
- except grpc.RpcError as e:
|
|
204 |
- if e.code() != grpc.StatusCode.NOT_FOUND:
|
|
205 |
- raise
|
|
206 |
- |
|
207 |
- return None
|
|
208 |
- |
|
209 | 186 |
# link_ref():
|
210 | 187 |
#
|
211 | 188 |
# Add an alias for an existing ref.
|
... | ... | @@ -771,29 +748,6 @@ class CASCache(): |
771 | 748 |
|
772 | 749 |
return objpath
|
773 | 750 |
|
774 |
- def _fetch_tree(self, remote, digest):
|
|
775 |
- # download but do not store the Tree object
|
|
776 |
- with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
|
|
777 |
- remote._fetch_blob(digest, out)
|
|
778 |
- |
|
779 |
- tree = remote_execution_pb2.Tree()
|
|
780 |
- |
|
781 |
- with open(out.name, 'rb') as f:
|
|
782 |
- tree.ParseFromString(f.read())
|
|
783 |
- |
|
784 |
- tree.children.extend([tree.root])
|
|
785 |
- for directory in tree.children:
|
|
786 |
- for filenode in directory.files:
|
|
787 |
- self._ensure_blob(remote, filenode.digest)
|
|
788 |
- |
|
789 |
- # place directory blob only in final location when we've downloaded
|
|
790 |
- # all referenced blobs to avoid dangling references in the repository
|
|
791 |
- dirbuffer = directory.SerializeToString()
|
|
792 |
- dirdigest = self.add_object(buffer=dirbuffer)
|
|
793 |
- assert dirdigest.size_bytes == len(dirbuffer)
|
|
794 |
- |
|
795 |
- return dirdigest
|
|
796 |
- |
|
797 | 751 |
def _send_directory(self, remote, digest, u_uid=uuid.uuid4()):
|
798 | 752 |
required_blobs = self._required_blobs(digest)
|
799 | 753 |
|
... | ... | @@ -281,19 +281,29 @@ class CASRemote(): |
281 | 281 |
else:
|
282 | 282 |
return None
|
283 | 283 |
|
284 |
# get_tree_blob():
#
# Download and parse a Tree message from the remote.
#
# Args:
#     tree_digest (Digest): The digest of the Tree message
#
# Returns:
#     (Tree): The parsed remote_execution_pb2.Tree message
#
def get_tree_blob(self, tree_digest):
    self.init()
    tree = remote_execution_pb2.Tree()

    # Use the temporary file as a context manager so it is closed (and
    # therefore deleted) as soon as parsing is done, rather than whenever
    # the object happens to be garbage collected.
    with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
        self._fetch_blob(tree_digest, f)

        # Re-open by name so the read starts at the beginning of the file,
        # independent of where _fetch_blob left f's position.
        with open(f.name, 'rb') as tmp:
            tree.ParseFromString(tmp.read())

    return tree
|
|
294 |
+ |
|
284 | 295 |
# yield_directory_digests():
|
285 | 296 |
#
|
286 | 297 |
# Iterate over blobs digests starting from a root digest
|
287 | 298 |
#
|
288 | 299 |
# Args:
|
289 |
- # root_digest (str): The root_digest to get a tree of
|
|
300 |
+ # root_digest (digest): The root_digest to get a tree of
|
|
290 | 301 |
# progress (callable): The progress callback, if any
|
291 | 302 |
# subdir (str): The optional specific subdir to pull
|
292 | 303 |
# excluded_subdirs (list): The optional list of subdirs to not pull
|
293 | 304 |
#
|
294 | 305 |
# Returns:
|
295 |
- # (iter): True if pull was successful, False if ref was not available
|
|
296 |
- #
|
|
306 |
+ # (iter digests): recursively iterates over digests contained in root directory
|
|
297 | 307 |
def yield_directory_digests(self, root_digest, *, progress=None, subdir=None, excluded_subdirs=[]):
|
298 | 308 |
self.init()
|
299 | 309 |
|
... | ... | @@ -301,6 +311,37 @@ class CASRemote(): |
301 | 311 |
# Fetch artifact, excluded_subdirs determined in pullqueue
|
302 | 312 |
yield from self._yield_directory_digests(root_digest, excluded_subdirs=excluded_subdirs)
|
303 | 313 |
|
314 |
# yield_tree_digests():
#
# Fetches the Tree message for tree_digest and iterates over the digests of
# the files it references. After the files of each Directory (root last)
# have been yielded, that Directory is written to a temporary file and
# queued in the pending downloads; the Tree message file is queued last.
#
# Args:
#     tree_digest (Digest): The digest of the Tree message
#
# Returns:
#     (iter digests): iterates over the file digests in the Tree message
#
def yield_tree_digests(self, tree_digest):
    self.init()

    # get tree file
    f = tempfile.NamedTemporaryFile(dir=self.tmpdir)
    self._fetch_blob(tree_digest, f)

    # Re-open by name so parsing starts at byte 0; reading through f itself
    # would start at its current position, which is presumably after the
    # bytes _fetch_blob just wrote, yielding an empty Tree.
    tree = remote_execution_pb2.Tree()
    with open(f.name, 'rb') as tmp:
        tree.ParseFromString(tmp.read())

    tree.children.extend([tree.root])
    for directory in tree.children:
        for filenode in directory.files:
            yield filenode.digest

        # add the directory to downloaded tmp files to be added
        f2 = tempfile.NamedTemporaryFile(dir=self.tmpdir)
        f2.write(directory.SerializeToString())
        # Queued files are later consumed by name, so the buffered bytes
        # must reach the file before that happens.
        f2.flush()
        self.__tmp_downloads.append(f2)

    # Add the tree directory to downloads right at the end
    self.__tmp_downloads.append(f)
|
|
344 |
+ |
|
304 | 345 |
# request_blob():
|
305 | 346 |
#
|
306 | 347 |
# Request blob, triggering download depending via bytestream or cas
|