Tiago Gomes pushed to branch tiagogomes/issue-573 at BuildStream / buildstream
Commits:
- 
abeb1674
by Tiago Gomes at 2018-09-12T12:18:54Z
- 
5f4c19ec
by Tiago Gomes at 2018-09-12T12:20:02Z
- 
386d5a80
by Tiago Gomes at 2018-09-12T12:20:04Z
- 
111c23a6
by Tiago Gomes at 2018-09-12T12:20:04Z
- 
3a91f092
by Tiago Gomes at 2018-09-12T12:29:58Z
- 
83b5d55c
by Tiago Gomes at 2018-09-12T12:29:58Z
- 
d676da49
by Tiago Gomes at 2018-09-12T12:29:58Z
8 changed files:
- buildstream/_artifactcache/artifactcache.py
- buildstream/_scheduler/jobs/__init__.py
- − buildstream/_scheduler/jobs/cachesizejob.py
- buildstream/_scheduler/jobs/cleanupjob.py
- buildstream/_scheduler/queues/buildqueue.py
- buildstream/_scheduler/queues/pullqueue.py
- buildstream/_scheduler/scheduler.py
- buildstream/element.py
Changes:
| 1 | 1 |  #
 | 
| 2 | -#  Copyright (C) 2017-2018 Codethink Limited
 | |
| 2 | +#  Copyright (C) 2018 Codethink Limited
 | |
| 3 | 3 |  #
 | 
| 4 | 4 |  #  This program is free software; you can redistribute it and/or
 | 
| 5 | 5 |  #  modify it under the terms of the GNU Lesser General Public
 | 
| ... | ... | @@ -16,6 +16,7 @@ | 
| 16 | 16 |  #
 | 
| 17 | 17 |  #  Authors:
 | 
| 18 | 18 |  #        Tristan Maat <tristan maat codethink co uk>
 | 
| 19 | +#        Tiago Gomes <tiago gomes codethink co uk>
 | |
| 19 | 20 |  | 
| 20 | 21 |  import os
 | 
| 21 | 22 |  import string
 | 
| ... | ... | @@ -88,7 +89,7 @@ class ArtifactCache(): | 
| 88 | 89 |          self.project_remote_specs = {}
 | 
| 89 | 90 |  | 
| 90 | 91 |          self._required_artifacts = set()      # The artifacts required for this session
 | 
| 91 | -        self._cache_size = None               # The current cache size, sometimes it's an estimate
 | |
| 92 | +        self._cache_size = None               # The current cache size
 | |
| 92 | 93 |          self._cache_quota = None              # The cache quota
 | 
| 93 | 94 |          self._cache_lower_threshold = None    # The target cache size for a cleanup
 | 
| 94 | 95 |  | 
| ... | ... | @@ -226,13 +227,11 @@ class ArtifactCache(): | 
| 226 | 227 |      # Clean the artifact cache as much as possible.
 | 
| 227 | 228 |      #
 | 
| 228 | 229 |      # Returns:
 | 
| 229 | -    #    (int): The size of the cache after having cleaned up
 | |
| 230 | +    #     (int): Amount of bytes cleaned from the cache.
 | |
| 230 | 231 |      #
 | 
| 231 | 232 |      def clean(self):
 | 
| 232 | 233 |          artifacts = self.list_artifacts()
 | 
| 233 | - | |
| 234 | -        # Do a real computation of the cache size once, just in case
 | |
| 235 | -        self.compute_cache_size()
 | |
| 234 | +        old_cache_size = self.get_cache_size()
 | |
| 236 | 235 |  | 
| 237 | 236 |          while self.get_cache_size() >= self._cache_lower_threshold:
 | 
| 238 | 237 |              try:
 | 
| ... | ... | @@ -248,7 +247,7 @@ class ArtifactCache(): | 
| 248 | 247 |                            "Please increase the cache-quota in {}."
 | 
| 249 | 248 |                            .format(self.context.config_origin or default_conf))
 | 
| 250 | 249 |  | 
| 251 | -                if self.get_quota_exceeded():
 | |
| 250 | +                if self.has_quota_exceeded():
 | |
| 252 | 251 |                      raise ArtifactError("Cache too full. Aborting.",
 | 
| 253 | 252 |                                          detail=detail,
 | 
| 254 | 253 |                                          reason="cache-too-full")
 | 
| ... | ... | @@ -260,89 +259,56 @@ class ArtifactCache(): | 
| 260 | 259 |  | 
| 261 | 260 |                  # Remove the actual artifact, if it's not required.
 | 
| 262 | 261 |                  size = self.remove(to_remove)
 | 
| 262 | +                self._cache_size -= size
 | |
| 263 | +                self._message(MessageType.DEBUG,
 | |
| 264 | +                              "Removed artifact {} ({})".format(
 | |
| 265 | +                                  to_remove[:-(len(key) - self.context.log_key_length)],
 | |
| 266 | +                                  utils._pretty_size(size)))
 | |
| 263 | 267 |  | 
| 264 | -                # Remove the size from the removed size
 | |
| 265 | -                self.set_cache_size(self._cache_size - size)
 | |
| 268 | +        self._message(MessageType.INFO,
 | |
| 269 | +                      "New artifact cache size: {}".format(
 | |
| 270 | +                          utils._pretty_size(self._cache_size)))
 | |
| 266 | 271 |  | 
| 267 | -        # This should be O(1) if implemented correctly
 | |
| 268 | -        return self.get_cache_size()
 | |
| 269 | - | |
| 270 | -    # compute_cache_size()
 | |
| 271 | -    #
 | |
| 272 | -    # Computes the real artifact cache size by calling
 | |
| 273 | -    # the abstract calculate_cache_size() method.
 | |
| 274 | -    #
 | |
| 275 | -    # Returns:
 | |
| 276 | -    #    (int): The size of the artifact cache.
 | |
| 277 | -    #
 | |
| 278 | -    def compute_cache_size(self):
 | |
| 279 | -        self._cache_size = self.calculate_cache_size()
 | |
| 280 | - | |
| 281 | -        return self._cache_size
 | |
| 272 | +        return old_cache_size - self._cache_size
 | |
| 282 | 273 |  | 
| 283 | 274 |      # add_artifact_size()
 | 
| 284 | 275 |      #
 | 
| 285 | 276 |      # Adds the reported size of a newly cached artifact to the
 | 
| 286 | -    # overall estimated size.
 | |
| 277 | +    # current cache size.
 | |
| 287 | 278 |      #
 | 
| 288 | 279 |      # Args:
 | 
| 289 | 280 |      #     artifact_size (int): The size to add.
 | 
| 290 | 281 |      #
 | 
| 291 | 282 |      def add_artifact_size(self, artifact_size):
 | 
| 292 | -        cache_size = self.get_cache_size()
 | |
| 293 | -        cache_size += artifact_size
 | |
| 283 | +        assert utils._is_main_process()
 | |
| 294 | 284 |  | 
| 295 | -        self.set_cache_size(cache_size)
 | |
| 285 | +        self._cache_size = self.get_cache_size() + artifact_size
 | |
| 286 | +        self._write_cache_size(self._cache_size)
 | |
| 296 | 287 |  | 
| 297 | 288 |      # get_cache_size()
 | 
| 298 | 289 |      #
 | 
| 299 | -    # Fetches the cached size of the cache, this is sometimes
 | |
| 300 | -    # an estimate and periodically adjusted to the real size
 | |
| 301 | -    # when a cache size calculation job runs.
 | |
| 302 | -    #
 | |
| 303 | -    # When it is an estimate, the value is either correct, or
 | |
| 304 | -    # it is greater than the actual cache size.
 | |
| 290 | +    # Returns the size of the artifact cache.
 | |
| 305 | 291 |      #
 | 
| 306 | 292 |      # Returns:
 | 
| 307 | -    #     (int) An approximation of the artifact cache size.
 | |
| 293 | +    #     (int): The size of the artifact cache.
 | |
| 308 | 294 |      #
 | 
| 309 | 295 |      def get_cache_size(self):
 | 
| 296 | +        if self._cache_size is None:
 | |
| 297 | +            self._cache_size = self._read_cache_size()
 | |
| 310 | 298 |  | 
| 311 | -        # If we don't currently have an estimate, figure out the real cache size.
 | |
| 312 | 299 |          if self._cache_size is None:
 | 
| 313 | -            stored_size = self._read_cache_size()
 | |
| 314 | -            if stored_size is not None:
 | |
| 315 | -                self._cache_size = stored_size
 | |
| 316 | -            else:
 | |
| 317 | -                self.compute_cache_size()
 | |
| 300 | +            self._cache_size = self.calculate_cache_size()
 | |
| 318 | 301 |  | 
| 319 | 302 |          return self._cache_size
 | 
| 320 | 303 |  | 
| 321 | -    # set_cache_size()
 | |
| 322 | -    #
 | |
| 323 | -    # Forcefully set the overall cache size.
 | |
| 324 | -    #
 | |
| 325 | -    # This is used to update the size in the main process after
 | |
| 326 | -    # having calculated in a cleanup or a cache size calculation job.
 | |
| 327 | -    #
 | |
| 328 | -    # Args:
 | |
| 329 | -    #     cache_size (int): The size to set.
 | |
| 330 | -    #
 | |
| 331 | -    def set_cache_size(self, cache_size):
 | |
| 332 | - | |
| 333 | -        assert cache_size is not None
 | |
| 334 | - | |
| 335 | -        self._cache_size = cache_size
 | |
| 336 | -        self._write_cache_size(self._cache_size)
 | |
| 337 | - | |
| 338 | -    # get_quota_exceeded()
 | |
| 304 | +    # has_quota_exceeded()
 | |
| 339 | 305 |      #
 | 
| 340 | 306 |      # Checks if the current artifact cache size exceeds the quota.
 | 
| 341 | 307 |      #
 | 
| 342 | 308 |      # Returns:
 | 
| 343 | 309 |      #    (bool): True if the quota is exceeded
 | 
| 344 | 310 |      #
 | 
| 345 | -    def get_quota_exceeded(self):
 | |
| 311 | +    def has_quota_exceeded(self):
 | |
| 346 | 312 |          return self.get_cache_size() > self._cache_quota
 | 
| 347 | 313 |  | 
| 348 | 314 |      ################################################
 | 
| ... | ... | @@ -18,5 +18,4 @@ | 
| 18 | 18 |  #        Tristan Maat <tristan maat codethink co uk>
 | 
| 19 | 19 |  | 
| 20 | 20 |  from .elementjob import ElementJob
 | 
| 21 | -from .cachesizejob import CacheSizeJob
 | |
| 22 | 21 |  from .cleanupjob import CleanupJob | 
| 1 | -#  Copyright (C) 2018 Codethink Limited
 | |
| 2 | -#
 | |
| 3 | -#  This program is free software; you can redistribute it and/or
 | |
| 4 | -#  modify it under the terms of the GNU Lesser General Public
 | |
| 5 | -#  License as published by the Free Software Foundation; either
 | |
| 6 | -#  version 2 of the License, or (at your option) any later version.
 | |
| 7 | -#
 | |
| 8 | -#  This library is distributed in the hope that it will be useful,
 | |
| 9 | -#  but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
| 10 | -#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.	 See the GNU
 | |
| 11 | -#  Lesser General Public License for more details.
 | |
| 12 | -#
 | |
| 13 | -#  You should have received a copy of the GNU Lesser General Public
 | |
| 14 | -#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
 | |
| 15 | -#
 | |
| 16 | -#  Author:
 | |
| 17 | -#        Tristan Daniël Maat <tristan maat codethink co uk>
 | |
| 18 | -#
 | |
| 19 | -from .job import Job
 | |
| 20 | -from ..._platform import Platform
 | |
| 21 | - | |
| 22 | - | |
| 23 | -class CacheSizeJob(Job):
 | |
| 24 | -    def __init__(self, *args, complete_cb, **kwargs):
 | |
| 25 | -        super().__init__(*args, **kwargs)
 | |
| 26 | -        self._complete_cb = complete_cb
 | |
| 27 | - | |
| 28 | -        platform = Platform.get_platform()
 | |
| 29 | -        self._artifacts = platform.artifactcache
 | |
| 30 | - | |
| 31 | -    def child_process(self):
 | |
| 32 | -        return self._artifacts.compute_cache_size()
 | |
| 33 | - | |
| 34 | -    def parent_complete(self, success, result):
 | |
| 35 | -        if success:
 | |
| 36 | -            self._artifacts.set_cache_size(result)
 | |
| 37 | - | |
| 38 | -            if self._complete_cb:
 | |
| 39 | -                self._complete_cb(result)
 | |
| 40 | - | |
| 41 | -    def child_process_data(self):
 | |
| 42 | -        return {} | 
| ... | ... | @@ -21,9 +21,8 @@ from ..._platform import Platform | 
| 21 | 21 |  | 
| 22 | 22 |  | 
| 23 | 23 |  class CleanupJob(Job):
 | 
| 24 | -    def __init__(self, *args, complete_cb, **kwargs):
 | |
| 24 | +    def __init__(self, *args, **kwargs):
 | |
| 25 | 25 |          super().__init__(*args, **kwargs)
 | 
| 26 | -        self._complete_cb = complete_cb
 | |
| 27 | 26 |  | 
| 28 | 27 |          platform = Platform.get_platform()
 | 
| 29 | 28 |          self._artifacts = platform.artifactcache
 | 
| ... | ... | @@ -33,10 +32,7 @@ class CleanupJob(Job): | 
| 33 | 32 |  | 
| 34 | 33 |      def parent_complete(self, success, result):
 | 
| 35 | 34 |          if success:
 | 
| 36 | -            self._artifacts.set_cache_size(result)
 | |
| 37 | - | |
| 38 | -            if self._complete_cb:
 | |
| 39 | -                self._complete_cb()
 | |
| 40 | - | |
| 41 | -    def child_process_data(self):
 | |
| 42 | -        return {} | |
| 35 | +            # ArtifactCache.clean() returns the number of bytes cleaned.
 | |
| 36 | +            # We negate the number because the cache size is to be
 | |
| 37 | +            # decreased.
 | |
| 38 | +            self._artifacts.add_artifact_size(result * -1) | 
| ... | ... | @@ -87,31 +87,17 @@ class BuildQueue(Queue): | 
| 87 | 87 |  | 
| 88 | 88 |          return QueueStatus.READY
 | 
| 89 | 89 |  | 
| 90 | -    def _check_cache_size(self, job, element, artifact_size):
 | |
| 91 | - | |
| 92 | -        # After completing a build job, add the artifact size
 | |
| 93 | -        # as returned from Element._assemble() to the estimated
 | |
| 94 | -        # artifact cache size
 | |
| 95 | -        #
 | |
| 96 | -        platform = Platform.get_platform()
 | |
| 97 | -        artifacts = platform.artifactcache
 | |
| 98 | - | |
| 99 | -        artifacts.add_artifact_size(artifact_size)
 | |
| 100 | - | |
| 101 | -        # If the estimated size outgrows the quota, ask the scheduler
 | |
| 102 | -        # to queue a job to actually check the real cache size.
 | |
| 103 | -        #
 | |
| 104 | -        if artifacts.get_quota_exceeded():
 | |
| 105 | -            self._scheduler.check_cache_size()
 | |
| 106 | - | |
| 107 | 90 |      def done(self, job, element, result, success):
 | 
| 91 | +        if not success:
 | |
| 92 | +            return False
 | |
| 93 | + | |
| 94 | +        element._assemble_done()
 | |
| 108 | 95 |  | 
| 109 | -        if success:
 | |
| 110 | -            # Inform element in main process that assembly is done
 | |
| 111 | -            element._assemble_done()
 | |
| 96 | +        artifacts = Platform.get_platform().artifactcache
 | |
| 97 | +        artifacts.add_artifact_size(result)
 | |
| 112 | 98 |  | 
| 113 | -            # This has to be done after _assemble_done, such that the
 | |
| 114 | -            # element may register its cache key as required
 | |
| 115 | -            self._check_cache_size(job, element, result)
 | |
| 99 | +        # This has to be done after _assemble_done, such that the
 | |
| 100 | +        # element may register its cache key as required
 | |
| 101 | +        self._scheduler.check_cache_size()
 | |
| 116 | 102 |  | 
| 117 | -        return True | |
| 103 | +        return success | 
| 1 | 1 |  #
 | 
| 2 | -#  Copyright (C) 2016 Codethink Limited
 | |
| 2 | +#  Copyright (C) 2018 Codethink Limited
 | |
| 3 | 3 |  #
 | 
| 4 | 4 |  #  This program is free software; you can redistribute it and/or
 | 
| 5 | 5 |  #  modify it under the terms of the GNU Lesser General Public
 | 
| ... | ... | @@ -21,6 +21,7 @@ | 
| 21 | 21 |  # Local imports
 | 
| 22 | 22 |  from . import Queue, QueueStatus
 | 
| 23 | 23 |  from ..resources import ResourceType
 | 
| 24 | +from ..._platform import Platform
 | |
| 24 | 25 |  | 
| 25 | 26 |  | 
| 26 | 27 |  # A queue which pulls element artifacts
 | 
| ... | ... | @@ -52,18 +53,21 @@ class PullQueue(Queue): | 
| 52 | 53 |          else:
 | 
| 53 | 54 |              return QueueStatus.SKIP
 | 
| 54 | 55 |  | 
| 55 | -    def done(self, _, element, result, success):
 | |
| 56 | - | |
| 56 | +    def done(self, job, element, result, success):
 | |
| 57 | 57 |          if not success:
 | 
| 58 | 58 |              return False
 | 
| 59 | 59 |  | 
| 60 | 60 |          element._pull_done()
 | 
| 61 | 61 |  | 
| 62 | -        # Build jobs will check the "approximate" size first. Since we
 | |
| 63 | -        # do not get an artifact size from pull jobs, we have to
 | |
| 64 | -        # actually check the cache size.
 | |
| 62 | +        pulled, artifact_size = result
 | |
| 63 | + | |
| 64 | +        artifacts = Platform.get_platform().artifactcache
 | |
| 65 | +        artifacts.add_artifact_size(artifact_size)
 | |
| 66 | + | |
| 67 | +        # This has to be done after _pull_done, such that the
 | |
| 68 | +        # element may register its cache key as required
 | |
| 65 | 69 |          self._scheduler.check_cache_size()
 | 
| 66 | 70 |  | 
| 67 | 71 |          # Element._pull() returns True if it downloaded an artifact,
 | 
| 68 | 72 |          # here we want to appear skipped if we did not download.
 | 
| 69 | -        return result | |
| 73 | +        return pulled | 
| 1 | 1 |  #
 | 
| 2 | -#  Copyright (C) 2016 Codethink Limited
 | |
| 2 | +#  Copyright (C) 2018 Codethink Limited
 | |
| 3 | 3 |  #
 | 
| 4 | 4 |  #  This program is free software; you can redistribute it and/or
 | 
| 5 | 5 |  #  modify it under the terms of the GNU Lesser General Public
 | 
| ... | ... | @@ -28,7 +28,7 @@ from contextlib import contextmanager | 
| 28 | 28 |  | 
| 29 | 29 |  # Local imports
 | 
| 30 | 30 |  from .resources import Resources, ResourceType
 | 
| 31 | -from .jobs import CacheSizeJob, CleanupJob
 | |
| 31 | +from .jobs import CleanupJob
 | |
| 32 | 32 |  from .._platform import Platform
 | 
| 33 | 33 |  | 
| 34 | 34 |  | 
| ... | ... | @@ -243,22 +243,19 @@ class Scheduler(): | 
| 243 | 243 |  | 
| 244 | 244 |      # check_cache_size():
 | 
| 245 | 245 |      #
 | 
| 246 | -    # Queues a cache size calculation job, after the cache
 | |
| 247 | -    # size is calculated, a cleanup job will be run automatically
 | |
| 248 | -    # if needed.
 | |
| 249 | -    #
 | |
| 250 | -    # FIXME: This should ensure that only one cache size job
 | |
| 251 | -    #        is ever pending at a given time. If a cache size
 | |
| 252 | -    #        job is already running, it is correct to queue
 | |
| 253 | -    #        a new one, it is incorrect to have more than one
 | |
| 254 | -    #        of these jobs pending at a given time, though.
 | |
| 246 | +    # Queues a cleanup job if the size of the artifact cache exceeded
 | |
| 247 | +    # the quota
 | |
| 255 | 248 |      #
 | 
| 256 | 249 |      def check_cache_size(self):
 | 
| 257 | -        job = CacheSizeJob(self, 'cache_size', 'cache_size/cache_size',
 | |
| 258 | -                           resources=[ResourceType.CACHE,
 | |
| 259 | -                                      ResourceType.PROCESS],
 | |
| 260 | -                           complete_cb=self._run_cleanup)
 | |
| 261 | -        self.schedule_jobs([job])
 | |
| 250 | +        artifacts = Platform.get_platform().artifactcache
 | |
| 251 | + | |
| 252 | +        if artifacts.has_quota_exceeded():
 | |
| 253 | +            job = CleanupJob(self, 'Clean artifact cache',
 | |
| 254 | +                             'cleanup/cleanup',
 | |
| 255 | +                             resources=[ResourceType.CACHE,
 | |
| 256 | +                                        ResourceType.PROCESS],
 | |
| 257 | +                             exclusive_resources=[ResourceType.CACHE])
 | |
| 258 | +            self.schedule_jobs([job])
 | |
| 262 | 259 |  | 
| 263 | 260 |      #######################################################
 | 
| 264 | 261 |      #                  Local Private Methods              #
 | 
| ... | ... | @@ -335,32 +332,6 @@ class Scheduler(): | 
| 335 | 332 |          self.schedule_jobs(ready)
 | 
| 336 | 333 |          self._sched()
 | 
| 337 | 334 |  | 
| 338 | -    # _run_cleanup()
 | |
| 339 | -    #
 | |
| 340 | -    # Schedules the cache cleanup job if the passed size
 | |
| 341 | -    # exceeds the cache quota.
 | |
| 342 | -    #
 | |
| 343 | -    # Args:
 | |
| 344 | -    #    cache_size (int): The calculated cache size (ignored)
 | |
| 345 | -    #
 | |
| 346 | -    # NOTE: This runs in response to completion of the cache size
 | |
| 347 | -    #       calculation job lauched by Scheduler.check_cache_size(),
 | |
| 348 | -    #       which will report the calculated cache size.
 | |
| 349 | -    #
 | |
| 350 | -    def _run_cleanup(self, cache_size):
 | |
| 351 | -        platform = Platform.get_platform()
 | |
| 352 | -        artifacts = platform.artifactcache
 | |
| 353 | - | |
| 354 | -        if not artifacts.get_quota_exceeded():
 | |
| 355 | -            return
 | |
| 356 | - | |
| 357 | -        job = CleanupJob(self, 'cleanup', 'cleanup/cleanup',
 | |
| 358 | -                         resources=[ResourceType.CACHE,
 | |
| 359 | -                                    ResourceType.PROCESS],
 | |
| 360 | -                         exclusive_resources=[ResourceType.CACHE],
 | |
| 361 | -                         complete_cb=None)
 | |
| 362 | -        self.schedule_jobs([job])
 | |
| 363 | - | |
| 364 | 335 |      # _suspend_jobs()
 | 
| 365 | 336 |      #
 | 
| 366 | 337 |      # Suspend all ongoing jobs.
 | 
| ... | ... | @@ -1759,7 +1759,7 @@ class Element(Plugin): | 
| 1759 | 1759 |              display_key = self._get_brief_display_key()
 | 
| 1760 | 1760 |              self.info("Downloaded artifact {}".format(display_key))
 | 
| 1761 | 1761 |  | 
| 1762 | -        return pulled
 | |
| 1762 | +        return pulled, artifact_size
 | |
| 1763 | 1763 |  | 
| 1764 | 1764 |      # _skip_push():
 | 
| 1765 | 1765 |      #
 | 
