[Notes] [Git][BuildStream/buildstream][tiagogomes/issue-573] 4 commits: artifactcache: return bytes required to cache artifact



Tiago Gomes pushed to branch tiagogomes/issue-573 at BuildStream / buildstream

Commits:

13 changed files:

  • buildstream/_artifactcache/artifactcache.py
  • buildstream/_artifactcache/cascache.py
  • buildstream/_artifactcache/casserver.py
  • buildstream/_scheduler/jobs/__init__.py
  • buildstream/_scheduler/jobs/cachesizejob.py
  • buildstream/_scheduler/jobs/cleanupjob.py
  • buildstream/_scheduler/jobs/elementjob.py
  • buildstream/_scheduler/queues/buildqueue.py
  • buildstream/_scheduler/queues/pullqueue.py
  • buildstream/_scheduler/queues/queue.py
  • buildstream/_scheduler/scheduler.py
  • buildstream/element.py
  • buildstream/storage/_casbaseddirectory.py

Changes:

  • buildstream/_artifactcache/artifactcache.py
    @@ -16,6 +16,7 @@
     #
     #  Authors:
     #        Tristan Maat <tristan maat codethink co uk>
    +#        Tiago Gomes <tiago gomes codethink co uk>
     
     import os
     import string
    @@ -85,8 +86,6 @@ class ArtifactCache():
             self.extractdir = os.path.join(context.artifactdir, 'extract')
             self.tmpdir = os.path.join(context.artifactdir, 'tmp')
     
    -        self.estimated_size = None
    -
             self.global_remote_specs = []
             self.project_remote_specs = {}
     
    @@ -228,10 +227,14 @@ class ArtifactCache():
         #
         # Clean the artifact cache as much as possible.
         #
    +    # Returns:
    +    #   (int): Amount of bytes cleaned from the cache
    +    #
         def clean(self):
             artifacts = self.list_artifacts()
    +        cache_size = old_cache_size = self.get_cache_size()
     
    -        while self.calculate_cache_size() >= self.cache_quota - self.cache_lower_threshold:
    +        while cache_size >= self.cache_quota - self.cache_lower_threshold:
                 try:
                     to_remove = artifacts.pop(0)
                 except IndexError:
    @@ -245,7 +248,7 @@ class ArtifactCache():
                               "Please increase the cache-quota in {}."
                               .format(self.context.config_origin or default_conf))
     
    -                if self.calculate_cache_size() > self.cache_quota:
    +                if cache_size > self.cache_quota:
                         raise ArtifactError("Cache too full. Aborting.",
                                             detail=detail,
                                             reason="cache-too-full")
    @@ -255,44 +258,17 @@ class ArtifactCache():
                 key = to_remove.rpartition('/')[2]
                 if key not in self.required_artifacts:
                     size = self.remove(to_remove)
    -                if size:
    -                    self.cache_size -= size
    +                cache_size -= size
    +                self._message(MessageType.DEBUG,
    +                              "Removed artifact {} ({})".format(
    +                                  to_remove[:-(len(key) - self.context.log_key_length)],
    +                                  utils._pretty_size(size)))
     
    -        # This should be O(1) if implemented correctly
    -        return self.calculate_cache_size()
    +        self._message(MessageType.INFO,
    +                      "New artifact cache size: {}".format(
    +                          utils._pretty_size(cache_size)))
     
    -    # get_approximate_cache_size()
    -    #
    -    # A cheap method that aims to serve as an upper limit on the
    -    # artifact cache size.
    -    #
    -    # The cache size reported by this function will normally be larger
    -    # than the real cache size, since it is calculated using the
    -    # pre-commit artifact size, but for very small artifacts in
    -    # certain caches additional overhead could cause this to be
    -    # smaller than, but close to, the actual size.
    -    #
    -    # Nonetheless, in practice this should be safe to use as an upper
    -    # limit on the cache size.
    -    #
    -    # If the cache has built-in constant-time size reporting, please
    -    # feel free to override this method with a more accurate
    -    # implementation.
    -    #
    -    # Returns:
    -    #     (int) An approximation of the artifact cache size.
    -    #
    -    def get_approximate_cache_size(self):
    -        # If we don't currently have an estimate, figure out the real
    -        # cache size.
    -        if self.estimated_size is None:
    -            stored_size = self._read_cache_size()
    -            if stored_size is not None:
    -                self.estimated_size = stored_size
    -            else:
    -                self.estimated_size = self.calculate_cache_size()
    -
    -        return self.estimated_size
    +        return old_cache_size - cache_size
     
         ################################################
         # Abstract methods for subclasses to implement #
    @@ -390,6 +366,10 @@ class ArtifactCache():
         #     content (str): The element's content directory
         #     keys (list): The cache keys to use
         #
    +    # Returns:
    +    #   (int): Bytes required to cache the artifact taking deduplication
    +    #          into account
    +    #
         def commit(self, element, content, keys):
             raise ImplError("Cache '{kind}' does not implement commit()"
                             .format(kind=type(self).__name__))
    @@ -462,6 +442,8 @@ class ArtifactCache():
         #
         # Returns:
         #   (bool): True if pull was successful, False if artifact was not available
    +    #   (int): Bytes required to cache the artifact taking deduplication
    +    #          into account
         #
         def pull(self, element, key, *, progress=None):
             raise ImplError("Cache '{kind}' does not implement pull()"
    @@ -484,8 +466,6 @@ class ArtifactCache():
         #
         # Return the real artifact cache size.
         #
    -    # Implementations should also use this to update estimated_size.
    -    #
         # Returns:
         #
         # (int) The size of the artifact cache.
    @@ -494,6 +474,23 @@ class ArtifactCache():
             raise ImplError("Cache '{kind}' does not implement calculate_cache_size()"
                             .format(kind=type(self).__name__))
     
    +    # get_cache_size()
    +    #
    +    # Return the artifact cache size.
    +    #
    +    # Returns:
    +    #
    +    # (int) The size of the artifact cache.
    +    #
    +    def get_cache_size(self):
    +        if self.cache_size is None:
    +            self.cache_size = self._read_cache_size()
    +
    +        if self.cache_size is None:
    +            self.cache_size = self.calculate_cache_size()
    +
    +        return self.cache_size
    +
         ################################################
         #               Local Private Methods          #
         ################################################
    @@ -537,32 +534,13 @@ class ArtifactCache():
     
         # _add_artifact_size()
         #
    -    # Since we cannot keep track of the cache size between threads,
    -    # this method will be called by the main process every time a
    -    # process that added something to the cache finishes.
    -    #
    -    # This will then add the reported size to
    -    # ArtifactCache.estimated_size.
    +    # Since we cannot keep track of the cache size between processes,
    +    # this method will be called by the main process every time a job
    +    # that added or removed an artifact from the cache finishes.
         #
         def _add_artifact_size(self, artifact_size):
    -        if not self.estimated_size:
    -            self.estimated_size = self.calculate_cache_size()
    -
    -        self.estimated_size += artifact_size
    -        self._write_cache_size(self.estimated_size)
    -
    -    # _set_cache_size()
    -    #
    -    # Similarly to the above method, when we calculate the actual size
    -    # in a child thread, we can't update it. We instead pass the value
    -    # back to the main thread and update it there.
    -    #
    -    def _set_cache_size(self, cache_size):
    -        self.estimated_size = cache_size
    -
    -        # set_cache_size is called in cleanup, where it may set the cache to None
    -        if self.estimated_size is not None:
    -            self._write_cache_size(self.estimated_size)
    +        self.cache_size = self.get_cache_size() + artifact_size
    +        self._write_cache_size(self.cache_size)
     
         # _write_cache_size()
         #
    @@ -628,7 +606,7 @@ class ArtifactCache():
             stat = os.statvfs(artifactdir_volume)
             available_space = (stat.f_bsize * stat.f_bavail)
     
    -        cache_size = self.get_approximate_cache_size()
    +        cache_size = self.get_cache_size()
     
             # Ensure system has enough storage for the cache_quota
             #
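    The diff above replaces per-process size estimation with a single cached value: get_cache_size() lazily falls back from the in-memory value to the persisted size file to a full recalculation, and _add_artifact_size() applies signed deltas as jobs finish. A minimal standalone sketch of that pattern, with illustrative names and file layout rather than the real BuildStream API:

        import os

        class SizeTracker:
            """Sketch of a lazily-initialized, delta-updated size counter."""

            def __init__(self, size_file, compute_fn):
                self._size_file = size_file    # persisted size (hypothetical path)
                self._compute_fn = compute_fn  # expensive full scan, cf. calculate_cache_size()
                self._size = None              # in-memory value, cf. ArtifactCache.cache_size

            def get_size(self):
                # Prefer the in-memory value, then the persisted file, and
                # only fall back to the expensive full computation.
                if self._size is None:
                    self._size = self._read_size()
                if self._size is None:
                    self._size = self._compute_fn()
                return self._size

            def add_delta(self, delta):
                # Called in the main process when a job finishes; the delta
                # is positive for commits/pulls, negative for cleanups.
                self._size = self.get_size() + delta
                self._write_size(self._size)

            def _read_size(self):
                try:
                    with open(self._size_file) as f:
                        return int(f.read())
                except (FileNotFoundError, ValueError):
                    return None

            def _write_size(self, size):
                with open(self._size_file, 'w') as f:
                    f.write(str(size))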
    

  • buildstream/_artifactcache/cascache.py
    @@ -16,6 +16,7 @@
     #
     #  Authors:
     #        Jürg Billeter <juerg billeter codethink co uk>
    +#        Tiago Gomes <tiago gomes codethink co uk>
     
     import hashlib
     import itertools
    @@ -115,12 +116,12 @@ class CASCache(ArtifactCache):
         def commit(self, element, content, keys):
             refs = [self.get_artifact_fullname(element, key) for key in keys]
     
    -        tree = self._create_tree(content)
    +        tree, size = self._create_tree(content)
     
             for ref in refs:
                 self.set_ref(ref, tree)
     
    -        self.cache_size = None
    +        return size
     
         def diff(self, element, key_a, key_b, *, subdir=None):
             ref_a = self.get_artifact_fullname(element, key_a)
    @@ -238,12 +239,12 @@ class CASCache(ArtifactCache):
                     tree.hash = response.digest.hash
                     tree.size_bytes = response.digest.size_bytes
     
    -                self._fetch_tree(remote, tree)
    +                size = self._fetch_tree(remote, tree)
     
                     self.set_ref(ref, tree)
     
                     # no need to pull from additional remotes
    -                return True
    +                return True, size
     
                 except grpc.RpcError as e:
                     if e.code() != grpc.StatusCode.NOT_FOUND:
    @@ -257,7 +258,7 @@ class CASCache(ArtifactCache):
                                 remote.spec.url, element._get_brief_display_key())
                         ))
     
    -        return False
    +        return False, 0
     
         def link_key(self, element, oldkey, newkey):
             oldref = self.get_artifact_fullname(element, oldkey)
    @@ -397,6 +398,7 @@ class CASCache(ArtifactCache):
         #
         # Returns:
         #     (Digest): The digest of the added object
    +    #     (int): The number of bytes required to store the object
         #
         # Either `path` or `buffer` must be passed, but not both.
         #
    @@ -425,22 +427,39 @@ class CASCache(ArtifactCache):
     
                     out.flush()
     
    +                file_size = os.fstat(out.fileno()).st_size
    +
                     digest.hash = h.hexdigest()
    -                digest.size_bytes = os.fstat(out.fileno()).st_size
    +                digest.size_bytes = file_size
     
                     # Place file at final location
                     objpath = self.objpath(digest)
    -                os.makedirs(os.path.dirname(objpath), exist_ok=True)
    +                dirpath = os.path.dirname(objpath)
    +
    +                # Track the increased size on the parent directory caused by
    +                # adding a new entry, as these directories can contain a large
    +                # number of files.
    +                new_dir_size = 0
    +                old_dir_size = 0
    +                try:
    +                    os.makedirs(dirpath)
    +                except FileExistsError:
    +                    old_dir_size = os.stat(dirpath).st_size
    +                else:
    +                    new_dir_size = os.stat(dirpath).st_size
    +
                     os.link(out.name, objpath)
    +                new_dir_size = os.stat(dirpath).st_size - old_dir_size
     
             except FileExistsError as e:
                 # We can ignore the failed link() if the object is already in the repo.
    +            file_size = 0
                 pass
     
             except OSError as e:
                 raise ArtifactError("Failed to hash object: {}".format(e)) from e
     
    -        return digest
    +        return digest, file_size + new_dir_size
     
         # set_ref():
         #
    @@ -449,6 +468,8 @@ class CASCache(ArtifactCache):
         # Args:
         #     ref (str): The name of the ref
         #
    +    # Note: Setting a ref will have a very low overhead on the cache
    +    # size, so we don't track this.
         def set_ref(self, ref, tree):
             refpath = self._refpath(ref)
             os.makedirs(os.path.dirname(refpath), exist_ok=True)
    @@ -488,11 +509,7 @@ class CASCache(ArtifactCache):
                 raise ArtifactError("Attempt to access unavailable artifact: {}".format(e)) from e
     
         def calculate_cache_size(self):
    -        if self.cache_size is None:
    -            self.cache_size = utils._get_dir_size(self.casdir)
    -            self.estimated_size = self.cache_size
    -
    -        return self.cache_size
    +        return utils._get_dir_size(self.casdir)
     
         # list_artifacts():
         #
    @@ -630,6 +647,7 @@ class CASCache(ArtifactCache):
     
         def _create_tree(self, path, *, digest=None):
             directory = remote_execution_pb2.Directory()
    +        size = 0
     
             for name in sorted(os.listdir(path)):
                 full_path = os.path.join(path, name)
    @@ -637,11 +655,11 @@ class CASCache(ArtifactCache):
                 if stat.S_ISDIR(mode):
                     dirnode = directory.directories.add()
                     dirnode.name = name
    -                self._create_tree(full_path, digest=dirnode.digest)
    +                size += self._create_tree(full_path, digest=dirnode.digest)[1]
                 elif stat.S_ISREG(mode):
                     filenode = directory.files.add()
                     filenode.name = name
    -                self.add_object(path=full_path, digest=filenode.digest)
    +                size += self.add_object(path=full_path, digest=filenode.digest)[1]
                     filenode.is_executable = (mode & stat.S_IXUSR) == stat.S_IXUSR
                 elif stat.S_ISLNK(mode):
                     symlinknode = directory.symlinks.add()
    @@ -650,7 +668,8 @@ class CASCache(ArtifactCache):
                 else:
                     raise ArtifactError("Unsupported file type for {}".format(full_path))
     
    -        return self.add_object(digest=digest, buffer=directory.SerializeToString())
    +        res = self.add_object(digest=digest, buffer=directory.SerializeToString())
    +        return res[0], res[1] + size
     
         def _get_subdir(self, tree, subdir):
             head, name = os.path.split(subdir)
    @@ -794,10 +813,11 @@ class CASCache(ArtifactCache):
             assert digest.size_bytes == os.fstat(out.fileno()).st_size
     
         def _fetch_tree(self, remote, tree):
    +        size = 0
             objpath = self.objpath(tree)
             if os.path.exists(objpath):
                 # already in local cache
    -            return
    +            return 0
     
             with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
                 self._fetch_blob(remote, tree, out)
    @@ -816,17 +836,21 @@ class CASCache(ArtifactCache):
                     with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
                         self._fetch_blob(remote, filenode.digest, f)
     
    -                    digest = self.add_object(path=f.name)
    +                    digest, obj_size = self.add_object(path=f.name)
    +                    size += obj_size
                         assert digest.hash == filenode.digest.hash
     
                 for dirnode in directory.directories:
    -                self._fetch_tree(remote, dirnode.digest)
    +                size += self._fetch_tree(remote, dirnode.digest)
     
                 # place directory blob only in final location when we've downloaded
                 # all referenced blobs to avoid dangling references in the repository
    -            digest = self.add_object(path=out.name)
    +            digest, obj_size = self.add_object(path=out.name)
    +            size += obj_size
                 assert digest.hash == tree.hash
     
    +            return size
    +
     
     # Represents a single remote CAS cache.
     #
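    The heart of this change is that add_object() now reports how many bytes the object actually consumed: zero when the content was already present (the hard link fails with FileExistsError, i.e. deduplication), otherwise the file size plus any growth of the fan-out directory holding it. A rough standalone sketch of that measurement against a plain hash-addressed store (illustrative code, not the real CAS):

        import hashlib
        import os
        import tempfile

        def add_blob(repo, data):
            """Store `data` under its sha256; return (hexdigest, bytes_added)."""
            digest = hashlib.sha256(data).hexdigest()
            objpath = os.path.join(repo, digest[:2], digest[2:])
            dirpath = os.path.dirname(objpath)

            old_dir_size = 0
            try:
                os.makedirs(dirpath)
            except FileExistsError:
                # Fan-out directory already existed: count only its growth.
                old_dir_size = os.stat(dirpath).st_size

            with tempfile.NamedTemporaryFile(dir=repo) as tmp:
                tmp.write(data)
                tmp.flush()
                try:
                    os.link(tmp.name, objpath)
                except FileExistsError:
                    # Object already stored: deduplicated, zero bytes added.
                    return digest, 0

            file_size = os.stat(objpath).st_size
            dir_growth = os.stat(dirpath).st_size - old_dir_size
            return digest, file_size + dir_growth

    Calling it twice with the same content demonstrates the accounting:

        d = tempfile.mkdtemp()
        _, first = add_blob(d, b'hello')
        _, second = add_blob(d, b'hello')
        assert first > 0 and second == 0   # second call deduplicates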
    

  • buildstream/_artifactcache/casserver.py
    @@ -203,7 +203,7 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
                             context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                             return response
                         out.flush()
    -                    digest = self.cas.add_object(path=out.name)
    +                    digest = self.cas.add_object(path=out.name)[0]
                         if digest.hash != client_digest.hash:
                             context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                             return response
    

  • buildstream/_scheduler/jobs/__init__.py
     from .elementjob import ElementJob
    -from .cachesizejob import CacheSizeJob
     from .cleanupjob import CleanupJob

  • buildstream/_scheduler/jobs/cachesizejob.py deleted
    -#  Copyright (C) 2018 Codethink Limited
    -#
    -#  This program is free software; you can redistribute it and/or
    -#  modify it under the terms of the GNU Lesser General Public
    -#  License as published by the Free Software Foundation; either
    -#  version 2 of the License, or (at your option) any later version.
    -#
    -#  This library is distributed in the hope that it will be useful,
    -#  but WITHOUT ANY WARRANTY; without even the implied warranty of
    -#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
    -#  Lesser General Public License for more details.
    -#
    -#  You should have received a copy of the GNU Lesser General Public
    -#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
    -#
    -#  Author:
    -#        Tristan Daniël Maat <tristan maat codethink co uk>
    -#
    -from .job import Job
    -from ..._platform import Platform
    -
    -
    -class CacheSizeJob(Job):
    -    def __init__(self, *args, complete_cb, **kwargs):
    -        super().__init__(*args, **kwargs)
    -        self._complete_cb = complete_cb
    -        self._cache = Platform._instance.artifactcache
    -
    -    def child_process(self):
    -        return self._cache.calculate_cache_size()
    -
    -    def parent_complete(self, success, result):
    -        self._cache._set_cache_size(result)
    -        if self._complete_cb:
    -            self._complete_cb(result)
    -
    -    def child_process_data(self):
    -        return {}

  • buildstream/_scheduler/jobs/cleanupjob.py
    @@ -21,18 +21,19 @@ from ..._platform import Platform
     
     
     class CleanupJob(Job):
    -    def __init__(self, *args, complete_cb, **kwargs):
    +    def __init__(self, *args, **kwargs):
             super().__init__(*args, **kwargs)
    -        self._complete_cb = complete_cb
             self._cache = Platform._instance.artifactcache
     
         def child_process(self):
             return self._cache.clean()
     
         def parent_complete(self, success, result):
    -        self._cache._set_cache_size(result)
    -        if self._complete_cb:
    -            self._complete_cb()
    +        if success:
    +            # ArtifactCache.clean() returns the number of bytes cleaned.
    +            # We negate the number because the cache size is to be
    +            # decreased.
    +            self._cache._add_artifact_size(result * -1)
     
         def child_process_data(self):
             return {}
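    Because clean() now returns the number of bytes removed, the main process can funnel growth and shrinkage through the same _add_artifact_size() hook, just with opposite signs. A toy illustration, reusing the hypothetical SizeTracker sketch from the artifactcache.py notes above:

        import os
        import tempfile

        size_file = os.path.join(tempfile.mkdtemp(), 'size')
        tracker = SizeTracker(size_file, compute_fn=lambda: 0)

        tracker.add_delta(4096)    # a build job committed a 4 KiB artifact
        tracker.add_delta(-1024)   # a cleanup job freed 1 KiB: same hook, negated
        assert tracker.get_size() == 4096 - 1024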

  • buildstream/_scheduler/jobs/elementjob.py
    @@ -110,12 +110,10 @@ class ElementJob(Job):
     
             workspace = self._element._get_workspace()
             artifact_size = self._element._get_artifact_size()
    -        cache_size = self._element._get_artifact_cache().calculate_cache_size()
     
             if workspace is not None:
                 data['workspace'] = workspace.to_dict()
             if artifact_size is not None:
                 data['artifact_size'] = artifact_size
    -        data['cache_size'] = cache_size
     
             return data

  • buildstream/_scheduler/queues/buildqueue.py
    @@ -87,19 +87,6 @@ class BuildQueue(Queue):
     
             return QueueStatus.READY
     
    -    def _check_cache_size(self, job, element):
    -        if not job.child_data:
    -            return
    -
    -        artifact_size = job.child_data.get('artifact_size', False)
    -
    -        if artifact_size:
    -            cache = element._get_artifact_cache()
    -            cache._add_artifact_size(artifact_size)
    -
    -            if cache.get_approximate_cache_size() > cache.cache_quota:
    -                self._scheduler._check_cache_size_real()
    -
         def done(self, job, element, result, success):
     
             if success:
    

  • buildstream/_scheduler/queues/pullqueue.py
     #
    -#  Copyright (C) 2016 Codethink Limited
    +#  Copyright (C) 2018 Codethink Limited
     #
     #  This program is free software; you can redistribute it and/or
     #  modify it under the terms of the GNU Lesser General Public
    @@ -52,17 +52,14 @@ class PullQueue(Queue):
             else:
                 return QueueStatus.SKIP
     
    -    def done(self, _, element, result, success):
    +    def done(self, job, element, result, success):
     
             if not success:
                 return False
     
             element._pull_done()
     
    -        # Build jobs will check the "approximate" size first. Since we
    -        # do not get an artifact size from pull jobs, we have to
    -        # actually check the cache size.
    -        self._scheduler._check_cache_size_real()
    +        self._check_cache_size(job, element)
     
             # Element._pull() returns True if it downloaded an artifact,
             # here we want to appear skipped if we did not download.
    

  • buildstream/_scheduler/queues/queue.py
     #
    -#  Copyright (C) 2016 Codethink Limited
    +#  Copyright (C) 2018 Codethink Limited
     #
     #  This program is free software; you can redistribute it and/or
     #  modify it under the terms of the GNU Lesser General Public
    @@ -301,8 +301,6 @@ class Queue():
             # Update values that need to be synchronized in the main task
             # before calling any queue implementation
             self._update_workspaces(element, job)
    -        if job.child_data:
    -            element._get_artifact_cache().cache_size = job.child_data.get('cache_size')
     
             # Give the result of the job to the Queue implementor,
             # and determine if it should be considered as processed
    @@ -360,3 +358,16 @@ class Queue():
             logfile = "{key}-{action}".format(key=key, action=action)
     
             return os.path.join(project.name, element.normal_name, logfile)
    +
    +    def _check_cache_size(self, job, element):
    +        if not job.child_data:
    +            return
    +
    +        artifact_size = job.child_data.get('artifact_size', False)
    +
    +        if artifact_size:
    +            cache = element._get_artifact_cache()
    +            cache._add_artifact_size(artifact_size)
    +
    +            if cache.get_cache_size() > cache.cache_quota:
    +                self._scheduler._run_cache_cleanup()
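    Moving _check_cache_size() onto the Queue base class gives build and pull jobs the same post-job hook: apply the size reported by the child process, then schedule a cleanup only once the quota is exceeded. Condensed, with stand-in names, the control flow is:

        def on_job_done(cache, scheduler, child_data):
            # Condensed restatement of Queue._check_cache_size() above.
            if not child_data:
                return

            artifact_size = child_data.get('artifact_size')
            if artifact_size:
                cache._add_artifact_size(artifact_size)

                # get_cache_size() is O(1) after its first call, so this
                # quota check is cheap enough to run after every job.
                if cache.get_cache_size() > cache.cache_quota:
                    scheduler._run_cache_cleanup()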

  • buildstream/_scheduler/scheduler.py
     #
    -#  Copyright (C) 2016 Codethink Limited
    +#  Copyright (C) 2018 Codethink Limited
     #
     #  This program is free software; you can redistribute it and/or
     #  modify it under the terms of the GNU Lesser General Public
    @@ -28,8 +28,7 @@ from contextlib import contextmanager
     
     # Local imports
     from .resources import Resources, ResourceType
    -from .jobs import CacheSizeJob, CleanupJob
    -from .._platform import Platform
    +from .jobs import CleanupJob
     
     
     # A decent return code for Scheduler.run()
    @@ -316,24 +315,11 @@ class Scheduler():
             self.schedule_jobs(ready)
             self._sched()
     
    -    def _run_cleanup(self, cache_size):
    -        platform = Platform.get_platform()
    -        if cache_size and cache_size < platform.artifactcache.cache_quota:
    -            return
    -
    -        job = CleanupJob(self, 'cleanup', 'cleanup',
    +    def _run_cache_cleanup(self):
    +        job = CleanupJob(self, 'Cleaning artifact cache', 'cleanup',
                              resources=[ResourceType.CACHE,
                                         ResourceType.PROCESS],
    -                         exclusive_resources=[ResourceType.CACHE],
    -                         complete_cb=None)
    -        self.schedule_jobs([job])
    -
    -    def _check_cache_size_real(self):
    -        job = CacheSizeJob(self, 'cache_size', 'cache_size/cache_size',
    -                           resources=[ResourceType.CACHE,
    -                                      ResourceType.PROCESS],
    -                           exclusive_resources=[ResourceType.CACHE],
    -                           complete_cb=self._run_cleanup)
    +                         exclusive_resources=[ResourceType.CACHE])
             self.schedule_jobs([job])
     
         # _suspend_jobs()
    

  • buildstream/element.py
    @@ -1646,8 +1646,8 @@ class Element(Plugin):
                         }), os.path.join(metadir, 'workspaced-dependencies.yaml'))
     
                     with self.timed_activity("Caching artifact"):
    -                    self.__artifact_size = utils._get_dir_size(assembledir)
    -                    self.__artifacts.commit(self, assembledir, self.__get_cache_keys_for_commit())
    +                    self.__artifact_size = self.__artifacts.commit(
    +                        self, assembledir, self.__get_cache_keys_for_commit())
     
                     if collect is not None and collectvdir is None:
                         raise ElementError(
    @@ -1697,31 +1697,31 @@ class Element(Plugin):
             self._update_state()
     
         def _pull_strong(self, *, progress=None):
    -        weak_key = self._get_cache_key(strength=_KeyStrength.WEAK)
    -
             key = self.__strict_cache_key
    -        if not self.__artifacts.pull(self, key, progress=progress):
    -            return False
    +        pulled, self.__artifact_size = self.__artifacts.pull(
    +            self, key, progress=progress)
     
    -        # update weak ref by pointing it to this newly fetched artifact
    -        self.__artifacts.link_key(self, key, weak_key)
    +        if pulled:
    +            # update weak ref by pointing it to this newly fetched artifact
    +            weak_key = self._get_cache_key(strength=_KeyStrength.WEAK)
    +            self.__artifacts.link_key(self, key, weak_key)
     
    -        return True
    +        return pulled
     
         def _pull_weak(self, *, progress=None):
             weak_key = self._get_cache_key(strength=_KeyStrength.WEAK)
    +        pulled, self.__artifact_size = self.__artifacts.pull(
    +            self, weak_key, progress=progress)
     
    -        if not self.__artifacts.pull(self, weak_key, progress=progress):
    -            return False
    -
    -        # extract strong cache key from this newly fetched artifact
    -        self._pull_done()
    +        if pulled:
    +            # extract strong cache key from this newly fetched artifact
    +            self._pull_done()
     
    -        # create tag for strong cache key
    -        key = self._get_cache_key(strength=_KeyStrength.STRONG)
    -        self.__artifacts.link_key(self, weak_key, key)
    +            # create tag for strong cache key
    +            key = self._get_cache_key(strength=_KeyStrength.STRONG)
    +            self.__artifacts.link_key(self, weak_key, key)
     
    -        return True
    +        return pulled
     
         # _pull():
         #
    @@ -1741,13 +1741,12 @@ class Element(Plugin):
             if not pulled and not self._cached() and not context.get_strict():
                 pulled = self._pull_weak(progress=progress)
     
    -        if not pulled:
    -            return False
    +        if pulled:
    +            # Notify successful download
    +            display_key = self._get_brief_display_key()
    +            self.info("Downloaded artifact {}".format(display_key))
     
    -        # Notify successfull download
    -        display_key = self._get_brief_display_key()
    -        self.info("Downloaded artifact {}".format(display_key))
    -        return True
    +        return pulled
     
         # _skip_push():
         #
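    With pull() returning a (pulled, size) pair, the element records the bytes added as a side effect of every pull attempt; on a miss the size is simply 0. A small sketch of the unpacking pattern, with hypothetical names:

        def pull_and_record(artifacts, element, key, progress=None):
            # pull() reports success and bytes added in one return value;
            # a failed pull reports (False, 0), so recording is always safe.
            pulled, element.artifact_size = artifacts.pull(
                element, key, progress=progress)

            if pulled:
                print("Downloaded artifact ({} bytes added)".format(
                    element.artifact_size))
            return pulled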
    

  • buildstream/storage/_casbaseddirectory.py
    @@ -111,7 +111,7 @@ class CasBasedDirectory(Directory):
             the parent).
     
             """
    -        self.ref = self.cas_cache.add_object(buffer=self.pb2_directory.SerializeToString())
    +        self.ref = self.cas_cache.add_object(buffer=self.pb2_directory.SerializeToString())[0]
             if caller:
                 old_dir = self._find_pb2_entry(caller.filename)
                 self.cas_cache.add_object(digest=old_dir.digest, buffer=caller.pb2_directory.SerializeToString())
    @@ -130,9 +130,10 @@ class CasBasedDirectory(Directory):
                 self.index[entry.name].buildstream_object._recalculate_recursing_down(entry)
     
             if parent:
    -            self.ref = self.cas_cache.add_object(digest=parent.digest, buffer=self.pb2_directory.SerializeToString())
    +            self.ref = self.cas_cache.add_object(digest=parent.digest,
    +                                                 buffer=self.pb2_directory.SerializeToString())[0]
             else:
    -            self.ref = self.cas_cache.add_object(buffer=self.pb2_directory.SerializeToString())
    +            self.ref = self.cas_cache.add_object(buffer=self.pb2_directory.SerializeToString())[0]
             # We don't need to do anything more than that; files were already added ealier, and symlinks are
             # part of the directory structure.
     
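    Call sites that only need the digest, as in casserver.py and _casbaseddirectory.py above, simply index the new (digest, size) pair; for example (hypothetical helper):

        def set_directory_ref(cas_cache, pb2_directory):
            # add_object() now returns (digest, bytes_added); callers that
            # account for size elsewhere just take element [0].
            digest, _bytes_added = cas_cache.add_object(
                buffer=pb2_directory.SerializeToString())
            return digest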
     
    


