Tiago Gomes pushed to branch tiagogomes/issue-573 at BuildStream / buildstream
Commits:

- 9b327eb6 by Ben Brewer at 2018-09-04T14:41:23Z
- 3e67e64a by Javier Jardón at 2018-09-04T16:43:13Z
- 3409609e by Daniel Silverstone at 2018-09-04T16:55:51Z
- 7b32e1ec by Tristan Maat at 2018-09-04T17:20:55Z
- 37b9a4f6 by Tiago Gomes at 2018-09-07T11:23:23Z
- 33001f98 by Tiago Gomes at 2018-09-07T11:23:23Z
- 39711ae6 by Tiago Gomes at 2018-09-07T11:23:23Z
- b3de548f by Tiago Gomes at 2018-09-07T11:23:23Z
- 2274ef85 by Tiago Gomes at 2018-09-07T11:23:23Z
- 075d78b4 by Tiago Gomes at 2018-09-07T11:23:23Z
- 67be5cbf by Tiago Gomes at 2018-09-07T11:23:23Z
16 changed files:
- buildstream/_artifactcache/artifactcache.py
- buildstream/_artifactcache/cascache.py
- buildstream/_artifactcache/casserver.py
- buildstream/_context.py
- buildstream/_scheduler/jobs/__init__.py
- buildstream/_scheduler/jobs/cachesizejob.py (deleted)
- buildstream/_scheduler/jobs/cleanupjob.py
- buildstream/_scheduler/jobs/elementjob.py
- buildstream/_scheduler/jobs/job.py
- buildstream/_scheduler/queues/buildqueue.py
- buildstream/_scheduler/queues/pullqueue.py
- buildstream/_scheduler/queues/queue.py
- buildstream/_scheduler/scheduler.py
- buildstream/element.py
- buildstream/storage/_casbaseddirectory.py
- doc/source/install_artifacts.rst
Changes:
--- a/buildstream/_artifactcache/artifactcache.py
+++ b/buildstream/_artifactcache/artifactcache.py
@@ -16,6 +16,7 @@
 #
 # Authors:
 #        Tristan Maat <tristan maat codethink co uk>
+#        Tiago Gomes <tiago gomes codethink co uk>

 import os
 import string
@@ -85,8 +86,6 @@ class ArtifactCache():
         self.extractdir = os.path.join(context.artifactdir, 'extract')
         self.tmpdir = os.path.join(context.artifactdir, 'tmp')

-        self.estimated_size = None
-
         self.global_remote_specs = []
         self.project_remote_specs = {}

@@ -228,10 +227,14 @@ class ArtifactCache():
     #
     # Clean the artifact cache as much as possible.
     #
+    # Returns:
+    #    (int): Amount of bytes cleaned from the cache
+    #
     def clean(self):
         artifacts = self.list_artifacts()
+        cache_size = old_cache_size = self.get_cache_size()

-        while self.calculate_cache_size() >= self.cache_quota - self.cache_lower_threshold:
+        while cache_size >= self.cache_quota - self.cache_lower_threshold:
             try:
                 to_remove = artifacts.pop(0)
             except IndexError:
@@ -245,7 +248,7 @@ class ArtifactCache():
                     "Please increase the cache-quota in {}."
                     .format(self.context.config_origin or default_conf))

                if cache_size > self.cache_quota:
-                if self.calculate_cache_size() > self.cache_quota:
                     raise ArtifactError("Cache too full. Aborting.",
                                         detail=detail,
                                         reason="cache-too-full")
@@ -255,44 +258,17 @@ class ArtifactCache():
             key = to_remove.rpartition('/')[2]
             if key not in self.required_artifacts:
                 size = self.remove(to_remove)
-                if size:
-                    self.cache_size -= size
-
-        # This should be O(1) if implemented correctly
-        return self.calculate_cache_size()
+                cache_size -= size
+                self._message(MessageType.DEBUG,
+                              "Removed artifact {} ({})".format(
+                                  to_remove[:-(len(key) - self.context.log_key_length)],
+                                  utils._pretty_size(size)))

-    # get_approximate_cache_size()
-    #
-    # A cheap method that aims to serve as an upper limit on the
-    # artifact cache size.
-    #
-    # The cache size reported by this function will normally be larger
-    # than the real cache size, since it is calculated using the
-    # pre-commit artifact size, but for very small artifacts in
-    # certain caches additional overhead could cause this to be
-    # smaller than, but close to, the actual size.
-    #
-    # Nonetheless, in practice this should be safe to use as an upper
-    # limit on the cache size.
-    #
-    # If the cache has built-in constant-time size reporting, please
-    # feel free to override this method with a more accurate
-    # implementation.
-    #
-    # Returns:
-    #     (int) An approximation of the artifact cache size.
-    #
-    def get_approximate_cache_size(self):
-        # If we don't currently have an estimate, figure out the real
-        # cache size.
-        if self.estimated_size is None:
-            stored_size = self._read_cache_size()
-            if stored_size is not None:
-                self.estimated_size = stored_size
-            else:
-                self.estimated_size = self.calculate_cache_size()
+        self._message(MessageType.INFO,
+                      "New artifact cache size: {}".format(
+                          utils._pretty_size(cache_size)))

-        return self.estimated_size
+        return old_cache_size - cache_size

     ################################################
     # Abstract methods for subclasses to implement #
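The reworked clean() above evicts artifacts in least-recently-used order until the cache size falls below cache_quota - cache_lower_threshold, and now reports the number of bytes it freed. The policy can be sketched as a small standalone Python program (the names and numbers below are invented for illustration and are not BuildStream API):

    # Minimal sketch of the eviction policy: remove LRU artifacts until the
    # cache drops below (quota - headroom), skipping artifacts that are
    # still required by the current session, and report the bytes freed.
    def clean(lru_refs, sizes, required, cache_size, quota, headroom):
        freed = 0
        for ref in lru_refs:                  # oldest artifacts first
            if cache_size < quota - headroom:
                break                         # reached the lower threshold
            if ref in required:
                continue                      # never evict in-use artifacts
            cache_size -= sizes[ref]
            freed += sizes[ref]
        return freed

    # 10 GB quota with 2 GB of headroom: evicting "a" (4 GB) is enough.
    freed = clean(["a", "b"], {"a": 4_000_000_000, "b": 3_000_000_000},
                  required=set(), cache_size=9_000_000_000,
                  quota=10_000_000_000, headroom=2_000_000_000)
    print(freed)  # 4000000000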
@@ -390,6 +366,10 @@ class ArtifactCache():
     # content (str): The element's content directory
     # keys (list): The cache keys to use
     #
+    # Returns:
+    #    (int): Bytes required to cache the artifact taking deduplication
+    #           into account
+    #
     def commit(self, element, content, keys):
         raise ImplError("Cache '{kind}' does not implement commit()"
                         .format(kind=type(self).__name__))
@@ -462,6 +442,8 @@ class ArtifactCache():
     #
     # Returns:
     #   (bool): True if pull was successful, False if artifact was not available
+    #   (int): Bytes required to cache the artifact taking deduplication
+    #          into account
     #
     def pull(self, element, key, *, progress=None):
         raise ImplError("Cache '{kind}' does not implement pull()"
@@ -484,8 +466,6 @@ class ArtifactCache():
     #
     # Return the real artifact cache size.
     #
-    # Implementations should also use this to update estimated_size.
-    #
     # Returns:
     #
     # (int) The size of the artifact cache.
@@ -494,6 +474,22 @@ class ArtifactCache():
         raise ImplError("Cache '{kind}' does not implement calculate_cache_size()"
                         .format(kind=type(self).__name__))

+    # get_cache_size()
+    #
+    # Return the artifact cache size.
+    #
+    # Returns:
+    #     (int) The size of the artifact cache.
+    #
+    def get_cache_size(self):
+        if self.cache_size is None:
+            self.cache_size = self._read_cache_size()
+
+        if self.cache_size is None:
+            self.cache_size = self.calculate_cache_size()
+
+        return self.cache_size
+
     ################################################
     # Local Private Methods #
     ################################################
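The new get_cache_size() added above is deliberately lazy: it prefers the in-memory value, falls back to the size persisted on disk by a previous run, and only pays for a full directory walk as a last resort. The same three-level fallback in a self-contained sketch (the size-file name and helpers here are assumptions, not the actual BuildStream layout):

    import os

    class SizeCache:
        # Sketch of a lazily resolved size counter: memory first, then the
        # persisted value, then a full (expensive) recalculation.
        def __init__(self, cache_dir):
            self.cache_dir = cache_dir
            self.size_file = os.path.join(cache_dir, "cache_size")  # assumed name
            self.size = None

        def get(self):
            if self.size is None:
                self.size = self._read()         # cheap: written by a previous run
            if self.size is None:
                self.size = self._recalculate()  # expensive: walk the directory
            return self.size

        def _read(self):
            try:
                with open(self.size_file) as f:
                    return int(f.read())
            except (FileNotFoundError, ValueError):
                return None

        def _recalculate(self):
            return sum(os.path.getsize(os.path.join(root, name))
                       for root, _, files in os.walk(self.cache_dir)
                       for name in files)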
@@ -537,32 +533,13 @@ class ArtifactCache():

     # _add_artifact_size()
     #
-    # Since we cannot keep track of the cache size between threads,
-    # this method will be called by the main process every time a
-    # process that added something to the cache finishes.
-    #
-    # This will then add the reported size to
-    # ArtifactCache.estimated_size.
+    # Since we cannot keep track of the cache size between processes,
+    # this method will be called by the main process every time a job
+    # that added or removed an artifact from the cache finishes.
     #
     def _add_artifact_size(self, artifact_size):
-        if not self.estimated_size:
-            self.estimated_size = self.calculate_cache_size()
-
-        self.estimated_size += artifact_size
-        self._write_cache_size(self.estimated_size)
-
-    # _set_cache_size()
-    #
-    # Similarly to the above method, when we calculate the actual size
-    # in a child thread, we can't update it. We instead pass the value
-    # back to the main thread and update it there.
-    #
-    def _set_cache_size(self, cache_size):
-        self.estimated_size = cache_size
-
-        # set_cache_size is called in cleanup, where it may set the cache to None
-        if self.estimated_size is not None:
-            self._write_cache_size(self.estimated_size)
+        self.cache_size = self.get_cache_size() + artifact_size
+        self._write_cache_size(self.cache_size)

     # _write_cache_size()
     #
@@ -628,7 +605,7 @@ class ArtifactCache():
         stat = os.statvfs(artifactdir_volume)
         available_space = (stat.f_bsize * stat.f_bavail)

-        cache_size = self.get_approximate_cache_size()
+        cache_size = self.get_cache_size()

         # Ensure system has enough storage for the cache_quota
         #
--- a/buildstream/_artifactcache/cascache.py
+++ b/buildstream/_artifactcache/cascache.py
@@ -16,6 +16,7 @@
 #
 # Authors:
 #        Jürg Billeter <juerg billeter codethink co uk>
+#        Tiago Gomes <tiago gomes codethink co uk>

 import hashlib
 import itertools
@@ -95,7 +96,7 @@ class CASCache(ArtifactCache):

         with tempfile.TemporaryDirectory(prefix='tmp', dir=self.extractdir) as tmpdir:
             checkoutdir = os.path.join(tmpdir, ref)
-            self._checkout(checkoutdir, tree)
+            self._checkout_tree(checkoutdir, tree)

             os.makedirs(os.path.dirname(dest), exist_ok=True)
             try:
@@ -115,12 +116,12 @@ class CASCache(ArtifactCache):
     def commit(self, element, content, keys):
         refs = [self.get_artifact_fullname(element, key) for key in keys]

-        tree = self._create_tree(content)
+        tree, size = self._create_tree(content)

         for ref in refs:
             self.set_ref(ref, tree)

-        self.cache_size = None
+        return size

     def diff(self, element, key_a, key_b, *, subdir=None):
         ref_a = self.get_artifact_fullname(element, key_a)
@@ -238,12 +239,12 @@ class CASCache(ArtifactCache):
                 tree.hash = response.digest.hash
                 tree.size_bytes = response.digest.size_bytes

-                self._fetch_directory(remote, tree)
+                size = self._fetch_tree(remote, tree)

                 self.set_ref(ref, tree)

                 # no need to pull from additional remotes
-                return True
+                return True, size

             except grpc.RpcError as e:
                 if e.code() != grpc.StatusCode.NOT_FOUND:
@@ -257,7 +258,7 @@ class CASCache(ArtifactCache):
                     remote.spec.url, element._get_brief_display_key())
                 ))

-        return False
+        return False, 0

     def link_key(self, element, oldkey, newkey):
         oldref = self.get_artifact_fullname(element, oldkey)
@@ -397,6 +398,7 @@ class CASCache(ArtifactCache):
     #
     # Returns:
     #     (Digest): The digest of the added object
+    #     (int): The number of bytes required to store the object
     #
     # Either `path` or `buffer` must be passed, but not both.
     #
@@ -425,22 +427,39 @@ class CASCache(ArtifactCache):

                 out.flush()

+                file_size = os.fstat(out.fileno()).st_size
+
                 digest.hash = h.hexdigest()
-                digest.size_bytes = os.fstat(out.fileno()).st_size
+                digest.size_bytes = file_size

                 # Place file at final location
                 objpath = self.objpath(digest)
-                os.makedirs(os.path.dirname(objpath), exist_ok=True)
+                dirpath = os.path.dirname(objpath)
+
+                # Track the increased size on the parent directory caused by
+                # adding a new entry, as these directories can contain a large
+                # number of files.
+                new_dir_size = 0
+                old_dir_size = 0
+                try:
+                    os.makedirs(dirpath)
+                except FileExistsError:
+                    old_dir_size = os.stat(dirpath).st_size
+                else:
+                    new_dir_size = os.stat(dirpath).st_size
+
                 os.link(out.name, objpath)
+                new_dir_size = os.stat(dirpath).st_size - old_dir_size

         except FileExistsError as e:
             # We can ignore the failed link() if the object is already in the repo.
+            file_size = 0
             pass

         except OSError as e:
             raise ArtifactError("Failed to hash object: {}".format(e)) from e

-        return digest
+        return digest, file_size + new_dir_size

     # set_ref():
     #
@@ -449,6 +468,8 @@ class CASCache(ArtifactCache):
     # Args:
     #     ref (str): The name of the ref
     #
+    # Note: Setting a ref will have a very low overhead on the cache
+    #       size, so we don't track this.
     def set_ref(self, ref, tree):
         refpath = self._refpath(ref)
         os.makedirs(os.path.dirname(refpath), exist_ok=True)
@@ -488,11 +509,7 @@ class CASCache(ArtifactCache):
             raise ArtifactError("Attempt to access unavailable artifact: {}".format(e)) from e

     def calculate_cache_size(self):
-        if self.cache_size is None:
-            self.cache_size = utils._get_dir_size(self.casdir)
-            self.estimated_size = self.cache_size
-
-        return self.cache_size
+        return utils._get_dir_size(self.casdir)

     # list_artifacts():
     #
@@ -599,7 +616,7 @@ class CASCache(ArtifactCache):
     ################################################
     # Local Private Methods #
     ################################################
-    def _checkout(self, dest, tree):
+    def _checkout_tree(self, dest, tree):
         os.makedirs(dest, exist_ok=True)

         directory = remote_execution_pb2.Directory()
@@ -618,7 +635,7 @@ class CASCache(ArtifactCache):

         for dirnode in directory.directories:
             fullpath = os.path.join(dest, dirnode.name)
-            self._checkout(fullpath, dirnode.digest)
+            self._checkout_tree(fullpath, dirnode.digest)

         for symlinknode in directory.symlinks:
             # symlink
@@ -630,6 +647,7 @@ class CASCache(ArtifactCache):

     def _create_tree(self, path, *, digest=None):
         directory = remote_execution_pb2.Directory()
+        size = 0

         for name in sorted(os.listdir(path)):
             full_path = os.path.join(path, name)
@@ -637,11 +655,11 @@ class CASCache(ArtifactCache):
             if stat.S_ISDIR(mode):
                 dirnode = directory.directories.add()
                 dirnode.name = name
-                self._create_tree(full_path, digest=dirnode.digest)
+                size += self._create_tree(full_path, digest=dirnode.digest)[1]
             elif stat.S_ISREG(mode):
                 filenode = directory.files.add()
                 filenode.name = name
-                self.add_object(path=full_path, digest=filenode.digest)
+                size += self.add_object(path=full_path, digest=filenode.digest)[1]
                 filenode.is_executable = (mode & stat.S_IXUSR) == stat.S_IXUSR
             elif stat.S_ISLNK(mode):
                 symlinknode = directory.symlinks.add()
@@ -650,7 +668,8 @@ class CASCache(ArtifactCache):
             else:
                 raise ArtifactError("Unsupported file type for {}".format(full_path))

-        return self.add_object(digest=digest, buffer=directory.SerializeToString())
+        res = self.add_object(digest=digest, buffer=directory.SerializeToString())
+        return res[0], res[1] + size

     def _get_subdir(self, tree, subdir):
         head, name = os.path.split(subdir)
@@ -793,11 +812,12 @@ class CASCache(ArtifactCache):
             out.flush()
             assert digest.size_bytes == os.fstat(out.fileno()).st_size

-    def _fetch_directory(self, remote, tree):
+    def _fetch_tree(self, remote, tree):
+        size = 0
         objpath = self.objpath(tree)
         if os.path.exists(objpath):
             # already in local cache
-            return
+            return 0

         with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
             self._fetch_blob(remote, tree, out)
@@ -808,7 +828,7 @@ class CASCache(ArtifactCache):
                 directory.ParseFromString(f.read())

             for filenode in directory.files:
-                fileobjpath = self.objpath(tree)
+                fileobjpath = self.objpath(filenode.digest)
                 if os.path.exists(fileobjpath):
                     # already in local cache
                     continue
@@ -816,17 +836,21 @@ class CASCache(ArtifactCache):
                 with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
                     self._fetch_blob(remote, filenode.digest, f)

-                    digest = self.add_object(path=f.name)
+                    digest, obj_size = self.add_object(path=f.name)
+                    size += obj_size
                     assert digest.hash == filenode.digest.hash

             for dirnode in directory.directories:
-                self._fetch_directory(remote, dirnode.digest)
+                size += self._fetch_tree(remote, dirnode.digest)

             # place directory blob only in final location when we've downloaded
             # all referenced blobs to avoid dangling references in the repository
-            digest = self.add_object(path=out.name)
+            digest, obj_size = self.add_object(path=out.name)
+            size += obj_size
             assert digest.hash == tree.hash

+        return size
+

 # Represents a single remote CAS cache.
 #
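Both _create_tree() and _fetch_tree() now thread a byte count through their recursion: each level sums the cost of its own blobs with whatever its child directories report, so the root call yields the total for the whole artifact. The shape of that pattern, reduced to a toy example:

    # Toy version of the size-threading recursion; 0 marks a blob that
    # was deduplicated and therefore cost nothing to store.
    def walk_tree(node):
        size = node["own_size"]
        for child in node.get("children", []):
            size += walk_tree(child)
        return size

    tree = {"own_size": 100, "children": [
        {"own_size": 40},
        {"own_size": 0, "children": [{"own_size": 25}]},
    ]}
    print(walk_tree(tree))  # 165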
--- a/buildstream/_artifactcache/casserver.py
+++ b/buildstream/_artifactcache/casserver.py
@@ -203,7 +203,7 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
                     context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                     return response
             out.flush()
-            digest = self.cas.add_object(path=out.name)
+            digest = self.cas.add_object(path=out.name)[0]
             if digest.hash != client_digest.hash:
                 context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                 return response
--- a/buildstream/_context.py
+++ b/buildstream/_context.py
@@ -119,7 +119,6 @@ class Context():
         self._log_handle = None
         self._log_filename = None
         self.config_cache_quota = 'infinity'
-        self.artifactdir_volume = None

     # load()
     #

--- a/buildstream/_scheduler/jobs/__init__.py
+++ b/buildstream/_scheduler/jobs/__init__.py
@@ -1,3 +1,2 @@
 from .elementjob import ElementJob
-from .cachesizejob import CacheSizeJob
 from .cleanupjob import CleanupJob

--- a/buildstream/_scheduler/jobs/cachesizejob.py
+++ /dev/null
@@ -1,38 +0,0 @@
-# Copyright (C) 2018 Codethink Limited
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library. If not, see <http://www.gnu.org/licenses/>.
-#
-# Author:
-#        Tristan Daniël Maat <tristan maat codethink co uk>
-#
-from .job import Job
-from ..._platform import Platform
-
-
-class CacheSizeJob(Job):
-    def __init__(self, *args, complete_cb, **kwargs):
-        super().__init__(*args, **kwargs)
-        self._complete_cb = complete_cb
-        self._cache = Platform._instance.artifactcache
-
-    def child_process(self):
-        return self._cache.calculate_cache_size()
-
-    def parent_complete(self, success, result):
-        self._cache._set_cache_size(result)
-        if self._complete_cb:
-            self._complete_cb(result)
-
-    def child_process_data(self):
-        return {}
--- a/buildstream/_scheduler/jobs/cleanupjob.py
+++ b/buildstream/_scheduler/jobs/cleanupjob.py
@@ -21,18 +21,19 @@ from ..._platform import Platform


 class CleanupJob(Job):
-    def __init__(self, *args, complete_cb, **kwargs):
+    def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
-        self._complete_cb = complete_cb
         self._cache = Platform._instance.artifactcache

     def child_process(self):
         return self._cache.clean()

     def parent_complete(self, success, result):
-        self._cache._set_cache_size(result)
-        if self._complete_cb:
-            self._complete_cb()
+        if success:
+            # ArtifactCache.clean() returns the number of bytes cleaned.
+            # We negate the number because the cache size is to be
+            # decreased.
+            self._cache._add_artifact_size(result * -1)

     def child_process_data(self):
         return {}
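CleanupJob is now the one producer of negative deltas: clean() runs in the child process and returns bytes freed, and the parent applies that with the sign flipped. The contract in miniature (a toy stand-in; the real Job machinery runs child_process() in a separate process and ships the result back):

    class FakeCache:
        def __init__(self):
            self.size = 1000

        def clean(self):
            return 300                    # bytes freed by eviction

        def add_artifact_size(self, delta):
            self.size += delta            # delta may be negative

    cache = FakeCache()
    result = cache.clean()                # would run in the child process
    success = True
    if success:                           # the parent_complete() step
        cache.add_artifact_size(-result)  # cleaning shrinks the cache
    print(cache.size)                     # 700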
--- a/buildstream/_scheduler/jobs/elementjob.py
+++ b/buildstream/_scheduler/jobs/elementjob.py
@@ -110,12 +110,10 @@ class ElementJob(Job):

         workspace = self._element._get_workspace()
         artifact_size = self._element._get_artifact_size()
-        cache_size = self._element._get_artifact_cache().calculate_cache_size()

         if workspace is not None:
             data['workspace'] = workspace.to_dict()
         if artifact_size is not None:
             data['artifact_size'] = artifact_size
-        data['cache_size'] = cache_size

         return data
--- a/buildstream/_scheduler/jobs/job.py
+++ b/buildstream/_scheduler/jobs/job.py
@@ -109,7 +109,7 @@ class Job():
         # Private members
         #
         self._scheduler = scheduler            # The scheduler
-        self._queue = multiprocessing.Queue()  # A message passing queue
+        self._queue = None                     # A message passing queue
         self._process = None                   # The Process object
         self._watcher = None                   # Child process watcher
         self._listening = False                # Whether the parent is currently listening
@@ -130,6 +130,8 @@ class Job():
     #
     def spawn(self):

+        self._queue = multiprocessing.Queue()
+
         self._tries += 1
         self._parent_start_listening()

@@ -552,6 +554,9 @@ class Job():
         self.parent_complete(returncode == RC_OK, self._result)
         self._scheduler.job_completed(self, returncode == RC_OK)

+        # Force the deletion of the queue and process objects to try and clean up FDs
+        self._queue = self._process = None
+
     # _parent_process_envelope()
     #
     # Processes a message Envelope deserialized from the message queue.
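Creating the multiprocessing.Queue in spawn() and dropping both the queue and process references on completion matters because each queue owns OS pipe file descriptors; a long scheduler run that kept every completed job's queue alive would steadily accumulate FDs. The lifecycle in isolation (not BuildStream code):

    import multiprocessing

    def worker(q):
        q.put("done")

    if __name__ == "__main__":
        for _ in range(3):               # one short-lived job per iteration
            q = multiprocessing.Queue()  # pipe FDs are allocated here...
            p = multiprocessing.Process(target=worker, args=(q,))
            p.start()
            print(q.get())
            p.join()
            q = p = None                 # ...and can be reclaimed here, once
                                         # no references keep the pipes alive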
--- a/buildstream/_scheduler/queues/buildqueue.py
+++ b/buildstream/_scheduler/queues/buildqueue.py
@@ -87,19 +87,6 @@ class BuildQueue(Queue):

         return QueueStatus.READY

-    def _check_cache_size(self, job, element):
-        if not job.child_data:
-            return
-
-        artifact_size = job.child_data.get('artifact_size', False)
-
-        if artifact_size:
-            cache = element._get_artifact_cache()
-            cache._add_artifact_size(artifact_size)
-
-            if cache.get_approximate_cache_size() > cache.cache_quota:
-                self._scheduler._check_cache_size_real()
-
     def done(self, job, element, result, success):

         if success:
--- a/buildstream/_scheduler/queues/pullqueue.py
+++ b/buildstream/_scheduler/queues/pullqueue.py
@@ -1,5 +1,5 @@
 #
-# Copyright (C) 2016 Codethink Limited
+# Copyright (C) 2018 Codethink Limited
 #
 # This program is free software; you can redistribute it and/or
 # modify it under the terms of the GNU Lesser General Public
@@ -52,17 +52,14 @@ class PullQueue(Queue):
         else:
             return QueueStatus.SKIP

-    def done(self, _, element, result, success):
+    def done(self, job, element, result, success):

         if not success:
             return False

         element._pull_done()

-        # Build jobs will check the "approximate" size first. Since we
-        # do not get an artifact size from pull jobs, we have to
-        # actually check the cache size.
-        self._scheduler._check_cache_size_real()
+        self._check_cache_size(job, element)

         # Element._pull() returns True if it downloaded an artifact,
         # here we want to appear skipped if we did not download.
--- a/buildstream/_scheduler/queues/queue.py
+++ b/buildstream/_scheduler/queues/queue.py
@@ -1,5 +1,5 @@
 #
-# Copyright (C) 2016 Codethink Limited
+# Copyright (C) 2018 Codethink Limited
 #
 # This program is free software; you can redistribute it and/or
 # modify it under the terms of the GNU Lesser General Public
@@ -301,8 +301,6 @@ class Queue():
         # Update values that need to be synchronized in the main task
         # before calling any queue implementation
         self._update_workspaces(element, job)
-        if job.child_data:
-            element._get_artifact_cache().cache_size = job.child_data.get('cache_size')

         # Give the result of the job to the Queue implementor,
         # and determine if it should be considered as processed
@@ -360,3 +358,16 @@ class Queue():
         logfile = "{key}-{action}".format(key=key, action=action)

         return os.path.join(project.name, element.normal_name, logfile)
+
+    def _check_cache_size(self, job, element):
+        if not job.child_data:
+            return
+
+        artifact_size = job.child_data.get('artifact_size', False)
+
+        if artifact_size:
+            cache = element._get_artifact_cache()
+            cache._add_artifact_size(artifact_size)
+
+            if cache.get_cache_size() > cache.cache_quota:
+                self._scheduler._run_cache_cleanup()
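Hoisting _check_cache_size() from BuildQueue into the Queue base class gives build and pull jobs a single trigger path: fold the artifact size the job reported into the running total, then schedule a cleanup job only if the quota is now exceeded. In outline (a standalone sketch, not the actual scheduler API):

    def check_cache_size(child_data, cache, scheduled):
        if not child_data:
            return
        artifact_size = child_data.get("artifact_size")
        if artifact_size:
            cache["size"] += artifact_size       # _add_artifact_size()
            if cache["size"] > cache["quota"]:
                scheduled.append("cleanup")      # _run_cache_cleanup()

    cache = {"size": 9_500_000_000, "quota": 10_000_000_000}
    scheduled = []
    check_cache_size({"artifact_size": 600_000_000}, cache, scheduled)
    print(cache["size"], scheduled)  # 10100000000 ['cleanup']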
--- a/buildstream/_scheduler/scheduler.py
+++ b/buildstream/_scheduler/scheduler.py
@@ -1,5 +1,5 @@
 #
-# Copyright (C) 2016 Codethink Limited
+# Copyright (C) 2018 Codethink Limited
 #
 # This program is free software; you can redistribute it and/or
 # modify it under the terms of the GNU Lesser General Public
@@ -28,8 +28,7 @@ from contextlib import contextmanager

 # Local imports
 from .resources import Resources, ResourceType
-from .jobs import CacheSizeJob, CleanupJob
-from .._platform import Platform
+from .jobs import CleanupJob


 # A decent return code for Scheduler.run()
@@ -316,24 +315,11 @@ class Scheduler():
         self.schedule_jobs(ready)
         self._sched()

-    def _run_cleanup(self, cache_size):
-        platform = Platform.get_platform()
-        if cache_size and cache_size < platform.artifactcache.cache_quota:
-            return
-
-        job = CleanupJob(self, 'cleanup', 'cleanup',
+    def _run_cache_cleanup(self):
+        job = CleanupJob(self, 'Cleaning artifact cache', 'cleanup',
                          resources=[ResourceType.CACHE,
                                     ResourceType.PROCESS],
-                         exclusive_resources=[ResourceType.CACHE],
-                         complete_cb=None)
-        self.schedule_jobs([job])
-
-    def _check_cache_size_real(self):
-        job = CacheSizeJob(self, 'cache_size', 'cache_size/cache_size',
-                           resources=[ResourceType.CACHE,
-                                      ResourceType.PROCESS],
-                           exclusive_resources=[ResourceType.CACHE],
-                           complete_cb=self._run_cleanup)
+                         exclusive_resources=[ResourceType.CACHE])
         self.schedule_jobs([job])

     # _suspend_jobs()
--- a/buildstream/element.py
+++ b/buildstream/element.py
@@ -1646,8 +1646,8 @@ class Element(Plugin):
             }), os.path.join(metadir, 'workspaced-dependencies.yaml'))

         with self.timed_activity("Caching artifact"):
-            self.__artifact_size = utils._get_dir_size(assembledir)
-            self.__artifacts.commit(self, assembledir, self.__get_cache_keys_for_commit())
+            self.__artifact_size = self.__artifacts.commit(
+                self, assembledir, self.__get_cache_keys_for_commit())

         if collect is not None and collectvdir is None:
             raise ElementError(
@@ -1697,31 +1697,31 @@ class Element(Plugin):
         self._update_state()

     def _pull_strong(self, *, progress=None):
-        weak_key = self._get_cache_key(strength=_KeyStrength.WEAK)
-
         key = self.__strict_cache_key
-        if not self.__artifacts.pull(self, key, progress=progress):
-            return False
+        pulled, self.__artifact_size = self.__artifacts.pull(
+            self, key, progress=progress)

-        # update weak ref by pointing it to this newly fetched artifact
-        self.__artifacts.link_key(self, key, weak_key)
+        if pulled:
+            # update weak ref by pointing it to this newly fetched artifact
+            weak_key = self._get_cache_key(strength=_KeyStrength.WEAK)
+            self.__artifacts.link_key(self, key, weak_key)

-        return True
+        return pulled

     def _pull_weak(self, *, progress=None):
         weak_key = self._get_cache_key(strength=_KeyStrength.WEAK)
+        pulled, self.__artifact_size = self.__artifacts.pull(
+            self, weak_key, progress=progress)

-        if not self.__artifacts.pull(self, weak_key, progress=progress):
-            return False
-
-        # extract strong cache key from this newly fetched artifact
-        self._pull_done()
+        if pulled:
+            # extract strong cache key from this newly fetched artifact
+            self._pull_done()

-        # create tag for strong cache key
-        key = self._get_cache_key(strength=_KeyStrength.STRONG)
-        self.__artifacts.link_key(self, weak_key, key)
+            # create tag for strong cache key
+            key = self._get_cache_key(strength=_KeyStrength.STRONG)
+            self.__artifacts.link_key(self, weak_key, key)

-        return True
+        return pulled

     # _pull():
     #
@@ -1741,13 +1741,12 @@ class Element(Plugin):
         if not pulled and not self._cached() and not context.get_strict():
             pulled = self._pull_weak(progress=progress)

-        if not pulled:
-            return False
+        if pulled:
+            # Notify successful download
+            display_key = self._get_brief_display_key()
+            self.info("Downloaded artifact {}".format(display_key))

-        # Notify successfull download
-        display_key = self._get_brief_display_key()
-        self.info("Downloaded artifact {}".format(display_key))
-        return True
+        return pulled

     # _skip_push():
     #
--- a/buildstream/storage/_casbaseddirectory.py
+++ b/buildstream/storage/_casbaseddirectory.py
@@ -111,7 +111,7 @@ class CasBasedDirectory(Directory):
         the parent).

         """
-        self.ref = self.cas_cache.add_object(buffer=self.pb2_directory.SerializeToString())
+        self.ref = self.cas_cache.add_object(buffer=self.pb2_directory.SerializeToString())[0]
         if caller:
             old_dir = self._find_pb2_entry(caller.filename)
             self.cas_cache.add_object(digest=old_dir.digest, buffer=caller.pb2_directory.SerializeToString())
@@ -130,9 +130,10 @@ class CasBasedDirectory(Directory):
             self.index[entry.name].buildstream_object._recalculate_recursing_down(entry)

         if parent:
-            self.ref = self.cas_cache.add_object(digest=parent.digest, buffer=self.pb2_directory.SerializeToString())
+            self.ref = self.cas_cache.add_object(digest=parent.digest,
+                                                 buffer=self.pb2_directory.SerializeToString())[0]
         else:
-            self.ref = self.cas_cache.add_object(buffer=self.pb2_directory.SerializeToString())
+            self.ref = self.cas_cache.add_object(buffer=self.pb2_directory.SerializeToString())[0]
         # We don't need to do anything more than that; files were already added earlier, and symlinks are
         # part of the directory structure.
--- a/doc/source/install_artifacts.rst
+++ b/doc/source/install_artifacts.rst
@@ -161,13 +161,13 @@ Below are two examples of how to run the cache server as a systemd service, one

    [Service]
    Environment="LC_ALL=C.UTF-8"
-   ExecStart=/usr/local/bin/bst-artifact-server --port 11001 --server-key {{certs_path}}/privkey.pem --
-   server-cert {{certs_path}}/fullchain.pem {{artifacts_path}}
+   ExecStart=/usr/local/bin/bst-artifact-server --port 11001 --server-key {{certs_path}}/server.key --server-cert {{certs_path}}/server.crt {{artifacts_path}}
    User=artifacts

    [Install]
    WantedBy=multi-user.target

+.. code:: ini

    #
    # Pull/Push
@@ -178,9 +178,7 @@ Below are two examples of how to run the cache server as a systemd service, one

    [Service]
    Environment="LC_ALL=C.UTF-8"
-   ExecStart=/usr/local/bin/bst-artifact-server --port 11002 --server-key {{certs_path}}/privkey.pem --
-   server-cert {{certs_path}}/fullchain.pem --client-certs /home/artifacts/authorized.crt --enable-push /
-   {{artifacts_path}}
+   ExecStart=/usr/local/bin/bst-artifact-server --port 11002 --server-key {{certs_path}}/server.key --server-cert {{certs_path}}/server.crt --client-certs {{certs_path}}/authorized.crt --enable-push {{artifacts_path}}
    User=artifacts

    [Install]
@@ -188,11 +186,16 @@ Below are two examples of how to run the cache server as a systemd service, one

 Here we define when systemd should start the service, which is after the networking stack has been started, we then define how to run the cache with the desired configuration, under the artifacts user. The {{ }} are there to denote where you should change these files to point to your desired locations.

+For more information on systemd services see:
+`Creating Systemd Service Files <https://www.devdungeon.com/content/creating-systemd-service-files>`_.
+
 User configuration
 ~~~~~~~~~~~~~~~~~~
 The user configuration for artifacts is documented with the rest
 of the :ref:`user configuration documentation <user_config>`.

+Note that for self-signed certificates, the public key fields are mandatory.
+
 Assuming you have the same setup used in this document, and that your
 host is reachable on the internet as ``artifacts.com`` (for example),
 then a user can use the following user configuration: