Tiago Gomes pushed to branch tiagogomes/issue-573 at BuildStream / buildstream
Commits:

- 6f925bcb by Javier Jardón at 2018-09-13T07:38:48Z
- c6155f8d by Javier Jardón at 2018-09-13T08:11:54Z
- 19838a07 by Chandan Singh at 2018-09-13T10:58:38Z
- 3b81d451 by Chandan Singh at 2018-09-13T12:27:59Z
- 35667848 by Tiago Gomes at 2018-09-13T16:30:53Z
- 1eaf9053 by Tiago Gomes at 2018-09-13T16:30:53Z
- 4fbb3b56 by Tiago Gomes at 2018-09-13T16:30:53Z
- 9661950f by Tiago Gomes at 2018-09-13T16:30:53Z
- 7cf9ff14 by Tiago Gomes at 2018-09-13T16:30:53Z
- 3f0a51eb by Tiago Gomes at 2018-09-13T16:30:53Z
- 5b2bae38 by Tiago Gomes at 2018-09-13T16:30:53Z
- 33df74ed by Tiago Gomes at 2018-09-13T16:30:53Z
- 16a5a151 by Tiago Gomes at 2018-09-13T16:30:53Z
- 2761b164 by Tiago Gomes at 2018-09-13T16:30:53Z
- 0c781fcc by Tiago Gomes at 2018-09-13T16:30:53Z
- 5ec2e266 by Tiago Gomes at 2018-09-13T16:30:53Z
- 68c56564 by Tiago Gomes at 2018-09-13T16:30:53Z
14 changed files:
- README.rst
- buildstream/_artifactcache/artifactcache.py
- buildstream/_artifactcache/cascache.py
- buildstream/_artifactcache/casserver.py
- buildstream/_context.py
- buildstream/_scheduler/jobs/__init__.py
- buildstream/_scheduler/jobs/cachesizejob.py (deleted)
- buildstream/_scheduler/jobs/cleanupjob.py
- buildstream/_scheduler/queues/buildqueue.py
- buildstream/_scheduler/queues/pullqueue.py
- buildstream/_scheduler/scheduler.py
- buildstream/element.py
- buildstream/storage/_casbaseddirectory.py
- doc/source/install_source.rst
Changes:
README.rst
@@ -13,6 +13,9 @@ About
 .. image:: https://gitlab.com/BuildStream/buildstream/badges/master/coverage.svg?job=coverage
    :target: https://gitlab.com/BuildStream/buildstream/commits/master
 
+.. image:: https://img.shields.io/pypi/v/BuildStream.svg
+   :target: https://pypi.org/project/BuildStream
+
 
 What is BuildStream?
 ====================
buildstream/_artifactcache/artifactcache.py
@@ -1,5 +1,5 @@
 #
-# Copyright (C) 2017-2018 Codethink Limited
+# Copyright (C) 2018 Codethink Limited
 #
 # This program is free software; you can redistribute it and/or
 # modify it under the terms of the GNU Lesser General Public
@@ -16,6 +16,7 @@
 #
 # Authors:
 #        Tristan Maat <tristan maat codethink co uk>
+#        Tiago Gomes <tiago gomes codethink co uk>
 
 import os
 import string
@@ -88,7 +89,7 @@ class ArtifactCache():
         self.project_remote_specs = {}
 
         self._required_artifacts = set()      # The artifacts required for this session
-        self._cache_size = None               # The current cache size, sometimes it's an estimate
+        self._cache_size = None               # The current cache size
         self._cache_quota = None              # The cache quota
         self._cache_lower_threshold = None    # The target cache size for a cleanup
 
@@ -226,13 +227,13 @@ class ArtifactCache():
     # Clean the artifact cache as much as possible.
     #
     # Returns:
-    #     (int): The size of the cache after having cleaned up
+    #     (int): Amount of bytes cleaned from the cache.
     #
     def clean(self):
         artifacts = self.list_artifacts()
 
         # Do a real computation of the cache size once, just in case
-        self.compute_cache_size()
+        old_cache_size = self._cache_size = self.calculate_cache_size()
 
         while self.get_cache_size() >= self._cache_lower_threshold:
             try:
@@ -248,7 +249,7 @@ class ArtifactCache():
                           "Please increase the cache-quota in {}."
                           .format(self.context.config_origin or default_conf))
 
-                if self.get_quota_exceeded():
+                if self.has_quota_exceeded():
                     raise ArtifactError("Cache too full. Aborting.",
                                         detail=detail,
                                         reason="cache-too-full")
@@ -260,89 +261,65 @@ class ArtifactCache():
 
             # Remove the actual artifact, if it's not required.
             size = self.remove(to_remove)
+            self._cache_size -= size
+            self._message(MessageType.DEBUG,
+                          "Removed artifact {} ({})".format(
+                              to_remove[:-(len(key) - self.context.log_key_length)],
+                              utils._pretty_size(size)))
 
-            # Remove the size from the removed size
-            self.set_cache_size(self._cache_size - size)
+        self._message(MessageType.INFO,
+                      "New artifact cache size: {}".format(
+                          utils._pretty_size(self._cache_size)))
 
-        # This should be O(1) if implemented correctly
-        return self.get_cache_size()
+        return old_cache_size - self._cache_size
 
-    # compute_cache_size()
+    # add_artifact_size()
     #
-    # Computes the real artifact cache size by calling
-    # the abstract calculate_cache_size() method.
+    # Adds given artifact size to the cache size
     #
-    # Returns:
-    #     (int): The size of the artifact cache.
+    # Args:
+    #     artifact_size (int): The artifact size to add.
     #
-    def compute_cache_size(self):
-        self._cache_size = self.calculate_cache_size()
+    def add_artifact_size(self, artifact_size):
+        assert utils._is_main_process()
 
-        return self._cache_size
+        self._cache_size = self.get_cache_size() + artifact_size
+        self._write_cache_size(self._cache_size)
 
-    # add_artifact_size()
+    # subtract_artifact_size()
     #
-    # Adds the reported size of a newly cached artifact to the
-    # overall estimated size.
+    # Subtracts given artifact size from the cache size
     #
     # Args:
-    #     artifact_size (int): The size to add.
+    #     artifact_size (int): The artifact size to subtract.
     #
-    def add_artifact_size(self, artifact_size):
-        cache_size = self.get_cache_size()
-        cache_size += artifact_size
-
-        self.set_cache_size(cache_size)
+    def subtract_artifact_size(self, artifact_size):
+        self.add_artifact_size(artifact_size * -1)
 
     # get_cache_size()
     #
-    # Fetches the cached size of the cache, this is sometimes
-    # an estimate and periodically adjusted to the real size
-    # when a cache size calculation job runs.
-    #
-    # When it is an estimate, the value is either correct, or
-    # it is greater than the actual cache size.
+    # Returns the size of the artifact cache.
     #
     # Returns:
-    #     (int) An approximation of the artifact cache size.
+    #     (int): The size of the artifact cache.
     #
     def get_cache_size(self):
+        if self._cache_size is None:
+            self._cache_size = self._read_cache_size()
 
-        # If we don't currently have an estimate, figure out the real cache size.
         if self._cache_size is None:
-            stored_size = self._read_cache_size()
-            if stored_size is not None:
-                self._cache_size = stored_size
-            else:
-                self.compute_cache_size()
+            self._cache_size = self.calculate_cache_size()
 
         return self._cache_size
 
-    # set_cache_size()
-    #
-    # Forcefully set the overall cache size.
-    #
-    # This is used to update the size in the main process after
-    # having calculated in a cleanup or a cache size calculation job.
-    #
-    # Args:
-    #     cache_size (int): The size to set.
-    #
-    def set_cache_size(self, cache_size):
-
-        assert cache_size is not None
-
-        self._cache_size = cache_size
-        self._write_cache_size(self._cache_size)
-
-    # get_quota_exceeded()
+    # has_quota_exceeded()
     #
     # Checks if the current artifact cache size exceeds the quota.
     #
     # Returns:
     #     (bool): True of the quota is exceeded
     #
-    def get_quota_exceeded(self):
+    def has_quota_exceeded(self):
         return self.get_cache_size() > self._cache_quota
 
     ################################################
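The hunk above replaces BuildStream's estimated cache size with exact accounting: the size is read lazily (stored value first, full calculation as a fallback), and every addition or subtraction is persisted immediately in the main process. A minimal self-contained sketch of that model, assuming a plain integer state file (SizeTracker and its file format are illustrative, not BuildStream API):

    import os

    class SizeTracker:
        # Illustrative stand-in for the ArtifactCache size bookkeeping
        def __init__(self, statefile, quota):
            self._statefile = statefile   # plays the role of _read/_write_cache_size()
            self._quota = quota
            self._size = None

        def get_size(self):
            # Lazily read the stored size; fall back to a full calculation
            if self._size is None and os.path.exists(self._statefile):
                with open(self._statefile) as f:
                    self._size = int(f.read())
            if self._size is None:
                self._size = self._calculate()
            return self._size

        def add(self, delta):            # cf. add_artifact_size()
            self._size = self.get_size() + delta
            with open(self._statefile, 'w') as f:
                f.write(str(self._size))

        def subtract(self, delta):       # cf. subtract_artifact_size()
            self.add(delta * -1)

        def quota_exceeded(self):        # cf. has_quota_exceeded()
            return self.get_size() > self._quota

        def _calculate(self):
            return 0                     # stand-in for a real disk walk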
@@ -441,6 +418,10 @@ class ArtifactCache():
     #     content (str): The element's content directory
     #     keys (list): The cache keys to use
     #
+    # Returns:
+    #     (int): Disk size overhead in bytes required to cache the
+    #            artifact
+    #
     def commit(self, element, content, keys):
         raise ImplError("Cache '{kind}' does not implement commit()"
                         .format(kind=type(self).__name__))
@@ -512,8 +493,9 @@ class ArtifactCache():
     #     progress (callable): The progress callback, if any
     #
     # Returns:
-    #     (bool): True if pull was successful, False if artifact was not available
-    #
+    #     (bool): True if pull was successful, False if artifact was not available
+    #     (int): Disk size overhead in bytes required to cache the
+    #            artifact
     def pull(self, element, key, *, progress=None):
         raise ImplError("Cache '{kind}' does not implement pull()"
                         .format(kind=type(self).__name__))
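With commit() now returning the disk overhead and pull() returning a (pulled, size) pair, callers account for sizes themselves. A hedged sketch of a consuming call site (cache and element here are placeholders, not the real BuildStream objects):

    def cache_and_account(cache, element, content_dir, keys):
        # commit() reports the bytes actually added to the cache
        size = cache.commit(element, content_dir, keys)
        cache.add_artifact_size(size)

    def pull_and_account(cache, element, key):
        # pull() reports (pulled, size) instead of a bare bool;
        # size is 0 when the artifact was not available
        pulled, size = cache.pull(element, key)
        if pulled:
            cache.add_artifact_size(size)
        return pulled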
buildstream/_artifactcache/cascache.py
@@ -16,6 +16,7 @@
 #
 # Authors:
 #        Jürg Billeter <juerg billeter codethink co uk>
+#        Tiago Gomes <tiago gomes codethink co uk>
 
 import hashlib
 import itertools
@@ -117,11 +118,13 @@ class CASCache(ArtifactCache):
     def commit(self, element, content, keys):
         refs = [self.get_artifact_fullname(element, key) for key in keys]
 
-        tree = self._create_tree(content)
+        tree, size = self._commit_directory(content)
 
         for ref in refs:
             self.set_ref(ref, tree)
 
+        return size
+
     def diff(self, element, key_a, key_b, *, subdir=None):
         ref_a = self.get_artifact_fullname(element, key_a)
         ref_b = self.get_artifact_fullname(element, key_b)
@@ -239,12 +242,12 @@ class CASCache(ArtifactCache):
                 tree.hash = response.digest.hash
                 tree.size_bytes = response.digest.size_bytes
 
-                self._fetch_directory(remote, tree)
+                size = self._fetch_directory(remote, tree)
 
                 self.set_ref(ref, tree)
 
                 # no need to pull from additional remotes
-                return True
+                return True, size
 
             except grpc.RpcError as e:
                 if e.code() != grpc.StatusCode.NOT_FOUND:
@@ -258,7 +261,7 @@ class CASCache(ArtifactCache):
                     remote.spec.url, element._get_brief_display_key())
                 ))
 
-        return False
+        return False, 0
 
     def pull_tree(self, project, digest):
         """ Pull a single Tree rather than an artifact.
@@ -437,6 +440,7 @@ class CASCache(ArtifactCache):
     #
     # Returns:
     #     (Digest): The digest of the added object
+    #     (int): The amount of bytes required to store the object
     #
     # Either `path` or `buffer` must be passed, but not both.
     #
@@ -465,22 +469,38 @@ class CASCache(ArtifactCache):
 
                 out.flush()
 
+                file_size = os.fstat(out.fileno()).st_size
+
                 digest.hash = h.hexdigest()
-                digest.size_bytes = os.fstat(out.fileno()).st_size
+                digest.size_bytes = file_size
 
                 # Place file at final location
                 objpath = self.objpath(digest)
-                os.makedirs(os.path.dirname(objpath), exist_ok=True)
+                dirpath = os.path.dirname(objpath)
+
+                # Track the increased size on the parent directory caused by
+                # adding a new entry, as these directories can contain a large
+                # number of files.
+                new_dir_size = 0
+                old_dir_size = 0
+                try:
+                    os.makedirs(dirpath)
+                except FileExistsError:
+                    old_dir_size = os.stat(dirpath).st_size
+                else:
+                    new_dir_size = os.stat(dirpath).st_size
+
                 os.link(out.name, objpath)
+                new_dir_size = os.stat(dirpath).st_size - old_dir_size
 
         except FileExistsError as e:
            # We can ignore the failed link() if the object is already in the repo.
-            pass
+            file_size = 0
 
         except OSError as e:
             raise ArtifactError("Failed to hash object: {}".format(e)) from e
 
-        return digest
+        return digest, file_size + new_dir_size
 
     # set_ref():
     #
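The add_object() change above also charges for directory growth: creating the parent directory counts its full size, while linking into an existing one counts only the increase in the directory inode. A standalone sketch of that measurement, modelled directly on the hunk (the function name and arguments are hypothetical):

    import os

    def link_with_overhead(tmpname, objpath):
        dirpath = os.path.dirname(objpath)
        new_dir_size = 0
        old_dir_size = 0
        try:
            os.makedirs(dirpath)              # first entry: whole directory is new
        except FileExistsError:
            old_dir_size = os.stat(dirpath).st_size
        else:
            new_dir_size = os.stat(dirpath).st_size
        os.link(tmpname, objpath)
        # the directory inode may grow when another entry is added
        new_dir_size = os.stat(dirpath).st_size - old_dir_size
        return os.stat(objpath).st_size + new_dir_size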
@@ -489,6 +509,8 @@ class CASCache(ArtifactCache):
     # Args:
     #     ref (str): The name of the ref
     #
+    # Note: as setting a ref has very low disk size overhead, don't
+    #       bother to track this.
     def set_ref(self, ref, tree):
         refpath = self._refpath(ref)
         os.makedirs(os.path.dirname(refpath), exist_ok=True)
@@ -665,7 +687,23 @@ class CASCache(ArtifactCache):
     def _refpath(self, ref):
         return os.path.join(self.casdir, 'refs', 'heads', ref)
 
-    def _create_tree(self, path, *, digest=None):
+    # _commit_directory():
+    #
+    # Adds contents of the given directory to content addressable store.
+    #
+    # Adds files, symbolic links and recursively other directories in
+    # the given path to the content addressable store.
+    #
+    # Args:
+    #     path (str): Path to the directory to add.
+    #     dir_digest (Digest): An optional Digest object to use.
+    #
+    # Returns:
+    #     (Digest): Digest object for the Directory object on the given
+    #               path.
+    #
+    def _commit_directory(self, path, *, dir_digest=None):
+        size = 0
         directory = remote_execution_pb2.Directory()
 
         for name in sorted(os.listdir(path)):
@@ -674,11 +712,11 @@ class CASCache(ArtifactCache):
             if stat.S_ISDIR(mode):
                 dirnode = directory.directories.add()
                 dirnode.name = name
-                self._create_tree(full_path, digest=dirnode.digest)
+                size += self._commit_directory(full_path, dir_digest=dirnode.digest)[1]
             elif stat.S_ISREG(mode):
                 filenode = directory.files.add()
                 filenode.name = name
-                self.add_object(path=full_path, digest=filenode.digest)
+                size += self.add_object(path=full_path, digest=filenode.digest)[1]
                 filenode.is_executable = (mode & stat.S_IXUSR) == stat.S_IXUSR
             elif stat.S_ISLNK(mode):
                 symlinknode = directory.symlinks.add()
@@ -687,7 +725,10 @@ class CASCache(ArtifactCache):
             else:
                 raise ArtifactError("Unsupported file type for {}".format(full_path))
 
-        return self.add_object(digest=digest, buffer=directory.SerializeToString())
+        dir_digest, dir_object_size = self.add_object(
+            digest=dir_digest, buffer=directory.SerializeToString())
+
+        return dir_digest, size + dir_object_size
 
     def _get_subdir(self, tree, subdir):
         head, name = os.path.split(subdir)
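_commit_directory() accumulates one running total: the stored size of every file object, plus the recursive totals of subdirectories, plus the serialized Directory object itself. A simplified model of that recursion (add_object here is an assumed callable returning a (digest, stored_bytes) pair, mirroring the two-value add_object() in this commit):

    import os
    import stat

    def commit_tree_size(add_object, path):
        size = 0
        for name in sorted(os.listdir(path)):
            full = os.path.join(path, name)
            mode = os.lstat(full).st_mode
            if stat.S_ISDIR(mode):
                size += commit_tree_size(add_object, full)   # recurse first
            elif stat.S_ISREG(mode):
                size += add_object(full)[1]                  # count stored bytes
        # a real implementation would also add the serialized
        # directory node itself, as _commit_directory() does
        return size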
@@ -830,14 +871,27 @@ class CASCache(ArtifactCache):
 
         assert digest.size_bytes == os.fstat(stream.fileno()).st_size
 
-    def _fetch_directory(self, remote, tree):
-        objpath = self.objpath(tree)
+    # _fetch_directory():
+    #
+    # Fetches a given directory to content addressable store.
+    #
+    # Fetches files, symbolic links and recursively other directories in
+    # the remote directory to the content addressable store.
+    #
+    # Args:
+    #     remote (Remote): The remote to use.
+    #     dir_digest (Digest): Digest object for the Directory object to
+    #                          fetch.
+    #
+    def _fetch_directory(self, remote, dir_digest):
+        size = 0
+        objpath = self.objpath(dir_digest)
         if os.path.exists(objpath):
             # already in local cache
-            return
+            return 0
 
         with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
-            self._fetch_blob(remote, tree, out)
+            self._fetch_blob(remote, dir_digest, out)
 
             directory = remote_execution_pb2.Directory()
 
@@ -845,7 +899,7 @@ class CASCache(ArtifactCache):
                 directory.ParseFromString(f.read())
 
             for filenode in directory.files:
-                fileobjpath = self.objpath(tree)
+                fileobjpath = self.objpath(filenode.digest)
                 if os.path.exists(fileobjpath):
                     # already in local cache
                     continue
@@ -853,16 +907,23 @@ class CASCache(ArtifactCache):
                 with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
                     self._fetch_blob(remote, filenode.digest, f)
 
-                    digest = self.add_object(path=f.name)
+                    digest, obj_size = self.add_object(path=f.name)
+                    size += obj_size
                     assert digest.hash == filenode.digest.hash
 
             for dirnode in directory.directories:
-                self._fetch_directory(remote, dirnode.digest)
+                size += self._fetch_directory(remote, dirnode.digest)
+
+            # Place directory blob only in final location when we've
+            # downloaded all referenced blobs to avoid dangling
+            # references in the repository.
+            digest, obj_size = self.add_object(path=out.name)
+
+            assert digest.hash == dir_digest.hash
+
+            size += obj_size
 
-            # place directory blob only in final location when we've downloaded
-            # all referenced blobs to avoid dangling references in the repository
-            digest = self.add_object(path=out.name)
-            assert digest.hash == tree.hash
+            return size
 
     def _fetch_tree(self, remote, digest):
         # download but do not store the Tree object
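_fetch_directory() keeps the same children-first ordering as before, but now threads a byte count through the recursion; the directory blob is still committed last so the store never references a blob that has not been downloaded. A heavily simplified sketch under assumed stand-ins (node is a hypothetical dict, fetch_blob and add_object are placeholder callables):

    def fetch_directory_size(fetch_blob, add_object, node):
        # node: {"files": [...], "dirs": [...], "blob": ...} -- illustrative only
        size = 0
        for f in node["files"]:
            size += add_object(fetch_blob(f))[1]
        for d in node["dirs"]:
            size += fetch_directory_size(fetch_blob, add_object, d)
        # only now is the directory blob itself placed in the store
        size += add_object(node["blob"])[1]
        return size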
@@ -885,13 +946,13 @@ class CASCache(ArtifactCache):
             with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
                 self._fetch_blob(remote, filenode.digest, f)
 
-                added_digest = self.add_object(path=f.name)
+                added_digest = self.add_object(path=f.name)[0]
                 assert added_digest.hash == filenode.digest.hash
 
         # place directory blob only in final location when we've downloaded
         # all referenced blobs to avoid dangling references in the repository
         dirbuffer = directory.SerializeToString()
-        dirdigest = self.add_object(buffer=dirbuffer)
+        dirdigest = self.add_object(buffer=dirbuffer)[0]
         assert dirdigest.size_bytes == len(dirbuffer)
 
         return dirdigest
buildstream/_artifactcache/casserver.py
@@ -210,7 +210,7 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
             context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
             return response
         out.flush()
-        digest = self.cas.add_object(path=out.name)
+        digest = self.cas.add_object(path=out.name)[0]
         if digest.hash != client_digest.hash:
             context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
             return response
buildstream/_context.py
@@ -119,7 +119,6 @@ class Context():
         self._log_handle = None
         self._log_filename = None
         self.config_cache_quota = 'infinity'
-        self.artifactdir_volume = None
 
     # load()
     #
buildstream/_scheduler/jobs/__init__.py
@@ -1,3 +1,21 @@
+#
+# Copyright (C) 2018 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+# Authors:
+#        Tristan Maat <tristan maat codethink co uk>
+
 from .elementjob import ElementJob
-from .cachesizejob import CacheSizeJob
 from .cleanupjob import CleanupJob

buildstream/_scheduler/jobs/cachesizejob.py (deleted)
@@ -1,42 +0,0 @@
-# Copyright (C) 2018 Codethink Limited
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library. If not, see <http://www.gnu.org/licenses/>.
-#
-# Author:
-#        Tristan Daniël Maat <tristan maat codethink co uk>
-#
-from .job import Job
-from ..._platform import Platform
-
-
-class CacheSizeJob(Job):
-    def __init__(self, *args, complete_cb, **kwargs):
-        super().__init__(*args, **kwargs)
-        self._complete_cb = complete_cb
-
-        platform = Platform.get_platform()
-        self._artifacts = platform.artifactcache
-
-    def child_process(self):
-        return self._artifacts.compute_cache_size()
-
-    def parent_complete(self, success, result):
-        if success:
-            self._artifacts.set_cache_size(result)
-
-        if self._complete_cb:
-            self._complete_cb(result)
-
-    def child_process_data(self):
-        return {}
buildstream/_scheduler/jobs/cleanupjob.py
@@ -21,9 +21,8 @@ from ..._platform import Platform
 
 
 class CleanupJob(Job):
-    def __init__(self, *args, complete_cb, **kwargs):
+    def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
-        self._complete_cb = complete_cb
 
         platform = Platform.get_platform()
         self._artifacts = platform.artifactcache
@@ -33,10 +32,4 @@ class CleanupJob(Job):
 
     def parent_complete(self, success, result):
         if success:
-            self._artifacts.set_cache_size(result)
-
-        if self._complete_cb:
-            self._complete_cb()
-
-    def child_process_data(self):
-        return {}
+            self._artifacts.subtract_artifact_size(result)
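The job no longer pushes an absolute size back to the parent; the child returns how many bytes clean() freed, and only the main process mutates the running total. A hedged sketch of that parent/child split (the class and everything except subtract_artifact_size() are illustrative):

    class CleanupSketch:
        def __init__(self, artifacts):
            self._artifacts = artifacts

        def child_process(self):
            # runs in the cleanup subprocess; clean() now returns
            # the number of bytes removed
            return self._artifacts.clean()

        def parent_complete(self, success, result):
            # main process: subtract what the child freed
            if success:
                self._artifacts.subtract_artifact_size(result)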
buildstream/_scheduler/queues/buildqueue.py
@@ -1,5 +1,5 @@
 #
-# Copyright (C) 2016 Codethink Limited
+# Copyright (C) 2018 Codethink Limited
 #
 # This program is free software; you can redistribute it and/or
 # modify it under the terms of the GNU Lesser General Public
@@ -87,31 +87,17 @@ class BuildQueue(Queue):
 
         return QueueStatus.READY
 
-    def _check_cache_size(self, job, element, artifact_size):
-
-        # After completing a build job, add the artifact size
-        # as returned from Element._assemble() to the estimated
-        # artifact cache size
-        #
-        platform = Platform.get_platform()
-        artifacts = platform.artifactcache
-
-        artifacts.add_artifact_size(artifact_size)
-
-        # If the estimated size outgrows the quota, ask the scheduler
-        # to queue a job to actually check the real cache size.
-        #
-        if artifacts.get_quota_exceeded():
-            self._scheduler.check_cache_size()
-
     def done(self, job, element, result, success):
+        if not success:
+            return False
+
+        element._assemble_done()
 
-        if success:
-            # Inform element in main process that assembly is done
-            element._assemble_done()
+        artifacts = Platform.get_platform().artifactcache
+        artifacts.add_artifact_size(result)
 
-            # This has to be done after _assemble_done, such that the
-            # element may register its cache key as required
-            self._check_cache_size(job, element, result)
+        # This has to be done after _assemble_done, such that the
+        # element may register its cache key as required
+        self._scheduler.check_cache_size()
 
-        return True
+        return success
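done() now runs a fixed sequence: bail out on failure, finish assembly so the element can register its cache key, add the committed size, then let the scheduler decide whether a cleanup is needed. A sketch of that ordering with the collaborators passed in as plain callables (all parameter names are illustrative):

    def on_build_done(result, success, assemble_done, artifacts, check_cache_size):
        if not success:
            return False
        assemble_done()                      # must precede size accounting
        artifacts.add_artifact_size(result)  # result == bytes committed
        check_cache_size()                   # may queue a CleanupJob
        return success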
buildstream/_scheduler/queues/pullqueue.py
@@ -1,5 +1,5 @@
 #
-# Copyright (C) 2016 Codethink Limited
+# Copyright (C) 2018 Codethink Limited
 #
 # This program is free software; you can redistribute it and/or
 # modify it under the terms of the GNU Lesser General Public
@@ -21,6 +21,7 @@
 # Local imports
 from . import Queue, QueueStatus
 from ..resources import ResourceType
+from ..._platform import Platform
 
 
 # A queue which pulls element artifacts
@@ -52,18 +53,21 @@ class PullQueue(Queue):
         else:
             return QueueStatus.SKIP
 
-    def done(self, _, element, result, success):
-
+    def done(self, job, element, result, success):
         if not success:
             return False
 
         element._pull_done()
 
-        # Build jobs will check the "approximate" size first. Since we
-        # do not get an artifact size from pull jobs, we have to
-        # actually check the cache size.
+        pulled, artifact_size = result
+
+        artifacts = Platform.get_platform().artifactcache
+        artifacts.add_artifact_size(artifact_size)
+
+        # This has to be done after _pull_done, such that the
+        # element may register its cache key as required
        self._scheduler.check_cache_size()
 
         # Element._pull() returns True if it downloaded an artifact,
         # here we want to appear skipped if we did not download.
-        return result
+        return pulled
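Pull jobs now report an exact size too, so the queue unpacks the (pulled, size) pair instead of probing the cache afterwards. A sketch with assumed stand-in callables:

    def on_pull_done(result, success, pull_done, artifacts, check_cache_size):
        if not success:
            return False
        pull_done()                            # element._pull_done() stand-in
        pulled, artifact_size = result         # new (bool, int) result pair
        artifacts.add_artifact_size(artifact_size)
        check_cache_size()
        return pulled                          # False -> job appears skipped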
buildstream/_scheduler/scheduler.py
@@ -1,5 +1,5 @@
 #
-# Copyright (C) 2016 Codethink Limited
+# Copyright (C) 2018 Codethink Limited
 #
 # This program is free software; you can redistribute it and/or
 # modify it under the terms of the GNU Lesser General Public
@@ -28,7 +28,7 @@ from contextlib import contextmanager
 
 # Local imports
 from .resources import Resources, ResourceType
-from .jobs import CacheSizeJob, CleanupJob
+from .jobs import CleanupJob
 from .._platform import Platform
 
 
@@ -243,22 +243,19 @@ class Scheduler():
 
     # check_cache_size():
    #
-    # Queues a cache size calculation job, after the cache
-    # size is calculated, a cleanup job will be run automatically
-    # if needed.
-    #
-    # FIXME: This should ensure that only one cache size job
-    #        is ever pending at a given time. If a cache size
-    #        job is already running, it is correct to queue
-    #        a new one, it is incorrect to have more than one
-    #        of these jobs pending at a given time, though.
+    # Queues a cleanup job if the size of the artifact cache exceeded
+    # the quota
     #
     def check_cache_size(self):
-        job = CacheSizeJob(self, 'cache_size', 'cache_size/cache_size',
-                           resources=[ResourceType.CACHE,
-                                      ResourceType.PROCESS],
-                           complete_cb=self._run_cleanup)
-        self.schedule_jobs([job])
+        artifacts = Platform.get_platform().artifactcache
+
+        if artifacts.has_quota_exceeded():
+            job = CleanupJob(self, 'Clean artifact cache',
+                             'cleanup/cleanup',
+                             resources=[ResourceType.CACHE,
+                                        ResourceType.PROCESS],
+                             exclusive_resources=[ResourceType.CACHE])
+            self.schedule_jobs([job])
 
     #######################################################
     #                  Local Private Methods              #
@@ -335,32 +332,6 @@ class Scheduler():
         self.schedule_jobs(ready)
         self._sched()
 
-    # _run_cleanup()
-    #
-    # Schedules the cache cleanup job if the passed size
-    # exceeds the cache quota.
-    #
-    # Args:
-    #     cache_size (int): The calculated cache size (ignored)
-    #
-    # NOTE: This runs in response to completion of the cache size
-    #       calculation job lauched by Scheduler.check_cache_size(),
-    #       which will report the calculated cache size.
-    #
-    def _run_cleanup(self, cache_size):
-        platform = Platform.get_platform()
-        artifacts = platform.artifactcache
-
-        if not artifacts.get_quota_exceeded():
-            return
-
-        job = CleanupJob(self, 'cleanup', 'cleanup/cleanup',
-                         resources=[ResourceType.CACHE,
-                                    ResourceType.PROCESS],
-                         exclusive_resources=[ResourceType.CACHE],
-                         complete_cb=None)
-        self.schedule_jobs([job])
-
     # _suspend_jobs()
     #
     # Suspend all ongoing jobs.
1657 | 1657 |
}), os.path.join(metadir, 'workspaced-dependencies.yaml'))
|
1658 | 1658 |
|
1659 | 1659 |
with self.timed_activity("Caching artifact"):
|
1660 |
- artifact_size = utils._get_dir_size(assembledir)
|
|
1661 |
- self.__artifacts.commit(self, assembledir, self.__get_cache_keys_for_commit())
|
|
1660 |
+ artifact_size = self.__artifacts.commit(
|
|
1661 |
+ self, assembledir, self.__get_cache_keys_for_commit())
|
|
1662 | 1662 |
|
1663 | 1663 |
if collect is not None and collectvdir is None:
|
1664 | 1664 |
raise ElementError(
|
... | ... | @@ -1710,31 +1710,31 @@ class Element(Plugin): |
1710 | 1710 |
self._update_state()
|
1711 | 1711 |
|
1712 | 1712 |
def _pull_strong(self, *, progress=None):
|
1713 |
- weak_key = self._get_cache_key(strength=_KeyStrength.WEAK)
|
|
1714 |
- |
|
1715 | 1713 |
key = self.__strict_cache_key
|
1716 |
- if not self.__artifacts.pull(self, key, progress=progress):
|
|
1717 |
- return False
|
|
1714 |
+ pulled, artifact_size = self.__artifacts.pull(self, key,
|
|
1715 |
+ progress=progress)
|
|
1718 | 1716 |
|
1719 |
- # update weak ref by pointing it to this newly fetched artifact
|
|
1720 |
- self.__artifacts.link_key(self, key, weak_key)
|
|
1717 |
+ if pulled:
|
|
1718 |
+ # update weak ref by pointing it to this newly fetched artifact
|
|
1719 |
+ weak_key = self._get_cache_key(strength=_KeyStrength.WEAK)
|
|
1720 |
+ self.__artifacts.link_key(self, key, weak_key)
|
|
1721 | 1721 |
|
1722 |
- return True
|
|
1722 |
+ return pulled, artifact_size
|
|
1723 | 1723 |
|
1724 | 1724 |
def _pull_weak(self, *, progress=None):
|
1725 | 1725 |
weak_key = self._get_cache_key(strength=_KeyStrength.WEAK)
|
1726 |
+ pulled, artifact_size = self.__artifacts.pull(self, weak_key,
|
|
1727 |
+ progress=progress)
|
|
1726 | 1728 |
|
1727 |
- if not self.__artifacts.pull(self, weak_key, progress=progress):
|
|
1728 |
- return False
|
|
1729 |
- |
|
1730 |
- # extract strong cache key from this newly fetched artifact
|
|
1731 |
- self._pull_done()
|
|
1729 |
+ if pulled:
|
|
1730 |
+ # extract strong cache key from this newly fetched artifact
|
|
1731 |
+ self._pull_done()
|
|
1732 | 1732 |
|
1733 |
- # create tag for strong cache key
|
|
1734 |
- key = self._get_cache_key(strength=_KeyStrength.STRONG)
|
|
1735 |
- self.__artifacts.link_key(self, weak_key, key)
|
|
1733 |
+ # create tag for strong cache key
|
|
1734 |
+ key = self._get_cache_key(strength=_KeyStrength.STRONG)
|
|
1735 |
+ self.__artifacts.link_key(self, weak_key, key)
|
|
1736 | 1736 |
|
1737 |
- return True
|
|
1737 |
+ return pulled, artifact_size
|
|
1738 | 1738 |
|
1739 | 1739 |
# _pull():
|
1740 | 1740 |
#
|
... | ... | @@ -1749,18 +1749,17 @@ class Element(Plugin): |
1749 | 1749 |
self.status(message)
|
1750 | 1750 |
|
1751 | 1751 |
# Attempt to pull artifact without knowing whether it's available
|
1752 |
- pulled = self._pull_strong(progress=progress)
|
|
1752 |
+ pulled, artifact_size = self._pull_strong(progress=progress)
|
|
1753 | 1753 |
|
1754 | 1754 |
if not pulled and not self._cached() and not context.get_strict():
|
1755 |
- pulled = self._pull_weak(progress=progress)
|
|
1755 |
+ pulled, artifact_size = self._pull_weak(progress=progress)
|
|
1756 | 1756 |
|
1757 |
- if not pulled:
|
|
1758 |
- return False
|
|
1757 |
+ if pulled:
|
|
1758 |
+ # Notify successfull download
|
|
1759 |
+ display_key = self._get_brief_display_key()
|
|
1760 |
+ self.info("Downloaded artifact {}".format(display_key))
|
|
1759 | 1761 |
|
1760 |
- # Notify successfull download
|
|
1761 |
- display_key = self._get_brief_display_key()
|
|
1762 |
- self.info("Downloaded artifact {}".format(display_key))
|
|
1763 |
- return True
|
|
1762 |
+ return pulled, artifact_size
|
|
1764 | 1763 |
|
1765 | 1764 |
# _skip_push():
|
1766 | 1765 |
#
|
buildstream/storage/_casbaseddirectory.py
@@ -111,7 +111,7 @@ class CasBasedDirectory(Directory):
         the parent).
 
         """
-        self.ref = self.cas_cache.add_object(buffer=self.pb2_directory.SerializeToString())
+        self.ref = self.cas_cache.add_object(buffer=self.pb2_directory.SerializeToString())[0]
         if caller:
             old_dir = self._find_pb2_entry(caller.filename)
             self.cas_cache.add_object(digest=old_dir.digest, buffer=caller.pb2_directory.SerializeToString())
@@ -130,9 +130,10 @@ class CasBasedDirectory(Directory):
             self.index[entry.name].buildstream_object._recalculate_recursing_down(entry)
 
         if parent:
-            self.ref = self.cas_cache.add_object(digest=parent.digest, buffer=self.pb2_directory.SerializeToString())
+            self.ref = self.cas_cache.add_object(digest=parent.digest,
+                                                 buffer=self.pb2_directory.SerializeToString())[0]
         else:
-            self.ref = self.cas_cache.add_object(buffer=self.pb2_directory.SerializeToString())
+            self.ref = self.cas_cache.add_object(buffer=self.pb2_directory.SerializeToString())[0]
 
         # We don't need to do anything more than that; files were already added ealier, and symlinks are
         # part of the directory structure.
doc/source/install_source.rst
@@ -29,6 +29,7 @@ The default plugins with extra host dependencies are:
 * git
 * ostree
 * patch
+* pip
 * tar
 
 If you intend to push built artifacts to a remote artifact server,
|