[Notes] [Git][BuildStream/buildstream][Qinusty/531-fetch-retries-on-terminate] 6 commits: _artifactcache/casserver.py: Fix resource_name format for blobs




Qinusty pushed to branch Qinusty/531-fetch-retries-on-terminate at BuildStream / buildstream

Commits:

6 changed files:

  • buildstream/_artifactcache/cascache.py
  • buildstream/_artifactcache/casserver.py
  • buildstream/_scheduler/jobs/job.py
  • tests/frontend/interruptions.py
  • tests/frontend/interruptions/elements/remote.bst
  • tests/frontend/interruptions/project.conf

Changes:

  • buildstream/_artifactcache/cascache.py

    @@ -24,6 +24,7 @@ import os
     import signal
     import stat
     import tempfile
    +import uuid
     from urllib.parse import urlparse
     
     import grpc

    @@ -309,8 +310,11 @@ class CASCache(ArtifactCache):
                         # Upload any blobs missing on the server
                         skipped_remote = False
                         for digest in missing_blobs.values():
    +                        uuid_ = uuid.uuid4()
    +                        resource_name = '/'.join(['uploads', str(uuid_), 'blobs',
    +                                                  digest.hash, str(digest.size_bytes)])
    +
                             def request_stream():
    -                            resource_name = os.path.join(digest.hash, str(digest.size_bytes))
                                 with open(self.objpath(digest), 'rb') as f:
                                     assert os.fstat(f.fileno()).st_size == digest.size_bytes
                                     offset = 0

    @@ -747,7 +751,7 @@ class CASCache(ArtifactCache):
                 yield from self._required_blobs(dirnode.digest)
     
         def _fetch_blob(self, remote, digest, out):
    -        resource_name = os.path.join(digest.hash, str(digest.size_bytes))
    +        resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)])
             request = bytestream_pb2.ReadRequest()
             request.resource_name = resource_name
             request.read_offset = 0
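
    For reference, the new names follow the ByteStream resource-name
    convention from the Remote Execution API: reads address
    blobs/{hash}/{size}, while writes go through
    uploads/{uuid}/blobs/{hash}/{size} so that concurrent or retried
    uploads of the same blob do not collide. A minimal standalone sketch
    of the two formats (the helper names here are illustrative, not part
    of the patch):

        import uuid

        def download_resource_name(digest_hash, size_bytes):
            # Reads address the blob directly: blobs/{hash}/{size}
            return '/'.join(['blobs', digest_hash, str(size_bytes)])

        def upload_resource_name(digest_hash, size_bytes):
            # Each write session gets a fresh UUID:
            # uploads/{uuid}/blobs/{hash}/{size}
            return '/'.join(['uploads', str(uuid.uuid4()), 'blobs',
                             digest_hash, str(size_bytes)])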
    

  • buildstream/_artifactcache/casserver.py

    @@ -23,6 +23,7 @@ import os
     import signal
     import sys
     import tempfile
    +import uuid
     
     import click
     import grpc

    @@ -130,12 +131,21 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
     
         def Read(self, request, context):
             resource_name = request.resource_name
    -        client_digest = _digest_from_resource_name(resource_name)
    -        assert request.read_offset <= client_digest.size_bytes
    +        client_digest = _digest_from_download_resource_name(resource_name)
    +        if client_digest is None:
    +            context.set_code(grpc.StatusCode.NOT_FOUND)
    +            return
    +
    +        if request.read_offset > client_digest.size_bytes:
    +            context.set_code(grpc.StatusCode.OUT_OF_RANGE)
    +            return
     
             try:
                 with open(self.cas.objpath(client_digest), 'rb') as f:
    -                assert os.fstat(f.fileno()).st_size == client_digest.size_bytes
    +                if os.fstat(f.fileno()).st_size != client_digest.size_bytes:
    +                    context.set_code(grpc.StatusCode.NOT_FOUND)
    +                    return
    +
                     if request.read_offset > 0:
                         f.seek(request.read_offset)
     

    @@ -163,12 +173,18 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
            resource_name = None
            with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
                for request in request_iterator:
    -                assert not finished
    -                assert request.write_offset == offset
    +                if finished or request.write_offset != offset:
    +                    context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
    +                    return response
    +
                     if resource_name is None:
                         # First request
                         resource_name = request.resource_name
    -                    client_digest = _digest_from_resource_name(resource_name)
    +                    client_digest = _digest_from_upload_resource_name(resource_name)
    +                    if client_digest is None:
    +                        context.set_code(grpc.StatusCode.NOT_FOUND)
    +                        return response
    +
                         try:
                             _clean_up_cache(self.cas, client_digest.size_bytes)
                         except ArtifactTooLargeException as e:

    @@ -177,14 +193,20 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
                             return response
                     elif request.resource_name:
                         # If it is set on subsequent calls, it **must** match the value of the first request.
    -                    assert request.resource_name == resource_name
    +                    if request.resource_name != resource_name:
    +                        context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
    +                        return response
                     out.write(request.data)
                     offset += len(request.data)
                     if request.finish_write:
    -                    assert client_digest.size_bytes == offset
    +                    if client_digest.size_bytes != offset:
    +                        context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
    +                        return response
                         out.flush()
                         digest = self.cas.add_object(path=out.name)
    -                    assert digest.hash == client_digest.hash
    +                    if digest.hash != client_digest.hash:
    +                        context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
    +                        return response
                         finished = True
     
             assert finished

    @@ -247,13 +269,48 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer):
             return response
     
     
    -def _digest_from_resource_name(resource_name):
    +def _digest_from_download_resource_name(resource_name):
    +    parts = resource_name.split('/')
    +
    +    # Accept requests from non-conforming BuildStream 1.1.x clients
    +    if len(parts) == 2:
    +        parts.insert(0, 'blobs')
    +
    +    if len(parts) != 3 or parts[0] != 'blobs':
    +        return None
    +
    +    try:
    +        digest = remote_execution_pb2.Digest()
    +        digest.hash = parts[1]
    +        digest.size_bytes = int(parts[2])
    +        return digest
    +    except ValueError:
    +        return None
    +
    +
    +def _digest_from_upload_resource_name(resource_name):
         parts = resource_name.split('/')
    -    assert len(parts) == 2
    -    digest = remote_execution_pb2.Digest()
    -    digest.hash = parts[0]
    -    digest.size_bytes = int(parts[1])
    -    return digest
    +
    +    # Accept requests from non-conforming BuildStream 1.1.x clients
    +    if len(parts) == 2:
    +        parts.insert(0, 'uploads')
    +        parts.insert(1, str(uuid.uuid4()))
    +        parts.insert(2, 'blobs')
    +
    +    if len(parts) < 5 or parts[0] != 'uploads' or parts[2] != 'blobs':
    +        return None
    +
    +    try:
    +        uuid_ = uuid.UUID(hex=parts[1])
    +        if uuid_.version != 4:
    +            return None
    +
    +        digest = remote_execution_pb2.Digest()
    +        digest.hash = parts[3]
    +        digest.size_bytes = int(parts[4])
    +        return digest
    +    except ValueError:
    +        return None
     
     
     def _has_object(cas, digest):
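
    Taken together, the two parsers accept both the conforming format and
    the legacy one, and reject anything else by returning None rather than
    raising AssertionError. A hypothetical smoke test of that behaviour
    (assuming the two functions are imported from this module):

        import uuid

        # Conforming download and upload names
        d = _digest_from_download_resource_name('blobs/abc123/42')
        assert d.hash == 'abc123' and d.size_bytes == 42

        name = '/'.join(['uploads', str(uuid.uuid4()), 'blobs', 'abc123', '42'])
        assert _digest_from_upload_resource_name(name).size_bytes == 42

        # Legacy BuildStream 1.1.x clients sent bare '{hash}/{size}'
        assert _digest_from_download_resource_name('abc123/42') is not None
        assert _digest_from_upload_resource_name('abc123/42') is not None

        # Malformed names now yield None instead of crashing the servicer
        assert _digest_from_download_resource_name('blobs/abc123/nan') is None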
    

  • buildstream/_scheduler/jobs/job.py

    @@ -250,7 +250,7 @@ class Job():
         #
         def resume(self, silent=False):
             if self._suspended:
    -            if not silent:
    +            if not silent and not self._scheduler.terminated:
                     self.message(MessageType.STATUS,
                                  "{} resuming".format(self.action_name))
     

    @@ -549,7 +549,7 @@ class Job():
             #
             self._retry_flag = returncode not in (RC_OK, RC_PERM_FAIL)
     
    -        if self._retry_flag and (self._tries <= self._max_retries):
    +        if self._retry_flag and (self._tries <= self._max_retries) and not self._scheduler.terminated:
                 self.spawn()
                 return
    

  • tests/frontend/interruptions.py

    +import pytest
    +import os
    +import signal
    +import re
    +import time
    +import multiprocessing as mp
    +from multiprocessing import Process
    +from multiprocessing.queues import Queue
    +
    +from buildstream import _yaml
    +from tests.testutils import cli
    +
    +
    +DATA_DIR = os.path.join(
    +    os.path.dirname(os.path.realpath(__file__)),
    +    "interruptions",
    +)
    +
    +
    +def cli_run_in_process(cli, path, args):
    +    def do_run(cli, path, args, queue):
    +        result = cli.run(project=path, args=args)
    +        queue.put(result.output)
    +        queue.put(result.stderr)
    +
    +    queue = mp.Queue()
    +    p = mp.Process(target=do_run, args=[cli, path, args, queue])
    +    p.start()
    +    return queue, p
    +
    +
    +@pytest.mark.datafiles(DATA_DIR)
    +def test_interrupt_fetch(cli, datafiles):
    +    path = os.path.join(datafiles.dirname, datafiles.basename)
    +
    +    queue, proc = cli_run_in_process(cli, path, args=['--on-error', 'terminate',
    +                                                      'fetch', 'remote.bst'])
    +    time.sleep(1)
    +    os.kill(proc.pid, signal.SIGINT)
    +
    +    # Allow up to 3 seconds for each queue read
    +    try:
    +        output = queue.get(timeout=3)
    +        stderr = queue.get(timeout=3)
    +    except mp.queues.Empty:
    +        assert False, 'Fetch failed to terminate'
    +    assert output is not None
    +
    +    matches = re.findall('FAILURE Fetch', stderr)
    +    assert len(matches), "Unexpected success"
    +
    +    matches = re.findall(r'STATUS\s*Fetch terminating', stderr)
    +    assert len(matches) != 0, "Fetch failed to terminate"
    +    assert len(matches) == 1, "Fetch attempted to terminate more than once"
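
    To exercise the new test on its own, a standard pytest invocation
    along these lines should work from the repository root:

        pytest tests/frontend/interruptions.py -k test_interrupt_fetch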

  • tests/frontend/interruptions/elements/remote.bst

    +kind: import
    +
    +sources:
    +  - kind: git
    +    track: master
    +    ref: 03527d5afd967a94f95f2ee8f035236b387dd335
    +    url: https://gitlab.com/BuildStream/buildstream.git
    \ No newline at end of file

  • tests/frontend/interruptions/project.conf

    +name: interruptions
    +element-path: elements
    \ No newline at end of file


