Tristan Van Berkom pushed to branch tristan/fix-cache-exclusivity-1.2 at BuildStream / buildstream
Commits:
- 
80e912e6
by Tristan Van Berkom at 2018-09-10T06:56:11Z
- 
f81637c5
by Tristan Van Berkom at 2018-09-10T06:56:11Z
- 
f074a8ac
by Tristan Van Berkom at 2018-09-10T06:56:11Z
- 
d2f5c4f3
by Tristan Van Berkom at 2018-09-10T06:56:12Z
- 
eb9eb6db
by Tristan Van Berkom at 2018-09-10T06:56:12Z
- 
8293d8da
by Tristan Van Berkom at 2018-09-10T06:56:12Z
- 
879d5117
by Tristan Van Berkom at 2018-09-10T06:56:12Z
- 
2c898796
by Tristan Van Berkom at 2018-09-10T06:56:12Z
- 
55266475
by Tristan Van Berkom at 2018-09-10T06:56:12Z
- 
af0bfaee
by Tristan Van Berkom at 2018-09-10T06:56:12Z
- 
222c5034
by Tristan Van Berkom at 2018-09-10T06:56:12Z
- 
67fa96e1
by Tristan Van Berkom at 2018-09-10T06:56:12Z
- 
303240ab
by Tristan Van Berkom at 2018-09-10T06:56:12Z
- 
78499b7d
by Tristan Van Berkom at 2018-09-10T06:56:12Z
12 changed files:
- buildstream/_artifactcache/artifactcache.py
- buildstream/_artifactcache/cascache.py
- buildstream/_scheduler/jobs/cachesizejob.py
- buildstream/_scheduler/jobs/cleanupjob.py
- buildstream/_scheduler/jobs/elementjob.py
- buildstream/_scheduler/queues/buildqueue.py
- buildstream/_scheduler/queues/pullqueue.py
- buildstream/_scheduler/queues/queue.py
- buildstream/_scheduler/resources.py
- buildstream/_scheduler/scheduler.py
- buildstream/element.py
- tests/artifactcache/expiry.py
Changes:
| ... | ... | @@ -81,19 +81,16 @@ ArtifactCacheSpec.__new__.__defaults__ = (None, None, None) | 
| 81 | 81 |  class ArtifactCache():
 | 
| 82 | 82 |      def __init__(self, context):
 | 
| 83 | 83 |          self.context = context
 | 
| 84 | -        self.required_artifacts = set()
 | |
| 85 | 84 |          self.extractdir = os.path.join(context.artifactdir, 'extract')
 | 
| 86 | 85 |          self.tmpdir = os.path.join(context.artifactdir, 'tmp')
 | 
| 87 | 86 |  | 
| 88 | -        self.estimated_size = None
 | |
| 89 | - | |
| 90 | 87 |          self.global_remote_specs = []
 | 
| 91 | 88 |          self.project_remote_specs = {}
 | 
| 92 | 89 |  | 
| 93 | -        self._local = False
 | |
| 94 | -        self.cache_size = None
 | |
| 95 | -        self.cache_quota = None
 | |
| 96 | -        self.cache_lower_threshold = None
 | |
| 90 | +        self._required_artifacts = set()      # The artifacts required for this session
 | |
| 91 | +        self._cache_size = None               # The current cache size, sometimes it's an estimate
 | |
| 92 | +        self._cache_quota = None              # The cache quota
 | |
| 93 | +        self._cache_lower_threshold = None    # The target cache size for a cleanup
 | |
| 97 | 94 |  | 
| 98 | 95 |          os.makedirs(self.extractdir, exist_ok=True)
 | 
| 99 | 96 |          os.makedirs(self.tmpdir, exist_ok=True)
 | 
| ... | ... | @@ -212,8 +209,8 @@ class ArtifactCache(): | 
| 212 | 209 |              weak_key = element._get_cache_key(strength=_KeyStrength.WEAK)
 | 
| 213 | 210 |  | 
| 214 | 211 |              for key in (strong_key, weak_key):
 | 
| 215 | -                if key and key not in self.required_artifacts:
 | |
| 216 | -                    self.required_artifacts.add(key)
 | |
| 212 | +                if key and key not in self._required_artifacts:
 | |
| 213 | +                    self._required_artifacts.add(key)
 | |
| 217 | 214 |  | 
| 218 | 215 |                      # We also update the usage times of any artifacts
 | 
| 219 | 216 |                      # we will be using, which helps preventing a
 | 
| ... | ... | @@ -228,10 +225,16 @@ class ArtifactCache(): | 
| 228 | 225 |      #
 | 
| 229 | 226 |      # Clean the artifact cache as much as possible.
 | 
| 230 | 227 |      #
 | 
| 228 | +    # Returns:
 | |
| 229 | +    #    (int): The size of the cache after having cleaned up
 | |
| 230 | +    #
 | |
| 231 | 231 |      def clean(self):
 | 
| 232 | 232 |          artifacts = self.list_artifacts()
 | 
| 233 | 233 |  | 
| 234 | -        while self.calculate_cache_size() >= self.cache_quota - self.cache_lower_threshold:
 | |
| 234 | +        # Do a real computation of the cache size once, just in case
 | |
| 235 | +        self.compute_cache_size()
 | |
| 236 | + | |
| 237 | +        while self.get_cache_size() >= self._cache_lower_threshold:
 | |
| 235 | 238 |              try:
 | 
| 236 | 239 |                  to_remove = artifacts.pop(0)
 | 
| 237 | 240 |              except IndexError:
 | 
| ... | ... | @@ -245,7 +248,7 @@ class ArtifactCache(): | 
| 245 | 248 |                            "Please increase the cache-quota in {}."
 | 
| 246 | 249 |                            .format(self.context.config_origin or default_conf))
 | 
| 247 | 250 |  | 
| 248 | -                if self.calculate_cache_size() > self.cache_quota:
 | |
| 251 | +                if self.get_quota_exceeded():
 | |
| 249 | 252 |                      raise ArtifactError("Cache too full. Aborting.",
 | 
| 250 | 253 |                                          detail=detail,
 | 
| 251 | 254 |                                          reason="cache-too-full")
 | 
| ... | ... | @@ -253,46 +256,94 @@ class ArtifactCache(): | 
| 253 | 256 |                      break
 | 
| 254 | 257 |  | 
| 255 | 258 |              key = to_remove.rpartition('/')[2]
 | 
| 256 | -            if key not in self.required_artifacts:
 | |
| 259 | +            if key not in self._required_artifacts:
 | |
| 260 | + | |
| 261 | +                # Remove the actual artifact, if it's not required.
 | |
| 257 | 262 |                  size = self.remove(to_remove)
 | 
| 258 | -                if size:
 | |
| 259 | -                    self.cache_size -= size
 | |
| 263 | + | |
| 264 | +                # Subtract the removed artifact's size from the cache size
 | |
| 265 | +                self.set_cache_size(self._cache_size - size)
 | |
| 260 | 266 |  | 
| 261 | 267 |          # This should be O(1) if implemented correctly
 | 
| 262 | -        return self.calculate_cache_size()
 | |
| 268 | +        return self.get_cache_size()
 | |
| 263 | 269 |  | 
| 264 | -    # get_approximate_cache_size()
 | |
| 270 | +    # compute_cache_size()
 | |
| 265 | 271 |      #
 | 
| 266 | -    # A cheap method that aims to serve as an upper limit on the
 | |
| 267 | -    # artifact cache size.
 | |
| 272 | +    # Computes the real artifact cache size by calling
 | |
| 273 | +    # the abstract calculate_cache_size() method.
 | |
| 268 | 274 |      #
 | 
| 269 | -    # The cache size reported by this function will normally be larger
 | |
| 270 | -    # than the real cache size, since it is calculated using the
 | |
| 271 | -    # pre-commit artifact size, but for very small artifacts in
 | |
| 272 | -    # certain caches additional overhead could cause this to be
 | |
| 273 | -    # smaller than, but close to, the actual size.
 | |
| 275 | +    # Returns:
 | |
| 276 | +    #    (int): The size of the artifact cache.
 | |
| 277 | +    #
 | |
| 278 | +    def compute_cache_size(self):
 | |
| 279 | +        self._cache_size = self.calculate_cache_size()
 | |
| 280 | + | |
| 281 | +        return self._cache_size
 | |
| 282 | + | |
| 283 | +    # add_artifact_size()
 | |
| 274 | 284 |      #
 | 
| 275 | -    # Nonetheless, in practice this should be safe to use as an upper
 | |
| 276 | -    # limit on the cache size.
 | |
| 285 | +    # Adds the reported size of a newly cached artifact to the
 | |
| 286 | +    # overall estimated size.
 | |
| 277 | 287 |      #
 | 
| 278 | -    # If the cache has built-in constant-time size reporting, please
 | |
| 279 | -    # feel free to override this method with a more accurate
 | |
| 280 | -    # implementation.
 | |
| 288 | +    # Args:
 | |
| 289 | +    #     artifact_size (int): The size to add.
 | |
| 290 | +    #
 | |
| 291 | +    def add_artifact_size(self, artifact_size):
 | |
| 292 | +        cache_size = self.get_cache_size()
 | |
| 293 | +        cache_size += artifact_size
 | |
| 294 | + | |
| 295 | +        self.set_cache_size(cache_size)
 | |
| 296 | + | |
| 297 | +    # get_cache_size()
 | |
| 298 | +    #
 | |
| 299 | +    # Fetches the cached size of the cache; this is sometimes
 | |
| 300 | +    # an estimate and periodically adjusted to the real size
 | |
| 301 | +    # when a cache size calculation job runs.
 | |
| 302 | +    #
 | |
| 303 | +    # When it is an estimate, the value is either correct, or
 | |
| 304 | +    # it is greater than the actual cache size.
 | |
| 281 | 305 |      #
 | 
| 282 | 306 |      # Returns:
 | 
| 283 | 307 |      #     (int) An approximation of the artifact cache size.
 | 
| 284 | 308 |      #
 | 
| 285 | -    def get_approximate_cache_size(self):
 | |
| 286 | -        # If we don't currently have an estimate, figure out the real
 | |
| 287 | -        # cache size.
 | |
| 288 | -        if self.estimated_size is None:
 | |
| 309 | +    def get_cache_size(self):
 | |
| 310 | + | |
| 311 | +        # If we don't currently have an estimate, figure out the real cache size.
 | |
| 312 | +        if self._cache_size is None:
 | |
| 289 | 313 |              stored_size = self._read_cache_size()
 | 
| 290 | 314 |              if stored_size is not None:
 | 
| 291 | -                self.estimated_size = stored_size
 | |
| 315 | +                self._cache_size = stored_size
 | |
| 292 | 316 |              else:
 | 
| 293 | -                self.estimated_size = self.calculate_cache_size()
 | |
| 317 | +                self.compute_cache_size()
 | |
| 318 | + | |
| 319 | +        return self._cache_size
 | |
| 294 | 320 |  | 
| 295 | -        return self.estimated_size
 | |
| 321 | +    # set_cache_size()
 | |
| 322 | +    #
 | |
| 323 | +    # Forcefully set the overall cache size.
 | |
| 324 | +    #
 | |
| 325 | +    # This is used to update the size in the main process after
 | |
| 326 | +    # having calculated in a cleanup or a cache size calculation job.
 | |
| 327 | +    #
 | |
| 328 | +    # Args:
 | |
| 329 | +    #     cache_size (int): The size to set.
 | |
| 330 | +    #
 | |
| 331 | +    def set_cache_size(self, cache_size):
 | |
| 332 | + | |
| 333 | +        assert cache_size is not None
 | |
| 334 | + | |
| 335 | +        self._cache_size = cache_size
 | |
| 336 | +        self._write_cache_size(self._cache_size)
 | |
| 337 | + | |
| 338 | +    # get_quota_exceeded()
 | |
| 339 | +    #
 | |
| 340 | +    # Checks if the current artifact cache size exceeds the quota.
 | |
| 341 | +    #
 | |
| 342 | +    # Returns:
 | |
| 343 | +    #    (bool): True if the quota is exceeded
 | |
| 344 | +    #
 | |
| 345 | +    def get_quota_exceeded(self):
 | |
| 346 | +        return self.get_cache_size() > self._cache_quota
 | |
| 296 | 347 |  | 
| 297 | 348 |      ################################################
 | 
| 298 | 349 |      # Abstract methods for subclasses to implement #
 | 
| ... | ... | @@ -484,11 +535,8 @@ class ArtifactCache(): | 
| 484 | 535 |      #
 | 
| 485 | 536 |      # Return the real artifact cache size.
 | 
| 486 | 537 |      #
 | 
| 487 | -    # Implementations should also use this to update estimated_size.
 | |
| 488 | -    #
 | |
| 489 | 538 |      # Returns:
 | 
| 490 | -    #
 | |
| 491 | -    # (int) The size of the artifact cache.
 | |
| 539 | +    #    (int): The size of the artifact cache.
 | |
| 492 | 540 |      #
 | 
| 493 | 541 |      def calculate_cache_size(self):
 | 
| 494 | 542 |          raise ImplError("Cache '{kind}' does not implement calculate_cache_size()"
 | 
| ... | ... | @@ -535,39 +583,13 @@ class ArtifactCache(): | 
| 535 | 583 |          with self.context.timed_activity("Initializing remote caches", silent_nested=True):
 | 
| 536 | 584 |              self.initialize_remotes(on_failure=remote_failed)
 | 
| 537 | 585 |  | 
| 538 | -    # _add_artifact_size()
 | |
| 539 | -    #
 | |
| 540 | -    # Since we cannot keep track of the cache size between threads,
 | |
| 541 | -    # this method will be called by the main process every time a
 | |
| 542 | -    # process that added something to the cache finishes.
 | |
| 543 | -    #
 | |
| 544 | -    # This will then add the reported size to
 | |
| 545 | -    # ArtifactCache.estimated_size.
 | |
| 546 | -    #
 | |
| 547 | -    def _add_artifact_size(self, artifact_size):
 | |
| 548 | -        if not self.estimated_size:
 | |
| 549 | -            self.estimated_size = self.calculate_cache_size()
 | |
| 550 | - | |
| 551 | -        self.estimated_size += artifact_size
 | |
| 552 | -        self._write_cache_size(self.estimated_size)
 | |
| 553 | - | |
| 554 | -    # _set_cache_size()
 | |
| 555 | -    #
 | |
| 556 | -    # Similarly to the above method, when we calculate the actual size
 | |
| 557 | -    # in a child thread, we can't update it. We instead pass the value
 | |
| 558 | -    # back to the main thread and update it there.
 | |
| 559 | -    #
 | |
| 560 | -    def _set_cache_size(self, cache_size):
 | |
| 561 | -        self.estimated_size = cache_size
 | |
| 562 | - | |
| 563 | -        # set_cache_size is called in cleanup, where it may set the cache to None
 | |
| 564 | -        if self.estimated_size is not None:
 | |
| 565 | -            self._write_cache_size(self.estimated_size)
 | |
| 566 | - | |
| 567 | 586 |      # _write_cache_size()
 | 
| 568 | 587 |      #
 | 
| 569 | 588 |      # Writes the given size of the artifact to the cache's size file
 | 
| 570 | 589 |      #
 | 
| 590 | +    # Args:
 | |
| 591 | +    #    size (int): The size of the artifact cache to record
 | |
| 592 | +    #
 | |
| 571 | 593 |      def _write_cache_size(self, size):
 | 
| 572 | 594 |          assert isinstance(size, int)
 | 
| 573 | 595 |          size_file_path = os.path.join(self.context.artifactdir, CACHE_SIZE_FILE)
 | 
| ... | ... | @@ -579,6 +601,9 @@ class ArtifactCache(): | 
| 579 | 601 |      # Reads and returns the size of the artifact cache that's stored in the
 | 
| 580 | 602 |      # cache's size file
 | 
| 581 | 603 |      #
 | 
| 604 | +    # Returns:
 | |
| 605 | +    #    (int): The size of the artifact cache, as recorded in the file
 | |
| 606 | +    #
 | |
| 582 | 607 |      def _read_cache_size(self):
 | 
| 583 | 608 |          size_file_path = os.path.join(self.context.artifactdir, CACHE_SIZE_FILE)
 | 
| 584 | 609 |  | 
| ... | ... | @@ -628,13 +653,13 @@ class ArtifactCache(): | 
| 628 | 653 |          stat = os.statvfs(artifactdir_volume)
 | 
| 629 | 654 |          available_space = (stat.f_bsize * stat.f_bavail)
 | 
| 630 | 655 |  | 
| 631 | -        cache_size = self.get_approximate_cache_size()
 | |
| 656 | +        cache_size = self.get_cache_size()
 | |
| 632 | 657 |  | 
| 633 | 658 |          # Ensure system has enough storage for the cache_quota
 | 
| 634 | 659 |          #
 | 
| 635 | 660 |          # If cache_quota is none, set it to the maximum it could possibly be.
 | 
| 636 | 661 |          #
 | 
| 637 | -        # Also check that cache_quota is atleast as large as our headroom.
 | |
| 662 | +        # Also check that cache_quota is at least as large as our headroom.
 | |
| 638 | 663 |          #
 | 
| 639 | 664 |          if cache_quota is None:  # Infinity, set to max system storage
 | 
| 640 | 665 |              cache_quota = cache_size + available_space
 | 
| ... | ... | @@ -660,8 +685,8 @@ class ArtifactCache(): | 
| 660 | 685 |          # if we end up writing more than 2G, but hey, this stuff is
 | 
| 661 | 686 |          # already really fuzzy.
 | 
| 662 | 687 |          #
 | 
| 663 | -        self.cache_quota = cache_quota - headroom
 | |
| 664 | -        self.cache_lower_threshold = self.cache_quota / 2
 | |
| 688 | +        self._cache_quota = cache_quota - headroom
 | |
| 689 | +        self._cache_lower_threshold = self._cache_quota / 2
 | |
| 665 | 690 |  | 
| 666 | 691 |  | 
| 667 | 692 |  # _configured_remote_artifact_cache_specs():
 | 
| ... | ... | @@ -120,8 +120,6 @@ class CASCache(ArtifactCache): | 
| 120 | 120 |          for ref in refs:
 | 
| 121 | 121 |              self.set_ref(ref, tree)
 | 
| 122 | 122 |  | 
| 123 | -        self.cache_size = None
 | |
| 124 | - | |
| 125 | 123 |      def diff(self, element, key_a, key_b, *, subdir=None):
 | 
| 126 | 124 |          ref_a = self.get_artifact_fullname(element, key_a)
 | 
| 127 | 125 |          ref_b = self.get_artifact_fullname(element, key_b)
 | 
| ... | ... | @@ -488,11 +486,7 @@ class CASCache(ArtifactCache): | 
| 488 | 486 |              raise ArtifactError("Attempt to access unavailable artifact: {}".format(e)) from e
 | 
| 489 | 487 |  | 
| 490 | 488 |      def calculate_cache_size(self):
 | 
| 491 | -        if self.cache_size is None:
 | |
| 492 | -            self.cache_size = utils._get_dir_size(self.casdir)
 | |
| 493 | -            self.estimated_size = self.cache_size
 | |
| 494 | - | |
| 495 | -        return self.cache_size
 | |
| 489 | +        return utils._get_dir_size(self.casdir)
 | |
| 496 | 490 |  | 
| 497 | 491 |      # list_artifacts():
 | 
| 498 | 492 |      #
 | 
| ... | ... | @@ -24,15 +24,19 @@ class CacheSizeJob(Job): | 
| 24 | 24 |      def __init__(self, *args, complete_cb, **kwargs):
 | 
| 25 | 25 |          super().__init__(*args, **kwargs)
 | 
| 26 | 26 |          self._complete_cb = complete_cb
 | 
| 27 | -        self._cache = Platform._instance.artifactcache
 | |
| 27 | + | |
| 28 | +        platform = Platform.get_platform()
 | |
| 29 | +        self._artifacts = platform.artifactcache
 | |
| 28 | 30 |  | 
| 29 | 31 |      def child_process(self):
 | 
| 30 | -        return self._cache.calculate_cache_size()
 | |
| 32 | +        return self._artifacts.compute_cache_size()
 | |
| 31 | 33 |  | 
| 32 | 34 |      def parent_complete(self, success, result):
 | 
| 33 | -        self._cache._set_cache_size(result)
 | |
| 34 | -        if self._complete_cb:
 | |
| 35 | -            self._complete_cb(result)
 | |
| 35 | +        if success:
 | |
| 36 | +            self._artifacts.set_cache_size(result)
 | |
| 37 | + | |
| 38 | +            if self._complete_cb:
 | |
| 39 | +                self._complete_cb(result)
 | |
| 36 | 40 |  | 
| 37 | 41 |      def child_process_data(self):
 | 
| 38 | 42 |          return {} | 
| ... | ... | @@ -24,15 +24,19 @@ class CleanupJob(Job): | 
| 24 | 24 |      def __init__(self, *args, complete_cb, **kwargs):
 | 
| 25 | 25 |          super().__init__(*args, **kwargs)
 | 
| 26 | 26 |          self._complete_cb = complete_cb
 | 
| 27 | -        self._cache = Platform._instance.artifactcache
 | |
| 27 | + | |
| 28 | +        platform = Platform.get_platform()
 | |
| 29 | +        self._artifacts = platform.artifactcache
 | |
| 28 | 30 |  | 
| 29 | 31 |      def child_process(self):
 | 
| 30 | -        return self._cache.clean()
 | |
| 32 | +        return self._artifacts.clean()
 | |
| 31 | 33 |  | 
| 32 | 34 |      def parent_complete(self, success, result):
 | 
| 33 | -        self._cache._set_cache_size(result)
 | |
| 34 | -        if self._complete_cb:
 | |
| 35 | -            self._complete_cb()
 | |
| 35 | +        if success:
 | |
| 36 | +            self._artifacts.set_cache_size(result)
 | |
| 37 | + | |
| 38 | +            if self._complete_cb:
 | |
| 39 | +                self._complete_cb()
 | |
| 36 | 40 |  | 
| 37 | 41 |      def child_process_data(self):
 | 
| 38 | 42 |          return {} | 
| ... | ... | @@ -109,13 +109,7 @@ class ElementJob(Job): | 
| 109 | 109 |          data = {}
 | 
| 110 | 110 |  | 
| 111 | 111 |          workspace = self._element._get_workspace()
 | 
| 112 | -        artifact_size = self._element._get_artifact_size()
 | |
| 113 | -        cache_size = self._element._get_artifact_cache().calculate_cache_size()
 | |
| 114 | - | |
| 115 | 112 |          if workspace is not None:
 | 
| 116 | 113 |              data['workspace'] = workspace.to_dict()
 | 
| 117 | -        if artifact_size is not None:
 | |
| 118 | -            data['artifact_size'] = artifact_size
 | |
| 119 | -        data['cache_size'] = cache_size
 | |
| 120 | 114 |  | 
| 121 | 115 |          return data | 
| ... | ... | @@ -20,6 +20,7 @@ | 
| 20 | 20 |  | 
| 21 | 21 |  from . import Queue, QueueStatus
 | 
| 22 | 22 |  from ..resources import ResourceType
 | 
| 23 | +from ..._platform import Platform
 | |
| 23 | 24 |  | 
| 24 | 25 |  | 
| 25 | 26 |  # A queue which assembles elements
 | 
| ... | ... | @@ -28,11 +29,10 @@ class BuildQueue(Queue): | 
| 28 | 29 |  | 
| 29 | 30 |      action_name = "Build"
 | 
| 30 | 31 |      complete_name = "Built"
 | 
| 31 | -    resources = [ResourceType.PROCESS]
 | |
| 32 | +    resources = [ResourceType.PROCESS, ResourceType.CACHE]
 | |
| 32 | 33 |  | 
| 33 | 34 |      def process(self, element):
 | 
| 34 | -        element._assemble()
 | |
| 35 | -        return element._get_unique_id()
 | |
| 35 | +        return element._assemble()
 | |
| 36 | 36 |  | 
| 37 | 37 |      def status(self, element):
 | 
| 38 | 38 |          # state of dependencies may have changed, recalculate element state
 | 
| ... | ... | @@ -51,18 +51,22 @@ class BuildQueue(Queue): | 
| 51 | 51 |  | 
| 52 | 52 |          return QueueStatus.READY
 | 
| 53 | 53 |  | 
| 54 | -    def _check_cache_size(self, job, element):
 | |
| 55 | -        if not job.child_data:
 | |
| 56 | -            return
 | |
| 54 | +    def _check_cache_size(self, job, element, artifact_size):
 | |
| 57 | 55 |  | 
| 58 | -        artifact_size = job.child_data.get('artifact_size', False)
 | |
| 56 | +        # After completing a build job, add the artifact size
 | |
| 57 | +        # as returned from Element._assemble() to the estimated
 | |
| 58 | +        # artifact cache size
 | |
| 59 | +        #
 | |
| 60 | +        platform = Platform.get_platform()
 | |
| 61 | +        artifacts = platform.artifactcache
 | |
| 59 | 62 |  | 
| 60 | -        if artifact_size:
 | |
| 61 | -            cache = element._get_artifact_cache()
 | |
| 62 | -            cache._add_artifact_size(artifact_size)
 | |
| 63 | +        artifacts.add_artifact_size(artifact_size)
 | |
| 63 | 64 |  | 
| 64 | -            if cache.get_approximate_cache_size() > cache.cache_quota:
 | |
| 65 | -                self._scheduler._check_cache_size_real()
 | |
| 65 | +        # If the estimated size outgrows the quota, ask the scheduler
 | |
| 66 | +        # to queue a job to actually check the real cache size.
 | |
| 67 | +        #
 | |
| 68 | +        if artifacts.get_quota_exceeded():
 | |
| 69 | +            self._scheduler.check_cache_size()
 | |
| 66 | 70 |  | 
| 67 | 71 |      def done(self, job, element, result, success):
 | 
| 68 | 72 |  | 
| ... | ... | @@ -70,8 +74,8 @@ class BuildQueue(Queue): | 
| 70 | 74 |              # Inform element in main process that assembly is done
 | 
| 71 | 75 |              element._assemble_done()
 | 
| 72 | 76 |  | 
| 73 | -        # This has to be done after _assemble_done, such that the
 | |
| 74 | -        # element may register its cache key as required
 | |
| 75 | -        self._check_cache_size(job, element)
 | |
| 77 | +            # This has to be done after _assemble_done, such that the
 | |
| 78 | +            # element may register its cache key as required
 | |
| 79 | +            self._check_cache_size(job, element, result)
 | |
| 76 | 80 |  | 
| 77 | 81 |          return True | 
| ... | ... | @@ -29,7 +29,7 @@ class PullQueue(Queue): | 
| 29 | 29 |  | 
| 30 | 30 |      action_name = "Pull"
 | 
| 31 | 31 |      complete_name = "Pulled"
 | 
| 32 | -    resources = [ResourceType.DOWNLOAD]
 | |
| 32 | +    resources = [ResourceType.DOWNLOAD, ResourceType.CACHE]
 | |
| 33 | 33 |  | 
| 34 | 34 |      def process(self, element):
 | 
| 35 | 35 |          # returns whether an artifact was downloaded or not
 | 
| ... | ... | @@ -62,7 +62,7 @@ class PullQueue(Queue): | 
| 62 | 62 |          # Build jobs will check the "approximate" size first. Since we
 | 
| 63 | 63 |          # do not get an artifact size from pull jobs, we have to
 | 
| 64 | 64 |          # actually check the cache size.
 | 
| 65 | -        self._scheduler._check_cache_size_real()
 | |
| 65 | +        self._scheduler.check_cache_size()
 | |
| 66 | 66 |  | 
| 67 | 67 |          # Element._pull() returns True if it downloaded an artifact,
 | 
| 68 | 68 |          # here we want to appear skipped if we did not download.
 | 
| ... | ... | @@ -300,8 +300,6 @@ class Queue(): | 
| 300 | 300 |          # Update values that need to be synchronized in the main task
 | 
| 301 | 301 |          # before calling any queue implementation
 | 
| 302 | 302 |          self._update_workspaces(element, job)
 | 
| 303 | -        if job.child_data:
 | |
| 304 | -            element._get_artifact_cache().cache_size = job.child_data.get('cache_size')
 | |
| 305 | 303 |  | 
| 306 | 304 |          # Give the result of the job to the Queue implementor,
 | 
| 307 | 305 |          # and determine if it should be considered as processed
 | 
| ... | ... | @@ -8,7 +8,7 @@ class ResourceType(): | 
| 8 | 8 |  class Resources():
 | 
| 9 | 9 |      def __init__(self, num_builders, num_fetchers, num_pushers):
 | 
| 10 | 10 |          self._max_resources = {
 | 
| 11 | -            ResourceType.CACHE: 1,
 | |
| 11 | +            ResourceType.CACHE: 0,
 | |
| 12 | 12 |              ResourceType.DOWNLOAD: num_fetchers,
 | 
| 13 | 13 |              ResourceType.PROCESS: num_builders,
 | 
| 14 | 14 |              ResourceType.UPLOAD: num_pushers
 | 
| ... | ... | @@ -239,6 +239,25 @@ class Scheduler(): | 
| 239 | 239 |          self._schedule_queue_jobs()
 | 
| 240 | 240 |          self._sched()
 | 
| 241 | 241 |  | 
| 242 | +    # check_cache_size():
 | |
| 243 | +    #
 | |
| 244 | +    # Queues a cache size calculation job; after the cache
 | |
| 245 | +    # size is calculated, a cleanup job will be run automatically
 | |
| 246 | +    # if needed.
 | |
| 247 | +    #
 | |
| 248 | +    # FIXME: This should ensure that only one cache size job
 | |
| 249 | +    #        is ever pending at a given time. If a cache size
 | |
| 250 | +    #        job is already running, it is correct to queue
 | |
| 251 | +    #        a new one; it is incorrect to have more than one
 | |
| 252 | +    #        of these jobs pending at a given time, though.
 | |
| 253 | +    #
 | |
| 254 | +    def check_cache_size(self):
 | |
| 255 | +        job = CacheSizeJob(self, 'cache_size', 'cache_size/cache_size',
 | |
| 256 | +                           resources=[ResourceType.CACHE,
 | |
| 257 | +                                      ResourceType.PROCESS],
 | |
| 258 | +                           complete_cb=self._run_cleanup)
 | |
| 259 | +        self.schedule_jobs([job])
 | |
| 260 | + | |
| 242 | 261 |      #######################################################
 | 
| 243 | 262 |      #                  Local Private Methods              #
 | 
| 244 | 263 |      #######################################################
 | 
| ... | ... | @@ -314,26 +333,32 @@ class Scheduler(): | 
| 314 | 333 |          self.schedule_jobs(ready)
 | 
| 315 | 334 |          self._sched()
 | 
| 316 | 335 |  | 
| 336 | +    # _run_cleanup()
 | |
| 337 | +    #
 | |
| 338 | +    # Schedules the cache cleanup job if the current cache size
 | |
| 339 | +    # exceeds the cache quota.
 | |
| 340 | +    #
 | |
| 341 | +    # Args:
 | |
| 342 | +    #    cache_size (int): The calculated cache size (ignored)
 | |
| 343 | +    #
 | |
| 344 | +    # NOTE: This runs in response to completion of the cache size
 | |
| 345 | +    #       calculation job launched by Scheduler.check_cache_size(),
 | |
| 346 | +    #       which will report the calculated cache size.
 | |
| 347 | +    #
 | |
| 317 | 348 |      def _run_cleanup(self, cache_size):
 | 
| 318 | 349 |          platform = Platform.get_platform()
 | 
| 319 | -        if cache_size and cache_size < platform.artifactcache.cache_quota:
 | |
| 350 | +        artifacts = platform.artifactcache
 | |
| 351 | + | |
| 352 | +        if not artifacts.get_quota_exceeded():
 | |
| 320 | 353 |              return
 | 
| 321 | 354 |  | 
| 322 | -        job = CleanupJob(self, 'cleanup', 'cleanup',
 | |
| 355 | +        job = CleanupJob(self, 'cleanup', 'cleanup/cleanup',
 | |
| 323 | 356 |                           resources=[ResourceType.CACHE,
 | 
| 324 | 357 |                                      ResourceType.PROCESS],
 | 
| 325 | 358 |                           exclusive_resources=[ResourceType.CACHE],
 | 
| 326 | 359 |                           complete_cb=None)
 | 
| 327 | 360 |          self.schedule_jobs([job])
 | 
| 328 | 361 |  | 
| 329 | -    def _check_cache_size_real(self):
 | |
| 330 | -        job = CacheSizeJob(self, 'cache_size', 'cache_size/cache_size',
 | |
| 331 | -                           resources=[ResourceType.CACHE,
 | |
| 332 | -                                      ResourceType.PROCESS],
 | |
| 333 | -                           exclusive_resources=[ResourceType.CACHE],
 | |
| 334 | -                           complete_cb=self._run_cleanup)
 | |
| 335 | -        self.schedule_jobs([job])
 | |
| 336 | - | |
| 337 | 362 |      # _suspend_jobs()
 | 
| 338 | 363 |      #
 | 
| 339 | 364 |      # Suspend all ongoing jobs.
 | 
| ... | ... | @@ -231,7 +231,6 @@ class Element(Plugin): | 
| 231 | 231 |          self.__staged_sources_directory = None  # Location where Element.stage_sources() was called
 | 
| 232 | 232 |          self.__tainted = None                   # Whether the artifact is tainted and should not be shared
 | 
| 233 | 233 |          self.__required = False                 # Whether the artifact is required in the current session
 | 
| 234 | -        self.__artifact_size = None             # The size of data committed to the artifact cache
 | |
| 235 | 234 |  | 
| 236 | 235 |          # hash tables of loaded artifact metadata, hashed by key
 | 
| 237 | 236 |          self.__metadata_keys = {}                     # Strong and weak keys for this key
 | 
| ... | ... | @@ -1454,6 +1453,9 @@ class Element(Plugin): | 
| 1454 | 1453 |      #   - Call the public abstract methods for the build phase
 | 
| 1455 | 1454 |      #   - Cache the resulting artifact
 | 
| 1456 | 1455 |      #
 | 
| 1456 | +    # Returns:
 | |
| 1457 | +    #    (int): The size of the newly cached artifact
 | |
| 1458 | +    #
 | |
| 1457 | 1459 |      def _assemble(self):
 | 
| 1458 | 1460 |  | 
| 1459 | 1461 |          # Assert call ordering
 | 
| ... | ... | @@ -1573,12 +1575,14 @@ class Element(Plugin): | 
| 1573 | 1575 |                  }), os.path.join(metadir, 'workspaced-dependencies.yaml'))
 | 
| 1574 | 1576 |  | 
| 1575 | 1577 |                  with self.timed_activity("Caching artifact"):
 | 
| 1576 | -                    self.__artifact_size = utils._get_dir_size(assembledir)
 | |
| 1578 | +                    artifact_size = utils._get_dir_size(assembledir)
 | |
| 1577 | 1579 |                      self.__artifacts.commit(self, assembledir, self.__get_cache_keys_for_commit())
 | 
| 1578 | 1580 |  | 
| 1579 | 1581 |              # Finally cleanup the build dir
 | 
| 1580 | 1582 |              cleanup_rootdir()
 | 
| 1581 | 1583 |  | 
| 1584 | +        return artifact_size
 | |
| 1585 | + | |
| 1582 | 1586 |      # _pull_pending()
 | 
| 1583 | 1587 |      #
 | 
| 1584 | 1588 |      # Check whether the artifact will be pulled.
 | 
| ... | ... | @@ -1817,25 +1821,6 @@ class Element(Plugin): | 
| 1817 | 1821 |          workspaces = self._get_context().get_workspaces()
 | 
| 1818 | 1822 |          return workspaces.get_workspace(self._get_full_name())
 | 
| 1819 | 1823 |  | 
| 1820 | -    # _get_artifact_size()
 | |
| 1821 | -    #
 | |
| 1822 | -    # Get the size of the artifact produced by this element in the
 | |
| 1823 | -    # current pipeline - if this element has not been assembled or
 | |
| 1824 | -    # pulled, this will be None.
 | |
| 1825 | -    #
 | |
| 1826 | -    # Note that this is the size of an artifact *before* committing it
 | |
| 1827 | -    # to the cache, the size on disk may differ. It can act as an
 | |
| 1828 | -    # approximate guide for when to do a proper size calculation.
 | |
| 1829 | -    #
 | |
| 1830 | -    # Returns:
 | |
| 1831 | -    #    (int|None): The size of the artifact
 | |
| 1832 | -    #
 | |
| 1833 | -    def _get_artifact_size(self):
 | |
| 1834 | -        return self.__artifact_size
 | |
| 1835 | - | |
| 1836 | -    def _get_artifact_cache(self):
 | |
| 1837 | -        return self.__artifacts
 | |
| 1838 | - | |
| 1839 | 1824 |      # _write_script():
 | 
| 1840 | 1825 |      #
 | 
| 1841 | 1826 |      # Writes a script to the given directory.
 | 
| ... | ... | @@ -175,6 +175,22 @@ def test_keep_dependencies(cli, datafiles, tmpdir): | 
| 175 | 175 |  | 
| 176 | 176 |  | 
| 177 | 177 |  # Assert that we never delete a dependency required for a build tree
 | 
| 178 | +#
 | |
| 179 | +# NOTE: This test expects that a build will fail if it attempts to
 | |
| 180 | +#       put more artifacts in the cache than the quota can hold,
 | |
| 181 | +#       and expects that the last two elements which don't fit into
 | |
| 182 | +#       the quota won't even be built.
 | |
| 183 | +#
 | |
| 184 | +#       In real life, this will not be the case, since once we reach
 | |
| 185 | +#       the estimated quota we launch a cache size calculation job and
 | |
| 186 | +#       only launch a cleanup job when the size is calculated; and
 | |
| 187 | +#       other build tasks will be scheduled while the cache size job
 | |
| 188 | +#       is running.
 | |
| 189 | +#
 | |
| 190 | +#       This test only passes because we configure `builders` to 1,
 | |
| 191 | +#       ensuring that the cache size job runs exclusively since it
 | |
| 192 | +#       also requires a compute resource (a "builder").
 | |
| 193 | +#
 | |
| 178 | 194 |  @pytest.mark.datafiles(DATA_DIR)
 | 
| 179 | 195 |  def test_never_delete_dependencies(cli, datafiles, tmpdir):
 | 
| 180 | 196 |      project = os.path.join(datafiles.dirname, datafiles.basename)
 | 
| ... | ... | @@ -183,6 +199,9 @@ def test_never_delete_dependencies(cli, datafiles, tmpdir): | 
| 183 | 199 |      cli.configure({
 | 
| 184 | 200 |          'cache': {
 | 
| 185 | 201 |              'quota': 10000000
 | 
| 202 | +        },
 | |
| 203 | +        'scheduler': {
 | |
| 204 | +            'builders': 1
 | |
| 186 | 205 |          }
 | 
| 187 | 206 |      })
 | 
| 188 | 207 |  | 
