[Notes] [Git][BuildStream/buildstream][tiagogomes/issue-573] 7 commits: artifactcache: dynamically update size all time



Title: GitLab

Tiago Gomes pushed to branch tiagogomes/issue-573 at BuildStream / buildstream

Commits:

8 changed files:

Changes:

  • buildstream/_artifactcache/artifactcache.py
    1 1
     #
    
    2
    -#  Copyright (C) 2017-2018 Codethink Limited
    
    2
    +#  Copyright (C) 2018 Codethink Limited
    
    3 3
     #
    
    4 4
     #  This program is free software; you can redistribute it and/or
    
    5 5
     #  modify it under the terms of the GNU Lesser General Public
    
    ... ... @@ -16,6 +16,7 @@
    16 16
     #
    
    17 17
     #  Authors:
    
    18 18
     #        Tristan Maat <tristan maat codethink co uk>
    
    19
    +#        Tiago Gomes <tiago gomes codethink co uk>
    
    19 20
     
    
    20 21
     import os
    
    21 22
     import string
    
    ... ... @@ -88,7 +89,7 @@ class ArtifactCache():
    88 89
             self.project_remote_specs = {}
    
    89 90
     
    
    90 91
             self._required_artifacts = set()      # The artifacts required for this session
    
    91
    -        self._cache_size = None               # The current cache size, sometimes it's an estimate
    
    92
    +        self._cache_size = None               # The current cache size
    
    92 93
             self._cache_quota = None              # The cache quota
    
    93 94
             self._cache_lower_threshold = None    # The target cache size for a cleanup
    
    94 95
     
    
    ... ... @@ -226,13 +227,11 @@ class ArtifactCache():
    226 227
         # Clean the artifact cache as much as possible.
    
    227 228
         #
    
    228 229
         # Returns:
    
    229
    -    #    (int): The size of the cache after having cleaned up
    
    230
    +    #     (int): Amount of bytes cleaned from the cache.
    
    230 231
         #
    
    231 232
         def clean(self):
    
    232 233
             artifacts = self.list_artifacts()
    
    233
    -
    
    234
    -        # Do a real computation of the cache size once, just in case
    
    235
    -        self.compute_cache_size()
    
    234
    +        old_cache_size = self.get_cache_size()
    
    236 235
     
    
    237 236
             while self.get_cache_size() >= self._cache_lower_threshold:
    
    238 237
                 try:
    
    ... ... @@ -248,7 +247,7 @@ class ArtifactCache():
    248 247
                               "Please increase the cache-quota in {}."
    
    249 248
                               .format(self.context.config_origin or default_conf))
    
    250 249
     
    
    251
    -                if self.get_quota_exceeded():
    
    250
    +                if self.has_quota_exceeded():
    
    252 251
                         raise ArtifactError("Cache too full. Aborting.",
    
    253 252
                                             detail=detail,
    
    254 253
                                             reason="cache-too-full")
    
    ... ... @@ -260,89 +259,56 @@ class ArtifactCache():
    260 259
     
    
    261 260
                     # Remove the actual artifact, if it's not required.
    
    262 261
                     size = self.remove(to_remove)
    
    262
    +                self._cache_size -= size
    
    263
    +                self._message(MessageType.DEBUG,
    
    264
    +                              "Removed artifact {} ({})".format(
    
    265
    +                                  to_remove[:-(len(key) - self.context.log_key_length)],
    
    266
    +                                  utils._pretty_size(size)))
    
    263 267
     
    
    264
    -                # Remove the size from the removed size
    
    265
    -                self.set_cache_size(self._cache_size - size)
    
    268
    +        self._message(MessageType.INFO,
    
    269
    +                      "New artifact cache size: {}".format(
    
    270
    +                          utils._pretty_size(self._cache_size)))
    
    266 271
     
    
    267
    -        # This should be O(1) if implemented correctly
    
    268
    -        return self.get_cache_size()
    
    269
    -
    
    270
    -    # compute_cache_size()
    
    271
    -    #
    
    272
    -    # Computes the real artifact cache size by calling
    
    273
    -    # the abstract calculate_cache_size() method.
    
    274
    -    #
    
    275
    -    # Returns:
    
    276
    -    #    (int): The size of the artifact cache.
    
    277
    -    #
    
    278
    -    def compute_cache_size(self):
    
    279
    -        self._cache_size = self.calculate_cache_size()
    
    280
    -
    
    281
    -        return self._cache_size
    
    272
    +        return old_cache_size - self._cache_size
    
    282 273
     
    
    283 274
         # add_artifact_size()
    
    284 275
         #
    
    285 276
         # Adds the reported size of a newly cached artifact to the
    
    286
    -    # overall estimated size.
    
    277
    +    # current cache size.
    
    287 278
         #
    
    288 279
         # Args:
    
    289 280
         #     artifact_size (int): The size to add.
    
    290 281
         #
    
    291 282
         def add_artifact_size(self, artifact_size):
    
    292
    -        cache_size = self.get_cache_size()
    
    293
    -        cache_size += artifact_size
    
    283
    +        assert utils._is_main_process()
    
    294 284
     
    
    295
    -        self.set_cache_size(cache_size)
    
    285
    +        self._cache_size = self.get_cache_size() + artifact_size
    
    286
    +        self._write_cache_size(self._cache_size)
    
    296 287
     
    
    297 288
         # get_cache_size()
    
    298 289
         #
    
    299
    -    # Fetches the cached size of the cache, this is sometimes
    
    300
    -    # an estimate and periodically adjusted to the real size
    
    301
    -    # when a cache size calculation job runs.
    
    302
    -    #
    
    303
    -    # When it is an estimate, the value is either correct, or
    
    304
    -    # it is greater than the actual cache size.
    
    290
    +    # Returns the size of the artifact cache.
    
    305 291
         #
    
    306 292
         # Returns:
    
    307
    -    #     (int) An approximation of the artifact cache size.
    
    293
    +    #     (int): The size of the artifact cache.
    
    308 294
         #
    
    309 295
         def get_cache_size(self):
    
    296
    +        if self._cache_size is None:
    
    297
    +            self._cache_size = self._read_cache_size()
    
    310 298
     
    
    311
    -        # If we don't currently have an estimate, figure out the real cache size.
    
    312 299
             if self._cache_size is None:
    
    313
    -            stored_size = self._read_cache_size()
    
    314
    -            if stored_size is not None:
    
    315
    -                self._cache_size = stored_size
    
    316
    -            else:
    
    317
    -                self.compute_cache_size()
    
    300
    +            self._cache_size = self.calculate_cache_size()
    
    318 301
     
    
    319 302
             return self._cache_size
    
    320 303
     
    
    321
    -    # set_cache_size()
    
    322
    -    #
    
    323
    -    # Forcefully set the overall cache size.
    
    324
    -    #
    
    325
    -    # This is used to update the size in the main process after
    
    326
    -    # having calculated in a cleanup or a cache size calculation job.
    
    327
    -    #
    
    328
    -    # Args:
    
    329
    -    #     cache_size (int): The size to set.
    
    330
    -    #
    
    331
    -    def set_cache_size(self, cache_size):
    
    332
    -
    
    333
    -        assert cache_size is not None
    
    334
    -
    
    335
    -        self._cache_size = cache_size
    
    336
    -        self._write_cache_size(self._cache_size)
    
    337
    -
    
    338
    -    # get_quota_exceeded()
    
    304
    +    # has_quota_exceeded()
    
    339 305
         #
    
    340 306
         # Checks if the current artifact cache size exceeds the quota.
    
    341 307
         #
    
    342 308
         # Returns:
    
    343 309
         #    (bool): True if the quota is exceeded
    
    344 310
         #
    
    345
    -    def get_quota_exceeded(self):
    
    311
    +    def has_quota_exceeded(self):
    
    346 312
             return self.get_cache_size() > self._cache_quota
    
    347 313
     
    
    348 314
         ################################################
    

  • buildstream/_scheduler/jobs/__init__.py
    ... ... @@ -18,5 +18,4 @@
    18 18
     #        Tristan Maat <tristan maat codethink co uk>
    
    19 19
     
    
    20 20
     from .elementjob import ElementJob
    
    21
    -from .cachesizejob import CacheSizeJob
    
    22 21
     from .cleanupjob import CleanupJob

  • buildstream/_scheduler/jobs/cachesizejob.py deleted
    1
    -#  Copyright (C) 2018 Codethink Limited
    
    2
    -#
    
    3
    -#  This program is free software; you can redistribute it and/or
    
    4
    -#  modify it under the terms of the GNU Lesser General Public
    
    5
    -#  License as published by the Free Software Foundation; either
    
    6
    -#  version 2 of the License, or (at your option) any later version.
    
    7
    -#
    
    8
    -#  This library is distributed in the hope that it will be useful,
    
    9
    -#  but WITHOUT ANY WARRANTY; without even the implied warranty of
    
    10
    -#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.	 See the GNU
    
    11
    -#  Lesser General Public License for more details.
    
    12
    -#
    
    13
    -#  You should have received a copy of the GNU Lesser General Public
    
    14
    -#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
    
    15
    -#
    
    16
    -#  Author:
    
    17
    -#        Tristan Daniël Maat <tristan maat codethink co uk>
    
    18
    -#
    
    19
    -from .job import Job
    
    20
    -from ..._platform import Platform
    
    21
    -
    
    22
    -
    
    23
    -class CacheSizeJob(Job):
    
    24
    -    def __init__(self, *args, complete_cb, **kwargs):
    
    25
    -        super().__init__(*args, **kwargs)
    
    26
    -        self._complete_cb = complete_cb
    
    27
    -
    
    28
    -        platform = Platform.get_platform()
    
    29
    -        self._artifacts = platform.artifactcache
    
    30
    -
    
    31
    -    def child_process(self):
    
    32
    -        return self._artifacts.compute_cache_size()
    
    33
    -
    
    34
    -    def parent_complete(self, success, result):
    
    35
    -        if success:
    
    36
    -            self._artifacts.set_cache_size(result)
    
    37
    -
    
    38
    -            if self._complete_cb:
    
    39
    -                self._complete_cb(result)
    
    40
    -
    
    41
    -    def child_process_data(self):
    
    42
    -        return {}

  • buildstream/_scheduler/jobs/cleanupjob.py
    ... ... @@ -21,9 +21,8 @@ from ..._platform import Platform
    21 21
     
    
    22 22
     
    
    23 23
     class CleanupJob(Job):
    
    24
    -    def __init__(self, *args, complete_cb, **kwargs):
    
    24
    +    def __init__(self, *args, **kwargs):
    
    25 25
             super().__init__(*args, **kwargs)
    
    26
    -        self._complete_cb = complete_cb
    
    27 26
     
    
    28 27
             platform = Platform.get_platform()
    
    29 28
             self._artifacts = platform.artifactcache
    
    ... ... @@ -33,10 +32,7 @@ class CleanupJob(Job):
    33 32
     
    
    34 33
         def parent_complete(self, success, result):
    
    35 34
             if success:
    
    36
    -            self._artifacts.set_cache_size(result)
    
    37
    -
    
    38
    -            if self._complete_cb:
    
    39
    -                self._complete_cb()
    
    40
    -
    
    41
    -    def child_process_data(self):
    
    42
    -        return {}
    35
    +            # ArtifactCache.clean() returns the number of bytes cleaned.
    
    36
    +            # We negate the number because the cache size is to be
    
    37
    +            # decreased.
    
    38
    +            self._artifacts.add_artifact_size(result * -1)

  • buildstream/_scheduler/queues/buildqueue.py
    ... ... @@ -87,31 +87,17 @@ class BuildQueue(Queue):
    87 87
     
    
    88 88
             return QueueStatus.READY
    
    89 89
     
    
    90
    -    def _check_cache_size(self, job, element, artifact_size):
    
    91
    -
    
    92
    -        # After completing a build job, add the artifact size
    
    93
    -        # as returned from Element._assemble() to the estimated
    
    94
    -        # artifact cache size
    
    95
    -        #
    
    96
    -        platform = Platform.get_platform()
    
    97
    -        artifacts = platform.artifactcache
    
    98
    -
    
    99
    -        artifacts.add_artifact_size(artifact_size)
    
    100
    -
    
    101
    -        # If the estimated size outgrows the quota, ask the scheduler
    
    102
    -        # to queue a job to actually check the real cache size.
    
    103
    -        #
    
    104
    -        if artifacts.get_quota_exceeded():
    
    105
    -            self._scheduler.check_cache_size()
    
    106
    -
    
    107 90
         def done(self, job, element, result, success):
    
    91
    +        if not success:
    
    92
    +            return False
    
    93
    +
    
    94
    +        element._assemble_done()
    
    108 95
     
    
    109
    -        if success:
    
    110
    -            # Inform element in main process that assembly is done
    
    111
    -            element._assemble_done()
    
    96
    +        artifacts = Platform.get_platform().artifactcache
    
    97
    +        artifacts.add_artifact_size(result)
    
    112 98
     
    
    113
    -            # This has to be done after _assemble_done, such that the
    
    114
    -            # element may register its cache key as required
    
    115
    -            self._check_cache_size(job, element, result)
    
    99
    +        # This has to be done after _assemble_done, such that the
    
    100
    +        # element may register its cache key as required
    
    101
    +        self._scheduler.check_cache_size()
    
    116 102
     
    
    117
    -        return True
    103
    +        return success

  • buildstream/_scheduler/queues/pullqueue.py
    1 1
     #
    
    2
    -#  Copyright (C) 2016 Codethink Limited
    
    2
    +#  Copyright (C) 2018 Codethink Limited
    
    3 3
     #
    
    4 4
     #  This program is free software; you can redistribute it and/or
    
    5 5
     #  modify it under the terms of the GNU Lesser General Public
    
    ... ... @@ -21,6 +21,7 @@
    21 21
     # Local imports
    
    22 22
     from . import Queue, QueueStatus
    
    23 23
     from ..resources import ResourceType
    
    24
    +from ..._platform import Platform
    
    24 25
     
    
    25 26
     
    
    26 27
     # A queue which pulls element artifacts
    
    ... ... @@ -52,18 +53,21 @@ class PullQueue(Queue):
    52 53
             else:
    
    53 54
                 return QueueStatus.SKIP
    
    54 55
     
    
    55
    -    def done(self, _, element, result, success):
    
    56
    -
    
    56
    +    def done(self, job, element, result, success):
    
    57 57
             if not success:
    
    58 58
                 return False
    
    59 59
     
    
    60 60
             element._pull_done()
    
    61 61
     
    
    62
    -        # Build jobs will check the "approximate" size first. Since we
    
    63
    -        # do not get an artifact size from pull jobs, we have to
    
    64
    -        # actually check the cache size.
    
    62
    +        pulled, artifact_size = result
    
    63
    +
    
    64
    +        artifacts = Platform.get_platform().artifactcache
    
    65
    +        artifacts.add_artifact_size(artifact_size)
    
    66
    +
    
    67
    +        # This has to be done after _pull_done, such that the
    
    68
    +        # element may register its cache key as required
    
    65 69
             self._scheduler.check_cache_size()
    
    66 70
     
    
    67 71
             # Element._pull() returns True if it downloaded an artifact,
    
    68 72
             # here we want to appear skipped if we did not download.
    
    69
    -        return result
    73
    +        return pulled

  • buildstream/_scheduler/scheduler.py
    1 1
     #
    
    2
    -#  Copyright (C) 2016 Codethink Limited
    
    2
    +#  Copyright (C) 2018 Codethink Limited
    
    3 3
     #
    
    4 4
     #  This program is free software; you can redistribute it and/or
    
    5 5
     #  modify it under the terms of the GNU Lesser General Public
    
    ... ... @@ -28,7 +28,7 @@ from contextlib import contextmanager
    28 28
     
    
    29 29
     # Local imports
    
    30 30
     from .resources import Resources, ResourceType
    
    31
    -from .jobs import CacheSizeJob, CleanupJob
    
    31
    +from .jobs import CleanupJob
    
    32 32
     from .._platform import Platform
    
    33 33
     
    
    34 34
     
    
    ... ... @@ -243,22 +243,19 @@ class Scheduler():
    243 243
     
    
    244 244
         # check_cache_size():
    
    245 245
         #
    
    246
    -    # Queues a cache size calculation job, after the cache
    
    247
    -    # size is calculated, a cleanup job will be run automatically
    
    248
    -    # if needed.
    
    249
    -    #
    
    250
    -    # FIXME: This should ensure that only one cache size job
    
    251
    -    #        is ever pending at a given time. If a cache size
    
    252
    -    #        job is already running, it is correct to queue
    
    253
    -    #        a new one, it is incorrect to have more than one
    
    254
    -    #        of these jobs pending at a given time, though.
    
    246
    +    # Queues a cleanup job if the size of the artifact cache exceeded
    
    247
    +    # the quota
    
    255 248
         #
    
    256 249
         def check_cache_size(self):
    
    257
    -        job = CacheSizeJob(self, 'cache_size', 'cache_size/cache_size',
    
    258
    -                           resources=[ResourceType.CACHE,
    
    259
    -                                      ResourceType.PROCESS],
    
    260
    -                           complete_cb=self._run_cleanup)
    
    261
    -        self.schedule_jobs([job])
    
    250
    +        artifacts = Platform.get_platform().artifactcache
    
    251
    +
    
    252
    +        if artifacts.has_get_quota_exceeded():
    
    253
    +            job = CleanupJob(self, 'Clean artifact cache',
    
    254
    +                             'cleanup/cleanup',
    
    255
    +                             resources=[ResourceType.CACHE,
    
    256
    +                                        ResourceType.PROCESS],
    
    257
    +                             exclusive_resources=[ResourceType.CACHE])
    
    258
    +            self.schedule_jobs([job])
    
    262 259
     
    
    263 260
         #######################################################
    
    264 261
         #                  Local Private Methods              #
    
    ... ... @@ -335,32 +332,6 @@ class Scheduler():
    335 332
             self.schedule_jobs(ready)
    
    336 333
             self._sched()
    
    337 334
     
    
    338
    -    # _run_cleanup()
    
    339
    -    #
    
    340
    -    # Schedules the cache cleanup job if the passed size
    
    341
    -    # exceeds the cache quota.
    
    342
    -    #
    
    343
    -    # Args:
    
    344
    -    #    cache_size (int): The calculated cache size (ignored)
    
    345
    -    #
    
    346
    -    # NOTE: This runs in response to completion of the cache size
    
    347
    -    #       calculation job launched by Scheduler.check_cache_size(),
    
    348
    -    #       which will report the calculated cache size.
    
    349
    -    #
    
    350
    -    def _run_cleanup(self, cache_size):
    
    351
    -        platform = Platform.get_platform()
    
    352
    -        artifacts = platform.artifactcache
    
    353
    -
    
    354
    -        if not artifacts.get_quota_exceeded():
    
    355
    -            return
    
    356
    -
    
    357
    -        job = CleanupJob(self, 'cleanup', 'cleanup/cleanup',
    
    358
    -                         resources=[ResourceType.CACHE,
    
    359
    -                                    ResourceType.PROCESS],
    
    360
    -                         exclusive_resources=[ResourceType.CACHE],
    
    361
    -                         complete_cb=None)
    
    362
    -        self.schedule_jobs([job])
    
    363
    -
    
    364 335
         # _suspend_jobs()
    
    365 336
         #
    
    366 337
         # Suspend all ongoing jobs.
    

  • buildstream/element.py
    ... ... @@ -1759,7 +1759,7 @@ class Element(Plugin):
    1759 1759
                 display_key = self._get_brief_display_key()
    
    1760 1760
                 self.info("Downloaded artifact {}".format(display_key))
    
    1761 1761
     
    
    1762
    -        return pulled
    
    1762
    +        return pulled, artifact_size
    
    1763 1763
     
    
    1764 1764
         # _skip_push():
    
    1765 1765
         #
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]