[Notes] [Git][BuildStream/buildstream][tiagogomes/issue-573] 17 commits: source/install_source.rst: pip plugin depends on host pip




Tiago Gomes pushed to branch tiagogomes/issue-573 at BuildStream / buildstream

Commits:

14 changed files:

Changes:

  • README.rst
    @@ -13,6 +13,9 @@ About
     .. image:: https://gitlab.com/BuildStream/buildstream/badges/master/coverage.svg?job=coverage
        :target: https://gitlab.com/BuildStream/buildstream/commits/master
     
    +.. image:: https://img.shields.io/pypi/v/BuildStream.svg
    +   :target: https://pypi.org/project/BuildStream
    +
     
     What is BuildStream?
     ====================

  • buildstream/_artifactcache/artifactcache.py
    @@ -1,5 +1,5 @@
     #
    -#  Copyright (C) 2017-2018 Codethink Limited
    +#  Copyright (C) 2018 Codethink Limited
     #
     #  This program is free software; you can redistribute it and/or
     #  modify it under the terms of the GNU Lesser General Public
    @@ -16,6 +16,7 @@
     #
     #  Authors:
     #        Tristan Maat <tristan maat codethink co uk>
    +#        Tiago Gomes <tiago gomes codethink co uk>
     
     import os
     import string
    @@ -88,7 +89,7 @@ class ArtifactCache():
             self.project_remote_specs = {}
     
             self._required_artifacts = set()      # The artifacts required for this session
    -        self._cache_size = None               # The current cache size, sometimes it's an estimate
    +        self._cache_size = None               # The current cache size
             self._cache_quota = None              # The cache quota
             self._cache_lower_threshold = None    # The target cache size for a cleanup
     
    @@ -226,13 +227,13 @@ class ArtifactCache():
         # Clean the artifact cache as much as possible.
         #
         # Returns:
    -    #    (int): The size of the cache after having cleaned up
    +    #     (int): Amount of bytes cleaned from the cache.
         #
         def clean(self):
             artifacts = self.list_artifacts()
     
             # Do a real computation of the cache size once, just in case
    -        self.compute_cache_size()
    +        old_cache_size = self._cache_size = self.calculate_cache_size()
     
             while self.get_cache_size() >= self._cache_lower_threshold:
                 try:
    @@ -248,7 +249,7 @@ class ArtifactCache():
                               "Please increase the cache-quota in {}."
                               .format(self.context.config_origin or default_conf))
     
    -                if self.get_quota_exceeded():
    +                if self.has_quota_exceeded():
                         raise ArtifactError("Cache too full. Aborting.",
                                             detail=detail,
                                             reason="cache-too-full")
    @@ -260,89 +261,65 @@ class ArtifactCache():
     
                     # Remove the actual artifact, if it's not required.
                     size = self.remove(to_remove)
    +                self._cache_size -= size
    +                self._message(MessageType.DEBUG,
    +                              "Removed artifact {} ({})".format(
    +                                  to_remove[:-(len(key) - self.context.log_key_length)],
    +                                  utils._pretty_size(size)))
     
    -                # Remove the size from the removed size
    -                self.set_cache_size(self._cache_size - size)
    +        self._message(MessageType.INFO,
    +                      "New artifact cache size: {}".format(
    +                          utils._pretty_size(self._cache_size)))
     
    -        # This should be O(1) if implemented correctly
    -        return self.get_cache_size()
    +        return old_cache_size - self._cache_size
     
    -    # compute_cache_size()
    +    # add_artifact_size()
         #
    -    # Computes the real artifact cache size by calling
    -    # the abstract calculate_cache_size() method.
    +    # Adds given artifact size to the cache size
         #
    -    # Returns:
    -    #    (int): The size of the artifact cache.
    +    # Args:
    +    #     artifact_size (int): The artifact size to add.
         #
    -    def compute_cache_size(self):
    -        self._cache_size = self.calculate_cache_size()
    +    def add_artifact_size(self, artifact_size):
    +        assert utils._is_main_process()
     
    -        return self._cache_size
    +        self._cache_size = self.get_cache_size() + artifact_size
    +        self._write_cache_size(self._cache_size)
     
    -    # add_artifact_size()
    +    # subtract_artifact_size()
         #
    -    # Adds the reported size of a newly cached artifact to the
    -    # overall estimated size.
    +    # Subtracts given artifact size from the cache size
         #
         # Args:
    -    #     artifact_size (int): The size to add.
    +    #     artifact_size (int): The artifact size to subtract.
         #
    -    def add_artifact_size(self, artifact_size):
    -        cache_size = self.get_cache_size()
    -        cache_size += artifact_size
    -
    -        self.set_cache_size(cache_size)
    +    def subtract_artifact_size(self, artifact_size):
    +        self.add_artifact_size(artifact_size * -1)
     
         # get_cache_size()
         #
    -    # Fetches the cached size of the cache, this is sometimes
    -    # an estimate and periodically adjusted to the real size
    -    # when a cache size calculation job runs.
    -    #
    -    # When it is an estimate, the value is either correct, or
    -    # it is greater than the actual cache size.
    +    # Returns the size of the artifact cache.
         #
         # Returns:
    -    #     (int) An approximation of the artifact cache size.
    +    #     (int): The size of the artifact cache.
         #
         def get_cache_size(self):
    +        if self._cache_size is None:
    +            self._cache_size = self._read_cache_size()
     
    -        # If we don't currently have an estimate, figure out the real cache size.
             if self._cache_size is None:
    -            stored_size = self._read_cache_size()
    -            if stored_size is not None:
    -                self._cache_size = stored_size
    -            else:
    -                self.compute_cache_size()
    +            self._cache_size = self.calculate_cache_size()
     
             return self._cache_size
     
    -    # set_cache_size()
    -    #
    -    # Forcefully set the overall cache size.
    -    #
    -    # This is used to update the size in the main process after
    -    # having calculated in a cleanup or a cache size calculation job.
    -    #
    -    # Args:
    -    #     cache_size (int): The size to set.
    -    #
    -    def set_cache_size(self, cache_size):
    -
    -        assert cache_size is not None
    -
    -        self._cache_size = cache_size
    -        self._write_cache_size(self._cache_size)
    -
    -    # get_quota_exceeded()
    +    # has_quota_exceeded()
         #
         # Checks if the current artifact cache size exceeds the quota.
         #
         # Returns:
         #    (bool): True if the quota is exceeded
         #
    -    def get_quota_exceeded(self):
    +    def has_quota_exceeded(self):
             return self.get_cache_size() > self._cache_quota
     
         ################################################
    @@ -441,6 +418,10 @@ class ArtifactCache():
         #     content (str): The element's content directory
         #     keys (list): The cache keys to use
         #
    +    # Returns:
    +    #     (int): Disk size overhead in bytes required to cache the
    +    #            artifact
    +    #
         def commit(self, element, content, keys):
             raise ImplError("Cache '{kind}' does not implement commit()"
                             .format(kind=type(self).__name__))
    @@ -512,8 +493,9 @@ class ArtifactCache():
         #     progress (callable): The progress callback, if any
         #
         # Returns:
    -    #   (bool): True if pull was successful, False if artifact was not available
    -    #
    +    #     (bool): True if pull was successful, False if artifact was not available
    +    #     (int): Disk size overhead in bytes required to cache the
    +    #            artifact
         def pull(self, element, key, *, progress=None):
             raise ImplError("Cache '{kind}' does not implement pull()"
                             .format(kind=type(self).__name__))

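For readers following the accounting rework above, here is a minimal, self-contained sketch of the size-tracking contract that ArtifactCache now implements. This is illustrative only, not BuildStream code: the class name and the size-file persistence are invented stand-ins for the real _read_cache_size()/_write_cache_size() helpers.

    import os
    import tempfile

    class SizeTrackingCache:
        def __init__(self, quota):
            self._cache_size = None      # exact size; no longer an estimate
            self._cache_quota = quota
            self._size_file = os.path.join(tempfile.gettempdir(), 'cache_size')

        def calculate_cache_size(self):
            return 0                     # stand-in for a real directory walk

        def _read_cache_size(self):
            try:
                with open(self._size_file) as f:
                    return int(f.read())
            except (FileNotFoundError, ValueError):
                return None

        def _write_cache_size(self, size):
            with open(self._size_file, 'w') as f:
                f.write(str(size))

        def get_cache_size(self):
            # Prefer the in-memory value, then the persisted value, then a
            # full computation -- mirroring the new get_cache_size() above.
            if self._cache_size is None:
                self._cache_size = self._read_cache_size()
            if self._cache_size is None:
                self._cache_size = self.calculate_cache_size()
            return self._cache_size

        def add_artifact_size(self, artifact_size):
            self._cache_size = self.get_cache_size() + artifact_size
            self._write_cache_size(self._cache_size)

        def subtract_artifact_size(self, artifact_size):
            self.add_artifact_size(-artifact_size)

        def has_quota_exceeded(self):
            return self.get_cache_size() > self._cache_quota
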
  • buildstream/_artifactcache/cascache.py
    @@ -16,6 +16,7 @@
     #
     #  Authors:
     #        Jürg Billeter <juerg billeter codethink co uk>
    +#        Tiago Gomes <tiago gomes codethink co uk>
     
     import hashlib
     import itertools
    @@ -117,11 +118,13 @@ class CASCache(ArtifactCache):
         def commit(self, element, content, keys):
             refs = [self.get_artifact_fullname(element, key) for key in keys]
     
    -        tree = self._create_tree(content)
    +        tree, size = self._commit_directory(content)
     
             for ref in refs:
                 self.set_ref(ref, tree)
     
    +        return size
    +
         def diff(self, element, key_a, key_b, *, subdir=None):
             ref_a = self.get_artifact_fullname(element, key_a)
             ref_b = self.get_artifact_fullname(element, key_b)
    @@ -239,12 +242,12 @@ class CASCache(ArtifactCache):
                     tree.hash = response.digest.hash
                     tree.size_bytes = response.digest.size_bytes
     
    -                self._fetch_directory(remote, tree)
    +                size = self._fetch_directory(remote, tree)
     
                     self.set_ref(ref, tree)
     
                     # no need to pull from additional remotes
    -                return True
    +                return True, size
     
                 except grpc.RpcError as e:
                     if e.code() != grpc.StatusCode.NOT_FOUND:
    @@ -258,7 +261,7 @@ class CASCache(ArtifactCache):
                                 remote.spec.url, element._get_brief_display_key())
                         ))
     
    -        return False
    +        return False, 0
     
         def pull_tree(self, project, digest):
             """ Pull a single Tree rather than an artifact.
    @@ -437,6 +440,7 @@ class CASCache(ArtifactCache):
         #
         # Returns:
         #     (Digest): The digest of the added object
    +    #     (int): The amount of bytes required to store the object
         #
         # Either `path` or `buffer` must be passed, but not both.
         #
    @@ -465,22 +469,38 @@ class CASCache(ArtifactCache):
     
                     out.flush()
     
    +                file_size = os.fstat(out.fileno()).st_size
    +
                     digest.hash = h.hexdigest()
    -                digest.size_bytes = os.fstat(out.fileno()).st_size
    +                digest.size_bytes = file_size
     
                     # Place file at final location
                     objpath = self.objpath(digest)
    -                os.makedirs(os.path.dirname(objpath), exist_ok=True)
    +                dirpath = os.path.dirname(objpath)
    +
    +                # Track the increased size on the parent directory caused by
    +                # adding a new entry, as these directories can contain a large
    +                # number of files.
    +                new_dir_size = 0
    +                old_dir_size = 0
    +                try:
    +                    os.makedirs(dirpath)
    +                except FileExistsError:
    +                    old_dir_size = os.stat(dirpath).st_size
    +                else:
    +                    new_dir_size = os.stat(dirpath).st_size
    +
                     os.link(out.name, objpath)
    +                new_dir_size = os.stat(dirpath).st_size - old_dir_size
     
             except FileExistsError as e:
                 # We can ignore the failed link() if the object is already in the repo.
    -            pass
    +            file_size = 0
     
             except OSError as e:
                 raise ArtifactError("Failed to hash object: {}".format(e)) from e
     
    -        return digest
    +        return digest, file_size + new_dir_size
     
         # set_ref():
         #
    @@ -489,6 +509,8 @@ class CASCache(ArtifactCache):
         # Args:
         #     ref (str): The name of the ref
         #
    +    # Note: as setting a ref has very low disk size overhead, don't
    +    # bother to track this.
         def set_ref(self, ref, tree):
             refpath = self._refpath(ref)
             os.makedirs(os.path.dirname(refpath), exist_ok=True)
    @@ -665,7 +687,23 @@ class CASCache(ArtifactCache):
         def _refpath(self, ref):
             return os.path.join(self.casdir, 'refs', 'heads', ref)
     
    -    def _create_tree(self, path, *, digest=None):
    +    # _commit_directory():
    +    #
    +    # Adds contents of the given directory to content addressable store.
    +    #
    +    # Adds files, symbolic links and recursively other directories in
    +    # the given path to the content addressable store.
    +    #
    +    # Args:
    +    #     path (str): Path to the directory to add.
    +    #     dir_digest (Digest): An optional Digest object to use.
    +    #
    +    # Returns:
    +    #     (Digest): Digest object for the Directory object on the given
    +    #               path.
    +    #
    +    def _commit_directory(self, path, *, dir_digest=None):
    +        size = 0
             directory = remote_execution_pb2.Directory()
     
             for name in sorted(os.listdir(path)):
    @@ -674,11 +712,11 @@ class CASCache(ArtifactCache):
                 if stat.S_ISDIR(mode):
                     dirnode = directory.directories.add()
                     dirnode.name = name
    -                self._create_tree(full_path, digest=dirnode.digest)
    +                size += self._commit_directory(full_path, dir_digest=dirnode.digest)[1]
                 elif stat.S_ISREG(mode):
                     filenode = directory.files.add()
                     filenode.name = name
    -                self.add_object(path=full_path, digest=filenode.digest)
    +                size += self.add_object(path=full_path, digest=filenode.digest)[1]
                     filenode.is_executable = (mode & stat.S_IXUSR) == stat.S_IXUSR
                 elif stat.S_ISLNK(mode):
                     symlinknode = directory.symlinks.add()
    @@ -687,7 +725,10 @@ class CASCache(ArtifactCache):
                 else:
                     raise ArtifactError("Unsupported file type for {}".format(full_path))
     
    -        return self.add_object(digest=digest, buffer=directory.SerializeToString())
    +        dir_digest, dir_object_size = self.add_object(
    +            digest=dir_digest, buffer=directory.SerializeToString())
    +
    +        return dir_digest, size + dir_object_size
     
         def _get_subdir(self, tree, subdir):
             head, name = os.path.split(subdir)
    @@ -830,14 +871,27 @@ class CASCache(ArtifactCache):
     
             assert digest.size_bytes == os.fstat(stream.fileno()).st_size
     
    -    def _fetch_directory(self, remote, tree):
    -        objpath = self.objpath(tree)
    +    # _fetch_directory():
    +    #
    +    # Fetches a given directory to content addressable store.
    +    #
    +    # Fetches files, symbolic links and recursively other directories in
    +    # the remote directory to the content addressable store.
    +    #
    +    # Args:
    +    #     remote (Remote): The remote to use.
    +    #     dir_digest (Digest): Digest object for the Directory object to
    +    #                          fetch.
    +    #
    +    def _fetch_directory(self, remote, dir_digest):
    +        size = 0
    +        objpath = self.objpath(dir_digest)
             if os.path.exists(objpath):
                 # already in local cache
    -            return
    +            return 0
     
             with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
    -            self._fetch_blob(remote, tree, out)
    +            self._fetch_blob(remote, dir_digest, out)
     
                 directory = remote_execution_pb2.Directory()
     
    @@ -845,7 +899,7 @@ class CASCache(ArtifactCache):
                     directory.ParseFromString(f.read())
     
                 for filenode in directory.files:
    -                fileobjpath = self.objpath(tree)
    +                fileobjpath = self.objpath(filenode.digest)
                     if os.path.exists(fileobjpath):
                         # already in local cache
                         continue
    @@ -853,16 +907,23 @@ class CASCache(ArtifactCache):
                     with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
                         self._fetch_blob(remote, filenode.digest, f)
     
    -                    digest = self.add_object(path=f.name)
    +                    digest, obj_size = self.add_object(path=f.name)
    +                    size += obj_size
                         assert digest.hash == filenode.digest.hash
     
                 for dirnode in directory.directories:
    -                self._fetch_directory(remote, dirnode.digest)
    +                size += self._fetch_directory(remote, dirnode.digest)
    +
    +            # Place directory blob only in final location when we've
    +            # downloaded all referenced blobs to avoid dangling
    +            # references in the repository.
    +            digest, obj_size = self.add_object(path=out.name)
    +
    +            assert digest.hash == dir_digest.hash
    +
    +            size += obj_size
     
    -            # place directory blob only in final location when we've downloaded
    -            # all referenced blobs to avoid dangling references in the repository
    -            digest = self.add_object(path=out.name)
    -            assert digest.hash == tree.hash
    +            return size
     
         def _fetch_tree(self, remote, digest):
             # download but do not store the Tree object
    @@ -885,13 +946,13 @@ class CASCache(ArtifactCache):
                         with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
                             self._fetch_blob(remote, filenode.digest, f)
     
    -                        added_digest = self.add_object(path=f.name)
    +                        added_digest = self.add_object(path=f.name)[0]
                             assert added_digest.hash == filenode.digest.hash
     
                     # place directory blob only in final location when we've downloaded
                     # all referenced blobs to avoid dangling references in the repository
                     dirbuffer = directory.SerializeToString()
    -                dirdigest = self.add_object(buffer=dirbuffer)
    +                dirdigest = self.add_object(buffer=dirbuffer)[0]
                     assert dirdigest.size_bytes == len(dirbuffer)
     
             return dirdigest

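The add_object() change above also starts charging the dirent overhead of the CAS fan-out directories to the artifact. A runnable sketch of just that accounting step (the function name and signature are illustrative, not BuildStream API):

    import os

    def link_with_dir_overhead(tmpname, objpath):
        # Hard-link a temporary file into its final object path and report
        # how many bytes the parent directory itself grew by; these
        # directories can contain a very large number of entries.
        dirpath = os.path.dirname(objpath)
        old_dir_size = 0
        try:
            os.makedirs(dirpath)      # new directory: its whole size counts
        except FileExistsError:
            old_dir_size = os.stat(dirpath).st_size
        os.link(tmpname, objpath)
        return os.stat(dirpath).st_size - old_dir_size
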
  • buildstream/_artifactcache/casserver.py
    @@ -210,7 +210,7 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
                         context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                         return response
                     out.flush()
    -                digest = self.cas.add_object(path=out.name)
    +                digest = self.cas.add_object(path=out.name)[0]
                     if digest.hash != client_digest.hash:
                         context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                         return response

  • buildstream/_context.py
    @@ -119,7 +119,6 @@ class Context():
             self._log_handle = None
             self._log_filename = None
             self.config_cache_quota = 'infinity'
    -        self.artifactdir_volume = None
     
         # load()
         #

  • buildstream/_scheduler/jobs/__init__.py
    @@ -1,3 +1,21 @@
    +#
    +#  Copyright (C) 2018 Codethink Limited
    +#
    +#  This program is free software; you can redistribute it and/or
    +#  modify it under the terms of the GNU Lesser General Public
    +#  License as published by the Free Software Foundation; either
    +#  version 2 of the License, or (at your option) any later version.
    +#
    +#  This library is distributed in the hope that it will be useful,
    +#  but WITHOUT ANY WARRANTY; without even the implied warranty of
    +#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
    +#  Lesser General Public License for more details.
    +#
    +#  You should have received a copy of the GNU Lesser General Public
    +#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
    +#
    +#  Authors:
    +#        Tristan Maat <tristan maat codethink co uk>
    +
     from .elementjob import ElementJob
    -from .cachesizejob import CacheSizeJob
     from .cleanupjob import CleanupJob
  • buildstream/_scheduler/jobs/cachesizejob.py deleted
    @@ -1,42 +0,0 @@
    -#  Copyright (C) 2018 Codethink Limited
    -#
    -#  This program is free software; you can redistribute it and/or
    -#  modify it under the terms of the GNU Lesser General Public
    -#  License as published by the Free Software Foundation; either
    -#  version 2 of the License, or (at your option) any later version.
    -#
    -#  This library is distributed in the hope that it will be useful,
    -#  but WITHOUT ANY WARRANTY; without even the implied warranty of
    -#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
    -#  Lesser General Public License for more details.
    -#
    -#  You should have received a copy of the GNU Lesser General Public
    -#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
    -#
    -#  Author:
    -#        Tristan Daniël Maat <tristan maat codethink co uk>
    -#
    -from .job import Job
    -from ..._platform import Platform
    -
    -
    -class CacheSizeJob(Job):
    -    def __init__(self, *args, complete_cb, **kwargs):
    -        super().__init__(*args, **kwargs)
    -        self._complete_cb = complete_cb
    -
    -        platform = Platform.get_platform()
    -        self._artifacts = platform.artifactcache
    -
    -    def child_process(self):
    -        return self._artifacts.compute_cache_size()
    -
    -    def parent_complete(self, success, result):
    -        if success:
    -            self._artifacts.set_cache_size(result)
    -
    -            if self._complete_cb:
    -                self._complete_cb(result)
    -
    -    def child_process_data(self):
    -        return {}

  • buildstream/_scheduler/jobs/cleanupjob.py
    @@ -21,9 +21,8 @@ from ..._platform import Platform
     
     
     class CleanupJob(Job):
    -    def __init__(self, *args, complete_cb, **kwargs):
    +    def __init__(self, *args, **kwargs):
             super().__init__(*args, **kwargs)
    -        self._complete_cb = complete_cb
     
             platform = Platform.get_platform()
             self._artifacts = platform.artifactcache
    @@ -33,10 +32,4 @@ class CleanupJob(Job):
     
         def parent_complete(self, success, result):
             if success:
    -            self._artifacts.set_cache_size(result)
    -
    -            if self._complete_cb:
    -                self._complete_cb()
    -
    -    def child_process_data(self):
    -        return {}
    +            self._artifacts.subtract_artifact_size(result)

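With the result of clean() now meaning "bytes freed" rather than "new total size", the parent applies a delta instead of overwriting its cached value. A simplified sketch of the job split assumed here (the real Job base class handles forking and IPC):

    class CleanupJobSketch:
        def __init__(self, artifacts):
            self._artifacts = artifacts

        def child_process(self):
            # Runs in the child process; returns the number of bytes removed.
            return self._artifacts.clean()

        def parent_complete(self, success, result):
            # Runs in the main process; fold the delta into the cached size.
            if success:
                self._artifacts.subtract_artifact_size(result)
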
  • buildstream/_scheduler/queues/buildqueue.py
    @@ -1,5 +1,5 @@
     #
    -#  Copyright (C) 2016 Codethink Limited
    +#  Copyright (C) 2018 Codethink Limited
     #
     #  This program is free software; you can redistribute it and/or
     #  modify it under the terms of the GNU Lesser General Public
    @@ -87,31 +87,17 @@ class BuildQueue(Queue):
     
             return QueueStatus.READY
     
    -    def _check_cache_size(self, job, element, artifact_size):
    -
    -        # After completing a build job, add the artifact size
    -        # as returned from Element._assemble() to the estimated
    -        # artifact cache size
    -        #
    -        platform = Platform.get_platform()
    -        artifacts = platform.artifactcache
    -
    -        artifacts.add_artifact_size(artifact_size)
    -
    -        # If the estimated size outgrows the quota, ask the scheduler
    -        # to queue a job to actually check the real cache size.
    -        #
    -        if artifacts.get_quota_exceeded():
    -            self._scheduler.check_cache_size()
    -
         def done(self, job, element, result, success):
    +        if not success:
    +            return False
    +
    +        element._assemble_done()
     
    -        if success:
    -            # Inform element in main process that assembly is done
    -            element._assemble_done()
    +        artifacts = Platform.get_platform().artifactcache
    +        artifacts.add_artifact_size(result)
     
    -            # This has to be done after _assemble_done, such that the
    -            # element may register its cache key as required
    -            self._check_cache_size(job, element, result)
    +        # This has to be done after _assemble_done, such that the
    +        # element may register its cache key as required
    +        self._scheduler.check_cache_size()
     
    -        return True
    +        return success

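A condensed view of what the reworked done() does on success (hypothetical helper, not BuildStream API): the job result is now the disk overhead reported by ArtifactCache.commit(), so the main process can account for it immediately and only then decide whether a cleanup is warranted.

    def record_successful_build(element, artifacts, scheduler, result):
        # Must run after _assemble_done(), so the element can register its
        # cache key as required before any cleanup job is scheduled.
        element._assemble_done()
        artifacts.add_artifact_size(result)   # result == bytes written
        scheduler.check_cache_size()          # queues a cleanup iff over quota
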
  • buildstream/_scheduler/queues/pullqueue.py
    @@ -1,5 +1,5 @@
     #
    -#  Copyright (C) 2016 Codethink Limited
    +#  Copyright (C) 2018 Codethink Limited
     #
     #  This program is free software; you can redistribute it and/or
     #  modify it under the terms of the GNU Lesser General Public
    @@ -21,6 +21,7 @@
     # Local imports
     from . import Queue, QueueStatus
     from ..resources import ResourceType
    +from ..._platform import Platform
     
     
     # A queue which pulls element artifacts
    @@ -52,18 +53,21 @@ class PullQueue(Queue):
             else:
                 return QueueStatus.SKIP
     
    -    def done(self, _, element, result, success):
    -
    +    def done(self, job, element, result, success):
             if not success:
                 return False
     
             element._pull_done()
     
    -        # Build jobs will check the "approximate" size first. Since we
    -        # do not get an artifact size from pull jobs, we have to
    -        # actually check the cache size.
    +        pulled, artifact_size = result
    +
    +        artifacts = Platform.get_platform().artifactcache
    +        artifacts.add_artifact_size(artifact_size)
    +
    +        # This has to be done after _pull_done, such that the
    +        # element may register its cache key as required
             self._scheduler.check_cache_size()
     
             # Element._pull() returns True if it downloaded an artifact,
             # here we want to appear skipped if we did not download.
    -        return result
    +        return pulled

  • buildstream/_scheduler/scheduler.py
    @@ -1,5 +1,5 @@
     #
    -#  Copyright (C) 2016 Codethink Limited
    +#  Copyright (C) 2018 Codethink Limited
     #
     #  This program is free software; you can redistribute it and/or
     #  modify it under the terms of the GNU Lesser General Public
    @@ -28,7 +28,7 @@ from contextlib import contextmanager
     
     # Local imports
     from .resources import Resources, ResourceType
    -from .jobs import CacheSizeJob, CleanupJob
    +from .jobs import CleanupJob
     from .._platform import Platform
     
     
    @@ -243,22 +243,19 @@ class Scheduler():
     
         # check_cache_size():
         #
    -    # Queues a cache size calculation job, after the cache
    -    # size is calculated, a cleanup job will be run automatically
    -    # if needed.
    -    #
    -    # FIXME: This should ensure that only one cache size job
    -    #        is ever pending at a given time. If a cache size
    -    #        job is already running, it is correct to queue
    -    #        a new one, it is incorrect to have more than one
    -    #        of these jobs pending at a given time, though.
    +    # Queues a cleanup job if the size of the artifact cache exceeded
    +    # the quota
         #
         def check_cache_size(self):
    -        job = CacheSizeJob(self, 'cache_size', 'cache_size/cache_size',
    -                           resources=[ResourceType.CACHE,
    -                                      ResourceType.PROCESS],
    -                           complete_cb=self._run_cleanup)
    -        self.schedule_jobs([job])
    +        artifacts = Platform.get_platform().artifactcache
    +
    +        if artifacts.has_quota_exceeded():
    +            job = CleanupJob(self, 'Clean artifact cache',
    +                             'cleanup/cleanup',
    +                             resources=[ResourceType.CACHE,
    +                                        ResourceType.PROCESS],
    +                             exclusive_resources=[ResourceType.CACHE])
    +            self.schedule_jobs([job])
     
         #######################################################
         #                  Local Private Methods              #
    @@ -335,32 +332,6 @@ class Scheduler():
             self.schedule_jobs(ready)
             self._sched()
     
    -    # _run_cleanup()
    -    #
    -    # Schedules the cache cleanup job if the passed size
    -    # exceeds the cache quota.
    -    #
    -    # Args:
    -    #    cache_size (int): The calculated cache size (ignored)
    -    #
    -    # NOTE: This runs in response to completion of the cache size
    -    #       calculation job launched by Scheduler.check_cache_size(),
    -    #       which will report the calculated cache size.
    -    #
    -    def _run_cleanup(self, cache_size):
    -        platform = Platform.get_platform()
    -        artifacts = platform.artifactcache
    -
    -        if not artifacts.get_quota_exceeded():
    -            return
    -
    -        job = CleanupJob(self, 'cleanup', 'cleanup/cleanup',
    -                         resources=[ResourceType.CACHE,
    -                                    ResourceType.PROCESS],
    -                         exclusive_resources=[ResourceType.CACHE],
    -                         complete_cb=None)
    -        self.schedule_jobs([job])
    -
         # _suspend_jobs()
         #
         # Suspend all ongoing jobs.

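With the cache size always known in the main process, the intermediate CacheSizeJob is unnecessary and check_cache_size() gates the cleanup directly. A self-contained illustration of that control flow (the scheduler and job machinery are faked here):

    class FakeArtifacts:
        def __init__(self, size, quota):
            self.size, self.quota = size, quota

        def has_quota_exceeded(self):
            return self.size > self.quota

    def check_cache_size(artifacts, schedule_jobs):
        # Queue a cleanup job only when the quota is actually exceeded.
        if artifacts.has_quota_exceeded():
            schedule_jobs(['cleanup/cleanup'])

    check_cache_size(FakeArtifacts(size=11, quota=10), print)
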
  • buildstream/element.py
    @@ -1657,8 +1657,8 @@ class Element(Plugin):
                     }), os.path.join(metadir, 'workspaced-dependencies.yaml'))
     
                 with self.timed_activity("Caching artifact"):
    -                artifact_size = utils._get_dir_size(assembledir)
    -                self.__artifacts.commit(self, assembledir, self.__get_cache_keys_for_commit())
    +                artifact_size = self.__artifacts.commit(
    +                    self, assembledir, self.__get_cache_keys_for_commit())
     
                 if collect is not None and collectvdir is None:
                     raise ElementError(
    @@ -1710,31 +1710,31 @@ class Element(Plugin):
             self._update_state()
     
         def _pull_strong(self, *, progress=None):
    -        weak_key = self._get_cache_key(strength=_KeyStrength.WEAK)
    -
             key = self.__strict_cache_key
    -        if not self.__artifacts.pull(self, key, progress=progress):
    -            return False
    +        pulled, artifact_size = self.__artifacts.pull(self, key,
    +                                                      progress=progress)
     
    -        # update weak ref by pointing it to this newly fetched artifact
    -        self.__artifacts.link_key(self, key, weak_key)
    +        if pulled:
    +            # update weak ref by pointing it to this newly fetched artifact
    +            weak_key = self._get_cache_key(strength=_KeyStrength.WEAK)
    +            self.__artifacts.link_key(self, key, weak_key)
     
    -        return True
    +        return pulled, artifact_size
     
         def _pull_weak(self, *, progress=None):
             weak_key = self._get_cache_key(strength=_KeyStrength.WEAK)
    +        pulled, artifact_size = self.__artifacts.pull(self, weak_key,
    +                                                      progress=progress)
     
    -        if not self.__artifacts.pull(self, weak_key, progress=progress):
    -            return False
    -
    -        # extract strong cache key from this newly fetched artifact
    -        self._pull_done()
    +        if pulled:
    +            # extract strong cache key from this newly fetched artifact
    +            self._pull_done()
     
    -        # create tag for strong cache key
    -        key = self._get_cache_key(strength=_KeyStrength.STRONG)
    -        self.__artifacts.link_key(self, weak_key, key)
    +            # create tag for strong cache key
    +            key = self._get_cache_key(strength=_KeyStrength.STRONG)
    +            self.__artifacts.link_key(self, weak_key, key)
     
    -        return True
    +        return pulled, artifact_size
     
         # _pull():
         #
    @@ -1749,18 +1749,17 @@ class Element(Plugin):
                 self.status(message)
     
             # Attempt to pull artifact without knowing whether it's available
    -        pulled = self._pull_strong(progress=progress)
    +        pulled, artifact_size = self._pull_strong(progress=progress)
     
             if not pulled and not self._cached() and not context.get_strict():
    -            pulled = self._pull_weak(progress=progress)
    +            pulled, artifact_size = self._pull_weak(progress=progress)
     
    -        if not pulled:
    -            return False
    +        if pulled:
    +            # Notify successful download
    +            display_key = self._get_brief_display_key()
    +            self.info("Downloaded artifact {}".format(display_key))
     
    -        # Notify successful download
    -        display_key = self._get_brief_display_key()
    -        self.info("Downloaded artifact {}".format(display_key))
    -        return True
    +        return pulled, artifact_size
     
         # _skip_push():
         #

  • buildstream/storage/_casbaseddirectory.py
    @@ -111,7 +111,7 @@ class CasBasedDirectory(Directory):
             the parent).
     
             """
    -        self.ref = self.cas_cache.add_object(buffer=self.pb2_directory.SerializeToString())
    +        self.ref = self.cas_cache.add_object(buffer=self.pb2_directory.SerializeToString())[0]
             if caller:
                 old_dir = self._find_pb2_entry(caller.filename)
                 self.cas_cache.add_object(digest=old_dir.digest, buffer=caller.pb2_directory.SerializeToString())
    @@ -130,9 +130,10 @@ class CasBasedDirectory(Directory):
                 self.index[entry.name].buildstream_object._recalculate_recursing_down(entry)
     
             if parent:
    -            self.ref = self.cas_cache.add_object(digest=parent.digest, buffer=self.pb2_directory.SerializeToString())
    +            self.ref = self.cas_cache.add_object(digest=parent.digest,
    +                                                 buffer=self.pb2_directory.SerializeToString())[0]
             else:
    -            self.ref = self.cas_cache.add_object(buffer=self.pb2_directory.SerializeToString())
    +            self.ref = self.cas_cache.add_object(buffer=self.pb2_directory.SerializeToString())[0]
             # We don't need to do anything more than that; files were already added earlier, and symlinks are
             # part of the directory structure.

  • doc/source/install_source.rst
    @@ -29,6 +29,7 @@ The default plugins with extra host dependencies are:
     * git
     * ostree
     * patch
    +* pip
     * tar
     
     If you intend to push built artifacts to a remote artifact server,


