Tristan Van Berkom pushed to branch bst-1.2 at BuildStream / buildstream
Commits:

- 80e912e6 by Tristan Van Berkom at 2018-09-10T06:56:11Z
- f81637c5 by Tristan Van Berkom at 2018-09-10T06:56:11Z
- f074a8ac by Tristan Van Berkom at 2018-09-10T06:56:11Z
- d2f5c4f3 by Tristan Van Berkom at 2018-09-10T06:56:12Z
- eb9eb6db by Tristan Van Berkom at 2018-09-10T06:56:12Z
- 8293d8da by Tristan Van Berkom at 2018-09-10T06:56:12Z
- 879d5117 by Tristan Van Berkom at 2018-09-10T06:56:12Z
- 2c898796 by Tristan Van Berkom at 2018-09-10T06:56:12Z
- 55266475 by Tristan Van Berkom at 2018-09-10T06:56:12Z
- af0bfaee by Tristan Van Berkom at 2018-09-10T06:56:12Z
- 222c5034 by Tristan Van Berkom at 2018-09-10T06:56:12Z
- 67fa96e1 by Tristan Van Berkom at 2018-09-10T06:56:12Z
- 303240ab by Tristan Van Berkom at 2018-09-10T06:56:12Z
- 78499b7d by Tristan Van Berkom at 2018-09-10T06:56:12Z
- 32d0ce66 by Tristan Van Berkom at 2018-09-10T09:22:57Z
12 changed files:
- buildstream/_artifactcache/artifactcache.py
- buildstream/_artifactcache/cascache.py
- buildstream/_scheduler/jobs/cachesizejob.py
- buildstream/_scheduler/jobs/cleanupjob.py
- buildstream/_scheduler/jobs/elementjob.py
- buildstream/_scheduler/queues/buildqueue.py
- buildstream/_scheduler/queues/pullqueue.py
- buildstream/_scheduler/queues/queue.py
- buildstream/_scheduler/resources.py
- buildstream/_scheduler/scheduler.py
- buildstream/element.py
- tests/artifactcache/expiry.py
Changes:
@@ -81,19 +81,16 @@ ArtifactCacheSpec.__new__.__defaults__ = (None, None, None)
 class ArtifactCache():
     def __init__(self, context):
         self.context = context
-        self.required_artifacts = set()
         self.extractdir = os.path.join(context.artifactdir, 'extract')
         self.tmpdir = os.path.join(context.artifactdir, 'tmp')

-        self.estimated_size = None
-
         self.global_remote_specs = []
         self.project_remote_specs = {}

-        self._local = False
-        self.cache_size = None
-        self.cache_quota = None
-        self.cache_lower_threshold = None
+        self._required_artifacts = set()      # The artifacts required for this session
+        self._cache_size = None               # The current cache size, sometimes it's an estimate
+        self._cache_quota = None              # The cache quota
+        self._cache_lower_threshold = None    # The target cache size for a cleanup

         os.makedirs(self.extractdir, exist_ok=True)
         os.makedirs(self.tmpdir, exist_ok=True)
@@ -212,8 +209,8 @@ class ArtifactCache():
         weak_key = element._get_cache_key(strength=_KeyStrength.WEAK)

         for key in (strong_key, weak_key):
-            if key and key not in self.required_artifacts:
-                self.required_artifacts.add(key)
+            if key and key not in self._required_artifacts:
+                self._required_artifacts.add(key)

                 # We also update the usage times of any artifacts
                 # we will be using, which helps preventing a
@@ -228,10 +225,16 @@ class ArtifactCache():
     #
     # Clean the artifact cache as much as possible.
     #
+    # Returns:
+    #    (int): The size of the cache after having cleaned up
+    #
     def clean(self):
         artifacts = self.list_artifacts()

-        while self.calculate_cache_size() >= self.cache_quota - self.cache_lower_threshold:
+        # Do a real computation of the cache size once, just in case
+        self.compute_cache_size()
+
+        while self.get_cache_size() >= self._cache_lower_threshold:
             try:
                 to_remove = artifacts.pop(0)
             except IndexError:
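Condensed, the loop above has the following shape. This is an illustrative standalone function, not BuildStream code, and it omits the required-artifact exemption and the quota-error handling that the next hunks deal with:

def clean(list_artifacts, remove, get_size, set_size, lower_threshold):
    # Evict least recently used artifacts until the cache shrinks
    # below the lower threshold
    artifacts = list_artifacts()  # Oldest first
    while get_size() >= lower_threshold and artifacts:
        freed = remove(artifacts.pop(0))
        set_size(get_size() - freed)
    return get_size()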
@@ -245,7 +248,7 @@ class ArtifactCache():
                           "Please increase the cache-quota in {}."
                           .format(self.context.config_origin or default_conf))

-                if self.calculate_cache_size() > self.cache_quota:
+                if self.get_quota_exceeded():
                     raise ArtifactError("Cache too full. Aborting.",
                                         detail=detail,
                                         reason="cache-too-full")
@@ -253,46 +256,94 @@ class ArtifactCache():
                 break

             key = to_remove.rpartition('/')[2]
-            if key not in self.required_artifacts:
+            if key not in self._required_artifacts:
+
+                # Remove the actual artifact, if it's not required.
                 size = self.remove(to_remove)
-                if size:
-                    self.cache_size -= size
+
+                # Subtract the removed artifact's size from the cache size
+                self.set_cache_size(self._cache_size - size)

         # This should be O(1) if implemented correctly
-        return self.calculate_cache_size()
+        return self.get_cache_size()

-    # get_approximate_cache_size()
+    # compute_cache_size()
     #
-    # A cheap method that aims to serve as an upper limit on the
-    # artifact cache size.
+    # Computes the real artifact cache size by calling
+    # the abstract calculate_cache_size() method.
     #
-    # The cache size reported by this function will normally be larger
-    # than the real cache size, since it is calculated using the
-    # pre-commit artifact size, but for very small artifacts in
-    # certain caches additional overhead could cause this to be
-    # smaller than, but close to, the actual size.
+    # Returns:
+    #    (int): The size of the artifact cache.
+    #
+    def compute_cache_size(self):
+        self._cache_size = self.calculate_cache_size()
+
+        return self._cache_size
+
+    # add_artifact_size()
     #
-    # Nonetheless, in practice this should be safe to use as an upper
-    # limit on the cache size.
+    # Adds the reported size of a newly cached artifact to the
+    # overall estimated size.
     #
-    # If the cache has built-in constant-time size reporting, please
-    # feel free to override this method with a more accurate
-    # implementation.
+    # Args:
+    #    artifact_size (int): The size to add.
+    #
+    def add_artifact_size(self, artifact_size):
+        cache_size = self.get_cache_size()
+        cache_size += artifact_size
+
+        self.set_cache_size(cache_size)
+
+    # get_cache_size()
+    #
+    # Fetches the cached size of the cache; this is sometimes
+    # an estimate, periodically adjusted to the real size when
+    # a cache size calculation job runs.
+    #
+    # When it is an estimate, the value is either correct, or
+    # it is greater than the actual cache size.
     #
     # Returns:
     #     (int) An approximation of the artifact cache size.
     #
-    def get_approximate_cache_size(self):
-        # If we don't currently have an estimate, figure out the real
-        # cache size.
-        if self.estimated_size is None:
+    def get_cache_size(self):
+
+        # If we don't currently have an estimate, figure out the real cache size.
+        if self._cache_size is None:
             stored_size = self._read_cache_size()
             if stored_size is not None:
-                self.estimated_size = stored_size
+                self._cache_size = stored_size
             else:
-                self.estimated_size = self.calculate_cache_size()
+                self.compute_cache_size()
+
+        return self._cache_size

-        return self.estimated_size
+    # set_cache_size()
+    #
+    # Forcefully set the overall cache size.
+    #
+    # This is used to update the size in the main process after
+    # having calculated it in a cleanup or a cache size calculation job.
+    #
+    # Args:
+    #     cache_size (int): The size to set.
+    #
+    def set_cache_size(self, cache_size):
+
+        assert cache_size is not None
+
+        self._cache_size = cache_size
+        self._write_cache_size(self._cache_size)
+
+    # get_quota_exceeded()
+    #
+    # Checks if the current artifact cache size exceeds the quota.
+    #
+    # Returns:
+    #    (bool): True if the quota is exceeded
+    #
+    def get_quota_exceeded(self):
+        return self.get_cache_size() > self._cache_quota

     ################################################
     # Abstract methods for subclasses to implement #
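For orientation, the size-tracking surface introduced above reduces to one pattern: keep a cheap running estimate, and replace it with a real computation on demand. A standalone sketch of that pattern (illustrative names only, not BuildStream code):

class SizeTracker:
    # Illustrative stand-in for the ArtifactCache size API above
    def __init__(self, calculate, quota):
        self._calculate = calculate  # The real (slow) computation
        self._quota = quota
        self._size = None            # Estimate; None until first use

    def compute_size(self):
        # Analogous to compute_cache_size(): run the real computation
        self._size = self._calculate()
        return self._size

    def get_size(self):
        # Analogous to get_cache_size(): fall back to the real
        # computation only when no estimate exists yet
        if self._size is None:
            self.compute_size()
        return self._size

    def add_size(self, delta):
        # Analogous to add_artifact_size(): grow the estimate as
        # new content is committed
        self._size = self.get_size() + delta

    def quota_exceeded(self):
        # Analogous to get_quota_exceeded()
        return self.get_size() > self._quota

The design point is that get_size() stays cheap on the hot path; the expensive compute_size() only runs when explicitly requested or when no estimate exists at all.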
@@ -484,11 +535,8 @@ class ArtifactCache():
     #
     # Return the real artifact cache size.
     #
-    # Implementations should also use this to update estimated_size.
-    #
     # Returns:
-    #
-    # (int) The size of the artifact cache.
+    #    (int): The size of the artifact cache.
     #
     def calculate_cache_size(self):
         raise ImplError("Cache '{kind}' does not implement calculate_cache_size()"
@@ -535,39 +583,13 @@ class ArtifactCache():
         with self.context.timed_activity("Initializing remote caches", silent_nested=True):
             self.initialize_remotes(on_failure=remote_failed)

-    # _add_artifact_size()
-    #
-    # Since we cannot keep track of the cache size between threads,
-    # this method will be called by the main process every time a
-    # process that added something to the cache finishes.
-    #
-    # This will then add the reported size to
-    # ArtifactCache.estimated_size.
-    #
-    def _add_artifact_size(self, artifact_size):
-        if not self.estimated_size:
-            self.estimated_size = self.calculate_cache_size()
-
-        self.estimated_size += artifact_size
-        self._write_cache_size(self.estimated_size)
-
-    # _set_cache_size()
-    #
-    # Similarly to the above method, when we calculate the actual size
-    # in a child thread, we can't update it. We instead pass the value
-    # back to the main thread and update it there.
-    #
-    def _set_cache_size(self, cache_size):
-        self.estimated_size = cache_size
-
-        # set_cache_size is called in cleanup, where it may set the cache to None
-        if self.estimated_size is not None:
-            self._write_cache_size(self.estimated_size)
-
     # _write_cache_size()
     #
     # Writes the given size of the artifact to the cache's size file
     #
+    # Args:
+    #    size (int): The size of the artifact cache to record
+    #
     def _write_cache_size(self, size):
         assert isinstance(size, int)
         size_file_path = os.path.join(self.context.artifactdir, CACHE_SIZE_FILE)
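The size file itself is a small persistence detail. A hedged sketch of the write/read pair this hunk documents; the file name here is illustrative, not necessarily BuildStream's actual CACHE_SIZE_FILE:

import os

SIZE_FILE = 'cache_size'  # Assumed name, for illustration only


def write_cache_size(artifactdir, size):
    assert isinstance(size, int)
    with open(os.path.join(artifactdir, SIZE_FILE), 'w') as f:
        f.write(str(size))


def read_cache_size(artifactdir):
    # Returns None when no size has been recorded yet
    path = os.path.join(artifactdir, SIZE_FILE)
    if not os.path.exists(path):
        return None
    with open(path, 'r') as f:
        return int(f.read())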
@@ -579,6 +601,9 @@ class ArtifactCache():
     # Reads and returns the size of the artifact cache that's stored in the
     # cache's size file
     #
+    # Returns:
+    #    (int): The size of the artifact cache, as recorded in the file
+    #
     def _read_cache_size(self):
         size_file_path = os.path.join(self.context.artifactdir, CACHE_SIZE_FILE)

@@ -628,13 +653,13 @@ class ArtifactCache():
         stat = os.statvfs(artifactdir_volume)
         available_space = (stat.f_bsize * stat.f_bavail)

-        cache_size = self.get_approximate_cache_size()
+        cache_size = self.get_cache_size()

         # Ensure system has enough storage for the cache_quota
         #
         # If cache_quota is none, set it to the maximum it could possibly be.
         #
-        # Also check that cache_quota is atleast as large as our headroom.
+        # Also check that cache_quota is at least as large as our headroom.
         #
         if cache_quota is None:  # Infinity, set to max system storage
             cache_quota = cache_size + available_space
@@ -660,8 +685,8 @@ class ArtifactCache():
         # if we end up writing more than 2G, but hey, this stuff is
         # already really fuzzy.
         #
-        self.cache_quota = cache_quota - headroom
-        self.cache_lower_threshold = self.cache_quota / 2
+        self._cache_quota = cache_quota - headroom
+        self._cache_lower_threshold = self._cache_quota / 2


 # _configured_remote_artifact_cache_specs():
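Condensing the quota arithmetic across the last two hunks: the effective quota is the configured quota minus a fixed headroom fudge, and the cleanup target is half of that. A sketch, assuming the 2G headroom the surrounding comments mention, with the insufficient-disk-space error handling omitted:

import os


def configure_quota(artifactdir, cache_size, cache_quota=None,
                    headroom=int(2e9)):
    # Available space on the volume holding the artifact cache
    stat = os.statvfs(artifactdir)
    available_space = stat.f_bsize * stat.f_bavail

    if cache_quota is None:
        # "Infinity": bounded only by what the disk could ever hold
        cache_quota = cache_size + available_space

    quota = cache_quota - headroom  # Keep fuzzy breathing room
    lower_threshold = quota / 2     # Cleanup target: half the quota
    return quota, lower_threshold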
@@ -120,8 +120,6 @@ class CASCache(ArtifactCache):
         for ref in refs:
             self.set_ref(ref, tree)

-        self.cache_size = None
-
     def diff(self, element, key_a, key_b, *, subdir=None):
         ref_a = self.get_artifact_fullname(element, key_a)
         ref_b = self.get_artifact_fullname(element, key_b)
@@ -488,11 +486,7 @@ class CASCache(ArtifactCache):
             raise ArtifactError("Attempt to access unavailable artifact: {}".format(e)) from e

     def calculate_cache_size(self):
-        if self.cache_size is None:
-            self.cache_size = utils._get_dir_size(self.casdir)
-            self.estimated_size = self.cache_size
-
-        return self.cache_size
+        return utils._get_dir_size(self.casdir)

     # list_artifacts():
     #
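utils._get_dir_size() is BuildStream-internal; a plausible equivalent is shown below purely to illustrate what the now-stateless calculate_cache_size() computes on every call:

import os


def get_dir_size(path):
    # Walk the tree and sum regular file sizes, skipping symlinks
    total = 0
    for root, _, files in os.walk(path):
        for name in files:
            fullpath = os.path.join(root, name)
            if not os.path.islink(fullpath):
                total += os.path.getsize(fullpath)
    return total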
@@ -24,15 +24,19 @@ class CacheSizeJob(Job):
     def __init__(self, *args, complete_cb, **kwargs):
         super().__init__(*args, **kwargs)
         self._complete_cb = complete_cb
-        self._cache = Platform._instance.artifactcache
+
+        platform = Platform.get_platform()
+        self._artifacts = platform.artifactcache

     def child_process(self):
-        return self._cache.calculate_cache_size()
+        return self._artifacts.compute_cache_size()

     def parent_complete(self, success, result):
-        self._cache._set_cache_size(result)
-        if self._complete_cb:
-            self._complete_cb(result)
+        if success:
+            self._artifacts.set_cache_size(result)
+
+            if self._complete_cb:
+                self._complete_cb(result)

     def child_process_data(self):
         return {}
@@ -24,15 +24,19 @@ class CleanupJob(Job):
     def __init__(self, *args, complete_cb, **kwargs):
         super().__init__(*args, **kwargs)
         self._complete_cb = complete_cb
-        self._cache = Platform._instance.artifactcache
+
+        platform = Platform.get_platform()
+        self._artifacts = platform.artifactcache

     def child_process(self):
-        return self._cache.clean()
+        return self._artifacts.clean()

     def parent_complete(self, success, result):
-        self._cache._set_cache_size(result)
-        if self._complete_cb:
-            self._complete_cb()
+        if success:
+            self._artifacts.set_cache_size(result)
+
+            if self._complete_cb:
+                self._complete_cb()

     def child_process_data(self):
         return {}
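Both jobs above follow the same shape: compute in a child process, and apply the result in the parent only when the child succeeded. A self-contained model of that shape (assumed names, not BuildStream's Job API; a fork start method is assumed so the callables need not be picklable):

from multiprocessing import Process, Queue


def _child(queue, compute):
    queue.put(compute())  # Runs in the child process


def run_job(compute, apply_result):
    queue = Queue()
    proc = Process(target=_child, args=(queue, compute))
    proc.start()
    proc.join()

    # Only propagate the result into parent state on success
    if proc.exitcode == 0:
        apply_result(queue.get())
        return True
    return False

For example, run_job(lambda: 42, print) computes 42 in the child and prints it in the parent. The success guard mirrors the new `if success:` check above: a crashed child must not clobber the parent's cache size.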
@@ -109,13 +109,7 @@ class ElementJob(Job):
         data = {}

         workspace = self._element._get_workspace()
-        artifact_size = self._element._get_artifact_size()
-        cache_size = self._element._get_artifact_cache().calculate_cache_size()
-
         if workspace is not None:
             data['workspace'] = workspace.to_dict()
-        if artifact_size is not None:
-            data['artifact_size'] = artifact_size
-        data['cache_size'] = cache_size

         return data
@@ -20,6 +20,7 @@

 from . import Queue, QueueStatus
 from ..resources import ResourceType
+from ..._platform import Platform


 # A queue which assembles elements
@@ -28,11 +29,10 @@ class BuildQueue(Queue):

     action_name = "Build"
     complete_name = "Built"
-    resources = [ResourceType.PROCESS]
+    resources = [ResourceType.PROCESS, ResourceType.CACHE]

     def process(self, element):
-        element._assemble()
-        return element._get_unique_id()
+        return element._assemble()

     def status(self, element):
         # state of dependencies may have changed, recalculate element state
@@ -51,18 +51,22 @@ class BuildQueue(Queue):

         return QueueStatus.READY

-    def _check_cache_size(self, job, element):
-        if not job.child_data:
-            return
+    def _check_cache_size(self, job, element, artifact_size):

-        artifact_size = job.child_data.get('artifact_size', False)
+        # After completing a build job, add the artifact size
+        # as returned from Element._assemble() to the estimated
+        # artifact cache size
+        #
+        platform = Platform.get_platform()
+        artifacts = platform.artifactcache

-        if artifact_size:
-            cache = element._get_artifact_cache()
-            cache._add_artifact_size(artifact_size)
+        artifacts.add_artifact_size(artifact_size)

-            if cache.get_approximate_cache_size() > cache.cache_quota:
-                self._scheduler._check_cache_size_real()
+        # If the estimated size outgrows the quota, ask the scheduler
+        # to queue a job to actually check the real cache size.
+        #
+        if artifacts.get_quota_exceeded():
+            self._scheduler.check_cache_size()

     def done(self, job, element, result, success):

@@ -70,8 +74,8 @@ class BuildQueue(Queue):
         # Inform element in main process that assembly is done
         element._assemble_done()

-        # This has to be done after _assemble_done, such that the
-        # element may register its cache key as required
-        self._check_cache_size(job, element)
+        # This has to be done after _assemble_done, such that the
+        # element may register its cache key as required
+        self._check_cache_size(job, element, result)

         return True
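The plumbing these two hunks rely on: whatever process() returns in the child process arrives back in the main process as done()'s `result` argument, so the artifact size no longer needs a side channel through job.child_data. In miniature (hypothetical names, not the real Queue contract):

class MiniBuildQueue:
    # process() runs in a child process; its return value crosses back
    # to the main process and becomes `result` in done()
    def __init__(self, on_artifact_size):
        self._on_artifact_size = on_artifact_size

    def process(self, element):
        return element._assemble()  # e.g. the new artifact's size in bytes

    def done(self, job, element, result, success):
        if success:
            self._on_artifact_size(result)  # feed size to cache accounting
        return True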
@@ -29,7 +29,7 @@ class PullQueue(Queue):

     action_name = "Pull"
     complete_name = "Pulled"
-    resources = [ResourceType.DOWNLOAD]
+    resources = [ResourceType.DOWNLOAD, ResourceType.CACHE]

     def process(self, element):
         # returns whether an artifact was downloaded or not
@@ -62,7 +62,7 @@ class PullQueue(Queue):
         # Build jobs will check the "approximate" size first. Since we
         # do not get an artifact size from pull jobs, we have to
         # actually check the cache size.
-        self._scheduler._check_cache_size_real()
+        self._scheduler.check_cache_size()

         # Element._pull() returns True if it downloaded an artifact,
         # here we want to appear skipped if we did not download.
@@ -300,8 +300,6 @@ class Queue():
         # Update values that need to be synchronized in the main task
         # before calling any queue implementation
         self._update_workspaces(element, job)
-        if job.child_data:
-            element._get_artifact_cache().cache_size = job.child_data.get('cache_size')

         # Give the result of the job to the Queue implementor,
         # and determine if it should be considered as processed
@@ -8,7 +8,7 @@ class ResourceType():
 class Resources():
     def __init__(self, num_builders, num_fetchers, num_pushers):
         self._max_resources = {
-            ResourceType.CACHE: 1,
+            ResourceType.CACHE: 0,
             ResourceType.DOWNLOAD: num_fetchers,
             ResourceType.PROCESS: num_builders,
             ResourceType.UPLOAD: num_pushers
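The CACHE maximum drops from 1 to 0 here, presumably so that build and pull jobs, which now hold the CACHE resource non-exclusively, are not serialized against each other, while a cleanup job still takes it exclusively. A toy model of shared-versus-exclusive holders, under the assumption that a maximum of 0 means "unlimited shared holders":

class ToyResource:
    def __init__(self, max_shared):
        self._max_shared = max_shared  # Assumption: 0 means unlimited
        self._holders = 0
        self._exclusive = False

    def try_acquire(self, exclusive=False):
        if self._exclusive:
            return False  # An exclusive holder blocks everyone
        if exclusive:
            if self._holders > 0:
                return False  # Exclusivity requires an idle resource
            self._exclusive = True
            return True
        if self._max_shared and self._holders >= self._max_shared:
            return False  # Shared limit reached (0 disables the limit)
        self._holders += 1
        return True

    def release(self, exclusive=False):
        if exclusive:
            self._exclusive = False
        else:
            self._holders -= 1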
@@ -239,6 +239,25 @@ class Scheduler():
         self._schedule_queue_jobs()
         self._sched()

+    # check_cache_size():
+    #
+    # Queues a cache size calculation job; after the cache
+    # size is calculated, a cleanup job will be run automatically
+    # if needed.
+    #
+    # FIXME: This should ensure that only one cache size job
+    #        is ever pending at a given time. If a cache size
+    #        job is already running, it is correct to queue
+    #        a new one; it is incorrect, though, to have more
+    #        than one of these jobs pending at a given time.
+    #
+    def check_cache_size(self):
+        job = CacheSizeJob(self, 'cache_size', 'cache_size/cache_size',
+                           resources=[ResourceType.CACHE,
+                                      ResourceType.PROCESS],
+                           complete_cb=self._run_cleanup)
+        self.schedule_jobs([job])
+
     #######################################################
     #                  Local Private Methods              #
     #######################################################
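One way to honour the FIXME above is a pending flag that collapses duplicate pending jobs while still allowing one job to queue behind a running one. This is a hypothetical sketch, not part of this commit:

class SinglePendingGuard:
    def __init__(self, schedule_job):
        self._schedule_job = schedule_job  # Callable that queues the job
        self._pending = False

    def check_cache_size(self):
        if self._pending:
            return  # The already-pending job will see the latest state
        self._pending = True
        self._schedule_job(on_started=self._on_started)

    def _on_started(self):
        # Once the pending job starts running, queuing another
        # pending job becomes legitimate again
        self._pending = False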
@@ -314,26 +333,32 @@ class Scheduler():
         self.schedule_jobs(ready)
         self._sched()

+    # _run_cleanup()
+    #
+    # Schedules the cache cleanup job if the passed size
+    # exceeds the cache quota.
+    #
+    # Args:
+    #    cache_size (int): The calculated cache size (ignored)
+    #
+    # NOTE: This runs in response to completion of the cache size
+    #       calculation job launched by Scheduler.check_cache_size(),
+    #       which will report the calculated cache size.
+    #
     def _run_cleanup(self, cache_size):
         platform = Platform.get_platform()
-        if cache_size and cache_size < platform.artifactcache.cache_quota:
+        artifacts = platform.artifactcache
+
+        if not artifacts.get_quota_exceeded():
             return

-        job = CleanupJob(self, 'cleanup', 'cleanup',
+        job = CleanupJob(self, 'cleanup', 'cleanup/cleanup',
                          resources=[ResourceType.CACHE,
                                     ResourceType.PROCESS],
                          exclusive_resources=[ResourceType.CACHE],
                          complete_cb=None)
         self.schedule_jobs([job])

-    def _check_cache_size_real(self):
-        job = CacheSizeJob(self, 'cache_size', 'cache_size/cache_size',
-                           resources=[ResourceType.CACHE,
-                                      ResourceType.PROCESS],
-                           exclusive_resources=[ResourceType.CACHE],
-                           complete_cb=self._run_cleanup)
-        self.schedule_jobs([job])
-
     # _suspend_jobs()
     #
     # Suspend all ongoing jobs.
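Putting the scheduler pieces together, the whole feedback loop reads roughly as follows. This is a simplified, synchronous sketch reusing the SizeTracker shape from earlier; in the real scheduler each step runs as a separate, resource-gated job:

def on_artifact_cached(tracker, artifact_size, cleanup):
    tracker.add_size(artifact_size)      # build job reports its size
    if tracker.quota_exceeded():         # estimate outgrew the quota
        tracker.compute_size()           # "cache size job": real size
        if tracker.quota_exceeded():     # still over after correction
            cleanup()                    # "cleanup job": LRU expiry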
@@ -231,7 +231,6 @@ class Element(Plugin):
         self.__staged_sources_directory = None  # Location where Element.stage_sources() was called
         self.__tainted = None                   # Whether the artifact is tainted and should not be shared
         self.__required = False                 # Whether the artifact is required in the current session
-        self.__artifact_size = None             # The size of data committed to the artifact cache

         # hash tables of loaded artifact metadata, hashed by key
         self.__metadata_keys = {}               # Strong and weak keys for this key
@@ -1454,6 +1453,9 @@ class Element(Plugin):
     #   - Call the public abstract methods for the build phase
     #   - Cache the resulting artifact
     #
+    # Returns:
+    #    (int): The size of the newly cached artifact
+    #
     def _assemble(self):

         # Assert call ordering
@@ -1573,12 +1575,14 @@ class Element(Plugin):
             }), os.path.join(metadir, 'workspaced-dependencies.yaml'))

             with self.timed_activity("Caching artifact"):
-                self.__artifact_size = utils._get_dir_size(assembledir)
+                artifact_size = utils._get_dir_size(assembledir)
                 self.__artifacts.commit(self, assembledir, self.__get_cache_keys_for_commit())

         # Finally cleanup the build dir
         cleanup_rootdir()

+        return artifact_size
+
     # _pull_pending()
     #
     # Check whether the artifact will be pulled.
@@ -1817,25 +1821,6 @@ class Element(Plugin):
         workspaces = self._get_context().get_workspaces()
         return workspaces.get_workspace(self._get_full_name())

-    # _get_artifact_size()
-    #
-    # Get the size of the artifact produced by this element in the
-    # current pipeline - if this element has not been assembled or
-    # pulled, this will be None.
-    #
-    # Note that this is the size of an artifact *before* committing it
-    # to the cache, the size on disk may differ. It can act as an
-    # approximate guide for when to do a proper size calculation.
-    #
-    # Returns:
-    #     (int|None): The size of the artifact
-    #
-    def _get_artifact_size(self):
-        return self.__artifact_size
-
-    def _get_artifact_cache(self):
-        return self.__artifacts
-
     # _write_script():
     #
     # Writes a script to the given directory.
@@ -175,6 +175,22 @@ def test_keep_dependencies(cli, datafiles, tmpdir):


 # Assert that we never delete a dependency required for a build tree
+#
+# NOTE: This test expects that a build will fail if it attempts to
+#       put more artifacts in the cache than the quota can hold,
+#       and expects that the last two elements which don't fit into
+#       the quota won't even be built.
+#
+#       In real life, this will not be the case, since once we reach
+#       the estimated quota we launch a cache size calculation job and
+#       only launch a cleanup job when the size is calculated; and
+#       other build tasks will be scheduled while the cache size job
+#       is running.
+#
+#       This test only passes because we configure `builders` to 1,
+#       ensuring that the cache size job runs exclusively since it
+#       also requires a compute resource (a "builder").
+#
 @pytest.mark.datafiles(DATA_DIR)
 def test_never_delete_dependencies(cli, datafiles, tmpdir):
     project = os.path.join(datafiles.dirname, datafiles.basename)
@@ -183,6 +199,9 @@ def test_never_delete_dependencies(cli, datafiles, tmpdir):
     cli.configure({
         'cache': {
             'quota': 10000000
+        },
+        'scheduler': {
+            'builders': 1
         }
     })
