Tiago Gomes pushed to branch master at BuildStream / buildstream
Commits:

- 924cdc75 by Tiago Gomes at 2018-09-14T15:32:01Z
- 510ccbfd by Tiago Gomes at 2018-09-14T15:32:21Z
- 82d4e2ac by Tiago Gomes at 2018-09-14T15:32:21Z
- 18b37aab by Tiago Gomes at 2018-09-14T15:34:10Z
- 32fad24f by Tiago Gomes at 2018-09-14T15:38:41Z
- 46cbd889 by Tiago Gomes at 2018-09-14T15:38:43Z
- 2fa92716 by Tiago Gomes at 2018-09-14T15:38:43Z
- c285f244 by Tiago Gomes at 2018-09-14T16:08:07Z
7 changed files:
- buildstream/_artifactcache/artifactcache.py
- buildstream/_artifactcache/cascache.py
- buildstream/_context.py
- buildstream/_scheduler/jobs/__init__.py
- buildstream/_scheduler/jobs/cleanupjob.py
- buildstream/_scheduler/queues/buildqueue.py
- buildstream/_scheduler/scheduler.py
Changes:
=====================================
buildstream/_artifactcache/artifactcache.py
=====================================
@@ -267,7 +267,7 @@ class ArtifactCache():
                           "Please increase the cache-quota in {}."
                           .format(self.context.config_origin or default_conf))
 
-                if self.get_quota_exceeded():
+                if self.has_quota_exceeded():
                     raise ArtifactError("Cache too full. Aborting.",
                                         detail=detail,
                                         reason="cache-too-full")
@@ -354,14 +354,14 @@ class ArtifactCache():
         self._cache_size = cache_size
         self._write_cache_size(self._cache_size)
 
-    # get_quota_exceeded()
+    # has_quota_exceeded()
     #
     # Checks if the current artifact cache size exceeds the quota.
     #
     # Returns:
     #     (bool): True of the quota is exceeded
     #
-    def get_quota_exceeded(self):
+    def has_quota_exceeded(self):
        return self.get_cache_size() > self._cache_quota
 
     ################################################
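The rename in the two hunks above is purely cosmetic: has_quota_exceeded() still just compares the current cache size against the configured quota. A minimal standalone sketch of that pattern (the QuotaTracker class and its helpers are hypothetical; only the has_quota_exceeded() comparison comes from this commit):

    class QuotaTracker:
        # Illustrative only: tracks a cache size against a fixed quota.

        def __init__(self, quota_bytes):
            self._cache_quota = quota_bytes
            self._cache_size = 0

        def add_artifact(self, size_bytes):
            # Grow the tracked cache size as artifacts are committed.
            self._cache_size += size_bytes

        def get_cache_size(self):
            return self._cache_size

        def has_quota_exceeded(self):
            # Same comparison as the diff: True once the cache outgrows its quota.
            return self.get_cache_size() > self._cache_quota


    tracker = QuotaTracker(quota_bytes=10 * 1024 ** 3)   # 10 GiB quota
    tracker.add_artifact(11 * 1024 ** 3)                 # 11 GiB of artifacts
    assert tracker.has_quota_exceeded()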
=====================================
buildstream/_artifactcache/cascache.py
=====================================
@@ -117,7 +117,7 @@ class CASCache(ArtifactCache):
     def commit(self, element, content, keys):
         refs = [self.get_artifact_fullname(element, key) for key in keys]
 
-        tree = self._create_tree(content)
+        tree = self._commit_directory(content)
 
         for ref in refs:
             self.set_ref(ref, tree)
@@ -665,7 +665,21 @@ class CASCache(ArtifactCache):
     def _refpath(self, ref):
         return os.path.join(self.casdir, 'refs', 'heads', ref)
 
-    def _create_tree(self, path, *, digest=None):
+    # _commit_directory():
+    #
+    # Adds local directory to content addressable store.
+    #
+    # Adds files, symbolic links and recursively other directories in
+    # a local directory to the content addressable store.
+    #
+    # Args:
+    #     path (str): Path to the directory to add.
+    #     dir_digest (Digest): An optional Digest object to use.
+    #
+    # Returns:
+    #     (Digest): Digest object for the directory added.
+    #
+    def _commit_directory(self, path, *, dir_digest=None):
         directory = remote_execution_pb2.Directory()
 
         for name in sorted(os.listdir(path)):
@@ -674,7 +688,7 @@ class CASCache(ArtifactCache):
             if stat.S_ISDIR(mode):
                 dirnode = directory.directories.add()
                 dirnode.name = name
-                self._create_tree(full_path, digest=dirnode.digest)
+                self._commit_directory(full_path, dir_digest=dirnode.digest)
             elif stat.S_ISREG(mode):
                 filenode = directory.files.add()
                 filenode.name = name
@@ -690,7 +704,8 @@ class CASCache(ArtifactCache):
             else:
                 raise ArtifactError("Unsupported file type for {}".format(full_path))
 
-        return self.add_object(digest=digest, buffer=directory.SerializeToString())
+        return self.add_object(digest=dir_digest,
+                               buffer=directory.SerializeToString())
 
     def _get_subdir(self, tree, subdir):
         head, name = os.path.split(subdir)
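The new _commit_directory() documented above walks a local directory bottom-up: each file is stored as a blob, each subdirectory is committed recursively first, and only then is the parent's directory record serialized and stored, so the returned Digest identifies the whole subtree. The sketch below shows the shape of that recursion against a toy dict-backed store; add_object(), commit_directory() and the plain-text record format are simplifications for illustration (the real code stores REAPI remote_execution_pb2.Directory protobufs), not BuildStream's API:

    import hashlib
    import os
    import stat


    def add_object(cas, data):
        # Store a blob under its SHA-256 digest and return the digest.
        digest = hashlib.sha256(data).hexdigest()
        cas[digest] = data
        return digest


    def commit_directory(cas, path):
        # Recursively add a local directory to a toy content addressable
        # store (a dict mapping hex digest -> blob bytes).
        entries = []
        for name in sorted(os.listdir(path)):
            full_path = os.path.join(path, name)
            mode = os.lstat(full_path).st_mode
            if stat.S_ISDIR(mode):
                # Commit children first, as the recursive
                # _commit_directory() call in the diff does.
                entries.append(('dir', name, commit_directory(cas, full_path)))
            elif stat.S_ISREG(mode):
                with open(full_path, 'rb') as f:
                    entries.append(('file', name, add_object(cas, f.read())))
            elif stat.S_ISLNK(mode):
                # Symlinks are recorded by target, not by content digest.
                entries.append(('symlink', name, os.readlink(full_path)))
            else:
                raise ValueError("Unsupported file type for {}".format(full_path))

        # Serialize the listing and store it as a blob of its own; the
        # returned digest identifies the whole subtree, like the Digest
        # returned by _commit_directory().
        record = '\n'.join('{} {} {}'.format(*e) for e in entries).encode()
        return add_object(cas, record)

Committing an entire tree is then just: cas = {}; root_digest = commit_directory(cas, some_path).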
@@ -833,14 +848,26 @@ class CASCache(ArtifactCache):
 
             assert digest.size_bytes == os.fstat(stream.fileno()).st_size
 
-    def _fetch_directory(self, remote, tree):
-        objpath = self.objpath(tree)
+    # _fetch_directory():
+    #
+    # Fetches remote directory and adds it to content addressable store.
+    #
+    # Fetches files, symbolic links and recursively other directories in
+    # the remote directory and adds them to the content addressable
+    # store.
+    #
+    # Args:
+    #     remote (Remote): The remote to use.
+    #     dir_digest (Digest): Digest object for the directory to fetch.
+    #
+    def _fetch_directory(self, remote, dir_digest):
+        objpath = self.objpath(dir_digest)
         if os.path.exists(objpath):
             # already in local cache
             return
 
         with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
-            self._fetch_blob(remote, tree, out)
+            self._fetch_blob(remote, dir_digest, out)
 
             directory = remote_execution_pb2.Directory()
 
@@ -848,7 +875,7 @@ class CASCache(ArtifactCache):
             directory.ParseFromString(f.read())
 
             for filenode in directory.files:
-                fileobjpath = self.objpath(tree)
+                fileobjpath = self.objpath(filenode.digest)
                 if os.path.exists(fileobjpath):
                     # already in local cache
                     continue
@@ -862,10 +889,11 @@ class CASCache(ArtifactCache):
             for dirnode in directory.directories:
                 self._fetch_directory(remote, dirnode.digest)
 
-            # place directory blob only in final location when we've downloaded
-            # all referenced blobs to avoid dangling references in the repository
+            # Place directory blob only in final location when we've
+            # downloaded all referenced blobs to avoid dangling
+            # references in the repository.
             digest = self.add_object(path=out.name)
-            assert digest.hash == tree.hash
+            assert digest.hash == dir_digest.hash
 
     def _fetch_tree(self, remote, digest):
         # download but do not store the Tree object
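_fetch_directory() is the mirror image, and the reworded comment in the last hunk captures the key invariant: the directory record is placed in its final location only after every blob it references has been downloaded, so an interrupted fetch can never leave a directory object with dangling references in the repository. A counterpart sketch under the same toy-store assumptions as above (fetch_blob is a hypothetical stand-in for the remote download, not a BuildStream API):

    def parse_directory(data):
        # Decode the plain-text records produced by the sketch above.
        for line in data.decode().splitlines():
            kind, name, ref = line.split(' ', 2)
            yield kind, name, ref


    def fetch_directory(local, fetch_blob, dir_digest):
        # Recursively fetch a directory record and every blob it
        # references into `local` (dict digest -> bytes).
        if dir_digest in local:
            return  # already in local cache, as in the diff's early return

        data = fetch_blob(dir_digest)
        for kind, _name, ref in parse_directory(data):
            if kind == 'file' and ref not in local:
                local[ref] = fetch_blob(ref)
            elif kind == 'dir':
                fetch_directory(local, fetch_blob, ref)

        # Only now store the directory record itself: every blob it
        # references is already present, so no dangling references can
        # appear if the process dies mid-fetch.
        local[dir_digest] = data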
=====================================
buildstream/_context.py
=====================================
@@ -119,7 +119,6 @@ class Context():
         self._log_handle = None
         self._log_filename = None
         self.config_cache_quota = 'infinity'
-        self.artifactdir_volume = None
 
     # load()
     #
=====================================
buildstream/_scheduler/jobs/__init__.py
=====================================
@@ -1,3 +1,22 @@
+#
+#  Copyright (C) 2018 Codethink Limited
+#
+#  This program is free software; you can redistribute it and/or
+#  modify it under the terms of the GNU Lesser General Public
+#  License as published by the Free Software Foundation; either
+#  version 2 of the License, or (at your option) any later version.
+#
+#  This library is distributed in the hope that it will be useful,
+#  but WITHOUT ANY WARRANTY; without even the implied warranty of
+#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+#  Lesser General Public License for more details.
+#
+#  You should have received a copy of the GNU Lesser General Public
+#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+#  Authors:
+#        Tristan Maat <tristan.maat@codethink.co.uk>
+
 from .elementjob import ElementJob
 from .cachesizejob import CacheSizeJob
 from .cleanupjob import CleanupJob
=====================================
buildstream/_scheduler/jobs/cleanupjob.py
=====================================
@@ -21,9 +21,8 @@ from ..._platform import Platform
 
 
 class CleanupJob(Job):
-    def __init__(self, *args, complete_cb, **kwargs):
+    def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
-        self._complete_cb = complete_cb
 
         platform = Platform.get_platform()
         self._artifacts = platform.artifactcache
@@ -34,9 +33,3 @@ class CleanupJob(Job):
     def parent_complete(self, success, result):
         if success:
             self._artifacts.set_cache_size(result)
-
-        if self._complete_cb:
-            self._complete_cb()
-
-    def child_process_data(self):
-        return {}
=====================================
buildstream/_scheduler/queues/buildqueue.py
=====================================
@@ -101,7 +101,7 @@ class BuildQueue(Queue):
         # If the estimated size outgrows the quota, ask the scheduler
         # to queue a job to actually check the real cache size.
         #
-        if artifacts.get_quota_exceeded():
+        if artifacts.has_quota_exceeded():
             self._scheduler.check_cache_size()
 
     def done(self, job, element, result, success):
=====================================
buildstream/_scheduler/scheduler.py
=====================================
@@ -351,14 +351,13 @@ class Scheduler():
         platform = Platform.get_platform()
         artifacts = platform.artifactcache
 
-        if not artifacts.get_quota_exceeded():
+        if not artifacts.has_quota_exceeded():
             return
 
         job = CleanupJob(self, 'cleanup', 'cleanup/cleanup',
                          resources=[ResourceType.CACHE,
                                     ResourceType.PROCESS],
-                         exclusive_resources=[ResourceType.CACHE],
-                         complete_cb=None)
+                         exclusive_resources=[ResourceType.CACHE])
         self.schedule_jobs([job])
 
     # _suspend_jobs()
|