[Notes] [Git][BuildStream/buildstream][tiagogomes/issue-573] 16 commits: Convert uses of external_directory to get_underlying_directory()

Tiago Gomes pushed to branch tiagogomes/issue-573 at BuildStream / buildstream

26 changed files:

Changes:

  • buildstream/__init__.py
    @@ -30,6 +30,7 @@ if "_BST_COMPLETION" not in os.environ:
         from .sandbox import Sandbox, SandboxFlags
         from .plugin import Plugin
         from .source import Source, SourceError, Consistency, SourceFetcher
    -    from .element import Element, ElementError, Scope
    +    from .element import Element, ElementError
    +    from .element_enums import Scope
         from .buildelement import BuildElement
         from .scriptelement import ScriptElement
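
    Note: the enums move to their own module presumably so that artifactcache can import _KeyStrength without pulling in the whole of element. For plugin authors nothing changes, since Scope is still re-exported from the top-level package. A quick illustration (element is any Element instance; dependencies() is the existing public API):

        from buildstream import Scope

        # Scope still arrives via the top-level package; only its home
        # module changed from buildstream.element to buildstream.element_enums.
        def build_dep_names(element):
            return [dep.name for dep in element.dependencies(Scope.BUILD)]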

  • buildstream/_artifactcache/artifactcache.py
    @@ -16,12 +16,13 @@
     #
     #  Authors:
     #        Tristan Maat <tristan maat codethink co uk>
    +#        Tiago Gomes <tiago gomes codethink co uk>
     
     import os
     import string
     from collections import Mapping, namedtuple
     
    -from ..element import _KeyStrength
    +from ..element_enums import _KeyStrength
     from .._exceptions import ArtifactError, ImplError, LoadError, LoadErrorReason
     from .._message import Message, MessageType
     from .. import utils
    @@ -83,7 +84,6 @@ class ArtifactCache():
             self.tmpdir = os.path.join(context.artifactdir, 'tmp')
     
             self.max_size = context.cache_quota
    -        self.estimated_size = None
     
             self.global_remote_specs = []
             self.project_remote_specs = {}
    @@ -226,8 +226,9 @@ class ArtifactCache():
         #
         def clean(self):
             artifacts = self.list_artifacts()
    +        old_cache_size = self.cache_size
     
    -        while self.calculate_cache_size() >= self.context.cache_quota - self.context.cache_lower_threshold:
    +        while self.cache_size >= self.context.cache_quota - self.context.cache_lower_threshold:
                 try:
                     to_remove = artifacts.pop(0)
                 except IndexError:
    @@ -241,7 +242,7 @@ class ArtifactCache():
                               "Please increase the cache-quota in {}."
                               .format(self.context.config_origin or default_conf))
     
    -                if self.calculate_cache_size() > self.context.cache_quota:
    +                if self.cache_size > self.context.cache_quota:
                         raise ArtifactError("Cache too full. Aborting.",
                                             detail=detail,
                                             reason="cache-too-full")
    @@ -251,40 +252,17 @@ class ArtifactCache():
                 key = to_remove.rpartition('/')[2]
                 if key not in self.required_artifacts:
                     size = self.remove(to_remove)
    -                if size:
    -                    self.cache_size -= size
    +                self.cache_size -= size
    +                self._message(MessageType.DEBUG,
    +                              "Removed artifact {} ({})".format(
    +                                  to_remove[:-(len(key) - self.context.log_key_length)],
    +                                  utils._pretty_size(size)))
     
    -        # This should be O(1) if implemented correctly
    -        return self.calculate_cache_size()
    +        self._message(MessageType.INFO,
    +                      "New artifact cache size: {}".format(
    +                          utils._pretty_size(self.cache_size)))
     
    -    # get_approximate_cache_size()
    -    #
    -    # A cheap method that aims to serve as an upper limit on the
    -    # artifact cache size.
    -    #
    -    # The cache size reported by this function will normally be larger
    -    # than the real cache size, since it is calculated using the
    -    # pre-commit artifact size, but for very small artifacts in
    -    # certain caches additional overhead could cause this to be
    -    # smaller than, but close to, the actual size.
    -    #
    -    # Nonetheless, in practice this should be safe to use as an upper
    -    # limit on the cache size.
    -    #
    -    # If the cache has built-in constant-time size reporting, please
    -    # feel free to override this method with a more accurate
    -    # implementation.
    -    #
    -    # Returns:
    -    #     (int) An approximation of the artifact cache size.
    -    #
    -    def get_approximate_cache_size(self):
    -        # If we don't currently have an estimate, figure out the real
    -        # cache size.
    -        if self.estimated_size is None:
    -            self.estimated_size = self.calculate_cache_size()
    -
    -        return self.estimated_size
    +        return old_cache_size - self.cache_size
     
         ################################################
         # Abstract methods for subclasses to implement #
    @@ -382,6 +360,10 @@ class ArtifactCache():
         #     content (str): The element's content directory
         #     keys (list): The cache keys to use
         #
    +    # Returns:
    +    #   (int): Bytes required to cache the artifact taking deduplication
    +    #          into account
    +    #
         def commit(self, element, content, keys):
             raise ImplError("Cache '{kind}' does not implement commit()"
                             .format(kind=type(self).__name__))
    @@ -454,6 +436,8 @@ class ArtifactCache():
         #
         # Returns:
         #   (bool): True if pull was successful, False if artifact was not available
    +    #   (int): Bytes required to cache the artifact taking deduplication
    +    #          into account
         #
         def pull(self, element, key, *, progress=None):
             raise ImplError("Cache '{kind}' does not implement pull()"
    @@ -476,8 +460,6 @@ class ArtifactCache():
         #
         # Return the real artifact cache size.
         #
    -    # Implementations should also use this to update estimated_size.
    -    #
         # Returns:
         #
         # (int) The size of the artifact cache.
    @@ -486,6 +468,20 @@ class ArtifactCache():
             raise ImplError("Cache '{kind}' does not implement calculate_cache_size()"
                             .format(kind=type(self).__name__))
     
    +    # get_cache_size()
    +    #
    +    # Return the artifact cache size.
    +    #
    +    # Returns:
    +    #
    +    # (int) The size of the artifact cache.
    +    #
    +    def get_cache_size(self):
    +        if not self.cache_size:
    +            self.cache_size = self.calculate_cache_size()
    +
    +        return self.cache_size
    +
         ################################################
         #               Local Private Methods          #
         ################################################
    @@ -529,27 +525,15 @@ class ArtifactCache():
     
         # _add_artifact_size()
         #
    -    # Since we cannot keep track of the cache size between threads,
    -    # this method will be called by the main process every time a
    -    # process that added something to the cache finishes.
    -    #
    -    # This will then add the reported size to
    -    # ArtifactCache.estimated_size.
    +    # Since we cannot keep track of the cache size between processes,
    +    # this method will be called by the main process every time a job
    +    # that added or removed an artifact from the cache finishes.
         #
         def _add_artifact_size(self, artifact_size):
    -        if not self.estimated_size:
    -            self.estimated_size = self.calculate_cache_size()
    -
    -        self.estimated_size += artifact_size
    +        if not self.cache_size:
    +            self.cache_size = self.calculate_cache_size()
     
    -    # _set_cache_size()
    -    #
    -    # Similarly to the above method, when we calculate the actual size
    -    # in a child thread, we can't update it. We instead pass the value
    -    # back to the main thread and update it there.
    -    #
    -    def _set_cache_size(self, cache_size):
    -        self.estimated_size = cache_size
    +        self.cache_size += artifact_size
     
     
     # _configured_remote_artifact_cache_specs():
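
    The net effect of this hunk set: the old estimated_size/calculate-on-demand pair collapses into one cache_size counter that is computed once and then adjusted by per-job deltas, and clean() now returns the number of bytes it freed. A minimal standalone sketch of that accounting pattern (SizeTracker and calculate_size() are illustrative stand-ins, not BuildStream API):

        import os

        class SizeTracker:
            """Sketch of the lazy cache-size accounting above."""

            def __init__(self, casdir):
                self.casdir = casdir
                self.cache_size = None  # unknown until first queried

            def calculate_size(self):
                # Full disk walk: expensive, so it runs at most once.
                total = 0
                for dirpath, _, filenames in os.walk(self.casdir):
                    total += os.stat(dirpath).st_size
                    for f in filenames:
                        total += os.stat(os.path.join(dirpath, f)).st_size
                return total

            def get_cache_size(self):
                # Lazily compute the real size on first use...
                if not self.cache_size:
                    self.cache_size = self.calculate_size()
                return self.cache_size

            def add_artifact_size(self, delta):
                # ...then keep it current with per-job deltas; a negative
                # delta (from a cleanup job) shrinks the tracked size.
                if not self.cache_size:
                    self.cache_size = self.calculate_size()
                self.cache_size += delta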
    

  • buildstream/_artifactcache/cascache.py
    @@ -16,6 +16,7 @@
     #
     #  Authors:
     #        Jürg Billeter <juerg billeter codethink co uk>
    +#        Tiago Gomes <tiago.gomes>
     
     import hashlib
     import itertools
    @@ -89,7 +90,7 @@ class CASCache(ArtifactCache):
     
             with tempfile.TemporaryDirectory(prefix='tmp', dir=self.extractdir) as tmpdir:
                 checkoutdir = os.path.join(tmpdir, ref)
    -            self._checkout(checkoutdir, tree)
    +            self._checkout_tree(checkoutdir, tree)
     
                 os.makedirs(os.path.dirname(dest), exist_ok=True)
                 try:
    @@ -109,12 +110,12 @@ class CASCache(ArtifactCache):
         def commit(self, element, content, keys):
             refs = [self.get_artifact_fullname(element, key) for key in keys]
     
    -        tree = self._create_tree(content)
    +        tree, size = self._create_tree(content)
     
             for ref in refs:
                 self.set_ref(ref, tree)
     
    -        self.cache_size = None
    +        return size
     
         def diff(self, element, key_a, key_b, *, subdir=None):
             ref_a = self.get_artifact_fullname(element, key_a)
    @@ -232,19 +233,19 @@ class CASCache(ArtifactCache):
                     tree.hash = response.digest.hash
                     tree.size_bytes = response.digest.size_bytes
     
    -                self._fetch_directory(remote, tree)
    +                size = self._fetch_tree(remote, tree)
     
                     self.set_ref(ref, tree)
     
                     # no need to pull from additional remotes
    -                return True
    +                return True, size
     
                 except grpc.RpcError as e:
                     if e.code() != grpc.StatusCode.NOT_FOUND:
                         raise ArtifactError("Failed to pull artifact {}: {}".format(
                             element._get_brief_display_key(), e)) from e
     
    -        return False
    +        return False, 0
     
         def link_key(self, element, oldkey, newkey):
             oldref = self.get_artifact_fullname(element, oldkey)
    @@ -412,22 +413,39 @@ class CASCache(ArtifactCache):
     
                     out.flush()
     
    +                file_size = os.fstat(out.fileno()).st_size
    +
                     digest.hash = h.hexdigest()
    -                digest.size_bytes = os.fstat(out.fileno()).st_size
    +                digest.size_bytes = file_size
     
                     # Place file at final location
                     objpath = self.objpath(digest)
    -                os.makedirs(os.path.dirname(objpath), exist_ok=True)
    +                dirpath = os.path.dirname(objpath)
    +
    +                # Track the increased size on the parent directory caused by
    +                # adding a new entry, as these directories can contain a large
    +                # number of files.
    +                new_dir_size = 0
    +                old_dir_size = 0
    +                try:
    +                    os.makedirs(dirpath)
    +                except FileExistsError:
    +                    old_dir_size = os.stat(dirpath).st_size
    +                else:
    +                    new_dir_size = os.stat(dirpath).st_size
    +
                     os.link(out.name, objpath)
    +                new_dir_size = os.stat(dirpath).st_size - old_dir_size
     
             except FileExistsError as e:
                 # We can ignore the failed link() if the object is already in the repo.
    +            file_size = 0
                 pass
     
             except OSError as e:
                 raise ArtifactError("Failed to hash object: {}".format(e)) from e
     
    -        return digest
    +        return digest, file_size + new_dir_size
     
         # set_ref():
         #
    @@ -436,6 +454,8 @@ class CASCache(ArtifactCache):
         # Args:
         #     ref (str): The name of the ref
         #
    +    # Note: Setting a ref will have a very low overhead on the cache
    +    # size, so we don't track this.
         def set_ref(self, ref, tree):
             refpath = self._refpath(ref)
             os.makedirs(os.path.dirname(refpath), exist_ok=True)
    @@ -475,11 +495,7 @@ class CASCache(ArtifactCache):
                 raise ArtifactError("Attempt to access unavailable artifact: {}".format(e)) from e
     
         def calculate_cache_size(self):
    -        if self.cache_size is None:
    -            self.cache_size = utils._get_dir_size(self.casdir)
    -            self.estimated_size = self.cache_size
    -
    -        return self.cache_size
    +        return utils._get_dir_size(self.casdir)
     
         # list_artifacts():
         #
    @@ -567,7 +583,7 @@ class CASCache(ArtifactCache):
         ################################################
         #             Local Private Methods            #
         ################################################
    -    def _checkout(self, dest, tree):
    +    def _checkout_tree(self, dest, tree):
             os.makedirs(dest, exist_ok=True)
     
             directory = remote_execution_pb2.Directory()
    @@ -586,7 +602,7 @@ class CASCache(ArtifactCache):
     
             for dirnode in directory.directories:
                 fullpath = os.path.join(dest, dirnode.name)
    -            self._checkout(fullpath, dirnode.digest)
    +            self._checkout_tree(fullpath, dirnode.digest)
     
             for symlinknode in directory.symlinks:
                 # symlink
    @@ -598,6 +614,7 @@ class CASCache(ArtifactCache):
     
         def _create_tree(self, path, *, digest=None):
             directory = remote_execution_pb2.Directory()
    +        size = 0
     
             for name in sorted(os.listdir(path)):
                 full_path = os.path.join(path, name)
    @@ -605,11 +622,11 @@ class CASCache(ArtifactCache):
                 if stat.S_ISDIR(mode):
                     dirnode = directory.directories.add()
                     dirnode.name = name
    -                self._create_tree(full_path, digest=dirnode.digest)
    +                size += self._create_tree(full_path, digest=dirnode.digest)[1]
                 elif stat.S_ISREG(mode):
                     filenode = directory.files.add()
                     filenode.name = name
    -                self.add_object(path=full_path, digest=filenode.digest)
    +                size += self.add_object(path=full_path, digest=filenode.digest)[1]
                     filenode.is_executable = (mode & stat.S_IXUSR) == stat.S_IXUSR
                 elif stat.S_ISLNK(mode):
                     symlinknode = directory.symlinks.add()
    @@ -618,7 +635,8 @@ class CASCache(ArtifactCache):
                 else:
                     raise ArtifactError("Unsupported file type for {}".format(full_path))
     
    -        return self.add_object(digest=digest, buffer=directory.SerializeToString())
    +        res = self.add_object(digest=digest, buffer=directory.SerializeToString())
    +        return res[0], res[1] + size
     
         def _get_subdir(self, tree, subdir):
             head, name = os.path.split(subdir)
    @@ -761,11 +779,12 @@ class CASCache(ArtifactCache):
             out.flush()
             assert digest.size_bytes == os.fstat(out.fileno()).st_size
     
    -    def _fetch_directory(self, remote, tree):
    +    def _fetch_tree(self, remote, tree):
    +        size = 0
             objpath = self.objpath(tree)
             if os.path.exists(objpath):
                 # already in local cache
    -            return
    +            return 0
     
             with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
                 self._fetch_blob(remote, tree, out)
    @@ -776,7 +795,7 @@ class CASCache(ArtifactCache):
                     directory.ParseFromString(f.read())
     
                 for filenode in directory.files:
    -                fileobjpath = self.objpath(tree)
    +                fileobjpath = self.objpath(filenode.digest)
                     if os.path.exists(fileobjpath):
                         # already in local cache
                         continue
    @@ -784,17 +803,21 @@ class CASCache(ArtifactCache):
                     with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
                         self._fetch_blob(remote, filenode.digest, f)
     
    -                    digest = self.add_object(path=f.name)
    +                    digest, obj_size = self.add_object(path=f.name)
    +                    size += obj_size
                         assert digest.hash == filenode.digest.hash
     
                 for dirnode in directory.directories:
    -                self._fetch_directory(remote, dirnode.digest)
    +                size += self._fetch_tree(remote, dirnode.digest)
     
                 # place directory blob only in final location when we've downloaded
                 # all referenced blobs to avoid dangling references in the repository
    -            digest = self.add_object(path=out.name)
    +            digest, obj_size = self.add_object(path=out.name)
    +            size += obj_size
                 assert digest.hash == tree.hash
     
    +            return size
    +
     
     # Represents a single remote CAS cache.
     #

  • buildstream/_artifactcache/casserver.py
    @@ -203,7 +203,7 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
                             context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                             return response
                         out.flush()
    -                    digest = self.cas.add_object(path=out.name)
    +                    digest = self.cas.add_object(path=out.name)[0]
                         if digest.hash != client_digest.hash:
                             context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                             return response

  • buildstream/_context.py
    @@ -30,6 +30,7 @@ from ._exceptions import LoadError, LoadErrorReason, BstError
     from ._message import Message, MessageType
     from ._profile import Topics, profile_start, profile_end
     from ._artifactcache import ArtifactCache
    +from ._platform import Platform
     from ._workspaces import Workspaces
     from .plugin import _plugin_lookup
     
    @@ -190,65 +191,13 @@ class Context():
             # We read and parse the cache quota as specified by the user
             cache_quota = _yaml.node_get(cache, str, 'quota', default_value='infinity')
             try:
    -            cache_quota = utils._parse_size(cache_quota, artifactdir_volume)
    +            self.cache_quota = utils._parse_size(cache_quota, artifactdir_volume)
             except utils.UtilError as e:
                 raise LoadError(LoadErrorReason.INVALID_DATA,
                                 "{}\nPlease specify the value in bytes or as a % of full disk space.\n"
                                 "\nValid values are, for example: 800M 10G 1T 50%\n"
                                 .format(str(e))) from e
     
    -        # Headroom intended to give BuildStream a bit of leeway.
    -        # This acts as the minimum size of cache_quota and also
    -        # is taken from the user requested cache_quota.
    -        #
    -        if 'BST_TEST_SUITE' in os.environ:
    -            headroom = 0
    -        else:
    -            headroom = 2e9
    -
    -        stat = os.statvfs(artifactdir_volume)
    -        available_space = (stat.f_bsize * stat.f_bavail)
    -
    -        # Again, the artifact directory may not yet have been created yet
    -        #
    -        if not os.path.exists(self.artifactdir):
    -            cache_size = 0
    -        else:
    -            cache_size = utils._get_dir_size(self.artifactdir)
    -
    -        # Ensure system has enough storage for the cache_quota
    -        #
    -        # If cache_quota is none, set it to the maximum it could possibly be.
    -        #
    -        # Also check that cache_quota is atleast as large as our headroom.
    -        #
    -        if cache_quota is None:  # Infinity, set to max system storage
    -            cache_quota = cache_size + available_space
    -        if cache_quota < headroom:  # Check minimum
    -            raise LoadError(LoadErrorReason.INVALID_DATA,
    -                            "Invalid cache quota ({}): ".format(utils._pretty_size(cache_quota)) +
    -                            "BuildStream requires a minimum cache quota of 2G.")
    -        elif cache_quota > cache_size + available_space:  # Check maximum
    -            raise LoadError(LoadErrorReason.INVALID_DATA,
    -                            ("Your system does not have enough available " +
    -                             "space to support the cache quota specified.\n" +
    -                             "You currently have:\n" +
    -                             "- {used} of cache in use at {local_cache_path}\n" +
    -                             "- {available} of available system storage").format(
    -                                 used=utils._pretty_size(cache_size),
    -                                 local_cache_path=self.artifactdir,
    -                                 available=utils._pretty_size(available_space)))
    -
    -        # Place a slight headroom (2e9 (2GB) on the cache_quota) into
    -        # cache_quota to try and avoid exceptions.
    -        #
    -        # Of course, we might still end up running out during a build
    -        # if we end up writing more than 2G, but hey, this stuff is
    -        # already really fuzzy.
    -        #
    -        self.cache_quota = cache_quota - headroom
    -        self.cache_lower_threshold = self.cache_quota / 2
    -
             # Load artifact share configuration
             self.artifact_cache_specs = ArtifactCache.specs_from_config_node(defaults)
     
    @@ -296,6 +245,67 @@ class Context():
                                 "{}: on-error should be one of: {}".format(
                                     provenance, ", ".join(valid_actions)))
     
    +    # check_quota()
    +    #
    +    # Validates the quota value loaded
    +    #
    +    # Raises:
    +    #   LoadError
    +    #
    +    def check_quota(self):
    +        cache_size = Platform.get_platform().artifactcache.get_cache_size()
    +
    +        artifactdir_volume = self.artifactdir
    +        while not os.path.exists(artifactdir_volume):
    +            artifactdir_volume = os.path.dirname(artifactdir_volume)
    +
    +        stat = os.statvfs(artifactdir_volume)
    +        available_space = (stat.f_bsize * stat.f_bavail)
    +
    +        # Headroom intended to give BuildStream a bit of leeway.
    +        # This acts as the minimum size of cache_quota and also
    +        # is taken from the user requested cache_quota.
    +        #
    +        if 'BST_TEST_SUITE' in os.environ:
    +            headroom = 0
    +        else:
    +            headroom = 2e9
    +
    +        # Ensure system has enough storage for the cache_quota
    +        #
    +        # If cache_quota is none, set it to the maximum it could possibly be.
    +        #
    +        # Also check that cache_quota is at least as large as our headroom.
    +        #
    +        if self.cache_quota is None:  # Infinity, set to max system storage
    +            self.cache_quota = cache_size + available_space
    +
    +        if self.cache_quota < headroom:  # Check minimum
    +            raise LoadError(LoadErrorReason.INVALID_DATA,
    +                            "Invalid cache quota ({}): ".format(
    +                                utils._pretty_size(self.cache_quota)) +
    +                            "BuildStream requires a minimum cache quota of 2G.")
    +
    +        if self.cache_quota > cache_size + available_space:  # Check maximum
    +            raise LoadError(LoadErrorReason.INVALID_DATA,
    +                            ("Your system does not have enough available " +
    +                             "space to support the cache quota specified.\n" +
    +                             "You currently have:\n" +
    +                             "- {used} of cache in use at {local_cache_path}\n" +
    +                             "- {available} of available system storage").format(
    +                                 used=utils._pretty_size(cache_size),
    +                                 local_cache_path=self.artifactdir,
    +                                 available=utils._pretty_size(available_space)))
    +
    +        # Place a slight headroom (2e9 (2GB) on the cache_quota) into
    +        # cache_quota to try and avoid exceptions.  Of course, we might
    +        # still end up running out during a build if we end up writing
    +        # more than 2G, but hey, this stuff is already really fuzzy.
    +        #
    +        self.cache_quota = self.cache_quota - headroom
    +
    +        self.cache_lower_threshold = self.cache_quota / 2
    +
         # add_project():
         #
         # Add a project to the context.
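
    Moved into check_quota(), the validation is plain arithmetic: the requested quota must sit between the 2G headroom floor and cache_size + available_space, and the headroom is then subtracted so BuildStream stops before the disk actually fills. A hedged sketch of just that arithmetic (effective_quota is illustrative; the real method raises LoadError and stores the results on the context):

        HEADROOM = int(2e9)  # 2 GB of leeway, as in the code above

        def effective_quota(requested, cache_size, available_space):
            """requested is the parsed user quota in bytes, or None for
            'infinity'. Returns (quota, lower_threshold)."""
            if requested is None:
                requested = cache_size + available_space
            if requested < HEADROOM:
                raise ValueError("quota below the 2G minimum")
            if requested > cache_size + available_space:
                raise ValueError("quota exceeds cache plus free space")
            quota = requested - HEADROOM
            return quota, quota / 2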
    

  • buildstream/_frontend/app.py
    @@ -201,6 +201,8 @@ class App():
     
             Platform.create_instance(self.context)
     
    +        self.context.check_quota()
    +
             # Create the logger right before setting the message handler
             self.logger = LogLine(self.context,
                                   self._content_profile,

  • buildstream/_scheduler/jobs/__init__.py
     from .elementjob import ElementJob
    -from .cachesizejob import CacheSizeJob
     from .cleanupjob import CleanupJob

  • buildstream/_scheduler/jobs/cleanupjob.py
    @@ -21,18 +21,19 @@ from ..._platform import Platform
     
     
     class CleanupJob(Job):
    -    def __init__(self, *args, complete_cb, **kwargs):
    +    def __init__(self, *args, **kwargs):
             super().__init__(*args, **kwargs)
    -        self._complete_cb = complete_cb
             self._cache = Platform._instance.artifactcache
     
         def child_process(self):
             return self._cache.clean()
     
         def parent_complete(self, success, result):
    -        self._cache._set_cache_size(result)
    -        if self._complete_cb:
    -            self._complete_cb()
    +        if success:
    +            # ArtifactCache.clean() returns the number of bytes cleaned.
    +            # We negate the number because the cache size is to be
    +            # decreased.
    +            self._cache._add_artifact_size(result * -1)
     
         def child_process_data(self):
             return {}
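
    The sign convention is the point of this change: a cleanup job reports its freed bytes through the same _add_artifact_size() hook that build jobs use, only negated. A toy check of that convention, reusing the SizeTracker sketch from the artifactcache.py notes above (path and numbers are made up):

        tracker = SizeTracker('/path/to/casdir')   # hypothetical path
        tracker.cache_size = 10_000_000            # pretend the size is known

        tracker.add_artifact_size(2_000_000)       # a build job added 2 MB
        tracker.add_artifact_size(5_000_000 * -1)  # a cleanup job freed 5 MB
        assert tracker.cache_size == 7_000_000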

  • buildstream/_scheduler/jobs/elementjob.py
    @@ -110,12 +110,10 @@ class ElementJob(Job):
     
             workspace = self._element._get_workspace()
             artifact_size = self._element._get_artifact_size()
    -        cache_size = self._element._get_artifact_cache().calculate_cache_size()
     
             if workspace is not None:
                 data['workspace'] = workspace.to_dict()
             if artifact_size is not None:
                 data['artifact_size'] = artifact_size
    -        data['cache_size'] = cache_size
     
             return data

  • buildstream/_scheduler/queues/buildqueue.py
    @@ -87,19 +87,6 @@ class BuildQueue(Queue):
     
             return QueueStatus.READY
     
    -    def _check_cache_size(self, job, element):
    -        if not job.child_data:
    -            return
    -
    -        artifact_size = job.child_data.get('artifact_size', False)
    -
    -        if artifact_size:
    -            cache = element._get_artifact_cache()
    -            cache._add_artifact_size(artifact_size)
    -
    -            if cache.get_approximate_cache_size() > self._scheduler.context.cache_quota:
    -                self._scheduler._check_cache_size_real()
    -
         def done(self, job, element, result, success):
     
             if success:

  • buildstream/_scheduler/queues/pullqueue.py
    @@ -52,17 +52,14 @@ class PullQueue(Queue):
             else:
                 return QueueStatus.SKIP
     
    -    def done(self, _, element, result, success):
    +    def done(self, job, element, result, success):
     
             if not success:
                 return False
     
             element._pull_done()
     
    -        # Build jobs will check the "approximate" size first. Since we
    -        # do not get an artifact size from pull jobs, we have to
    -        # actually check the cache size.
    -        self._scheduler._check_cache_size_real()
    +        self._check_cache_size(job, element)
     
             # Element._pull() returns True if it downloaded an artifact,
             # here we want to appear skipped if we did not download.

  • buildstream/_scheduler/queues/queue.py
    @@ -301,8 +301,6 @@ class Queue():
             # Update values that need to be synchronized in the main task
             # before calling any queue implementation
             self._update_workspaces(element, job)
    -        if job.child_data:
    -            element._get_artifact_cache().cache_size = job.child_data.get('cache_size')
     
             # Give the result of the job to the Queue implementor,
             # and determine if it should be considered as processed
    @@ -360,3 +358,16 @@ class Queue():
             logfile = "{key}-{action}".format(key=key, action=action)
     
             return os.path.join(project.name, element.normal_name, logfile)
    +
    +    def _check_cache_size(self, job, element):
    +        if not job.child_data:
    +            return
    +
    +        artifact_size = job.child_data.get('artifact_size', False)
    +
    +        if artifact_size:
    +            cache = element._get_artifact_cache()
    +            cache._add_artifact_size(artifact_size)
    +
    +            if cache.get_cache_size() > self._scheduler.context.cache_quota:
    +                self._scheduler._run_cache_cleanup()
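
    Hoisting _check_cache_size() into the Queue base class means build and pull jobs now share one trigger path: add the job's artifact_size to the running total, and schedule a cleanup only when the total crosses the quota. The flow, reduced to a free function (all names here are illustrative stand-ins, not BuildStream API):

        def check_cache_size(child_data, cache, scheduler, quota):
            # Sketch of Queue._check_cache_size() above.
            if not child_data:
                return
            artifact_size = child_data.get('artifact_size')
            if artifact_size:
                cache.add_artifact_size(artifact_size)  # bump the running total
                if cache.get_cache_size() > quota:      # quota exceeded:
                    scheduler.run_cache_cleanup()       # schedule one CleanupJob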

  • buildstream/_scheduler/scheduler.py
     #
    -#  Copyright (C) 2016 Codethink Limited
    +#  Copyright (C) 2018 Codethink Limited
     #
     #  This program is free software; you can redistribute it and/or
     #  modify it under the terms of the GNU Lesser General Public
    @@ -28,7 +28,7 @@ from contextlib import contextmanager
     
     # Local imports
     from .resources import Resources, ResourceType
    -from .jobs import CacheSizeJob, CleanupJob
    +from .jobs import CleanupJob
     
     
     # A decent return code for Scheduler.run()
    @@ -315,23 +315,11 @@ class Scheduler():
             self.schedule_jobs(ready)
             self._sched()
     
    -    def _run_cleanup(self, cache_size):
    -        if cache_size and cache_size < self.context.cache_quota:
    -            return
    -
    -        job = CleanupJob(self, 'cleanup', 'cleanup',
    +    def _run_cache_cleanup(self):
    +        job = CleanupJob(self, 'Cleaning artifact cache', 'cleanup',
                              resources=[ResourceType.CACHE,
                                         ResourceType.PROCESS],
    -                         exclusive_resources=[ResourceType.CACHE],
    -                         complete_cb=None)
    -        self.schedule_jobs([job])
    -
    -    def _check_cache_size_real(self):
    -        job = CacheSizeJob(self, 'cache_size', 'cache_size',
    -                           resources=[ResourceType.CACHE,
    -                                      ResourceType.PROCESS],
    -                           exclusive_resources=[ResourceType.CACHE],
    -                           complete_cb=self._run_cleanup)
    +                         exclusive_resources=[ResourceType.CACHE])
             self.schedule_jobs([job])
     
         # _suspend_jobs()

  • buildstream/element.py
    @@ -78,7 +78,6 @@ import stat
     import copy
     from collections import Mapping, OrderedDict
     from contextlib import contextmanager
    -from enum import Enum
     import tempfile
     import shutil
     
    @@ -98,41 +97,9 @@ from .plugin import CoreWarnings
     from .sandbox._config import SandboxConfig
     
     from .storage.directory import Directory
    -from .storage._filebaseddirectory import FileBasedDirectory, VirtualDirectoryError
    -
    -
    -# _KeyStrength():
    -#
    -# Strength of cache key
    -#
    -class _KeyStrength(Enum):
    -
    -    # Includes strong cache keys of all build dependencies and their
    -    # runtime dependencies.
    -    STRONG = 1
    -
    -    # Includes names of direct build dependencies but does not include
    -    # cache keys of dependencies.
    -    WEAK = 2
    -
    -
    -class Scope(Enum):
    -    """Types of scope for a given element"""
    -
    -    ALL = 1
    -    """All elements which the given element depends on, following
    -    all elements required for building. Including the element itself.
    -    """
    -
    -    BUILD = 2
    -    """All elements required for building the element, including their
    -    respective run dependencies. Not including the given element itself.
    -    """
    -
    -    RUN = 3
    -    """All elements required for running the element. Including the element
    -    itself.
    -    """
    +from .storage._filebaseddirectory import FileBasedDirectory
    +from .storage.directory import VirtualDirectoryError
    +from .element_enums import _KeyStrength, Scope
     
     
     class ElementError(BstError):
    @@ -1648,8 +1615,8 @@ class Element(Plugin):
                         }), os.path.join(metadir, 'workspaced-dependencies.yaml'))
     
                         with self.timed_activity("Caching artifact"):
    -                        self.__artifact_size = utils._get_dir_size(assembledir)
    -                        self.__artifacts.commit(self, assembledir, self.__get_cache_keys_for_commit())
    +                        self.__artifact_size = self.__artifacts.commit(
    +                            self, assembledir, self.__get_cache_keys_for_commit())
     
                         if collect is not None and collectvdir is None:
                             raise ElementError(
    @@ -1695,31 +1662,31 @@ class Element(Plugin):
             self._update_state()
     
         def _pull_strong(self, *, progress=None):
    -        weak_key = self._get_cache_key(strength=_KeyStrength.WEAK)
    -
             key = self.__strict_cache_key
    -        if not self.__artifacts.pull(self, key, progress=progress):
    -            return False
    +        pulled, self.__artifact_size = self.__artifacts.pull(
    +            self, key, progress=progress)
     
    -        # update weak ref by pointing it to this newly fetched artifact
    -        self.__artifacts.link_key(self, key, weak_key)
    +        if pulled:
    +            # update weak ref by pointing it to this newly fetched artifact
    +            weak_key = self._get_cache_key(strength=_KeyStrength.WEAK)
    +            self.__artifacts.link_key(self, key, weak_key)
     
    -        return True
    +        return pulled
     
         def _pull_weak(self, *, progress=None):
             weak_key = self._get_cache_key(strength=_KeyStrength.WEAK)
    +        pulled, self.__artifact_size = self.__artifacts.pull(
    +            self, weak_key, progress=progress)
     
    -        if not self.__artifacts.pull(self, weak_key, progress=progress):
    -            return False
    -
    -        # extract strong cache key from this newly fetched artifact
    -        self._pull_done()
    +        if pulled:
    +            # extract strong cache key from this newly fetched artifact
    +            self._pull_done()
     
    -        # create tag for strong cache key
    -        key = self._get_cache_key(strength=_KeyStrength.STRONG)
    -        self.__artifacts.link_key(self, weak_key, key)
    +            # create tag for strong cache key
    +            key = self._get_cache_key(strength=_KeyStrength.STRONG)
    +            self.__artifacts.link_key(self, weak_key, key)
     
    -        return True
    +        return pulled
     
         # _pull():
         #
    @@ -1739,13 +1706,12 @@ class Element(Plugin):
             if not pulled and not self._cached() and not context.get_strict():
                 pulled = self._pull_weak(progress=progress)
     
    -        if not pulled:
    -            return False
    +        if pulled:
    +            # Notify successful download
    +            display_key = self._get_brief_display_key()
    +            self.info("Downloaded artifact {}".format(display_key))
     
    -        # Notify successfull download
    -        display_key = self._get_brief_display_key()
    -        self.info("Downloaded artifact {}".format(display_key))
    -        return True
    +        return pulled
     
         # _skip_push():
         #

  • buildstream/_scheduler/jobs/cachesizejob.py → buildstream/element_enums.py
    -#  Copyright (C) 2018 Codethink Limited
    +#
    +#  Copyright (C) 2018 Bloomberg LP
     #
     #  This program is free software; you can redistribute it and/or
     #  modify it under the terms of the GNU Lesser General Public
    @@ -13,26 +14,48 @@
     #  You should have received a copy of the GNU Lesser General Public
     #  License along with this library. If not, see <http://www.gnu.org/licenses/>.
     #
    -#  Author:
    -#        Tristan Daniël Maat <tristan maat codethink co uk>
    +#  Authors:
    +#        Tristan Van Berkom <tristan vanberkom codethink co uk>
    +#        Jim MacArthur <jim macarthur codethink co uk>
    +
    +"""
    +Element - Globally visible enumerations
    +=======================================
    +
    +"""
    +
    +from enum import Enum
    +
    +
    +# _KeyStrength():
    +#
    +# Strength of cache key
     #
    -from .job import Job
    -from ..._platform import Platform
    +class _KeyStrength(Enum):
    +
    +    # Includes strong cache keys of all build dependencies and their
    +    # runtime dependencies.
    +    STRONG = 1
    +
    +    # Includes names of direct build dependencies but does not include
    +    # cache keys of dependencies.
    +    WEAK = 2
     
     
    -class CacheSizeJob(Job):
    -    def __init__(self, *args, complete_cb, **kwargs):
    -        super().__init__(*args, **kwargs)
    -        self._complete_cb = complete_cb
    -        self._cache = Platform._instance.artifactcache
    +class Scope(Enum):
    +    """Types of scope for a given element"""
     
    -    def child_process(self):
    -        return self._cache.calculate_cache_size()
    +    ALL = 1
    +    """All elements which the given element depends on, following
    +    all elements required for building. Including the element itself.
    +    """
     
    -    def parent_complete(self, success, result):
    -        self._cache._set_cache_size(result)
    -        if self._complete_cb:
    -            self._complete_cb(result)
    +    BUILD = 2
    +    """All elements required for building the element, including their
    +    respective run dependencies. Not including the given element itself.
    +    """
     
    -    def child_process_data(self):
    -        return {}
    +    RUN = 3
    +    """All elements required for running the element. Including the element
    +    itself.
    +    """

  • buildstream/sandbox/_mount.py
    @@ -32,8 +32,10 @@ from .._fuse import SafeHardlinks
     class Mount():
         def __init__(self, sandbox, mount_point, safe_hardlinks):
             scratch_directory = sandbox._get_scratch_directory()
    -        # Getting external_directory here is acceptable as we're part of the sandbox code.
    -        root_directory = sandbox.get_virtual_directory().external_directory
    +        # Getting _get_underlying_directory() here is acceptable as
    +        # we're part of the sandbox code. This will fail if our
    +        # directory is CAS-based.
    +        root_directory = sandbox.get_virtual_directory()._get_underlying_directory()
     
             self.mount_point = mount_point
             self.safe_hardlinks = safe_hardlinks

  • buildstream/sandbox/_sandboxbwrap.py
    @@ -58,7 +58,7 @@ class SandboxBwrap(Sandbox):
             stdout, stderr = self._get_output()
     
             # Allowable access to underlying storage as we're part of the sandbox
    -        root_directory = self.get_virtual_directory().external_directory
    +        root_directory = self.get_virtual_directory()._get_underlying_directory()
     
             # Fallback to the sandbox default settings for
             # the cwd and env.
    @@ -248,6 +248,7 @@ class SandboxBwrap(Sandbox):
                             # a bug, bwrap mounted a tempfs here and when it exits, that better be empty.
                             pass
     
    +        self._vdir._mark_changed()
             return exit_code
     
         def run_bwrap(self, argv, stdin, stdout, stderr, interactive):

  • buildstream/sandbox/_sandboxchroot.py
    @@ -106,6 +106,7 @@ class SandboxChroot(Sandbox):
                 status = self.chroot(rootfs, command, stdin, stdout,
                                      stderr, cwd, env, flags)
     
    +        self._vdir._mark_changed()
             return status
     
         # chroot()

  • buildstream/sandbox/sandbox.py
    @@ -31,6 +31,7 @@ See also: :ref:`sandboxing`.
     import os
     from .._exceptions import ImplError, BstError
     from ..storage._filebaseddirectory import FileBasedDirectory
    +from ..storage._casbaseddirectory import CasBasedDirectory
     
     
     class SandboxFlags():
    @@ -105,6 +106,7 @@ class Sandbox():
             self.__scratch = os.path.join(self.__directory, 'scratch')
             for directory_ in [self._root, self.__scratch]:
                 os.makedirs(directory_, exist_ok=True)
    +        self._vdir = None
     
         def get_directory(self):
             """Fetches the sandbox root directory
    @@ -133,8 +135,14 @@ class Sandbox():
                (str): The sandbox root directory
     
             """
    -        # For now, just create a new Directory every time we're asked
    -        return FileBasedDirectory(self._root)
    +        if not self._vdir:
    +            # BST_CAS_DIRECTORIES is a deliberately hidden environment variable which
    +            # can be used to switch on CAS-based directories for testing.
    +            if 'BST_CAS_DIRECTORIES' in os.environ:
    +                self._vdir = CasBasedDirectory(self.__context, ref=None)
    +            else:
    +                self._vdir = FileBasedDirectory(self._root)
    +        return self._vdir
     
         def set_environment(self, environment):
             """Sets the environment variables for the sandbox

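
    The backend is now chosen once per sandbox and cached in self._vdir.
    A condensed mirror of the selection logic above, with imports as used
    elsewhere on this branch:

        import os

        from buildstream.storage._filebaseddirectory import FileBasedDirectory
        from buildstream.storage._casbaseddirectory import CasBasedDirectory

        def select_backend(context, root):
            # BST_CAS_DIRECTORIES is the hidden testing switch read by
            # Sandbox.get_virtual_directory().
            if 'BST_CAS_DIRECTORIES' in os.environ:
                return CasBasedDirectory(context, ref=None)
            return FileBasedDirectory(root)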
  • buildstream/storage/_casbaseddirectory.py
    1
    +#
    
    2
    +#  Copyright (C) 2018 Bloomberg LP
    
    3
    +#
    
    4
    +#  This program is free software; you can redistribute it and/or
    
    5
    +#  modify it under the terms of the GNU Lesser General Public
    
    6
    +#  License as published by the Free Software Foundation; either
    
    7
    +#  version 2 of the License, or (at your option) any later version.
    
    8
    +#
    
    9
    +#  This library is distributed in the hope that it will be useful,
    
    10
    +#  but WITHOUT ANY WARRANTY; without even the implied warranty of
    
    11
     +#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
    
    12
    +#  Lesser General Public License for more details.
    
    13
    +#
    
    14
    +#  You should have received a copy of the GNU Lesser General Public
    
    15
    +#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
    
    16
    +#
    
    17
    +#  Authors:
    
    18
    +#        Jim MacArthur <jim macarthur codethink co uk>
    
    19
    +
    
    20
    +"""
    
    21
    +CasBasedDirectory
    
    22
     +=================
    
    23
    +
    
    24
    +Implementation of the Directory class which backs onto a Merkle-tree based content
    
    25
    +addressable storage system.
    
    26
    +
    
    27
    +See also: :ref:`sandboxing`.
    
    28
    +"""
    
    29
    +
    
    30
    +from collections import OrderedDict
    
    31
    +
    
    32
    +import os
    
    33
    +import tempfile
    
    34
    +import stat
    
    35
    +
    
    36
    +from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    37
    +from .._exceptions import BstError
    
    38
    +from .directory import Directory, VirtualDirectoryError
    
    39
    +from ._filebaseddirectory import FileBasedDirectory
    
    40
    +from ..utils import FileListResult, safe_copy, list_relative_paths
    
    41
    +from .._artifactcache.cascache import CASCache
    
    42
    +
    
    43
    +
    
    44
    +class IndexEntry():
    
    45
    +    """ Used in our index of names to objects to store the 'modified' flag
    
    46
    +    for directory entries. Because we need both the remote_execution_pb2 object
    
    47
    +    and our own Directory object for directory entries, we store both. For files
    
    48
    +    and symlinks, only pb_object is used. """
    
    49
    +    def __init__(self, pb_object, buildstream_object=None, modified=False):
    
    50
     +        self.pb_object = pb_object  # Short for 'protocol buffer object'
    
    51
    +        self.buildstream_object = buildstream_object
    
    52
    +        self.modified = modified
    
    53
    +
    
    54
    +
    
    55
     +# CasBasedDirectory intentionally doesn't call its superclass constructor,
    
    56
    +# which is meant to be unimplemented.
    
    57
    +# pylint: disable=super-init-not-called
    
    58
    +
    
    59
    +class CasBasedDirectory(Directory):
    
    60
    +    """
    
    61
    +    CAS-based directories can have two names; one is a 'common name' which has no effect
    
    62
     +    on functionality, and the other is the 'filename'. If a CasBasedDirectory has a parent, then 'filename'
    
    63
    +    must be the name of an entry in the parent directory's index which points to this object.
    
    64
    +    This is used to inform a parent directory that it must update the given hash for this
    
    65
    +    object when this object changes.
    
    66
    +
    
    67
    +    Typically a top-level CasBasedDirectory will have a common_name and no filename, and
    
    68
     +    subdirectories will have a filename and no common_name. common_name can be used to identify
    
    69
    +    CasBasedDirectory objects in a log file, since they have no unique position in a file
    
    70
    +    system.
    
    71
    +    """
    
    72
    +
    
    73
    +    # Two constants which define the separators used by the remote execution API.
    
    74
    +    _pb2_path_sep = "/"
    
    75
    +    _pb2_absolute_path_prefix = "/"
    
    76
    +
    
    77
    +    def __init__(self, context, ref=None, parent=None, common_name="untitled", filename=None):
    
    78
    +        self.context = context
    
    79
    +        self.cas_directory = os.path.join(context.artifactdir, 'cas')
    
    80
    +        self.filename = filename
    
    81
    +        self.common_name = common_name
    
    82
    +        self.pb2_directory = remote_execution_pb2.Directory()
    
    83
    +        self.cas_cache = CASCache(context)
    
    84
    +        if ref:
    
    85
    +            with open(self.cas_cache.objpath(ref), 'rb') as f:
    
    86
    +                self.pb2_directory.ParseFromString(f.read())
    
    87
    +
    
    88
    +        self.ref = ref
    
    89
    +        self.index = OrderedDict()
    
    90
    +        self.parent = parent
    
    91
    +        self._directory_read = False
    
    92
    +        self._populate_index()
    
    93
    +
    
    94
    +    def _populate_index(self):
    
    95
    +        if self._directory_read:
    
    96
    +            return
    
    97
    +        for entry in self.pb2_directory.directories:
    
    98
     +            buildstream_directory = CasBasedDirectory(self.context, ref=entry.digest,
    
    99
    +                                                     parent=self, filename=entry.name)
    
    100
     +            self.index[entry.name] = IndexEntry(entry, buildstream_object=buildstream_directory)
    
    101
    +        for entry in self.pb2_directory.files:
    
    102
    +            self.index[entry.name] = IndexEntry(entry)
    
    103
    +        for entry in self.pb2_directory.symlinks:
    
    104
    +            self.index[entry.name] = IndexEntry(entry)
    
    105
    +        self._directory_read = True
    
    106
    +
    
    107
    +    def _recalculate_recursing_up(self, caller=None):
    
    108
     +        """Recalculate the hash for this directory and store the results in
    
    109
    +        the cache.  If this directory has a parent, tell it to
    
    110
    +        recalculate (since changing this directory changes an entry in
    
    111
    +        the parent).
    
    112
    +
    
    113
    +        """
    
    114
    +        self.ref = self.cas_cache.add_object(buffer=self.pb2_directory.SerializeToString())
    
    115
    +        if caller:
    
    116
    +            old_dir = self._find_pb2_entry(caller.filename)
    
    117
    +            self.cas_cache.add_object(digest=old_dir.digest, buffer=caller.pb2_directory.SerializeToString())
    
    118
    +        if self.parent:
    
    119
    +            self.parent._recalculate_recursing_up(self)
    
    120
    +
    
    121
    +    def _recalculate_recursing_down(self, parent=None):
    
    122
     +        """Recalculate the hash for this directory and any
    
    123
    +        subdirectories. Hashes for subdirectories should be calculated
    
    124
    +        and stored after a significant operation (e.g. an
    
    125
    +        import_files() call) but not after adding each file, as that
    
    126
    +        is extremely wasteful.
    
    127
    +
    
    128
    +        """
    
    129
    +        for entry in self.pb2_directory.directories:
    
    130
    +            self.index[entry.name].buildstream_object._recalculate_recursing_down(entry)
    
    131
    +
    
    132
    +        if parent:
    
    133
    +            self.ref = self.cas_cache.add_object(digest=parent.digest, buffer=self.pb2_directory.SerializeToString())
    
    134
    +        else:
    
    135
    +            self.ref = self.cas_cache.add_object(buffer=self.pb2_directory.SerializeToString())
    
    136
     +        # We don't need to do anything more than that; files were already added earlier, and symlinks are
    
    137
    +        # part of the directory structure.
    
    138
    +
    
    139
    +    def _find_pb2_entry(self, name):
    
    140
    +        if name in self.index:
    
    141
    +            return self.index[name].pb_object
    
    142
    +        return None
    
    143
    +
    
    144
    +    def _find_self_in_parent(self):
    
    145
    +        assert self.parent is not None
    
    146
    +        parent = self.parent
    
    147
    +        for (k, v) in parent.index.items():
    
    148
    +            if v.buildstream_object == self:
    
    149
    +                return k
    
    150
    +        return None
    
    151
    +
    
    152
    +    def _add_directory(self, name):
    
    153
    +        if name in self.index:
    
    154
    +            newdir = self.index[name].buildstream_object
    
    155
    +            if not isinstance(newdir, CasBasedDirectory):
    
    156
    +                # TODO: This may not be an actual error; it may actually overwrite it
    
    157
    +                raise VirtualDirectoryError("New directory {} in {} would overwrite existing non-directory of type {}"
    
    158
    +                                            .format(name, str(self), type(newdir)))
    
    159
    +            dirnode = self._find_pb2_entry(name)
    
    160
    +        else:
    
    161
    +            newdir = CasBasedDirectory(self.context, parent=self, filename=name)
    
    162
    +            dirnode = self.pb2_directory.directories.add()
    
    163
    +
    
    164
    +        dirnode.name = name
    
    165
    +
    
    166
    +        # Calculate the hash for an empty directory
    
    167
    +        new_directory = remote_execution_pb2.Directory()
    
    168
    +        self.cas_cache.add_object(digest=dirnode.digest, buffer=new_directory.SerializeToString())
    
    169
    +        self.index[name] = IndexEntry(dirnode, buildstream_object=newdir)
    
    170
    +        return newdir
    
    171
    +
    
    172
    +    def _add_new_file(self, basename, filename):
    
    173
    +        filenode = self.pb2_directory.files.add()
    
    174
    +        filenode.name = filename
    
    175
    +        self.cas_cache.add_object(digest=filenode.digest, path=os.path.join(basename, filename))
    
    176
    +        is_executable = os.access(os.path.join(basename, filename), os.X_OK)
    
    177
    +        filenode.is_executable = is_executable
    
    178
    +        self.index[filename] = IndexEntry(filenode, modified=(filename in self.index))
    
    179
    +
    
    180
    +    def _add_new_link(self, basename, filename):
    
    181
    +        existing_link = self._find_pb2_entry(filename)
    
    182
    +        if existing_link:
    
    183
    +            symlinknode = existing_link
    
    184
    +        else:
    
    185
    +            symlinknode = self.pb2_directory.symlinks.add()
    
    186
    +        symlinknode.name = filename
    
    187
    +        # A symlink node has no digest.
    
    188
    +        symlinknode.target = os.readlink(os.path.join(basename, filename))
    
    189
    +        self.index[filename] = IndexEntry(symlinknode, modified=(existing_link is not None))
    
    190
    +
    
    191
    +    def delete_entry(self, name):
    
    192
    +        for collection in [self.pb2_directory.files, self.pb2_directory.symlinks, self.pb2_directory.directories]:
    
    193
     +            for entry in [e for e in collection if e.name == name]:
    
    194
     +                collection.remove(entry)
    
    195
    +        if name in self.index:
    
    196
    +            del self.index[name]
    
    197
    +
    
    198
    +    def descend(self, subdirectory_spec, create=False):
    
    199
    +        """Descend one or more levels of directory hierarchy and return a new
    
    200
    +        Directory object for that directory.
    
    201
    +
    
    202
    +        Arguments:
    
    203
    +        * subdirectory_spec (list of strings): A list of strings which are all directory
    
    204
    +          names.
    
    205
    +        * create (boolean): If this is true, the directories will be created if
    
    206
    +          they don't already exist.
    
    207
    +
    
    208
    +        Note: At the moment, creating a directory by descending does
    
    209
    +        not update this object in the CAS cache. However, performing
    
    210
    +        an import_files() into a subdirectory of any depth obtained by
    
    211
    +        descending from this object *will* cause this directory to be
    
    212
    +        updated and stored.
    
    213
    +
    
    214
    +        """
    
    215
    +
    
    216
    +        # It's very common to send a directory name instead of a list and this causes
    
    217
    +        # bizarre errors, so check for it here
    
    218
    +        if not isinstance(subdirectory_spec, list):
    
    219
    +            subdirectory_spec = [subdirectory_spec]
    
    220
    +
    
    221
    +        # Because of the way split works, it's common to get a list which begins with
    
    222
    +        # an empty string. Detect these and remove them.
    
    223
    +        while subdirectory_spec and subdirectory_spec[0] == "":
    
    224
    +            subdirectory_spec.pop(0)
    
    225
    +
    
    226
    +        # Descending into [] returns the same directory.
    
    227
    +        if not subdirectory_spec:
    
    228
    +            return self
    
    229
    +
    
    230
    +        if subdirectory_spec[0] in self.index:
    
    231
    +            entry = self.index[subdirectory_spec[0]].buildstream_object
    
    232
    +            if isinstance(entry, CasBasedDirectory):
    
    233
    +                return entry.descend(subdirectory_spec[1:], create)
    
    234
    +            else:
    
    235
    +                error = "Cannot descend into {}, which is a '{}' in the directory {}"
    
    236
    +                raise VirtualDirectoryError(error.format(subdirectory_spec[0],
    
    237
    +                                                         type(entry).__name__,
    
    238
    +                                                         self))
    
    239
    +        else:
    
    240
    +            if create:
    
    241
    +                newdir = self._add_directory(subdirectory_spec[0])
    
    242
    +                return newdir.descend(subdirectory_spec[1:], create)
    
    243
    +            else:
    
    244
    +                error = "No entry called '{}' found in {}. There are directories called {}."
    
    245
    +                directory_list = ",".join([entry.name for entry in self.pb2_directory.directories])
    
    246
    +                raise VirtualDirectoryError(error.format(subdirectory_spec[0], str(self),
    
    247
    +                                                         directory_list))
    
    248
    +        return None
    
    249
    +
    
    250
    +    def find_root(self):
    
    251
    +        """ Finds the root of this directory tree by following 'parent' until there is
    
    252
    +        no parent. """
    
    253
    +        if self.parent:
    
    254
    +            return self.parent.find_root()
    
    255
    +        else:
    
    256
    +            return self
    
    257
    +
    
    258
    +    def _resolve_symlink_or_directory(self, name):
    
    259
    +        """Used only by _import_files_from_directory. Tries to resolve a
    
    260
    +        directory name or symlink name. 'name' must be an entry in this
    
    261
    +        directory. It must be a single symlink or directory name, not a path
    
    262
     +        containing path separators. If it's an existing directory name, it
    
    263
    +        just returns the Directory object for that. If it's a symlink, it will
    
    264
    +        attempt to find the target of the symlink and return that as a
    
    265
    +        Directory object.
    
    266
    +
    
    267
    +        If a symlink target doesn't exist, it will attempt to create it
    
    268
    +        as a directory as long as it's within this directory tree.
    
    269
    +        """
    
    270
    +
    
    271
    +        if isinstance(self.index[name].buildstream_object, Directory):
    
    272
    +            return self.index[name].buildstream_object
    
    273
    +        # OK then, it's a symlink
    
    274
    +        symlink = self._find_pb2_entry(name)
    
    275
    +        absolute = symlink.target.startswith(CasBasedDirectory._pb2_absolute_path_prefix)
    
    276
    +        if absolute:
    
    277
    +            root = self.find_root()
    
    278
    +        else:
    
    279
    +            root = self
    
    280
    +        directory = root
    
    281
    +        components = symlink.target.split(CasBasedDirectory._pb2_path_sep)
    
    282
    +        for c in components:
    
    283
    +            if c == "..":
    
    284
    +                directory = directory.parent
    
    285
    +            else:
    
    286
    +                directory = directory.descend(c, create=True)
    
    287
    +        return directory
    
    288
    +
    
    289
    +    def _check_replacement(self, name, path_prefix, fileListResult):
    
    290
    +        """ Checks whether 'name' exists, and if so, whether we can overwrite it.
    
    291
    +        If we can, add the name to 'overwritten_files' and delete the existing entry.
    
    292
    +        Returns 'True' if the import should go ahead.
    
    293
     +        fileListResult.overwritten and fileListResult.ignored are updated depending
    
    294
    +        on the result. """
    
    295
    +        existing_entry = self._find_pb2_entry(name)
    
    296
    +        relative_pathname = os.path.join(path_prefix, name)
    
    297
    +        if existing_entry is None:
    
    298
    +            return True
    
    299
    +        if (isinstance(existing_entry,
    
    300
    +                       (remote_execution_pb2.FileNode, remote_execution_pb2.SymlinkNode))):
    
    301
    +            fileListResult.overwritten.append(relative_pathname)
    
    302
    +            return True
    
    303
    +        elif isinstance(existing_entry, remote_execution_pb2.DirectoryNode):
    
    304
    +            # If 'name' maps to a DirectoryNode, then there must be an entry in index
    
    305
    +            # pointing to another Directory.
    
    306
    +            if self.index[name].buildstream_object.is_empty():
    
    307
    +                self.delete_entry(name)
    
    308
    +                fileListResult.overwritten.append(relative_pathname)
    
    309
    +                return True
    
    310
    +            else:
    
    311
    +                # We can't overwrite a non-empty directory, so we just ignore it.
    
    312
    +                fileListResult.ignored.append(relative_pathname)
    
    313
    +                return False
    
    314
    +        assert False, ("Entry '{}' is not a recognised file/link/directory and not None; it is {}"
    
    315
    +                       .format(name, type(existing_entry)))
    
    316
    +        return False  # In case asserts are disabled
    
    317
    +
    
    318
    +    def _import_directory_recursively(self, directory_name, source_directory, remaining_path, path_prefix):
    
    319
    +        """ _import_directory_recursively and _import_files_from_directory will be called alternately
    
    320
    +        as a directory tree is descended. """
    
    321
    +        if directory_name in self.index:
    
    322
    +            subdir = self._resolve_symlink_or_directory(directory_name)
    
    323
    +        else:
    
    324
    +            subdir = self._add_directory(directory_name)
    
    325
    +        new_path_prefix = os.path.join(path_prefix, directory_name)
    
    326
    +        subdir_result = subdir._import_files_from_directory(os.path.join(source_directory, directory_name),
    
    327
    +                                                            [os.path.sep.join(remaining_path)],
    
    328
    +                                                            path_prefix=new_path_prefix)
    
    329
    +        return subdir_result
    
    330
    +
    
    331
    +    def _import_files_from_directory(self, source_directory, files, path_prefix=""):
    
    332
    +        """ Imports files from a traditional directory """
    
    333
    +        result = FileListResult()
    
    334
    +        for entry in sorted(files):
    
    335
    +            split_path = entry.split(os.path.sep)
    
    336
    +            # The actual file on the FS we're importing
    
    337
    +            import_file = os.path.join(source_directory, entry)
    
    338
    +            # The destination filename, relative to the root where the import started
    
    339
    +            relative_pathname = os.path.join(path_prefix, entry)
    
    340
    +            if len(split_path) > 1:
    
    341
    +                directory_name = split_path[0]
    
    342
    +                # Hand this off to the importer for that subdir. This will only do one file -
    
    343
    +                # a better way would be to hand off all the files in this subdir at once.
    
    344
    +                subdir_result = self._import_directory_recursively(directory_name, source_directory,
    
    345
    +                                                                   split_path[1:], path_prefix)
    
    346
    +                result.combine(subdir_result)
    
    347
    +            elif os.path.islink(import_file):
    
    348
    +                if self._check_replacement(entry, path_prefix, result):
    
    349
    +                    self._add_new_link(source_directory, entry)
    
    350
    +                    result.files_written.append(relative_pathname)
    
    351
    +            elif os.path.isdir(import_file):
    
    352
    +                # A plain directory which already exists isn't a problem; just ignore it.
    
    353
    +                if entry not in self.index:
    
    354
    +                    self._add_directory(entry)
    
    355
    +            elif os.path.isfile(import_file):
    
    356
    +                if self._check_replacement(entry, path_prefix, result):
    
    357
    +                    self._add_new_file(source_directory, entry)
    
    358
    +                    result.files_written.append(relative_pathname)
    
    359
    +        return result
    
    360
    +
    
    361
    +    def import_files(self, external_pathspec, *, files=None,
    
    362
    +                     report_written=True, update_utimes=False,
    
    363
    +                     can_link=False):
    
    364
     +        """Imports some or all files from external_pathspec into this directory.
    
    365
    +
    
    366
    +        Keyword arguments: external_pathspec: Either a string
    
    367
    +        containing a pathname, or a Directory object, to use as the
    
    368
    +        source.
    
    369
    +
    
    370
    +        files (list of strings): A list of all the files relative to
    
    371
    +        the external_pathspec to copy. If 'None' is supplied, all
    
    372
    +        files are copied.
    
    373
    +
    
    374
    +        report_written (bool): Return the full list of files
    
    375
    +        written. Defaults to true. If false, only a list of
    
    376
    +        overwritten files is returned.
    
    377
    +
    
    378
    +        update_utimes (bool): Currently ignored, since CAS does not store utimes.
    
    379
    +
    
    380
    +        can_link (bool): Ignored, since hard links do not have any meaning within CAS.
    
    381
    +        """
    
    382
    +        if isinstance(external_pathspec, FileBasedDirectory):
    
    383
    +            source_directory = external_pathspec._get_underlying_directory()
    
    384
    +        elif isinstance(external_pathspec, CasBasedDirectory):
    
    385
    +            # TODO: This transfers from one CAS to another via the
    
    386
    +            # filesystem, which is very inefficient. Alter this so it
    
    387
    +            # transfers refs across directly.
    
    388
    +            with tempfile.TemporaryDirectory(prefix="roundtrip") as tmpdir:
    
    389
    +                external_pathspec.export_files(tmpdir)
    
    390
    +                if files is None:
    
    391
    +                    files = list_relative_paths(tmpdir)
    
    392
    +                result = self._import_files_from_directory(tmpdir, files=files)
    
    393
    +            return result
    
    394
    +        else:
    
    395
    +            source_directory = external_pathspec
    
    396
    +
    
    397
    +        if files is None:
    
    398
    +            files = list_relative_paths(source_directory)
    
    399
    +
    
    400
    +        # TODO: No notice is taken of report_written, update_utimes or can_link.
    
    401
    +        # Current behaviour is to fully populate the report, which is inefficient,
    
    402
    +        # but still correct.
    
    403
    +        result = self._import_files_from_directory(source_directory, files=files)
    
    404
    +
    
    405
    +        # We need to recalculate and store the hashes of all directories both
    
    406
    +        # up and down the tree; we have changed our directory by importing files
    
    407
    +        # which changes our hash and all our parents' hashes of us. The trees
    
    408
    +        # lower down need to be stored in the CAS as they are not automatically
    
    409
    +        # added during construction.
    
    410
    +        self._recalculate_recursing_down()
    
    411
    +        if self.parent:
    
    412
    +            self.parent._recalculate_recursing_up(self)
    
    413
    +        return result
    
    414
    +
    
    415
    +    def set_deterministic_mtime(self):
    
    416
    +        """ Sets a static modification time for all regular files in this directory.
    
    417
    +        Since we don't store any modification time, we don't need to do anything.
    
    418
    +        """
    
    419
    +        pass
    
    420
    +
    
    421
    +    def set_deterministic_user(self):
    
    422
    +        """ Sets all files in this directory to the current user's euid/egid.
    
    423
    +        We also don't store user data, so this can be ignored.
    
    424
    +        """
    
    425
    +        pass
    
    426
    +
    
    427
    +    def export_files(self, to_directory, *, can_link=False, can_destroy=False):
    
    428
    +        """Copies everything from this into to_directory, which must be the name
    
    429
    +        of a traditional filesystem directory.
    
    430
    +
    
    431
    +        Arguments:
    
    432
    +
    
    433
    +        to_directory (string): a path outside this directory object
    
    434
    +        where the contents will be copied to.
    
    435
    +
    
    436
    +        can_link (bool): Whether we can create hard links in to_directory
    
    437
    +        instead of copying.
    
    438
    +
    
    439
    +        can_destroy (bool): Whether we can destroy elements in this
    
    440
    +        directory to export them (e.g. by renaming them as the
    
    441
    +        target).
    
    442
    +
    
    443
    +        """
    
    444
    +
    
    445
    +        if not os.path.exists(to_directory):
    
    446
    +            os.mkdir(to_directory)
    
    447
    +
    
    448
    +        for entry in self.pb2_directory.directories:
    
    449
    +            if entry.name not in self.index:
    
    450
    +                raise VirtualDirectoryError("CasDir {} contained {} in directories but not in the index"
    
    451
    +                                            .format(str(self), entry.name))
    
    452
    +            if not self._directory_read:
    
    453
    +                raise VirtualDirectoryError("CasDir {} has not been indexed yet".format(str(self)))
    
    454
    +            dest_dir = os.path.join(to_directory, entry.name)
    
    455
    +            if not os.path.exists(dest_dir):
    
    456
    +                os.mkdir(dest_dir)
    
    457
    +            target = self.descend([entry.name])
    
    458
    +            target.export_files(dest_dir)
    
    459
    +        for entry in self.pb2_directory.files:
    
    460
    +            # Extract the entry to a single file
    
    461
    +            dest_name = os.path.join(to_directory, entry.name)
    
    462
    +            src_name = self.cas_cache.objpath(entry.digest)
    
    463
    +            safe_copy(src_name, dest_name)
    
    464
    +            if entry.is_executable:
    
    465
    +                os.chmod(dest_name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR |
    
    466
    +                         stat.S_IRGRP | stat.S_IXGRP |
    
    467
    +                         stat.S_IROTH | stat.S_IXOTH)
    
    468
    +        for entry in self.pb2_directory.symlinks:
    
    469
    +            src_name = os.path.join(to_directory, entry.name)
    
    470
    +            target_name = entry.target
    
    471
    +            try:
    
    472
    +                os.symlink(target_name, src_name)
    
    473
    +            except FileExistsError as e:
    
    474
    +                raise BstError(("Cannot create a symlink named {} pointing to {}." +
    
    475
    +                                " The original error was: {}").
    
    476
    +                               format(src_name, entry.target, e))
    
    477
    +
    
    478
    +    def export_to_tar(self, tarfile, destination_dir, mtime=0):
    
    479
    +        raise NotImplementedError()
    
    480
    +
    
    481
    +    def mark_changed(self):
    
    482
    +        """ It should not be possible to externally modify a CAS-based
    
    483
    +        directory at the moment."""
    
    484
    +        raise NotImplementedError()
    
    485
    +
    
    486
    +    def is_empty(self):
    
    487
    +        """ Return true if this directory has no files, subdirectories or links in it.
    
    488
    +        """
    
    489
    +        return len(self.index) == 0
    
    490
    +
    
    491
    +    def _mark_directory_unmodified(self):
    
    492
    +        # Marks all entries in this directory and all child directories as unmodified.
    
    493
    +        for i in self.index.values():
    
    494
    +            i.modified = False
    
    495
    +            if isinstance(i.buildstream_object, CasBasedDirectory):
    
    496
    +                i.buildstream_object._mark_directory_unmodified()
    
    497
    +
    
    498
    +    def _mark_entry_unmodified(self, name):
    
    499
    +        # Marks an entry as unmodified. If the entry is a directory, it will
    
    500
    +        # recursively mark all its tree as unmodified.
    
    501
    +        self.index[name].modified = False
    
    502
    +        if self.index[name].buildstream_object:
    
    503
    +            self.index[name].buildstream_object._mark_directory_unmodified()
    
    504
    +
    
    505
    +    def mark_unmodified(self):
    
    506
    +        """ Marks all files in this directory (recursively) as unmodified.
    
    507
    +        If we have a parent, we mark our own entry as unmodified in that parent's
    
    508
    +        index.
    
    509
    +        """
    
    510
    +        if self.parent:
    
    511
    +            self.parent._mark_entry_unmodified(self._find_self_in_parent())
    
    512
    +        else:
    
    513
    +            self._mark_directory_unmodified()
    
    514
    +
    
    515
    +    def list_modified_paths(self):
    
    516
    +        """Provide a list of relative paths which have been modified since the
    
    517
    +        last call to mark_unmodified.
    
    518
    +
    
    519
    +        Return value: List(str) - list of modified paths
    
    520
    +        """
    
    521
    +
    
    522
    +        filelist = []
    
    523
    +        for (k, v) in self.index.items():
    
    524
    +            if isinstance(v.buildstream_object, CasBasedDirectory):
    
    525
    +                filelist.extend([k + os.path.sep + x for x in v.buildstream_object.list_modified_paths()])
    
    526
    +            elif isinstance(v.pb_object, remote_execution_pb2.FileNode) and v.modified:
    
    527
    +                filelist.append(k)
    
    528
    +        return filelist
    
    529
    +
    
    530
    +    def list_relative_paths(self):
    
    531
    +        """Provide a list of all relative paths.
    
    532
    +
    
    533
    +        NOTE: This list is not in the same order as utils.list_relative_paths.
    
    534
    +
    
    535
    +        Return value: List(str) - list of all paths
    
    536
    +        """
    
    537
    +
    
    538
    +        filelist = []
    
    539
    +        for (k, v) in self.index.items():
    
    540
    +            if isinstance(v.buildstream_object, CasBasedDirectory):
    
    541
    +                filelist.extend([k + os.path.sep + x for x in v.buildstream_object.list_relative_paths()])
    
    542
    +            elif isinstance(v.pb_object, remote_execution_pb2.FileNode):
    
    543
    +                filelist.append(k)
    
    544
    +        return filelist
    
    545
    +
    
    546
    +    def _get_identifier(self):
    
    547
    +        path = ""
    
    548
    +        if self.parent:
    
    549
    +            path = self.parent._get_identifier()
    
    550
    +        if self.filename:
    
    551
    +            path += "/" + self.filename
    
    552
    +        else:
    
    553
    +            path += "/" + self.common_name
    
    554
    +        return path
    
    555
    +
    
    556
    +    def __str__(self):
    
    557
    +        return "[CAS:{}]".format(self._get_identifier())
    
    558
    +
    
    559
    +    def _get_underlying_directory(self):
    
    560
    +        """ There is no underlying directory for a CAS-backed directory, so
    
    561
     +        raise an exception. """
    
    562
    +        raise VirtualDirectoryError("_get_underlying_directory was called on a CAS-backed directory," +
    
    563
    +                                    " which has no underlying directory.")

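
    Putting the new class together, the intended flow mirrors the storage
    tests added below; the paths here are hypothetical:

        from buildstream._context import Context
        from buildstream.storage._casbaseddirectory import CasBasedDirectory

        context = Context()
        context.artifactdir = '/tmp/example-artifacts'  # hypothetical location

        vdir = CasBasedDirectory(context)
        vdir.import_files('/path/to/original')   # hypothetical source tree
        vdir.mark_unmodified()
        vdir.import_files('/path/to/overlay')    # hypothetical overlay

        # Only paths overwritten by the second import are reported.
        print(vdir.list_modified_paths())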
  • buildstream/storage/_filebaseddirectory.py
    ... ... @@ -29,25 +29,12 @@ See also: :ref:`sandboxing`.
    29 29
     
    
    30 30
     import os
    
    31 31
     import time
    
    32
    -from .._exceptions import BstError, ErrorDomain
    
    33
    -from .directory import Directory
    
    32
    +from .directory import Directory, VirtualDirectoryError
    
    34 33
     from ..utils import link_files, copy_files, list_relative_paths, _get_link_mtime, _magic_timestamp
    
    35 34
     from ..utils import _set_deterministic_user, _set_deterministic_mtime
    
    36 35
     
    
    37
    -
    
    38
    -class VirtualDirectoryError(BstError):
    
    39
    -    """Raised by Directory functions when system calls fail.
    
    40
    -    This will be handled internally by the BuildStream core,
    
    41
    -    if you need to handle this error, then it should be reraised,
    
    42
    -    or either of the :class:`.ElementError` or :class:`.SourceError`
    
    43
    -    exceptions should be raised from this error.
    
    44
    -    """
    
    45
    -    def __init__(self, message, reason=None):
    
    46
    -        super().__init__(message, domain=ErrorDomain.VIRTUAL_FS, reason=reason)
    
    47
    -
    
    48
    -
    
    49 36
     # FileBasedDirectory intentionally doesn't call its superclass constructor,
    
    50
    -# which is mean to be unimplemented.
    
    37
    +# which is meant to be unimplemented.
    
    51 38
     # pylint: disable=super-init-not-called
    
    52 39
     
    
    53 40
     
    
    ... ... @@ -108,7 +95,8 @@ class FileBasedDirectory(Directory):
    108 95
                 if create:
    
    109 96
                     new_path = os.path.join(self.external_directory, subdirectory_spec[0])
    
    110 97
                     os.makedirs(new_path, exist_ok=True)
    
    111
    -                return FileBasedDirectory(new_path).descend(subdirectory_spec[1:], create)
    
    98
     +                self.index[subdirectory_spec[0]] = FileBasedDirectory(new_path)
    
    99
     +                return self.index[subdirectory_spec[0]].descend(subdirectory_spec[1:], create)
    
    112 100
                 else:
    
    113 101
                     error = "No entry called '{}' found in the directory rooted at {}"
    
    114 102
                     raise VirtualDirectoryError(error.format(subdirectory_spec[0], self.external_directory))
    
    ... ... @@ -134,8 +122,12 @@ class FileBasedDirectory(Directory):
    134 122
     
    
    135 123
                 for f in import_result.files_written:
    
    136 124
                     os.utime(os.path.join(self.external_directory, f), times=(cur_time, cur_time))
    
    125
    +        self._mark_changed()
    
    137 126
             return import_result
    
    138 127
     
    
    128
    +    def _mark_changed(self):
    
    129
    +        self._directory_read = False
    
    130
    +
    
    139 131
         def set_deterministic_mtime(self):
    
    140 132
             _set_deterministic_mtime(self.external_directory)
    
    141 133
     
    
    ... ... @@ -214,3 +206,8 @@ class FileBasedDirectory(Directory):
    214 206
             # which exposes the sandbox directory; we will have to assume for the time being
    
    215 207
             # that people will not abuse __str__.
    
    216 208
             return self.external_directory
    
    209
    +
    
    210
    +    def _get_underlying_directory(self) -> str:
    
    211
    +        """ Returns the underlying (real) file system directory this
    
    212
    +        object refers to. """
    
    213
    +        return self.external_directory
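
    With the descend() change above, subdirectories created on demand are
    now remembered in self.index rather than rebuilt on every call. For
    example (root path hypothetical):

        vdir = FileBasedDirectory('/tmp/example-root')

        # Creates example-root/a/b on disk; 'a' is cached in vdir.index
        # so later descents reuse the same FileBasedDirectory object.
        sub = vdir.descend(['a', 'b'], create=True)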

  • buildstream/storage/directory.py
    ... ... @@ -31,6 +31,19 @@ See also: :ref:`sandboxing`.
    31 31
     
    
    32 32
     """
    
    33 33
     
    
    34
    +from .._exceptions import BstError, ErrorDomain
    
    35
    +
    
    36
    +
    
    37
    +class VirtualDirectoryError(BstError):
    
    38
    +    """Raised by Directory functions when system calls fail.
    
    39
    +    This will be handled internally by the BuildStream core,
    
    40
    +    if you need to handle this error, then it should be reraised,
    
    41
    +    or either of the :class:`.ElementError` or :class:`.SourceError`
    
    42
    +    exceptions should be raised from this error.
    
    43
    +    """
    
    44
    +    def __init__(self, message, reason=None):
    
    45
    +        super().__init__(message, domain=ErrorDomain.VIRTUAL_FS, reason=reason)
    
    46
    +
    
    34 47
     
    
    35 48
     class Directory():
    
    36 49
         def __init__(self, external_directory=None):
    
    ... ... @@ -153,3 +166,13 @@ class Directory():
    153 166
     
    
    154 167
             """
    
    155 168
             raise NotImplementedError()
    
    169
    +
    
    170
    +    def _mark_changed(self):
    
    171
    +        """Internal function to mark this directory as having been changed
    
    172
    +        outside this API. This normally can only happen by calling the
    
    173
    +        Sandbox's `run` method. This does *not* mark everything as modified
    
    174
    +        (i.e. list_modified_paths will not necessarily return the same results
    
    175
     +        as list_relative_paths after calling this).
    
    176
    +
    
    177
    +        """
    
    178
    +        raise NotImplementedError()
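
    As the VirtualDirectoryError docstring above suggests, plugin-facing
    code should reraise it rather than swallow it. A sketch, with the
    directory object and message purely illustrative:

        from buildstream import ElementError
        from buildstream.storage.directory import VirtualDirectoryError

        try:
            vdir.descend(['usr', 'lib'])
        except VirtualDirectoryError as e:
            raise ElementError("Failed to stage files into usr/lib") from e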

  • tests/sandboxes/storage-test/original/bin/bash
    1
    +This is the original /bin/bash.

  • tests/sandboxes/storage-test/original/bin/hello
    1
    +This is the original /bin/hello.

  • tests/sandboxes/storage-test/overlay/bin/bash
    1
    +This is the replacement /bin/bash.

  • tests/sandboxes/storage-tests.py
    1
    +import os
    
    2
    +import pytest
    
    3
    +
    
    4
    +from buildstream._exceptions import ErrorDomain
    
    5
    +
    
    6
    +from buildstream._context import Context
    
    7
    +from buildstream.storage._casbaseddirectory import CasBasedDirectory
    
    8
    +from buildstream.storage._filebaseddirectory import FileBasedDirectory
    
    9
    +
    
    10
    +DATA_DIR = os.path.join(
    
    11
    +    os.path.dirname(os.path.realpath(__file__)),
    
    12
    +    "storage-test"
    
    13
    +)
    
    14
    +
    
    15
    +
    
    16
    +def setup_backend(backend_class, tmpdir):
    
    17
    +    if backend_class == FileBasedDirectory:
    
    18
    +        return backend_class(os.path.join(tmpdir, "vdir"))
    
    19
    +    else:
    
    20
    +        context = Context()
    
    21
    +        context.artifactdir = os.path.join(tmpdir, "cas")
    
    22
    +        return backend_class(context)
    
    23
    +
    
    24
    +
    
    25
    +@pytest.mark.parametrize("backend", [
    
    26
    +    FileBasedDirectory, CasBasedDirectory])
    
    27
    +@pytest.mark.datafiles(DATA_DIR)
    
    28
    +def test_import(tmpdir, datafiles, backend):
    
    29
    +    original = os.path.join(str(datafiles), "original")
    
    30
    +
    
    31
    +    c = setup_backend(backend, str(tmpdir))
    
    32
    +
    
    33
    +    c.import_files(original)
    
    34
    +
    
    35
    +    assert("bin/bash" in c.list_relative_paths())
    
    36
    +    assert("bin/hello" in c.list_relative_paths())
    
    37
    +
    
    38
    +
    
    39
    +@pytest.mark.parametrize("backend", [
    
    40
    +    FileBasedDirectory, CasBasedDirectory])
    
    41
    +@pytest.mark.datafiles(DATA_DIR)
    
    42
    +def test_modified_file_list(tmpdir, datafiles, backend):
    
    43
    +    original = os.path.join(str(datafiles), "original")
    
    44
    +    overlay = os.path.join(str(datafiles), "overlay")
    
    45
    +
    
    46
    +    c = setup_backend(backend, str(tmpdir))
    
    47
    +
    
    48
    +    c.import_files(original)
    
    49
    +
    
    50
    +    c.mark_unmodified()
    
    51
    +
    
    52
    +    c.import_files(overlay)
    
    53
    +
    
    54
    +    print("List of all paths in imported results: {}".format(c.list_relative_paths()))
    
    55
    +    assert("bin/bash" in c.list_relative_paths())
    
    56
    +    assert("bin/bash" in c.list_modified_paths())
    
    57
    +    assert("bin/hello" not in c.list_modified_paths())


