[Notes] [Git][BuildStream/buildstream][master] 15 commits: tests/artifactcache/expiry.py: Fix test case expectations.



Title: GitLab

Tristan Van Berkom pushed to branch master at BuildStream / buildstream

Commits:

12 changed files:

Changes:

  • buildstream/_artifactcache/artifactcache.py
    ... ... @@ -81,19 +81,16 @@ ArtifactCacheSpec.__new__.__defaults__ = (None, None, None)
    81 81
     class ArtifactCache():
    
    82 82
         def __init__(self, context):
    
    83 83
             self.context = context
    
    84
    -        self.required_artifacts = set()
    
    85 84
             self.extractdir = os.path.join(context.artifactdir, 'extract')
    
    86 85
             self.tmpdir = os.path.join(context.artifactdir, 'tmp')
    
    87 86
     
    
    88
    -        self.estimated_size = None
    
    89
    -
    
    90 87
             self.global_remote_specs = []
    
    91 88
             self.project_remote_specs = {}
    
    92 89
     
    
    93
    -        self._local = False
    
    94
    -        self.cache_size = None
    
    95
    -        self.cache_quota = None
    
    96
    -        self.cache_lower_threshold = None
    
    90
    +        self._required_artifacts = set()      # The artifacts required for this session
    
    91
    +        self._cache_size = None               # The current cache size, sometimes it's an estimate
    
    92
    +        self._cache_quota = None              # The cache quota
    
    93
    +        self._cache_lower_threshold = None    # The target cache size for a cleanup
    
    97 94
     
    
    98 95
             os.makedirs(self.extractdir, exist_ok=True)
    
    99 96
             os.makedirs(self.tmpdir, exist_ok=True)
    
    ... ... @@ -212,8 +209,8 @@ class ArtifactCache():
    212 209
                 weak_key = element._get_cache_key(strength=_KeyStrength.WEAK)
    
    213 210
     
    
    214 211
                 for key in (strong_key, weak_key):
    
    215
    -                if key and key not in self.required_artifacts:
    
    216
    -                    self.required_artifacts.add(key)
    
    212
    +                if key and key not in self._required_artifacts:
    
    213
    +                    self._required_artifacts.add(key)
    
    217 214
     
    
    218 215
                         # We also update the usage times of any artifacts
    
    219 216
                         # we will be using, which helps preventing a
    
    ... ... @@ -228,10 +225,16 @@ class ArtifactCache():
    228 225
         #
    
    229 226
         # Clean the artifact cache as much as possible.
    
    230 227
         #
    
    228
    +    # Returns:
    
    229
    +    #    (int): The size of the cache after having cleaned up
    
    230
    +    #
    
    231 231
         def clean(self):
    
    232 232
             artifacts = self.list_artifacts()
    
    233 233
     
    
    234
    -        while self.calculate_cache_size() >= self.cache_quota - self.cache_lower_threshold:
    
    234
    +        # Do a real computation of the cache size once, just in case
    
    235
    +        self.compute_cache_size()
    
    236
    +
    
    237
    +        while self.get_cache_size() >= self._cache_lower_threshold:
    
    235 238
                 try:
    
    236 239
                     to_remove = artifacts.pop(0)
    
    237 240
                 except IndexError:
    
    ... ... @@ -245,7 +248,7 @@ class ArtifactCache():
    245 248
                               "Please increase the cache-quota in {}."
    
    246 249
                               .format(self.context.config_origin or default_conf))
    
    247 250
     
    
    248
    -                if self.calculate_cache_size() > self.cache_quota:
    
    251
    +                if self.get_quota_exceeded():
    
    249 252
                         raise ArtifactError("Cache too full. Aborting.",
    
    250 253
                                             detail=detail,
    
    251 254
                                             reason="cache-too-full")
    
    ... ... @@ -253,46 +256,94 @@ class ArtifactCache():
    253 256
                         break
    
    254 257
     
    
    255 258
                 key = to_remove.rpartition('/')[2]
    
    256
    -            if key not in self.required_artifacts:
    
    259
    +            if key not in self._required_artifacts:
    
    260
    +
    
    261
    +                # Remove the actual artifact, if it's not required.
    
    257 262
                     size = self.remove(to_remove)
    
    258
    -                if size:
    
    259
    -                    self.cache_size -= size
    
    263
    +
    
    264
    +                # Subtract the removed artifact's size from the cache size
    
    265
    +                self.set_cache_size(self._cache_size - size)
    
    260 266
     
    
    261 267
             # This should be O(1) if implemented correctly
    
    262
    -        return self.calculate_cache_size()
    
    268
    +        return self.get_cache_size()
    
    263 269
     
    
    264
    -    # get_approximate_cache_size()
    
    270
    +    # compute_cache_size()
    
    265 271
         #
    
    266
    -    # A cheap method that aims to serve as an upper limit on the
    
    267
    -    # artifact cache size.
    
    272
    +    # Computes the real artifact cache size by calling
    
    273
    +    # the abstract calculate_cache_size() method.
    
    268 274
         #
    
    269
    -    # The cache size reported by this function will normally be larger
    
    270
    -    # than the real cache size, since it is calculated using the
    
    271
    -    # pre-commit artifact size, but for very small artifacts in
    
    272
    -    # certain caches additional overhead could cause this to be
    
    273
    -    # smaller than, but close to, the actual size.
    
    275
    +    # Returns:
    
    276
    +    #    (int): The size of the artifact cache.
    
    277
    +    #
    
    278
    +    def compute_cache_size(self):
    
    279
    +        self._cache_size = self.calculate_cache_size()
    
    280
    +
    
    281
    +        return self._cache_size
    
    282
    +
    
    283
    +    # add_artifact_size()
    
    274 284
         #
    
    275
    -    # Nonetheless, in practice this should be safe to use as an upper
    
    276
    -    # limit on the cache size.
    
    285
    +    # Adds the reported size of a newly cached artifact to the
    
    286
    +    # overall estimated size.
    
    277 287
         #
    
    278
    -    # If the cache has built-in constant-time size reporting, please
    
    279
    -    # feel free to override this method with a more accurate
    
    280
    -    # implementation.
    
    288
    +    # Args:
    
    289
    +    #     artifact_size (int): The size to add.
    
    290
    +    #
    
    291
    +    def add_artifact_size(self, artifact_size):
    
    292
    +        cache_size = self.get_cache_size()
    
    293
    +        cache_size += artifact_size
    
    294
    +
    
    295
    +        self.set_cache_size(cache_size)
    
    296
    +
    
    297
    +    # get_cache_size()
    
    298
    +    #
    
    299
    +    # Fetches the cached size of the cache, this is sometimes
    
    300
    +    # an estimate and periodically adjusted to the real size
    
    301
    +    # when a cache size calculation job runs.
    
    302
    +    #
    
    303
    +    # When it is an estimate, the value is either correct, or
    
    304
    +    # it is greater than the actual cache size.
    
    281 305
         #
    
    282 306
         # Returns:
    
    283 307
         #     (int) An approximation of the artifact cache size.
    
    284 308
         #
    
    285
    -    def get_approximate_cache_size(self):
    
    286
    -        # If we don't currently have an estimate, figure out the real
    
    287
    -        # cache size.
    
    288
    -        if self.estimated_size is None:
    
    309
    +    def get_cache_size(self):
    
    310
    +
    
    311
    +        # If we don't currently have an estimate, figure out the real cache size.
    
    312
    +        if self._cache_size is None:
    
    289 313
                 stored_size = self._read_cache_size()
    
    290 314
                 if stored_size is not None:
    
    291
    -                self.estimated_size = stored_size
    
    315
    +                self._cache_size = stored_size
    
    292 316
                 else:
    
    293
    -                self.estimated_size = self.calculate_cache_size()
    
    317
    +                self.compute_cache_size()
    
    318
    +
    
    319
    +        return self._cache_size
    
    294 320
     
    
    295
    -        return self.estimated_size
    
    321
    +    # set_cache_size()
    
    322
    +    #
    
    323
    +    # Forcefully set the overall cache size.
    
    324
    +    #
    
    325
    +    # This is used to update the size in the main process after
    
    326
    +    # having calculated in a cleanup or a cache size calculation job.
    
    327
    +    #
    
    328
    +    # Args:
    
    329
    +    #     cache_size (int): The size to set.
    
    330
    +    #
    
    331
    +    def set_cache_size(self, cache_size):
    
    332
    +
    
    333
    +        assert cache_size is not None
    
    334
    +
    
    335
    +        self._cache_size = cache_size
    
    336
    +        self._write_cache_size(self._cache_size)
    
    337
    +
    
    338
    +    # get_quota_exceeded()
    
    339
    +    #
    
    340
    +    # Checks if the current artifact cache size exceeds the quota.
    
    341
    +    #
    
    342
    +    # Returns:
    
    343
    +    #    (bool): True if the quota is exceeded
    
    344
    +    #
    
    345
    +    def get_quota_exceeded(self):
    
    346
    +        return self.get_cache_size() > self._cache_quota
    
    296 347
     
    
    297 348
         ################################################
    
    298 349
         # Abstract methods for subclasses to implement #
    
    ... ... @@ -484,11 +535,8 @@ class ArtifactCache():
    484 535
         #
    
    485 536
         # Return the real artifact cache size.
    
    486 537
         #
    
    487
    -    # Implementations should also use this to update estimated_size.
    
    488
    -    #
    
    489 538
         # Returns:
    
    490
    -    #
    
    491
    -    # (int) The size of the artifact cache.
    
    539
    +    #    (int): The size of the artifact cache.
    
    492 540
         #
    
    493 541
         def calculate_cache_size(self):
    
    494 542
             raise ImplError("Cache '{kind}' does not implement calculate_cache_size()"
    
    ... ... @@ -535,39 +583,13 @@ class ArtifactCache():
    535 583
             with self.context.timed_activity("Initializing remote caches", silent_nested=True):
    
    536 584
                 self.initialize_remotes(on_failure=remote_failed)
    
    537 585
     
    
    538
    -    # _add_artifact_size()
    
    539
    -    #
    
    540
    -    # Since we cannot keep track of the cache size between threads,
    
    541
    -    # this method will be called by the main process every time a
    
    542
    -    # process that added something to the cache finishes.
    
    543
    -    #
    
    544
    -    # This will then add the reported size to
    
    545
    -    # ArtifactCache.estimated_size.
    
    546
    -    #
    
    547
    -    def _add_artifact_size(self, artifact_size):
    
    548
    -        if not self.estimated_size:
    
    549
    -            self.estimated_size = self.calculate_cache_size()
    
    550
    -
    
    551
    -        self.estimated_size += artifact_size
    
    552
    -        self._write_cache_size(self.estimated_size)
    
    553
    -
    
    554
    -    # _set_cache_size()
    
    555
    -    #
    
    556
    -    # Similarly to the above method, when we calculate the actual size
    
    557
    -    # in a child thread, we can't update it. We instead pass the value
    
    558
    -    # back to the main thread and update it there.
    
    559
    -    #
    
    560
    -    def _set_cache_size(self, cache_size):
    
    561
    -        self.estimated_size = cache_size
    
    562
    -
    
    563
    -        # set_cache_size is called in cleanup, where it may set the cache to None
    
    564
    -        if self.estimated_size is not None:
    
    565
    -            self._write_cache_size(self.estimated_size)
    
    566
    -
    
    567 586
         # _write_cache_size()
    
    568 587
         #
    
    569 588
         # Writes the given size of the artifact to the cache's size file
    
    570 589
         #
    
    590
    +    # Args:
    
    591
    +    #    size (int): The size of the artifact cache to record
    
    592
    +    #
    
    571 593
         def _write_cache_size(self, size):
    
    572 594
             assert isinstance(size, int)
    
    573 595
             size_file_path = os.path.join(self.context.artifactdir, CACHE_SIZE_FILE)
    
    ... ... @@ -579,6 +601,9 @@ class ArtifactCache():
    579 601
         # Reads and returns the size of the artifact cache that's stored in the
    
    580 602
         # cache's size file
    
    581 603
         #
    
    604
    +    # Returns:
    
    605
    +    #    (int): The size of the artifact cache, as recorded in the file
    
    606
    +    #
    
    582 607
         def _read_cache_size(self):
    
    583 608
             size_file_path = os.path.join(self.context.artifactdir, CACHE_SIZE_FILE)
    
    584 609
     
    
    ... ... @@ -628,13 +653,13 @@ class ArtifactCache():
    628 653
             stat = os.statvfs(artifactdir_volume)
    
    629 654
             available_space = (stat.f_bsize * stat.f_bavail)
    
    630 655
     
    
    631
    -        cache_size = self.get_approximate_cache_size()
    
    656
    +        cache_size = self.get_cache_size()
    
    632 657
     
    
    633 658
             # Ensure system has enough storage for the cache_quota
    
    634 659
             #
    
    635 660
             # If cache_quota is none, set it to the maximum it could possibly be.
    
    636 661
             #
    
    637
    -        # Also check that cache_quota is atleast as large as our headroom.
    
    662
    +        # Also check that cache_quota is at least as large as our headroom.
    
    638 663
             #
    
    639 664
             if cache_quota is None:  # Infinity, set to max system storage
    
    640 665
                 cache_quota = cache_size + available_space
    
    ... ... @@ -660,8 +685,8 @@ class ArtifactCache():
    660 685
             # if we end up writing more than 2G, but hey, this stuff is
    
    661 686
             # already really fuzzy.
    
    662 687
             #
    
    663
    -        self.cache_quota = cache_quota - headroom
    
    664
    -        self.cache_lower_threshold = self.cache_quota / 2
    
    688
    +        self._cache_quota = cache_quota - headroom
    
    689
    +        self._cache_lower_threshold = self._cache_quota / 2
    
    665 690
     
    
    666 691
     
    
    667 692
     # _configured_remote_artifact_cache_specs():
    

  • buildstream/_artifactcache/cascache.py
    ... ... @@ -122,8 +122,6 @@ class CASCache(ArtifactCache):
    122 122
             for ref in refs:
    
    123 123
                 self.set_ref(ref, tree)
    
    124 124
     
    
    125
    -        self.cache_size = None
    
    126
    -
    
    127 125
         def diff(self, element, key_a, key_b, *, subdir=None):
    
    128 126
             ref_a = self.get_artifact_fullname(element, key_a)
    
    129 127
             ref_b = self.get_artifact_fullname(element, key_b)
    
    ... ... @@ -530,11 +528,7 @@ class CASCache(ArtifactCache):
    530 528
                 raise ArtifactError("Attempt to access unavailable artifact: {}".format(e)) from e
    
    531 529
     
    
    532 530
         def calculate_cache_size(self):
    
    533
    -        if self.cache_size is None:
    
    534
    -            self.cache_size = utils._get_dir_size(self.casdir)
    
    535
    -            self.estimated_size = self.cache_size
    
    536
    -
    
    537
    -        return self.cache_size
    
    531
    +        return utils._get_dir_size(self.casdir)
    
    538 532
     
    
    539 533
         # list_artifacts():
    
    540 534
         #
    

  • buildstream/_scheduler/jobs/cachesizejob.py
    ... ... @@ -24,15 +24,19 @@ class CacheSizeJob(Job):
    24 24
         def __init__(self, *args, complete_cb, **kwargs):
    
    25 25
             super().__init__(*args, **kwargs)
    
    26 26
             self._complete_cb = complete_cb
    
    27
    -        self._cache = Platform._instance.artifactcache
    
    27
    +
    
    28
    +        platform = Platform.get_platform()
    
    29
    +        self._artifacts = platform.artifactcache
    
    28 30
     
    
    29 31
         def child_process(self):
    
    30
    -        return self._cache.calculate_cache_size()
    
    32
    +        return self._artifacts.compute_cache_size()
    
    31 33
     
    
    32 34
         def parent_complete(self, success, result):
    
    33
    -        self._cache._set_cache_size(result)
    
    34
    -        if self._complete_cb:
    
    35
    -            self._complete_cb(result)
    
    35
    +        if success:
    
    36
    +            self._artifacts.set_cache_size(result)
    
    37
    +
    
    38
    +            if self._complete_cb:
    
    39
    +                self._complete_cb(result)
    
    36 40
     
    
    37 41
         def child_process_data(self):
    
    38 42
             return {}

  • buildstream/_scheduler/jobs/cleanupjob.py
    ... ... @@ -24,15 +24,19 @@ class CleanupJob(Job):
    24 24
         def __init__(self, *args, complete_cb, **kwargs):
    
    25 25
             super().__init__(*args, **kwargs)
    
    26 26
             self._complete_cb = complete_cb
    
    27
    -        self._cache = Platform._instance.artifactcache
    
    27
    +
    
    28
    +        platform = Platform.get_platform()
    
    29
    +        self._artifacts = platform.artifactcache
    
    28 30
     
    
    29 31
         def child_process(self):
    
    30
    -        return self._cache.clean()
    
    32
    +        return self._artifacts.clean()
    
    31 33
     
    
    32 34
         def parent_complete(self, success, result):
    
    33
    -        self._cache._set_cache_size(result)
    
    34
    -        if self._complete_cb:
    
    35
    -            self._complete_cb()
    
    35
    +        if success:
    
    36
    +            self._artifacts.set_cache_size(result)
    
    37
    +
    
    38
    +            if self._complete_cb:
    
    39
    +                self._complete_cb()
    
    36 40
     
    
    37 41
         def child_process_data(self):
    
    38 42
             return {}

  • buildstream/_scheduler/jobs/elementjob.py
    ... ... @@ -109,13 +109,7 @@ class ElementJob(Job):
    109 109
             data = {}
    
    110 110
     
    
    111 111
             workspace = self._element._get_workspace()
    
    112
    -        artifact_size = self._element._get_artifact_size()
    
    113
    -        cache_size = self._element._get_artifact_cache().calculate_cache_size()
    
    114
    -
    
    115 112
             if workspace is not None:
    
    116 113
                 data['workspace'] = workspace.to_dict()
    
    117
    -        if artifact_size is not None:
    
    118
    -            data['artifact_size'] = artifact_size
    
    119
    -        data['cache_size'] = cache_size
    
    120 114
     
    
    121 115
             return data

  • buildstream/_scheduler/queues/buildqueue.py
    ... ... @@ -24,6 +24,7 @@ from . import Queue, QueueStatus
    24 24
     from ..jobs import ElementJob
    
    25 25
     from ..resources import ResourceType
    
    26 26
     from ..._message import MessageType
    
    27
    +from ..._platform import Platform
    
    27 28
     
    
    28 29
     
    
    29 30
     # A queue which assembles elements
    
    ... ... @@ -32,7 +33,7 @@ class BuildQueue(Queue):
    32 33
     
    
    33 34
         action_name = "Build"
    
    34 35
         complete_name = "Built"
    
    35
    -    resources = [ResourceType.PROCESS]
    
    36
    +    resources = [ResourceType.PROCESS, ResourceType.CACHE]
    
    36 37
     
    
    37 38
         def __init__(self, *args, **kwargs):
    
    38 39
             super().__init__(*args, **kwargs)
    
    ... ... @@ -67,8 +68,7 @@ class BuildQueue(Queue):
    67 68
             return super().enqueue(to_queue)
    
    68 69
     
    
    69 70
         def process(self, element):
    
    70
    -        element._assemble()
    
    71
    -        return element._get_unique_id()
    
    71
    +        return element._assemble()
    
    72 72
     
    
    73 73
         def status(self, element):
    
    74 74
             # state of dependencies may have changed, recalculate element state
    
    ... ... @@ -87,18 +87,22 @@ class BuildQueue(Queue):
    87 87
     
    
    88 88
             return QueueStatus.READY
    
    89 89
     
    
    90
    -    def _check_cache_size(self, job, element):
    
    91
    -        if not job.child_data:
    
    92
    -            return
    
    90
    +    def _check_cache_size(self, job, element, artifact_size):
    
    93 91
     
    
    94
    -        artifact_size = job.child_data.get('artifact_size', False)
    
    92
    +        # After completing a build job, add the artifact size
    
    93
    +        # as returned from Element._assemble() to the estimated
    
    94
    +        # artifact cache size
    
    95
    +        #
    
    96
    +        platform = Platform.get_platform()
    
    97
    +        artifacts = platform.artifactcache
    
    95 98
     
    
    96
    -        if artifact_size:
    
    97
    -            cache = element._get_artifact_cache()
    
    98
    -            cache._add_artifact_size(artifact_size)
    
    99
    +        artifacts.add_artifact_size(artifact_size)
    
    99 100
     
    
    100
    -            if cache.get_approximate_cache_size() > cache.cache_quota:
    
    101
    -                self._scheduler._check_cache_size_real()
    
    101
    +        # If the estimated size outgrows the quota, ask the scheduler
    
    102
    +        # to queue a job to actually check the real cache size.
    
    103
    +        #
    
    104
    +        if artifacts.get_quota_exceeded():
    
    105
    +            self._scheduler.check_cache_size()
    
    102 106
     
    
    103 107
         def done(self, job, element, result, success):
    
    104 108
     
    
    ... ... @@ -106,8 +110,8 @@ class BuildQueue(Queue):
    106 110
                 # Inform element in main process that assembly is done
    
    107 111
                 element._assemble_done()
    
    108 112
     
    
    109
    -        # This has to be done after _assemble_done, such that the
    
    110
    -        # element may register its cache key as required
    
    111
    -        self._check_cache_size(job, element)
    
    113
    +            # This has to be done after _assemble_done, such that the
    
    114
    +            # element may register its cache key as required
    
    115
    +            self._check_cache_size(job, element, result)
    
    112 116
     
    
    113 117
             return True

  • buildstream/_scheduler/queues/pullqueue.py
    ... ... @@ -29,7 +29,7 @@ class PullQueue(Queue):
    29 29
     
    
    30 30
         action_name = "Pull"
    
    31 31
         complete_name = "Pulled"
    
    32
    -    resources = [ResourceType.DOWNLOAD]
    
    32
    +    resources = [ResourceType.DOWNLOAD, ResourceType.CACHE]
    
    33 33
     
    
    34 34
         def process(self, element):
    
    35 35
             # returns whether an artifact was downloaded or not
    
    ... ... @@ -62,7 +62,7 @@ class PullQueue(Queue):
    62 62
             # Build jobs will check the "approximate" size first. Since we
    
    63 63
             # do not get an artifact size from pull jobs, we have to
    
    64 64
             # actually check the cache size.
    
    65
    -        self._scheduler._check_cache_size_real()
    
    65
    +        self._scheduler.check_cache_size()
    
    66 66
     
    
    67 67
             # Element._pull() returns True if it downloaded an artifact,
    
    68 68
             # here we want to appear skipped if we did not download.
    

  • buildstream/_scheduler/queues/queue.py
    ... ... @@ -301,8 +301,6 @@ class Queue():
    301 301
             # Update values that need to be synchronized in the main task
    
    302 302
             # before calling any queue implementation
    
    303 303
             self._update_workspaces(element, job)
    
    304
    -        if job.child_data:
    
    305
    -            element._get_artifact_cache().cache_size = job.child_data.get('cache_size')
    
    306 304
     
    
    307 305
             # Give the result of the job to the Queue implementor,
    
    308 306
             # and determine if it should be considered as processed
    

  • buildstream/_scheduler/resources.py
    ... ... @@ -8,7 +8,7 @@ class ResourceType():
    8 8
     class Resources():
    
    9 9
         def __init__(self, num_builders, num_fetchers, num_pushers):
    
    10 10
             self._max_resources = {
    
    11
    -            ResourceType.CACHE: 1,
    
    11
    +            ResourceType.CACHE: 0,
    
    12 12
                 ResourceType.DOWNLOAD: num_fetchers,
    
    13 13
                 ResourceType.PROCESS: num_builders,
    
    14 14
                 ResourceType.UPLOAD: num_pushers
    

  • buildstream/_scheduler/scheduler.py
    ... ... @@ -241,6 +241,25 @@ class Scheduler():
    241 241
             self._schedule_queue_jobs()
    
    242 242
             self._sched()
    
    243 243
     
    
    244
    +    # check_cache_size():
    
    245
    +    #
    
    246
    +    # Queues a cache size calculation job; after the cache
    
    247
    +    # size is calculated, a cleanup job will be run automatically
    
    248
    +    # if needed.
    
    249
    +    #
    
    250
    +    # FIXME: This should ensure that only one cache size job
    
    251
    +    #        is ever pending at a given time. If a cache size
    
    252
    +    #        job is already running, it is correct to queue
    
    253
    +    #        a new one, it is incorrect to have more than one
    
    254
    +    #        of these jobs pending at a given time, though.
    
    255
    +    #
    
    256
    +    def check_cache_size(self):
    
    257
    +        job = CacheSizeJob(self, 'cache_size', 'cache_size/cache_size',
    
    258
    +                           resources=[ResourceType.CACHE,
    
    259
    +                                      ResourceType.PROCESS],
    
    260
    +                           complete_cb=self._run_cleanup)
    
    261
    +        self.schedule_jobs([job])
    
    262
    +
    
    244 263
         #######################################################
    
    245 264
         #                  Local Private Methods              #
    
    246 265
         #######################################################
    
    ... ... @@ -316,26 +335,32 @@ class Scheduler():
    316 335
             self.schedule_jobs(ready)
    
    317 336
             self._sched()
    
    318 337
     
    
    338
    +    # _run_cleanup()
    
    339
    +    #
    
    340
    +    # Schedules the cache cleanup job if the passed size
    
    341
    +    # exceeds the cache quota.
    
    342
    +    #
    
    343
    +    # Args:
    
    344
    +    #    cache_size (int): The calculated cache size (ignored)
    
    345
    +    #
    
    346
    +    # NOTE: This runs in response to completion of the cache size
    
    347
    +    #       calculation job launched by Scheduler.check_cache_size(),
    
    348
    +    #       which will report the calculated cache size.
    
    349
    +    #
    
    319 350
         def _run_cleanup(self, cache_size):
    
    320 351
             platform = Platform.get_platform()
    
    321
    -        if cache_size and cache_size < platform.artifactcache.cache_quota:
    
    352
    +        artifacts = platform.artifactcache
    
    353
    +
    
    354
    +        if not artifacts.get_quota_exceeded():
    
    322 355
                 return
    
    323 356
     
    
    324
    -        job = CleanupJob(self, 'cleanup', 'cleanup',
    
    357
    +        job = CleanupJob(self, 'cleanup', 'cleanup/cleanup',
    
    325 358
                              resources=[ResourceType.CACHE,
    
    326 359
                                         ResourceType.PROCESS],
    
    327 360
                              exclusive_resources=[ResourceType.CACHE],
    
    328 361
                              complete_cb=None)
    
    329 362
             self.schedule_jobs([job])
    
    330 363
     
    
    331
    -    def _check_cache_size_real(self):
    
    332
    -        job = CacheSizeJob(self, 'cache_size', 'cache_size/cache_size',
    
    333
    -                           resources=[ResourceType.CACHE,
    
    334
    -                                      ResourceType.PROCESS],
    
    335
    -                           exclusive_resources=[ResourceType.CACHE],
    
    336
    -                           complete_cb=self._run_cleanup)
    
    337
    -        self.schedule_jobs([job])
    
    338
    -
    
    339 364
         # _suspend_jobs()
    
    340 365
         #
    
    341 366
         # Suspend all ongoing jobs.
    

  • buildstream/element.py
    ... ... @@ -213,7 +213,6 @@ class Element(Plugin):
    213 213
             self.__staged_sources_directory = None  # Location where Element.stage_sources() was called
    
    214 214
             self.__tainted = None                   # Whether the artifact is tainted and should not be shared
    
    215 215
             self.__required = False                 # Whether the artifact is required in the current session
    
    216
    -        self.__artifact_size = None             # The size of data committed to the artifact cache
    
    217 216
             self.__build_result = None              # The result of assembling this Element
    
    218 217
             self._build_log_path = None            # The path of the build log for this Element
    
    219 218
     
    
    ... ... @@ -1509,6 +1508,9 @@ class Element(Plugin):
    1509 1508
         #   - Call the public abstract methods for the build phase
    
    1510 1509
         #   - Cache the resulting artifact
    
    1511 1510
         #
    
    1511
    +    # Returns:
    
    1512
    +    #    (int): The size of the newly cached artifact
    
    1513
    +    #
    
    1512 1514
         def _assemble(self):
    
    1513 1515
     
    
    1514 1516
             # Assert call ordering
    
    ... ... @@ -1655,7 +1657,7 @@ class Element(Plugin):
    1655 1657
                         }), os.path.join(metadir, 'workspaced-dependencies.yaml'))
    
    1656 1658
     
    
    1657 1659
                         with self.timed_activity("Caching artifact"):
    
    1658
    -                        self.__artifact_size = utils._get_dir_size(assembledir)
    
    1660
    +                        artifact_size = utils._get_dir_size(assembledir)
    
    1659 1661
                             self.__artifacts.commit(self, assembledir, self.__get_cache_keys_for_commit())
    
    1660 1662
     
    
    1661 1663
                         if collect is not None and collectvdir is None:
    
    ... ... @@ -1667,6 +1669,8 @@ class Element(Plugin):
    1667 1669
                 # Finally cleanup the build dir
    
    1668 1670
                 cleanup_rootdir()
    
    1669 1671
     
    
    1672
    +        return artifact_size
    
    1673
    +
    
    1670 1674
         def _get_build_log(self):
    
    1671 1675
             return self._build_log_path
    
    1672 1676
     
    
    ... ... @@ -1908,25 +1912,6 @@ class Element(Plugin):
    1908 1912
             workspaces = self._get_context().get_workspaces()
    
    1909 1913
             return workspaces.get_workspace(self._get_full_name())
    
    1910 1914
     
    
    1911
    -    # _get_artifact_size()
    
    1912
    -    #
    
    1913
    -    # Get the size of the artifact produced by this element in the
    
    1914
    -    # current pipeline - if this element has not been assembled or
    
    1915
    -    # pulled, this will be None.
    
    1916
    -    #
    
    1917
    -    # Note that this is the size of an artifact *before* committing it
    
    1918
    -    # to the cache, the size on disk may differ. It can act as an
    
    1919
    -    # approximate guide for when to do a proper size calculation.
    
    1920
    -    #
    
    1921
    -    # Returns:
    
    1922
    -    #    (int|None): The size of the artifact
    
    1923
    -    #
    
    1924
    -    def _get_artifact_size(self):
    
    1925
    -        return self.__artifact_size
    
    1926
    -
    
    1927
    -    def _get_artifact_cache(self):
    
    1928
    -        return self.__artifacts
    
    1929
    -
    
    1930 1915
         # _write_script():
    
    1931 1916
         #
    
    1932 1917
         # Writes a script to the given directory.
    

  • tests/artifactcache/expiry.py
    ... ... @@ -196,6 +196,22 @@ def test_keep_dependencies(cli, datafiles, tmpdir):
    196 196
     
    
    197 197
     
    
    198 198
     # Assert that we never delete a dependency required for a build tree
    
    199
    +#
    
    200
    +# NOTE: This test expects that a build will fail if it attempts to
    
    201
    +#       put more artifacts in the cache than the quota can hold,
    
    202
    +#       and expects that the last two elements which don't fit into
    
    203
    +#       the quota won't even be built.
    
    204
    +#
    
    205
    +#       In real life, this will not be the case, since once we reach
    
    206
    +#       the estimated quota we launch a cache size calculation job and
    
    207
    +#       only launch a cleanup job when the size is calculated; and
    
    208
    +#       other build tasks will be scheduled while the cache size job
    
    209
    +#       is running.
    
    210
    +#
    
    211
    +#       This test only passes because we configure `builders` to 1,
    
    212
    +#       ensuring that the cache size job runs exclusively since it
    
    213
    +#       also requires a compute resource (a "builder").
    
    214
    +#
    
    199 215
     @pytest.mark.datafiles(DATA_DIR)
    
    200 216
     def test_never_delete_dependencies(cli, datafiles, tmpdir):
    
    201 217
         project = os.path.join(datafiles.dirname, datafiles.basename)
    
    ... ... @@ -204,6 +220,9 @@ def test_never_delete_dependencies(cli, datafiles, tmpdir):
    204 220
         cli.configure({
    
    205 221
             'cache': {
    
    206 222
                 'quota': 10000000
    
    223
    +        },
    
    224
    +        'scheduler': {
    
    225
    +            'builders': 1
    
    207 226
             }
    
    208 227
         })
    
    209 228
     
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]