[Notes] [Git][BuildGrid/buildgrid][finn/separate-services] Adding remote storage




finn pushed to branch finn/separate-services at BuildGrid / buildgrid

Commits:

2 changed files:

Changes:

  • buildgrid/server/cas/storage/remote.py
    +# Copyright (C) 2018 Bloomberg LP
    +#
    +# Licensed under the Apache License, Version 2.0 (the "License");
    +# you may not use this file except in compliance with the License.
    +# You may obtain a copy of the License at
    +#
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    +#
    +# Unless required by applicable law or agreed to in writing, software
    +# distributed under the License is distributed on an "AS IS" BASIS,
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +# See the License for the specific language governing permissions and
    +# limitations under the License.
    +
    +
    
    +"""
    +RemoteStorage
    +=============
    +
    +Forwards storage requests to a remote storage.
    +"""
    +
    +import io
    +import logging
    +
    +import grpc
    +
    +from buildgrid.utils import create_digest, gen_fetch_blob, gen_write_blob
    +from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
    +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    +
    +from .storage_abc import StorageABC
    +
    +
    +class RemoteStorage(StorageABC):
    +
    +    def __init__(self, channel, instance_name):
    +        self.logger = logging.getLogger(__name__)
    +        self._instance_name = instance_name
    +        self._stub_bs = bytestream_pb2_grpc.ByteStreamStub(channel)
    +        self._stub_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(channel)
    +
    +    def has_blob(self, digest):
    +        try:
    +            self.get_blob(digest)
    +            return True
    +
    +        except grpc.RpcError:
    +            pass
    +
    +        return False
    +
    +    def get_blob(self, digest):
    +        # Start from an empty bytes buffer; concatenating onto None would fail.
    +        fetched_data = b''
    +        for data in gen_fetch_blob(self._stub_bs, digest, self._instance_name):
    +            fetched_data += data
    +
    +        assert digest.size_bytes == len(fetched_data)
    +
    +        return io.BytesIO(fetched_data)
    +
    +    def begin_write(self, digest):
    +        # gen_write_blob takes the ByteStream stub as its first argument.
    +        return gen_write_blob(self._stub_bs, digest, self._instance_name)
    +
    +    def commit_write(self, digest, write_session):
    +        # Draining the generator performs the actual streamed write.
    +        for _ in write_session:
    +            pass
    +
    +    def missing_blobs(self, blobs):
    +        digests = []
    +        for blob in blobs:
    +            digests.append(create_digest(blob))
    +
    +        request = remote_execution_pb2.FindMissingBlobsRequest(instance_name=self._instance_name,
    +                                                               blob_digests=digests)
    +        response = self._stub_cas.FindMissingBlobs(request)
    +
    +        return response.missing_blob_digests
    +
    +    def bulk_update_blobs(self, blobs):
    +        requests = []
    +        for blob in blobs:
    +            request = remote_execution_pb2.BatchUpdateBlobsRequest.Request(digest=create_digest(blob),
    +                                                                           data=blob)
    +            requests.append(request)
    +
    +        # Stub methods take a single request message, not keyword arguments.
    +        batch_request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=self._instance_name,
    +                                                                     requests=requests)
    +        response = self._stub_cas.BatchUpdateBlobs(batch_request)
    +        result = [x.status for x in response.responses]
    +        return result
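
A minimal sketch of how the new class might be driven, assuming a CAS server is reachable at a hypothetical 'localhost:50051' and serves the instance name 'main' (both are placeholders, not part of this commit); the digest shown is the SHA-256 of the empty blob:

    import grpc

    from buildgrid.server.cas.storage.remote import RemoteStorage
    from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2

    # Placeholder endpoint and instance name; adjust to your deployment.
    channel = grpc.insecure_channel('localhost:50051')
    storage = RemoteStorage(channel, 'main')

    # A digest pairs a hash with a size; this one identifies the empty blob.
    digest = remote_execution_pb2.Digest(
        hash='e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855',
        size_bytes=0)

    if storage.has_blob(digest):
        print(storage.get_blob(digest).read())

    # Batch-upload small blobs; one status is returned per blob.
    statuses = storage.bulk_update_blobs([b'first blob', b'second blob'])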

  • buildgrid/utils.py
    @@ -14,6 +14,7 @@
     
     
     import os
    +import uuid
     
     from buildgrid.settings import HASH
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    @@ -31,6 +32,32 @@ def gen_fetch_blob(stub, digest, instance_name=""):
             yield response.data
     
     
    +def gen_write_blob(stub, digest, instance_name=""):
    +    """ Generates a stream of write requests for a given digest and sends
    +    them to the remote in a single client-streaming Write() call.
    +    """
    +    resource_name = os.path.join(instance_name, 'uploads', str(uuid.uuid4()),
    +                                 'blobs', digest.hash, str(digest.size_bytes))
    +
    +    def __write_requests():
    +        offset = 0
    +        finished = False
    +        remaining = digest.size_bytes
    +        # SerializeToString() already returns bytes; no further encoding needed.
    +        digest_bytes = digest.SerializeToString()
    +
    +        while not finished:
    +            chunk_size = min(remaining, 64 * 1024)
    +            remaining -= chunk_size
    +            finished = remaining <= 0
    +
    +            request = bytestream_pb2.WriteRequest()
    +            request.resource_name = resource_name
    +            request.write_offset = offset
    +            request.data = digest_bytes[offset: offset + chunk_size]
    +            request.finish_write = finished
    +
    +            yield request
    +
    +            offset += chunk_size
    +
    +    # ByteStream.Write is a client-streaming RPC: it takes the full request
    +    # iterator and returns a single WriteResponse.
    +    yield stub.Write(__write_requests())
    +
    +
     def write_fetch_directory(directory, stub, digest, instance_name=""):
         """ Given a directory digest, fetches files and writes them to a directory
         """
    


