[Notes] [Git][BuildGrid/buildgrid][finn/separate-services] Adding remote storage



Title: GitLab

finn pushed to branch finn/separate-services at BuildGrid / buildgrid

Commits:

1 changed file:

Changes:

  • buildgrid/server/cas/storage/remote.py
    1
    +# Copyright (C) 2018 Bloomberg LP
    
    2
    +#
    
    3
    +# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    +# you may not use this file except in compliance with the License.
    
    5
    +# You may obtain a copy of the License at
    
    6
    +#
    
    7
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    +#
    
    9
    +# Unless required by applicable law or agreed to in writing, software
    
    10
    +# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    +# See the License for the specific language governing permissions and
    
    13
    +# limitations under the License.
    
    14
    +
    
    15
    +
    
    16
    +"""
    
    17
    +RemoteStorage
    
    18
    +==================
    
    19
    +
    
    20
    +Forwards storage requests to a remote storage.
    
    21
    +"""
    
    22
    +
    
    23
    +import io
    
    24
    +import logging
    
    25
    +import uuid
    
    26
    +
    
    27
    +import grpc
    
    28
    +
    
    29
    +from buildgrid.utils import create_digest, gen_fetch_blob
    
    30
    +from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
    
    31
    +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    
    32
    +
    
    33
    +from .storage_abc import StorageABC
    
    34
    +
    
    35
    +
    
    36
    +class RemoteStorage(StorageABC):
    
    37
    +
    
    38
    +    def __init__(self, channel, instance_name):
    
    39
    +        self.logger = logging.getLogger(__name__)
    
    40
    +        self._instance_name = instance_name
    
    41
    +        self._stub_bs = bytestream_pb2_grpc.ByteStreamStub(channel)
    
    42
    +        self._stub_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(channel)
    
    43
    +
    
    44
    +    def has_blob(self, digest):
    
    45
    +        try:
    
    46
    +            self.get_blob(digest)
    
    47
    +            return True
    
    48
    +
    
    49
    +        except grpc.RpcError:
    
    50
    +            pass
    
    51
    +
    
    52
    +        return False
    
    53
    +
    
    54
    +    def get_blob(self, digest):
    
    55
    +        fetched_data = None
    
    56
    +        for data in gen_fetch_blob(self._stub_bs, digest, self._instance_name):
    
    57
    +            fetched_data += data
    
    58
    +
    
    59
    +        assert digest.size_bytes == len(fetched_data)
    
    60
    +
    
    61
    +        return io.BytesIO(fetched_data)
    
    62
    +
    
    63
    +    def begin_write(self, digest):
    
    64
    +        return io.BytesIO()
    
    65
    +
    
    66
    +    def commit_write(self, digest, write_session):
    
    67
    +        pass
    
    68
    +
    
    69
    +    def missing_blobs(self, blobs):
    
    70
    +        digests = []
    
    71
    +        for blob in blobs:
    
    72
    +            digests.append(create_digest(blob))
    
    73
    +
    
    74
    +        request = remote_execution_pb2.FindMissingBlobsRequest(instance_name=self._instance_name,
    
    75
    +                                                               blob_digests=digests)
    
    76
    +        response = self._stub_cas.FindMissingBlobs(request)
    
    77
    +
    
    78
    +        return response.missing_blob_digests
    
    79
    +
    
    80
    +    def bulk_update_blobs(self, blobs):
    
    81
    +        requests = []
    
    82
    +        for blob in blobs:
    
    83
    +            request = remote_execution_pb2.BatchUpdateBlobsRequest.Request(digest=create_digest(blob),
    
    84
    +                                                                           data=blob)
    
    85
    +            requests.append(request)
    
    86
    +
    
    87
    +        response = self._stub_cas.BatchUpdateBlobs(instance_name=self._instance_name,
    
    88
    +                                                   requests=requests)
    
    89
    +        result = [x.status for x in response.responses]
    
    90
    +        return result
    
    91
    +
    
    92
    +    def _request_write_stream(self, digest):
    
    93
    +        resname = '{}/uploads/{}/blobs/{}/{}'.format(self._instance_name,
    
    94
    +                                                     uuid.uuid4(),
    
    95
    +                                                     digest.hash,
    
    96
    +                                                     digest.size_bytes)
    
    97
    +        offset = 0
    
    98
    +        finished = False
    
    99
    +        remaining = digest.size_bytes
    
    100
    +        digest_bytes = str.encode(digest.SerializeToString())
    
    101
    +
    
    102
    +        while not finished:
    
    103
    +            chunk_size = min(remaining, 64 * 1024)
    
    104
    +            remaining -= chunk_size
    
    105
    +            finish_write = remaining <= 0
    
    106
    +
    
    107
    +            request = bytestream_pb2.WriteRequest()
    
    108
    +            request.resource_name = resname
    
    109
    +            request.write_offset = offset
    
    110
    +            request.data = digest_bytes[offset: chunk_size]
    
    111
    +            request.finish_write = finish_write
    
    112
    +            yield request
    
    113
    +
    
    114
    +            offset += chunk_size



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]