[Notes] [Git][BuildStream/buildstream][valentindavid/cache_server_fill_up] 3 commits: Use fallocate instead of checking for disk space for every write

Valentin David pushed to branch valentindavid/cache_server_fill_up at BuildStream / buildstream

Commits:

2 changed files:

Changes:

  • buildstream/_artifactcache/cascache.py

    @@ -446,14 +446,14 @@ class CASCache(ArtifactCache):
         #     digest (Digest): An optional Digest object to populate
         #     path (str): Path to file to add
         #     buffer (bytes): Byte buffer to add
    -    #     link_file (bool): Whether file given by path can be linked
    +    #     link_directly (bool): Whether file given by path can be linked
         #
         # Returns:
         #     (Digest): The digest of the added object
         #
         # Either `path` or `buffer` must be passed, but not both.
         #
    -    def add_object(self, *, digest=None, path=None, buffer=None, link_file=False):
    +    def add_object(self, *, digest=None, path=None, buffer=None, link_directly=False):
             # Exactly one of the two parameters has to be specified
             assert (path is None) != (buffer is None)

    @@ -464,7 +464,7 @@ class CASCache(ArtifactCache):
                 h = hashlib.sha256()
                 # Always write out new file to avoid corruption if input file is modified
                 with contextlib.ExitStack() as stack:
    -                if link_file:
    +                if path is not None and link_directly:
                         tmp = stack.enter_context(open(path, 'rb'))
                         for chunk in iter(lambda: tmp.read(4096), b""):
                             h.update(chunk)

    @@ -626,8 +626,9 @@ class CASCache(ArtifactCache):
         # Prune unreachable objects from the repo.
         #
         # Args:
    -    #    keep_after (int): timestamp after which unreachable objects are kept.
    -    #                      None if no unreachable object should be kept.
    +    #    keep_after (int|None): timestamp after which unreachable objects
    +    #                           are kept. None if no unreachable object
    +    #                           should be kept.
         #
         def prune(self, keep_after=None):
             ref_heads = os.path.join(self.casdir, 'refs', 'heads')

    @@ -895,7 +896,7 @@ class CASCache(ArtifactCache):
             with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
                 self._fetch_blob(remote, digest, f)

    -            added_digest = self.add_object(path=f.name)
    +            added_digest = self.add_object(path=f.name, link_directly=True)
                 assert added_digest.hash == digest.hash

             return objpath

    @@ -906,7 +907,7 @@ class CASCache(ArtifactCache):
                     f.write(data)
                     f.flush()

    -                added_digest = self.add_object(path=f.name)
    +                added_digest = self.add_object(path=f.name, link_directly=True)
                     assert added_digest.hash == digest.hash

         # Helper function for _fetch_directory().

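For context, the reason `link_directly` exists: when the caller has already written the data to a temporary file on the same filesystem as the object store, `add_object()` can link that file into place instead of copying its bytes a second time, per the docstring above ("Whether file given by path can be linked"). A minimal sketch of the idea (hypothetical helper, not BuildStream's actual implementation; assumes a POSIX filesystem and an `objdir/<aa>/<rest-of-digest>` layout):

    import hashlib
    import os

    def add_object_sketch(objdir, tmp_path):
        # Hash the already-written temporary file in 4096-byte chunks,
        # mirroring the loop in the diff above.
        h = hashlib.sha256()
        with open(tmp_path, 'rb') as f:
            for chunk in iter(lambda: f.read(4096), b""):
                h.update(chunk)
        digest = h.hexdigest()
        objpath = os.path.join(objdir, digest[:2], digest[2:])
        os.makedirs(os.path.dirname(objpath), exist_ok=True)
        try:
            # Link rather than copy; the store is content-addressed, so a
            # pre-existing object with the same digest is already correct.
            os.link(tmp_path, objpath)
        except FileExistsError:
            pass
        return digest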
  • buildstream/_artifactcache/casserver.py

    @@ -26,6 +26,7 @@ import tempfile
     import uuid
     import time
     import errno
    +import ctypes

     import click
     import grpc

    @@ -137,11 +138,42 @@ def server_main(repo, port, server_key, server_cert, client_certs, enable_push):
             server.stop(0)


    +class _FallocateCall:
    +
    +    FALLOC_FL_KEEP_SIZE = 1
    +    FALLOC_FL_PUNCH_HOLE = 2
    +    FALLOC_FL_NO_HIDE_STALE = 4
    +    FALLOC_FL_COLLAPSE_RANGE = 8
    +    FALLOC_FL_ZERO_RANGE = 16
    +    FALLOC_FL_INSERT_RANGE = 32
    +    FALLOC_FL_UNSHARE_RANGE = 64
    +
    +    def __init__(self):
    +        self.libc = ctypes.CDLL("libc.so.6", use_errno=True)
    +        try:
    +            self.fallocate64 = self.libc.fallocate64
    +        except AttributeError:
    +            self.fallocate = self.libc.fallocate
    +
    +    def __call__(self, fd, mode, offset, length):
    +        if hasattr(self, 'fallocate64'):
    +            ret = self.fallocate64(ctypes.c_int(fd), ctypes.c_int(mode),
    +                                   ctypes.c_int64(offset), ctypes.c_int64(length))
    +        else:
    +            ret = self.fallocate(ctypes.c_int(fd), ctypes.c_int(mode),
    +                                 ctypes.c_int(offset), ctypes.c_int(length))
    +        if ret == -1:
    +            err = ctypes.get_errno()
    +            raise OSError(err, os.strerror(err))
    +        return ret
    +
    +
     class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
         def __init__(self, cas, *, enable_push):
             super().__init__()
             self.cas = cas
             self.enable_push = enable_push
    +        self.fallocate = _FallocateCall()

         def Read(self, request, context):
             resource_name = request.resource_name

    @@ -198,36 +230,45 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
                         if client_digest is None:
                             context.set_code(grpc.StatusCode.NOT_FOUND)
                             return response
    +
    +                    while True:
    +                        if client_digest.size_bytes == 0:
    +                            break
    +                        try:
    +                            _clean_up_cache(self.cas, client_digest.size_bytes)
    +                        except ArtifactTooLargeException as e:
    +                            context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
    +                            context.set_details(str(e))
    +                            return response
    +
    +                        try:
    +                            self.fallocate(out.fileno(), 0, 0, client_digest.size_bytes)
    +                            break
    +                        except OSError as e:
    +                            # Multiple uploads can happen at the same time
    +                            if e.errno != errno.ENOSPC:
    +                                raise
    +
                     elif request.resource_name:
                         # If it is set on subsequent calls, it **must** match the value of the first request.
                         if request.resource_name != resource_name:
                             context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                             return response

    -                while True:
    -                    try:
    -                        _clean_up_cache(self.cas, client_digest.size_bytes - offset)
    -                    except ArtifactTooLargeException as e:
    -                        context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
    -                        context.set_details(str(e))
    -                        return response
    -                    try:
    -                        out.write(request.data)
    -                        break
    -                    except OSError as e:
    -                        # Multiple upload can happen in the same time
    -                        if e.errno == errno.ENOSPC:
    -                            continue
    -                        else:
    -                            raise
    +                if (offset + len(request.data)) > client_digest.size_bytes:
    +                    context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
    +                    return response
    +
    +                out.write(request.data)

                     offset += len(request.data)
    +
                     if request.finish_write:
                         if client_digest.size_bytes != offset:
                             context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                             return response
                         out.flush()
    -                    digest = self.cas.add_object(path=out.name, link_file=True)
    +                    digest = self.cas.add_object(path=out.name, link_directly=True)
                         if digest.hash != client_digest.hash:
                             context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                             return response

    @@ -248,12 +289,16 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
             response = remote_execution_pb2.FindMissingBlobsResponse()
             for digest in request.blob_digests:
                 objpath = self.cas.objpath(digest)
    -            if not os.path.exists(objpath):
    -                d = response.missing_blob_digests.add()
    -                d.hash = digest.hash
    -                d.size_bytes = digest.size_bytes
    -            else:
    +            try:
                     os.utime(objpath)
    +            except OSError as e:
    +                if e.errno != errno.ENOENT:
    +                    raise
    +                else:
    +                    d = response.missing_blob_digests.add()
    +                    d.hash = digest.hash
    +                    d.size_bytes = digest.size_bytes
    +
             return response

         def BatchReadBlobs(self, request, context):

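The point of `_FallocateCall` is to reserve the artifact's full size in one system call before any data is streamed, so ENOSPC surfaces once, up front, instead of the old pattern of checking free space around every write. A minimal usage sketch of the same technique (Linux-only; assumes glibc's libc.so.6 with the fallocate64 symbol available):

    import ctypes
    import os
    import tempfile

    libc = ctypes.CDLL("libc.so.6", use_errno=True)

    def reserve(fd, length):
        # mode=0 allocates the byte range and extends the file size;
        # on failure the kernel reports ENOSPC before anything is streamed.
        ret = libc.fallocate64(ctypes.c_int(fd), ctypes.c_int(0),
                               ctypes.c_int64(0), ctypes.c_int64(length))
        if ret == -1:
            err = ctypes.get_errno()
            raise OSError(err, os.strerror(err))

    with tempfile.NamedTemporaryFile() as out:
        reserve(out.fileno(), 1024 * 1024)  # fail fast if 1 MiB cannot be reserved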

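The FindMissingBlobs change folds the existence check into the os.utime() call itself: touching the object refreshes its timestamp (so recently requested blobs survive pruning), and a missing object shows up as ENOENT, avoiding a separate os.path.exists() check that could race with concurrent cleanup. The idiom in isolation (hypothetical helper name):

    import errno
    import os

    def touch_or_missing(objpath):
        try:
            os.utime(objpath)   # bump the timestamp so pruning keeps hot objects
            return False        # object is present
        except OSError as e:
            if e.errno != errno.ENOENT:
                raise
            return True         # object is missing; report it to the client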
