[Notes] [Git][BuildStream/buildstream][valentindavid/cache_server_fill_up-1.2] 8 commits: Fix outside-of-project check when project path is not canonical.



Valentin David pushed to branch valentindavid/cache_server_fill_up-1.2 at BuildStream / buildstream

Commits:

6 changed files:

  • buildstream/_artifactcache/cascache.py
  • buildstream/_artifactcache/casserver.py
  • buildstream/_yaml.py
  • tests/format/project.py
  • tests/frontend/push.py
  • tests/testutils/artifactshare.py

Changes:

  • buildstream/_artifactcache/cascache.py
    @@ -26,6 +26,7 @@ import stat
     import tempfile
     import uuid
     import errno
    +import contextlib
     from urllib.parse import urlparse
     
     import grpc
    @@ -393,13 +394,14 @@ class CASCache(ArtifactCache):
         #     digest (Digest): An optional Digest object to populate
         #     path (str): Path to file to add
         #     buffer (bytes): Byte buffer to add
    +    #     link_directly (bool): Whether file given by path can be linked
         #
         # Returns:
         #     (Digest): The digest of the added object
         #
         # Either `path` or `buffer` must be passed, but not both.
         #
    -    def add_object(self, *, digest=None, path=None, buffer=None):
    +    def add_object(self, *, digest=None, path=None, buffer=None, link_directly=False):
             # Exactly one of the two parameters has to be specified
             assert (path is None) != (buffer is None)
     
    @@ -409,28 +411,34 @@ class CASCache(ArtifactCache):
             try:
                 h = hashlib.sha256()
                 # Always write out new file to avoid corruption if input file is modified
    -            with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
    -                # Set mode bits to 0644
    -                os.chmod(out.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
    -
    -                if path:
    -                    with open(path, 'rb') as f:
    -                        for chunk in iter(lambda: f.read(4096), b""):
    -                            h.update(chunk)
    -                            out.write(chunk)
    +            with contextlib.ExitStack() as stack:
    +                if path is not None and link_directly:
    +                    tmp = stack.enter_context(open(path, 'rb'))
    +                    for chunk in iter(lambda: tmp.read(4096), b""):
    +                        h.update(chunk)
                     else:
    -                    h.update(buffer)
    -                    out.write(buffer)
    +                    tmp = stack.enter_context(tempfile.NamedTemporaryFile(dir=self.tmpdir))
    +                    # Set mode bits to 0644
    +                    os.chmod(tmp.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
     
    -                out.flush()
    +                    if path:
    +                        with open(path, 'rb') as f:
    +                            for chunk in iter(lambda: f.read(4096), b""):
    +                                h.update(chunk)
    +                                tmp.write(chunk)
    +                    else:
    +                        h.update(buffer)
    +                        tmp.write(buffer)
    +
    +                    tmp.flush()
     
                     digest.hash = h.hexdigest()
    -                digest.size_bytes = os.fstat(out.fileno()).st_size
    +                digest.size_bytes = os.fstat(tmp.fileno()).st_size
     
                     # Place file at final location
                     objpath = self.objpath(digest)
                     os.makedirs(os.path.dirname(objpath), exist_ok=True)
    -                os.link(out.name, objpath)
    +                os.link(tmp.name, objpath)
     
             except FileExistsError as e:
                 # We can ignore the failed link() if the object is already in the repo.
    @@ -565,7 +573,12 @@ class CASCache(ArtifactCache):
         #
         # Prune unreachable objects from the repo.
         #
    -    def prune(self):
    +    # Args:
    +    #    keep_after (int|None): timestamp after which unreachable objects
    +    #                           are kept. None if no unreachable object
    +    #                           should be kept.
    +    #
    +    def prune(self, keep_after=None):
             ref_heads = os.path.join(self.casdir, 'refs', 'heads')
     
             pruned = 0
    @@ -586,6 +599,10 @@ class CASCache(ArtifactCache):
                     objhash = os.path.basename(root) + filename
                     if objhash not in reachable:
                         obj_path = os.path.join(root, filename)
    +                    if keep_after:
    +                        st = os.stat(obj_path)
    +                        if st.st_mtime >= keep_after:
    +                            continue
                         pruned += os.stat(obj_path).st_size
                         os.unlink(obj_path)
     
    
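    The interesting move in add_object() is `contextlib.ExitStack`: the method
    now has to enter one of two different context managers depending on
    `link_directly`, and ExitStack lets a single `with` block own whichever
    handle was opened. A minimal, self-contained sketch of the pattern (the
    `digest_of` helper is illustrative, not BuildStream API; the staging branch
    is shown for `buffer` only):

        import contextlib
        import hashlib
        import tempfile

        def digest_of(path=None, buffer=None, link_directly=False):
            h = hashlib.sha256()
            with contextlib.ExitStack() as stack:
                if path is not None and link_directly:
                    # Hash the caller's file in place; it can later be
                    # hard-linked into the object store without a copy.
                    f = stack.enter_context(open(path, 'rb'))
                    for chunk in iter(lambda: f.read(4096), b""):
                        h.update(chunk)
                else:
                    # Otherwise stage through a temporary file, as before.
                    tmp = stack.enter_context(tempfile.NamedTemporaryFile())
                    h.update(buffer)
                    tmp.write(buffer)
                    tmp.flush()
                # ExitStack closes whichever handle was entered.
            return h.hexdigest()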

  • buildstream/_artifactcache/casserver.py
    @@ -24,6 +24,9 @@ import signal
     import sys
     import tempfile
     import uuid
    +import time
    +import errno
    +import ctypes
     
     import click
     import grpc
    @@ -41,6 +44,10 @@ from .cascache import CASCache
     # The default limit for gRPC messages is 4 MiB
     _MAX_BATCH_TOTAL_SIZE_BYTES = 4 * 1024 * 1024
     
    +# The minimum age in seconds for objects before they can be cleaned
    +# up.
    +_OBJECT_MIN_AGE = 6 * 60 * 60
    +
     
     # Trying to push an artifact that is too large
     class ArtifactTooLargeException(Exception):
    @@ -130,11 +137,43 @@ def server_main(repo, port, server_key, server_cert, client_certs, enable_push):
             server.stop(0)
     
     
    +class _FallocateCall:
    +
    +    FALLOC_FL_KEEP_SIZE = 1
    +    FALLOC_FL_PUNCH_HOLE = 2
    +    FALLOC_FL_NO_HIDE_STALE = 4
    +    FALLOC_FL_COLLAPSE_RANGE = 8
    +    FALLOC_FL_ZERO_RANGE = 16
    +    FALLOC_FL_INSERT_RANGE = 32
    +    FALLOC_FL_UNSHARE_RANGE = 64
    +
    +    def __init__(self):
    +        self.libc = ctypes.CDLL("libc.so.6", use_errno=True)
    +        try:
    +            self.fallocate64 = self.libc.fallocate64
    +        except AttributeError:
    +            self.fallocate = self.libc.fallocate
    +
    +    def __call__(self, fd, mode, offset, length):
    +        if hasattr(self, 'fallocate64'):
    +            print(fd, mode, offset, length)
    +            ret = self.fallocate64(ctypes.c_int(fd), ctypes.c_int(mode),
    +                                   ctypes.c_int64(offset), ctypes.c_int64(length))
    +        else:
    +            ret = self.fallocate(ctypes.c_int(fd), ctypes.c_int(mode),
    +                                 ctypes.c_int(offset), ctypes.c_int(length))
    +        if ret == -1:
    +            errno = ctypes.get_errno()
    +            raise OSError(errno, os.strerror(errno))
    +        return ret
    +
    +
     class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
         def __init__(self, cas, *, enable_push):
             super().__init__()
             self.cas = cas
             self.enable_push = enable_push
    +        self.fallocate = _FallocateCall()
     
         def Read(self, request, context):
             resource_name = request.resource_name
    @@ -192,25 +231,44 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
                             context.set_code(grpc.StatusCode.NOT_FOUND)
                             return response
     
    -                    try:
    -                        _clean_up_cache(self.cas, client_digest.size_bytes)
    -                    except ArtifactTooLargeException as e:
    -                        context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
    -                        context.set_details(str(e))
    -                        return response
    +                    while True:
    +                        if client_digest.size_bytes == 0:
    +                            break
    +                        try:
    +                            _clean_up_cache(self.cas, client_digest.size_bytes)
    +                        except ArtifactTooLargeException as e:
    +                            context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
    +                            context.set_details(str(e))
    +                            return response
    +
    +                        try:
    +                            self.fallocate(out.fileno(), 0, 0, client_digest.size_bytes)
    +                            break
    +                        except OSError as e:
    +                            # Multiple upload can happen in the same time
    +                            if e.errno != errno.ENOSPC:
    +                                raise
    +
                     elif request.resource_name:
                         # If it is set on subsequent calls, it **must** match the value of the first request.
                         if request.resource_name != resource_name:
                             context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                             return response
    +
    +                if (offset + len(request.data)) > client_digest.size_bytes:
    +                    context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
    +                    return response
    +
                     out.write(request.data)
    +
                     offset += len(request.data)
    +
                     if request.finish_write:
                         if client_digest.size_bytes != offset:
                             context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                             return response
                         out.flush()
    -                    digest = self.cas.add_object(path=out.name)
    +                    digest = self.cas.add_object(path=out.name, link_directly=True)
                         if digest.hash != client_digest.hash:
                             context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                             return response
    @@ -230,10 +288,17 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
         def FindMissingBlobs(self, request, context):
             response = remote_execution_pb2.FindMissingBlobsResponse()
             for digest in request.blob_digests:
    -            if not _has_object(self.cas, digest):
    -                d = response.missing_blob_digests.add()
    -                d.hash = digest.hash
    -                d.size_bytes = digest.size_bytes
    +            objpath = self.cas.objpath(digest)
    +            try:
    +                os.utime(objpath)
    +            except OSError as e:
    +                if e.errno != errno.ENOENT:
    +                    raise
    +                else:
    +                    d = response.missing_blob_digests.add()
    +                    d.hash = digest.hash
    +                    d.size_bytes = digest.size_bytes
    +
             return response
     
         def BatchReadBlobs(self, request, context):
    @@ -362,11 +427,6 @@ def _digest_from_upload_resource_name(resource_name):
             return None
     
     
    -def _has_object(cas, digest):
    -    objpath = cas.objpath(digest)
    -    return os.path.exists(objpath)
    -
    -
     # _clean_up_cache()
     #
     # Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
    @@ -399,7 +459,14 @@ def _clean_up_cache(cas, object_size):
         # obtain a list of LRP artifacts
         LRP_artifacts = cas.list_artifacts()
     
    +    keep_after = time.time() - _OBJECT_MIN_AGE
    +
         removed_size = 0  # in bytes
    +    if object_size - removed_size > free_disk_space:
    +        # First we try to see if some unreferenced objects became old
    +        # enough to be removed.
    +        removed_size += cas.prune(keep_after=keep_after)
    +
         while object_size - removed_size > free_disk_space:
             try:
                 to_remove = LRP_artifacts.pop(0)  # The first element in the list is the LRP artifact
    @@ -411,7 +478,8 @@ def _clean_up_cache(cas, object_size):
                                                 "the filesystem which mounts the remote "
                                                 "cache".format(object_size))
     
    -        removed_size += cas.remove(to_remove, defer_prune=False)
    +        cas.remove(to_remove, defer_prune=True)
    +        removed_size += cas.prune(keep_after=keep_after)
     
         if removed_size > 0:
             logging.info("Successfully removed {} bytes from the cache".format(removed_size))

  • buildstream/_yaml.py
    @@ -467,7 +467,7 @@ def node_get_project_path(node, key, project_dir, *,
                             "{}: Specified path '{}' does not exist"
                             .format(provenance, path_str))
     
    -    is_inside = project_dir_path in full_resolved_path.parents or (
    +    is_inside = project_dir_path.resolve() in full_resolved_path.parents or (
             full_resolved_path == project_dir_path)
     
         if path.is_absolute() or not is_inside:
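
    The one-line fix matters because `full_resolved_path` has already been
    resolved: when the project directory is reached through a symlink, the
    unresolved `project_dir_path` can never appear among its `parents`, and
    every element path was rejected as outside the project. A quick standalone
    illustration (the temporary directory layout is made up for the example):

        import os
        import tempfile
        from pathlib import Path

        base = Path(tempfile.mkdtemp())
        real = base / 'real_project'
        (real / 'elements').mkdir(parents=True)
        link = base / 'linked_project'
        os.symlink(real, link)

        child = (link / 'elements').resolve()    # resolves through the symlink
        print(link in child.parents)             # False: the old, buggy check
        print(link.resolve() in child.parents)   # True: the fixed check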
    

  • tests/format/project.py
    @@ -181,3 +181,15 @@ def test_project_refs_options(cli, datafiles):
     
         # Assert that the cache keys are different
         assert result1.output != result2.output
    +
    +
    +@pytest.mark.datafiles(os.path.join(DATA_DIR, 'element-path'))
    +def test_element_path_project_path_contains_symlinks(cli, datafiles, tmpdir):
    +    real_project = str(datafiles)
    +    linked_project = os.path.join(str(tmpdir), 'linked')
    +    os.symlink(real_project, linked_project)
    +    os.makedirs(os.path.join(real_project, 'elements'), exist_ok=True)
    +    with open(os.path.join(real_project, 'elements', 'element.bst'), 'w') as f:
    +        f.write("kind: manual\n")
    +    result = cli.run(project=linked_project, args=['show', 'element.bst'])
    +    result.assert_success()

  • tests/frontend/push.py
    @@ -231,6 +231,8 @@ def test_artifact_expires(cli, datafiles, tmpdir):
             assert cli.get_element_state(project, 'element2.bst') == 'cached'
             assert_shared(cli, share, project, 'element2.bst')
     
    +        share.make_all_objects_older()
    +
             # Create and build another element of 5 MB (This will exceed the free disk space available)
             create_element_size('element3.bst', project, element_path, [], int(5e6))
             result = cli.run(project=project, args=['build', 'element3.bst'])
    @@ -327,6 +329,8 @@ def test_recently_pulled_artifact_does_not_expire(cli, datafiles, tmpdir):
             # Ensure element1 is cached locally
             assert cli.get_element_state(project, 'element1.bst') == 'cached'
     
    +        share.make_all_objects_older()
    +
             # Create and build the element3 (of 5 MB)
             create_element_size('element3.bst', project, element_path, [], int(5e6))
             result = cli.run(project=project, args=['build', 'element3.bst'])
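
    Both tests insert share.make_all_objects_older() before triggering expiry
    because the server now refuses to prune objects younger than
    _OBJECT_MIN_AGE (six hours); without back-dating the mtimes of the freshly
    pushed objects, nothing in the test share would be eligible for removal
    and the expiry assertions could never pass.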
    

  • tests/testutils/artifactshare.py
    @@ -122,6 +122,15 @@ class ArtifactShare():
             except ArtifactError:
                 return False
     
    +    def make_all_objects_older(self):
    +        for root, dirs, files in os.walk(os.path.join(self.cas.casdir, 'objects')):
    +            for name in files:
    +                fullname = os.path.join(root, name)
    +                st = os.stat(fullname)
    +                mtime = st.st_mtime - 6 * 60 * 60
    +                atime = st.st_atime - 6 * 60 * 60
    +                os.utime(fullname, times=(atime, mtime))
    +
         # close():
         #
         # Remove the artifact share.
    


