[Notes] [Git][BuildStream/buildstream][tiagogomes/issue-573] 11 commits: Improve documentation for artifact cache installation




Tiago Gomes pushed to branch tiagogomes/issue-573 at BuildStream / buildstream

Commits:

16 changed files:

Changes:

  • buildstream/_artifactcache/artifactcache.py

    @@ -16,6 +16,7 @@
     #
     #  Authors:
     #        Tristan Maat <tristan maat codethink co uk>
    +#        Tiago Gomes <tiago gomes codethink co uk>
     
     import os
     import string

    @@ -85,8 +86,6 @@ class ArtifactCache():
             self.extractdir = os.path.join(context.artifactdir, 'extract')
             self.tmpdir = os.path.join(context.artifactdir, 'tmp')
     
    -        self.estimated_size = None
    -
             self.global_remote_specs = []
             self.project_remote_specs = {}
     

    @@ -228,10 +227,14 @@ class ArtifactCache():
         #
         # Clean the artifact cache as much as possible.
         #
    +    # Returns:
    +    #     (int): Amount of bytes cleaned from the cache
    +    #
         def clean(self):
             artifacts = self.list_artifacts()
    +        cache_size = old_cache_size = self.get_cache_size()
     
    -        while self.calculate_cache_size() >= self.cache_quota - self.cache_lower_threshold:
    +        while cache_size >= self.cache_quota - self.cache_lower_threshold:
                 try:
                     to_remove = artifacts.pop(0)
                 except IndexError:

    @@ -245,7 +248,7 @@ class ArtifactCache():
                               "Please increase the cache-quota in {}."
                               .format(self.context.config_origin or default_conf))
     
    -                if self.calculate_cache_size() > self.cache_quota:
    +                if cache_size > self.cache_quota:
                         raise ArtifactError("Cache too full. Aborting.",
                                             detail=detail,
                                             reason="cache-too-full")

    @@ -255,44 +258,17 @@ class ArtifactCache():
                 key = to_remove.rpartition('/')[2]
                 if key not in self.required_artifacts:
                     size = self.remove(to_remove)
    -                if size:
    -                    self.cache_size -= size
    -
    -        # This should be O(1) if implemented correctly
    -        return self.calculate_cache_size()
    +                cache_size -= size
    +                self._message(MessageType.DEBUG,
    +                              "Removed artifact {} ({})".format(
    +                                  to_remove[:-(len(key) - self.context.log_key_length)],
    +                                  utils._pretty_size(size)))
     
    -    # get_approximate_cache_size()
    -    #
    -    # A cheap method that aims to serve as an upper limit on the
    -    # artifact cache size.
    -    #
    -    # The cache size reported by this function will normally be larger
    -    # than the real cache size, since it is calculated using the
    -    # pre-commit artifact size, but for very small artifacts in
    -    # certain caches additional overhead could cause this to be
    -    # smaller than, but close to, the actual size.
    -    #
    -    # Nonetheless, in practice this should be safe to use as an upper
    -    # limit on the cache size.
    -    #
    -    # If the cache has built-in constant-time size reporting, please
    -    # feel free to override this method with a more accurate
    -    # implementation.
    -    #
    -    # Returns:
    -    #     (int) An approximation of the artifact cache size.
    -    #
    -    def get_approximate_cache_size(self):
    -        # If we don't currently have an estimate, figure out the real
    -        # cache size.
    -        if self.estimated_size is None:
    -            stored_size = self._read_cache_size()
    -            if stored_size is not None:
    -                self.estimated_size = stored_size
    -            else:
    -                self.estimated_size = self.calculate_cache_size()
    +        self._message(MessageType.INFO,
    +                      "New artifact cache size: {}".format(
    +                          utils._pretty_size(cache_size)))
     
    -        return self.estimated_size
    +        return old_cache_size - cache_size
     
         ################################################
         # Abstract methods for subclasses to implement #

    @@ -390,6 +366,10 @@ class ArtifactCache():
         #     content (str): The element's content directory
         #     keys (list): The cache keys to use
         #
    +    # Returns:
    +    #   (int): Bytes required to cache the artifact taking deduplication
    +    #          into account
    +    #
         def commit(self, element, content, keys):
             raise ImplError("Cache '{kind}' does not implement commit()"
                             .format(kind=type(self).__name__))

    @@ -462,6 +442,8 @@ class ArtifactCache():
         #
         # Returns:
         #   (bool): True if pull was successful, False if artifact was not available
    +    #   (int): Bytes required to cache the artifact taking deduplication
    +    #          into account
         #
         def pull(self, element, key, *, progress=None):
             raise ImplError("Cache '{kind}' does not implement pull()"

    @@ -484,8 +466,6 @@ class ArtifactCache():
         #
         # Return the real artifact cache size.
         #
    -    # Implementations should also use this to update estimated_size.
    -    #
         # Returns:
         #
         # (int) The size of the artifact cache.

    @@ -494,6 +474,22 @@ class ArtifactCache():
             raise ImplError("Cache '{kind}' does not implement calculate_cache_size()"
                             .format(kind=type(self).__name__))
     
    +    # get_cache_size()
    +    #
    +    # Return the artifact cache size.
    +    #
    +    # Returns:
    +    #     (int) The size of the artifact cache.
    +    #
    +    def get_cache_size(self):
    +        if self.cache_size is None:
    +            self.cache_size = self._read_cache_size()
    +
    +        if self.cache_size is None:
    +            self.cache_size = self.calculate_cache_size()
    +
    +        return self.cache_size
    +
         ################################################
         #               Local Private Methods          #
         ################################################
    
    @@ -537,32 +533,13 @@ class ArtifactCache():
     
         # _add_artifact_size()
         #
    -    # Since we cannot keep track of the cache size between threads,
    -    # this method will be called by the main process every time a
    -    # process that added something to the cache finishes.
    -    #
    -    # This will then add the reported size to
    -    # ArtifactCache.estimated_size.
    +    # Since we cannot keep track of the cache size between processes,
    +    # this method will be called by the main process every time a job
    +    # that added or removed an artifact from the cache finishes.
         #
         def _add_artifact_size(self, artifact_size):
    -        if not self.estimated_size:
    -            self.estimated_size = self.calculate_cache_size()
    -
    -        self.estimated_size += artifact_size
    -        self._write_cache_size(self.estimated_size)
    -
    -    # _set_cache_size()
    -    #
    -    # Similarly to the above method, when we calculate the actual size
    -    # in a child thread, we can't update it. We instead pass the value
    -    # back to the main thread and update it there.
    -    #
    -    def _set_cache_size(self, cache_size):
    -        self.estimated_size = cache_size
    -
    -        # set_cache_size is called in cleanup, where it may set the cache to None
    -        if self.estimated_size is not None:
    -            self._write_cache_size(self.estimated_size)
    +        self.cache_size = self.get_cache_size() + artifact_size
    +        self._write_cache_size(self.cache_size)
     
         # _write_cache_size()
         #

    @@ -628,7 +605,7 @@ class ArtifactCache():
             stat = os.statvfs(artifactdir_volume)
             available_space = (stat.f_bsize * stat.f_bavail)
     
    -        cache_size = self.get_approximate_cache_size()
    +        cache_size = self.get_cache_size()
     
             # Ensure system has enough storage for the cache_quota
             #
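The hunks above replace the old estimated_size bookkeeping with a single lazily computed cache_size, read on demand and adjusted in the main process as jobs report byte deltas. A minimal, self-contained sketch of that pattern; the persistence helpers (_read_cache_size/_write_cache_size) are omitted, and a plain directory walk stands in for utils._get_dir_size():

    import os


    class SizeTrackingCache:
        # Sketch of the lazy size accounting introduced above; the real
        # ArtifactCache also persists the value between invocations.

        def __init__(self, casdir):
            self.casdir = casdir
            self.cache_size = None  # unknown until first queried

        def calculate_cache_size(self):
            # Expensive fallback: walk the whole cache directory.
            total = 0
            for root, _, files in os.walk(self.casdir):
                for name in files:
                    total += os.path.getsize(os.path.join(root, name))
            return total

        def get_cache_size(self):
            # Cheap after the first call: reuse the tracked value.
            if self.cache_size is None:
                self.cache_size = self.calculate_cache_size()
            return self.cache_size

        def _add_artifact_size(self, artifact_size):
            # Called in the main process when a job that added (positive
            # delta) or removed (negative delta) an artifact finishes.
            self.cache_size = self.get_cache_size() + artifact_size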
    

  • buildstream/_artifactcache/cascache.py

    @@ -16,6 +16,7 @@
     #
     #  Authors:
     #        Jürg Billeter <juerg billeter codethink co uk>
    +#        Tiago Gomes <tiago gomes codethink co uk>
     
     import hashlib
     import itertools

    @@ -95,7 +96,7 @@ class CASCache(ArtifactCache):
     
             with tempfile.TemporaryDirectory(prefix='tmp', dir=self.extractdir) as tmpdir:
                 checkoutdir = os.path.join(tmpdir, ref)
    -            self._checkout(checkoutdir, tree)
    +            self._checkout_tree(checkoutdir, tree)
     
                 os.makedirs(os.path.dirname(dest), exist_ok=True)
                 try:

    @@ -115,12 +116,12 @@ class CASCache(ArtifactCache):
         def commit(self, element, content, keys):
             refs = [self.get_artifact_fullname(element, key) for key in keys]
     
    -        tree = self._create_tree(content)
    +        tree, size = self._create_tree(content)
     
             for ref in refs:
                 self.set_ref(ref, tree)
     
    -        self.cache_size = None
    +        return size
     
         def diff(self, element, key_a, key_b, *, subdir=None):
             ref_a = self.get_artifact_fullname(element, key_a)

    @@ -238,12 +239,12 @@ class CASCache(ArtifactCache):
                     tree.hash = response.digest.hash
                     tree.size_bytes = response.digest.size_bytes
     
    -                self._fetch_directory(remote, tree)
    +                size = self._fetch_tree(remote, tree)
     
                     self.set_ref(ref, tree)
     
                     # no need to pull from additional remotes
    -                return True
    +                return True, size
     
                 except grpc.RpcError as e:
                     if e.code() != grpc.StatusCode.NOT_FOUND:

    @@ -257,7 +258,7 @@ class CASCache(ArtifactCache):
                                 remote.spec.url, element._get_brief_display_key())
                         ))
     
    -        return False
    +        return False, 0
     
         def link_key(self, element, oldkey, newkey):
             oldref = self.get_artifact_fullname(element, oldkey)

    @@ -397,6 +398,7 @@ class CASCache(ArtifactCache):
         #
         # Returns:
         #     (Digest): The digest of the added object
    +    #     (int): The number of bytes required to store the object
         #
         # Either `path` or `buffer` must be passed, but not both.
         #

    @@ -425,22 +427,39 @@ class CASCache(ArtifactCache):
     
                     out.flush()
     
    +                file_size = os.fstat(out.fileno()).st_size
    +
                     digest.hash = h.hexdigest()
    -                digest.size_bytes = os.fstat(out.fileno()).st_size
    +                digest.size_bytes = file_size
     
                     # Place file at final location
                     objpath = self.objpath(digest)
    -                os.makedirs(os.path.dirname(objpath), exist_ok=True)
    +                dirpath = os.path.dirname(objpath)
    +
    +                # Track the increased size on the parent directory caused by
    +                # adding a new entry, as these directories can contain a large
    +                # number of files.
    +                new_dir_size = 0
    +                old_dir_size = 0
    +                try:
    +                    os.makedirs(dirpath)
    +                except FileExistsError:
    +                    old_dir_size = os.stat(dirpath).st_size
    +                else:
    +                    new_dir_size = os.stat(dirpath).st_size
    +
                     os.link(out.name, objpath)
    +                new_dir_size = os.stat(dirpath).st_size - old_dir_size
     
             except FileExistsError as e:
                 # We can ignore the failed link() if the object is already in the repo.
    +            file_size = 0
                 pass
     
             except OSError as e:
                 raise ArtifactError("Failed to hash object: {}".format(e)) from e
     
    -        return digest
    +        return digest, file_size + new_dir_size
     
         # set_ref():
         #
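The add_object() hunk above measures what a stored object really costs: zero when the hard link is deduplicated against an existing object, otherwise the file size plus any growth of the parent directory entry. The measurement in isolation, as a hedged sketch (store_object is a hypothetical helper written for illustration, not part of the diff):

    import os


    def store_object(tmp_name, objpath):
        # Link a temporary file into its final CAS location and return
        # the number of bytes actually added to the cache.
        dirpath = os.path.dirname(objpath)

        # Track growth of the parent directory inode too; these fan-out
        # directories can hold a very large number of entries.
        old_dir_size = 0
        try:
            os.makedirs(dirpath)
        except FileExistsError:
            old_dir_size = os.stat(dirpath).st_size

        try:
            os.link(tmp_name, objpath)
        except FileExistsError:
            # Deduplicated: the object is already in the repository.
            return 0

        dir_growth = os.stat(dirpath).st_size - old_dir_size
        return os.path.getsize(objpath) + dir_growth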
    
    @@ -449,6 +468,8 @@ class CASCache(ArtifactCache):
         # Args:
         #     ref (str): The name of the ref
         #
    +    # Note: Setting a ref adds very little overhead to the cache
    +    # size, so we don't track it.
         def set_ref(self, ref, tree):
             refpath = self._refpath(ref)
             os.makedirs(os.path.dirname(refpath), exist_ok=True)

    @@ -488,11 +509,7 @@ class CASCache(ArtifactCache):
                 raise ArtifactError("Attempt to access unavailable artifact: {}".format(e)) from e
     
         def calculate_cache_size(self):
    -        if self.cache_size is None:
    -            self.cache_size = utils._get_dir_size(self.casdir)
    -            self.estimated_size = self.cache_size
    -
    -        return self.cache_size
    +        return utils._get_dir_size(self.casdir)
     
         # list_artifacts():
         #

    @@ -599,7 +616,7 @@ class CASCache(ArtifactCache):
         ################################################
         #             Local Private Methods            #
         ################################################
    -    def _checkout(self, dest, tree):
    +    def _checkout_tree(self, dest, tree):
             os.makedirs(dest, exist_ok=True)
     
             directory = remote_execution_pb2.Directory()

    @@ -618,7 +635,7 @@ class CASCache(ArtifactCache):
     
             for dirnode in directory.directories:
                 fullpath = os.path.join(dest, dirnode.name)
    -            self._checkout(fullpath, dirnode.digest)
    +            self._checkout_tree(fullpath, dirnode.digest)
     
             for symlinknode in directory.symlinks:
                 # symlink

    @@ -630,6 +647,7 @@ class CASCache(ArtifactCache):
     
         def _create_tree(self, path, *, digest=None):
             directory = remote_execution_pb2.Directory()
    +        size = 0
     
             for name in sorted(os.listdir(path)):
                 full_path = os.path.join(path, name)

    @@ -637,11 +655,11 @@ class CASCache(ArtifactCache):
                 if stat.S_ISDIR(mode):
                     dirnode = directory.directories.add()
                     dirnode.name = name
    -                self._create_tree(full_path, digest=dirnode.digest)
    +                size += self._create_tree(full_path, digest=dirnode.digest)[1]
                 elif stat.S_ISREG(mode):
                     filenode = directory.files.add()
                     filenode.name = name
    -                self.add_object(path=full_path, digest=filenode.digest)
    +                size += self.add_object(path=full_path, digest=filenode.digest)[1]
                     filenode.is_executable = (mode & stat.S_IXUSR) == stat.S_IXUSR
                 elif stat.S_ISLNK(mode):
                     symlinknode = directory.symlinks.add()

    @@ -650,7 +668,8 @@ class CASCache(ArtifactCache):
                 else:
                     raise ArtifactError("Unsupported file type for {}".format(full_path))
     
    -        return self.add_object(digest=digest, buffer=directory.SerializeToString())
    +        res = self.add_object(digest=digest, buffer=directory.SerializeToString())
    +        return res[0], res[1] + size
     
         def _get_subdir(self, tree, subdir):
             head, name = os.path.split(subdir)

    @@ -793,11 +812,12 @@ class CASCache(ArtifactCache):
             out.flush()
             assert digest.size_bytes == os.fstat(out.fileno()).st_size
     
    -    def _fetch_directory(self, remote, tree):
    +    def _fetch_tree(self, remote, tree):
    +        size = 0
             objpath = self.objpath(tree)
             if os.path.exists(objpath):
                 # already in local cache
    -            return
    +            return 0
     
             with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
                 self._fetch_blob(remote, tree, out)

    @@ -808,7 +828,7 @@ class CASCache(ArtifactCache):
                     directory.ParseFromString(f.read())
     
                 for filenode in directory.files:
    -                fileobjpath = self.objpath(tree)
    +                fileobjpath = self.objpath(filenode.digest)
                     if os.path.exists(fileobjpath):
                         # already in local cache
                         continue

    @@ -816,17 +836,21 @@ class CASCache(ArtifactCache):
                     with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
                         self._fetch_blob(remote, filenode.digest, f)
     
    -                    digest = self.add_object(path=f.name)
    +                    digest, obj_size = self.add_object(path=f.name)
    +                    size += obj_size
                         assert digest.hash == filenode.digest.hash
     
                 for dirnode in directory.directories:
    -                self._fetch_directory(remote, dirnode.digest)
    +                size += self._fetch_tree(remote, dirnode.digest)
     
                 # place directory blob only in final location when we've downloaded
                 # all referenced blobs to avoid dangling references in the repository
    -            digest = self.add_object(path=out.name)
    +            digest, obj_size = self.add_object(path=out.name)
    +            size += obj_size
                 assert digest.hash == tree.hash
     
    +            return size
    +
     
     # Represents a single remote CAS cache.
     #
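_create_tree() and _fetch_tree() both thread these per-object byte counts upward through the recursion, so committing or pulling a whole tree yields its total deduplicated cost. Reduced to a sketch, assuming an add_object callable that returns (digest, bytes_added) as in the diff; the protobuf bookkeeping and the cost of the serialized directory nodes themselves are omitted here:

    import os
    import stat


    def tree_bytes_added(path, add_object):
        # Sketch of the size aggregation in _create_tree(): recurse into
        # subdirectories and sum the bytes reported for each stored file.
        size = 0
        for name in sorted(os.listdir(path)):
            full_path = os.path.join(path, name)
            mode = os.lstat(full_path).st_mode
            if stat.S_ISDIR(mode):
                # A subtree reports its own deduplicated cost.
                size += tree_bytes_added(full_path, add_object)
            elif stat.S_ISREG(mode):
                size += add_object(full_path)[1]
        return size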
    

  • buildstream/_artifactcache/casserver.py

    @@ -203,7 +203,7 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
                             context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                             return response
                         out.flush()
    -                    digest = self.cas.add_object(path=out.name)
    +                    digest = self.cas.add_object(path=out.name)[0]
                         if digest.hash != client_digest.hash:
                             context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                             return response

  • buildstream/_context.py

    @@ -119,7 +119,6 @@ class Context():
             self._log_handle = None
             self._log_filename = None
             self.config_cache_quota = 'infinity'
    -        self.artifactdir_volume = None
     
         # load()
         #
    

  • buildstream/_scheduler/jobs/__init__.py

    @@ -1,3 +1,2 @@
     from .elementjob import ElementJob
    -from .cachesizejob import CacheSizeJob
     from .cleanupjob import CleanupJob

  • buildstream/_scheduler/jobs/cachesizejob.py deleted

    @@ -1,38 +0,0 @@
    -#  Copyright (C) 2018 Codethink Limited
    -#
    -#  This program is free software; you can redistribute it and/or
    -#  modify it under the terms of the GNU Lesser General Public
    -#  License as published by the Free Software Foundation; either
    -#  version 2 of the License, or (at your option) any later version.
    -#
    -#  This library is distributed in the hope that it will be useful,
    -#  but WITHOUT ANY WARRANTY; without even the implied warranty of
    -#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
    -#  Lesser General Public License for more details.
    -#
    -#  You should have received a copy of the GNU Lesser General Public
    -#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
    -#
    -#  Author:
    -#        Tristan Daniël Maat <tristan maat codethink co uk>
    -#
    -from .job import Job
    -from ..._platform import Platform
    -
    -
    -class CacheSizeJob(Job):
    -    def __init__(self, *args, complete_cb, **kwargs):
    -        super().__init__(*args, **kwargs)
    -        self._complete_cb = complete_cb
    -        self._cache = Platform._instance.artifactcache
    -
    -    def child_process(self):
    -        return self._cache.calculate_cache_size()
    -
    -    def parent_complete(self, success, result):
    -        self._cache._set_cache_size(result)
    -        if self._complete_cb:
    -            self._complete_cb(result)
    -
    -    def child_process_data(self):
    -        return {}

  • buildstream/_scheduler/jobs/cleanupjob.py

    @@ -21,18 +21,19 @@ from ..._platform import Platform
     
     
     class CleanupJob(Job):
    -    def __init__(self, *args, complete_cb, **kwargs):
    +    def __init__(self, *args, **kwargs):
             super().__init__(*args, **kwargs)
    -        self._complete_cb = complete_cb
             self._cache = Platform._instance.artifactcache
     
         def child_process(self):
             return self._cache.clean()
     
         def parent_complete(self, success, result):
    -        self._cache._set_cache_size(result)
    -        if self._complete_cb:
    -            self._complete_cb()
    +        if success:
    +            # ArtifactCache.clean() returns the number of bytes cleaned.
    +            # We negate the number because the cache size is to be
    +            # decreased.
    +            self._cache._add_artifact_size(result * -1)
     
         def child_process_data(self):
             return {}
             return {}

  • buildstream/_scheduler/jobs/elementjob.py

    @@ -110,12 +110,10 @@ class ElementJob(Job):
     
             workspace = self._element._get_workspace()
             artifact_size = self._element._get_artifact_size()
    -        cache_size = self._element._get_artifact_cache().calculate_cache_size()
     
             if workspace is not None:
                 data['workspace'] = workspace.to_dict()
             if artifact_size is not None:
                 data['artifact_size'] = artifact_size
    -        data['cache_size'] = cache_size
     
             return data

  • buildstream/_scheduler/jobs/job.py

    @@ -109,7 +109,7 @@ class Job():
             # Private members
             #
             self._scheduler = scheduler            # The scheduler
    -        self._queue = multiprocessing.Queue()  # A message passing queue
    +        self._queue = None                     # A message passing queue
             self._process = None                   # The Process object
             self._watcher = None                   # Child process watcher
             self._listening = False                # Whether the parent is currently listening

    @@ -130,6 +130,8 @@ class Job():
         #
         def spawn(self):
     
    +        self._queue = multiprocessing.Queue()
    +
             self._tries += 1
             self._parent_start_listening()
     

    @@ -552,6 +554,9 @@ class Job():
             self.parent_complete(returncode == RC_OK, self._result)
             self._scheduler.job_completed(self, returncode == RC_OK)
     
    +        # Force the deletion of the queue and process objects to try and clean up FDs
    +        self._queue = self._process = None
    +
         # _parent_process_envelope()
         #
         # Processes a message Envelope deserialized from the message queue.

  • buildstream/_scheduler/queues/buildqueue.py

    @@ -87,19 +87,6 @@ class BuildQueue(Queue):
     
             return QueueStatus.READY
     
    -    def _check_cache_size(self, job, element):
    -        if not job.child_data:
    -            return
    -
    -        artifact_size = job.child_data.get('artifact_size', False)
    -
    -        if artifact_size:
    -            cache = element._get_artifact_cache()
    -            cache._add_artifact_size(artifact_size)
    -
    -            if cache.get_approximate_cache_size() > cache.cache_quota:
    -                self._scheduler._check_cache_size_real()
    -
         def done(self, job, element, result, success):
     
             if success:
    

  • buildstream/_scheduler/queues/pullqueue.py

    @@ -1,5 +1,5 @@
     #
    -#  Copyright (C) 2016 Codethink Limited
    +#  Copyright (C) 2018 Codethink Limited
     #
     #  This program is free software; you can redistribute it and/or
     #  modify it under the terms of the GNU Lesser General Public

    @@ -52,17 +52,14 @@ class PullQueue(Queue):
             else:
                 return QueueStatus.SKIP
     
    -    def done(self, _, element, result, success):
    +    def done(self, job, element, result, success):
     
             if not success:
                 return False
     
             element._pull_done()
     
    -        # Build jobs will check the "approximate" size first. Since we
    -        # do not get an artifact size from pull jobs, we have to
    -        # actually check the cache size.
    -        self._scheduler._check_cache_size_real()
    +        self._check_cache_size(job, element)
     
             # Element._pull() returns True if it downloaded an artifact,
             # here we want to appear skipped if we did not download.
    

  • buildstream/_scheduler/queues/queue.py

    @@ -1,5 +1,5 @@
     #
    -#  Copyright (C) 2016 Codethink Limited
    +#  Copyright (C) 2018 Codethink Limited
     #
     #  This program is free software; you can redistribute it and/or
     #  modify it under the terms of the GNU Lesser General Public

    @@ -301,8 +301,6 @@ class Queue():
             # Update values that need to be synchronized in the main task
             # before calling any queue implementation
             self._update_workspaces(element, job)
    -        if job.child_data:
    -            element._get_artifact_cache().cache_size = job.child_data.get('cache_size')
     
             # Give the result of the job to the Queue implementor,
             # and determine if it should be considered as processed

    @@ -360,3 +358,16 @@ class Queue():
             logfile = "{key}-{action}".format(key=key, action=action)
     
             return os.path.join(project.name, element.normal_name, logfile)
    +
    +    def _check_cache_size(self, job, element):
    +        if not job.child_data:
    +            return
    +
    +        artifact_size = job.child_data.get('artifact_size', False)
    +
    +        if artifact_size:
    +            cache = element._get_artifact_cache()
    +            cache._add_artifact_size(artifact_size)
    +
    +            if cache.get_cache_size() > cache.cache_quota:
    +                self._scheduler._run_cache_cleanup()
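Hoisting _check_cache_size() into the base Queue means build and pull jobs now share one accounting path: the child reports an artifact size, the main process folds it into the tracked total, and a cleanup job is scheduled only once the quota is actually crossed. The control flow as a free-standing sketch, assuming the cache and scheduler interfaces from the diffs above:

    def check_cache_size(scheduler, cache, child_data):
        # Sketch of Queue._check_cache_size(): runs in the main process
        # after a job finishes, using sizes reported by the child job.
        if not child_data:
            return

        artifact_size = child_data.get('artifact_size', 0)
        if artifact_size:
            cache._add_artifact_size(artifact_size)

            if cache.get_cache_size() > cache.cache_quota:
                scheduler._run_cache_cleanup()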

  • buildstream/_scheduler/scheduler.py

    @@ -1,5 +1,5 @@
     #
    -#  Copyright (C) 2016 Codethink Limited
    +#  Copyright (C) 2018 Codethink Limited
     #
     #  This program is free software; you can redistribute it and/or
     #  modify it under the terms of the GNU Lesser General Public

    @@ -28,8 +28,7 @@ from contextlib import contextmanager
     
     # Local imports
     from .resources import Resources, ResourceType
    -from .jobs import CacheSizeJob, CleanupJob
    -from .._platform import Platform
    +from .jobs import CleanupJob
     
     
     # A decent return code for Scheduler.run()

    @@ -316,24 +315,11 @@ class Scheduler():
             self.schedule_jobs(ready)
             self._sched()
     
    -    def _run_cleanup(self, cache_size):
    -        platform = Platform.get_platform()
    -        if cache_size and cache_size < platform.artifactcache.cache_quota:
    -            return
    -
    -        job = CleanupJob(self, 'cleanup', 'cleanup',
    +    def _run_cache_cleanup(self):
    +        job = CleanupJob(self, 'Cleaning artifact cache', 'cleanup',
                              resources=[ResourceType.CACHE,
                                         ResourceType.PROCESS],
    -                         exclusive_resources=[ResourceType.CACHE],
    -                         complete_cb=None)
    -        self.schedule_jobs([job])
    -
    -    def _check_cache_size_real(self):
    -        job = CacheSizeJob(self, 'cache_size', 'cache_size/cache_size',
    -                           resources=[ResourceType.CACHE,
    -                                      ResourceType.PROCESS],
    -                           exclusive_resources=[ResourceType.CACHE],
    -                           complete_cb=self._run_cleanup)
    +                         exclusive_resources=[ResourceType.CACHE])
             self.schedule_jobs([job])
     
         # _suspend_jobs()
    

  • buildstream/element.py

    @@ -1646,8 +1646,8 @@ class Element(Plugin):
                         }), os.path.join(metadir, 'workspaced-dependencies.yaml'))
     
                     with self.timed_activity("Caching artifact"):
    -                    self.__artifact_size = utils._get_dir_size(assembledir)
    -                    self.__artifacts.commit(self, assembledir, self.__get_cache_keys_for_commit())
    +                    self.__artifact_size = self.__artifacts.commit(
    +                        self, assembledir, self.__get_cache_keys_for_commit())
     
                 if collect is not None and collectvdir is None:
                     raise ElementError(

    @@ -1697,31 +1697,31 @@ class Element(Plugin):
             self._update_state()
     
         def _pull_strong(self, *, progress=None):
    -        weak_key = self._get_cache_key(strength=_KeyStrength.WEAK)
    -
             key = self.__strict_cache_key
    -        if not self.__artifacts.pull(self, key, progress=progress):
    -            return False
    +        pulled, self.__artifact_size = self.__artifacts.pull(
    +            self, key, progress=progress)
     
    -        # update weak ref by pointing it to this newly fetched artifact
    -        self.__artifacts.link_key(self, key, weak_key)
    +        if pulled:
    +            # update weak ref by pointing it to this newly fetched artifact
    +            weak_key = self._get_cache_key(strength=_KeyStrength.WEAK)
    +            self.__artifacts.link_key(self, key, weak_key)
     
    -        return True
    +        return pulled
     
         def _pull_weak(self, *, progress=None):
             weak_key = self._get_cache_key(strength=_KeyStrength.WEAK)
    +        pulled, self.__artifact_size = self.__artifacts.pull(
    +            self, weak_key, progress=progress)
     
    -        if not self.__artifacts.pull(self, weak_key, progress=progress):
    -            return False
    -
    -        # extract strong cache key from this newly fetched artifact
    -        self._pull_done()
    +        if pulled:
    +            # extract strong cache key from this newly fetched artifact
    +            self._pull_done()
     
    -        # create tag for strong cache key
    -        key = self._get_cache_key(strength=_KeyStrength.STRONG)
    -        self.__artifacts.link_key(self, weak_key, key)
    +            # create tag for strong cache key
    +            key = self._get_cache_key(strength=_KeyStrength.STRONG)
    +            self.__artifacts.link_key(self, weak_key, key)
     
    -        return True
    +        return pulled
     
         # _pull():
         #

    @@ -1741,13 +1741,12 @@ class Element(Plugin):
             if not pulled and not self._cached() and not context.get_strict():
                 pulled = self._pull_weak(progress=progress)
     
    -        if not pulled:
    -            return False
    +        if pulled:
    +            # Notify successful download
    +            display_key = self._get_brief_display_key()
    +            self.info("Downloaded artifact {}".format(display_key))
     
    -        # Notify successfull download
    -        display_key = self._get_brief_display_key()
    -        self.info("Downloaded artifact {}".format(display_key))
    -        return True
    +        return pulled
     
         # _skip_push():
         #
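Because pull() now returns a (success, size) pair, the element records the artifact size on every attempt and performs the key-linking only when something was actually fetched. The calling convention in miniature, as a hedged sketch (get_weak_key is a hypothetical stand-in for the element's cache-key lookup, not BuildStream API):

    def pull_strong(artifacts, element, strict_key, progress=None):
        # Sketch of Element._pull_strong() under the new two-value
        # contract of ArtifactCache.pull().
        pulled, artifact_size = artifacts.pull(
            element, strict_key, progress=progress)

        if pulled:
            # Point the weak ref at the newly fetched artifact.
            weak_key = element.get_weak_key()  # hypothetical helper
            artifacts.link_key(element, strict_key, weak_key)

        return pulled, artifact_size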
    

  • buildstream/storage/_casbaseddirectory.py

    @@ -111,7 +111,7 @@ class CasBasedDirectory(Directory):
             the parent).
     
             """
    -        self.ref = self.cas_cache.add_object(buffer=self.pb2_directory.SerializeToString())
    +        self.ref = self.cas_cache.add_object(buffer=self.pb2_directory.SerializeToString())[0]
             if caller:
                 old_dir = self._find_pb2_entry(caller.filename)
                 self.cas_cache.add_object(digest=old_dir.digest, buffer=caller.pb2_directory.SerializeToString())

    @@ -130,9 +130,10 @@ class CasBasedDirectory(Directory):
                 self.index[entry.name].buildstream_object._recalculate_recursing_down(entry)
     
             if parent:
    -            self.ref = self.cas_cache.add_object(digest=parent.digest, buffer=self.pb2_directory.SerializeToString())
    +            self.ref = self.cas_cache.add_object(digest=parent.digest,
    +                                                 buffer=self.pb2_directory.SerializeToString())[0]
             else:
    -            self.ref = self.cas_cache.add_object(buffer=self.pb2_directory.SerializeToString())
    +            self.ref = self.cas_cache.add_object(buffer=self.pb2_directory.SerializeToString())[0]
             # We don't need to do anything more than that; files were already added earlier, and symlinks are
             # part of the directory structure.
    

  • doc/source/install_artifacts.rst

    @@ -161,13 +161,13 @@ Below are two examples of how to run the cache server as a systemd service, one
     
        [Service]
        Environment="LC_ALL=C.UTF-8"
    -   ExecStart=/usr/local/bin/bst-artifact-server --port 11001 --server-key {{certs_path}}/privkey.pem --
    -   server-cert {{certs_path}}/fullchain.pem {{artifacts_path}}
    +   ExecStart=/usr/local/bin/bst-artifact-server --port 11001 --server-key {{certs_path}}/server.key --server-cert {{certs_path}}/server.crt {{artifacts_path}}
        User=artifacts
     
        [Install]
        WantedBy=multi-user.target
     
    +.. code:: ini
     
        #
        # Pull/Push

    @@ -178,9 +178,7 @@ Below are two examples of how to run the cache server as a systemd service, one
     
        [Service]
        Environment="LC_ALL=C.UTF-8"
    -   ExecStart=/usr/local/bin/bst-artifact-server --port 11002 --server-key {{certs_path}}/privkey.pem --
    -   server-cert {{certs_path}}/fullchain.pem --client-certs /home/artifacts/authorized.crt --enable-push /
    -   {{artifacts_path}}
    +   ExecStart=/usr/local/bin/bst-artifact-server --port 11002 --server-key {{certs_path}}/server.key --server-cert {{certs_path}}/server.crt --client-certs {{certs_path}}/authorized.crt --enable-push {{artifacts_path}}
        User=artifacts
     
        [Install]

    @@ -188,11 +186,16 @@ Below are two examples of how to run the cache server as a systemd service, one
     
     Here we define when systemd should start the service (after the networking stack has been started), and then how to run the cache with the desired configuration, under the artifacts user. The {{ }} are there to denote where you should change these files to point to your desired locations.
     
    +For more information on systemd services, see
    +`Creating Systemd Service Files <https://www.devdungeon.com/content/creating-systemd-service-files>`_.
    +
     User configuration
     ~~~~~~~~~~~~~~~~~~
     The user configuration for artifacts is documented with the rest
     of the :ref:`user configuration documentation <user_config>`.
     
    +Note that for self-signed certificates, the public key fields are mandatory.
    +
     Assuming you have the same setup used in this document, and that your
     host is reachable on the internet as ``artifacts.com`` (for example),
     then a user can use the following user configuration:
    198 201
     then a user can use the following user configuration:
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]