[Notes] [Git][BuildGrid/buildgrid][mablanch/79-cas-downloader] 10 commits: tests/cas/test_client.py: Add unit-tests for CAS client




Martin Blanchard pushed to branch mablanch/79-cas-downloader at BuildGrid / buildgrid

Commits:

28 changed files:

Changes:

  • buildgrid/_app/bots/buildbox.py
    @@ -19,28 +19,30 @@ import tempfile
     
     from google.protobuf import any_pb2
     
    -from buildgrid.client.cas import upload
    +from buildgrid.client.cas import download, upload
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    -from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
    -from buildgrid.utils import read_file, write_file, parse_to_pb2_from_fetch
    +from buildgrid.utils import read_file, write_file
     
     
     def work_buildbox(context, lease):
         """Executes a lease for a build action, using buildbox.
         """
     
    -    stub_bytestream = bytestream_pb2_grpc.ByteStreamStub(context.cas_channel)
         local_cas_directory = context.local_cas
    +    # instance_name = context.parent
         logger = context.logger
     
         action_digest = remote_execution_pb2.Digest()
         lease.payload.Unpack(action_digest)
     
    -    action = parse_to_pb2_from_fetch(remote_execution_pb2.Action(),
    -                                     stub_bytestream, action_digest)
    +    with download(context.cas_channel) as cas:
    +        action = cas.get_message(action_digest,
    +                                 remote_execution_pb2.Action())
     
    -    command = parse_to_pb2_from_fetch(remote_execution_pb2.Command(),
    -                                      stub_bytestream, action.command_digest)
    +        assert action.command_digest.hash
    +
    +        command = cas.get_message(action.command_digest,
    +                                  remote_execution_pb2.Command())
     
         environment = dict()
         for variable in command.environment_variables:
    @@ -101,7 +103,8 @@ def work_buildbox(context, lease):
     
                 # TODO: Have BuildBox helping us creating the Tree instance here
                 # See https://gitlab.com/BuildStream/buildbox/issues/7 for details
    -            output_tree = _cas_tree_maker(stub_bytestream, output_digest)
    +            with download(context.cas_channel) as cas:
    +                output_tree = _cas_tree_maker(cas, output_digest)
     
                 with upload(context.cas_channel) as cas:
                     output_tree_digest = cas.put_message(output_tree)
    @@ -121,24 +124,28 @@ def work_buildbox(context, lease):
         return lease
     
     
    -def _cas_tree_maker(stub_bytestream, directory_digest):
    +def _cas_tree_maker(cas, directory_digest):
         # Generates and stores a Tree for a given Directory. This is very inefficient
         # and only temporary. See https://gitlab.com/BuildStream/buildbox/issues/7.
         output_tree = remote_execution_pb2.Tree()
     
    -    def list_directories(parent_directory):
    -        directory_list = list()
    +    def __cas_tree_maker(cas, parent_directory):
    +        digests, directories = list(), list()
             for directory_node in parent_directory.directories:
    -            directory = parse_to_pb2_from_fetch(remote_execution_pb2.Directory(),
    -                                                stub_bytestream, directory_node.digest)
    -            directory_list.extend(list_directories(directory))
    -            directory_list.append(directory)
    +            directories.append(remote_execution_pb2.Directory())
    +            digests.append(directory_node.digest)
    +
    +        cas.get_messages(digests, directories)
    +
    +        for directory in directories[:]:
    +            directories.extend(__cas_tree_maker(cas, directory))
    +
    +        return directories
     
    -        return directory_list
    +    root_directory = cas.get_message(directory_digest,
    +                                     remote_execution_pb2.Directory())
     
    -    root_directory = parse_to_pb2_from_fetch(remote_execution_pb2.Directory(),
    -                                             stub_bytestream, directory_digest)
    -    output_tree.children.extend(list_directories(root_directory))
    +    output_tree.children.extend(__cas_tree_maker(cas, root_directory))
         output_tree.root.CopyFrom(root_directory)
     
         return output_tree
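
    Side note: this hunk replaces the old parse_to_pb2_from_fetch() pattern with
    the new download() helper. A minimal sketch of the new calling convention,
    assuming only a gRPC channel and a known action digest (the endpoint and
    digest values below are illustrative placeholders, not from the commit):

        import grpc

        from buildgrid.client.cas import download
        from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2

        channel = grpc.insecure_channel('localhost:50051')  # assumed CAS endpoint

        action_digest = remote_execution_pb2.Digest()  # assumed known digest
        action_digest.hash = '0' * 64                  # placeholder hash value
        action_digest.size_bytes = 0

        # download() yields a Downloader and closes it (flushing any queued
        # requests) when the block exits:
        with download(channel) as cas:
            action = cas.get_message(action_digest, remote_execution_pb2.Action())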

  • buildgrid/_app/bots/temp_directory.py
    @@ -19,10 +19,8 @@ import tempfile
     
     from google.protobuf import any_pb2
     
    -from buildgrid.client.cas import upload
    +from buildgrid.client.cas import download, upload
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    -from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
    -from buildgrid.utils import write_fetch_directory, parse_to_pb2_from_fetch
     from buildgrid.utils import output_file_maker, output_directory_maker
     
     
    @@ -30,22 +28,23 @@ def work_temp_directory(context, lease):
         """Executes a lease for a build action, using host tools.
         """
     
    -    stub_bytestream = bytestream_pb2_grpc.ByteStreamStub(context.cas_channel)
         instance_name = context.parent
         logger = context.logger
     
         action_digest = remote_execution_pb2.Digest()
         lease.payload.Unpack(action_digest)
     
    -    action = parse_to_pb2_from_fetch(remote_execution_pb2.Action(),
    -                                     stub_bytestream, action_digest, instance_name)
    -
         with tempfile.TemporaryDirectory() as temp_directory:
    -        command = parse_to_pb2_from_fetch(remote_execution_pb2.Command(),
    -                                          stub_bytestream, action.command_digest, instance_name)
    +        with download(context.cas_channel, instance=instance_name) as cas:
    +            action = cas.get_message(action_digest,
    +                                     remote_execution_pb2.Action())
    +
    +            assert action.command_digest.hash
    +
    +            command = cas.get_message(action.command_digest,
    +                                      remote_execution_pb2.Command())
     
    -        write_fetch_directory(temp_directory, stub_bytestream,
    -                              action.input_root_digest, instance_name)
    +            cas.download_directory(action.input_root_digest, temp_directory)
     
             environment = os.environ.copy()
             for variable in command.environment_variables:
    

  • buildgrid/_app/commands/cmd_execute.py
    @@ -20,7 +20,6 @@ Execute command
     Request work to be executed and monitor status of jobs.
     """
     
    -import errno
     import logging
     import os
     import stat
    @@ -30,10 +29,9 @@ from urllib.parse import urlparse
     import click
     import grpc
     
    -from buildgrid.client.cas import upload
    +from buildgrid.client.cas import download, upload
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    -from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
    -from buildgrid.utils import merkle_maker, write_fetch_blob
    +from buildgrid.utils import merkle_maker
     
     from ..cli import pass_context
     
    @@ -157,8 +155,6 @@ def run_command(context, input_root, commands, output_file, output_directory):
                                                       skip_cache_lookup=True)
         response = stub.Execute(request)
     
    -    stub = bytestream_pb2_grpc.ByteStreamStub(context.channel)
    -
         stream = None
         for stream in response:
             context.logger.info(stream)
    @@ -166,21 +162,16 @@ def run_command(context, input_root, commands, output_file, output_directory):
         execute_response = remote_execution_pb2.ExecuteResponse()
         stream.response.Unpack(execute_response)
     
    -    for output_file_response in execute_response.result.output_files:
    -        path = os.path.join(output_directory, output_file_response.path)
    -
    -        if not os.path.exists(os.path.dirname(path)):
    -
    -            try:
    -                os.makedirs(os.path.dirname(path))
    -
    -            except OSError as exc:
    -                if exc.errno != errno.EEXIST:
    -                    raise
    -
    -        with open(path, 'wb+') as f:
    -            write_fetch_blob(f, stub, output_file_response.digest, context.instance_name)
    -
    -        if output_file_response.path in output_executeables:
    -            st = os.stat(path)
    -            os.chmod(path, st.st_mode | stat.S_IXUSR)
    +    with download(context.channel, instance=context.instance_name) as cas:
    +
    +        for output_file_response in execute_response.result.output_files:
    +            path = os.path.join(output_directory, output_file_response.path)
    +
    +            if not os.path.exists(os.path.dirname(path)):
    +                os.makedirs(os.path.dirname(path), exist_ok=True)
    +
    +            cas.download_file(output_file_response.digest, path)
    +
    +            if output_file_response.path in output_executeables:
    +                st = os.stat(path)
    +                os.chmod(path, st.st_mode | stat.S_IXUSR)

  • buildgrid/_exceptions.py
    @@ -50,3 +50,27 @@ class ServerError(BgdError):
     class BotError(BgdError):
         def __init__(self, message, detail=None, reason=None):
             super().__init__(message, detail=detail, domain=ErrorDomain.BOT, reason=reason)
    +
    +
    +class InvalidArgumentError(BgdError):
    +    """A bad argument was passed, such as a name which doesn't exist."""
    +    def __init__(self, message, detail=None, reason=None):
    +        super().__init__(message, detail=detail, domain=ErrorDomain.SERVER, reason=reason)
    +
    +
    +class NotFoundError(BgdError):
    +    """Requested resource not found."""
    +    def __init__(self, message, detail=None, reason=None):
    +        super().__init__(message, detail=detail, domain=ErrorDomain.SERVER, reason=reason)
    +
    +
    +class OutOfSyncError(BgdError):
    +    """The worker is out of sync with the server, such as having a differing number of leases."""
    +    def __init__(self, message, detail=None, reason=None):
    +        super().__init__(message, detail=detail, domain=ErrorDomain.SERVER, reason=reason)
    +
    +
    +class OutOfRangeError(BgdError):
    +    """ByteStream service read data out of range."""
    +    def __init__(self, message, detail=None, reason=None):
    +        super().__init__(message, detail=detail, domain=ErrorDomain.SERVER, reason=reason)
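
    Side note: these exceptions now live in the shared top-level module
    buildgrid._exceptions, so client and server code can both raise and catch
    them. The new Downloader catches NotFoundError internally and maps it to
    None in get_blob(); a small usage sketch under that assumption, where
    channel and blob_digest are placeholders for an existing grpc.Channel and
    a known Digest:

        from buildgrid.client.cas import download

        with download(channel) as cas:
            blob = cas.get_blob(blob_digest)
            if blob is None:
                print('blob not found on the remote')
            else:
                print('fetched {} bytes'.format(len(blob)))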

  • buildgrid/client/cas.py
    @@ -19,12 +19,23 @@ import os
     
     import grpc
     
    +from buildgrid._exceptions import NotFoundError
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
     from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
     from buildgrid._protos.google.rpc import code_pb2
     from buildgrid.settings import HASH
     
     
    +# Maximum size for a queueable file:
    +FILE_SIZE_THRESHOLD = 1 * 1024 * 1024
    +
    +# Maximum size for a single gRPC request:
    +MAX_REQUEST_SIZE = 2 * 1024 * 1024
    +
    +# Maximum number of elements per gRPC request:
    +MAX_REQUEST_COUNT = 500
    +
    +
     class CallCache:
         """Per remote grpc.StatusCode.UNIMPLEMENTED call cache."""
         __calls = dict()
    @@ -42,6 +53,399 @@ class CallCache:
             return name in cls.__calls[channel]
     
     
    +@contextmanager
    +def download(channel, instance=None, u_uid=None):
    +    downloader = Downloader(channel, instance=instance)
    +    try:
    +        yield downloader
    +    finally:
    +        downloader.close()
    +
    +
    +class Downloader:
    +    """Remote CAS files, directories and messages download helper.
    +
    +    The :class:`Downloader` class comes with a generator factory function that
    +    can be used together with the `with` statement for context management::
    +
    +        with download(channel, instance='build') as cas:
    +            cas.get_message(message_digest)
    +    """
    +
    +    def __init__(self, channel, instance=None):
    +        """Initializes a new :class:`Downloader` instance.
    +
    +        Args:
    +            channel (grpc.Channel): A gRPC channel to the CAS endpoint.
    +            instance (str, optional): the targeted instance's name.
    +        """
    +        self.channel = channel
    +
    +        self.instance_name = instance
    +
    +        self.__bytestream_stub = bytestream_pb2_grpc.ByteStreamStub(self.channel)
    +        self.__cas_stub = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel)
    +
    +        self.__file_requests = dict()
    +        self.__file_request_count = 0
    +        self.__file_request_size = 0
    +        self.__file_response_size = 0
    +
    +    # --- Public API ---
    +
    +    def get_blob(self, digest):
    +        """Retrieves a blob from the remote CAS server.
    +
    +        Args:
    +            digest (:obj:`Digest`): the blob's digest to fetch.
    +
    +        Returns:
    +            bytearray: the fetched blob data or None if not found.
    +        """
    +        try:
    +            blob = self._fetch_blob(digest)
    +        except NotFoundError:
    +            return None
    +
    +        return blob
    +
    +    def get_blobs(self, digests):
    +        """Retrieves a list of blobs from the remote CAS server.
    +
    +        Args:
    +            digests (list): list of :obj:`Digest`s for the blobs to fetch.
    +
    +        Returns:
    +            list: the fetched blob data list.
    +        """
    +        return self._fetch_blob_batch(digests)
    +
    +    def get_message(self, digest, message):
    +        """Retrieves a :obj:`Message` from the remote CAS server.
    +
    +        Args:
    +            digest (:obj:`Digest`): the message's digest to fetch.
    +            message (:obj:`Message`): an empty message to fill.
    +
    +        Returns:
    +            :obj:`Message`: `message` filled or emptied if not found.
    +        """
    +        try:
    +            message_blob = self._fetch_blob(digest)
    +        except NotFoundError:
    +            message_blob = None
    +
    +        if message_blob is not None:
    +            message.ParseFromString(message_blob)
    +        else:
    +            message.Clear()
    +
    +        return message
    +
    +    def get_messages(self, digests, messages):
    +        """Retrieves a list of :obj:`Message`s from the remote CAS server.
    +
    +        Note:
    +            The `digests` and `messages` list **must** contain the same number
    +            of elements.
    +
    +        Args:
    +            digests (list):  list of :obj:`Digest`s for the messages to fetch.
    +            messages (list): list of empty :obj:`Message`s to fill.
    +
    +        Returns:
    +            list: the fetched and filled message list.
    +        """
    +        assert len(digests) == len(messages)
    +
    +        message_blobs = self._fetch_blob_batch(digests)
    +
    +        assert len(message_blobs) == len(messages)
    +
    +        for message, message_blob in zip(messages, message_blobs):
    +            message.ParseFromString(message_blob)
    +
    +        return messages
    +
    +    def download_file(self, digest, file_path, queue=True):
    +        """Retrieves a file from the remote CAS server.
    +
    +        If queuing is allowed (`queue=True`), the download request **may** be
    +        deferred. An explicit call to :method:`flush` can force the request to
    +        be sent immediately (along with the rest of the queued batch).
    +
    +        Args:
    +            digest (:obj:`Digest`): the file's digest to fetch.
    +            file_path (str): absolute or relative path to the local file to write.
    +            queue (bool, optional): whether or not the download request may be
    +                queued and submitted as part of a batch download request.
    +                Defaults to True.
    +
    +        Raises:
    +            NotFoundError: if `digest` is not present in the remote CAS server.
    +            OSError: if `file_path` cannot be created or written.
    +        """
    +        if not os.path.isabs(file_path):
    +            file_path = os.path.abspath(file_path)
    +
    +        if not queue or digest.size_bytes > FILE_SIZE_THRESHOLD:
    +            self._fetch_file(digest, file_path)
    +        else:
    +            self._queue_file(digest, file_path)
    +
    +    def download_directory(self, digest, directory_path):
    +        """Retrieves a :obj:`Directory` from the remote CAS server.
    +
    +        Args:
    +            digest (:obj:`Digest`): the directory's digest to fetch.
    +            directory_path (str): absolute or relative path to the local
    +                directory to write.
    +
    +        Raises:
    +            NotFoundError: if `digest` is not present in the remote CAS server.
    +            FileExistsError: if `directory_path` already contains parts of the
    +                fetched directory's content.
    +        """
    +        if not os.path.isabs(directory_path):
    +            directory_path = os.path.abspath(directory_path)
    +
    +        # We want to start fresh here, the rest is very synchronous...
    +        self.flush()
    +
    +        self._fetch_directory(digest, directory_path)
    +
    +    def flush(self):
    +        """Ensures any queued request gets sent."""
    +        if self.__file_requests:
    +            self._fetch_file_batch(self.__file_requests)
    +
    +            self.__file_requests.clear()
    +            self.__file_request_count = 0
    +            self.__file_request_size = 0
    +            self.__file_response_size = 0
    +
    +    def close(self):
    +        """Closes the underlying connection stubs.
    +
    +        Note:
    +            This will always send pending requests before closing connections,
    +            if any.
    +        """
    +        self.flush()
    +
    +        self.__bytestream_stub = None
    +        self.__cas_stub = None
    +
    +    # --- Private API ---
    +
    +    def _fetch_blob(self, digest):
    +        """Fetches a blob using ByteStream.Read()"""
    +        read_blob = bytearray()
    +
    +        if self.instance_name is not None:
    +            resource_name = '/'.join([self.instance_name, 'blobs',
    +                                      digest.hash, str(digest.size_bytes)])
    +        else:
    +            resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)])
    +
    +        read_request = bytestream_pb2.ReadRequest()
    +        read_request.resource_name = resource_name
    +        read_request.read_offset = 0
    +
    +        try:
    +            # TODO: Handle connection loss/recovery
    +            for read_response in self.__bytestream_stub.Read(read_request):
    +                read_blob += read_response.data
    +
    +            assert len(read_blob) == digest.size_bytes
    +
    +        except grpc.RpcError as e:
    +            status_code = e.code()
    +            if status_code == grpc.StatusCode.NOT_FOUND:
    +                raise NotFoundError("Requested data does not exist on the remote.")
    +
    +            else:
    +                assert False
    +
    +        return read_blob
    +
    +    def _fetch_blob_batch(self, digests):
    +        """Fetches blobs using ContentAddressableStorage.BatchReadBlobs()"""
    +        batch_fetched = False
    +        read_blobs = list()
    +
    +        # First, try BatchReadBlobs(), if not already known to be unimplemented:
    +        if not CallCache.unimplemented(self.channel, 'BatchReadBlobs'):
    +            batch_request = remote_execution_pb2.BatchReadBlobsRequest()
    +            batch_request.digests.extend(digests)
    +            if self.instance_name is not None:
    +                batch_request.instance_name = self.instance_name
    +
    +            try:
    +                batch_response = self.__cas_stub.BatchReadBlobs(batch_request)
    +                for response in batch_response.responses:
    +                    assert response.digest in digests
    +
    +                    read_blobs.append(response.data)
    +
    +                    if response.status.code != code_pb2.OK:
    +                        assert False
    +
    +                batch_fetched = True
    +
    +            except grpc.RpcError as e:
    +                status_code = e.code()
    +                if status_code == grpc.StatusCode.UNIMPLEMENTED:
    +                    CallCache.mark_unimplemented(self.channel, 'BatchReadBlobs')
    +
    +                else:
    +                    assert False
    +
    +        # Fallback to Read() if no BatchReadBlobs():
    +        if not batch_fetched:
    +            for digest in digests:
    +                read_blobs.append(self._fetch_blob(digest))
    +
    +        return read_blobs
    +
    +    def _fetch_file(self, digest, file_path):
    +        """Fetches a file using ByteStream.Read()"""
    +        if self.instance_name is not None:
    +            resource_name = '/'.join([self.instance_name, 'blobs',
    +                                      digest.hash, str(digest.size_bytes)])
    +        else:
    +            resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)])
    +
    +        read_request = bytestream_pb2.ReadRequest()
    +        read_request.resource_name = resource_name
    +        read_request.read_offset = 0
    +
    +        os.makedirs(os.path.dirname(file_path), exist_ok=True)
    +
    +        with open(file_path, 'wb') as byte_file:
    +            # TODO: Handle connection loss/recovery
    +            for read_response in self.__bytestream_stub.Read(read_request):
    +                byte_file.write(read_response.data)
    +
    +            assert byte_file.tell() == digest.size_bytes
    +
    +    def _queue_file(self, digest, file_path):
    +        """Queues a file for later batch download"""
    +        if self.__file_request_size + digest.ByteSize() > MAX_REQUEST_SIZE:
    +            self.flush()
    +        elif self.__file_response_size + digest.size_bytes > MAX_REQUEST_SIZE:
    +            self.flush()
    +        elif self.__file_request_count >= MAX_REQUEST_COUNT:
    +            self.flush()
    +
    +        self.__file_requests[digest.hash] = (digest, file_path)
    +        self.__file_request_count += 1
    +        self.__file_request_size += digest.ByteSize()
    +        self.__file_response_size += digest.size_bytes
    +
    +    def _fetch_file_batch(self, batch):
    +        """Sends queued data using ContentAddressableStorage.BatchReadBlobs()"""
    +        batch_digests = [digest for digest, _ in batch.values()]
    +        batch_blobs = self._fetch_blob_batch(batch_digests)
    +
    +        for (_, file_path), file_blob in zip(batch.values(), batch_blobs):
    +            os.makedirs(os.path.dirname(file_path), exist_ok=True)
    +
    +            with open(file_path, 'wb') as byte_file:
    +                byte_file.write(file_blob)
    +
    +    def _fetch_directory(self, digest, directory_path):
    +        """Fetches a directory using ContentAddressableStorage.GetTree()"""
    +        # Better fail early if the local root path cannot be created:
    +        os.makedirs(directory_path, exist_ok=True)
    +
    +        directories = dict()
    +        directory_fetched = False
    +        # First, try GetTree() if not known to be unimplemented yet:
    +        if not CallCache.unimplemented(self.channel, 'GetTree'):
    +            tree_request = remote_execution_pb2.GetTreeRequest()
    +            tree_request.root_digest.CopyFrom(digest)
    +            tree_request.page_size = MAX_REQUEST_COUNT
    +            if self.instance_name is not None:
    +                tree_request.instance_name = self.instance_name
    +
    +            try:
    +                for tree_response in self.__cas_stub.GetTree(tree_request):
    +                    for directory in tree_response.directories:
    +                        directory_blob = directory.SerializeToString()
    +                        directory_hash = HASH(directory_blob).hexdigest()
    +
    +                        directories[directory_hash] = directory
    +
    +                assert digest.hash in directories
    +
    +                directory = directories[digest.hash]
    +                self._write_directory(directory, directory_path,
    +                                      directories=directories, root_barrier=directory_path)
    +
    +                directory_fetched = True
    +
    +            except grpc.RpcError as e:
    +                status_code = e.code()
    +                if status_code == grpc.StatusCode.UNIMPLEMENTED:
    +                    CallCache.mark_unimplemented(self.channel, 'GetTree')
    +
    +                elif status_code == grpc.StatusCode.NOT_FOUND:
    +                    raise NotFoundError("Requested directory does not exist on the remote.")
    +
    +                else:
    +                    assert False
    +
    +        # TODO: Try with BatchReadBlobs().
    +
    +        # Fallback to Read() if no GetTree():
    +        if not directory_fetched:
    +            directory = remote_execution_pb2.Directory()
    +            directory.ParseFromString(self._fetch_blob(digest))
    +
    +            self._write_directory(directory, directory_path,
    +                                  root_barrier=directory_path)
    +
    +    def _write_directory(self, root_directory, root_path, directories=None, root_barrier=None):
    +        """Generates a local directory structure"""
    +        for file_node in root_directory.files:
    +            file_path = os.path.join(root_path, file_node.name)
    +
    +            self._queue_file(file_node.digest, file_path)
    +
    +        for directory_node in root_directory.directories:
    +            directory_path = os.path.join(root_path, directory_node.name)
    +            if directories and directory_node.digest.hash in directories:
    +                directory = directories[directory_node.digest.hash]
    +            else:
    +                directory = remote_execution_pb2.Directory()
    +                directory.ParseFromString(self._fetch_blob(directory_node.digest))
    +
    +            os.makedirs(directory_path, exist_ok=True)
    +
    +            self._write_directory(directory, directory_path,
    +                                  directories=directories, root_barrier=root_barrier)
    +
    +        for symlink_node in root_directory.symlinks:
    +            symlink_path = os.path.join(root_path, symlink_node.name)
    +            if not os.path.isabs(symlink_node.target):
    +                target_path = os.path.join(root_path, symlink_node.target)
    +            else:
    +                target_path = symlink_node.target
    +            target_path = os.path.normpath(target_path)
    +
    +            # Do not create links pointing outside the barrier:
    +            if root_barrier is not None:
    +                common_path = os.path.commonprefix([root_barrier, target_path])
    +                if not common_path.startswith(root_barrier):
    +                    continue
    +
    +            os.symlink(target_path, symlink_path)
    +
    +
     @contextmanager
     def upload(channel, instance=None, u_uid=None):
         uploader = Uploader(channel, instance=instance, u_uid=u_uid)
    @@ -59,16 +463,8 @@ class Uploader:
     
             with upload(channel, instance='build') as cas:
                 cas.upload_file('/path/to/local/file')
    -
    -    Attributes:
    -        FILE_SIZE_THRESHOLD (int): maximum size for a queueable file.
    -        MAX_REQUEST_SIZE (int): maximum size for a single gRPC request.
         """
     
    -    FILE_SIZE_THRESHOLD = 1 * 1024 * 1024
    -    MAX_REQUEST_SIZE = 2 * 1024 * 1024
    -    MAX_REQUEST_COUNT = 500
    -
         def __init__(self, channel, instance=None, u_uid=None):
             """Initializes a new :class:`Uploader` instance.
     
    @@ -111,7 +507,7 @@ class Uploader:
             Returns:
                 :obj:`Digest`: the sent blob's digest.
             """
    -        if not queue or len(blob) > Uploader.FILE_SIZE_THRESHOLD:
    +        if not queue or len(blob) > FILE_SIZE_THRESHOLD:
                 blob_digest = self._send_blob(blob, digest=digest)
             else:
                 blob_digest = self._queue_blob(blob, digest=digest)
    @@ -137,7 +533,7 @@ class Uploader:
             """
             message_blob = message.SerializeToString()
     
    -        if not queue or len(message_blob) > Uploader.FILE_SIZE_THRESHOLD:
    +        if not queue or len(message_blob) > FILE_SIZE_THRESHOLD:
                 message_digest = self._send_blob(message_blob, digest=digest)
             else:
                 message_digest = self._queue_blob(message_blob, digest=digest)
    @@ -169,7 +565,7 @@ class Uploader:
             with open(file_path, 'rb') as bytes_steam:
                 file_bytes = bytes_steam.read()
     
    -        if not queue or len(file_bytes) > Uploader.FILE_SIZE_THRESHOLD:
    +        if not queue or len(file_bytes) > FILE_SIZE_THRESHOLD:
                 file_digest = self._send_blob(file_bytes)
             else:
                 file_digest = self._queue_blob(file_bytes)
    @@ -274,9 +670,9 @@ class Uploader:
                 blob_digest.hash = HASH(blob).hexdigest()
                 blob_digest.size_bytes = len(blob)
     
    -        if self.__request_size + blob_digest.size_bytes > Uploader.MAX_REQUEST_SIZE:
    +        if self.__request_size + blob_digest.size_bytes > MAX_REQUEST_SIZE:
                 self.flush()
    -        elif self.__request_count >= Uploader.MAX_REQUEST_COUNT:
    +        elif self.__request_count >= MAX_REQUEST_COUNT:
                 self.flush()
     
             self.__requests[blob_digest.hash] = (blob, blob_digest)
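
    Side note: the module-level FILE_SIZE_THRESHOLD, MAX_REQUEST_SIZE and
    MAX_REQUEST_COUNT constants mean small files get queued into a single
    BatchReadBlobs() request while larger ones stream through Read(). A usage
    sketch, assuming an existing grpc.Channel and a list of (digest, path)
    pairs gathered elsewhere (both placeholders):

        from buildgrid.client.cas import download

        with download(channel, instance='build') as cas:
            for file_digest, file_path in wanted_files:  # assumed inputs
                # Blobs up to FILE_SIZE_THRESHOLD are queued for one batch
                # request; larger blobs are fetched immediately via Read():
                cas.download_file(file_digest, file_path)

            # Optional: force the queued batch out now; close() (called on
            # context exit) also flushes.
            cas.flush()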
    

  • buildgrid/server/_exceptions.py deleted
    -# Copyright (C) 2018 Bloomberg LP
    -#
    -# Licensed under the Apache License, Version 2.0 (the "License");
    -# you may not use this file except in compliance with the License.
    -# You may obtain a copy of the License at
    -#
    -#  <http://www.apache.org/licenses/LICENSE-2.0>
    -#
    -# Unless required by applicable law or agreed to in writing, software
    -# distributed under the License is distributed on an "AS IS" BASIS,
    -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    -# See the License for the specific language governing permissions and
    -# limitations under the License.
    -
    -
    -from .._exceptions import BgdError, ErrorDomain
    -
    -
    -class InvalidArgumentError(BgdError):
    -    """A bad argument was passed, such as a name which doesn't exist.
    -    """
    -
    -    def __init__(self, message, detail=None, reason=None):
    -        super().__init__(message, detail=detail, domain=ErrorDomain.SERVER, reason=reason)
    -
    -
    -class NotFoundError(BgdError):
    -    """Requested resource not found.
    -    """
    -
    -    def __init__(self, message, detail=None, reason=None):
    -        super().__init__(message, detail=detail, domain=ErrorDomain.SERVER, reason=reason)
    -
    -
    -class OutofSyncError(BgdError):
    -    """The worker is out of sync with the server, such as having a differing number of leases.
    -    """
    -
    -    def __init__(self, message, detail=None, reason=None):
    -        super().__init__(message, detail=detail, domain=ErrorDomain.SERVER, reason=reason)
    -
    -
    -class OutOfRangeError(BgdError):
    -    """ ByteStream service read data out of range
    -    """
    -
    -    def __init__(self, message, detail=None, reason=None):
    -        super().__init__(message, detail=detail, domain=ErrorDomain.SERVER, reason=reason)

  • buildgrid/server/actioncache/service.py
    @@ -24,11 +24,10 @@ import logging
     
     import grpc
     
    +from buildgrid._exceptions import InvalidArgumentError, NotFoundError
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
     
    -from .._exceptions import InvalidArgumentError, NotFoundError
    -
     
     class ActionCacheService(remote_execution_pb2_grpc.ActionCacheServicer):
     

  • buildgrid/server/bots/instance.py
    @@ -23,7 +23,8 @@ Instance of the Remote Workers interface.
     import logging
     import uuid
     
    -from .._exceptions import InvalidArgumentError, OutofSyncError
    +from buildgrid._exceptions import InvalidArgumentError, OutOfSyncError
    +
     from ..job import LeaseState
     
     
    @@ -105,7 +106,7 @@ class BotsInterface:
                     # TODO: Lease was rejected
                     raise NotImplementedError("'Not Accepted' is unsupported")
                 else:
    -                raise OutofSyncError("Server lease: {}. Client lease: {}".format(server_lease, client_lease))
    +                raise OutOfSyncError("Server lease: {}. Client lease: {}".format(server_lease, client_lease))
     
             elif server_state == LeaseState.ACTIVE:
     
    @@ -118,17 +119,17 @@ class BotsInterface:
                     return None
     
                 else:
    -                raise OutofSyncError("Server lease: {}. Client lease: {}".format(server_lease, client_lease))
    +                raise OutOfSyncError("Server lease: {}. Client lease: {}".format(server_lease, client_lease))
     
             elif server_state == LeaseState.COMPLETED:
    -            raise OutofSyncError("Server lease: {}. Client lease: {}".format(server_lease, client_lease))
    +            raise OutOfSyncError("Server lease: {}. Client lease: {}".format(server_lease, client_lease))
     
             elif server_state == LeaseState.CANCELLED:
                 raise NotImplementedError("Cancelled states not supported yet")
     
             else:
                 # Sould never get here
    -            raise OutofSyncError("State now allowed: {}".format(server_state))
    +            raise OutOfSyncError("State not allowed: {}".format(server_state))
     
             return client_lease
     

  • buildgrid/server/bots/service.py
    @@ -25,11 +25,10 @@ import grpc
     
     from google.protobuf.empty_pb2 import Empty
     
    +from buildgrid._exceptions import InvalidArgumentError, OutOfSyncError
     from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
     from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2_grpc
     
    -from .._exceptions import InvalidArgumentError, OutofSyncError
    -
     
     class BotsService(bots_pb2_grpc.BotsServicer):
     
    @@ -69,7 +68,7 @@ class BotsService(bots_pb2_grpc.BotsServicer):
                 context.set_details(str(e))
                 context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
     
    -        except OutofSyncError as e:
    +        except OutOfSyncError as e:
                 self.logger.error(e)
                 context.set_details(str(e))
                 context.set_code(grpc.StatusCode.DATA_LOSS)

  • buildgrid/server/cas/instance.py
    @@ -19,11 +19,10 @@ Storage Instances
     Instances of CAS and ByteStream
     """
     
    +from buildgrid._exceptions import InvalidArgumentError, NotFoundError, OutOfRangeError
     from buildgrid._protos.google.bytestream import bytestream_pb2
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 as re_pb2
    -
    -from .._exceptions import InvalidArgumentError, NotFoundError, OutOfRangeError
    -from ...settings import HASH
    +from buildgrid.settings import HASH
     
     
     class ContentAddressableStorageInstance:

  • buildgrid/server/cas/service.py
    @@ -26,12 +26,11 @@ import logging
     
     import grpc
     
    +from buildgrid._exceptions import InvalidArgumentError, NotFoundError, OutOfRangeError
     from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
     
    -from .._exceptions import InvalidArgumentError, NotFoundError, OutOfRangeError
    -
     
     class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
     

  • buildgrid/server/cas/storage/remote.py
    @@ -23,14 +23,10 @@ Forwwards storage requests to a remote storage.
     import io
     import logging
     
    -import grpc
    -
    -from buildgrid.client.cas import upload
    -from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
    +from buildgrid.client.cas import download, upload
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
     from buildgrid._protos.google.rpc import code_pb2
     from buildgrid._protos.google.rpc import status_pb2
    -from buildgrid.utils import gen_fetch_blob
     
     from .storage_abc import StorageABC
     
    @@ -43,7 +39,6 @@ class RemoteStorage(StorageABC):
             self.instance_name = instance_name
             self.channel = channel
     
    -        self._stub_bs = bytestream_pb2_grpc.ByteStreamStub(channel)
             self._stub_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(channel)
     
         def has_blob(self, digest):
    @@ -52,29 +47,8 @@ class RemoteStorage(StorageABC):
             return False
     
         def get_blob(self, digest):
    -        try:
    -            fetched_data = io.BytesIO()
    -            length = 0
    -
    -            for data in gen_fetch_blob(self._stub_bs, digest, self.instance_name):
    -                length += fetched_data.write(data)
    -
    -            if length:
    -                assert digest.size_bytes == length
    -                fetched_data.seek(0)
    -                return fetched_data
    -
    -            else:
    -                return None
    -
    -        except grpc.RpcError as e:
    -            if e.code() == grpc.StatusCode.NOT_FOUND:
    -                pass
    -            else:
    -                self.logger.error(e.details())
    -                raise
    -
    -        return None
    +        with download(self.channel, instance=self.instance_name) as cas:
    +            blob = cas.get_blob(digest)
    +
    +        # get_blob() returns None for a missing digest; guard it so None is
    +        # never handed to io.BytesIO():
    +        return io.BytesIO(blob) if blob is not None else None
     
         def begin_write(self, digest):
             return io.BytesIO()

  • buildgrid/server/execution/instance.py
    @@ -21,10 +21,10 @@ An instance of the Remote Execution Service.
     
     import logging
     
    +from buildgrid._exceptions import InvalidArgumentError
     from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action
     
     from ..job import Job
    -from .._exceptions import InvalidArgumentError
     
     
     class ExecutionInstance:

  • buildgrid/server/execution/service.py
    @@ -26,12 +26,10 @@ from functools import partial
     
     import grpc
     
    +from buildgrid._exceptions import InvalidArgumentError
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
    -
     from buildgrid._protos.google.longrunning import operations_pb2
     
    -from .._exceptions import InvalidArgumentError
    -
     
     class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
     

  • buildgrid/server/operations/instance.py
    @@ -21,7 +21,7 @@ An instance of the LongRunningOperations Service.
     
     import logging
     
    -from .._exceptions import InvalidArgumentError
    +from buildgrid._exceptions import InvalidArgumentError
     
     
     class OperationsInstance:

  • buildgrid/server/operations/service.py
    @@ -25,10 +25,9 @@ import grpc
     
     from google.protobuf.empty_pb2 import Empty
     
    +from buildgrid._exceptions import InvalidArgumentError
     from buildgrid._protos.google.longrunning import operations_pb2_grpc, operations_pb2
     
    -from .._exceptions import InvalidArgumentError
    -
     
     class OperationsService(operations_pb2_grpc.OperationsServicer):
     

  • buildgrid/server/referencestorage/service.py
    @@ -17,11 +17,10 @@ import logging
     
     import grpc
     
    +from buildgrid._exceptions import InvalidArgumentError, NotFoundError
     from buildgrid._protos.buildstream.v2 import buildstream_pb2
     from buildgrid._protos.buildstream.v2 import buildstream_pb2_grpc
     
    -from .._exceptions import InvalidArgumentError, NotFoundError
    -
     
     class ReferenceStorageService(buildstream_pb2_grpc.ReferenceStorageServicer):
     

  • buildgrid/server/referencestorage/storage.py
    @@ -24,10 +24,9 @@ For a given key, it
     
     import collections
     
    +from buildgrid._exceptions import NotFoundError
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
     
    -from .._exceptions import NotFoundError
    -
     
     class ReferenceCache:
     

  • buildgrid/server/scheduler.py
    @@ -26,7 +26,7 @@ from collections import deque
     from google.protobuf import any_pb2
     
     
    -from buildgrid.server._exceptions import NotFoundError
    +from buildgrid._exceptions import NotFoundError
     from buildgrid._protos.google.longrunning import operations_pb2
     
     from .job import ExecuteStage, LeaseState

  • buildgrid/utils.py
    @@ -18,87 +18,6 @@ import os
     
     from buildgrid.settings import HASH
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    -from buildgrid._protos.google.bytestream import bytestream_pb2
    -
    -
    -def gen_fetch_blob(stub, digest, instance_name=""):
    -    """ Generates byte stream from a fetch blob request
    -    """
    -
    -    resource_name = os.path.join(instance_name, 'blobs', digest.hash, str(digest.size_bytes))
    -    request = bytestream_pb2.ReadRequest(resource_name=resource_name,
    -                                         read_offset=0)
    -
    -    for response in stub.Read(request):
    -        yield response.data
    -
    -
    -def write_fetch_directory(root_directory, stub, digest, instance_name=None):
    -    """Locally replicates a directory from CAS.
    -
    -    Args:
    -        root_directory (str): local directory to populate.
    -        stub (): gRPC stub for CAS communication.
    -        digest (Digest): digest for the directory to fetch from CAS.
    -        instance_name (str, optional): farm instance name to query data from.
    -    """
    -
    -    if not os.path.isabs(root_directory):
    -        root_directory = os.path.abspath(root_directory)
    -    if not os.path.exists(root_directory):
    -        os.makedirs(root_directory, exist_ok=True)
    -
    -    directory = parse_to_pb2_from_fetch(remote_execution_pb2.Directory(),
    -                                        stub, digest, instance_name)
    -
    -    for directory_node in directory.directories:
    -        child_path = os.path.join(root_directory, directory_node.name)
    -
    -        write_fetch_directory(child_path, stub, directory_node.digest, instance_name)
    -
    -    for file_node in directory.files:
    -        child_path = os.path.join(root_directory, file_node.name)
    -
    -        with open(child_path, 'wb') as child_file:
    -            write_fetch_blob(child_file, stub, file_node.digest, instance_name)
    -
    -    for symlink_node in directory.symlinks:
    -        child_path = os.path.join(root_directory, symlink_node.name)
    -
    -        if os.path.isabs(symlink_node.target):
    -            continue  # No out of temp-directory links for now.
    -        target_path = os.path.join(root_directory, symlink_node.target)
    -
    -        os.symlink(child_path, target_path)
    -
    -
    -def write_fetch_blob(target_file, stub, digest, instance_name=None):
    -    """Extracts a blob from CAS into a local file.
    -
    -    Args:
    -        target_file (str): local file to write.
    -        stub (): gRPC stub for CAS communication.
    -        digest (Digest): digest for the blob to fetch from CAS.
    -        instance_name (str, optional): farm instance name to query data from.
    -    """
    -
    -    for stream in gen_fetch_blob(stub, digest, instance_name):
    -        target_file.write(stream)
    -    target_file.flush()
    -
    -    assert digest.size_bytes == os.fstat(target_file.fileno()).st_size
    -
    -
    -def parse_to_pb2_from_fetch(pb2, stub, digest, instance_name=""):
    -    """ Fetches stream and parses it into given pb2
    -    """
    -
    -    stream_bytes = b''
    -    for stream in gen_fetch_blob(stub, digest, instance_name):
    -        stream_bytes += stream
    -
    -    pb2.ParseFromString(stream_bytes)
    -    return pb2
     
     
     def create_digest(bytes_to_digest):
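
    Note: the helpers deleted above were thin ByteStream wrappers; their role
    is taken over by the download() context manager from buildgrid.client.cas,
    exercised by the new tests below. A minimal sketch of the equivalent
    fetch, assuming only the client calls that appear elsewhere in this
    changeset:

        import grpc

        from buildgrid.client.cas import download
        from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2

        def fetch_directory_message(remote, digest, instance_name=''):
            # Stands in for the removed parse_to_pb2_from_fetch();
            # 'remote' is a host:port string for the CAS endpoint.
            channel = grpc.insecure_channel(remote)
            with download(channel, instance_name) as cas:
                return cas.get_message(digest, remote_execution_pb2.Directory())

    The removed write_fetch_directory() similarly maps onto the client's
    download_directory(digest, path) call used in test_download_directory.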
    

  • tests/action_cache.py
    @@ -17,10 +17,10 @@
     
     import pytest
     
    +from buildgrid._exceptions import NotFoundError
    +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
     from buildgrid.server.actioncache.storage import ActionCache
     from buildgrid.server.cas.storage import lru_memory_cache
    -from buildgrid.server._exceptions import NotFoundError
    -from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
     
     
     @pytest.fixture

  • tests/cas/data/hello.cc
    +#include <iostream>
    +
    +int main()
    +{
    +  std::cout << "Hello, World!" << std::endl;
    +  return 0;
    +}

  • tests/cas/data/hello/hello.c
    +#include <stdio.h>
    +
    +#include "hello.h"
    +
    +int main()
    +{
    +  printf("%s\n", HELLO_WORLD);
    +  return 0;
    +}

  • tests/cas/data/hello/hello.h
    +#define HELLO_WORLD "Hello, World!"

  • tests/cas/data/void

  • tests/cas/test_client.py
    +# Copyright (C) 2018 Bloomberg LP
    +#
    +# Licensed under the Apache License, Version 2.0 (the "License");
    +# you may not use this file except in compliance with the License.
    +# You may obtain a copy of the License at
    +#
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    +#
    +# Unless required by applicable law or agreed to in writing, software
    +# distributed under the License is distributed on an "AS IS" BASIS,
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +# See the License for the specific language governing permissions and
    +# limitations under the License.
    +
    +# pylint: disable=redefined-outer-name
    +
    +
    +from copy import deepcopy
    +import multiprocessing
    +import os
    +import tempfile
    +
    +import grpc
    +import pytest
    +
    +from buildgrid.client.cas import download, upload
    +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    +
    +from ..utils.cas import serve_cas, kill_process_tree
    +
    +
    +INSTANCES = ['', 'instance']
    +BLOBS = [(b'',), (b'test-string',), (b'test', b'string')]
    +MESSAGES = [
    +    (remote_execution_pb2.Directory(),),
    +    (remote_execution_pb2.SymlinkNode(name='name', target='target'),),
    +    (remote_execution_pb2.Action(do_not_cache=True),
    +     remote_execution_pb2.ActionResult(exit_code=12))
    +]
    +DATA_DIR = os.path.join(
    +    os.path.dirname(os.path.realpath(__file__)), 'data')
    +FILES = [
    +    (os.path.join(DATA_DIR, 'void'),),
    +    (os.path.join(DATA_DIR, 'hello.cc'),),
    +    (os.path.join(DATA_DIR, 'hello', 'hello.c'),
    +     os.path.join(DATA_DIR, 'hello', 'hello.h'))]
    +FOLDERS = [
    +    (os.path.join(DATA_DIR, 'hello'),)]
    +DIRECTORIES = [
    +    (remote_execution_pb2.Directory(),),
    +    (remote_execution_pb2.Directory(
    +        files=[remote_execution_pb2.FileNode(name='helloc.c'),
    +               remote_execution_pb2.FileNode(name='helloc.h')]),)]
    +
    +
    +def run_in_subprocess(function, *arguments):
    +    queue = multiprocessing.Queue()
    +    # Use a subprocess to avoid creating gRPC threads in the main process.
    +    # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md
    +    process = multiprocessing.Process(target=function,
    +                                      args=(queue, *arguments))
    +
    +    try:
    +        process.start()
    +
    +        result = queue.get()
    +        process.join()
    +    except KeyboardInterrupt:
    +        kill_process_tree(process.pid)
    +        raise
    +
    +    return result
    +
    +
    +@pytest.mark.parametrize('blobs', BLOBS)
    +@pytest.mark.parametrize('instance', INSTANCES)
    +def test_upload_blob(instance, blobs):
    +    # Actual test function, to be run in a subprocess:
    +    def __test_upload_blob(queue, remote, instance, blobs):
    +        # Open a channel to the remote CAS server:
    +        channel = grpc.insecure_channel(remote)
    +
    +        digests = list()
    +        with upload(channel, instance) as client:
    +            if len(blobs) > 1:
    +                for blob in blobs:
    +                    digest = client.put_blob(blob, queue=True)
    +                    digests.append(digest.SerializeToString())
    +            else:
    +                digest = client.put_blob(blobs[0], queue=False)
    +                digests.append(digest.SerializeToString())
    +
    +        queue.put(digests)
    +
    +    # Start a minimal CAS server in a subprocess:
    +    with serve_cas([instance]) as server:
    +        digests = run_in_subprocess(__test_upload_blob,
    +                                    server.remote, instance, blobs)
    +
    +        for blob, digest_blob in zip(blobs, digests):
    +            digest = remote_execution_pb2.Digest()
    +            digest.ParseFromString(digest_blob)
    +
    +            assert server.has(digest)
    +            assert server.compare_blobs(digest, blob)
    +
    +
    +@pytest.mark.parametrize('messages', MESSAGES)
    +@pytest.mark.parametrize('instance', INSTANCES)
    +def test_upload_message(instance, messages):
    +    # Actual test function, to be run in a subprocess:
    +    def __test_upload_message(queue, remote, instance, messages):
    +        # Open a channel to the remote CAS server:
    +        channel = grpc.insecure_channel(remote)
    +
    +        digests = list()
    +        with upload(channel, instance) as client:
    +            if len(messages) > 1:
    +                for message in messages:
    +                    digest = client.put_message(message, queue=True)
    +                    digests.append(digest.SerializeToString())
    +            else:
    +                digest = client.put_message(messages[0], queue=False)
    +                digests.append(digest.SerializeToString())
    +
    +        queue.put(digests)
    +
    +    # Start a minimal CAS server in a subprocess:
    +    with serve_cas([instance]) as server:
    +        digests = run_in_subprocess(__test_upload_message,
    +                                    server.remote, instance, messages)
    +
    +        for message, digest_blob in zip(messages, digests):
    +            digest = remote_execution_pb2.Digest()
    +            digest.ParseFromString(digest_blob)
    +
    +            assert server.has(digest)
    +            assert server.compare_messages(digest, message)
    +
    +
    +@pytest.mark.parametrize('file_paths', FILES)
    +@pytest.mark.parametrize('instance', INSTANCES)
    +def test_upload_file(instance, file_paths):
    +    # Actual test function, to be run in a subprocess:
    +    def __test_upload_file(queue, remote, instance, file_paths):
    +        # Open a channel to the remote CAS server:
    +        channel = grpc.insecure_channel(remote)
    +
    +        digests = list()
    +        with upload(channel, instance) as client:
    +            if len(file_paths) > 1:
    +                for file_path in file_paths:
    +                    digest = client.upload_file(file_path, queue=True)
    +                    digests.append(digest.SerializeToString())
    +            else:
    +                digest = client.upload_file(file_paths[0], queue=False)
    +                digests.append(digest.SerializeToString())
    +
    +        queue.put(digests)
    +
    +    # Start a minimal CAS server in a subprocess:
    +    with serve_cas([instance]) as server:
    +        digests = run_in_subprocess(__test_upload_file,
    +                                    server.remote, instance, file_paths)
    +
    +        for file_path, digest_blob in zip(file_paths, digests):
    +            digest = remote_execution_pb2.Digest()
    +            digest.ParseFromString(digest_blob)
    +
    +            assert server.has(digest)
    +            assert server.compare_files(digest, file_path)
    +
    +
    +@pytest.mark.parametrize('directories', DIRECTORIES)
    +@pytest.mark.parametrize('instance', INSTANCES)
    +def test_upload_directory(instance, directories):
    +    # Actual test function, to be run in a subprocess:
    +    def __test_upload_directory(queue, remote, instance, directories):
    +        # Open a channel to the remote CAS server:
    +        channel = grpc.insecure_channel(remote)
    +
    +        digests = list()
    +        with upload(channel, instance) as client:
    +            if len(directories) > 1:
    +                for directory in directories:
    +                    digest = client.upload_directory(directory, queue=True)
    +                    digests.append(digest.SerializeToString())
    +            else:
    +                digest = client.upload_directory(directories[0], queue=False)
    +                digests.append(digest.SerializeToString())
    +
    +        queue.put(digests)
    +
    +    # Start a minimal CAS server in a subprocess:
    +    with serve_cas([instance]) as server:
    +        digests = run_in_subprocess(__test_upload_directory,
    +                                    server.remote, instance, directories)
    +
    +        for directory, digest_blob in zip(directories, digests):
    +            digest = remote_execution_pb2.Digest()
    +            digest.ParseFromString(digest_blob)
    +
    +            assert server.has(digest)
    +            assert server.compare_messages(digest, directory)
    +
    +
    +@pytest.mark.parametrize('blobs', BLOBS)
    +@pytest.mark.parametrize('instance', INSTANCES)
    +def test_download_blob(instance, blobs):
    +    # Actual test function, to be run in a subprocess:
    +    def __test_download_blob(queue, remote, instance, digests):
    +        # Open a channel to the remote CAS server:
    +        channel = grpc.insecure_channel(remote)
    +
    +        blobs = list()
    +        with download(channel, instance) as client:
    +            if len(digests) > 1:
    +                blobs.extend(client.get_blobs(digests))
    +            else:
    +                blobs.append(client.get_blob(digests[0]))
    +
    +        queue.put(blobs)
    +
    +    # Start a minimal CAS server in a subprocess:
    +    with serve_cas([instance]) as server:
    +        digests = list()
    +        for blob in blobs:
    +            digest = server.store_blob(blob)
    +            digests.append(digest)
    +
    +        blobs = run_in_subprocess(__test_download_blob,
    +                                  server.remote, instance, digests)
    +
    +        for digest, blob in zip(digests, blobs):
    +            assert server.compare_blobs(digest, blob)
    +
    +
    +@pytest.mark.parametrize('messages', MESSAGES)
    +@pytest.mark.parametrize('instance', INSTANCES)
    +def test_download_message(instance, messages):
    +    # Actual test function, to be run in a subprocess:
    +    def __test_download_message(queue, remote, instance, digests, empty_messages):
    +        # Open a channel to the remote CAS server:
    +        channel = grpc.insecure_channel(remote)
    +
    +        messages = list()
    +        with download(channel, instance) as client:
    +            if len(digests) > 1:
    +                messages = client.get_messages(digests, empty_messages)
    +                messages = [m.SerializeToString() for m in messages]
    +            else:
    +                message = client.get_message(digests[0], empty_messages[0])
    +                messages.append(message.SerializeToString())
    +
    +        queue.put(messages)
    +
    +    # Start a minimal CAS server in a subprocess:
    +    with serve_cas([instance]) as server:
    +        empty_messages, digests = list(), list()
    +        for message in messages:
    +            digest = server.store_message(message)
    +            digests.append(digest)
    +
    +            empty_message = deepcopy(message)
    +            empty_message.Clear()
    +            empty_messages.append(empty_message)
    +
    +        messages = run_in_subprocess(__test_download_message,
    +                                     server.remote, instance, digests, empty_messages)
    +
    +        for digest, message_blob, message in zip(digests, messages, empty_messages):
    +            message.ParseFromString(message_blob)
    +
    +            assert server.compare_messages(digest, message)
    +
    +
    +@pytest.mark.parametrize('file_paths', FILES)
    +@pytest.mark.parametrize('instance', INSTANCES)
    +def test_download_file(instance, file_paths):
    +    # Actual test function, to be run in a subprocess:
    +    def __test_download_file(queue, remote, instance, digests, paths):
    +        # Open a channel to the remote CAS server:
    +        channel = grpc.insecure_channel(remote)
    +
    +        with download(channel, instance) as client:
    +            if len(digests) > 1:
    +                for digest, path in zip(digests, paths):
    +                    client.download_file(digest, path, queue=False)
    +            else:
    +                client.download_file(digests[0], paths[0], queue=False)
    +
    +        queue.put(None)
    +
    +    # Start a minimal CAS server in a subprocess:
    +    with serve_cas([instance]) as server:
    +        with tempfile.TemporaryDirectory() as temp_folder:
    +            paths, digests = list(), list()
    +            for file_path in file_paths:
    +                digest = server.store_file(file_path)
    +                digests.append(digest)
    +
    +                path = os.path.relpath(file_path, start=DATA_DIR)
    +                path = os.path.join(temp_folder, path)
    +                paths.append(path)
    +
    +            run_in_subprocess(__test_download_file,
    +                              server.remote, instance, digests, paths)
    +
    +            for digest, path in zip(digests, paths):
    +                assert server.compare_files(digest, path)
    +
    +
    +@pytest.mark.parametrize('folder_paths', FOLDERS)
    +@pytest.mark.parametrize('instance', INSTANCES)
    +def test_download_directory(instance, folder_paths):
    +    # Actual test function, to be run in a subprocess:
    +    def __test_download_directory(queue, remote, instance, digests, paths):
    +        # Open a channel to the remote CAS server:
    +        channel = grpc.insecure_channel(remote)
    +
    +        with download(channel, instance) as client:
    +            if len(digests) > 1:
    +                for digest, path in zip(digests, paths):
    +                    client.download_directory(digest, path)
    +            else:
    +                client.download_directory(digests[0], paths[0])
    +
    +        queue.put(None)
    +
    +    # Start a minimal CAS server in a subprocess:
    +    with serve_cas([instance]) as server:
    +        with tempfile.TemporaryDirectory() as temp_folder:
    +            paths, digests = list(), list()
    +            for folder_path in folder_paths:
    +                digest = server.store_folder(folder_path)
    +                digests.append(digest)
    +
    +                path = os.path.relpath(folder_path, start=DATA_DIR)
    +                path = os.path.join(temp_folder, path)
    +                paths.append(path)
    +
    +            run_in_subprocess(__test_download_directory,
    +                              server.remote, instance, digests, paths)
    +
    +            for digest, path in zip(digests, paths):
    +                assert server.check_local_folders(digest, path)
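
    Note: every test above pushes its gRPC calls into a subprocess because a
    process must not fork after gRPC threads have been created in it (see the
    linked fork_support.md). Outside of that constraint, a blob round-trip
    through the client API under test reduces to a few lines; a sketch, with
    the endpoint address as a placeholder:

        import grpc

        from buildgrid.client.cas import download, upload

        channel = grpc.insecure_channel('localhost:50051')  # placeholder endpoint

        with upload(channel) as cas:
            digest = cas.put_blob(b'test-string', queue=False)

        with download(channel) as cas:
            assert cas.get_blob(digest) == b'test-string'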

  • tests/integration/operations_service.py
    @@ -24,12 +24,10 @@ import grpc
     from grpc._server import _Context
     import pytest
     
    +from buildgrid._exceptions import InvalidArgumentError
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
     from buildgrid._protos.google.longrunning import operations_pb2
    -
     from buildgrid.server.controller import ExecutionController
    -from buildgrid.server._exceptions import InvalidArgumentError
    -
     from buildgrid.server.operations import service
     from buildgrid.server.operations.service import OperationsService
     

  • tests/utils/cas.py
    @@ -24,11 +24,13 @@ import grpc
     import psutil
     import pytest_cov
     
    +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
     from buildgrid.server.cas.service import ByteStreamService
     from buildgrid.server.cas.service import ContentAddressableStorageService
     from buildgrid.server.cas.instance import ByteStreamInstance
     from buildgrid.server.cas.instance import ContentAddressableStorageInstance
     from buildgrid.server.cas.storage.disk import DiskStorage
    +from buildgrid.utils import create_digest, merkle_maker
     
     
     @contextmanager
    ... ... @@ -103,6 +105,15 @@ class Server:
         def has(self, digest):
             return self.__storage.has_blob(digest)
     
    +    def store_blob(self, blob):
    +        digest = create_digest(blob)
    +        write_buffer = self.__storage.begin_write(digest)
    +        write_buffer.write(blob)
    +
    +        self.__storage.commit_write(digest, write_buffer)
    +
    +        return digest
    +
         def compare_blobs(self, digest, blob):
             if not self.__storage.has_blob(digest):
                 return False
    ... ... @@ -112,6 +123,16 @@ class Server:
     
             return blob == stored_blob
     
    +    def store_message(self, message):
    +        message_blob = message.SerializeToString()
    +        message_digest = create_digest(message_blob)
    +        write_buffer = self.__storage.begin_write(message_digest)
    +        write_buffer.write(message_blob)
    +
    +        self.__storage.commit_write(message_digest, write_buffer)
    +
    +        return message_digest
    +
         def compare_messages(self, digest, message):
             if not self.__storage.has_blob(digest):
                 return False
    ... ... @@ -123,6 +144,17 @@ class Server:
     
             return message_blob == stored_blob
     
    +    def store_file(self, file_path):
    +        with open(file_path, 'rb') as file_bytes:
    +            file_blob = file_bytes.read()
    +        file_digest = create_digest(file_blob)
    +        write_buffer = self.__storage.begin_write(file_digest)
    +        write_buffer.write(file_blob)
    +
    +        self.__storage.commit_write(file_digest, write_buffer)
    +
    +        return file_digest
    +
         def compare_files(self, digest, file_path):
             if not self.__storage.has_blob(digest):
                 return False
    ... ... @@ -135,6 +167,53 @@ class Server:
     
             return file_blob == stored_blob
     
    +    def store_folder(self, folder_path):
    +        last_digest = None
    +        for blob, digest in merkle_maker(folder_path):
    +            write_buffer = self.__storage.begin_write(digest)
    +            write_buffer.write(blob)
    +
    +            self.__storage.commit_write(digest, write_buffer)
    +            last_digest = digest
    +
    +        return last_digest
    +
    +    def check_local_folders(self, digest, folder_path):
    +        if not self.__storage.has_blob(digest):
    +            return False
    +
    +        def __compare_folders(digest, path):
    +            directory = remote_execution_pb2.Directory()
    +            directory.ParseFromString(self.__storage.get_blob(digest).read())
    +
    +            for file_node in directory.files:
    +                file_path = os.path.join(path, file_node.name)
    +
    +                assert os.path.isfile(file_path)
    +                assert not os.path.islink(file_path)
    +                if file_node.is_executable:
    +                    assert os.access(file_path, os.X_OK)
    +
    +                assert self.compare_files(file_node.digest, file_path)
    +
    +            for symlink_node in directory.symlinks:
    +                symlink_path = os.path.join(path, symlink_node.name)
    +
    +                assert os.path.islink(symlink_path)
    +                assert os.readlink(symlink_path) == symlink_node.target
    +
    +            for directory_node in directory.directories:
    +                directory_path = os.path.join(path, directory_node.name)
    +
    +                assert os.path.exists(directory_path)
    +                assert not os.path.islink(directory_path)
    +
    +                assert __compare_folders(directory_node.digest, directory_path)
    +
    +            return True
    +
    +        return __compare_folders(digest, folder_path)
    +
         def quit(self):
             if self.__process:
                 self.__process.terminate()
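
    Note: the new store_blob(), store_message(), store_file() and
    store_folder() helpers give tests direct write access to the server's
    storage backend, so fixtures can be seeded without going through the
    client under test. A sketch of the fixture in use, mirroring
    tests/cas/test_client.py and assuming the tests package is importable
    as tests.utils.cas:

        from tests.utils.cas import serve_cas

        with serve_cas(['']) as server:  # a single unnamed instance
            digest = server.store_blob(b'hello')

            assert server.has(digest)
            assert server.compare_blobs(digest, b'hello')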
    


