[Notes] [Git][BuildGrid/buildgrid][finn/separate-services] 3 commits: Adding `remote` implementation of storage_abc




finn pushed to branch finn/separate-services at BuildGrid / buildgrid

Commits:

6 changed files:

Changes:

  • buildgrid/server/cas/instance.py
    +# Copyright (C) 2018 Bloomberg LP
    +#
    +# Licensed under the Apache License, Version 2.0 (the "License");
    +# you may not use this file except in compliance with the License.
    +# You may obtain a copy of the License at
    +#
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    +#
    +# Unless required by applicable law or agreed to in writing, software
    +# distributed under the License is distributed on an "AS IS" BASIS,
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +# See the License for the specific language governing permissions and
    +# limitations under the License.
    +
    +
    +"""
    +Storage Instances
    +=================
    +Instances of CAS and ByteStream
    +"""
    +
    +from buildgrid._protos.google.bytestream import bytestream_pb2
    +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 as re_pb2
    +
    +from .._exceptions import InvalidArgumentError, NotFoundError, OutOfRangeError
    +from ...settings import HASH
    +
    +
    +class ContentAddressableStorageInstance:
    +
    +    def __init__(self, storage):
    +        self._storage = storage
    +
    +    def find_missing_blobs(self, blob_digests):
    +        storage = self._storage
    +        return re_pb2.FindMissingBlobsResponse(
    +            missing_blob_digests=storage.missing_blobs(blob_digests))
    +
    +    def batch_update_blobs(self, requests):
    +        storage = self._storage
    +        store = []
    +        for request_proto in requests:
    +            store.append((request_proto.digest, request_proto.data))
    +
    +        response = re_pb2.BatchUpdateBlobsResponse()
    +        statuses = storage.bulk_update_blobs(store)
    +
    +        for (digest, _), status in zip(store, statuses):
    +            response_proto = response.responses.add()
    +            response_proto.digest.CopyFrom(digest)
    +            response_proto.status.CopyFrom(status)
    +
    +        return response
    +
    +
    +class ByteStreamInstance:
    +
    +    BLOCK_SIZE = 1 * 1024 * 1024  # 1 MB block size
    +
    +    def __init__(self, storage):
    +        self._storage = storage
    +
    +    def read(self, path, read_offset, read_limit):
    +        storage = self._storage
    +
    +        if len(path) == 3:
    +            path = [""] + path
    +
    +        # Parse/verify resource name.
    +        # Read resource names look like "[instance/]blobs/abc123hash/99".
    +        digest = re_pb2.Digest(hash=path[2], size_bytes=int(path[3]))
    +
    +        # Check the given read offset and limit.
    +        if read_offset < 0 or read_offset > digest.size_bytes:
    +            raise OutOfRangeError("Read offset out of range")
    +
    +        elif read_limit == 0:
    +            bytes_remaining = digest.size_bytes - read_offset
    +
    +        elif read_limit > 0:
    +            bytes_remaining = read_limit
    +
    +        else:
    +            raise InvalidArgumentError("Negative read_limit is invalid")
    +
    +        # Read the blob from storage and send its contents to the client.
    +        result = storage.get_blob(digest)
    +        if result is None:
    +            raise NotFoundError("Blob not found")
    +
    +        elif result.seekable():
    +            result.seek(read_offset)
    +
    +        else:
    +            result.read(read_offset)
    +
    +        while bytes_remaining > 0:
    +            yield bytestream_pb2.ReadResponse(
    +                data=result.read(min(self.BLOCK_SIZE, bytes_remaining)))
    +            bytes_remaining -= self.BLOCK_SIZE
    +
    +    def write(self, requests):
    +        storage = self._storage
    +
    +        first_request = next(requests)
    +        path = first_request.resource_name.split("/")
    +
    +        if path[0] == "uploads":
    +            path = [""] + path
    +
    +        if len(path) < 6 or path[1] != "uploads" or path[3] != "blobs" or not path[5].isdigit():
    +            raise InvalidArgumentError("Invalid resource name")
    +
    +        digest = re_pb2.Digest(hash=path[4], size_bytes=int(path[5]))
    +        write_session = storage.begin_write(digest)
    +
    +        # Start the write session and write the first request's data.
    +        write_session.write(first_request.data)
    +        hash_ = HASH(first_request.data)
    +        bytes_written = len(first_request.data)
    +        finished = first_request.finish_write
    +
    +        # Handle subsequent write requests.
    +        while not finished:
    +
    +            for request in requests:
    +                if finished:
    +                    raise InvalidArgumentError("Write request sent after write finished")
    +
    +                elif request.write_offset != bytes_written:
    +                    raise InvalidArgumentError("Invalid write offset")
    +
    +                elif request.resource_name and request.resource_name != first_request.resource_name:
    +                    raise InvalidArgumentError("Resource name changed mid-write")
    +
    +                finished = request.finish_write
    +                bytes_written += len(request.data)
    +                if bytes_written > digest.size_bytes:
    +                    raise InvalidArgumentError("Wrote too much data to blob")
    +
    +                write_session.write(request.data)
    +                hash_.update(request.data)
    +
    +        # Check that the data matches the provided digest.
    +        if bytes_written != digest.size_bytes or not finished:
    +            raise NotImplementedError("Cannot close stream before finishing write")
    +
    +        elif hash_.hexdigest() != digest.hash:
    +            raise InvalidArgumentError("Data does not match hash")
    +
    +        storage.commit_write(digest, write_session)
    +        return bytestream_pb2.WriteResponse(committed_size=bytes_written)

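A minimal wiring sketch of how these instance classes plug into the gRPC servicers from service.py, assuming an in-memory LRU backend, a single unnamed instance and a placeholder port (none of which come from this push):

    # Sketch only: backend, instance names and port are assumed, not taken from this push.
    from concurrent import futures

    import grpc

    from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
    from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
    from buildgrid.server.cas.instance import ByteStreamInstance, ContentAddressableStorageInstance
    from buildgrid.server.cas.service import ByteStreamService, ContentAddressableStorageService
    from buildgrid.server.cas.storage.lru_memory_cache import LRUMemoryCache

    storage = LRUMemoryCache(1024 * 1024)  # assumed in-memory backend and cache size

    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
        ContentAddressableStorageService({"": ContentAddressableStorageInstance(storage)}), server)
    bytestream_pb2_grpc.add_ByteStreamServicer_to_server(
        ByteStreamService({"": ByteStreamInstance(storage)}), server)
    server.add_insecure_port("localhost:50051")  # placeholder address
    server.start()

Keying the servicers on a dict of instance names is what lets one process serve several named CAS instances, which the updated tests exercise with "" and "test_inst".
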
  • buildgrid/server/cas/service.py
    @@ -21,131 +21,128 @@ Implements the Content Addressable Storage API and ByteStream API.
     """
     
     
    +from itertools import tee
    +import logging
    +
     import grpc
     
     from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 as re_pb2
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc as re_pb2_grpc
     
    -from ...settings import HASH
    +from .._exceptions import InvalidArgumentError, NotFoundError, OutOfRangeError
     
     
     class ContentAddressableStorageService(re_pb2_grpc.ContentAddressableStorageServicer):
     
    -    def __init__(self, storage):
    -        self._storage = storage
    +    def __init__(self, instances):
    +        self.logger = logging.getLogger(__name__)
    +        self._instances = instances
     
         def FindMissingBlobs(self, request, context):
    -        # Only one instance for now.
    -        storage = self._storage
    -        return re_pb2.FindMissingBlobsResponse(
    -            missing_blob_digests=storage.missing_blobs(request.blob_digests))
    +        try:
    +            instance = self._get_instance(request.instance_name)
    +            return instance.find_missing_blobs(request.blob_digests)
    +
    +        except InvalidArgumentError as e:
    +            self.logger.error(e)
    +            context.set_details(str(e))
    +            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
    +
    +        return re_pb2.FindMissingBlobsResponse()
     
         def BatchUpdateBlobs(self, request, context):
    -        # Only one instance for now.
    -        storage = self._storage
    -        requests = []
    -        for request_proto in request.requests:
    -            requests.append((request_proto.digest, request_proto.data))
    -        response = re_pb2.BatchUpdateBlobsResponse()
    -        for (digest, _), status in zip(requests, storage.bulk_update_blobs(requests)):
    -            response_proto = response.responses.add()
    -            response_proto.digest.CopyFrom(digest)
    -            response_proto.status.CopyFrom(status)
    -        return response
    +        try:
    +            instance = self._get_instance(request.instance_name)
    +            return instance.batch_update_blobs(request.requests)
     
    +        except InvalidArgumentError as e:
    +            self.logger.error(e)
    +            context.set_details(str(e))
    +            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
     
    -class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer):
    +        return re_pb2.BatchReadBlobsResponse()
     
    -    BLOCK_SIZE = 1 * 1024 * 1024  # 1 MB block size
    +    def _get_instance(self, instance_name):
    +        try:
    +            return self._instances[instance_name]
    +
    +        except KeyError:
    +            raise InvalidArgumentError("Invalid instance name: {}".format(instance_name))
    +
    +
    +class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer):
     
    -    def __init__(self, storage):
    -        self._storage = storage
    +    def __init__(self, instances):
    +        self.logger = logging.getLogger(__name__)
    +        self._instances = instances
     
         def Read(self, request, context):
    -        # Only one instance for now.
    -        storage = self._storage
    -
    -        # Parse/verify resource name.
    -        # Read resource names look like "[instance/]blobs/abc123hash/99".
    -        path = request.resource_name.split("/")
    -        if len(path) == 3:
    -            path = [""] + path
    -        if len(path) != 4 or path[1] != "blobs" or not path[3].isdigit():
    -            context.abort(grpc.StatusCode.NOT_FOUND, "Invalid resource name")
    -        # instance_name = path[0]
    -        digest = re_pb2.Digest(hash=path[2], size_bytes=int(path[3]))
    -
    -        # Check the given read offset and limit.
    -        if request.read_offset < 0 or request.read_offset > digest.size_bytes:
    -            context.abort(grpc.StatusCode.OUT_OF_RANGE, "Read offset out of range")
    -        elif request.read_limit == 0:
    -            bytes_remaining = digest.size_bytes - request.read_offset
    -        elif request.read_limit > 0:
    -            bytes_remaining = request.read_limit
    -        else:
    -            context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Negative read_limit is invalid")
    -
    -        # Read the blob from storage and send its contents to the client.
    -        result = storage.get_blob(digest)
    -        if result is None:
    -            context.abort(grpc.StatusCode.NOT_FOUND, "Blob not found")
    -        elif result.seekable():
    -            result.seek(request.read_offset)
    -        else:
    -            result.read(request.read_offset)
    -        while bytes_remaining > 0:
    -            yield bytestream_pb2.ReadResponse(
    -                data=result.read(min(self.BLOCK_SIZE, bytes_remaining)))
    -            bytes_remaining -= self.BLOCK_SIZE
    -
    -    def Write(self, request_iterator, context):
    -        # Only one instance for now.
    -        storage = self._storage
    -
    -        requests = iter(request_iterator)
    -        first_request = next(requests)
    -        if first_request.write_offset != 0:
    -            context.abort(grpc.StatusCode.UNIMPLEMENTED, "Nonzero write offset is unsupported")
    -
    -        # Parse/verify resource name.
    -        # Write resource names look like "[instance/]uploads/SOME-GUID/blobs/abc123hash/99".
    -        path = first_request.resource_name.split("/")
    -        if path[0] == "uploads":
    -            path = [""] + path
    -        if len(path) < 6 or path[1] != "uploads" or path[3] != "blobs" or not path[5].isdigit():
    -            context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Invalid resource name")
    -        # instance_name = path[0]
    -        digest = re_pb2.Digest(hash=path[4], size_bytes=int(path[5]))
    -
    -        # Start the write session and write the first request's data.
    -        write_session = storage.begin_write(digest)
    -        write_session.write(first_request.data)
    -        hash_ = HASH(first_request.data)
    -        bytes_written = len(first_request.data)
    -        done = first_request.finish_write
    -
    -        # Handle subsequent write requests.
    -        for request in requests:
    -            if done:
    -                context.abort(grpc.StatusCode.INVALID_ARGUMENT,
    -                              "Write request sent after write finished")
    -            elif request.write_offset != bytes_written:
    -                context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Invalid write offset")
    -            elif request.resource_name and request.resource_name != first_request.resource_name:
    -                context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Resource name changed mid-write")
    -            done = request.finish_write
    -            bytes_written += len(request.data)
    -            if bytes_written > digest.size_bytes:
    -                context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Wrote too much data to blob")
    -            write_session.write(request.data)
    -            hash_.update(request.data)
    -
    -        # Check that the data matches the provided digest.
    -        if bytes_written != digest.size_bytes or not done:
    -            context.abort(grpc.StatusCode.UNIMPLEMENTED,
    -                          "Cannot close stream before finishing write")
    -        elif hash_.hexdigest() != digest.hash:
    -            context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Data does not match hash")
    -        storage.commit_write(digest, write_session)
    -        return bytestream_pb2.WriteResponse(committed_size=bytes_written)
    +        try:
    +            path = request.resource_name.split("/")
    +            instance_name = path[0]
    +            if instance_name == "blobs":
    +                # TODO: Make default if no instance_name
    +                instance_name = ""
    +
    +            instance = self._get_instance(instance_name)
    +            yield from instance.read(path,
    +                                     request.read_offset,
    +                                     request.read_limit)
    +
    +        except InvalidArgumentError as e:
    +            self.logger.error(e)
    +            context.set_details(str(e))
    +            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
    +            yield bytestream_pb2.ReadResponse()
    +
    +        except NotFoundError as e:
    +            self.logger.error(e)
    +            context.set_details(str(e))
    +            context.set_code(grpc.StatusCode.NOT_FOUND)
    +            yield bytestream_pb2.ReadResponse()
    +
    +        except OutOfRangeError as e:
    +            self.logger.error(e)
    +            context.set_details(str(e))
    +            context.set_code(grpc.StatusCode.OUT_OF_RANGE)
    +            yield bytestream_pb2.ReadResponse()
    +
    +    def Write(self, requests, context):
    +        try:
    +            requests, request_probe = tee(requests, 2)
    +            first_request = next(request_probe)
    +
    +            path = first_request.resource_name.split("/")
    +
    +            instance_name = path[0]
    +            if instance_name == "uploads":
    +                # TODO: Make default if no instance_name
    +                instance_name = ""
    +
    +            instance = self._get_instance(instance_name)
    +            return instance.write(requests)
    +
    +        except NotImplementedError as e:
    +            self.logger.error(e)
    +            context.set_details(str(e))
    +            context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    +
    +        except InvalidArgumentError as e:
    +            self.logger.error(e)
    +            context.set_details(str(e))
    +            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
    +
    +        except NotFoundError as e:
    +            self.logger.error(e)
    +            context.set_details(str(e))
    +            context.set_code(grpc.StatusCode.NOT_FOUND)
    +
    +        return bytestream_pb2.WriteResponse()
    +
    +    def _get_instance(self, instance_name):
    +        try:
    +            return self._instances[instance_name]
    +
    +        except KeyError:
    +            raise InvalidArgumentError("Invalid instance name: {}".format(instance_name))

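A small sketch of the new error mapping from the caller's side, mirroring the approach the updated tests take (servicer called in-process with a mocked gRPC context); the empty instance dict and the instance name are assumptions:

    # Mirrors the test approach: call the servicer directly with a mocked gRPC context.
    from unittest import mock

    import grpc
    from grpc._server import _Context

    from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 as re_pb2
    from buildgrid.server.cas.service import ContentAddressableStorageService

    context = mock.create_autospec(_Context)
    servicer = ContentAddressableStorageService({})  # assumed: no instances registered

    request = re_pb2.FindMissingBlobsRequest(instance_name="unknown")
    servicer.FindMissingBlobs(request, context)

    # The InvalidArgumentError raised by _get_instance() is translated into a status code.
    context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
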
  • buildgrid/server/cas/storage/remote.py
    +# Copyright (C) 2018 Bloomberg LP
    +#
    +# Licensed under the Apache License, Version 2.0 (the "License");
    +# you may not use this file except in compliance with the License.
    +# You may obtain a copy of the License at
    +#
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    +#
    +# Unless required by applicable law or agreed to in writing, software
    +# distributed under the License is distributed on an "AS IS" BASIS,
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +# See the License for the specific language governing permissions and
    +# limitations under the License.
    +
    +
    +"""
    +RemoteStorage
    +=============
    +
    +Forwards storage requests to a remote storage.
    +"""
    +
    +import io
    +import logging
    +
    +import grpc
    +
    +from buildgrid.utils import create_digest, gen_fetch_blob, gen_write_request_blob
    +from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
    +from buildgrid._protos.google.rpc.status_pb2 import Status
    +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    +
    +from .storage_abc import StorageABC
    +
    +
    +class RemoteStorage(StorageABC):
    +
    +    def __init__(self, channel, instance_name=""):
    +        self.logger = logging.getLogger(__name__)
    +        self._instance_name = instance_name
    +        self._stub_bs = bytestream_pb2_grpc.ByteStreamStub(channel)
    +        self._stub_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(channel)
    +
    +    def has_blob(self, digest):
    +        try:
    +            if self.get_blob(digest):
    +                return True
    +
    +        except grpc.RpcError as e:
    +            if e.code() == grpc.StatusCode.NOT_FOUND:
    +                pass
    +            else:
    +                raise e
    +
    +        return False
    +
    +    def get_blob(self, digest):
    +        fetched_data = io.BytesIO()
    +        length = 0
    +        for data in gen_fetch_blob(self._stub_bs, digest, self._instance_name):
    +            length += fetched_data.write(data)
    +
    +        if length:
    +            assert digest.size_bytes == length
    +            fetched_data.seek(0)
    +            return fetched_data
    +
    +        else:
    +            return None
    +
    +    def begin_write(self, digest):
    +        return io.BytesIO(digest.SerializeToString())
    +
    +    def commit_write(self, digest, write_session):
    +        write_session.seek(0)
    +
    +        for request in gen_write_request_blob(write_session, digest, self._instance_name):
    +            self._stub_bs.Write(request)
    +
    +    def missing_blobs(self, blobs):
    +        request = remote_execution_pb2.FindMissingBlobsRequest(instance_name=self._instance_name)
    +
    +        for blob in blobs:
    +            request_digest = request.blob_digests.add()
    +            request_digest.hash = blob.hash
    +            request_digest.size_bytes = blob.size_bytes
    +
    +        response = self._stub_cas.FindMissingBlobs(request)
    +
    +        return [x for x in response.missing_blob_digests]
    +
    +    def bulk_update_blobs(self, blobs):
    +        request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=self._instance_name)
    +
    +        for digest, data in blobs:
    +            reqs = request.requests.add()
    +            reqs.digest.CopyFrom(digest)
    +            reqs.data = data
    +
    +        response = self._stub_cas.BatchUpdateBlobs(request)
    +
    +        responses = response.responses
    +
    +        # Check everything was sent back, even if order changed
    +        assert ([x.digest for x in request.requests].sort(key=lambda x: x.hash)) == \
    +            ([x.digest for x in responses].sort(key=lambda x: x.hash))
    +
    +        return [x.status for x in responses]

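A usage sketch for the new RemoteStorage backend, following the begin_write()/commit_write() pattern the storage tests use; the channel address and blob contents are placeholders:

    # Sketch only: the endpoint address is assumed; blob contents are illustrative.
    import grpc

    from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
    from buildgrid.server.cas.storage.remote import RemoteStorage
    from buildgrid.settings import HASH

    channel = grpc.insecure_channel("localhost:50051")  # placeholder remote CAS endpoint
    storage = RemoteStorage(channel, instance_name="")

    data = b"abc"
    digest = Digest(hash=HASH(data).hexdigest(), size_bytes=len(data))

    # Writes buffer locally in begin_write(), then commit_write() streams the
    # buffer to the remote ByteStream service via gen_write_request_blob().
    session = storage.begin_write(digest)
    session.write(data)
    storage.commit_write(digest, session)

    # Reads go back through the remote ByteStream Read RPC.
    assert storage.get_blob(digest).read() == data
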
  • buildgrid/utils.py
    @@ -14,6 +14,7 @@
     
     
     import os
    +import uuid
     
     from buildgrid.settings import HASH
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    @@ -27,10 +28,37 @@ def gen_fetch_blob(stub, digest, instance_name=""):
         resource_name = os.path.join(instance_name, 'blobs', digest.hash, str(digest.size_bytes))
         request = bytestream_pb2.ReadRequest(resource_name=resource_name,
                                              read_offset=0)
    +
         for response in stub.Read(request):
             yield response.data
     
     
    +def gen_write_request_blob(digest_bytes, digest, instance_name=""):
    +    """ Generates a bytestream write request
    +    """
    +    resource_name = os.path.join(instance_name, 'uploads', str(uuid.uuid4()),
    +                                 'blobs', digest.hash, str(digest.size_bytes))
    +
    +    offset = 0
    +    finished = False
    +    remaining = digest.size_bytes
    +
    +    while not finished:
    +        chunk_size = min(remaining, 64 * 1024)
    +        remaining -= chunk_size
    +        finished = remaining <= 0
    +
    +        request = bytestream_pb2.WriteRequest()
    +        request.resource_name = resource_name
    +        request.write_offset = offset
    +        request.data = digest_bytes.read(chunk_size)
    +        request.finish_write = finished
    +
    +        yield request
    +
    +        offset += chunk_size
    +
    +
     def write_fetch_directory(directory, stub, digest, instance_name=""):
         """ Given a directory digest, fetches files and writes them to a directory
         """
    

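gen_write_request_blob chunks an upload into ByteStream WriteRequests of at most 64 KiB under a freshly generated uploads/<uuid> resource name, marking the last request with finish_write. A sketch of driving it against a ByteStream stub; the channel setup and blob contents are assumptions:

    # Sketch only: channel address and blob contents are assumed.
    import io

    import grpc

    from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
    from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
    from buildgrid.settings import HASH
    from buildgrid.utils import gen_write_request_blob

    data = b"example blob"
    digest = Digest(hash=HASH(data).hexdigest(), size_bytes=len(data))

    stub = bytestream_pb2_grpc.ByteStreamStub(grpc.insecure_channel("localhost:50051"))

    # ByteStream.Write is a client-streaming RPC, so the generator can be passed directly;
    # the final yielded request carries finish_write=True.
    response = stub.Write(gen_write_request_blob(io.BytesIO(data), digest))
    print(response.committed_size)
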
  • tests/cas/test_services.py
    @@ -18,17 +18,23 @@
     # pylint: disable=redefined-outer-name
     
     import io
    +from unittest import mock
     
    +import grpc
    +from grpc._server import _Context
     import pytest
     
     from buildgrid._protos.google.bytestream import bytestream_pb2
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 as re_pb2
     from buildgrid.server.cas.storage.storage_abc import StorageABC
    -from buildgrid.server.cas.service import ByteStreamService
    -from buildgrid.server.cas.service import ContentAddressableStorageService
    +from buildgrid.server.cas.instance import ByteStreamInstance, ContentAddressableStorageInstance
    +from buildgrid.server.cas.service import ByteStreamService, ContentAddressableStorageService
     from buildgrid.settings import HASH
     
     
    +context = mock.create_autospec(_Context)
    +
    +
     class SimpleStorage(StorageABC):
         """Storage provider wrapper around a dictionary.
     
    @@ -61,19 +67,6 @@ class SimpleStorage(StorageABC):
             self.data[(digest.hash, digest.size_bytes)] = data
     
     
    -class MockObject:
    -    def __init__(self):
    -        self.abort = None
    -
    -
    -class MockException(Exception):
    -    pass
    -
    -
    -def raise_mock_exception(*args, **kwargs):
    -    raise MockException()
    -
    -
     test_strings = [b"", b"hij"]
     instances = ["", "test_inst"]
     
    @@ -82,7 +75,9 @@ instances = ["", "test_inst"]
     @pytest.mark.parametrize("instance", instances)
     def test_bytestream_read(data_to_read, instance):
         storage = SimpleStorage([b"abc", b"defg", data_to_read])
    -    servicer = ByteStreamService(storage)
    +
    +    bs_instance = ByteStreamInstance(storage)
    +    servicer = ByteStreamService({instance: bs_instance})
     
         request = bytestream_pb2.ReadRequest()
         if instance != "":
    @@ -100,7 +95,8 @@ def test_bytestream_read_many(instance):
         data_to_read = b"testing" * 10000
     
         storage = SimpleStorage([b"abc", b"defg", data_to_read])
    -    servicer = ByteStreamService(storage)
    +    bs_instance = ByteStreamInstance(storage)
    +    servicer = ByteStreamService({instance: bs_instance})
     
         request = bytestream_pb2.ReadRequest()
         if instance != "":
    @@ -117,7 +113,8 @@ def test_bytestream_read_many(instance):
     @pytest.mark.parametrize("extra_data", ["", "/", "/extra/data"])
     def test_bytestream_write(instance, extra_data):
         storage = SimpleStorage()
    -    servicer = ByteStreamService(storage)
    +    bs_instance = ByteStreamInstance(storage)
    +    servicer = ByteStreamService({instance: bs_instance})
     
         resource_name = ""
         if instance != "":
    @@ -139,7 +136,8 @@ def test_bytestream_write(instance, extra_data):
     
     def test_bytestream_write_rejects_wrong_hash():
         storage = SimpleStorage()
    -    servicer = ByteStreamService(storage)
    +    bs_instance = ByteStreamInstance(storage)
    +    servicer = ByteStreamService({"": bs_instance})
     
         data = b'some data'
         wrong_hash = HASH(b'incorrect').hexdigest()
    @@ -148,10 +146,8 @@ def test_bytestream_write_rejects_wrong_hash():
             bytestream_pb2.WriteRequest(resource_name=resource_name, data=data, finish_write=True)
         ]
     
    -    context = MockObject()
    -    context.abort = raise_mock_exception
    -    with pytest.raises(MockException):
    -        servicer.Write(requests, context)
    +    servicer.Write(requests, context)
    +    context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
     
         assert len(storage.data) is 0
     
    @@ -159,7 +155,8 @@ def test_bytestream_write_rejects_wrong_hash():
     @pytest.mark.parametrize("instance", instances)
     def test_cas_find_missing_blobs(instance):
         storage = SimpleStorage([b'abc', b'def'])
    -    servicer = ContentAddressableStorageService(storage)
    +    cas_instance = ContentAddressableStorageInstance(storage)
    +    servicer = ContentAddressableStorageService({instance: cas_instance})
         digests = [
             re_pb2.Digest(hash=HASH(b'def').hexdigest(), size_bytes=3),
             re_pb2.Digest(hash=HASH(b'ghij').hexdigest(), size_bytes=4)
    @@ -173,7 +170,9 @@ def test_cas_find_missing_blobs(instance):
     @pytest.mark.parametrize("instance", instances)
     def test_cas_batch_update_blobs(instance):
         storage = SimpleStorage()
    -    servicer = ContentAddressableStorageService(storage)
    +    cas_instance = ContentAddressableStorageInstance(storage)
    +    servicer = ContentAddressableStorageService({instance: cas_instance})
    +
         update_requests = [
             re_pb2.BatchUpdateBlobsRequest.Request(
                 digest=re_pb2.Digest(hash=HASH(b'abc').hexdigest(), size_bytes=3), data=b'abc'),
    @@ -181,16 +180,21 @@ def test_cas_batch_update_blobs(instance):
                 digest=re_pb2.Digest(hash="invalid digest!", size_bytes=1000),
                 data=b'wrong data')
         ]
    +
         request = re_pb2.BatchUpdateBlobsRequest(instance_name=instance, requests=update_requests)
         response = servicer.BatchUpdateBlobs(request, None)
         assert len(response.responses) == 2
    +
         for blob_response in response.responses:
             if blob_response.digest == update_requests[0].digest:
                 assert blob_response.status.code == 0
    +
             elif blob_response.digest == update_requests[1].digest:
                 assert blob_response.status.code != 0
    +
             else:
                 raise Exception("Unexpected blob response")
    +
         assert len(storage.data) == 1
         assert (update_requests[0].digest.hash, 3) in storage.data
         assert storage.data[(update_requests[0].digest.hash, 3)] == b'abc'

  • tests/cas/test_storage.py
    @@ -19,18 +19,27 @@
     
     import tempfile
     
    +from unittest import mock
    +
     import boto3
    +import grpc
    +from grpc._server import _Context
     import pytest
    -
     from moto import mock_s3
     
     from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
    +from buildgrid.server.cas import service
    +from buildgrid.server.cas.instance import ByteStreamInstance, ContentAddressableStorageInstance
    +from buildgrid.server.cas.storage import remote
     from buildgrid.server.cas.storage.lru_memory_cache import LRUMemoryCache
     from buildgrid.server.cas.storage.disk import DiskStorage
     from buildgrid.server.cas.storage.s3 import S3Storage
     from buildgrid.server.cas.storage.with_cache import WithCacheStorage
     from buildgrid.settings import HASH
     
    +
    +context = mock.create_autospec(_Context)
    +
     abc = b"abc"
     abc_digest = Digest(hash=HASH(abc).hexdigest(), size_bytes=3)
     defg = b"defg"
    @@ -45,10 +54,62 @@ def write(storage, digest, blob):
         storage.commit_write(digest, session)
     
     
    +class MockCASStorage(ByteStreamInstance, ContentAddressableStorageInstance):
    +
    +    def __init__(self):
    +        storage = LRUMemoryCache(256)
    +        super().__init__(storage)
    +
    +
    +# Mock a CAS server with LRUStorage to return "calls" made to it
    +class MockStubServer:
    +
    +    def __init__(self):
    +        instances = {"": MockCASStorage(), "dna": MockCASStorage()}
    +        self._requests = []
    +        self._bs_service = service.ByteStreamService(instances)
    +        self._cas_service = service.ContentAddressableStorageService(instances)
    +
    +    def Read(self, request):
    +        yield from self._bs_service.Read(request, context)
    +
    +    def Write(self, request):
    +        self._requests.append(request)
    +        if request.finish_write:
    +            response = self._bs_service.Write(self._requests, context)
    +            self._requests = []
    +            return response
    +
    +        return None
    +
    +    def FindMissingBlobs(self, request):
    +        return self._cas_service.FindMissingBlobs(request, context)
    +
    +    def BatchUpdateBlobs(self, request):
    +        return self._cas_service.BatchUpdateBlobs(request, context)
    +
    +
    +# Instances of MockCASStorage
    +@pytest.fixture(params=["", "dna"])
    +def instance(params):
    +    return {params, MockCASStorage()}
    +
    +
    +@pytest.fixture()
    +@mock.patch.object(remote, 'bytestream_pb2_grpc')
    +@mock.patch.object(remote, 'remote_execution_pb2_grpc')
    +def remote_storage(mock_bs_grpc, mock_re_pb2_grpc):
    +    mock_server = MockStubServer()
    +    storage = remote.RemoteStorage(instance)
    +    storage._stub_bs = mock_server
    +    storage._stub_cas = mock_server
    +    yield storage
    +
    +
     # General tests for all storage providers
     
     
    -@pytest.fixture(params=["lru", "disk", "s3", "lru_disk", "disk_s3"])
    +@pytest.fixture(params=["lru", "disk", "s3", "lru_disk", "disk_s3", "remote"])
     def any_storage(request):
         if request.param == "lru":
             yield LRUMemoryCache(256)
    @@ -70,6 +131,14 @@ def any_storage(request):
                 with mock_s3():
                     boto3.resource('s3').create_bucket(Bucket="testing")
                     yield WithCacheStorage(DiskStorage(path), S3Storage("testing"))
    +    elif request.param == "remote":
    +        with mock.patch.object(remote, 'bytestream_pb2_grpc'):
    +            with mock.patch.object(remote, 'remote_execution_pb2_grpc'):
    +                mock_server = MockStubServer()
    +                storage = remote.RemoteStorage(instance)
    +                storage._stub_bs = mock_server
    +                storage._stub_cas = mock_server
    +                yield storage
     
     
     def test_initially_empty(any_storage):
    


