Tiago Gomes pushed to branch tiagogomes/issue-573 at BuildStream / buildstream
Commits:
-
abeb1674
by Tiago Gomes at 2018-09-12T12:18:54Z
-
5f4c19ec
by Tiago Gomes at 2018-09-12T12:20:02Z
-
386d5a80
by Tiago Gomes at 2018-09-12T12:20:04Z
-
111c23a6
by Tiago Gomes at 2018-09-12T12:20:04Z
-
3a91f092
by Tiago Gomes at 2018-09-12T12:29:58Z
-
83b5d55c
by Tiago Gomes at 2018-09-12T12:29:58Z
-
d676da49
by Tiago Gomes at 2018-09-12T12:29:58Z
8 changed files:
- buildstream/_artifactcache/artifactcache.py
- buildstream/_scheduler/jobs/__init__.py
- − buildstream/_scheduler/jobs/cachesizejob.py
- buildstream/_scheduler/jobs/cleanupjob.py
- buildstream/_scheduler/queues/buildqueue.py
- buildstream/_scheduler/queues/pullqueue.py
- buildstream/_scheduler/scheduler.py
- buildstream/element.py
Changes:
1 | 1 |
#
|
2 |
-# Copyright (C) 2017-2018 Codethink Limited
|
|
2 |
+# Copyright (C) 2018 Codethink Limited
|
|
3 | 3 |
#
|
4 | 4 |
# This program is free software; you can redistribute it and/or
|
5 | 5 |
# modify it under the terms of the GNU Lesser General Public
|
... | ... | @@ -16,6 +16,7 @@ |
16 | 16 |
#
|
17 | 17 |
# Authors:
|
18 | 18 |
# Tristan Maat <tristan maat codethink co uk>
|
19 |
+# Tiago Gomes <tiago gomes codethink co uk>
|
|
19 | 20 |
|
20 | 21 |
import os
|
21 | 22 |
import string
|
... | ... | @@ -88,7 +89,7 @@ class ArtifactCache(): |
88 | 89 |
self.project_remote_specs = {}
|
89 | 90 |
|
90 | 91 |
self._required_artifacts = set() # The artifacts required for this session
|
91 |
- self._cache_size = None # The current cache size, sometimes it's an estimate
|
|
92 |
+ self._cache_size = None # The current cache size
|
|
92 | 93 |
self._cache_quota = None # The cache quota
|
93 | 94 |
self._cache_lower_threshold = None # The target cache size for a cleanup
|
94 | 95 |
|
... | ... | @@ -226,13 +227,11 @@ class ArtifactCache(): |
226 | 227 |
# Clean the artifact cache as much as possible.
|
227 | 228 |
#
|
228 | 229 |
# Returns:
|
229 |
- # (int): The size of the cache after having cleaned up
|
|
230 |
+ # (int): Amount of bytes cleaned from the cache.
|
|
230 | 231 |
#
|
231 | 232 |
def clean(self):
|
232 | 233 |
artifacts = self.list_artifacts()
|
233 |
- |
|
234 |
- # Do a real computation of the cache size once, just in case
|
|
235 |
- self.compute_cache_size()
|
|
234 |
+ old_cache_size = self.get_cache_size()
|
|
236 | 235 |
|
237 | 236 |
while self.get_cache_size() >= self._cache_lower_threshold:
|
238 | 237 |
try:
|
... | ... | @@ -248,7 +247,7 @@ class ArtifactCache(): |
248 | 247 |
"Please increase the cache-quota in {}."
|
249 | 248 |
.format(self.context.config_origin or default_conf))
|
250 | 249 |
|
251 |
- if self.get_quota_exceeded():
|
|
250 |
+ if self.has_quota_exceeded():
|
|
252 | 251 |
raise ArtifactError("Cache too full. Aborting.",
|
253 | 252 |
detail=detail,
|
254 | 253 |
reason="cache-too-full")
|
... | ... | @@ -260,89 +259,56 @@ class ArtifactCache(): |
260 | 259 |
|
261 | 260 |
# Remove the actual artifact, if it's not required.
|
262 | 261 |
size = self.remove(to_remove)
|
262 |
+ self._cache_size -= size
|
|
263 |
+ self._message(MessageType.DEBUG,
|
|
264 |
+ "Removed artifact {} ({})".format(
|
|
265 |
+ to_remove[:-(len(key) - self.context.log_key_length)],
|
|
266 |
+ utils._pretty_size(size)))
|
|
263 | 267 |
|
264 |
- # Remove the size from the removed size
|
|
265 |
- self.set_cache_size(self._cache_size - size)
|
|
268 |
+ self._message(MessageType.INFO,
|
|
269 |
+ "New artifact cache size: {}".format(
|
|
270 |
+ utils._pretty_size(self._cache_size)))
|
|
266 | 271 |
|
267 |
- # This should be O(1) if implemented correctly
|
|
268 |
- return self.get_cache_size()
|
|
269 |
- |
|
270 |
- # compute_cache_size()
|
|
271 |
- #
|
|
272 |
- # Computes the real artifact cache size by calling
|
|
273 |
- # the abstract calculate_cache_size() method.
|
|
274 |
- #
|
|
275 |
- # Returns:
|
|
276 |
- # (int): The size of the artifact cache.
|
|
277 |
- #
|
|
278 |
- def compute_cache_size(self):
|
|
279 |
- self._cache_size = self.calculate_cache_size()
|
|
280 |
- |
|
281 |
- return self._cache_size
|
|
272 |
+ return old_cache_size - self._cache_size
|
|
282 | 273 |
|
283 | 274 |
# add_artifact_size()
|
284 | 275 |
#
|
285 | 276 |
# Adds the reported size of a newly cached artifact to the
|
286 |
- # overall estimated size.
|
|
277 |
+ # current cache size.
|
|
287 | 278 |
#
|
288 | 279 |
# Args:
|
289 | 280 |
# artifact_size (int): The size to add.
|
290 | 281 |
#
|
291 | 282 |
def add_artifact_size(self, artifact_size):
|
292 |
- cache_size = self.get_cache_size()
|
|
293 |
- cache_size += artifact_size
|
|
283 |
+ assert utils._is_main_process()
|
|
294 | 284 |
|
295 |
- self.set_cache_size(cache_size)
|
|
285 |
+ self._cache_size = self.get_cache_size() + artifact_size
|
|
286 |
+ self._write_cache_size(self._cache_size)
|
|
296 | 287 |
|
297 | 288 |
# get_cache_size()
|
298 | 289 |
#
|
299 |
- # Fetches the cached size of the cache, this is sometimes
|
|
300 |
- # an estimate and periodically adjusted to the real size
|
|
301 |
- # when a cache size calculation job runs.
|
|
302 |
- #
|
|
303 |
- # When it is an estimate, the value is either correct, or
|
|
304 |
- # it is greater than the actual cache size.
|
|
290 |
+ # Returns the size of the artifact cache.
|
|
305 | 291 |
#
|
306 | 292 |
# Returns:
|
307 |
- # (int) An approximation of the artifact cache size.
|
|
293 |
+ # (int): The size of the artifact cache.
|
|
308 | 294 |
#
|
309 | 295 |
def get_cache_size(self):
|
296 |
+ if self._cache_size is None:
|
|
297 |
+ self._cache_size = self._read_cache_size()
|
|
310 | 298 |
|
311 |
- # If we don't currently have an estimate, figure out the real cache size.
|
|
312 | 299 |
if self._cache_size is None:
|
313 |
- stored_size = self._read_cache_size()
|
|
314 |
- if stored_size is not None:
|
|
315 |
- self._cache_size = stored_size
|
|
316 |
- else:
|
|
317 |
- self.compute_cache_size()
|
|
300 |
+ self._cache_size = self.calculate_cache_size()
|
|
318 | 301 |
|
319 | 302 |
return self._cache_size
|
320 | 303 |
|
321 |
- # set_cache_size()
|
|
322 |
- #
|
|
323 |
- # Forcefully set the overall cache size.
|
|
324 |
- #
|
|
325 |
- # This is used to update the size in the main process after
|
|
326 |
- # having calculated in a cleanup or a cache size calculation job.
|
|
327 |
- #
|
|
328 |
- # Args:
|
|
329 |
- # cache_size (int): The size to set.
|
|
330 |
- #
|
|
331 |
- def set_cache_size(self, cache_size):
|
|
332 |
- |
|
333 |
- assert cache_size is not None
|
|
334 |
- |
|
335 |
- self._cache_size = cache_size
|
|
336 |
- self._write_cache_size(self._cache_size)
|
|
337 |
- |
|
338 |
- # get_quota_exceeded()
|
|
304 |
+ # has_quota_exceeded()
|
|
339 | 305 |
#
|
340 | 306 |
# Checks if the current artifact cache size exceeds the quota.
|
341 | 307 |
#
|
342 | 308 |
# Returns:
|
343 | 309 |
# (bool): True if the quota is exceeded
|
344 | 310 |
#
|
345 |
- def get_quota_exceeded(self):
|
|
311 |
+ def has_quota_exceeded(self):
|
|
346 | 312 |
return self.get_cache_size() > self._cache_quota
|
347 | 313 |
|
348 | 314 |
################################################
|
... | ... | @@ -18,5 +18,4 @@ |
18 | 18 |
# Tristan Maat <tristan maat codethink co uk>
|
19 | 19 |
|
20 | 20 |
from .elementjob import ElementJob
|
21 |
-from .cachesizejob import CacheSizeJob
|
|
22 | 21 |
from .cleanupjob import CleanupJob
|
1 |
-# Copyright (C) 2018 Codethink Limited
|
|
2 |
-#
|
|
3 |
-# This program is free software; you can redistribute it and/or
|
|
4 |
-# modify it under the terms of the GNU Lesser General Public
|
|
5 |
-# License as published by the Free Software Foundation; either
|
|
6 |
-# version 2 of the License, or (at your option) any later version.
|
|
7 |
-#
|
|
8 |
-# This library is distributed in the hope that it will be useful,
|
|
9 |
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
10 |
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
11 |
-# Lesser General Public License for more details.
|
|
12 |
-#
|
|
13 |
-# You should have received a copy of the GNU Lesser General Public
|
|
14 |
-# License along with this library. If not, see <http://www.gnu.org/licenses/>.
|
|
15 |
-#
|
|
16 |
-# Author:
|
|
17 |
-# Tristan Daniël Maat <tristan maat codethink co uk>
|
|
18 |
-#
|
|
19 |
-from .job import Job
|
|
20 |
-from ..._platform import Platform
|
|
21 |
- |
|
22 |
- |
|
23 |
-class CacheSizeJob(Job):
|
|
24 |
- def __init__(self, *args, complete_cb, **kwargs):
|
|
25 |
- super().__init__(*args, **kwargs)
|
|
26 |
- self._complete_cb = complete_cb
|
|
27 |
- |
|
28 |
- platform = Platform.get_platform()
|
|
29 |
- self._artifacts = platform.artifactcache
|
|
30 |
- |
|
31 |
- def child_process(self):
|
|
32 |
- return self._artifacts.compute_cache_size()
|
|
33 |
- |
|
34 |
- def parent_complete(self, success, result):
|
|
35 |
- if success:
|
|
36 |
- self._artifacts.set_cache_size(result)
|
|
37 |
- |
|
38 |
- if self._complete_cb:
|
|
39 |
- self._complete_cb(result)
|
|
40 |
- |
|
41 |
- def child_process_data(self):
|
|
42 |
- return {}
|
... | ... | @@ -21,9 +21,8 @@ from ..._platform import Platform |
21 | 21 |
|
22 | 22 |
|
23 | 23 |
class CleanupJob(Job):
|
24 |
- def __init__(self, *args, complete_cb, **kwargs):
|
|
24 |
+ def __init__(self, *args, **kwargs):
|
|
25 | 25 |
super().__init__(*args, **kwargs)
|
26 |
- self._complete_cb = complete_cb
|
|
27 | 26 |
|
28 | 27 |
platform = Platform.get_platform()
|
29 | 28 |
self._artifacts = platform.artifactcache
|
... | ... | @@ -33,10 +32,7 @@ class CleanupJob(Job): |
33 | 32 |
|
34 | 33 |
def parent_complete(self, success, result):
|
35 | 34 |
if success:
|
36 |
- self._artifacts.set_cache_size(result)
|
|
37 |
- |
|
38 |
- if self._complete_cb:
|
|
39 |
- self._complete_cb()
|
|
40 |
- |
|
41 |
- def child_process_data(self):
|
|
42 |
- return {}
|
|
35 |
+ # ArtifactCache.clean() returns the number of bytes cleaned.
|
|
36 |
+ # We negate the number because the cache size is to be
|
|
37 |
+ # decreased.
|
|
38 |
+ self._artifacts.add_artifact_size(result * -1)
|
... | ... | @@ -87,31 +87,17 @@ class BuildQueue(Queue): |
87 | 87 |
|
88 | 88 |
return QueueStatus.READY
|
89 | 89 |
|
90 |
- def _check_cache_size(self, job, element, artifact_size):
|
|
91 |
- |
|
92 |
- # After completing a build job, add the artifact size
|
|
93 |
- # as returned from Element._assemble() to the estimated
|
|
94 |
- # artifact cache size
|
|
95 |
- #
|
|
96 |
- platform = Platform.get_platform()
|
|
97 |
- artifacts = platform.artifactcache
|
|
98 |
- |
|
99 |
- artifacts.add_artifact_size(artifact_size)
|
|
100 |
- |
|
101 |
- # If the estimated size outgrows the quota, ask the scheduler
|
|
102 |
- # to queue a job to actually check the real cache size.
|
|
103 |
- #
|
|
104 |
- if artifacts.get_quota_exceeded():
|
|
105 |
- self._scheduler.check_cache_size()
|
|
106 |
- |
|
107 | 90 |
def done(self, job, element, result, success):
|
91 |
+ if not success:
|
|
92 |
+ return False
|
|
93 |
+ |
|
94 |
+ element._assemble_done()
|
|
108 | 95 |
|
109 |
- if success:
|
|
110 |
- # Inform element in main process that assembly is done
|
|
111 |
- element._assemble_done()
|
|
96 |
+ artifacts = Platform.get_platform().artifactcache
|
|
97 |
+ artifacts.add_artifact_size(result)
|
|
112 | 98 |
|
113 |
- # This has to be done after _assemble_done, such that the
|
|
114 |
- # element may register its cache key as required
|
|
115 |
- self._check_cache_size(job, element, result)
|
|
99 |
+ # This has to be done after _assemble_done, such that the
|
|
100 |
+ # element may register its cache key as required
|
|
101 |
+ self._scheduler.check_cache_size()
|
|
116 | 102 |
|
117 |
- return True
|
|
103 |
+ return success
|
1 | 1 |
#
|
2 |
-# Copyright (C) 2016 Codethink Limited
|
|
2 |
+# Copyright (C) 2018 Codethink Limited
|
|
3 | 3 |
#
|
4 | 4 |
# This program is free software; you can redistribute it and/or
|
5 | 5 |
# modify it under the terms of the GNU Lesser General Public
|
... | ... | @@ -21,6 +21,7 @@ |
21 | 21 |
# Local imports
|
22 | 22 |
from . import Queue, QueueStatus
|
23 | 23 |
from ..resources import ResourceType
|
24 |
+from ..._platform import Platform
|
|
24 | 25 |
|
25 | 26 |
|
26 | 27 |
# A queue which pulls element artifacts
|
... | ... | @@ -52,18 +53,21 @@ class PullQueue(Queue): |
52 | 53 |
else:
|
53 | 54 |
return QueueStatus.SKIP
|
54 | 55 |
|
55 |
- def done(self, _, element, result, success):
|
|
56 |
- |
|
56 |
+ def done(self, job, element, result, success):
|
|
57 | 57 |
if not success:
|
58 | 58 |
return False
|
59 | 59 |
|
60 | 60 |
element._pull_done()
|
61 | 61 |
|
62 |
- # Build jobs will check the "approximate" size first. Since we
|
|
63 |
- # do not get an artifact size from pull jobs, we have to
|
|
64 |
- # actually check the cache size.
|
|
62 |
+ pulled, artifact_size = result
|
|
63 |
+ |
|
64 |
+ artifacts = Platform.get_platform().artifactcache
|
|
65 |
+ artifacts.add_artifact_size(artifact_size)
|
|
66 |
+ |
|
67 |
+ # This has to be done after _pull_done, such that the
|
|
68 |
+ # element may register its cache key as required
|
|
65 | 69 |
self._scheduler.check_cache_size()
|
66 | 70 |
|
67 | 71 |
# Element._pull() returns True if it downloaded an artifact,
|
68 | 72 |
# here we want to appear skipped if we did not download.
|
69 |
- return result
|
|
73 |
+ return pulled
|
1 | 1 |
#
|
2 |
-# Copyright (C) 2016 Codethink Limited
|
|
2 |
+# Copyright (C) 2018 Codethink Limited
|
|
3 | 3 |
#
|
4 | 4 |
# This program is free software; you can redistribute it and/or
|
5 | 5 |
# modify it under the terms of the GNU Lesser General Public
|
... | ... | @@ -28,7 +28,7 @@ from contextlib import contextmanager |
28 | 28 |
|
29 | 29 |
# Local imports
|
30 | 30 |
from .resources import Resources, ResourceType
|
31 |
-from .jobs import CacheSizeJob, CleanupJob
|
|
31 |
+from .jobs import CleanupJob
|
|
32 | 32 |
from .._platform import Platform
|
33 | 33 |
|
34 | 34 |
|
... | ... | @@ -243,22 +243,19 @@ class Scheduler(): |
243 | 243 |
|
244 | 244 |
# check_cache_size():
|
245 | 245 |
#
|
246 |
- # Queues a cache size calculation job, after the cache
|
|
247 |
- # size is calculated, a cleanup job will be run automatically
|
|
248 |
- # if needed.
|
|
249 |
- #
|
|
250 |
- # FIXME: This should ensure that only one cache size job
|
|
251 |
- # is ever pending at a given time. If a cache size
|
|
252 |
- # job is already running, it is correct to queue
|
|
253 |
- # a new one, it is incorrect to have more than one
|
|
254 |
- # of these jobs pending at a given time, though.
|
|
246 |
+ # Queues a cleanup job if the size of the artifact cache exceeded
|
|
247 |
+ # the quota
|
|
255 | 248 |
#
|
256 | 249 |
def check_cache_size(self):
|
257 |
- job = CacheSizeJob(self, 'cache_size', 'cache_size/cache_size',
|
|
258 |
- resources=[ResourceType.CACHE,
|
|
259 |
- ResourceType.PROCESS],
|
|
260 |
- complete_cb=self._run_cleanup)
|
|
261 |
- self.schedule_jobs([job])
|
|
250 |
+ artifacts = Platform.get_platform().artifactcache
|
|
251 |
+ |
|
252 |
+ if artifacts.has_get_quota_exceeded():
|
|
253 |
+ job = CleanupJob(self, 'Clean artifact cache',
|
|
254 |
+ 'cleanup/cleanup',
|
|
255 |
+ resources=[ResourceType.CACHE,
|
|
256 |
+ ResourceType.PROCESS],
|
|
257 |
+ exclusive_resources=[ResourceType.CACHE])
|
|
258 |
+ self.schedule_jobs([job])
|
|
262 | 259 |
|
263 | 260 |
#######################################################
|
264 | 261 |
# Local Private Methods #
|
... | ... | @@ -335,32 +332,6 @@ class Scheduler(): |
335 | 332 |
self.schedule_jobs(ready)
|
336 | 333 |
self._sched()
|
337 | 334 |
|
338 |
- # _run_cleanup()
|
|
339 |
- #
|
|
340 |
- # Schedules the cache cleanup job if the passed size
|
|
341 |
- # exceeds the cache quota.
|
|
342 |
- #
|
|
343 |
- # Args:
|
|
344 |
- # cache_size (int): The calculated cache size (ignored)
|
|
345 |
- #
|
|
346 |
- # NOTE: This runs in response to completion of the cache size
|
|
347 |
- # calculation job launched by Scheduler.check_cache_size(),
|
|
348 |
- # which will report the calculated cache size.
|
|
349 |
- #
|
|
350 |
- def _run_cleanup(self, cache_size):
|
|
351 |
- platform = Platform.get_platform()
|
|
352 |
- artifacts = platform.artifactcache
|
|
353 |
- |
|
354 |
- if not artifacts.get_quota_exceeded():
|
|
355 |
- return
|
|
356 |
- |
|
357 |
- job = CleanupJob(self, 'cleanup', 'cleanup/cleanup',
|
|
358 |
- resources=[ResourceType.CACHE,
|
|
359 |
- ResourceType.PROCESS],
|
|
360 |
- exclusive_resources=[ResourceType.CACHE],
|
|
361 |
- complete_cb=None)
|
|
362 |
- self.schedule_jobs([job])
|
|
363 |
- |
|
364 | 335 |
# _suspend_jobs()
|
365 | 336 |
#
|
366 | 337 |
# Suspend all ongoing jobs.
|
... | ... | @@ -1759,7 +1759,7 @@ class Element(Plugin): |
1759 | 1759 |
display_key = self._get_brief_display_key()
|
1760 | 1760 |
self.info("Downloaded artifact {}".format(display_key))
|
1761 | 1761 |
|
1762 |
- return pulled
|
|
1762 |
+ return pulled, artifact_size
|
|
1763 | 1763 |
|
1764 | 1764 |
# _skip_push():
|
1765 | 1765 |
#
|