Tiago Gomes pushed to branch tiagogomes/issue-573 at BuildStream / buildstream
Commits:
- 55956762 by Richard Maw at 2018-09-13T16:50:33Z
- fc7f83ac by richardmaw-codethink at 2018-09-13T17:14:31Z
- 233a7d83 by Richard Maw at 2018-09-14T08:53:14Z
- f06f234a by Richard Maw at 2018-09-14T08:53:14Z
- f86ab8f6 by richardmaw-codethink at 2018-09-14T09:46:07Z
- e7427462 by Richard Maw at 2018-09-14T10:28:17Z
- 800a8403 by Richard Maw at 2018-09-14T10:28:17Z
- d7152ef4 by richardmaw-codethink at 2018-09-14T10:55:16Z
- 160bb0c6 by Tristan Van Berkom at 2018-09-14T12:07:46Z
- 39125d24 by Tristan Van Berkom at 2018-09-14T12:07:46Z
- f60558a3 by Tristan Van Berkom at 2018-09-14T12:07:46Z
- ce68fd27 by Tristan Van Berkom at 2018-09-14T12:07:46Z
- 532ec1eb by Tristan Van Berkom at 2018-09-14T12:07:46Z
- 20b797cb by Tristan Van Berkom at 2018-09-14T12:07:46Z
- c2af0d51 by Tristan Van Berkom at 2018-09-14T12:44:42Z
- 924cdc75 by Tiago Gomes at 2018-09-14T15:32:01Z
- 510ccbfd by Tiago Gomes at 2018-09-14T15:32:21Z
- 82d4e2ac by Tiago Gomes at 2018-09-14T15:32:21Z
- 18b37aab by Tiago Gomes at 2018-09-14T15:34:10Z
- 32fad24f by Tiago Gomes at 2018-09-14T15:38:41Z
- 46cbd889 by Tiago Gomes at 2018-09-14T15:38:43Z
- 2fa92716 by Tiago Gomes at 2018-09-14T15:38:43Z
- c285f244 by Tiago Gomes at 2018-09-14T16:08:07Z
- 3bd589b9 by Tiago Gomes at 2018-09-14T16:36:06Z
- fccb722a by Tiago Gomes at 2018-09-14T16:46:57Z
- 2d9cb0c0 by Tiago Gomes at 2018-09-14T16:47:15Z
- 372c48f2 by Tiago Gomes at 2018-09-14T16:47:18Z
- e2af2d51 by Tiago Gomes at 2018-09-14T16:47:18Z
- 23b54e60 by Tiago Gomes at 2018-09-14T16:47:19Z
23 changed files:
- buildstream/_artifactcache/artifactcache.py
- buildstream/_artifactcache/cascache.py
- buildstream/_artifactcache/casserver.py
- buildstream/_context.py
- buildstream/_scheduler/jobs/__init__.py
- − buildstream/_scheduler/jobs/cachesizejob.py
- buildstream/_scheduler/jobs/cleanupjob.py
- buildstream/_scheduler/queues/buildqueue.py
- buildstream/_scheduler/queues/pullqueue.py
- buildstream/_scheduler/scheduler.py
- buildstream/_stream.py
- buildstream/element.py
- buildstream/storage/_casbaseddirectory.py
- buildstream/utils.py
- tests/artifactcache/expiry.py
- tests/frontend/logging.py
- tests/frontend/workspace.py
- + tests/integration/project/elements/sockets/make-builddir-socket.bst
- + tests/integration/project/elements/sockets/make-install-root-socket.bst
- + tests/integration/sockets.py
- tests/testutils/__init__.py
- tests/testutils/element_generators.py
- tests/testutils/repo/git.py
Changes:
1 | 1 |
#
|
2 |
-# Copyright (C) 2017-2018 Codethink Limited
|
|
2 |
+# Copyright (C) 2018 Codethink Limited
|
|
3 | 3 |
#
|
4 | 4 |
# This program is free software; you can redistribute it and/or
|
5 | 5 |
# modify it under the terms of the GNU Lesser General Public
|
... | ... | @@ -16,6 +16,7 @@ |
16 | 16 |
#
|
17 | 17 |
# Authors:
|
18 | 18 |
# Tristan Maat <tristan maat codethink co uk>
|
19 |
+# Tiago Gomes <tiago gomes codethink co uk>
|
|
19 | 20 |
|
20 | 21 |
import os
|
21 | 22 |
import string
|
... | ... | @@ -87,8 +88,8 @@ class ArtifactCache(): |
87 | 88 |
self.global_remote_specs = []
|
88 | 89 |
self.project_remote_specs = {}
|
89 | 90 |
|
90 |
- self._required_artifacts = set() # The artifacts required for this session
|
|
91 |
- self._cache_size = None # The current cache size, sometimes it's an estimate
|
|
91 |
+ self._required_elements = set() # The elements required for this session
|
|
92 |
+ self._cache_size = None # The current cache size
|
|
92 | 93 |
self._cache_quota = None # The cache quota
|
93 | 94 |
self._cache_lower_threshold = None # The target cache size for a cleanup
|
94 | 95 |
|
... | ... | @@ -189,33 +190,40 @@ class ArtifactCache(): |
189 | 190 |
(str(provenance)))
|
190 | 191 |
return cache_specs
|
191 | 192 |
|
192 |
- # append_required_artifacts():
|
|
193 |
+ # mark_required_elements():
|
|
193 | 194 |
#
|
194 |
- # Append to the list of elements whose artifacts are required for
|
|
195 |
- # the current run. Artifacts whose elements are in this list will
|
|
196 |
- # be locked by the artifact cache and not touched for the duration
|
|
197 |
- # of the current pipeline.
|
|
195 |
+ # Mark elements whose artifacts are required for the current run.
|
|
196 |
+ #
|
|
197 |
+ # Artifacts whose elements are in this list will be locked by the artifact
|
|
198 |
+ # cache and not touched for the duration of the current pipeline.
|
|
198 | 199 |
#
|
199 | 200 |
# Args:
|
200 | 201 |
# elements (iterable): A set of elements to mark as required
|
201 | 202 |
#
|
202 |
- def append_required_artifacts(self, elements):
|
|
203 |
- # We lock both strong and weak keys - deleting one but not the
|
|
204 |
- # other won't save space in most cases anyway, but would be a
|
|
205 |
- # user inconvenience.
|
|
203 |
+ def mark_required_elements(self, elements):
|
|
204 |
+ |
|
205 |
+ # We risk calling this function with a generator, so we
|
|
206 |
+ # better consume it first.
|
|
207 |
+ #
|
|
208 |
+ elements = list(elements)
|
|
209 |
+ |
|
210 |
+ # Mark the elements as required. We may not know all of the
|
|
211 |
+ # cache keys yet, so we only check that later when deleting.
|
|
212 |
+ #
|
|
213 |
+ self._required_elements.update(elements)
|
|
206 | 214 |
|
215 |
+ # For the cache keys which have been resolved so far, bump
|
|
216 |
+ # their atime.
|
|
217 |
+ #
|
|
218 |
+ # This is just in case we have concurrent instances of
|
|
219 |
+ # BuildStream running with the same artifact cache, it will
|
|
220 |
+ # reduce the likelihood of one instance deleting artifacts
|
|
221 |
+ # which are required by the other.
|
|
207 | 222 |
for element in elements:
|
208 | 223 |
strong_key = element._get_cache_key(strength=_KeyStrength.STRONG)
|
209 | 224 |
weak_key = element._get_cache_key(strength=_KeyStrength.WEAK)
|
210 |
- |
|
211 | 225 |
for key in (strong_key, weak_key):
|
212 |
- if key and key not in self._required_artifacts:
|
|
213 |
- self._required_artifacts.add(key)
|
|
214 |
- |
|
215 |
- # We also update the usage times of any artifacts
|
|
216 |
- # we will be using, which helps preventing a
|
|
217 |
- # buildstream process that runs in parallel with
|
|
218 |
- # this one from removing artifacts in-use.
|
|
226 |
+ if key:
|
|
219 | 227 |
try:
|
220 | 228 |
self.update_atime(key)
|
221 | 229 |
except ArtifactError:
|
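
Pieced together from the added lines above, the new method reads roughly as follows (the body of the except clause is cut off in this hunk, so ignoring the error below is an assumption):

    # Sketch of ArtifactCache.mark_required_elements(), reassembled from
    # the hunk above.
    def mark_required_elements(self, elements):

        # The caller may pass a generator, so consume it first in order
        # to iterate over it more than once.
        elements = list(elements)

        # Remember the elements; their cache keys are resolved later, at
        # cleanup time, when artifacts are actually considered for deletion.
        self._required_elements.update(elements)

        # Bump the atime of any cache keys which are already resolved, so
        # that a concurrent BuildStream instance sharing the same artifact
        # cache is less likely to delete artifacts we are about to use.
        for element in elements:
            strong_key = element._get_cache_key(strength=_KeyStrength.STRONG)
            weak_key = element._get_cache_key(strength=_KeyStrength.WEAK)
            for key in (strong_key, weak_key):
                if key:
                    try:
                        self.update_atime(key)
                    except ArtifactError:
                        pass  # assumed: the ref may not exist yet
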
... | ... | @@ -226,13 +234,25 @@ class ArtifactCache(): |
226 | 234 |
# Clean the artifact cache as much as possible.
|
227 | 235 |
#
|
228 | 236 |
# Returns:
|
229 |
- # (int): The size of the cache after having cleaned up
|
|
237 |
+ # (int): Number of bytes cleaned from the cache.
|
|
230 | 238 |
#
|
231 | 239 |
def clean(self):
|
232 | 240 |
artifacts = self.list_artifacts()
|
233 | 241 |
|
242 |
+ # Build a set of the cache keys which are required
|
|
243 |
+ # based on the required elements at cleanup time
|
|
244 |
+ #
|
|
245 |
+ # We lock both strong and weak keys - deleting one but not the
|
|
246 |
+ # other won't save space, but would be a user inconvenience.
|
|
247 |
+ required_artifacts = set()
|
|
248 |
+ for element in self._required_elements:
|
|
249 |
+ required_artifacts.update([
|
|
250 |
+ element._get_cache_key(strength=_KeyStrength.STRONG),
|
|
251 |
+ element._get_cache_key(strength=_KeyStrength.WEAK)
|
|
252 |
+ ])
|
|
253 |
+ |
|
234 | 254 |
# Do a real computation of the cache size once, just in case
|
235 |
- self.compute_cache_size()
|
|
255 |
+ old_cache_size = self._cache_size = self.calculate_cache_size()
|
|
236 | 256 |
|
237 | 257 |
while self.get_cache_size() >= self._cache_lower_threshold:
|
238 | 258 |
try:
|
... | ... | @@ -248,7 +268,7 @@ class ArtifactCache(): |
248 | 268 |
"Please increase the cache-quota in {}."
|
249 | 269 |
.format(self.context.config_origin or default_conf))
|
250 | 270 |
|
251 |
- if self.get_quota_exceeded():
|
|
271 |
+ if self.has_quota_exceeded():
|
|
252 | 272 |
raise ArtifactError("Cache too full. Aborting.",
|
253 | 273 |
detail=detail,
|
254 | 274 |
reason="cache-too-full")
|
... | ... | @@ -256,93 +276,69 @@ class ArtifactCache(): |
256 | 276 |
break
|
257 | 277 |
|
258 | 278 |
key = to_remove.rpartition('/')[2]
|
259 |
- if key not in self._required_artifacts:
|
|
279 |
+ if key not in required_artifacts:
|
|
260 | 280 |
|
261 | 281 |
# Remove the actual artifact, if it's not required.
|
262 | 282 |
size = self.remove(to_remove)
|
283 |
+ self._cache_size -= size
|
|
284 |
+ self._message(MessageType.DEBUG,
|
|
285 |
+ "Removed artifact {} ({})".format(
|
|
286 |
+ to_remove[:-(len(key) - self.context.log_key_length)],
|
|
287 |
+ utils._pretty_size(size)))
|
|
263 | 288 |
|
264 |
- # Remove the size from the removed size
|
|
265 |
- self.set_cache_size(self._cache_size - size)
|
|
289 |
+ self._message(MessageType.INFO,
|
|
290 |
+ "New artifact cache size: {}".format(
|
|
291 |
+ utils._pretty_size(self._cache_size)))
|
|
266 | 292 |
|
267 |
- # This should be O(1) if implemented correctly
|
|
268 |
- return self.get_cache_size()
|
|
293 |
+ return old_cache_size - self._cache_size
|
|
269 | 294 |
|
270 |
- # compute_cache_size()
|
|
295 |
+ # add_artifact_size()
|
|
271 | 296 |
#
|
272 |
- # Computes the real artifact cache size by calling
|
|
273 |
- # the abstract calculate_cache_size() method.
|
|
297 |
+ # Adds given artifact size to the cache size
|
|
274 | 298 |
#
|
275 |
- # Returns:
|
|
276 |
- # (int): The size of the artifact cache.
|
|
299 |
+ # Args:
|
|
300 |
+ # artifact_size (int): The artifact size to add.
|
|
277 | 301 |
#
|
278 |
- def compute_cache_size(self):
|
|
279 |
- self._cache_size = self.calculate_cache_size()
|
|
302 |
+ def add_artifact_size(self, artifact_size):
|
|
303 |
+ assert utils._is_main_process()
|
|
280 | 304 |
|
281 |
- return self._cache_size
|
|
305 |
+ self._cache_size = self.get_cache_size() + artifact_size
|
|
306 |
+ self._write_cache_size(self._cache_size)
|
|
282 | 307 |
|
283 |
- # add_artifact_size()
|
|
308 |
+ # subtract_artifact_size()
|
|
284 | 309 |
#
|
285 |
- # Adds the reported size of a newly cached artifact to the
|
|
286 |
- # overall estimated size.
|
|
310 |
+ # Subtracts given artifact size from the cache size
|
|
287 | 311 |
#
|
288 | 312 |
# Args:
|
289 |
- # artifact_size (int): The size to add.
|
|
313 |
+ # artifact_size (int): The artifact size to subtract.
|
|
290 | 314 |
#
|
291 |
- def add_artifact_size(self, artifact_size):
|
|
292 |
- cache_size = self.get_cache_size()
|
|
293 |
- cache_size += artifact_size
|
|
294 |
- |
|
295 |
- self.set_cache_size(cache_size)
|
|
315 |
+ def subtract_artifact_size(self, artifact_size):
|
|
316 |
+ self.add_artifact_size(artifact_size * -1)
|
|
296 | 317 |
|
297 | 318 |
# get_cache_size()
|
298 | 319 |
#
|
299 |
- # Fetches the cached size of the cache, this is sometimes
|
|
300 |
- # an estimate and periodically adjusted to the real size
|
|
301 |
- # when a cache size calculation job runs.
|
|
302 |
- #
|
|
303 |
- # When it is an estimate, the value is either correct, or
|
|
304 |
- # it is greater than the actual cache size.
|
|
320 |
+ # Returns the size of the artifact cache.
|
|
305 | 321 |
#
|
306 | 322 |
# Returns:
|
307 |
- # (int) An approximation of the artifact cache size.
|
|
323 |
+ # (int): The size of the artifact cache.
|
|
308 | 324 |
#
|
309 | 325 |
def get_cache_size(self):
|
326 |
+ if self._cache_size is None:
|
|
327 |
+ self._cache_size = self._read_cache_size()
|
|
310 | 328 |
|
311 |
- # If we don't currently have an estimate, figure out the real cache size.
|
|
312 | 329 |
if self._cache_size is None:
|
313 |
- stored_size = self._read_cache_size()
|
|
314 |
- if stored_size is not None:
|
|
315 |
- self._cache_size = stored_size
|
|
316 |
- else:
|
|
317 |
- self.compute_cache_size()
|
|
330 |
+ self._cache_size = self.calculate_cache_size()
|
|
318 | 331 |
|
319 | 332 |
return self._cache_size
|
320 | 333 |
|
321 |
- # set_cache_size()
|
|
322 |
- #
|
|
323 |
- # Forcefully set the overall cache size.
|
|
324 |
- #
|
|
325 |
- # This is used to update the size in the main process after
|
|
326 |
- # having calculated in a cleanup or a cache size calculation job.
|
|
327 |
- #
|
|
328 |
- # Args:
|
|
329 |
- # cache_size (int): The size to set.
|
|
330 |
- #
|
|
331 |
- def set_cache_size(self, cache_size):
|
|
332 |
- |
|
333 |
- assert cache_size is not None
|
|
334 |
- |
|
335 |
- self._cache_size = cache_size
|
|
336 |
- self._write_cache_size(self._cache_size)
|
|
337 |
- |
|
338 |
- # get_quota_exceeded()
|
|
334 |
+ # has_quota_exceeded()
|
|
339 | 335 |
#
|
340 | 336 |
# Checks if the current artifact cache size exceeds the quota.
|
341 | 337 |
#
|
342 | 338 |
# Returns:
|
343 | 339 |
# (bool): True of the quota is exceeded
|
344 | 340 |
#
|
345 |
- def get_quota_exceeded(self):
|
|
341 |
+ def has_quota_exceeded(self):
|
|
346 | 342 |
return self.get_cache_size() > self._cache_quota
|
347 | 343 |
|
348 | 344 |
################################################
|
... | ... | @@ -441,6 +437,10 @@ class ArtifactCache(): |
441 | 437 |
# content (str): The element's content directory
|
442 | 438 |
# keys (list): The cache keys to use
|
443 | 439 |
#
|
440 |
+ # Returns:
|
|
441 |
+ # (int): Disk size overhead in bytes required to cache the
|
|
442 |
+ # artifact
|
|
443 |
+ #
|
|
444 | 444 |
def commit(self, element, content, keys):
|
445 | 445 |
raise ImplError("Cache '{kind}' does not implement commit()"
|
446 | 446 |
.format(kind=type(self).__name__))
|
... | ... | @@ -512,8 +512,9 @@ class ArtifactCache(): |
512 | 512 |
# progress (callable): The progress callback, if any
|
513 | 513 |
#
|
514 | 514 |
# Returns:
|
515 |
- # (bool): True if pull was successful, False if artifact was not available
|
|
516 |
- #
|
|
515 |
+ # (bool): True if pull was successful, False if artifact was not available
|
|
516 |
+ # (int): Disk size overhead in bytes required to cache the
|
|
517 |
+ # artifact
|
|
517 | 518 |
def pull(self, element, key, *, progress=None):
|
518 | 519 |
raise ImplError("Cache '{kind}' does not implement pull()"
|
519 | 520 |
.format(kind=type(self).__name__))
|
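
Taken together, commit() now returns the on-disk overhead of caching an artifact and pull() returns a (pulled, size) pair. A hypothetical caller (helper names invented for illustration) would consume them like this:

    # Illustrative only: the new return types of the ArtifactCache API.
    # In BuildStream itself the commit/pull run in child job processes and
    # the size travels back to the main process as the job result.
    def cache_artifact(cache, element, content_dir, keys):
        artifact_size = cache.commit(element, content_dir, keys)
        return artifact_size            # bytes the cache grew by

    def fetch_artifact(cache, element, key):
        pulled, artifact_size = cache.pull(element, key)
        return pulled, artifact_size    # (False, 0) when not available
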
... | ... | @@ -16,6 +16,7 @@ |
16 | 16 |
#
|
17 | 17 |
# Authors:
|
18 | 18 |
# Jürg Billeter <juerg billeter codethink co uk>
|
19 |
+# Tiago Gomes <tiago gomes codethink co uk>
|
|
19 | 20 |
|
20 | 21 |
import hashlib
|
21 | 22 |
import itertools
|
... | ... | @@ -117,11 +118,13 @@ class CASCache(ArtifactCache): |
117 | 118 |
def commit(self, element, content, keys):
|
118 | 119 |
refs = [self.get_artifact_fullname(element, key) for key in keys]
|
119 | 120 |
|
120 |
- tree = self._create_tree(content)
|
|
121 |
+ tree, size = self._commit_directory(content)
|
|
121 | 122 |
|
122 | 123 |
for ref in refs:
|
123 | 124 |
self.set_ref(ref, tree)
|
124 | 125 |
|
126 |
+ return size
|
|
127 |
+ |
|
125 | 128 |
def diff(self, element, key_a, key_b, *, subdir=None):
|
126 | 129 |
ref_a = self.get_artifact_fullname(element, key_a)
|
127 | 130 |
ref_b = self.get_artifact_fullname(element, key_b)
|
... | ... | @@ -239,12 +242,12 @@ class CASCache(ArtifactCache): |
239 | 242 |
tree.hash = response.digest.hash
|
240 | 243 |
tree.size_bytes = response.digest.size_bytes
|
241 | 244 |
|
242 |
- self._fetch_directory(remote, tree)
|
|
245 |
+ size = self._fetch_directory(remote, tree)
|
|
243 | 246 |
|
244 | 247 |
self.set_ref(ref, tree)
|
245 | 248 |
|
246 | 249 |
# no need to pull from additional remotes
|
247 |
- return True
|
|
250 |
+ return True, size
|
|
248 | 251 |
|
249 | 252 |
except grpc.RpcError as e:
|
250 | 253 |
if e.code() != grpc.StatusCode.NOT_FOUND:
|
... | ... | @@ -258,7 +261,7 @@ class CASCache(ArtifactCache): |
258 | 261 |
remote.spec.url, element._get_brief_display_key())
|
259 | 262 |
))
|
260 | 263 |
|
261 |
- return False
|
|
264 |
+ return False, 0
|
|
262 | 265 |
|
263 | 266 |
def pull_tree(self, project, digest):
|
264 | 267 |
""" Pull a single Tree rather than an artifact.
|
... | ... | @@ -437,6 +440,7 @@ class CASCache(ArtifactCache): |
437 | 440 |
#
|
438 | 441 |
# Returns:
|
439 | 442 |
# (Digest): The digest of the added object
|
443 |
+ # (int): The amount of bytes required to store the object
|
|
440 | 444 |
#
|
441 | 445 |
# Either `path` or `buffer` must be passed, but not both.
|
442 | 446 |
#
|
... | ... | @@ -465,22 +469,38 @@ class CASCache(ArtifactCache): |
465 | 469 |
|
466 | 470 |
out.flush()
|
467 | 471 |
|
472 |
+ file_size = os.fstat(out.fileno()).st_size
|
|
473 |
+ |
|
468 | 474 |
digest.hash = h.hexdigest()
|
469 |
- digest.size_bytes = os.fstat(out.fileno()).st_size
|
|
475 |
+ digest.size_bytes = file_size
|
|
470 | 476 |
|
471 | 477 |
# Place file at final location
|
472 | 478 |
objpath = self.objpath(digest)
|
473 |
- os.makedirs(os.path.dirname(objpath), exist_ok=True)
|
|
479 |
+ dirpath = os.path.dirname(objpath)
|
|
480 |
+ |
|
481 |
+ # Track the increased size on the parent directory caused by
|
|
482 |
+ # adding a new entry, as these directories can contain a large
|
|
483 |
+ # number of files.
|
|
484 |
+ new_dir_size = 0
|
|
485 |
+ old_dir_size = 0
|
|
486 |
+ try:
|
|
487 |
+ os.makedirs(dirpath)
|
|
488 |
+ except FileExistsError:
|
|
489 |
+ old_dir_size = os.stat(dirpath).st_size
|
|
490 |
+ else:
|
|
491 |
+ new_dir_size = os.stat(dirpath).st_size
|
|
492 |
+ |
|
474 | 493 |
os.link(out.name, objpath)
|
494 |
+ new_dir_size = os.stat(dirpath).st_size - old_dir_size
|
|
475 | 495 |
|
476 | 496 |
except FileExistsError as e:
|
477 | 497 |
# We can ignore the failed link() if the object is already in the repo.
|
478 |
- pass
|
|
498 |
+ file_size = 0
|
|
479 | 499 |
|
480 | 500 |
except OSError as e:
|
481 | 501 |
raise ArtifactError("Failed to hash object: {}".format(e)) from e
|
482 | 502 |
|
483 |
- return digest
|
|
503 |
+ return digest, file_size + new_dir_size
|
|
484 | 504 |
|
485 | 505 |
# set_ref():
|
486 | 506 |
#
|
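
The accounting above also charges for growth of the parent object directory, since those directories can accumulate a large number of entries. A standalone sketch of the same bookkeeping, without the CAS specifics:

    import os

    # Link a temporary file into the store and report how many bytes the
    # store grew by, including any growth of the containing directory.
    def link_into_store(tmp_path, objpath):
        file_size = os.stat(tmp_path).st_size
        dirpath = os.path.dirname(objpath)

        old_dir_size = 0
        new_dir_size = 0
        try:
            os.makedirs(dirpath)
        except FileExistsError:
            old_dir_size = os.stat(dirpath).st_size
        else:
            new_dir_size = os.stat(dirpath).st_size

        try:
            os.link(tmp_path, objpath)
            new_dir_size = os.stat(dirpath).st_size - old_dir_size
        except FileExistsError:
            # Object already present, nothing was added
            file_size = 0

        return file_size + new_dir_size
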
... | ... | @@ -489,6 +509,8 @@ class CASCache(ArtifactCache): |
489 | 509 |
# Args:
|
490 | 510 |
# ref (str): The name of the ref
|
491 | 511 |
#
|
512 |
+ # Note: as setting a ref has very low disk size overhead, don't
|
|
513 |
+ # bother to track this.
|
|
492 | 514 |
def set_ref(self, ref, tree):
|
493 | 515 |
refpath = self._refpath(ref)
|
494 | 516 |
os.makedirs(os.path.dirname(refpath), exist_ok=True)
|
... | ... | @@ -665,7 +687,23 @@ class CASCache(ArtifactCache): |
665 | 687 |
def _refpath(self, ref):
|
666 | 688 |
return os.path.join(self.casdir, 'refs', 'heads', ref)
|
667 | 689 |
|
668 |
- def _create_tree(self, path, *, digest=None):
|
|
690 |
+ # _commit_directory():
|
|
691 |
+ #
|
|
692 |
+ # Adds local directory to content addressable store.
|
|
693 |
+ #
|
|
694 |
+ # Adds files, symbolic links and recursively other directories in
|
|
695 |
+ # a local directory to the content addressable store.
|
|
696 |
+ #
|
|
697 |
+ # Args:
|
|
698 |
+ # path (str): Path to the directory to add.
|
|
699 |
+ # dir_digest (Digest): An optional Digest object to use.
|
|
700 |
+ #
|
|
701 |
+ # Returns:
|
|
702 |
+ # (Digest): Digest object for the directory added.
|
|
703 |
+ # (int): Bytes required to cache local directory
|
|
704 |
+ #
|
|
705 |
+ def _commit_directory(self, path, *, dir_digest=None):
|
|
706 |
+ size = 0
|
|
669 | 707 |
directory = remote_execution_pb2.Directory()
|
670 | 708 |
|
671 | 709 |
for name in sorted(os.listdir(path)):
|
... | ... | @@ -674,20 +712,26 @@ class CASCache(ArtifactCache): |
674 | 712 |
if stat.S_ISDIR(mode):
|
675 | 713 |
dirnode = directory.directories.add()
|
676 | 714 |
dirnode.name = name
|
677 |
- self._create_tree(full_path, digest=dirnode.digest)
|
|
715 |
+ size += self._commit_directory(full_path, dir_digest=dirnode.digest)[1]
|
|
678 | 716 |
elif stat.S_ISREG(mode):
|
679 | 717 |
filenode = directory.files.add()
|
680 | 718 |
filenode.name = name
|
681 |
- self.add_object(path=full_path, digest=filenode.digest)
|
|
719 |
+ size += self.add_object(path=full_path, digest=filenode.digest)[1]
|
|
682 | 720 |
filenode.is_executable = (mode & stat.S_IXUSR) == stat.S_IXUSR
|
683 | 721 |
elif stat.S_ISLNK(mode):
|
684 | 722 |
symlinknode = directory.symlinks.add()
|
685 | 723 |
symlinknode.name = name
|
686 | 724 |
symlinknode.target = os.readlink(full_path)
|
725 |
+ elif stat.S_ISSOCK(mode):
|
|
726 |
+ # The process serving the socket can't be cached anyway
|
|
727 |
+ pass
|
|
687 | 728 |
else:
|
688 | 729 |
raise ArtifactError("Unsupported file type for {}".format(full_path))
|
689 | 730 |
|
690 |
- return self.add_object(digest=digest, buffer=directory.SerializeToString())
|
|
731 |
+ dir_digest, dir_object_size = self.add_object(
|
|
732 |
+ digest=dir_digest, buffer=directory.SerializeToString())
|
|
733 |
+ |
|
734 |
+ return dir_digest, size + dir_object_size
|
|
691 | 735 |
|
692 | 736 |
def _get_subdir(self, tree, subdir):
|
693 | 737 |
head, name = os.path.split(subdir)
|
... | ... | @@ -830,14 +874,30 @@ class CASCache(ArtifactCache): |
830 | 874 |
|
831 | 875 |
assert digest.size_bytes == os.fstat(stream.fileno()).st_size
|
832 | 876 |
|
833 |
- def _fetch_directory(self, remote, tree):
|
|
834 |
- objpath = self.objpath(tree)
|
|
877 |
+ # _fetch_directory():
|
|
878 |
+ #
|
|
879 |
+ # Fetches remote directory and adds it to content addressable store.
|
|
880 |
+ #
|
|
881 |
+ # Fetches files, symbolic links and recursively other directories in
|
|
882 |
+ # the remote directory and adds them to the content addressable
|
|
883 |
+ # store.
|
|
884 |
+ #
|
|
885 |
+ # Args:
|
|
886 |
+ # remote (Remote): The remote to use.
|
|
887 |
+ # dir_digest (Digest): Digest object for the directory to fetch.
|
|
888 |
+ #
|
|
889 |
+ # Returns:
|
|
890 |
+ # (int): Bytes required to cache fetched directory
|
|
891 |
+ #
|
|
892 |
+ def _fetch_directory(self, remote, dir_digest):
|
|
893 |
+ size = 0
|
|
894 |
+ objpath = self.objpath(dir_digest)
|
|
835 | 895 |
if os.path.exists(objpath):
|
836 | 896 |
# already in local cache
|
837 |
- return
|
|
897 |
+ return 0
|
|
838 | 898 |
|
839 | 899 |
with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
|
840 |
- self._fetch_blob(remote, tree, out)
|
|
900 |
+ self._fetch_blob(remote, dir_digest, out)
|
|
841 | 901 |
|
842 | 902 |
directory = remote_execution_pb2.Directory()
|
843 | 903 |
|
... | ... | @@ -845,7 +905,7 @@ class CASCache(ArtifactCache): |
845 | 905 |
directory.ParseFromString(f.read())
|
846 | 906 |
|
847 | 907 |
for filenode in directory.files:
|
848 |
- fileobjpath = self.objpath(tree)
|
|
908 |
+ fileobjpath = self.objpath(filenode.digest)
|
|
849 | 909 |
if os.path.exists(fileobjpath):
|
850 | 910 |
# already in local cache
|
851 | 911 |
continue
|
... | ... | @@ -853,16 +913,23 @@ class CASCache(ArtifactCache): |
853 | 913 |
with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
|
854 | 914 |
self._fetch_blob(remote, filenode.digest, f)
|
855 | 915 |
|
856 |
- digest = self.add_object(path=f.name)
|
|
916 |
+ digest, obj_size = self.add_object(path=f.name)
|
|
917 |
+ size += obj_size
|
|
857 | 918 |
assert digest.hash == filenode.digest.hash
|
858 | 919 |
|
859 | 920 |
for dirnode in directory.directories:
|
860 |
- self._fetch_directory(remote, dirnode.digest)
|
|
921 |
+ size += self._fetch_directory(remote, dirnode.digest)
|
|
922 |
+ |
|
923 |
+ # Place directory blob only in final location when we've
|
|
924 |
+ # downloaded all referenced blobs to avoid dangling
|
|
925 |
+ # references in the repository.
|
|
926 |
+ digest, obj_size = self.add_object(path=out.name)
|
|
927 |
+ |
|
928 |
+ assert digest.hash == dir_digest.hash
|
|
929 |
+ |
|
930 |
+ size += obj_size
|
|
861 | 931 |
|
862 |
- # place directory blob only in final location when we've downloaded
|
|
863 |
- # all referenced blobs to avoid dangling references in the repository
|
|
864 |
- digest = self.add_object(path=out.name)
|
|
865 |
- assert digest.hash == tree.hash
|
|
932 |
+ return size
|
|
866 | 933 |
|
867 | 934 |
def _fetch_tree(self, remote, digest):
|
868 | 935 |
# download but do not store the Tree object
|
... | ... | @@ -885,13 +952,13 @@ class CASCache(ArtifactCache): |
885 | 952 |
with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
|
886 | 953 |
self._fetch_blob(remote, filenode.digest, f)
|
887 | 954 |
|
888 |
- added_digest = self.add_object(path=f.name)
|
|
955 |
+ added_digest = self.add_object(path=f.name)[0]
|
|
889 | 956 |
assert added_digest.hash == filenode.digest.hash
|
890 | 957 |
|
891 | 958 |
# place directory blob only in final location when we've downloaded
|
892 | 959 |
# all referenced blobs to avoid dangling references in the repository
|
893 | 960 |
dirbuffer = directory.SerializeToString()
|
894 |
- dirdigest = self.add_object(buffer=dirbuffer)
|
|
961 |
+ dirdigest = self.add_object(buffer=dirbuffer)[0]
|
|
895 | 962 |
assert dirdigest.size_bytes == len(dirbuffer)
|
896 | 963 |
|
897 | 964 |
return dirdigest
|
... | ... | @@ -210,7 +210,7 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer): |
210 | 210 |
context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
|
211 | 211 |
return response
|
212 | 212 |
out.flush()
|
213 |
- digest = self.cas.add_object(path=out.name)
|
|
213 |
+ digest = self.cas.add_object(path=out.name)[0]
|
|
214 | 214 |
if digest.hash != client_digest.hash:
|
215 | 215 |
context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
|
216 | 216 |
return response
|
... | ... | @@ -119,7 +119,6 @@ class Context(): |
119 | 119 |
self._log_handle = None
|
120 | 120 |
self._log_filename = None
|
121 | 121 |
self.config_cache_quota = 'infinity'
|
122 |
- self.artifactdir_volume = None
|
|
123 | 122 |
|
124 | 123 |
# load()
|
125 | 124 |
#
|
1 |
+#
|
|
2 |
+# Copyright (C) 2018 Codethink Limited
|
|
3 |
+#
|
|
4 |
+# This program is free software; you can redistribute it and/or
|
|
5 |
+# modify it under the terms of the GNU Lesser General Public
|
|
6 |
+# License as published by the Free Software Foundation; either
|
|
7 |
+# version 2 of the License, or (at your option) any later version.
|
|
8 |
+#
|
|
9 |
+# This library is distributed in the hope that it will be useful,
|
|
10 |
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
11 |
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
12 |
+# Lesser General Public License for more details.
|
|
13 |
+#
|
|
14 |
+# You should have received a copy of the GNU Lesser General Public
|
|
15 |
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
|
|
16 |
+#
|
|
17 |
+# Authors:
|
|
18 |
+# Tristan Maat <tristan maat codethink co uk>
|
|
19 |
+ |
|
1 | 20 |
from .elementjob import ElementJob
|
2 |
-from .cachesizejob import CacheSizeJob
|
|
3 | 21 |
from .cleanupjob import CleanupJob
|
1 |
-# Copyright (C) 2018 Codethink Limited
|
|
2 |
-#
|
|
3 |
-# This program is free software; you can redistribute it and/or
|
|
4 |
-# modify it under the terms of the GNU Lesser General Public
|
|
5 |
-# License as published by the Free Software Foundation; either
|
|
6 |
-# version 2 of the License, or (at your option) any later version.
|
|
7 |
-#
|
|
8 |
-# This library is distributed in the hope that it will be useful,
|
|
9 |
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
10 |
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
11 |
-# Lesser General Public License for more details.
|
|
12 |
-#
|
|
13 |
-# You should have received a copy of the GNU Lesser General Public
|
|
14 |
-# License along with this library. If not, see <http://www.gnu.org/licenses/>.
|
|
15 |
-#
|
|
16 |
-# Author:
|
|
17 |
-# Tristan Daniël Maat <tristan maat codethink co uk>
|
|
18 |
-#
|
|
19 |
-from .job import Job
|
|
20 |
-from ..._platform import Platform
|
|
21 |
- |
|
22 |
- |
|
23 |
-class CacheSizeJob(Job):
|
|
24 |
- def __init__(self, *args, complete_cb, **kwargs):
|
|
25 |
- super().__init__(*args, **kwargs)
|
|
26 |
- self._complete_cb = complete_cb
|
|
27 |
- |
|
28 |
- platform = Platform.get_platform()
|
|
29 |
- self._artifacts = platform.artifactcache
|
|
30 |
- |
|
31 |
- def child_process(self):
|
|
32 |
- return self._artifacts.compute_cache_size()
|
|
33 |
- |
|
34 |
- def parent_complete(self, success, result):
|
|
35 |
- if success:
|
|
36 |
- self._artifacts.set_cache_size(result)
|
|
37 |
- |
|
38 |
- if self._complete_cb:
|
|
39 |
- self._complete_cb(result)
|
|
40 |
- |
|
41 |
- def child_process_data(self):
|
|
42 |
- return {}
|
... | ... | @@ -21,9 +21,8 @@ from ..._platform import Platform |
21 | 21 |
|
22 | 22 |
|
23 | 23 |
class CleanupJob(Job):
|
24 |
- def __init__(self, *args, complete_cb, **kwargs):
|
|
24 |
+ def __init__(self, *args, **kwargs):
|
|
25 | 25 |
super().__init__(*args, **kwargs)
|
26 |
- self._complete_cb = complete_cb
|
|
27 | 26 |
|
28 | 27 |
platform = Platform.get_platform()
|
29 | 28 |
self._artifacts = platform.artifactcache
|
... | ... | @@ -33,10 +32,4 @@ class CleanupJob(Job): |
33 | 32 |
|
34 | 33 |
def parent_complete(self, success, result):
|
35 | 34 |
if success:
|
36 |
- self._artifacts.set_cache_size(result)
|
|
37 |
- |
|
38 |
- if self._complete_cb:
|
|
39 |
- self._complete_cb()
|
|
40 |
- |
|
41 |
- def child_process_data(self):
|
|
42 |
- return {}
|
|
35 |
+ self._artifacts.subtract_artifact_size(result)
|
1 | 1 |
#
|
2 |
-# Copyright (C) 2016 Codethink Limited
|
|
2 |
+# Copyright (C) 2018 Codethink Limited
|
|
3 | 3 |
#
|
4 | 4 |
# This program is free software; you can redistribute it and/or
|
5 | 5 |
# modify it under the terms of the GNU Lesser General Public
|
... | ... | @@ -87,31 +87,17 @@ class BuildQueue(Queue): |
87 | 87 |
|
88 | 88 |
return QueueStatus.READY
|
89 | 89 |
|
90 |
- def _check_cache_size(self, job, element, artifact_size):
|
|
91 |
- |
|
92 |
- # After completing a build job, add the artifact size
|
|
93 |
- # as returned from Element._assemble() to the estimated
|
|
94 |
- # artifact cache size
|
|
95 |
- #
|
|
96 |
- platform = Platform.get_platform()
|
|
97 |
- artifacts = platform.artifactcache
|
|
98 |
- |
|
99 |
- artifacts.add_artifact_size(artifact_size)
|
|
100 |
- |
|
101 |
- # If the estimated size outgrows the quota, ask the scheduler
|
|
102 |
- # to queue a job to actually check the real cache size.
|
|
103 |
- #
|
|
104 |
- if artifacts.get_quota_exceeded():
|
|
105 |
- self._scheduler.check_cache_size()
|
|
106 |
- |
|
107 | 90 |
def done(self, job, element, result, success):
|
91 |
+ if not success:
|
|
92 |
+ return False
|
|
93 |
+ |
|
94 |
+ element._assemble_done()
|
|
108 | 95 |
|
109 |
- if success:
|
|
110 |
- # Inform element in main process that assembly is done
|
|
111 |
- element._assemble_done()
|
|
96 |
+ artifacts = Platform.get_platform().artifactcache
|
|
97 |
+ artifacts.add_artifact_size(result)
|
|
112 | 98 |
|
113 |
- # This has to be done after _assemble_done, such that the
|
|
114 |
- # element may register its cache key as required
|
|
115 |
- self._check_cache_size(job, element, result)
|
|
99 |
+ # This has to be done after _assemble_done, such that the
|
|
100 |
+ # element may register its cache key as required
|
|
101 |
+ self._scheduler.check_cache_size()
|
|
116 | 102 |
|
117 |
- return True
|
|
103 |
+ return success
|
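
Reassembled from the added lines above, the new BuildQueue.done() records the size reported by the build job and defers the cleanup decision to the scheduler:

    # Sketch of the new BuildQueue.done(); 'result' is the artifact size
    # returned by Element._assemble() in the child job.
    def done(self, job, element, result, success):
        if not success:
            return False

        element._assemble_done()

        artifacts = Platform.get_platform().artifactcache
        artifacts.add_artifact_size(result)

        # Must run after _assemble_done(), so that the element has had a
        # chance to register its cache key as required
        self._scheduler.check_cache_size()

        return success
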
1 | 1 |
#
|
2 |
-# Copyright (C) 2016 Codethink Limited
|
|
2 |
+# Copyright (C) 2018 Codethink Limited
|
|
3 | 3 |
#
|
4 | 4 |
# This program is free software; you can redistribute it and/or
|
5 | 5 |
# modify it under the terms of the GNU Lesser General Public
|
... | ... | @@ -21,6 +21,7 @@ |
21 | 21 |
# Local imports
|
22 | 22 |
from . import Queue, QueueStatus
|
23 | 23 |
from ..resources import ResourceType
|
24 |
+from ..._platform import Platform
|
|
24 | 25 |
|
25 | 26 |
|
26 | 27 |
# A queue which pulls element artifacts
|
... | ... | @@ -52,18 +53,21 @@ class PullQueue(Queue): |
52 | 53 |
else:
|
53 | 54 |
return QueueStatus.SKIP
|
54 | 55 |
|
55 |
- def done(self, _, element, result, success):
|
|
56 |
- |
|
56 |
+ def done(self, job, element, result, success):
|
|
57 | 57 |
if not success:
|
58 | 58 |
return False
|
59 | 59 |
|
60 | 60 |
element._pull_done()
|
61 | 61 |
|
62 |
- # Build jobs will check the "approximate" size first. Since we
|
|
63 |
- # do not get an artifact size from pull jobs, we have to
|
|
64 |
- # actually check the cache size.
|
|
62 |
+ pulled, artifact_size = result
|
|
63 |
+ |
|
64 |
+ artifacts = Platform.get_platform().artifactcache
|
|
65 |
+ artifacts.add_artifact_size(artifact_size)
|
|
66 |
+ |
|
67 |
+ # This has to be done after _pull_done, such that the
|
|
68 |
+ # element may register its cache key as required
|
|
65 | 69 |
self._scheduler.check_cache_size()
|
66 | 70 |
|
67 | 71 |
# Element._pull() returns True if it downloaded an artifact,
|
68 | 72 |
# here we want to appear skipped if we did not download.
|
69 |
- return result
|
|
73 |
+ return pulled
|
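
PullQueue.done() mirrors the build queue: it unpacks the (pulled, artifact_size) pair returned by Element._pull(), records the size, and still reports the job as skipped when nothing was downloaded. Roughly:

    # Sketch of the new PullQueue.done(), from the added lines above.
    def done(self, job, element, result, success):
        if not success:
            return False

        element._pull_done()

        pulled, artifact_size = result

        artifacts = Platform.get_platform().artifactcache
        artifacts.add_artifact_size(artifact_size)

        # After _pull_done(), so the element can register its cache key
        self._scheduler.check_cache_size()

        # Appear skipped if nothing was actually downloaded
        return pulled
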
1 | 1 |
#
|
2 |
-# Copyright (C) 2016 Codethink Limited
|
|
2 |
+# Copyright (C) 2018 Codethink Limited
|
|
3 | 3 |
#
|
4 | 4 |
# This program is free software; you can redistribute it and/or
|
5 | 5 |
# modify it under the terms of the GNU Lesser General Public
|
... | ... | @@ -28,7 +28,7 @@ from contextlib import contextmanager |
28 | 28 |
|
29 | 29 |
# Local imports
|
30 | 30 |
from .resources import Resources, ResourceType
|
31 |
-from .jobs import CacheSizeJob, CleanupJob
|
|
31 |
+from .jobs import CleanupJob
|
|
32 | 32 |
from .._platform import Platform
|
33 | 33 |
|
34 | 34 |
|
... | ... | @@ -243,22 +243,19 @@ class Scheduler(): |
243 | 243 |
|
244 | 244 |
# check_cache_size():
|
245 | 245 |
#
|
246 |
- # Queues a cache size calculation job, after the cache
|
|
247 |
- # size is calculated, a cleanup job will be run automatically
|
|
248 |
- # if needed.
|
|
249 |
- #
|
|
250 |
- # FIXME: This should ensure that only one cache size job
|
|
251 |
- # is ever pending at a given time. If a cache size
|
|
252 |
- # job is already running, it is correct to queue
|
|
253 |
- # a new one, it is incorrect to have more than one
|
|
254 |
- # of these jobs pending at a given time, though.
|
|
246 |
+ # Queues a cleanup job if the size of the artifact cache exceeded
|
|
247 |
+ # the quota
|
|
255 | 248 |
#
|
256 | 249 |
def check_cache_size(self):
|
257 |
- job = CacheSizeJob(self, 'cache_size', 'cache_size/cache_size',
|
|
258 |
- resources=[ResourceType.CACHE,
|
|
259 |
- ResourceType.PROCESS],
|
|
260 |
- complete_cb=self._run_cleanup)
|
|
261 |
- self.schedule_jobs([job])
|
|
250 |
+ artifacts = Platform.get_platform().artifactcache
|
|
251 |
+ |
|
252 |
+ if artifacts.has_quota_exceeded():
|
|
253 |
+ job = CleanupJob(self, 'Clean artifact cache',
|
|
254 |
+ 'cleanup/cleanup',
|
|
255 |
+ resources=[ResourceType.CACHE,
|
|
256 |
+ ResourceType.PROCESS],
|
|
257 |
+ exclusive_resources=[ResourceType.CACHE])
|
|
258 |
+ self.schedule_jobs([job])
|
|
262 | 259 |
|
263 | 260 |
#######################################################
|
264 | 261 |
# Local Private Methods #
|
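
With the CacheSizeJob gone, check_cache_size() consults the size tracked by the build and pull queues and only schedules a CleanupJob once the quota is exceeded; the bytes freed by the cleanup are then subtracted again in the main process. A sketch combining the scheduler and cleanup job hunks:

    # Scheduler side: queue a cleanup only when the quota is exceeded.
    def check_cache_size(self):
        artifacts = Platform.get_platform().artifactcache

        if artifacts.has_quota_exceeded():
            job = CleanupJob(self, 'Clean artifact cache', 'cleanup/cleanup',
                             resources=[ResourceType.CACHE,
                                        ResourceType.PROCESS],
                             exclusive_resources=[ResourceType.CACHE])
            self.schedule_jobs([job])

    # CleanupJob side: clean() returns the number of bytes removed, which
    # the main process subtracts from the tracked cache size.
    def parent_complete(self, success, result):
        if success:
            self._artifacts.subtract_artifact_size(result)
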
... | ... | @@ -335,32 +332,6 @@ class Scheduler(): |
335 | 332 |
self.schedule_jobs(ready)
|
336 | 333 |
self._sched()
|
337 | 334 |
|
338 |
- # _run_cleanup()
|
|
339 |
- #
|
|
340 |
- # Schedules the cache cleanup job if the passed size
|
|
341 |
- # exceeds the cache quota.
|
|
342 |
- #
|
|
343 |
- # Args:
|
|
344 |
- # cache_size (int): The calculated cache size (ignored)
|
|
345 |
- #
|
|
346 |
- # NOTE: This runs in response to completion of the cache size
|
|
347 |
- # calculation job lauched by Scheduler.check_cache_size(),
|
|
348 |
- # which will report the calculated cache size.
|
|
349 |
- #
|
|
350 |
- def _run_cleanup(self, cache_size):
|
|
351 |
- platform = Platform.get_platform()
|
|
352 |
- artifacts = platform.artifactcache
|
|
353 |
- |
|
354 |
- if not artifacts.get_quota_exceeded():
|
|
355 |
- return
|
|
356 |
- |
|
357 |
- job = CleanupJob(self, 'cleanup', 'cleanup/cleanup',
|
|
358 |
- resources=[ResourceType.CACHE,
|
|
359 |
- ResourceType.PROCESS],
|
|
360 |
- exclusive_resources=[ResourceType.CACHE],
|
|
361 |
- complete_cb=None)
|
|
362 |
- self.schedule_jobs([job])
|
|
363 |
- |
|
364 | 335 |
# _suspend_jobs()
|
365 | 336 |
#
|
366 | 337 |
# Suspend all ongoing jobs.
|
... | ... | @@ -938,13 +938,10 @@ class Stream(): |
938 | 938 |
# Set the "required" artifacts that should not be removed
|
939 | 939 |
# while this pipeline is active
|
940 | 940 |
#
|
941 |
- # FIXME: The set of required artifacts is only really needed
|
|
942 |
- # for build and pull tasks.
|
|
941 |
+ # It must include all the artifacts which are required by the
|
|
942 |
+ # final product. Note that this is a superset of the build plan.
|
|
943 | 943 |
#
|
944 |
- # It must include all the artifacts which are required by the
|
|
945 |
- # final product. Note that this is a superset of the build plan.
|
|
946 |
- #
|
|
947 |
- self._artifacts.append_required_artifacts((e for e in self._pipeline.dependencies(elements, Scope.ALL)))
|
|
944 |
+ self._artifacts.mark_required_elements(self._pipeline.dependencies(elements, Scope.ALL))
|
|
948 | 945 |
|
949 | 946 |
if selection == PipelineSelection.PLAN and dynamic_plan:
|
950 | 947 |
# We use a dynamic build plan, only request artifacts of top-level targets,
|
... | ... | @@ -200,7 +200,6 @@ class Element(Plugin): |
200 | 200 |
self.__strict_cache_key = None # Our cached cache key for strict builds
|
201 | 201 |
self.__artifacts = artifacts # Artifact cache
|
202 | 202 |
self.__consistency = Consistency.INCONSISTENT # Cached overall consistency state
|
203 |
- self.__cached = None # Whether we have a cached artifact
|
|
204 | 203 |
self.__strong_cached = None # Whether we have a cached artifact
|
205 | 204 |
self.__weak_cached = None # Whether we have a cached artifact
|
206 | 205 |
self.__assemble_scheduled = False # Element is scheduled to be assembled
|
... | ... | @@ -1126,8 +1125,6 @@ class Element(Plugin): |
1126 | 1125 |
|
1127 | 1126 |
# Query caches now that the weak and strict cache keys are available
|
1128 | 1127 |
key_for_cache_lookup = self.__strict_cache_key if context.get_strict() else self.__weak_cache_key
|
1129 |
- if not self.__cached:
|
|
1130 |
- self.__cached = self.__artifacts.contains(self, key_for_cache_lookup)
|
|
1131 | 1128 |
if not self.__strong_cached:
|
1132 | 1129 |
self.__strong_cached = self.__artifacts.contains(self, self.__strict_cache_key)
|
1133 | 1130 |
if key_for_cache_lookup == self.__weak_cache_key:
|
... | ... | @@ -1489,15 +1486,20 @@ class Element(Plugin): |
1489 | 1486 |
workspace.clear_running_files()
|
1490 | 1487 |
self._get_context().get_workspaces().save_config()
|
1491 | 1488 |
|
1492 |
- # We also need to update the required artifacts, since
|
|
1493 |
- # workspaced dependencies do not have a fixed cache key
|
|
1494 |
- # when the build starts.
|
|
1489 |
+ # This element will have already been marked as
|
|
1490 |
+ # required, but we bump the atime again, in case
|
|
1491 |
+ # we did not know the cache key until now.
|
|
1495 | 1492 |
#
|
1496 |
- # This does *not* cause a race condition, because
|
|
1497 |
- # _assemble_done is called before a cleanup job may be
|
|
1498 |
- # launched.
|
|
1493 |
+ # FIXME: This is not exactly correct, we should be
|
|
1494 |
+ # doing this at the time which we have discovered
|
|
1495 |
+ # a new cache key, this just happens to be the
|
|
1496 |
+ # last place where that can happen.
|
|
1499 | 1497 |
#
|
1500 |
- self.__artifacts.append_required_artifacts([self])
|
|
1498 |
+ # Ultimately, we should be refactoring
|
|
1499 |
+ # Element._update_state() such that we know
|
|
1500 |
+ # when a cache key is actually discovered.
|
|
1501 |
+ #
|
|
1502 |
+ self.__artifacts.mark_required_elements([self])
|
|
1501 | 1503 |
|
1502 | 1504 |
# _assemble():
|
1503 | 1505 |
#
|
... | ... | @@ -1657,8 +1659,8 @@ class Element(Plugin): |
1657 | 1659 |
}), os.path.join(metadir, 'workspaced-dependencies.yaml'))
|
1658 | 1660 |
|
1659 | 1661 |
with self.timed_activity("Caching artifact"):
|
1660 |
- artifact_size = utils._get_dir_size(assembledir)
|
|
1661 |
- self.__artifacts.commit(self, assembledir, self.__get_cache_keys_for_commit())
|
|
1662 |
+ artifact_size = self.__artifacts.commit(
|
|
1663 |
+ self, assembledir, self.__get_cache_keys_for_commit())
|
|
1662 | 1664 |
|
1663 | 1665 |
if collect is not None and collectvdir is None:
|
1664 | 1666 |
raise ElementError(
|
... | ... | @@ -1710,31 +1712,31 @@ class Element(Plugin): |
1710 | 1712 |
self._update_state()
|
1711 | 1713 |
|
1712 | 1714 |
def _pull_strong(self, *, progress=None):
|
1713 |
- weak_key = self._get_cache_key(strength=_KeyStrength.WEAK)
|
|
1714 |
- |
|
1715 | 1715 |
key = self.__strict_cache_key
|
1716 |
- if not self.__artifacts.pull(self, key, progress=progress):
|
|
1717 |
- return False
|
|
1716 |
+ pulled, artifact_size = self.__artifacts.pull(self, key,
|
|
1717 |
+ progress=progress)
|
|
1718 | 1718 |
|
1719 |
- # update weak ref by pointing it to this newly fetched artifact
|
|
1720 |
- self.__artifacts.link_key(self, key, weak_key)
|
|
1719 |
+ if pulled:
|
|
1720 |
+ # update weak ref by pointing it to this newly fetched artifact
|
|
1721 |
+ weak_key = self._get_cache_key(strength=_KeyStrength.WEAK)
|
|
1722 |
+ self.__artifacts.link_key(self, key, weak_key)
|
|
1721 | 1723 |
|
1722 |
- return True
|
|
1724 |
+ return pulled, artifact_size
|
|
1723 | 1725 |
|
1724 | 1726 |
def _pull_weak(self, *, progress=None):
|
1725 | 1727 |
weak_key = self._get_cache_key(strength=_KeyStrength.WEAK)
|
1728 |
+ pulled, artifact_size = self.__artifacts.pull(self, weak_key,
|
|
1729 |
+ progress=progress)
|
|
1726 | 1730 |
|
1727 |
- if not self.__artifacts.pull(self, weak_key, progress=progress):
|
|
1728 |
- return False
|
|
1729 |
- |
|
1730 |
- # extract strong cache key from this newly fetched artifact
|
|
1731 |
- self._pull_done()
|
|
1731 |
+ if pulled:
|
|
1732 |
+ # extract strong cache key from this newly fetched artifact
|
|
1733 |
+ self._pull_done()
|
|
1732 | 1734 |
|
1733 |
- # create tag for strong cache key
|
|
1734 |
- key = self._get_cache_key(strength=_KeyStrength.STRONG)
|
|
1735 |
- self.__artifacts.link_key(self, weak_key, key)
|
|
1735 |
+ # create tag for strong cache key
|
|
1736 |
+ key = self._get_cache_key(strength=_KeyStrength.STRONG)
|
|
1737 |
+ self.__artifacts.link_key(self, weak_key, key)
|
|
1736 | 1738 |
|
1737 |
- return True
|
|
1739 |
+ return pulled, artifact_size
|
|
1738 | 1740 |
|
1739 | 1741 |
# _pull():
|
1740 | 1742 |
#
|
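
Both pull helpers now forward the (pulled, artifact_size) pair from ArtifactCache.pull() instead of a bare boolean, so that _pull() can hand the size back to the pull queue. Reassembled, _pull_strong() reads roughly as below; _pull_weak() follows the same pattern:

    # Sketch of the new Element._pull_strong(), from the hunk above.
    def _pull_strong(self, *, progress=None):
        key = self.__strict_cache_key
        pulled, artifact_size = self.__artifacts.pull(self, key,
                                                      progress=progress)

        if pulled:
            # Update the weak ref to point at the newly fetched artifact
            weak_key = self._get_cache_key(strength=_KeyStrength.WEAK)
            self.__artifacts.link_key(self, key, weak_key)

        return pulled, artifact_size
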
... | ... | @@ -1749,18 +1751,17 @@ class Element(Plugin): |
1749 | 1751 |
self.status(message)
|
1750 | 1752 |
|
1751 | 1753 |
# Attempt to pull artifact without knowing whether it's available
|
1752 |
- pulled = self._pull_strong(progress=progress)
|
|
1754 |
+ pulled, artifact_size = self._pull_strong(progress=progress)
|
|
1753 | 1755 |
|
1754 | 1756 |
if not pulled and not self._cached() and not context.get_strict():
|
1755 |
- pulled = self._pull_weak(progress=progress)
|
|
1757 |
+ pulled, artifact_size = self._pull_weak(progress=progress)
|
|
1756 | 1758 |
|
1757 |
- if not pulled:
|
|
1758 |
- return False
|
|
1759 |
+ if pulled:
|
|
1760 |
+ # Notify successfull download
|
|
1761 |
+ display_key = self._get_brief_display_key()
|
|
1762 |
+ self.info("Downloaded artifact {}".format(display_key))
|
|
1759 | 1763 |
|
1760 |
- # Notify successfull download
|
|
1761 |
- display_key = self._get_brief_display_key()
|
|
1762 |
- self.info("Downloaded artifact {}".format(display_key))
|
|
1763 |
- return True
|
|
1764 |
+ return pulled, artifact_size
|
|
1764 | 1765 |
|
1765 | 1766 |
# _skip_push():
|
1766 | 1767 |
#
|
... | ... | @@ -2079,7 +2080,7 @@ class Element(Plugin): |
2079 | 2080 |
|
2080 | 2081 |
def __is_cached(self, keystrength):
|
2081 | 2082 |
if keystrength is None:
|
2082 |
- return self.__cached
|
|
2083 |
+ keystrength = _KeyStrength.STRONG if self._get_context().get_strict() else _KeyStrength.WEAK
|
|
2083 | 2084 |
|
2084 | 2085 |
return self.__strong_cached if keystrength == _KeyStrength.STRONG else self.__weak_cached
|
2085 | 2086 |
|
... | ... | @@ -111,7 +111,7 @@ class CasBasedDirectory(Directory): |
111 | 111 |
the parent).
|
112 | 112 |
|
113 | 113 |
"""
|
114 |
- self.ref = self.cas_cache.add_object(buffer=self.pb2_directory.SerializeToString())
|
|
114 |
+ self.ref = self.cas_cache.add_object(buffer=self.pb2_directory.SerializeToString())[0]
|
|
115 | 115 |
if caller:
|
116 | 116 |
old_dir = self._find_pb2_entry(caller.filename)
|
117 | 117 |
self.cas_cache.add_object(digest=old_dir.digest, buffer=caller.pb2_directory.SerializeToString())
|
... | ... | @@ -130,9 +130,10 @@ class CasBasedDirectory(Directory): |
130 | 130 |
self.index[entry.name].buildstream_object._recalculate_recursing_down(entry)
|
131 | 131 |
|
132 | 132 |
if parent:
|
133 |
- self.ref = self.cas_cache.add_object(digest=parent.digest, buffer=self.pb2_directory.SerializeToString())
|
|
133 |
+ self.ref = self.cas_cache.add_object(digest=parent.digest,
|
|
134 |
+ buffer=self.pb2_directory.SerializeToString())[0]
|
|
134 | 135 |
else:
|
135 |
- self.ref = self.cas_cache.add_object(buffer=self.pb2_directory.SerializeToString())
|
|
136 |
+ self.ref = self.cas_cache.add_object(buffer=self.pb2_directory.SerializeToString())[0]
|
|
136 | 137 |
# We don't need to do anything more than that; files were already added ealier, and symlinks are
|
137 | 138 |
# part of the directory structure.
|
138 | 139 |
|
... | ... | @@ -372,6 +372,8 @@ def copy_files(src, dest, *, files=None, ignore_missing=False, report_written=Fa |
372 | 372 |
Directories in `dest` are replaced with files from `src`,
|
373 | 373 |
unless the existing directory in `dest` is not empty in which
|
374 | 374 |
case the path will be reported in the return value.
|
375 |
+ |
|
376 |
+ UNIX domain socket files from `src` are ignored.
|
|
375 | 377 |
"""
|
376 | 378 |
presorted = False
|
377 | 379 |
if files is None:
|
... | ... | @@ -414,6 +416,8 @@ def link_files(src, dest, *, files=None, ignore_missing=False, report_written=Fa |
414 | 416 |
|
415 | 417 |
If a hardlink cannot be created due to crossing filesystems,
|
416 | 418 |
then the file will be copied instead.
|
419 |
+ |
|
420 |
+ UNIX domain socket files from `src` are ignored.
|
|
417 | 421 |
"""
|
418 | 422 |
presorted = False
|
419 | 423 |
if files is None:
|
... | ... | @@ -841,6 +845,13 @@ def _process_list(srcdir, destdir, filelist, actionfunc, result, |
841 | 845 |
os.mknod(destpath, file_stat.st_mode, file_stat.st_rdev)
|
842 | 846 |
os.chmod(destpath, file_stat.st_mode)
|
843 | 847 |
|
848 |
+ elif stat.S_ISFIFO(mode):
|
|
849 |
+ os.mkfifo(destpath, mode)
|
|
850 |
+ |
|
851 |
+ elif stat.S_ISSOCK(mode):
|
|
852 |
+ # We can't duplicate the process serving the socket anyway
|
|
853 |
+ pass
|
|
854 |
+ |
|
844 | 855 |
else:
|
845 | 856 |
# Unsupported type.
|
846 | 857 |
raise UtilError('Cannot extract {} into staging-area. Unsupported type.'.format(srcpath))
|
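
With the two new branches, staging recreates FIFOs at the destination and silently skips UNIX domain sockets, since the process serving a socket cannot be staged anyway. A simplified excerpt of the type dispatch (UtilError is the exception already used in this module):

    import os
    import stat

    # Simplified excerpt of the special-file handling in _process_list()
    def stage_special_file(srcpath, destpath, mode):
        if stat.S_ISFIFO(mode):
            os.mkfifo(destpath, mode)
        elif stat.S_ISSOCK(mode):
            # We can't duplicate the process serving the socket anyway
            pass
        else:
            raise UtilError('Cannot extract {} into staging-area. '
                            'Unsupported type.'.format(srcpath))
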
... | ... | @@ -24,7 +24,7 @@ import pytest |
24 | 24 |
from buildstream import _yaml
|
25 | 25 |
from buildstream._exceptions import ErrorDomain, LoadErrorReason
|
26 | 26 |
|
27 |
-from tests.testutils import cli, create_element_size, wait_for_cache_granularity
|
|
27 |
+from tests.testutils import cli, create_element_size, update_element_size, wait_for_cache_granularity
|
|
28 | 28 |
|
29 | 29 |
|
30 | 30 |
DATA_DIR = os.path.join(
|
... | ... | @@ -93,6 +93,7 @@ def test_artifact_too_large(cli, datafiles, tmpdir, size): |
93 | 93 |
create_element_size('target.bst', project, element_path, [], size)
|
94 | 94 |
res = cli.run(project=project, args=['build', 'target.bst'])
|
95 | 95 |
res.assert_main_error(ErrorDomain.STREAM, None)
|
96 |
+ res.assert_task_error(ErrorDomain.ARTIFACT, 'cache-too-full')
|
|
96 | 97 |
|
97 | 98 |
|
98 | 99 |
@pytest.mark.datafiles(DATA_DIR)
|
... | ... | @@ -196,24 +197,8 @@ def test_keep_dependencies(cli, datafiles, tmpdir): |
196 | 197 |
|
197 | 198 |
|
198 | 199 |
# Assert that we never delete a dependency required for a build tree
|
199 |
-#
|
|
200 |
-# NOTE: This test expects that a build will fail if it attempts to
|
|
201 |
-# put more artifacts in the cache than the quota can hold,
|
|
202 |
-# and expects that the last two elements which don't fit into
|
|
203 |
-# the quota wont even be built.
|
|
204 |
-#
|
|
205 |
-# In real life, this will not be the case, since once we reach
|
|
206 |
-# the estimated quota we launch a cache size calculation job and
|
|
207 |
-# only launch a cleanup job when the size is calculated; and
|
|
208 |
-# other build tasks will be scheduled while the cache size job
|
|
209 |
-# is running.
|
|
210 |
-#
|
|
211 |
-# This test only passes because we configure `builders` to 1,
|
|
212 |
-# ensuring that the cache size job runs exclusively since it
|
|
213 |
-# also requires a compute resource (a "builder").
|
|
214 |
-#
|
|
215 | 200 |
@pytest.mark.datafiles(DATA_DIR)
|
216 |
-def test_never_delete_dependencies(cli, datafiles, tmpdir):
|
|
201 |
+def test_never_delete_required(cli, datafiles, tmpdir):
|
|
217 | 202 |
project = os.path.join(datafiles.dirname, datafiles.basename)
|
218 | 203 |
element_path = 'elements'
|
219 | 204 |
|
... | ... | @@ -226,37 +211,94 @@ def test_never_delete_dependencies(cli, datafiles, tmpdir): |
226 | 211 |
}
|
227 | 212 |
})
|
228 | 213 |
|
229 |
- # Create a build tree
|
|
230 |
- create_element_size('dependency.bst', project,
|
|
231 |
- element_path, [], 8000000)
|
|
232 |
- create_element_size('related.bst', project,
|
|
233 |
- element_path, ['dependency.bst'], 8000000)
|
|
234 |
- create_element_size('target.bst', project,
|
|
235 |
- element_path, ['related.bst'], 8000000)
|
|
236 |
- create_element_size('target2.bst', project,
|
|
237 |
- element_path, ['target.bst'], 8000000)
|
|
214 |
+ # Create a linear build tree
|
|
215 |
+ create_element_size('dep1.bst', project, element_path, [], 8000000)
|
|
216 |
+ create_element_size('dep2.bst', project, element_path, ['dep1.bst'], 8000000)
|
|
217 |
+ create_element_size('dep3.bst', project, element_path, ['dep2.bst'], 8000000)
|
|
218 |
+ create_element_size('target.bst', project, element_path, ['dep3.bst'], 8000000)
|
|
238 | 219 |
|
239 | 220 |
# We try to build this pipeline, but it's too big for the
|
240 | 221 |
# cache. Since all elements are required, the build should fail.
|
241 |
- res = cli.run(project=project, args=['build', 'target2.bst'])
|
|
222 |
+ res = cli.run(project=project, args=['build', 'target.bst'])
|
|
242 | 223 |
res.assert_main_error(ErrorDomain.STREAM, None)
|
224 |
+ res.assert_task_error(ErrorDomain.ARTIFACT, 'cache-too-full')
|
|
243 | 225 |
|
244 |
- assert cli.get_element_state(project, 'dependency.bst') == 'cached'
|
|
226 |
+ # Only the first artifact fits in the cache, but we expect
|
|
227 |
+ # that the first *two* artifacts will be cached.
|
|
228 |
+ #
|
|
229 |
+ # This is because after caching the first artifact we must
|
|
230 |
+ # proceed to build the next artifact, and we cannot really
|
|
231 |
+ # know how large an artifact will be until we try to cache it.
|
|
232 |
+ #
|
|
233 |
+ # In this case, we deem it more acceptable to not delete an
|
|
234 |
+ # artifact which caused the cache to outgrow the quota.
|
|
235 |
+ #
|
|
236 |
+ # Note that this test only works because we have forced
|
|
237 |
+ # the configuration to build one element at a time, in real
|
|
238 |
+ # life there may potentially be N-builders cached artifacts
|
|
239 |
+ # which exceed the quota
|
|
240 |
+ #
|
|
241 |
+ assert cli.get_element_state(project, 'dep1.bst') == 'cached'
|
|
242 |
+ assert cli.get_element_state(project, 'dep2.bst') == 'cached'
|
|
243 |
+ |
|
244 |
+ assert cli.get_element_state(project, 'dep3.bst') != 'cached'
|
|
245 |
+ assert cli.get_element_state(project, 'target.bst') != 'cached'
|
|
246 |
+ |
|
247 |
+ |
|
248 |
+# Assert that we never delete a dependency required for a build tree,
|
|
249 |
+# even when the artifact cache was previously populated with
|
|
250 |
+# artifacts we do not require, and the new build is run with dynamic tracking.
|
|
251 |
+#
|
|
252 |
+@pytest.mark.datafiles(DATA_DIR)
|
|
253 |
+def test_never_delete_required_track(cli, datafiles, tmpdir):
|
|
254 |
+ project = os.path.join(datafiles.dirname, datafiles.basename)
|
|
255 |
+ element_path = 'elements'
|
|
256 |
+ |
|
257 |
+ cli.configure({
|
|
258 |
+ 'cache': {
|
|
259 |
+ 'quota': 10000000
|
|
260 |
+ },
|
|
261 |
+ 'scheduler': {
|
|
262 |
+ 'builders': 1
|
|
263 |
+ }
|
|
264 |
+ })
|
|
245 | 265 |
|
246 |
- # This is *technically* above the cache limit. BuildStream accepts
|
|
247 |
- # some fuzziness, since it's hard to assert that we don't create
|
|
248 |
- # an artifact larger than the cache quota. We would have to remove
|
|
249 |
- # the artifact after-the-fact, but since it is required for the
|
|
250 |
- # current build and nothing broke yet, it's nicer to keep it
|
|
251 |
- # around.
|
|
266 |
+ # Create a linear build tree
|
|
267 |
+ repo_dep1 = create_element_size('dep1.bst', project, element_path, [], 2000000)
|
|
268 |
+ repo_dep2 = create_element_size('dep2.bst', project, element_path, ['dep1.bst'], 2000000)
|
|
269 |
+ repo_dep3 = create_element_size('dep3.bst', project, element_path, ['dep2.bst'], 2000000)
|
|
270 |
+ repo_target = create_element_size('target.bst', project, element_path, ['dep3.bst'], 2000000)
|
|
271 |
+ |
|
272 |
+ # This should all fit into the artifact cache
|
|
273 |
+ res = cli.run(project=project, args=['build', 'target.bst'])
|
|
274 |
+ res.assert_success()
|
|
275 |
+ |
|
276 |
+ # They should all be cached
|
|
277 |
+ assert cli.get_element_state(project, 'dep1.bst') == 'cached'
|
|
278 |
+ assert cli.get_element_state(project, 'dep2.bst') == 'cached'
|
|
279 |
+ assert cli.get_element_state(project, 'dep3.bst') == 'cached'
|
|
280 |
+ assert cli.get_element_state(project, 'target.bst') == 'cached'
|
|
281 |
+ |
|
282 |
+ # Now increase the size of all the elements
|
|
252 | 283 |
#
|
253 |
- # This scenario is quite unlikely, and the cache overflow will be
|
|
254 |
- # resolved if the user does something about it anyway.
|
|
284 |
+ update_element_size('dep1.bst', project, repo_dep1, 8000000)
|
|
285 |
+ update_element_size('dep2.bst', project, repo_dep2, 8000000)
|
|
286 |
+ update_element_size('dep3.bst', project, repo_dep3, 8000000)
|
|
287 |
+ update_element_size('target.bst', project, repo_target, 8000000)
|
|
288 |
+ |
|
289 |
+ # Now repeat the same test we did in test_never_delete_required(),
|
|
290 |
+ # except this time let's add dynamic tracking
|
|
255 | 291 |
#
|
256 |
- assert cli.get_element_state(project, 'related.bst') == 'cached'
|
|
292 |
+ res = cli.run(project=project, args=['build', '--track-all', 'target.bst'])
|
|
293 |
+ res.assert_main_error(ErrorDomain.STREAM, None)
|
|
294 |
+ res.assert_task_error(ErrorDomain.ARTIFACT, 'cache-too-full')
|
|
257 | 295 |
|
296 |
+ # Expect the same result that we did in test_never_delete_required()
|
|
297 |
+ #
|
|
298 |
+ assert cli.get_element_state(project, 'dep1.bst') == 'cached'
|
|
299 |
+ assert cli.get_element_state(project, 'dep2.bst') == 'cached'
|
|
300 |
+ assert cli.get_element_state(project, 'dep3.bst') != 'cached'
|
|
258 | 301 |
assert cli.get_element_state(project, 'target.bst') != 'cached'
|
259 |
- assert cli.get_element_state(project, 'target2.bst') != 'cached'
|
|
260 | 302 |
|
261 | 303 |
|
262 | 304 |
# Ensure that only valid cache quotas make it through the loading
|
... | ... | @@ -54,8 +54,7 @@ def test_custom_logging(cli, tmpdir, datafiles): |
54 | 54 |
|
55 | 55 |
custom_log_format = '%{elapsed},%{elapsed-us},%{wallclock},%{key},%{element},%{action},%{message}'
|
56 | 56 |
user_config = {'logging': {'message-format': custom_log_format}}
|
57 |
- user_config_file = str(tmpdir.join('buildstream.conf'))
|
|
58 |
- _yaml.dump(_yaml.node_sanitize(user_config), filename=user_config_file)
|
|
57 |
+ cli.configure(user_config)
|
|
59 | 58 |
|
60 | 59 |
# Create our repo object of the given source type with
|
61 | 60 |
# the bin files, and then collect the initial ref.
|
... | ... | @@ -75,7 +74,7 @@ def test_custom_logging(cli, tmpdir, datafiles): |
75 | 74 |
element_name))
|
76 | 75 |
|
77 | 76 |
# Now try to fetch it
|
78 |
- result = cli.run(project=project, args=['-c', user_config_file, 'fetch', element_name])
|
|
77 |
+ result = cli.run(project=project, args=['fetch', element_name])
|
|
79 | 78 |
result.assert_success()
|
80 | 79 |
|
81 | 80 |
m = re.search("\d\d:\d\d:\d\d,\d\d:\d\d:\d\d.\d{6},\d\d:\d\d:\d\d,,,SUCCESS,Checking sources", result.stderr)
|
... | ... | @@ -43,10 +43,13 @@ DATA_DIR = os.path.join( |
43 | 43 |
)
|
44 | 44 |
|
45 | 45 |
|
46 |
-def open_workspace(cli, tmpdir, datafiles, kind, track, suffix='', workspace_dir=None):
|
|
46 |
+def open_workspace(cli, tmpdir, datafiles, kind, track, suffix='', workspace_dir=None, project_path=None):
|
|
47 | 47 |
if not workspace_dir:
|
48 | 48 |
workspace_dir = os.path.join(str(tmpdir), 'workspace{}'.format(suffix))
|
49 |
- project_path = os.path.join(datafiles.dirname, datafiles.basename)
|
|
49 |
+ if not project_path:
|
|
50 |
+ project_path = os.path.join(datafiles.dirname, datafiles.basename)
|
|
51 |
+ else:
|
|
52 |
+ shutil.copytree(os.path.join(datafiles.dirname, datafiles.basename), project_path)
|
|
50 | 53 |
bin_files_path = os.path.join(project_path, 'files', 'bin-files')
|
51 | 54 |
element_path = os.path.join(project_path, 'elements')
|
52 | 55 |
element_name = 'workspace-test-{}{}.bst'.format(kind, suffix)
|
... | ... | @@ -218,41 +221,42 @@ def test_close(cli, tmpdir, datafiles, kind): |
218 | 221 |
|
219 | 222 |
@pytest.mark.datafiles(DATA_DIR)
|
220 | 223 |
def test_close_external_after_move_project(cli, tmpdir, datafiles):
|
221 |
- tmp_parent = os.path.dirname(str(tmpdir))
|
|
222 |
- workspace_dir = os.path.join(tmp_parent, "workspace")
|
|
223 |
- element_name, project_path, _ = open_workspace(cli, tmpdir, datafiles, 'git', False, "", workspace_dir)
|
|
224 |
+ workspace_dir = os.path.join(str(tmpdir), "workspace")
|
|
225 |
+ project_path = os.path.join(str(tmpdir), 'initial_project')
|
|
226 |
+ element_name, _, _ = open_workspace(cli, tmpdir, datafiles, 'git', False, "", workspace_dir, project_path)
|
|
224 | 227 |
assert os.path.exists(workspace_dir)
|
225 |
- tmp_dir = os.path.join(tmp_parent, 'external_project')
|
|
226 |
- shutil.move(project_path, tmp_dir)
|
|
227 |
- assert os.path.exists(tmp_dir)
|
|
228 |
+ moved_dir = os.path.join(str(tmpdir), 'external_project')
|
|
229 |
+ shutil.move(project_path, moved_dir)
|
|
230 |
+ assert os.path.exists(moved_dir)
|
|
228 | 231 |
|
229 | 232 |
# Close the workspace
|
230 |
- result = cli.run(configure=False, project=tmp_dir, args=[
|
|
233 |
+ result = cli.run(project=moved_dir, args=[
|
|
231 | 234 |
'workspace', 'close', '--remove-dir', element_name
|
232 | 235 |
])
|
233 | 236 |
result.assert_success()
|
234 | 237 |
|
235 | 238 |
# Assert the workspace dir has been deleted
|
236 | 239 |
assert not os.path.exists(workspace_dir)
|
237 |
- # Move directory back inside tmp directory so it can be recognised
|
|
238 |
- shutil.move(tmp_dir, project_path)
|
|
239 | 240 |
|
240 | 241 |
|
241 | 242 |
@pytest.mark.datafiles(DATA_DIR)
|
242 | 243 |
def test_close_internal_after_move_project(cli, tmpdir, datafiles):
|
243 |
- element_name, project, _ = open_workspace(cli, tmpdir, datafiles, 'git', False)
|
|
244 |
- tmp_dir = os.path.join(os.path.dirname(str(tmpdir)), 'external_project')
|
|
245 |
- shutil.move(str(tmpdir), tmp_dir)
|
|
246 |
- assert os.path.exists(tmp_dir)
|
|
244 |
+ initial_dir = os.path.join(str(tmpdir), 'initial_project')
|
|
245 |
+ initial_workspace = os.path.join(initial_dir, 'workspace')
|
|
246 |
+ element_name, _, _ = open_workspace(cli, tmpdir, datafiles, 'git', False,
|
|
247 |
+ workspace_dir=initial_workspace, project_path=initial_dir)
|
|
248 |
+ moved_dir = os.path.join(str(tmpdir), 'internal_project')
|
|
249 |
+ shutil.move(initial_dir, moved_dir)
|
|
250 |
+ assert os.path.exists(moved_dir)
|
|
247 | 251 |
|
248 | 252 |
# Close the workspace
|
249 |
- result = cli.run(configure=False, project=tmp_dir, args=[
|
|
253 |
+ result = cli.run(project=moved_dir, args=[
|
|
250 | 254 |
'workspace', 'close', '--remove-dir', element_name
|
251 | 255 |
])
|
252 | 256 |
result.assert_success()
|
253 | 257 |
|
254 | 258 |
# Assert the workspace dir has been deleted
|
255 |
- workspace = os.path.join(tmp_dir, 'workspace')
|
|
259 |
+ workspace = os.path.join(moved_dir, 'workspace')
|
|
256 | 260 |
assert not os.path.exists(workspace)
|
257 | 261 |
|
258 | 262 |
|
1 |
+kind: manual
|
|
2 |
+ |
|
3 |
+depends:
|
|
4 |
+- filename: base.bst
|
|
5 |
+ type: build
|
|
6 |
+ |
|
7 |
+config:
|
|
8 |
+ build-commands:
|
|
9 |
+ - |
|
|
10 |
+ python3 -c '
|
|
11 |
+ from socket import socket, AF_UNIX, SOCK_STREAM
|
|
12 |
+ s = socket(AF_UNIX, SOCK_STREAM)
|
|
13 |
+ s.bind("testsocket")
|
|
14 |
+ '
|
1 |
+kind: manual
|
|
2 |
+ |
|
3 |
+depends:
|
|
4 |
+- filename: base.bst
|
|
5 |
+ type: build
|
|
6 |
+ |
|
7 |
+config:
|
|
8 |
+ install-commands:
|
|
9 |
+ - |
|
|
10 |
+ python3 -c '
|
|
11 |
+ from os.path import join
|
|
12 |
+ from sys import argv
|
|
13 |
+ from socket import socket, AF_UNIX, SOCK_STREAM
|
|
14 |
+ s = socket(AF_UNIX, SOCK_STREAM)
|
|
15 |
+ s.bind(join(argv[1], "testsocket"))
|
|
16 |
+ ' %{install-root}
|
1 |
+import os
|
|
2 |
+import pytest
|
|
3 |
+ |
|
4 |
+from buildstream import _yaml
|
|
5 |
+ |
|
6 |
+from tests.testutils import cli_integration as cli
|
|
7 |
+from tests.testutils.integration import assert_contains
|
|
8 |
+ |
|
9 |
+ |
|
10 |
+pytestmark = pytest.mark.integration
|
|
11 |
+ |
|
12 |
+DATA_DIR = os.path.join(
|
|
13 |
+ os.path.dirname(os.path.realpath(__file__)),
|
|
14 |
+ "project"
|
|
15 |
+)
|
|
16 |
+ |
|
17 |
+ |
|
18 |
+@pytest.mark.datafiles(DATA_DIR)
|
|
19 |
+def test_builddir_socket_ignored(cli, tmpdir, datafiles):
|
|
20 |
+ project = os.path.join(datafiles.dirname, datafiles.basename)
|
|
21 |
+ element_name = 'sockets/make-builddir-socket.bst'
|
|
22 |
+ |
|
23 |
+ result = cli.run(project=project, args=['build', element_name])
|
|
24 |
+ assert result.exit_code == 0
|
|
25 |
+ |
|
26 |
+ |
|
27 |
+@pytest.mark.datafiles(DATA_DIR)
|
|
28 |
+def test_install_root_socket_ignored(cli, tmpdir, datafiles):
|
|
29 |
+ project = os.path.join(datafiles.dirname, datafiles.basename)
|
|
30 |
+ element_name = 'sockets/make-install-root-socket.bst'
|
|
31 |
+ |
|
32 |
+ result = cli.run(project=project, args=['build', element_name])
|
|
33 |
+ assert result.exit_code == 0
|
... | ... | @@ -26,6 +26,6 @@ |
26 | 26 |
from .runcli import cli, cli_integration
|
27 | 27 |
from .repo import create_repo, ALL_REPO_KINDS
|
28 | 28 |
from .artifactshare import create_artifact_share
|
29 |
-from .element_generators import create_element_size
|
|
29 |
+from .element_generators import create_element_size, update_element_size
|
|
30 | 30 |
from .junction import generate_junction
|
31 | 31 |
from .runner_integration import wait_for_cache_granularity
|
1 | 1 |
import os
|
2 | 2 |
|
3 | 3 |
from buildstream import _yaml
|
4 |
+from buildstream import utils
|
|
5 |
+ |
|
6 |
+from . import create_repo
|
|
4 | 7 |
|
5 | 8 |
|
6 | 9 |
# create_element_size()
|
7 | 10 |
#
|
8 |
-# This will open a "<name>_data" file for writing and write
|
|
9 |
-# <size> MB of urandom (/dev/urandom) "stuff" into the file.
|
|
10 |
-# A bst import element file is then created: <name>.bst
|
|
11 |
+# Creates an import element with a git repo, using random
|
|
12 |
+# data to create a file in that repo of the specified size,
|
|
13 |
+# such that building it will add an artifact of the specified
|
|
14 |
+# size to the artifact cache.
|
|
11 | 15 |
#
|
12 | 16 |
# Args:
|
13 |
-# name: (str) of the element name (e.g. target.bst)
|
|
14 |
-# path: (str) pathway to the project/elements directory
|
|
15 |
-# dependencies: A list of strings (can also be an empty list)
|
|
16 |
-# size: (int) size of the element in bytes
|
|
17 |
+# name (str): The element name (e.g. target.bst)
|
|
18 |
+# project_dir (str): The path to the project
|
|
19 |
+# elements_path (str): The element path within the project
|
|
20 |
+# dependencies (list): Names of elements to depend on (may be empty)
|
|
21 |
+# size (int): The size of the element's generated file, in bytes
|
|
17 | 22 |
#
|
18 | 23 |
# Returns:
|
19 |
-# Nothing (creates a .bst file of specified size)
|
|
24 |
+# (Repo): A git repo which can be used to introduce trackable changes
|
|
25 |
+# by using the update_element_size() function below.
|
|
20 | 26 |
#
|
21 | 27 |
def create_element_size(name, project_dir, elements_path, dependencies, size):
|
22 | 28 |
full_elements_path = os.path.join(project_dir, elements_path)
|
23 | 29 |
os.makedirs(full_elements_path, exist_ok=True)
|
24 | 30 |
|
25 |
- # Create a file to be included in this element's artifact
|
|
26 |
- with open(os.path.join(project_dir, name + '_data'), 'wb+') as f:
|
|
27 |
- f.write(os.urandom(size))
|
|
31 |
+ # Create a git repo
|
|
32 |
+ repodir = os.path.join(project_dir, 'repos')
|
|
33 |
+ repo = create_repo('git', repodir, subdir=name)
|
|
34 |
+ |
|
35 |
+ with utils._tempdir(dir=project_dir) as tmp:
|
|
36 |
+ |
|
37 |
+ # We use a data/ subdir in the git repo we create,
|
|
38 |
+ # and we set the import element to only extract that
|
|
39 |
+ # part; this ensures we never include a .git/ directory
|
|
40 |
+ # in the cached artifacts for these sized elements.
|
|
41 |
+ #
|
|
42 |
+ datadir = os.path.join(tmp, 'data')
|
|
43 |
+ os.makedirs(datadir)
|
|
44 |
+ |
|
45 |
+ # Use /dev/urandom to create the sized file in the datadir
|
|
46 |
+ with open(os.path.join(datadir, name), 'wb+') as f:
|
|
47 |
+ f.write(os.urandom(size))
|
|
48 |
+ |
|
49 |
+ # Create the git repo from the temp directory
|
|
50 |
+ ref = repo.create(tmp)
|
|
28 | 51 |
|
29 |
- # Simplest case: We want this file (of specified size) to just
|
|
30 |
- # be an import element.
|
|
31 | 52 |
element = {
|
32 | 53 |
'kind': 'import',
|
33 | 54 |
'sources': [
|
34 |
- {
|
|
35 |
- 'kind': 'local',
|
|
36 |
- 'path': name + '_data'
|
|
37 |
- }
|
|
55 |
+ repo.source_config(ref=ref)
|
|
38 | 56 |
],
|
57 |
+ 'config': {
|
|
58 |
+ # Extract only the data directory
|
|
59 |
+ 'source': 'data'
|
|
60 |
+ },
|
|
39 | 61 |
'depends': dependencies
|
40 | 62 |
}
|
41 | 63 |
_yaml.dump(element, os.path.join(project_dir, elements_path, name))
|
64 |
+ |
|
65 |
+ # Return the repo, so that it can later be used to add commits
|
|
66 |
+ return repo
|
|
67 |
+ |
|
68 |
+ |
|
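A minimal usage sketch for the helper above, assuming the cli fixture and a project directory as used by the other tests in this series (the element name and size are arbitrary examples):

    # Create a 1 MB import element backed by a git repo, then build it;
    # the returned repo can later be fed to update_element_size() below.
    repo = create_element_size('example.bst', project, 'elements', [], 1000000)

    result = cli.run(project=project, args=['build', 'example.bst'])
    result.assert_success()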
69 |
+# update_element_size()
|
|
70 |
+#
|
|
71 |
+# Updates the repo returned by create_element_size() by adding a
|
|
72 |
+# new commit which replaces the original sized file with new random
|
|
73 |
+# content of the newly specified size.
|
|
74 |
+#
|
|
75 |
+# The name and project_dir arguments must match the arguments
|
|
76 |
+# previously given to create_element_size()
|
|
77 |
+#
|
|
78 |
+# Args:
|
|
79 |
+# name (str): The element name (e.g. target.bst)
|
|
80 |
+# project_dir (str): The path to the project
|
|
81 |
+# repo (Repo): The repo returned by create_element_size()
|
|
82 |
+# size (int): The new size which the element generates, in bytes
|
|
83 |
+#
|
|
84 |
+# This function returns nothing; the repo given is simply updated
|
|
85 |
+# in place with a new commit, which can then be picked up by
|
|
86 |
+# tracking the element again.
|
|
87 |
+#
|
|
88 |
+def update_element_size(name, project_dir, repo, size):
|
|
89 |
+ |
|
90 |
+ with utils._tempdir(dir=project_dir) as tmp:
|
|
91 |
+ |
|
92 |
+ new_file = os.path.join(tmp, name)
|
|
93 |
+ |
|
94 |
+ # Use /dev/urandom to create the replacement sized file
|
|
95 |
+ with open(new_file, 'wb+') as f:
|
|
96 |
+ f.write(os.urandom(size))
|
|
97 |
+ |
|
98 |
+ # Modify the git repo with a new commit to the same path,
|
|
99 |
+ # replacing the original file with a new one.
|
|
100 |
+ repo.modify_file(new_file, os.path.join('data', name))
|
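A follow-up sketch showing how the two helpers combine, mirroring the expiry test earlier in this series (names and sizes are again arbitrary examples):

    # Grow the element created above to 8 MB; the repo gains a new
    # commit, so the element must be tracked again before a build
    # will pick up the change.
    update_element_size('example.bst', project, repo, 8000000)

    result = cli.run(project=project, args=['build', '--track-all', 'example.bst'])
    # With no restrictive cache quota configured this rebuild simply
    # produces a larger artifact; the expiry test instead asserts a
    # 'cache-too-full' error because its quota cannot hold the result.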
... | ... | @@ -52,6 +52,13 @@ class Git(Repo): |
52 | 52 |
self._run_git('commit', '-m', 'Added {}'.format(os.path.basename(filename)))
|
53 | 53 |
return self.latest_commit()
|
54 | 54 |
|
55 |
+ def modify_file(self, new_file, path):
|
|
56 |
+ shutil.copy(new_file, os.path.join(self.repo, path))
|
|
57 |
+ subprocess.call([
|
|
58 |
+ 'git', 'commit', path, '-m', 'Modified {}'.format(os.path.basename(path))
|
|
59 |
+ ], env=GIT_ENV, cwd=self.repo)
|
|
60 |
+ return self.latest_commit()
|
|
61 |
+ |
|
55 | 62 |
def add_submodule(self, subdir, url=None, checkout=None):
|
56 | 63 |
submodule = {}
|
57 | 64 |
if checkout is not None:
|