[Notes] [Git][BuildStream/buildstream][valentindavid/cache_server_fill_up-1.2] 3 commits: Use fallocate instead of checking for disk space for every write



Title: GitLab

Valentin David pushed to branch valentindavid/cache_server_fill_up-1.2 at BuildStream / buildstream

Commits:

2 changed files:

Changes:

  • buildstream/_artifactcache/cascache.py
    ... ... @@ -394,14 +394,14 @@ class CASCache(ArtifactCache):
    394 394
         #     digest (Digest): An optional Digest object to populate
    
    395 395
         #     path (str): Path to file to add
    
    396 396
         #     buffer (bytes): Byte buffer to add
    
    397
    -    #     link_file (bool): Whether file given by path can be linked
    
    397
    +    #     link_directly (bool): Whether file given by path can be linked
    
    398 398
         #
    
    399 399
         # Returns:
    
    400 400
         #     (Digest): The digest of the added object
    
    401 401
         #
    
    402 402
         # Either `path` or `buffer` must be passed, but not both.
    
    403 403
         #
    
    404
    -    def add_object(self, *, digest=None, path=None, buffer=None, link_file=False):
    
    404
    +    def add_object(self, *, digest=None, path=None, buffer=None, link_directly=False):
    
    405 405
             # Exactly one of the two parameters has to be specified
    
    406 406
             assert (path is None) != (buffer is None)
    
    407 407
     
    
    ... ... @@ -412,7 +412,7 @@ class CASCache(ArtifactCache):
    412 412
                 h = hashlib.sha256()
    
    413 413
                 # Always write out new file to avoid corruption if input file is modified
    
    414 414
                 with contextlib.ExitStack() as stack:
    
    415
    -                if link_file:
    
    415
    +                if path is not None and link_directly:
    
    416 416
                         tmp = stack.enter_context(open(path, 'rb'))
    
    417 417
                         for chunk in iter(lambda: tmp.read(4096), b""):
    
    418 418
                             h.update(chunk)
    
    ... ... @@ -574,8 +574,9 @@ class CASCache(ArtifactCache):
    574 574
         # Prune unreachable objects from the repo.
    
    575 575
         #
    
    576 576
         # Args:
    
    577
    -    #    keep_after (int): timestamp after which unreachable objects are kept.
    
    578
    -    #                      None if no unreachable object should be kept.
    
    577
    +    #    keep_after (int|None): timestamp after which unreachable objects
    
    578
    +    #                           are kept. None if no unreachable object
    
    579
    +    #                           should be kept.
    
    579 580
         #
    
    580 581
         def prune(self, keep_after=None):
    
    581 582
             ref_heads = os.path.join(self.casdir, 'refs', 'heads')
    

  • buildstream/_artifactcache/casserver.py
    ... ... @@ -26,6 +26,7 @@ import tempfile
    26 26
     import uuid
    
    27 27
     import time
    
    28 28
     import errno
    
    29
    +import ctypes
    
    29 30
     
    
    30 31
     import click
    
    31 32
     import grpc
    
    ... ... @@ -136,11 +137,43 @@ def server_main(repo, port, server_key, server_cert, client_certs, enable_push):
    136 137
             server.stop(0)
    
    137 138
     
    
    138 139
     
    
    140
    +class _FallocateCall:
    
    141
    +
    
    142
    +    FALLOC_FL_KEEP_SIZE = 1
    
    143
    +    FALLOC_FL_PUNCH_HOLE = 2
    
    144
    +    FALLOC_FL_NO_HIDE_STALE = 4
    
    145
    +    FALLOC_FL_COLLAPSE_RANGE = 8
    
    146
    +    FALLOC_FL_ZERO_RANGE = 16
    
    147
    +    FALLOC_FL_INSERT_RANGE = 32
    
    148
    +    FALLOC_FL_UNSHARE_RANGE = 64
    
    149
    +
    
    150
    +    def __init__(self):
    
    151
    +        self.libc = ctypes.CDLL("libc.so.6", use_errno=True)
    
    152
    +        try:
    
    153
    +            self.fallocate64 = self.libc.fallocate64
    
    154
    +        except AttributeError:
    
    155
    +            self.fallocate = self.libc.fallocate
    
    156
    +
    
    157
    +    def __call__(self, fd, mode, offset, length):
    
    158
    +        if hasattr(self, 'fallocate64'):
    
    159
    +            print(fd, mode, offset, length)
    
    160
    +            ret = self.fallocate64(ctypes.c_int(fd), ctypes.c_int(mode),
    
    161
    +                                   ctypes.c_int64(offset), ctypes.c_int64(length))
    
    162
    +        else:
    
    163
    +            ret = self.fallocate(ctypes.c_int(fd), ctypes.c_int(mode),
    
    164
    +                                 ctypes.c_int(offset), ctypes.c_int(length))
    
    165
    +        if ret == -1:
    
    166
    +            errno = ctypes.get_errno()
    
    167
    +            raise OSError(errno, os.strerror(errno))
    
    168
    +        return ret
    
    169
    +
    
    170
    +
    
    139 171
     class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
    
    140 172
         def __init__(self, cas, *, enable_push):
    
    141 173
             super().__init__()
    
    142 174
             self.cas = cas
    
    143 175
             self.enable_push = enable_push
    
    176
    +        self.fallocate = _FallocateCall()
    
    144 177
     
    
    145 178
         def Read(self, request, context):
    
    146 179
             resource_name = request.resource_name
    
    ... ... @@ -197,36 +230,45 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
    197 230
                         if client_digest is None:
    
    198 231
                             context.set_code(grpc.StatusCode.NOT_FOUND)
    
    199 232
                             return response
    
    233
    +
    
    234
    +                    while True:
    
    235
    +                        if client_digest.size_bytes == 0:
    
    236
    +                            break
    
    237
    +                        try:
    
    238
    +                            _clean_up_cache(self.cas, client_digest.size_bytes)
    
    239
    +                        except ArtifactTooLargeException as e:
    
    240
    +                            context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
    
    241
    +                            context.set_details(str(e))
    
    242
    +                            return response
    
    243
    +
    
    244
    +                        try:
    
    245
    +                            self.fallocate(out.fileno(), 0, 0, client_digest.size_bytes)
    
    246
    +                            break
    
    247
    +                        except OSError as e:
    
    248
    +                            # Multiple uploads can happen at the same time
    
    249
    +                            if e.errno != errno.ENOSPC:
    
    250
    +                                raise
    
    251
    +
    
    200 252
                     elif request.resource_name:
    
    201 253
                         # If it is set on subsequent calls, it **must** match the value of the first request.
    
    202 254
                         if request.resource_name != resource_name:
    
    203 255
                             context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
    
    204 256
                             return response
    
    205 257
     
    
    206
    -                while True:
    
    207
    -                    try:
    
    208
    -                        _clean_up_cache(self.cas, client_digest.size_bytes - offset)
    
    209
    -                    except ArtifactTooLargeException as e:
    
    210
    -                        context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
    
    211
    -                        context.set_details(str(e))
    
    212
    -                        return response
    
    213
    -                    try:
    
    214
    -                        out.write(request.data)
    
    215
    -                        break
    
    216
    -                    except OSError as e:
    
    217
    -                        # Multiple upload can happen in the same time
    
    218
    -                        if e.errno == errno.ENOSPC:
    
    219
    -                            continue
    
    220
    -                        else:
    
    221
    -                            raise
    
    258
    +                if (offset + len(request.data)) > client_digest.size_bytes:
    
    259
    +                    context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
    
    260
    +                    return response
    
    261
    +
    
    262
    +                out.write(request.data)
    
    222 263
     
    
    223 264
                     offset += len(request.data)
    
    265
    +
    
    224 266
                     if request.finish_write:
    
    225 267
                         if client_digest.size_bytes != offset:
    
    226 268
                             context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
    
    227 269
                             return response
    
    228 270
                         out.flush()
    
    229
    -                    digest = self.cas.add_object(path=out.name, link_file=True)
    
    271
    +                    digest = self.cas.add_object(path=out.name, link_directly=True)
    
    230 272
                         if digest.hash != client_digest.hash:
    
    231 273
                             context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
    
    232 274
                             return response
    
    ... ... @@ -247,12 +289,16 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres
    247 289
             response = remote_execution_pb2.FindMissingBlobsResponse()
    
    248 290
             for digest in request.blob_digests:
    
    249 291
                 objpath = self.cas.objpath(digest)
    
    250
    -            if not os.path.exists(objpath):
    
    251
    -                d = response.missing_blob_digests.add()
    
    252
    -                d.hash = digest.hash
    
    253
    -                d.size_bytes = digest.size_bytes
    
    254
    -            else:
    
    292
    +            try:
    
    255 293
                     os.utime(objpath)
    
    294
    +            except OSError as e:
    
    295
    +                if e.errno != errno.ENOENT:
    
    296
    +                    raise
    
    297
    +                else:
    
    298
    +                    d = response.missing_blob_digests.add()
    
    299
    +                    d.hash = digest.hash
    
    300
    +                    d.size_bytes = digest.size_bytes
    
    301
    +
    
    256 302
             return response
    
    257 303
     
    
    258 304
         def BatchReadBlobs(self, request, context):
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]