[Notes] [Git][BuildStream/buildstream][master] 4 commits: _artifactcache/casserver.py: Fix resource_name format for blobs




Jürg Billeter pushed to branch master at BuildStream / buildstream

Commits:

2 changed files:

Changes:

  • buildstream/_artifactcache/cascache.py

    @@ -24,6 +24,7 @@ import os
     import signal
     import stat
     import tempfile
    +import uuid
     from urllib.parse import urlparse
     
     import grpc
    @@ -309,8 +310,11 @@ class CASCache(ArtifactCache):
                         # Upload any blobs missing on the server
                         skipped_remote = False
                         for digest in missing_blobs.values():
    +                        uuid_ = uuid.uuid4()
    +                        resource_name = '/'.join(['uploads', str(uuid_), 'blobs',
    +                                                  digest.hash, str(digest.size_bytes)])
    +
                             def request_stream():
    -                            resource_name = os.path.join(digest.hash, str(digest.size_bytes))
                                 with open(self.objpath(digest), 'rb') as f:
                                     assert os.fstat(f.fileno()).st_size == digest.size_bytes
                                     offset = 0
    @@ -747,7 +751,7 @@ class CASCache(ArtifactCache):
                 yield from self._required_blobs(dirnode.digest)
     
         def _fetch_blob(self, remote, digest, out):
    -        resource_name = os.path.join(digest.hash, str(digest.size_bytes))
    +        resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)])
             request = bytestream_pb2.ReadRequest()
             request.resource_name = resource_name
             request.read_offset = 0

  • buildstream/_artifactcache/casserver.py

    @@ -23,6 +23,7 @@ import os
     import signal
     import sys
     import tempfile
    +import uuid
     
     import click
     import grpc
    @@ -130,12 +131,21 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
     
         def Read(self, request, context):
             resource_name = request.resource_name
    -        client_digest = _digest_from_resource_name(resource_name)
    -        assert request.read_offset <= client_digest.size_bytes
    +        client_digest = _digest_from_download_resource_name(resource_name)
    +        if client_digest is None:
    +            context.set_code(grpc.StatusCode.NOT_FOUND)
    +            return
    +
    +        if request.read_offset > client_digest.size_bytes:
    +            context.set_code(grpc.StatusCode.OUT_OF_RANGE)
    +            return
     
             try:
                 with open(self.cas.objpath(client_digest), 'rb') as f:
    -                assert os.fstat(f.fileno()).st_size == client_digest.size_bytes
    +                if os.fstat(f.fileno()).st_size != client_digest.size_bytes:
    +                    context.set_code(grpc.StatusCode.NOT_FOUND)
    +                    return
    +
                     if request.read_offset > 0:
                         f.seek(request.read_offset)
     
    @@ -163,12 +173,18 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
             resource_name = None
             with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
                 for request in request_iterator:
    -                assert not finished
    -                assert request.write_offset == offset
    +                if finished or request.write_offset != offset:
    +                    context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
    +                    return response
    +
                     if resource_name is None:
                         # First request
                         resource_name = request.resource_name
    -                    client_digest = _digest_from_resource_name(resource_name)
    +                    client_digest = _digest_from_upload_resource_name(resource_name)
    +                    if client_digest is None:
    +                        context.set_code(grpc.StatusCode.NOT_FOUND)
    +                        return response
    +
                         try:
                             _clean_up_cache(self.cas, client_digest.size_bytes)
                         except ArtifactTooLargeException as e:
    @@ -177,14 +193,20 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
                             return response
                     elif request.resource_name:
                         # If it is set on subsequent calls, it **must** match the value of the first request.
    -                    assert request.resource_name == resource_name
    +                    if request.resource_name != resource_name:
    +                        context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
    +                        return response
                     out.write(request.data)
                     offset += len(request.data)
                     if request.finish_write:
    -                    assert client_digest.size_bytes == offset
    +                    if client_digest.size_bytes != offset:
    +                        context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
    +                        return response
                         out.flush()
                         digest = self.cas.add_object(path=out.name)
    -                    assert digest.hash == client_digest.hash
    +                    if digest.hash != client_digest.hash:
    +                        context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
    +                        return response
                         finished = True
     
             assert finished
    @@ -247,13 +269,48 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer):
             return response
     
     
    -def _digest_from_resource_name(resource_name):
    +def _digest_from_download_resource_name(resource_name):
    +    parts = resource_name.split('/')
    +
    +    # Accept requests from non-conforming BuildStream 1.1.x clients
    +    if len(parts) == 2:
    +        parts.insert(0, 'blobs')
    +
    +    if len(parts) != 3 or parts[0] != 'blobs':
    +        return None
    +
    +    try:
    +        digest = remote_execution_pb2.Digest()
    +        digest.hash = parts[1]
    +        digest.size_bytes = int(parts[2])
    +        return digest
    +    except ValueError:
    +        return None
    +
    +
    +def _digest_from_upload_resource_name(resource_name):
         parts = resource_name.split('/')
    -    assert len(parts) == 2
    -    digest = remote_execution_pb2.Digest()
    -    digest.hash = parts[0]
    -    digest.size_bytes = int(parts[1])
    -    return digest
    +
    +    # Accept requests from non-conforming BuildStream 1.1.x clients
    +    if len(parts) == 2:
    +        parts.insert(0, 'uploads')
    +        parts.insert(1, str(uuid.uuid4()))
    +        parts.insert(2, 'blobs')
    +
    +    if len(parts) < 5 or parts[0] != 'uploads' or parts[2] != 'blobs':
    +        return None
    +
    +    try:
    +        uuid_ = uuid.UUID(hex=parts[1])
    +        if uuid_.version != 4:
    +            return None
    +
    +        digest = remote_execution_pb2.Digest()
    +        digest.hash = parts[3]
    +        digest.size_bytes = int(parts[4])
    +        return digest
    +    except ValueError:
    +        return None
     
     
     def _has_object(cas, digest):


