Raoul Hidalgo Charman pushed to branch raoul/802-refactor-artifactcache at BuildStream / buildstream

Commits:
- 02206638 by Raoul Hidalgo Charman at 2018-12-14T10:21:49Z

3 changed files:
- buildstream/_artifactcache.py
- buildstream/_cas/cascache.py
- buildstream/_cas/casremote.py

Changes:
buildstream/_artifactcache.py:

@@ -28,7 +28,7 @@ from ._message import Message, MessageType
 from . import utils
 from . import _yaml

-from ._cas.casremote import CASRemote, CASRemoteSpec
+from ._cas.casremote import BlobNotFound, CASRemote, CASRemoteSpec


 CACHE_SIZE_FILE = "cache_size"
@@ -634,7 +634,7 @@ class ArtifactCache():
     # Returns:
     #   (bool): True if pull was successful, False if artifact was not available
     #
-    def pull(self, element, key, *, progress=None, subdir=None, excluded_subdirs=None):
+    def pull(self, element, key, *, progress=None, subdir=None, excluded_subdirs=[]):
         ref = self.get_artifact_fullname(element, key)

         project = element._get_project()
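Reviewer note on the signature change above: excluded_subdirs=[] is a mutable default argument. The code in this commit only reads the list, so nothing breaks today, but if a later change ever mutates the default it will be shared across all calls. A minimal illustration of the pitfall and of the usual None idiom (function names here are illustrative only):

    def pull_bad(ref, *, excluded_subdirs=[]):
        excluded_subdirs.append("buildtree")   # mutates the one shared default list
        return excluded_subdirs

    pull_bad("a")   # ['buildtree']
    pull_bad("b")   # ['buildtree', 'buildtree']  <- leaked from the first call

    def pull_good(ref, *, excluded_subdirs=None):
        if excluded_subdirs is None:
            excluded_subdirs = []              # fresh list on every call
        ...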
@@ -644,13 +644,31 @@ class ArtifactCache():
                 display_key = element._get_brief_display_key()
                 element.status("Pulling artifact {} <- {}".format(display_key, remote.spec.url))

-                if self.cas.pull(ref, remote, progress=progress, subdir=subdir, excluded_subdirs=excluded_subdirs):
-                    element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url))
+                root_digest = remote.get_reference(ref)
+
+                if root_digest:
+                    try:
+                        for blob_digest in remote.yield_blob_digests(
+                                root_digest, progress=progress, subdir=subdir,
+                                excluded_subdirs=excluded_subdirs):
+                            if self.cas.check_blob(blob_digest):
+                                continue
+                            remote.request_blob(blob_digest)
+                            for blob_file in remote.get_blobs():
+                                self.cas.add_object(path=blob_file.name, link_directly=True)
+                        self.cas.set_ref(ref, root_digest)
+                    except BlobNotFound:
+                        element.info("Remote ({}) is missing blobs for {}".format(
+                            remote.spec.url, element._get_brief_display_key()))
+                        continue
+
                     if subdir:
                         # Attempt to extract subdir into artifact extract dir if it already exists
                         # without containing the subdir. If the respective artifact extract dir does not
                         # exist, a complete extraction will be performed.
                         self.extract(element, key, subdir)
+
+                    element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url))
                     # no need to pull from additional remotes
                     return True
                 else:
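The hunk above is the heart of the refactor: ArtifactCache.pull() now drives the transfer itself instead of delegating everything to CASCache.pull(). Per remote, the new flow is roughly (a condensed restatement of the diff, not extra code):

    root_digest = remote.get_reference(ref)          # resolve ref -> root Digest, None if absent
    if root_digest:
        for blob_digest in remote.yield_blob_digests(root_digest, ...):
            if self.cas.check_blob(blob_digest):     # blob already in local CAS
                continue
            remote.request_blob(blob_digest)         # download into the remote's tmpdir
            for blob_file in remote.get_blobs():     # drain downloads into local CAS
                self.cas.add_object(path=blob_file.name, link_directly=True)
        self.cas.set_ref(ref, root_digest)           # ref only lands once all blobs have

A BlobNotFound from the remote aborts the attempt and falls through (via continue) to the next configured remote.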
buildstream/_cas/cascache.py:

@@ -33,7 +33,7 @@ from .._protos.buildstream.v2 import buildstream_pb2
 from .. import utils
 from .._exceptions import CASError

-from .casremote import BlobNotFound, _CASBatchRead, _CASBatchUpdate
+from .casremote import _CASBatchUpdate


 # A CASCache manages a CAS repository as specified in the Remote Execution API.
@@ -183,50 +183,6 @@ class CASCache():

         return modified, removed, added

-    # pull():
-    #
-    # Pull a ref from a remote repository.
-    #
-    # Args:
-    #     ref (str): The ref to pull
-    #     remote (CASRemote): The remote repository to pull from
-    #     progress (callable): The progress callback, if any
-    #     subdir (str): The optional specific subdir to pull
-    #     excluded_subdirs (list): The optional list of subdirs to not pull
-    #
-    # Returns:
-    #   (bool): True if pull was successful, False if ref was not available
-    #
-    def pull(self, ref, remote, *, progress=None, subdir=None, excluded_subdirs=None):
-        try:
-            remote.init()
-
-            request = buildstream_pb2.GetReferenceRequest()
-            request.key = ref
-            response = remote.ref_storage.GetReference(request)
-
-            tree = remote_execution_pb2.Digest()
-            tree.hash = response.digest.hash
-            tree.size_bytes = response.digest.size_bytes
-
-            # Check if the element artifact is present, if so just fetch the subdir.
-            if subdir and os.path.exists(self.objpath(tree)):
-                self._fetch_subdir(remote, tree, subdir)
-            else:
-                # Fetch artifact, excluded_subdirs determined in pullqueue
-                self._fetch_directory(remote, tree, excluded_subdirs=excluded_subdirs)
-
-            self.set_ref(ref, tree)
-
-            return True
-        except grpc.RpcError as e:
-            if e.code() != grpc.StatusCode.NOT_FOUND:
-                raise CASError("Failed to pull ref {}: {}".format(ref, e)) from e
-            else:
-                return False
-        except BlobNotFound as e:
-            return False
-
     # pull_tree():
     #
     # Pull a single Tree rather than a ref.
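Tracing the removal: nothing is dropped outright, the pieces of CASCache.pull() reappear elsewhere in this commit:

- the GetReferenceRequest / NOT_FOUND handling  -> CASRemote.get_reference()
- the _fetch_directory() / _fetch_subdir() walk -> CASRemote._yield_directory_digests()
- blob downloads                                -> CASRemote.request_blob() and get_blobs()
- set_ref() and the BlobNotFound -> False path  -> handled by the caller, ArtifactCache.pull()

The subdir fast path (fetching only the subdir when the root object already exists locally) has no direct replacement yet; see the "TODO add subdir stuff" in yield_blob_digests() below.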
@@ -591,6 +547,16 @@ class CASCache():
         reachable = set()
         self._reachable_refs_dir(reachable, tree, update_mtime=True)

+    # Check whether a blob is in the local CAS;
+    # returns its object path if so, None otherwise.
+    def check_blob(self, digest):
+        objpath = self.objpath(digest)
+        if os.path.exists(objpath):
+            # already in local repository
+            return objpath
+        else:
+            return None
+
     ################################################
     #             Local Private Methods            #
     ################################################
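Because a non-empty path is truthy, check_blob() serves both as a membership test and as a path lookup. A quick sketch of a caller (digest stands in for any remote_execution_pb2.Digest):

    objpath = cas.check_blob(digest)    # str path if cached locally, else None
    if objpath:
        with open(objpath, 'rb') as f:  # blob can be read straight from the CAS
            data = f.read()
    else:
        ...                             # caller must fetch it from a remote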
@@ -805,103 +771,6 @@ class CASCache():

         return objpath

-    def _batch_download_complete(self, batch):
-        for digest, data in batch.send():
-            with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
-                f.write(data)
-                f.flush()
-
-                added_digest = self.add_object(path=f.name, link_directly=True)
-                assert added_digest.hash == digest.hash
-
-    # Helper function for _fetch_directory().
-    def _fetch_directory_batch(self, remote, batch, fetch_queue, fetch_next_queue):
-        self._batch_download_complete(batch)
-
-        # All previously scheduled directories are now locally available,
-        # move them to the processing queue.
-        fetch_queue.extend(fetch_next_queue)
-        fetch_next_queue.clear()
-        return _CASBatchRead(remote)
-
-    # Helper function for _fetch_directory().
-    def _fetch_directory_node(self, remote, digest, batch, fetch_queue, fetch_next_queue, *, recursive=False):
-        in_local_cache = os.path.exists(self.objpath(digest))
-
-        if in_local_cache:
-            # Skip download, already in local cache.
-            pass
-        elif (digest.size_bytes >= remote.max_batch_total_size_bytes or
-              not remote.batch_read_supported):
-            # Too large for batch request, download in independent request.
-            self._ensure_blob(remote, digest)
-            in_local_cache = True
-        else:
-            if not batch.add(digest):
-                # Not enough space left in batch request.
-                # Complete pending batch first.
-                batch = self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
-                batch.add(digest)
-
-        if recursive:
-            if in_local_cache:
-                # Add directory to processing queue.
-                fetch_queue.append(digest)
-            else:
-                # Directory will be available after completing pending batch.
-                # Add directory to deferred processing queue.
-                fetch_next_queue.append(digest)
-
-        return batch
-
-    # _fetch_directory():
-    #
-    # Fetches remote directory and adds it to content addressable store.
-    #
-    # Fetches files, symbolic links and recursively other directories in
-    # the remote directory and adds them to the content addressable
-    # store.
-    #
-    # Args:
-    #     remote (Remote): The remote to use.
-    #     dir_digest (Digest): Digest object for the directory to fetch.
-    #     excluded_subdirs (list): The optional list of subdirs to not fetch
-    #
-    def _fetch_directory(self, remote, dir_digest, *, excluded_subdirs=None):
-        fetch_queue = [dir_digest]
-        fetch_next_queue = []
-        batch = _CASBatchRead(remote)
-        if not excluded_subdirs:
-            excluded_subdirs = []
-
-        while len(fetch_queue) + len(fetch_next_queue) > 0:
-            if not fetch_queue:
-                batch = self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
-
-            dir_digest = fetch_queue.pop(0)
-
-            objpath = self._ensure_blob(remote, dir_digest)
-
-            directory = remote_execution_pb2.Directory()
-            with open(objpath, 'rb') as f:
-                directory.ParseFromString(f.read())
-
-            for dirnode in directory.directories:
-                if dirnode.name not in excluded_subdirs:
-                    batch = self._fetch_directory_node(remote, dirnode.digest, batch,
-                                                       fetch_queue, fetch_next_queue, recursive=True)
-
-            for filenode in directory.files:
-                batch = self._fetch_directory_node(remote, filenode.digest, batch,
-                                                   fetch_queue, fetch_next_queue)
-
-        # Fetch final batch
-        self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
-
-    def _fetch_subdir(self, remote, tree, subdir):
-        subdirdigest = self._get_subdir(tree, subdir)
-        self._fetch_directory(remote, subdirdigest)
-
     def _fetch_tree(self, remote, digest):
         # download but do not store the Tree object
         with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
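One consequence of this removal worth flagging: the deleted helpers batched small blobs into BatchReadBlobs requests via _CASBatchRead and only fell back to one request per blob for oversized blobs or servers without batch support, whereas the new CASRemote.request_blob() (below) currently issues an individual read for every blob. The "TODO expand for adding to batches" in request_blob() presumably covers reintroducing this; a hypothetical sketch of how it could slot into the new API, reusing the remote's existing batch_read_supported and max_batch_total_size_bytes attributes (__batch and _flush_batch() are invented names, not part of this commit):

    def request_blob(self, digest):
        if (digest.size_bytes >= self.max_batch_total_size_bytes or
                not self.batch_read_supported):
            f = tempfile.NamedTemporaryFile(dir=self.tmpdir)
            self._fetch_blob(digest, f)       # oversized: single ByteStream read
            self.__tmp_downloads.append(f)
        elif not self.__batch.add(digest):    # batch full: flush, then retry
            self._flush_batch()               # send BatchReadBlobs, fill __tmp_downloads
            self.__batch.add(digest)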
buildstream/_cas/casremote.py:

@@ -3,6 +3,7 @@ import io
 import os
 import multiprocessing
 import signal
+import tempfile
 from urllib.parse import urlparse
 import uuid

@@ -89,6 +90,13 @@ class CASRemote():
         self.capabilities = None
         self.max_batch_total_size_bytes = None

+        # TODO tidy this
+        self.tmpdir = os.path.join(os.environ['XDG_CACHE_HOME'],
+                                   'buildstream/artifacts/tmp')
+        os.makedirs(self.tmpdir, exist_ok=True)
+
+        self.__tmp_downloads = []  # files in the tmpdir waiting to be added to local caches
+
     def init(self):
         if not self._initialized:
             url = urlparse(self.spec.url)
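On the "TODO tidy this" above: os.environ['XDG_CACHE_HOME'] raises KeyError whenever the variable is unset, which is the common case outside the test suite, and it hard-codes the artifact cache location instead of using the configured cache directory. A defensive version of just the environment lookup, following the XDG Base Directory fallback (wiring the real cache directory through from the context is presumably the intended tidy-up):

    cache_home = os.environ.get('XDG_CACHE_HOME',
                                os.path.join(os.path.expanduser('~'), '.cache'))
    self.tmpdir = os.path.join(cache_home, 'buildstream', 'artifacts', 'tmp')
    os.makedirs(self.tmpdir, exist_ok=True)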
@@ -252,6 +260,70 @@ class CASRemote():

         return message_digest

+    # get_reference():
+    #
+    # Args:
+    #    ref (str): The ref to request
+    #
+    # Returns:
+    #    (Digest): The digest the ref points to, or None if the remote
+    #    does not have the ref
+    #
+    def get_reference(self, ref):
+        try:
+            self.init()
+
+            request = buildstream_pb2.GetReferenceRequest()
+            request.key = ref
+            return self.ref_storage.GetReference(request).digest
+        except grpc.RpcError as e:
+            if e.code() != grpc.StatusCode.NOT_FOUND:
+                raise CASError("Failed to find ref {}: {}".format(ref, e)) from e
+            else:
+                return None
+
+    # yield_blob_digests():
+    #
+    # Iterate over the digests of all blobs reachable from a root digest
+    #
+    # Args:
+    #     root_digest (Digest): The root digest of the tree to walk
+    #     progress (callable): The progress callback, if any
+    #     subdir (str): The optional specific subdir to pull
+    #     excluded_subdirs (list): The optional list of subdirs to not pull
+    #
+    # Returns:
+    #   (iter): Iterator over the digests of the directories and files
+    #   that make up the tree
+    #
+    def yield_blob_digests(self, root_digest, *, progress=None, subdir=None, excluded_subdirs=[]):
+        self.init()
+
+        # TODO add subdir stuff
+        # Fetch artifact, excluded_subdirs determined in pullqueue
+        yield from self._yield_directory_digests(root_digest, excluded_subdirs=excluded_subdirs)
+
+    # request_blob():
+    #
+    # Request a blob and return the path to its location in the tmpdir
+    #
+    # Args:
+    #    digest (Digest): digest of the requested blob
+    #
+    # Returns:
+    #    (str): tmpdir location of the downloaded blob
+    #
+    def request_blob(self, digest):
+        # TODO expand for adding to batches some other logic
+        f = tempfile.NamedTemporaryFile(dir=self.tmpdir)
+        self._fetch_blob(digest, f)
+        self.__tmp_downloads.append(f)
+        return f.name
+
+    # get_blobs():
+    #
+    # Yield the downloaded blobs in their tmp file locations, causing the
+    # files to be deleted once they go out of scope.
+    #
+    # Returns:
+    #    iterator over NamedTemporaryFile
+    def get_blobs(self):
+        while self.__tmp_downloads:
+            yield self.__tmp_downloads.pop()
+
     ################################################
     #             Local Private Methods            #
     ################################################
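Taken together, these four methods are the new public fetch interface of CASRemote, designed so the caller owns the local-cache policy. A minimal end-to-end sketch of a consumer (local_cas stands in for a CASCache instance):

    remote = CASRemote(spec)
    root_digest = remote.get_reference('some/ref')     # None if the remote lacks the ref
    if root_digest:
        for digest in remote.yield_blob_digests(root_digest):
            if local_cas.check_blob(digest):
                continue
            remote.request_blob(digest)                # may raise BlobNotFound
            for tmpfile in remote.get_blobs():         # drain finished downloads
                local_cas.add_object(path=tmpfile.name, link_directly=True)
        local_cas.set_ref('some/ref', root_digest)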
@@ -266,6 +338,36 @@ class CASRemote():

         assert digest.size_bytes == os.fstat(stream.fileno()).st_size

+    # _yield_directory_digests():
+    #
+    # Walks a remote directory tree, yielding first the digest of the
+    # directory blob itself, then the digests of the files it contains,
+    # then recursively the digests from each subdirectory. Directory
+    # nodes are fetched from the remote as the walk proceeds.
+    #
+    # Args:
+    #     dir_digest (Digest): Digest object for the directory to fetch.
+    #     excluded_subdirs (list): The optional list of subdirs to not fetch
+    #
+    def _yield_directory_digests(self, dir_digest, *, excluded_subdirs=[]):
+
+        objpath = self.request_blob(dir_digest)
+
+        directory = remote_execution_pb2.Directory()
+
+        with open(objpath, 'rb') as f:
+            directory.ParseFromString(f.read())
+
+        yield dir_digest
+        for filenode in directory.files:
+            yield filenode.digest
+
+        for dirnode in directory.directories:
+            if dirnode.name not in excluded_subdirs:
+                yield dirnode.digest
+                yield from self._yield_directory_digests(dirnode.digest)
+
     def _send_blob(self, digest, stream, u_uid=uuid.uuid4()):
         resource_name = '/'.join(['uploads', str(u_uid), 'blobs',
                                   digest.hash, str(digest.size_bytes)])
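Two traversal details that follow from the code above: digests are yielded parent-first (the directory itself, then its files, then each subdirectory), and the recursive call does not forward excluded_subdirs, so exclusions only filter names at the level they are passed in at, which for the current callers means top-level subdirs such as a build tree. For a tree root/{file_a, sub/file_b}, for example:

    list(remote._yield_directory_digests(root_digest, excluded_subdirs=['sub']))
    # -> [root_digest, <file_a digest>]   # 'sub' and file_b are never requested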