Tristan Van Berkom pushed to branch tristan/fix-cache-exclusivity at BuildStream / buildstream
Commits:
-
de2b8603
by Tristan Van Berkom at 2018-09-10T07:07:13Z
-
fc79fb67
by Tristan Van Berkom at 2018-09-10T07:07:13Z
-
48ebe889
by Tristan Van Berkom at 2018-09-10T07:07:13Z
-
7c937cb8
by Tristan Van Berkom at 2018-09-10T07:07:13Z
-
f1837748
by Tristan Van Berkom at 2018-09-10T07:07:27Z
-
24bbaf38
by Tristan Van Berkom at 2018-09-10T07:07:34Z
-
7b061b9f
by Tristan Van Berkom at 2018-09-10T07:07:41Z
-
b40ce761
by Tristan Van Berkom at 2018-09-10T07:07:48Z
-
74c17fac
by Tristan Van Berkom at 2018-09-10T07:08:00Z
-
6949dbd7
by Tristan Van Berkom at 2018-09-10T07:14:51Z
-
04e009b5
by Tristan Van Berkom at 2018-09-10T07:15:16Z
-
471b58b7
by Tristan Van Berkom at 2018-09-10T07:15:20Z
-
0db41047
by Tristan Van Berkom at 2018-09-10T07:16:01Z
-
446654a2
by Tristan Van Berkom at 2018-09-10T07:16:27Z
12 changed files:
- buildstream/_artifactcache/artifactcache.py
- buildstream/_artifactcache/cascache.py
- buildstream/_scheduler/jobs/cachesizejob.py
- buildstream/_scheduler/jobs/cleanupjob.py
- buildstream/_scheduler/jobs/elementjob.py
- buildstream/_scheduler/queues/buildqueue.py
- buildstream/_scheduler/queues/pullqueue.py
- buildstream/_scheduler/queues/queue.py
- buildstream/_scheduler/resources.py
- buildstream/_scheduler/scheduler.py
- buildstream/element.py
- tests/artifactcache/expiry.py
Changes:
... | ... | @@ -81,19 +81,16 @@ ArtifactCacheSpec.__new__.__defaults__ = (None, None, None) |
81 | 81 |
class ArtifactCache():
|
82 | 82 |
def __init__(self, context):
|
83 | 83 |
self.context = context
|
84 |
- self.required_artifacts = set()
|
|
85 | 84 |
self.extractdir = os.path.join(context.artifactdir, 'extract')
|
86 | 85 |
self.tmpdir = os.path.join(context.artifactdir, 'tmp')
|
87 | 86 |
|
88 |
- self.estimated_size = None
|
|
89 |
- |
|
90 | 87 |
self.global_remote_specs = []
|
91 | 88 |
self.project_remote_specs = {}
|
92 | 89 |
|
93 |
- self._local = False
|
|
94 |
- self.cache_size = None
|
|
95 |
- self.cache_quota = None
|
|
96 |
- self.cache_lower_threshold = None
|
|
90 |
+ self._required_artifacts = set() # The artifacts required for this session
|
|
91 |
+ self._cache_size = None # The current cache size, sometimes it's an estimate
|
|
92 |
+ self._cache_quota = None # The cache quota
|
|
93 |
+ self._cache_lower_threshold = None # The target cache size for a cleanup
|
|
97 | 94 |
|
98 | 95 |
os.makedirs(self.extractdir, exist_ok=True)
|
99 | 96 |
os.makedirs(self.tmpdir, exist_ok=True)
|
... | ... | @@ -212,8 +209,8 @@ class ArtifactCache(): |
212 | 209 |
weak_key = element._get_cache_key(strength=_KeyStrength.WEAK)
|
213 | 210 |
|
214 | 211 |
for key in (strong_key, weak_key):
|
215 |
- if key and key not in self.required_artifacts:
|
|
216 |
- self.required_artifacts.add(key)
|
|
212 |
+ if key and key not in self._required_artifacts:
|
|
213 |
+ self._required_artifacts.add(key)
|
|
217 | 214 |
|
218 | 215 |
# We also update the usage times of any artifacts
|
219 | 216 |
# we will be using, which helps preventing a
|
... | ... | @@ -228,10 +225,16 @@ class ArtifactCache(): |
228 | 225 |
#
|
229 | 226 |
# Clean the artifact cache as much as possible.
|
230 | 227 |
#
|
228 |
+ # Returns:
|
|
229 |
+ # (int): The size of the cache after having cleaned up
|
|
230 |
+ #
|
|
231 | 231 |
def clean(self):
|
232 | 232 |
artifacts = self.list_artifacts()
|
233 | 233 |
|
234 |
- while self.calculate_cache_size() >= self.cache_quota - self.cache_lower_threshold:
|
|
234 |
+ # Do a real computation of the cache size once, just in case
|
|
235 |
+ self.compute_cache_size()
|
|
236 |
+ |
|
237 |
+ while self.get_cache_size() >= self._cache_lower_threshold:
|
|
235 | 238 |
try:
|
236 | 239 |
to_remove = artifacts.pop(0)
|
237 | 240 |
except IndexError:
|
... | ... | @@ -245,7 +248,7 @@ class ArtifactCache(): |
245 | 248 |
"Please increase the cache-quota in {}."
|
246 | 249 |
.format(self.context.config_origin or default_conf))
|
247 | 250 |
|
248 |
- if self.calculate_cache_size() > self.cache_quota:
|
|
251 |
+ if self.get_quota_exceeded():
|
|
249 | 252 |
raise ArtifactError("Cache too full. Aborting.",
|
250 | 253 |
detail=detail,
|
251 | 254 |
reason="cache-too-full")
|
... | ... | @@ -253,46 +256,94 @@ class ArtifactCache(): |
253 | 256 |
break
|
254 | 257 |
|
255 | 258 |
key = to_remove.rpartition('/')[2]
|
256 |
- if key not in self.required_artifacts:
|
|
259 |
+ if key not in self._required_artifacts:
|
|
260 |
+ |
|
261 |
+ # Remove the actual artifact, if it's not required.
|
|
257 | 262 |
size = self.remove(to_remove)
|
258 |
- if size:
|
|
259 |
- self.cache_size -= size
|
|
263 |
+ |
|
264 |
+ # Subtract the removed artifact's size from the cache size
|
|
265 |
+ self.set_cache_size(self._cache_size - size)
|
|
260 | 266 |
|
261 | 267 |
# This should be O(1) if implemented correctly
|
262 |
- return self.calculate_cache_size()
|
|
268 |
+ return self.get_cache_size()
|
|
263 | 269 |
|
264 |
- # get_approximate_cache_size()
|
|
270 |
+ # compute_cache_size()
|
|
265 | 271 |
#
|
266 |
- # A cheap method that aims to serve as an upper limit on the
|
|
267 |
- # artifact cache size.
|
|
272 |
+ # Computes the real artifact cache size by calling
|
|
273 |
+ # the abstract calculate_cache_size() method.
|
|
268 | 274 |
#
|
269 |
- # The cache size reported by this function will normally be larger
|
|
270 |
- # than the real cache size, since it is calculated using the
|
|
271 |
- # pre-commit artifact size, but for very small artifacts in
|
|
272 |
- # certain caches additional overhead could cause this to be
|
|
273 |
- # smaller than, but close to, the actual size.
|
|
275 |
+ # Returns:
|
|
276 |
+ # (int): The size of the artifact cache.
|
|
277 |
+ #
|
|
278 |
+ def compute_cache_size(self):
|
|
279 |
+ self._cache_size = self.calculate_cache_size()
|
|
280 |
+ |
|
281 |
+ return self._cache_size
|
|
282 |
+ |
|
283 |
+ # add_artifact_size()
|
|
274 | 284 |
#
|
275 |
- # Nonetheless, in practice this should be safe to use as an upper
|
|
276 |
- # limit on the cache size.
|
|
285 |
+ # Adds the reported size of a newly cached artifact to the
|
|
286 |
+ # overall estimated size.
|
|
277 | 287 |
#
|
278 |
- # If the cache has built-in constant-time size reporting, please
|
|
279 |
- # feel free to override this method with a more accurate
|
|
280 |
- # implementation.
|
|
288 |
+ # Args:
|
|
289 |
+ # artifact_size (int): The size to add.
|
|
290 |
+ #
|
|
291 |
+ def add_artifact_size(self, artifact_size):
|
|
292 |
+ cache_size = self.get_cache_size()
|
|
293 |
+ cache_size += artifact_size
|
|
294 |
+ |
|
295 |
+ self.set_cache_size(cache_size)
|
|
296 |
+ |
|
297 |
+ # get_cache_size()
|
|
298 |
+ #
|
|
299 |
+ # Fetches the cached size of the cache, this is sometimes
|
|
300 |
+ # an estimate and periodically adjusted to the real size
|
|
301 |
+ # when a cache size calculation job runs.
|
|
302 |
+ #
|
|
303 |
+ # When it is an estimate, the value is either correct, or
|
|
304 |
+ # it is greater than the actual cache size.
|
|
281 | 305 |
#
|
282 | 306 |
# Returns:
|
283 | 307 |
# (int) An approximation of the artifact cache size.
|
284 | 308 |
#
|
285 |
- def get_approximate_cache_size(self):
|
|
286 |
- # If we don't currently have an estimate, figure out the real
|
|
287 |
- # cache size.
|
|
288 |
- if self.estimated_size is None:
|
|
309 |
+ def get_cache_size(self):
|
|
310 |
+ |
|
311 |
+ # If we don't currently have an estimate, figure out the real cache size.
|
|
312 |
+ if self._cache_size is None:
|
|
289 | 313 |
stored_size = self._read_cache_size()
|
290 | 314 |
if stored_size is not None:
|
291 |
- self.estimated_size = stored_size
|
|
315 |
+ self._cache_size = stored_size
|
|
292 | 316 |
else:
|
293 |
- self.estimated_size = self.calculate_cache_size()
|
|
317 |
+ self.compute_cache_size()
|
|
318 |
+ |
|
319 |
+ return self._cache_size
|
|
294 | 320 |
|
295 |
- return self.estimated_size
|
|
321 |
+ # set_cache_size()
|
|
322 |
+ #
|
|
323 |
+ # Forcefully set the overall cache size.
|
|
324 |
+ #
|
|
325 |
+ # This is used to update the size in the main process after
|
|
326 |
+ # having calculated in a cleanup or a cache size calculation job.
|
|
327 |
+ #
|
|
328 |
+ # Args:
|
|
329 |
+ # cache_size (int): The size to set.
|
|
330 |
+ #
|
|
331 |
+ def set_cache_size(self, cache_size):
|
|
332 |
+ |
|
333 |
+ assert cache_size is not None
|
|
334 |
+ |
|
335 |
+ self._cache_size = cache_size
|
|
336 |
+ self._write_cache_size(self._cache_size)
|
|
337 |
+ |
|
338 |
+ # get_quota_exceeded()
|
|
339 |
+ #
|
|
340 |
+ # Checks if the current artifact cache size exceeds the quota.
|
|
341 |
+ #
|
|
342 |
+ # Returns:
|
|
343 |
+ # (bool): True if the quota is exceeded
|
|
344 |
+ #
|
|
345 |
+ def get_quota_exceeded(self):
|
|
346 |
+ return self.get_cache_size() > self._cache_quota
|
|
296 | 347 |
|
297 | 348 |
################################################
|
298 | 349 |
# Abstract methods for subclasses to implement #
|
... | ... | @@ -484,11 +535,8 @@ class ArtifactCache(): |
484 | 535 |
#
|
485 | 536 |
# Return the real artifact cache size.
|
486 | 537 |
#
|
487 |
- # Implementations should also use this to update estimated_size.
|
|
488 |
- #
|
|
489 | 538 |
# Returns:
|
490 |
- #
|
|
491 |
- # (int) The size of the artifact cache.
|
|
539 |
+ # (int): The size of the artifact cache.
|
|
492 | 540 |
#
|
493 | 541 |
def calculate_cache_size(self):
|
494 | 542 |
raise ImplError("Cache '{kind}' does not implement calculate_cache_size()"
|
... | ... | @@ -535,39 +583,13 @@ class ArtifactCache(): |
535 | 583 |
with self.context.timed_activity("Initializing remote caches", silent_nested=True):
|
536 | 584 |
self.initialize_remotes(on_failure=remote_failed)
|
537 | 585 |
|
538 |
- # _add_artifact_size()
|
|
539 |
- #
|
|
540 |
- # Since we cannot keep track of the cache size between threads,
|
|
541 |
- # this method will be called by the main process every time a
|
|
542 |
- # process that added something to the cache finishes.
|
|
543 |
- #
|
|
544 |
- # This will then add the reported size to
|
|
545 |
- # ArtifactCache.estimated_size.
|
|
546 |
- #
|
|
547 |
- def _add_artifact_size(self, artifact_size):
|
|
548 |
- if not self.estimated_size:
|
|
549 |
- self.estimated_size = self.calculate_cache_size()
|
|
550 |
- |
|
551 |
- self.estimated_size += artifact_size
|
|
552 |
- self._write_cache_size(self.estimated_size)
|
|
553 |
- |
|
554 |
- # _set_cache_size()
|
|
555 |
- #
|
|
556 |
- # Similarly to the above method, when we calculate the actual size
|
|
557 |
- # in a child thread, we can't update it. We instead pass the value
|
|
558 |
- # back to the main thread and update it there.
|
|
559 |
- #
|
|
560 |
- def _set_cache_size(self, cache_size):
|
|
561 |
- self.estimated_size = cache_size
|
|
562 |
- |
|
563 |
- # set_cache_size is called in cleanup, where it may set the cache to None
|
|
564 |
- if self.estimated_size is not None:
|
|
565 |
- self._write_cache_size(self.estimated_size)
|
|
566 |
- |
|
567 | 586 |
# _write_cache_size()
|
568 | 587 |
#
|
569 | 588 |
# Writes the given size of the artifact to the cache's size file
|
570 | 589 |
#
|
590 |
+ # Args:
|
|
591 |
+ # size (int): The size of the artifact cache to record
|
|
592 |
+ #
|
|
571 | 593 |
def _write_cache_size(self, size):
|
572 | 594 |
assert isinstance(size, int)
|
573 | 595 |
size_file_path = os.path.join(self.context.artifactdir, CACHE_SIZE_FILE)
|
... | ... | @@ -579,6 +601,9 @@ class ArtifactCache(): |
579 | 601 |
# Reads and returns the size of the artifact cache that's stored in the
|
580 | 602 |
# cache's size file
|
581 | 603 |
#
|
604 |
+ # Returns:
|
|
605 |
+ # (int): The size of the artifact cache, as recorded in the file
|
|
606 |
+ #
|
|
582 | 607 |
def _read_cache_size(self):
|
583 | 608 |
size_file_path = os.path.join(self.context.artifactdir, CACHE_SIZE_FILE)
|
584 | 609 |
|
... | ... | @@ -628,13 +653,13 @@ class ArtifactCache(): |
628 | 653 |
stat = os.statvfs(artifactdir_volume)
|
629 | 654 |
available_space = (stat.f_bsize * stat.f_bavail)
|
630 | 655 |
|
631 |
- cache_size = self.get_approximate_cache_size()
|
|
656 |
+ cache_size = self.get_cache_size()
|
|
632 | 657 |
|
633 | 658 |
# Ensure system has enough storage for the cache_quota
|
634 | 659 |
#
|
635 | 660 |
# If cache_quota is none, set it to the maximum it could possibly be.
|
636 | 661 |
#
|
637 |
- # Also check that cache_quota is atleast as large as our headroom.
|
|
662 |
+ # Also check that cache_quota is at least as large as our headroom.
|
|
638 | 663 |
#
|
639 | 664 |
if cache_quota is None: # Infinity, set to max system storage
|
640 | 665 |
cache_quota = cache_size + available_space
|
... | ... | @@ -660,8 +685,8 @@ class ArtifactCache(): |
660 | 685 |
# if we end up writing more than 2G, but hey, this stuff is
|
661 | 686 |
# already really fuzzy.
|
662 | 687 |
#
|
663 |
- self.cache_quota = cache_quota - headroom
|
|
664 |
- self.cache_lower_threshold = self.cache_quota / 2
|
|
688 |
+ self._cache_quota = cache_quota - headroom
|
|
689 |
+ self._cache_lower_threshold = self._cache_quota / 2
|
|
665 | 690 |
|
666 | 691 |
|
667 | 692 |
# _configured_remote_artifact_cache_specs():
|
... | ... | @@ -120,8 +120,6 @@ class CASCache(ArtifactCache): |
120 | 120 |
for ref in refs:
|
121 | 121 |
self.set_ref(ref, tree)
|
122 | 122 |
|
123 |
- self.cache_size = None
|
|
124 |
- |
|
125 | 123 |
def diff(self, element, key_a, key_b, *, subdir=None):
|
126 | 124 |
ref_a = self.get_artifact_fullname(element, key_a)
|
127 | 125 |
ref_b = self.get_artifact_fullname(element, key_b)
|
... | ... | @@ -488,11 +486,7 @@ class CASCache(ArtifactCache): |
488 | 486 |
raise ArtifactError("Attempt to access unavailable artifact: {}".format(e)) from e
|
489 | 487 |
|
490 | 488 |
def calculate_cache_size(self):
|
491 |
- if self.cache_size is None:
|
|
492 |
- self.cache_size = utils._get_dir_size(self.casdir)
|
|
493 |
- self.estimated_size = self.cache_size
|
|
494 |
- |
|
495 |
- return self.cache_size
|
|
489 |
+ return utils._get_dir_size(self.casdir)
|
|
496 | 490 |
|
497 | 491 |
# list_artifacts():
|
498 | 492 |
#
|
... | ... | @@ -24,15 +24,19 @@ class CacheSizeJob(Job): |
24 | 24 |
def __init__(self, *args, complete_cb, **kwargs):
|
25 | 25 |
super().__init__(*args, **kwargs)
|
26 | 26 |
self._complete_cb = complete_cb
|
27 |
- self._cache = Platform._instance.artifactcache
|
|
27 |
+ |
|
28 |
+ platform = Platform.get_platform()
|
|
29 |
+ self._artifacts = platform.artifactcache
|
|
28 | 30 |
|
29 | 31 |
def child_process(self):
|
30 |
- return self._cache.calculate_cache_size()
|
|
32 |
+ return self._artifacts.compute_cache_size()
|
|
31 | 33 |
|
32 | 34 |
def parent_complete(self, success, result):
|
33 |
- self._cache._set_cache_size(result)
|
|
34 |
- if self._complete_cb:
|
|
35 |
- self._complete_cb(result)
|
|
35 |
+ if success:
|
|
36 |
+ self._artifacts.set_cache_size(result)
|
|
37 |
+ |
|
38 |
+ if self._complete_cb:
|
|
39 |
+ self._complete_cb(result)
|
|
36 | 40 |
|
37 | 41 |
def child_process_data(self):
|
38 | 42 |
return {}
|
... | ... | @@ -24,15 +24,19 @@ class CleanupJob(Job): |
24 | 24 |
def __init__(self, *args, complete_cb, **kwargs):
|
25 | 25 |
super().__init__(*args, **kwargs)
|
26 | 26 |
self._complete_cb = complete_cb
|
27 |
- self._cache = Platform._instance.artifactcache
|
|
27 |
+ |
|
28 |
+ platform = Platform.get_platform()
|
|
29 |
+ self._artifacts = platform.artifactcache
|
|
28 | 30 |
|
29 | 31 |
def child_process(self):
|
30 |
- return self._cache.clean()
|
|
32 |
+ return self._artifacts.clean()
|
|
31 | 33 |
|
32 | 34 |
def parent_complete(self, success, result):
|
33 |
- self._cache._set_cache_size(result)
|
|
34 |
- if self._complete_cb:
|
|
35 |
- self._complete_cb()
|
|
35 |
+ if success:
|
|
36 |
+ self._artifacts.set_cache_size(result)
|
|
37 |
+ |
|
38 |
+ if self._complete_cb:
|
|
39 |
+ self._complete_cb()
|
|
36 | 40 |
|
37 | 41 |
def child_process_data(self):
|
38 | 42 |
return {}
|
... | ... | @@ -109,13 +109,7 @@ class ElementJob(Job): |
109 | 109 |
data = {}
|
110 | 110 |
|
111 | 111 |
workspace = self._element._get_workspace()
|
112 |
- artifact_size = self._element._get_artifact_size()
|
|
113 |
- cache_size = self._element._get_artifact_cache().calculate_cache_size()
|
|
114 |
- |
|
115 | 112 |
if workspace is not None:
|
116 | 113 |
data['workspace'] = workspace.to_dict()
|
117 |
- if artifact_size is not None:
|
|
118 |
- data['artifact_size'] = artifact_size
|
|
119 |
- data['cache_size'] = cache_size
|
|
120 | 114 |
|
121 | 115 |
return data
|
... | ... | @@ -24,6 +24,7 @@ from . import Queue, QueueStatus |
24 | 24 |
from ..jobs import ElementJob
|
25 | 25 |
from ..resources import ResourceType
|
26 | 26 |
from ..._message import MessageType
|
27 |
+from ..._platform import Platform
|
|
27 | 28 |
|
28 | 29 |
|
29 | 30 |
# A queue which assembles elements
|
... | ... | @@ -32,7 +33,7 @@ class BuildQueue(Queue): |
32 | 33 |
|
33 | 34 |
action_name = "Build"
|
34 | 35 |
complete_name = "Built"
|
35 |
- resources = [ResourceType.PROCESS]
|
|
36 |
+ resources = [ResourceType.PROCESS, ResourceType.CACHE]
|
|
36 | 37 |
|
37 | 38 |
def __init__(self, *args, **kwargs):
|
38 | 39 |
super().__init__(*args, **kwargs)
|
... | ... | @@ -67,8 +68,7 @@ class BuildQueue(Queue): |
67 | 68 |
return super().enqueue(to_queue)
|
68 | 69 |
|
69 | 70 |
def process(self, element):
|
70 |
- element._assemble()
|
|
71 |
- return element._get_unique_id()
|
|
71 |
+ return element._assemble()
|
|
72 | 72 |
|
73 | 73 |
def status(self, element):
|
74 | 74 |
# state of dependencies may have changed, recalculate element state
|
... | ... | @@ -87,18 +87,22 @@ class BuildQueue(Queue): |
87 | 87 |
|
88 | 88 |
return QueueStatus.READY
|
89 | 89 |
|
90 |
- def _check_cache_size(self, job, element):
|
|
91 |
- if not job.child_data:
|
|
92 |
- return
|
|
90 |
+ def _check_cache_size(self, job, element, artifact_size):
|
|
93 | 91 |
|
94 |
- artifact_size = job.child_data.get('artifact_size', False)
|
|
92 |
+ # After completing a build job, add the artifact size
|
|
93 |
+ # as returned from Element._assemble() to the estimated
|
|
94 |
+ # artifact cache size
|
|
95 |
+ #
|
|
96 |
+ platform = Platform.get_platform()
|
|
97 |
+ artifacts = platform.artifactcache
|
|
95 | 98 |
|
96 |
- if artifact_size:
|
|
97 |
- cache = element._get_artifact_cache()
|
|
98 |
- cache._add_artifact_size(artifact_size)
|
|
99 |
+ artifacts.add_artifact_size(artifact_size)
|
|
99 | 100 |
|
100 |
- if cache.get_approximate_cache_size() > cache.cache_quota:
|
|
101 |
- self._scheduler._check_cache_size_real()
|
|
101 |
+ # If the estimated size outgrows the quota, ask the scheduler
|
|
102 |
+ # to queue a job to actually check the real cache size.
|
|
103 |
+ #
|
|
104 |
+ if artifacts.get_quota_exceeded():
|
|
105 |
+ self._scheduler.check_cache_size()
|
|
102 | 106 |
|
103 | 107 |
def done(self, job, element, result, success):
|
104 | 108 |
|
... | ... | @@ -106,8 +110,8 @@ class BuildQueue(Queue): |
106 | 110 |
# Inform element in main process that assembly is done
|
107 | 111 |
element._assemble_done()
|
108 | 112 |
|
109 |
- # This has to be done after _assemble_done, such that the
|
|
110 |
- # element may register its cache key as required
|
|
111 |
- self._check_cache_size(job, element)
|
|
113 |
+ # This has to be done after _assemble_done, such that the
|
|
114 |
+ # element may register its cache key as required
|
|
115 |
+ self._check_cache_size(job, element, result)
|
|
112 | 116 |
|
113 | 117 |
return True
|
... | ... | @@ -29,7 +29,7 @@ class PullQueue(Queue): |
29 | 29 |
|
30 | 30 |
action_name = "Pull"
|
31 | 31 |
complete_name = "Pulled"
|
32 |
- resources = [ResourceType.DOWNLOAD]
|
|
32 |
+ resources = [ResourceType.DOWNLOAD, ResourceType.CACHE]
|
|
33 | 33 |
|
34 | 34 |
def process(self, element):
|
35 | 35 |
# returns whether an artifact was downloaded or not
|
... | ... | @@ -62,7 +62,7 @@ class PullQueue(Queue): |
62 | 62 |
# Build jobs will check the "approximate" size first. Since we
|
63 | 63 |
# do not get an artifact size from pull jobs, we have to
|
64 | 64 |
# actually check the cache size.
|
65 |
- self._scheduler._check_cache_size_real()
|
|
65 |
+ self._scheduler.check_cache_size()
|
|
66 | 66 |
|
67 | 67 |
# Element._pull() returns True if it downloaded an artifact,
|
68 | 68 |
# here we want to appear skipped if we did not download.
|
... | ... | @@ -301,8 +301,6 @@ class Queue(): |
301 | 301 |
# Update values that need to be synchronized in the main task
|
302 | 302 |
# before calling any queue implementation
|
303 | 303 |
self._update_workspaces(element, job)
|
304 |
- if job.child_data:
|
|
305 |
- element._get_artifact_cache().cache_size = job.child_data.get('cache_size')
|
|
306 | 304 |
|
307 | 305 |
# Give the result of the job to the Queue implementor,
|
308 | 306 |
# and determine if it should be considered as processed
|
... | ... | @@ -8,7 +8,7 @@ class ResourceType(): |
8 | 8 |
class Resources():
|
9 | 9 |
def __init__(self, num_builders, num_fetchers, num_pushers):
|
10 | 10 |
self._max_resources = {
|
11 |
- ResourceType.CACHE: 1,
|
|
11 |
+ ResourceType.CACHE: 0,
|
|
12 | 12 |
ResourceType.DOWNLOAD: num_fetchers,
|
13 | 13 |
ResourceType.PROCESS: num_builders,
|
14 | 14 |
ResourceType.UPLOAD: num_pushers
|
... | ... | @@ -241,6 +241,25 @@ class Scheduler(): |
241 | 241 |
self._schedule_queue_jobs()
|
242 | 242 |
self._sched()
|
243 | 243 |
|
244 |
+ # check_cache_size():
|
|
245 |
+ #
|
|
246 |
+ # Queues a cache size calculation job, after the cache
|
|
247 |
+ # size is calculated, a cleanup job will be run automatically
|
|
248 |
+ # if needed.
|
|
249 |
+ #
|
|
250 |
+ # FIXME: This should ensure that only one cache size job
|
|
251 |
+ # is ever pending at a given time. If a cache size
|
|
252 |
+ # job is already running, it is correct to queue
|
|
253 |
+ # a new one, it is incorrect to have more than one
|
|
254 |
+ # of these jobs pending at a given time, though.
|
|
255 |
+ #
|
|
256 |
+ def check_cache_size(self):
|
|
257 |
+ job = CacheSizeJob(self, 'cache_size', 'cache_size/cache_size',
|
|
258 |
+ resources=[ResourceType.CACHE,
|
|
259 |
+ ResourceType.PROCESS],
|
|
260 |
+ complete_cb=self._run_cleanup)
|
|
261 |
+ self.schedule_jobs([job])
|
|
262 |
+ |
|
244 | 263 |
#######################################################
|
245 | 264 |
# Local Private Methods #
|
246 | 265 |
#######################################################
|
... | ... | @@ -316,26 +335,32 @@ class Scheduler(): |
316 | 335 |
self.schedule_jobs(ready)
|
317 | 336 |
self._sched()
|
318 | 337 |
|
338 |
+ # _run_cleanup()
|
|
339 |
+ #
|
|
340 |
+ # Schedules the cache cleanup job if the passed size
|
|
341 |
+ # exceeds the cache quota.
|
|
342 |
+ #
|
|
343 |
+ # Args:
|
|
344 |
+ # cache_size (int): The calculated cache size (ignored)
|
|
345 |
+ #
|
|
346 |
+ # NOTE: This runs in response to completion of the cache size
|
|
347 |
+ # calculation job launched by Scheduler.check_cache_size(),
|
|
348 |
+ # which will report the calculated cache size.
|
|
349 |
+ #
|
|
319 | 350 |
def _run_cleanup(self, cache_size):
|
320 | 351 |
platform = Platform.get_platform()
|
321 |
- if cache_size and cache_size < platform.artifactcache.cache_quota:
|
|
352 |
+ artifacts = platform.artifactcache
|
|
353 |
+ |
|
354 |
+ if not artifacts.get_quota_exceeded():
|
|
322 | 355 |
return
|
323 | 356 |
|
324 |
- job = CleanupJob(self, 'cleanup', 'cleanup',
|
|
357 |
+ job = CleanupJob(self, 'cleanup', 'cleanup/cleanup',
|
|
325 | 358 |
resources=[ResourceType.CACHE,
|
326 | 359 |
ResourceType.PROCESS],
|
327 | 360 |
exclusive_resources=[ResourceType.CACHE],
|
328 | 361 |
complete_cb=None)
|
329 | 362 |
self.schedule_jobs([job])
|
330 | 363 |
|
331 |
- def _check_cache_size_real(self):
|
|
332 |
- job = CacheSizeJob(self, 'cache_size', 'cache_size/cache_size',
|
|
333 |
- resources=[ResourceType.CACHE,
|
|
334 |
- ResourceType.PROCESS],
|
|
335 |
- exclusive_resources=[ResourceType.CACHE],
|
|
336 |
- complete_cb=self._run_cleanup)
|
|
337 |
- self.schedule_jobs([job])
|
|
338 |
- |
|
339 | 364 |
# _suspend_jobs()
|
340 | 365 |
#
|
341 | 366 |
# Suspend all ongoing jobs.
|
... | ... | @@ -212,7 +212,6 @@ class Element(Plugin): |
212 | 212 |
self.__staged_sources_directory = None # Location where Element.stage_sources() was called
|
213 | 213 |
self.__tainted = None # Whether the artifact is tainted and should not be shared
|
214 | 214 |
self.__required = False # Whether the artifact is required in the current session
|
215 |
- self.__artifact_size = None # The size of data committed to the artifact cache
|
|
216 | 215 |
self.__build_result = None # The result of assembling this Element
|
217 | 216 |
self._build_log_path = None # The path of the build log for this Element
|
218 | 217 |
|
... | ... | @@ -1502,6 +1501,9 @@ class Element(Plugin): |
1502 | 1501 |
# - Call the public abstract methods for the build phase
|
1503 | 1502 |
# - Cache the resulting artifact
|
1504 | 1503 |
#
|
1504 |
+ # Returns:
|
|
1505 |
+ # (int): The size of the newly cached artifact
|
|
1506 |
+ #
|
|
1505 | 1507 |
def _assemble(self):
|
1506 | 1508 |
|
1507 | 1509 |
# Assert call ordering
|
... | ... | @@ -1646,7 +1648,7 @@ class Element(Plugin): |
1646 | 1648 |
}), os.path.join(metadir, 'workspaced-dependencies.yaml'))
|
1647 | 1649 |
|
1648 | 1650 |
with self.timed_activity("Caching artifact"):
|
1649 |
- self.__artifact_size = utils._get_dir_size(assembledir)
|
|
1651 |
+ artifact_size = utils._get_dir_size(assembledir)
|
|
1650 | 1652 |
self.__artifacts.commit(self, assembledir, self.__get_cache_keys_for_commit())
|
1651 | 1653 |
|
1652 | 1654 |
if collect is not None and collectvdir is None:
|
... | ... | @@ -1658,6 +1660,8 @@ class Element(Plugin): |
1658 | 1660 |
# Finally cleanup the build dir
|
1659 | 1661 |
cleanup_rootdir()
|
1660 | 1662 |
|
1663 |
+ return artifact_size
|
|
1664 |
+ |
|
1661 | 1665 |
def _get_build_log(self):
|
1662 | 1666 |
return self._build_log_path
|
1663 | 1667 |
|
... | ... | @@ -1899,25 +1903,6 @@ class Element(Plugin): |
1899 | 1903 |
workspaces = self._get_context().get_workspaces()
|
1900 | 1904 |
return workspaces.get_workspace(self._get_full_name())
|
1901 | 1905 |
|
1902 |
- # _get_artifact_size()
|
|
1903 |
- #
|
|
1904 |
- # Get the size of the artifact produced by this element in the
|
|
1905 |
- # current pipeline - if this element has not been assembled or
|
|
1906 |
- # pulled, this will be None.
|
|
1907 |
- #
|
|
1908 |
- # Note that this is the size of an artifact *before* committing it
|
|
1909 |
- # to the cache, the size on disk may differ. It can act as an
|
|
1910 |
- # approximate guide for when to do a proper size calculation.
|
|
1911 |
- #
|
|
1912 |
- # Returns:
|
|
1913 |
- # (int|None): The size of the artifact
|
|
1914 |
- #
|
|
1915 |
- def _get_artifact_size(self):
|
|
1916 |
- return self.__artifact_size
|
|
1917 |
- |
|
1918 |
- def _get_artifact_cache(self):
|
|
1919 |
- return self.__artifacts
|
|
1920 |
- |
|
1921 | 1906 |
# _write_script():
|
1922 | 1907 |
#
|
1923 | 1908 |
# Writes a script to the given directory.
|
... | ... | @@ -196,6 +196,22 @@ def test_keep_dependencies(cli, datafiles, tmpdir): |
196 | 196 |
|
197 | 197 |
|
198 | 198 |
# Assert that we never delete a dependency required for a build tree
|
199 |
+#
|
|
200 |
+# NOTE: This test expects that a build will fail if it attempts to
|
|
201 |
+# put more artifacts in the cache than the quota can hold,
|
|
202 |
+# and expects that the last two elements which don't fit into
|
|
203 |
+# the quota won't even be built.
|
|
204 |
+#
|
|
205 |
+# In real life, this will not be the case, since once we reach
|
|
206 |
+# the estimated quota we launch a cache size calculation job and
|
|
207 |
+# only launch a cleanup job when the size is calculated; and
|
|
208 |
+# other build tasks will be scheduled while the cache size job
|
|
209 |
+# is running.
|
|
210 |
+#
|
|
211 |
+# This test only passes because we configure `builders` to 1,
|
|
212 |
+# ensuring that the cache size job runs exclusively since it
|
|
213 |
+# also requires a compute resource (a "builder").
|
|
214 |
+#
|
|
199 | 215 |
@pytest.mark.datafiles(DATA_DIR)
|
200 | 216 |
def test_never_delete_dependencies(cli, datafiles, tmpdir):
|
201 | 217 |
project = os.path.join(datafiles.dirname, datafiles.basename)
|
... | ... | @@ -204,6 +220,9 @@ def test_never_delete_dependencies(cli, datafiles, tmpdir): |
204 | 220 |
cli.configure({
|
205 | 221 |
'cache': {
|
206 | 222 |
'quota': 10000000
|
223 |
+ },
|
|
224 |
+ 'scheduler': {
|
|
225 |
+ 'builders': 1
|
|
207 | 226 |
}
|
208 | 227 |
})
|
209 | 228 |
|