[Notes] [Git][BuildGrid/buildgrid][mablanch/79-cas-downloader] 14 commits: client/cas.py: Rename the message uploading helper




Martin Blanchard pushed to branch mablanch/79-cas-downloader at BuildGrid / buildgrid

Commits:

23 changed files:

Changes:

  • buildgrid/_app/bots/buildbox.py
    @@ -19,28 +19,30 @@ import tempfile
     
     from google.protobuf import any_pb2
     
    -from buildgrid.client.cas import upload
    +from buildgrid.client.cas import download, upload
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    -from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
    -from buildgrid.utils import read_file, write_file, parse_to_pb2_from_fetch
    +from buildgrid.utils import read_file, write_file
     
     
     def work_buildbox(context, lease):
         """Executes a lease for a build action, using buildbox.
         """
     
    -    stub_bytestream = bytestream_pb2_grpc.ByteStreamStub(context.cas_channel)
         local_cas_directory = context.local_cas
    +    # instance_name = context.parent
         logger = context.logger
     
         action_digest = remote_execution_pb2.Digest()
         lease.payload.Unpack(action_digest)
     
    -    action = parse_to_pb2_from_fetch(remote_execution_pb2.Action(),
    -                                     stub_bytestream, action_digest)
    +    with download(context.cas_channel) as cas:
    +        action = cas.get_message(action_digest,
    +                                 remote_execution_pb2.Action())
     
    -    command = parse_to_pb2_from_fetch(remote_execution_pb2.Command(),
    -                                      stub_bytestream, action.command_digest)
    +        assert action.command_digest.hash
    +
    +        command = cas.get_message(action.command_digest,
    +                                  remote_execution_pb2.Command())
     
         environment = dict()
         for variable in command.environment_variables:

    @@ -101,10 +103,11 @@ def work_buildbox(context, lease):
     
                 # TODO: Have BuildBox helping us creating the Tree instance here
                 # See https://gitlab.com/BuildStream/buildbox/issues/7 for details
    -            output_tree = _cas_tree_maker(stub_bytestream, output_digest)
    +            with download(context.cas_channel) as cas:
    +                output_tree = _cas_tree_maker(cas, output_digest)
     
                 with upload(context.cas_channel) as cas:
    -                output_tree_digest = cas.send_message(output_tree)
    +                output_tree_digest = cas.put_message(output_tree)
     
                 output_directory = remote_execution_pb2.OutputDirectory()
                 output_directory.tree_digest.CopyFrom(output_tree_digest)

    @@ -121,24 +124,28 @@ def work_buildbox(context, lease):
         return lease
     
     
    -def _cas_tree_maker(stub_bytestream, directory_digest):
    +def _cas_tree_maker(cas, directory_digest):
         # Generates and stores a Tree for a given Directory. This is very inefficient
         # and only temporary. See https://gitlab.com/BuildStream/buildbox/issues/7.
         output_tree = remote_execution_pb2.Tree()
     
    -    def list_directories(parent_directory):
    -        directory_list = list()
    +    def __cas_tree_maker(cas, parent_directory):
    +        digests, directories = list(), list()
             for directory_node in parent_directory.directories:
    -            directory = parse_to_pb2_from_fetch(remote_execution_pb2.Directory(),
    -                                                stub_bytestream, directory_node.digest)
    -            directory_list.extend(list_directories(directory))
    -            directory_list.append(directory)
    +            directories.append(remote_execution_pb2.Directory())
    +            digests.append(directory_node.digest)
    +
    +        cas.get_messages(digests, directories)
    +
    +        for directory in directories[:]:
    +            directories.extend(__cas_tree_maker(cas, directory))
    +
    +        return directories
     
    -        return directory_list
    +    root_directory = cas.get_message(directory_digest,
    +                                     remote_execution_pb2.Directory())
     
    -    root_directory = parse_to_pb2_from_fetch(remote_execution_pb2.Directory(),
    -                                             stub_bytestream, directory_digest)
    -    output_tree.children.extend(list_directories(root_directory))
    +    output_tree.children.extend(__cas_tree_maker(cas, root_directory))
         output_tree.root.CopyFrom(root_directory)
     
         return output_tree
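    In short, the worker-side read path now goes through the new download()
    context manager instead of a raw ByteStream stub. A minimal sketch of the
    pattern (the endpoint address and digest values below are assumptions for
    illustration, not part of this commit):

        import grpc

        from buildgrid.client.cas import download
        from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2

        # Assumed: a CAS endpoint reachable over a local insecure channel.
        channel = grpc.insecure_channel('localhost:50051')

        # Assumed: the digest of an Action already stored in CAS.
        action_digest = remote_execution_pb2.Digest(hash='deadbeef...', size_bytes=123)

        with download(channel) as cas:
            # get_message() fills the empty message passed in and returns it.
            action = cas.get_message(action_digest, remote_execution_pb2.Action())
            command = cas.get_message(action.command_digest, remote_execution_pb2.Command())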

  • buildgrid/_app/bots/temp_directory.py
    @@ -19,10 +19,8 @@ import tempfile
     
     from google.protobuf import any_pb2
     
    -from buildgrid.client.cas import upload
    +from buildgrid.client.cas import download, upload
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    -from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
    -from buildgrid.utils import write_fetch_directory, parse_to_pb2_from_fetch
     from buildgrid.utils import output_file_maker, output_directory_maker
     
     

    @@ -30,22 +28,23 @@ def work_temp_directory(context, lease):
         """Executes a lease for a build action, using host tools.
         """
     
    -    stub_bytestream = bytestream_pb2_grpc.ByteStreamStub(context.cas_channel)
         instance_name = context.parent
         logger = context.logger
     
         action_digest = remote_execution_pb2.Digest()
         lease.payload.Unpack(action_digest)
     
    -    action = parse_to_pb2_from_fetch(remote_execution_pb2.Action(),
    -                                     stub_bytestream, action_digest, instance_name)
    -
         with tempfile.TemporaryDirectory() as temp_directory:
    -        command = parse_to_pb2_from_fetch(remote_execution_pb2.Command(),
    -                                          stub_bytestream, action.command_digest, instance_name)
    +        with download(context.cas_channel, instance=instance_name) as cas:
    +            action = cas.get_message(action_digest,
    +                                     remote_execution_pb2.Action())
    +
    +            assert action.command_digest.hash
    +
    +            command = cas.get_message(action.command_digest,
    +                                      remote_execution_pb2.Command())
     
    -        write_fetch_directory(temp_directory, stub_bytestream,
    -                              action.input_root_digest, instance_name)
    +            cas.download_directory(action.input_root_digest, temp_directory)
     
             environment = os.environ.copy()
             for variable in command.environment_variables:
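    The same helper also replaces write_fetch_directory() for staging input
    roots. A sketch of that directory path, reusing the names from
    work_temp_directory() above (channel and digest setup assumed):

        import tempfile

        import grpc

        from buildgrid.client.cas import download

        # Assumed: channel, instance_name and input_root_digest are set up as
        # in work_temp_directory() above.
        with tempfile.TemporaryDirectory() as temp_directory:
            with download(channel, instance=instance_name) as cas:
                # Small files are queued and fetched in batches; larger ones
                # are streamed individually over ByteStream.Read().
                cas.download_directory(input_root_digest, temp_directory)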
    

  • buildgrid/_app/commands/cmd_cas.py
    @@ -27,8 +27,9 @@ from urllib.parse import urlparse
     import click
     import grpc
     
    -from buildgrid.utils import merkle_maker, create_digest
    +from buildgrid.client.cas import upload
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    +from buildgrid.utils import merkle_maker
     
     from ..cli import pass_context
     

    @@ -66,27 +67,31 @@ def cli(context, remote, instance_name, client_key, client_cert, server_cert):
     
     
     @cli.command('upload-files', short_help="Upload files to the CAS server.")
    -@click.argument('files', nargs=-1, type=click.File('rb'), required=True)
    +@click.argument('files', nargs=-1, type=click.Path(exists=True, dir_okay=False), required=True)
     @pass_context
     def upload_files(context, files):
    -    stub = remote_execution_pb2_grpc.ContentAddressableStorageStub(context.channel)
    +    sent_digests, file_map = list(), dict()
    +    with upload(context.channel, instance=context.instance_name) as cas:
    +        for file_path in files:
    +            context.logger.info("Queueing {}".format(file_path))
     
    -    requests = []
    -    for file in files:
    -        chunk = file.read()
    -        requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
    -            digest=create_digest(chunk), data=chunk))
    +            file_digest = cas.upload_file(file_path, queue=True)
     
    -    request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=context.instance_name,
    -                                                           requests=requests)
    +            assert file_digest.hash and file_digest.size_bytes
     
    -    context.logger.info("Sending: {}".format(request))
    -    response = stub.BatchUpdateBlobs(request)
    -    context.logger.info("Response: {}".format(response))
    +            file_map[file_digest.hash] = file_path
    +            sent_digests.append(file_digest)
    +
    +    for file_digest in sent_digests:
    +        file_path = file_map[file_digest.hash]
    +        if file_digest.ByteSize():
    +            context.logger.info("{}: {}".format(file_path, file_digest.hash))
    +        else:
    +            context.logger.info("{}: FAILED".format(file_path))
     
     
     @cli.command('upload-dir', short_help="Upload a directory to the CAS server.")
    -@click.argument('directory', nargs=1, type=click.Path(), required=True)
    +@click.argument('directory', nargs=1, type=click.Path(exists=True, file_okay=False), required=True)
     @pass_context
     def upload_dir(context, directory):
         context.logger.info("Uploading directory to cas")
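    The rewritten upload-files command leans on the uploader's queuing:
    upload_file(..., queue=True) groups small files into batched
    BatchUpdateBlobs() calls, and leaving the context flushes whatever is
    still pending. A minimal sketch (the endpoint and file paths are
    assumptions):

        import grpc

        from buildgrid.client.cas import upload

        channel = grpc.insecure_channel('localhost:50051')  # assumed endpoint

        with upload(channel, instance='build') as cas:
            # Digests are computed locally right away; the actual transfer may
            # happen any time up to the context exit, which flushes the queue.
            digest_a = cas.upload_file('/tmp/a.txt', queue=True)
            digest_b = cas.upload_file('/tmp/b.txt', queue=True)

        print(digest_a.hash, digest_a.size_bytes)
        print(digest_b.hash, digest_b.size_bytes)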
    

  • buildgrid/_app/commands/cmd_execute.py
    @@ -20,7 +20,6 @@ Execute command
     Request work to be executed and monitor status of jobs.
     """
     
    -import errno
     import logging
     import os
     import stat

    @@ -30,9 +29,9 @@ from urllib.parse import urlparse
     import click
     import grpc
     
    -from buildgrid.utils import merkle_maker, create_digest, write_fetch_blob
    +from buildgrid.client.cas import download, upload
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    -from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
    +from buildgrid.utils import merkle_maker
     
     from ..cli import pass_context
     

    @@ -119,54 +118,43 @@ def wait_execution(context, operation_name):
     @click.argument('input-root', nargs=1, type=click.Path(), required=True)
     @click.argument('commands', nargs=-1, type=click.STRING, required=True)
     @pass_context
    -def command(context, input_root, commands, output_file, output_directory):
    +def run_command(context, input_root, commands, output_file, output_directory):
         stub = remote_execution_pb2_grpc.ExecutionStub(context.channel)
     
    -    execute_command = remote_execution_pb2.Command()
    +    output_executeables = list()
    +    with upload(context.channel, instance=context.instance_name) as cas:
    +        command = remote_execution_pb2.Command()
     
    -    for arg in commands:
    -        execute_command.arguments.extend([arg])
    +        for arg in commands:
    +            command.arguments.extend([arg])
     
    -    output_executeables = []
    -    for file, is_executeable in output_file:
    -        execute_command.output_files.extend([file])
    -        if is_executeable:
    -            output_executeables.append(file)
    +        for file, is_executeable in output_file:
    +            command.output_files.extend([file])
    +            if is_executeable:
    +                output_executeables.append(file)
     
    -    command_digest = create_digest(execute_command.SerializeToString())
    -    context.logger.info(command_digest)
    +        command_digest = cas.put_message(command, queue=True)
     
    -    # TODO: Check for missing blobs
    -    digest = None
    -    for _, digest in merkle_maker(input_root):
    -        pass
    +        context.logger.info('Sent command: {}'.format(command_digest))
     
    -    action = remote_execution_pb2.Action(command_digest=command_digest,
    -                                         input_root_digest=digest,
    -                                         do_not_cache=True)
    +        # TODO: Check for missing blobs
    +        input_root_digest = None
    +        for _, input_root_digest in merkle_maker(input_root):
    +            pass
     
    -    action_digest = create_digest(action.SerializeToString())
    +        action = remote_execution_pb2.Action(command_digest=command_digest,
    +                                             input_root_digest=input_root_digest,
    +                                             do_not_cache=True)
     
    -    context.logger.info("Sending execution request...")
    -
    -    requests = []
    -    requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
    -        digest=command_digest, data=execute_command.SerializeToString()))
    -
    -    requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
    -        digest=action_digest, data=action.SerializeToString()))
    +        action_digest = cas.put_message(action, queue=True)
     
    -    request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=context.instance_name,
    -                                                           requests=requests)
    -    remote_execution_pb2_grpc.ContentAddressableStorageStub(context.channel).BatchUpdateBlobs(request)
    +        context.logger.info("Sent action: {}".format(action_digest))
     
         request = remote_execution_pb2.ExecuteRequest(instance_name=context.instance_name,
                                                       action_digest=action_digest,
                                                       skip_cache_lookup=True)
         response = stub.Execute(request)
     
    -    stub = bytestream_pb2_grpc.ByteStreamStub(context.channel)
    -
         stream = None
         for stream in response:
             context.logger.info(stream)

    @@ -174,21 +162,16 @@ def command(context, input_root, commands, output_file, output_directory):
         execute_response = remote_execution_pb2.ExecuteResponse()
         stream.response.Unpack(execute_response)
     
    -    for output_file_response in execute_response.result.output_files:
    -        path = os.path.join(output_directory, output_file_response.path)
    -
    -        if not os.path.exists(os.path.dirname(path)):
    +    with download(context.channel, instance=context.instance_name) as cas:
     
    -            try:
    -                os.makedirs(os.path.dirname(path))
    +        for output_file_response in execute_response.result.output_files:
    +            path = os.path.join(output_directory, output_file_response.path)
     
    -            except OSError as exc:
    -                if exc.errno != errno.EEXIST:
    -                    raise
    +            if not os.path.exists(os.path.dirname(path)):
    +                os.makedirs(os.path.dirname(path), exist_ok=True)
     
    -        with open(path, 'wb+') as f:
    -            write_fetch_blob(f, stub, output_file_response.digest, context.instance_name)
    +            cas.download_file(output_file_response.digest, path)
     
    -        if output_file_response.path in output_executeables:
    -            st = os.stat(path)
    -            os.chmod(path, st.st_mode | stat.S_IXUSR)
    +            if output_file_response.path in output_executeables:
    +                st = os.stat(path)
    +                os.chmod(path, st.st_mode | stat.S_IXUSR)
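    End to end, the command upload in run_command() now reduces to two
    put_message() calls followed by a regular ExecuteRequest. A condensed
    sketch (the endpoint, instance name and command arguments are
    assumptions):

        import grpc

        from buildgrid.client.cas import upload
        from buildgrid._protos.build.bazel.remote.execution.v2 import (
            remote_execution_pb2, remote_execution_pb2_grpc)

        channel = grpc.insecure_channel('localhost:50051')  # assumed endpoint

        with upload(channel, instance='build') as cas:
            command = remote_execution_pb2.Command(arguments=['echo', 'hello'])
            command_digest = cas.put_message(command, queue=True)

            action = remote_execution_pb2.Action(command_digest=command_digest,
                                                 do_not_cache=True)
            action_digest = cas.put_message(action, queue=True)

        # Queued blobs are guaranteed to be on the remote once the block exits.
        stub = remote_execution_pb2_grpc.ExecutionStub(channel)
        request = remote_execution_pb2.ExecuteRequest(instance_name='build',
                                                      action_digest=action_digest,
                                                      skip_cache_lookup=True)
        for operation in stub.Execute(request):
            print(operation)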
  • buildgrid/_exceptions.py
    @@ -50,3 +50,27 @@ class ServerError(BgdError):
     class BotError(BgdError):
         def __init__(self, message, detail=None, reason=None):
             super().__init__(message, detail=detail, domain=ErrorDomain.BOT, reason=reason)
    +
    +
    +class InvalidArgumentError(BgdError):
    +    """A bad argument was passed, such as a name which doesn't exist."""
    +    def __init__(self, message, detail=None, reason=None):
    +        super().__init__(message, detail=detail, domain=ErrorDomain.SERVER, reason=reason)
    +
    +
    +class NotFoundError(BgdError):
    +    """Requested resource not found."""
    +    def __init__(self, message, detail=None, reason=None):
    +        super().__init__(message, detail=detail, domain=ErrorDomain.SERVER, reason=reason)
    +
    +
    +class OutOfSyncError(BgdError):
    +    """The worker is out of sync with the server, such as having a differing number of leases."""
    +    def __init__(self, message, detail=None, reason=None):
    +        super().__init__(message, detail=detail, domain=ErrorDomain.SERVER, reason=reason)
    +
    +
    +class OutOfRangeError(BgdError):
    +    """ByteStream service read data out of range."""
    +    def __init__(self, message, detail=None, reason=None):
    +        super().__init__(message, detail=detail, domain=ErrorDomain.SERVER, reason=reason)
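    NotFoundError is what the new download helper raises when a digest is
    missing on the remote; callers that prefer a soft failure can catch it.
    A sketch, with `cas` and `digest` assumed to come from a surrounding
    download() block:

        from buildgrid._exceptions import NotFoundError

        try:
            cas.download_file(digest, '/tmp/output.bin', queue=False)
        except NotFoundError:
            print('{} not present on the remote'.format(digest.hash))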

  • buildgrid/client/cas.py
    ... ... @@ -17,9 +17,447 @@ from contextlib import contextmanager
    17 17
     import uuid
    
    18 18
     import os
    
    19 19
     
    
    20
    -from buildgrid.settings import HASH
    
    20
    +import grpc
    
    21
    +
    
    22
    +from buildgrid._exceptions import NotFoundError
    
    21 23
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    
    22 24
     from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
    
    25
    +from buildgrid._protos.google.rpc import code_pb2
    
    26
    +from buildgrid.settings import HASH
    
    27
    +from buildgrid.utils import write_file
    
    28
    +
    
    29
    +
    
    30
    +# Maximum size for a queueable file:
    
    31
    +__FILE_SIZE_THRESHOLD = 1 * 1024 * 1024
    
    32
    +
    
    33
    +# Maximum size for a single gRPC request:
    
    34
    +__MAX_REQUEST_SIZE = 2 * 1024 * 1024
    
    35
    +
    
    36
    +# Maximum number of elements per gRPC request:
    
    37
    +__MAX_REQUEST_COUNT = 500
    
    38
    +
    
    39
    +
    
    40
    +class CallCache:
    
    41
    +    """Per remote grpc.StatusCode.UNIMPLEMENTED call cache."""
    
    42
    +    __calls = dict()
    
    43
    +
    
    44
    +    @classmethod
    
    45
    +    def mark_unimplemented(cls, channel, name):
    
    46
    +        if channel not in cls.__calls:
    
    47
    +            cls.__calls[channel] = set()
    
    48
    +        cls.__calls[channel].add(name)
    
    49
    +
    
    50
    +    @classmethod
    
    51
    +    def unimplemented(cls, channel, name):
    
    52
    +        if channel not in cls.__calls:
    
    53
    +            return False
    
    54
    +        return name in cls.__calls[channel]
    
    55
    +
    
    56
    +
    
    57
    +@contextmanager
    
    58
    +def download(channel, instance=None, u_uid=None):
    
    59
    +    downloader = Downloader(channel, instance=instance)
    
    60
    +    try:
    
    61
    +        yield downloader
    
    62
    +    finally:
    
    63
    +        downloader.close()
    
    64
    +
    
    65
    +
    
    66
    +class Downloader:
    
    67
    +    """Remote CAS files, directories and messages download helper.
    
    68
    +
    
    69
    +    The :class:`Downloader` class comes with a generator factory function that
    
    70
    +    can be used together with the `with` statement for context management::
    
    71
    +
    
    72
    +        with download(channel, instance='build') as cas:
    
    73
    +            cas.get_message(message_digest)
    
    74
    +    """
    
    75
    +
    
    76
    +    def __init__(self, channel, instance=None):
    
    77
    +        """Initializes a new :class:`Downloader` instance.
    
    78
    +
    
    79
    +        Args:
    
    80
    +            channel (grpc.Channel): A gRPC channel to the CAS endpoint.
    
    81
    +            instance (str, optional): the targeted instance's name.
    
    82
    +        """
    
    83
    +        self.channel = channel
    
    84
    +
    
    85
    +        self.instance_name = instance
    
    86
    +
    
    87
    +        self.__bytestream_stub = bytestream_pb2_grpc.ByteStreamStub(self.channel)
    
    88
    +        self.__cas_stub = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel)
    
    89
    +
    
    90
    +        self.__file_requests = dict()
    
    91
    +        self.__file_request_count = 0
    
    92
    +        self.__file_request_size = 0
    
    93
    +        self.__file_response_size = 0
    
    94
    +
    
    95
    +    # --- Public API ---
    
    96
    +
    
    97
    +    def get_blob(self, digest):
    
    98
    +        """Retrieves a blob from the remote CAS server.
    
    99
    +
    
    100
    +        Args:
    
    101
    +            digest (:obj:`Digest`): the blob's digest to fetch.
    
    102
    +
    
    103
    +        Returns:
    
    104
    +            bytearray: the fetched blob data or None if not found.
    
    105
    +        """
    
    106
    +        try:
    
    107
    +            blob = self._fetch_blob(digest)
    
    108
    +        except NotFoundError:
    
    109
    +            return None
    
    110
    +
    
    111
    +        return blob
    
    112
    +
    
    113
    +    def get_blobs(self, digests):
    
    114
    +        """Retrieves a list of blobs from the remote CAS server.
    
    115
    +
    
    116
    +        Args:
    
    117
    +            digests (list): list of :obj:`Digest`s for the blobs to fetch.
    
    118
    +
    
    119
    +        Returns:
    
    120
    +            list: the fetched blob data list.
    
    121
    +        """
    
    122
    +        return self._fetch_blob_batch(digests)
    
    123
    +
    
    124
    +    def get_message(self, digest, message):
    
    125
    +        """Retrieves a :obj:`Message` from the remote CAS server.
    
    126
    +
    
    127
    +        Args:
    
    128
    +            digest (:obj:`Digest`): the message's digest to fetch.
    
    129
    +            message (:obj:`Message`): an empty message to fill.
    
    130
    +
    
    131
    +        Returns:
    
    132
    +            :obj:`Message`: `message` filled or emptied if not found.
    
    133
    +        """
    
    134
    +        try:
    
    135
    +            message_blob = self._fetch_blob(digest)
    
    136
    +        except NotFoundError:
    
    137
    +            message_blob = None
    
    138
    +
    
    139
    +        if message_blob is not None:
    
    140
    +            message.ParseFromString(message_blob)
    
    141
    +        else:
    
    142
    +            message.Clear()
    
    143
    +
    
    144
    +        return message
    
    145
    +
    
    146
    +    def get_messages(self, digests, messages):
    
    147
    +        """Retrieves a list of :obj:`Message`s from the remote CAS server.
    
    148
    +
    
    149
    +        Note:
    
    150
    +            The `digests` and `messages` list **must** contain the same number
    
    151
    +            of elements.
    
    152
    +
    
    153
    +        Args:
    
    154
    +            digests (list):  list of :obj:`Digest`s for the messages to fetch.
    
    155
    +            messages (list): list of empty :obj:`Message`s to fill.
    
    156
    +
    
    157
    +        Returns:
    
    158
    +            list: the fetched and filled message list.
    
    159
    +        """
    
    160
    +        assert len(digests) == len(messages)
    
    161
    +
    
    162
    +        message_blobs = self._fetch_blob_batch(digests)
    
    163
    +
    
    164
    +        assert len(message_blobs) == len(messages)
    
    165
    +
    
    166
    +        for message, message_blob in zip(messages, message_blobs):
    
    167
    +            message.ParseFromString(message_blob)
    
    168
    +
    
    169
    +        return messages
    
    170
    +
    
    171
    +    def download_file(self, digest, file_path, queue=True):
    
    172
    +        """Retrieves a file from the remote CAS server.
    
    173
    +
    
    174
    +        If queuing is allowed (`queue=True`), the download request **may** be
    
    175
    +        defer. An explicit call to :method:`flush` can force the request to be
    
    176
    +        send immediately (along with the rest of the queued batch).
    
    177
    +
    
    178
    +        Args:
    
    179
    +            digest (:obj:`Digest`): the file's digest to fetch.
    
    180
    +            file_path (str): absolute or relative path to the local file to write.
    
    181
    +            queue (bool, optional): whether or not the download request may be
    
    182
    +                queued and submitted as part of a batch upload request. Defaults
    
    183
    +                to True.
    
    184
    +
    
    185
    +        Raises:
    
    186
    +            NotFoundError: if `digest` is not present in the remote CAS server.
    
    187
    +            OSError: if `file_path` does not exist or is not readable.
    
    188
    +        """
    
    189
    +        if not os.path.isabs(file_path):
    
    190
    +            file_path = os.path.abspath(file_path)
    
    191
    +
    
    192
    +        if not queue or digest.size_bytes > __FILE_SIZE_THRESHOLD:
    
    193
    +            self._fetch_file(digest, file_path)
    
    194
    +        else:
    
    195
    +            self._queue_file(digest, file_path)
    
    196
    +
    
    197
    +    def download_directory(self, digest, directory_path):
    
    198
    +        """Retrieves a :obj:`Directory` from the remote CAS server.
    
    199
    +
    
    200
    +        Args:
    
    201
    +            digest (:obj:`Digest`): the directory's digest to fetch.
    
    202
    +
    
    203
    +        Returns:
    
    204
    +            :obj:`Digest`: The digest of the :obj:`Directory`.
    
    205
    +            directory_path (str): absolute or relative path to the local
    
    206
    +                directory to write.
    
    207
    +
    
    208
    +        Raises:
    
    209
    +            NotFoundError: if `digest` is not present in the remote CAS server.
    
    210
    +            FileExistsError: if `directory_path` already contains parts of their
    
    211
    +                fetched directory's content.
    
    212
    +        """
    
    213
    +        if not os.path.isabs(directory_path):
    
    214
    +            directory_path = os.path.abspath(directory_path)
    
    215
    +
    
    216
    +        # We want to start fresh here, the rest is very synchronous...
    
    217
    +        self.flush()
    
    218
    +
    
    219
    +        self._fetch_directory(digest, directory_path)
    
    220
    +
    
    221
    +    def flush(self):
    
    222
    +        """Ensures any queued request gets sent."""
    
    223
    +        if self.__file_requests:
    
    224
    +            self._fetch_file_batch(self.__file_requests)
    
    225
    +
    
    226
    +            self.__file_requests.clear()
    
    227
    +            self.__file_request_count = 0
    
    228
    +            self.__file_request_size = 0
    
    229
    +            self.__file_response_size = 0
    
    230
    +
    
    231
    +    def close(self):
    
    232
    +        """Closes the underlying connection stubs.
    
    233
    +
    
    234
    +        Note:
    
    235
    +            This will always send pending requests before closing connections,
    
    236
    +            if any.
    
    237
    +        """
    
    238
    +        self.flush()
    
    239
    +
    
    240
    +        self.__bytestream_stub = None
    
    241
    +        self.__cas_stub = None
    
    242
    +
    
    243
    +    # --- Private API ---
    
    244
    +
    
    245
    +    def _fetch_blob(self, digest):
    
    246
    +        """Fetches a blob using ByteStream.Read()"""
    
    247
    +        read_blob = bytearray()
    
    248
    +
    
    249
    +        if self.instance_name is not None:
    
    250
    +            resource_name = '/'.join([self.instance_name, 'blobs',
    
    251
    +                                      digest.hash, str(digest.size_bytes)])
    
    252
    +        else:
    
    253
    +            resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)])
    
    254
    +
    
    255
    +        read_request = bytestream_pb2.ReadRequest()
    
    256
    +        read_request.resource_name = resource_name
    
    257
    +        read_request.read_offset = 0
    
    258
    +
    
    259
    +        try:
    
    260
    +            # TODO: Handle connection loss/recovery
    
    261
    +            for read_response in self.__bytestream_stub.Read(read_request):
    
    262
    +                read_blob += read_response.data
    
    263
    +
    
    264
    +            assert len(read_blob) == digest.size_bytes
    
    265
    +
    
    266
    +        except grpc.RpcError as e:
    
    267
    +            status_code = e.code()
    
    268
    +            if status_code == grpc.StatusCode.NOT_FOUND:
    
    269
    +                raise NotFoundError("Requested data does not exist on the remote.")
    
    270
    +
    
    271
    +            else:
    
    272
    +                assert False
    
    273
    +
    
    274
    +        return read_blob
    
    275
    +
    
    276
    +    def _fetch_blob_batch(self, digests):
    
    277
    +        """Fetches blobs using ContentAddressableStorage.BatchReadBlobs()"""
    
    278
    +        batch_fetched = False
    
    279
    +        read_blobs = list()
    
    280
    +
    
    281
    +        # First, try BatchReadBlobs(), if not already known not being implemented:
    
    282
    +        if not CallCache.unimplemented(self.channel, 'BatchReadBlobs'):
    
    283
    +            batch_request = remote_execution_pb2.BatchReadBlobsRequest()
    
    284
    +            batch_request.digests.extend(digests)
    
    285
    +            if self.instance_name is not None:
    
    286
    +                batch_request.instance_name = self.instance_name
    
    287
    +
    
    288
    +            try:
    
    289
    +                batch_response = self.__cas_stub.BatchReadBlobs(batch_request)
    
    290
    +                for response in batch_response.responses:
    
    291
    +                    assert response.digest.hash in digests
    
    292
    +
    
    293
    +                    read_blobs.append(response.data)
    
    294
    +
    
    295
    +                    if response.status.code != code_pb2.OK:
    
    296
    +                        assert False
    
    297
    +
    
    298
    +                batch_fetched = True
    
    299
    +
    
    300
    +            except grpc.RpcError as e:
    
    301
    +                status_code = e.code()
    
    302
    +                if status_code == grpc.StatusCode.UNIMPLEMENTED:
    
    303
    +                    CallCache.mark_unimplemented(self.channel, 'BatchReadBlobs')
    
    304
    +
    
    305
    +                else:
    
    306
    +                    assert False
    
    307
    +
    
    308
    +        # Fallback to Read() if no BatchReadBlobs():
    
    309
    +        if not batch_fetched:
    
    310
    +            for digest in digests:
    
    311
    +                read_blobs.append(self._fetch_blob(digest))
    
    312
    +
    
    313
    +        return read_blobs
    
    314
    +
    
    315
    +    def _fetch_file(self, digest, file_path):
    
    316
    +        """Fetches a file using ByteStream.Read()"""
    
    317
    +        if self.instance_name is not None:
    
    318
    +            resource_name = '/'.join([self.instance_name, 'blobs',
    
    319
    +                                      digest.hash, str(digest.size_bytes)])
    
    320
    +        else:
    
    321
    +            resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)])
    
    322
    +
    
    323
    +        read_request = bytestream_pb2.ReadRequest()
    
    324
    +        read_request.resource_name = resource_name
    
    325
    +        read_request.read_offset = 0
    
    326
    +
    
    327
    +        with open(file_path, 'wb') as byte_file:
    
    328
    +            # TODO: Handle connection loss/recovery
    
    329
    +            for read_response in self.__bytestream_stub.Read(read_request):
    
    330
    +                byte_file.write(read_response.data)
    
    331
    +
    
    332
    +            assert byte_file.tell() == digest.size_bytes
    
    333
    +
    
    334
    +    def _queue_file(self, digest, file_path):
    
    335
    +        """Queues a file for later batch download"""
    
    336
    +        if self.__file_request_size + digest.ByteSize() > __MAX_REQUEST_SIZE:
    
    337
    +            self.flush()
    
    338
    +        elif self.__file_response_size + digest.size_bytes > __MAX_REQUEST_SIZE:
    
    339
    +            self.flush()
    
    340
    +        elif self.__file_request_count >= __MAX_REQUEST_COUNT:
    
    341
    +            self.flush()
    
    342
    +
    
    343
    +        self.__file_requests[digest.hash] = (digest, file_path)
    
    344
    +        self.__file_request_count += 1
    
    345
    +        self.__file_request_size += digest.ByteSize()
    
    346
    +        self.__file_response_size += digest.size_bytes
    
    347
    +
    
    348
    +    def _fetch_file_batch(self, digests_paths):
    
    349
    +        """Sends queued data using ContentAddressableStorage.BatchReadBlobs()"""
    
    350
    +        batch_digests = [digest for digest, _ in digests_paths]
    
    351
    +        batch_blobs = self._fetch_blob_batch(batch_digests)
    
    352
    +
    
    353
    +        for (_, file_path), file_blob in zip(digests_paths, batch_blobs):
    
    354
    +            self._write_file(file_blob, file_path)
    
    355
    +
    
    356
    +    def _write_file(self, blob, file_path, create_parent=False):
    
    357
    +        """Dumps a memory blob to a local file"""
    
    358
    +        if create_parent:
    
    359
    +            os.makedirs(os.path.dirname(file_path), exist_ok=True)
    
    360
    +
    
    361
    +        write_file(file_path, blob)
    
    362
    +
    
    363
    +    def _fetch_directory(self, digest, directory_path):
    
    364
    +        """Fetches a file using ByteStream.GetTree()"""
    
    365
    +        # Better fail early if the local root path cannot be created:
    
    366
    +        os.makedirs(directory_path, exist_ok=True)
    
    367
    +
    
    368
    +        directories = dict()
    
    369
    +        directory_fetched = False
    
    370
    +        # First, try GetTree() if not known to be unimplemented yet:
    
    371
    +        if not CallCache.unimplemented(self.channel, 'GetTree'):
    
    372
    +            tree_request = remote_execution_pb2.GetTreeRequest()
    
    373
    +            tree_request.root_digest.CopyFrom(digest)
    
    374
    +            tree_request.page_size = __MAX_REQUEST_COUNT
    
    375
    +            if self.instance_name is not None:
    
    376
    +                tree_request.instance_name = self.instance_name
    
    377
    +
    
    378
    +            try:
    
    379
    +                tree_fetched = False
    
    380
    +                while not tree_fetched:
    
    381
    +                    tree_response = self.__cas_stub.GetTree(tree_request)
    
    382
    +                    for directory in tree_response.directories:
    
    383
    +                        directory_blob = directory.SerializeToString()
    
    384
    +                        directory_hash = HASH(directory_blob).hexdigest()
    
    385
    +
    
    386
    +                        directories[directory_hash] = directory
    
    387
    +
    
    388
    +                    if tree_response.next_page_token:
    
    389
    +                        tree_request = remote_execution_pb2.BatchReadBlobsRequest()
    
    390
    +                        tree_request.root_digest.CopyFrom(digest)
    
    391
    +                        tree_request.page_size = __MAX_REQUEST_COUNT
    
    392
    +                        tree_request.page_token = tree_response.next_page_token
    
    393
    +
    
    394
    +                    else:
    
    395
    +                        tree_fetched = True
    
    396
    +
    
    397
    +                assert digest.hash in directories
    
    398
    +
    
    399
    +            except grpc.RpcError as e:
    
    400
    +                status_code = e.code()
    
    401
    +                if status_code == grpc.StatusCode.UNIMPLEMENTED:
    
    402
    +                    CallCache.mark_unimplemented(self.channel, 'BatchUpdateBlobs')
    
    403
    +
    
    404
    +                elif status_code == grpc.StatusCode.NOT_FOUND:
    
    405
    +                    raise NotFoundError("Requested directory does not exist on the remote.")
    
    406
    +
    
    407
    +                else:
    
    408
    +                    assert False
    
    409
    +
    
    410
    +            directory = directories[digest.hash]
    
    411
    +
    
    412
    +            self._write_directory(digest.hash, directory_path,
    
    413
    +                                  directories=directories, root_barrier=directory_path)
    
    414
    +            directory_fetched = True
    
    415
    +
    
    416
    +        # TODO: Try with BatchReadBlobs().
    
    417
    +
    
    418
    +        # Fallback to Read() if no GetTree():
    
    419
    +        if not directory_fetched:
    
    420
    +            directory = remote_execution_pb2.Directory()
    
    421
    +            directory.ParseFromString(self._fetch_blob(digest))
    
    422
    +
    
    423
    +            self._write_directory(directory, directory_path,
    
    424
    +                                  root_barrier=directory_path)
    
    425
    +
    
    426
    +    def _write_directory(self, root_directory, root_path, directories=None, root_barrier=None):
    
    427
    +        """Generates a local directory structure"""
    
    428
    +        for file_node in root_directory.files:
    
    429
    +            file_path = os.path.join(root_path, file_node.name)
    
    430
    +
    
    431
    +            self._queue_file(file_node.digest, file_path)
    
    432
    +
    
    433
    +        for directory_node in root_directory.directories:
    
    434
    +            directory_path = os.path.join(root_path, directory_node.name)
    
    435
    +            if directories and directory_node.digest.hash in directories:
    
    436
    +                directory = directories[directory_node.digest.hash]
    
    437
    +            else:
    
    438
    +                directory = remote_execution_pb2.Directory()
    
    439
    +                directory.ParseFromString(self._fetch_blob(directory_node.digest))
    
    440
    +
    
    441
    +            os.makedirs(directory_path, exist_ok=True)
    
    442
    +
    
    443
    +            self._write_directory(directory, directory_path,
    
    444
    +                                  directories=directories, root_barrier=root_barrier)
    
    445
    +
    
    446
    +        for symlink_node in root_directory.symlinks:
    
    447
    +            symlink_path = os.path.join(root_path, symlink_node.name)
    
    448
    +            if not os.path.isabs(symlink_node.target):
    
    449
    +                target_path = os.path.join(root_path, symlink_node.target)
    
    450
    +            else:
    
    451
    +                target_path = symlink_node.target
    
    452
    +            target_path = os.path.normpath(target_path)
    
    453
    +
    
    454
    +            # Do not create links pointing outside the barrier:
    
    455
    +            if root_barrier is not None:
    
    456
    +                common_path = os.path.commonprefix([root_barrier, target_path])
    
    457
    +                if not common_path.startswith(root_barrier):
    
    458
    +                    continue
    
    459
    +
    
    460
    +            os.symlink(symlink_path, target_path)
    
    23 461
     
    
    24 462
     
    
    25 463
     @contextmanager
    
    ... ... @@ -28,7 +466,7 @@ def upload(channel, instance=None, u_uid=None):
    28 466
         try:
    
    29 467
             yield uploader
    
    30 468
         finally:
    
    31
    -        uploader.flush()
    
    469
    +        uploader.close()
    
    32 470
     
    
    33 471
     
    
    34 472
     class Uploader:
    
    ... ... @@ -39,15 +477,8 @@ class Uploader:
    39 477
     
    
    40 478
             with upload(channel, instance='build') as cas:
    
    41 479
                 cas.upload_file('/path/to/local/file')
    
    42
    -
    
    43
    -    Attributes:
    
    44
    -        FILE_SIZE_THRESHOLD (int): maximum size for a queueable file.
    
    45
    -        MAX_REQUEST_SIZE (int): maximum size for a single gRPC request.
    
    46 480
         """
    
    47 481
     
    
    48
    -    FILE_SIZE_THRESHOLD = 1 * 1024 * 1024
    
    49
    -    MAX_REQUEST_SIZE = 2 * 1024 * 1024
    
    50
    -
    
    51 482
         def __init__(self, channel, instance=None, u_uid=None):
    
    52 483
             """Initializes a new :class:`Uploader` instance.
    
    53 484
     
    
    ... ... @@ -68,8 +499,61 @@ class Uploader:
    68 499
             self.__cas_stub = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel)
    
    69 500
     
    
    70 501
             self.__requests = dict()
    
    502
    +        self.__request_count = 0
    
    71 503
             self.__request_size = 0
    
    72 504
     
    
    505
    +    # --- Public API ---
    
    506
    +
    
    507
    +    def put_blob(self, blob, digest=None, queue=False):
    
    508
    +        """Stores a blob into the remote CAS server.
    
    509
    +
    
    510
    +        If queuing is allowed (`queue=True`), the upload request **may** be
    
    511
    +        defer. An explicit call to :method:`flush` can force the request to be
    
    512
    +        send immediately (along with the rest of the queued batch).
    
    513
    +
    
    514
    +        Args:
    
    515
    +            blob (bytes): the blob's data.
    
    516
    +            digest (:obj:`Digest`, optional): the blob's digest.
    
    517
    +            queue (bool, optional): whether or not the upload request may be
    
    518
    +                queued and submitted as part of a batch upload request. Defaults
    
    519
    +                to False.
    
    520
    +
    
    521
    +        Returns:
    
    522
    +            :obj:`Digest`: the sent blob's digest.
    
    523
    +        """
    
    524
    +        if not queue or len(blob) > __FILE_SIZE_THRESHOLD:
    
    525
    +            blob_digest = self._send_blob(blob)
    
    526
    +        else:
    
    527
    +            blob_digest = self._queue_blob(blob)
    
    528
    +
    
    529
    +        return blob_digest
    
    530
    +
    
    531
    +    def put_message(self, message, digest=None, queue=False):
    
    532
    +        """Stores a message into the remote CAS server.
    
    533
    +
    
    534
    +        If queuing is allowed (`queue=True`), the upload request **may** be
    
    535
    +        defer. An explicit call to :method:`flush` can force the request to be
    
    536
    +        send immediately (along with the rest of the queued batch).
    
    537
    +
    
    538
    +        Args:
    
    539
    +            message (:obj:`Message`): the message object.
    
    540
    +            digest (:obj:`Digest`, optional): the message's digest.
    
    541
    +            queue (bool, optional): whether or not the upload request may be
    
    542
    +                queued and submitted as part of a batch upload request. Defaults
    
    543
    +                to False.
    
    544
    +
    
    545
    +        Returns:
    
    546
    +            :obj:`Digest`: the sent message's digest.
    
    547
    +        """
    
    548
    +        message_blob = message.SerializeToString()
    
    549
    +
    
    550
    +        if not queue or len(message_blob) > __FILE_SIZE_THRESHOLD:
    
    551
    +            message_digest = self._send_blob(message_blob)
    
    552
    +        else:
    
    553
    +            message_digest = self._queue_blob(message_blob)
    
    554
    +
    
    555
    +        return message_digest
    
    556
    +
    
    73 557
         def upload_file(self, file_path, queue=True):
    
    74 558
             """Stores a local file into the remote CAS storage.
    
    75 559
     
    
    ... ... @@ -79,7 +563,7 @@ class Uploader:
    79 563
     
    
    80 564
             Args:
    
    81 565
                 file_path (str): absolute or relative path to a local file.
    
    82
    -            queue (bool, optional): wheter or not the upload request may be
    
    566
    +            queue (bool, optional): whether or not the upload request may be
    
    83 567
                     queued and submitted as part of a batch upload request. Defaults
    
    84 568
                     to True.
    
    85 569
     
    
    ... ... @@ -95,12 +579,12 @@ class Uploader:
    95 579
             with open(file_path, 'rb') as bytes_steam:
    
    96 580
                 file_bytes = bytes_steam.read()
    
    97 581
     
    
    98
    -        if not queue or len(file_bytes) > Uploader.FILE_SIZE_THRESHOLD:
    
    99
    -            blob_digest = self._send_blob(file_bytes)
    
    582
    +        if not queue or len(file_bytes) > __FILE_SIZE_THRESHOLD:
    
    583
    +            file_digest = self._send_blob(file_bytes)
    
    100 584
             else:
    
    101
    -            blob_digest = self._queue_blob(file_bytes)
    
    585
    +            file_digest = self._queue_blob(file_bytes)
    
    102 586
     
    
    103
    -        return blob_digest
    
    587
    +        return file_digest
    
    104 588
     
    
    105 589
         def upload_directory(self, directory, queue=True):
    
    106 590
             """Stores a :obj:`Directory` into the remote CAS storage.
    
    ... ... @@ -126,50 +610,37 @@ class Uploader:
    126 610
             else:
    
    127 611
                 return self._queue_blob(directory.SerializeToString())
    
    128 612
     
    
    129
    -    def send_message(self, message):
    
    130
    -        """Stores a message into the remote CAS storage.
    
    131
    -
    
    132
    -        Args:
    
    133
    -            message (:obj:`Message`): a protobuf message object.
    
    134
    -
    
    135
    -        Returns:
    
    136
    -            :obj:`Digest`: The digest of the message.
    
    137
    -        """
    
    138
    -        return self._send_blob(message.SerializeToString())
    
    139
    -
    
    140 613
         def flush(self):
    
    141 614
             """Ensures any queued request gets sent."""
    
    142 615
             if self.__requests:
    
    143
    -            self._send_batch()
    
    616
    +            self._send_blob_batch(self.__requests)
    
    144 617
     
    
    145
    -    def _queue_blob(self, blob):
    
    146
    -        """Queues a memory block for later batch upload"""
    
    147
    -        blob_digest = remote_execution_pb2.Digest()
    
    148
    -        blob_digest.hash = HASH(blob).hexdigest()
    
    149
    -        blob_digest.size_bytes = len(blob)
    
    150
    -
    
    151
    -        if self.__request_size + len(blob) > Uploader.MAX_REQUEST_SIZE:
    
    152
    -            self._send_batch()
    
    618
    +            self.__requests.clear()
    
    619
    +            self.__request_count = 0
    
    620
    +            self.__request_size = 0
    
    153 621
     
    
    154
    -        update_request = remote_execution_pb2.BatchUpdateBlobsRequest.Request()
    
    155
    -        update_request.digest.CopyFrom(blob_digest)
    
    156
    -        update_request.data = blob
    
    622
    +    def close(self):
    
    623
    +        """Closes the underlying connection stubs.
    
    157 624
     
    
    158
    -        update_request_size = update_request.ByteSize()
    
    159
    -        if self.__request_size + update_request_size > Uploader.MAX_REQUEST_SIZE:
    
    160
    -            self._send_batch()
    
    625
    +        Note:
    
    626
    +            This will always send pending requests before closing connections,
    
    627
    +            if any.
    
    628
    +        """
    
    629
    +        self.flush()
    
    161 630
     
    
    162
    -        self.__requests[update_request.digest.hash] = update_request
    
    163
    -        self.__request_size += update_request_size
    
    631
    +        self.__bytestream_stub = None
    
    632
    +        self.__cas_stub = None
    
    164 633
     
    
    165
    -        return blob_digest
    
    634
    +    # --- Private API ---
    
    166 635
     
    
    167
    -    def _send_blob(self, blob):
    
    636
    +    def _send_blob(self, blob, digest=None):
    
    168 637
             """Sends a memory block using ByteStream.Write()"""
    
    169 638
             blob_digest = remote_execution_pb2.Digest()
    
    170
    -        blob_digest.hash = HASH(blob).hexdigest()
    
    171
    -        blob_digest.size_bytes = len(blob)
    
    172
    -
    
    639
    +        if digest is not None:
    
    640
    +            blob_digest.CopyFrom(digest)
    
    641
    +        else:
    
    642
    +            blob_digest.hash = HASH(blob).hexdigest()
    
    643
    +            blob_digest.size_bytes = len(blob)
    
    173 644
             if self.instance_name is not None:
    
    174 645
                 resource_name = '/'.join([self.instance_name, 'uploads', self.u_uid, 'blobs',
    
    175 646
                                           blob_digest.hash, str(blob_digest.size_bytes)])
    
    ... ... @@ -204,18 +675,64 @@ class Uploader:
    204 675
     
    
    205 676
             return blob_digest
    
    206 677
     
    
    207
    -    def _send_batch(self):
    
    678
    +    def _queue_blob(self, blob, digest=None):
    
    679
    +        """Queues a memory block for later batch upload"""
    
    680
    +        blob_digest = remote_execution_pb2.Digest()
    
    681
    +        if digest is not None:
    
    682
    +            blob_digest.CopyFrom(digest)
    
    683
    +        else:
    
    684
    +            blob_digest.hash = HASH(blob).hexdigest()
    
    685
    +            blob_digest.size_bytes = len(blob)
    
    686
    +
    
    687
    +        if self.__request_size + blob_digest.size_bytes > __MAX_REQUEST_SIZE:
    
    688
    +            self.flush()
    
    689
    +            elif self.__request_count >= Uploader.MAX_REQUEST_COUNT:
    
    690
    +            self.flush()
    
    691
    +
    
    692
    +        self.__requests[blob_digest.hash] = (blob, blob_digest)
    
    693
    +        self.__request_count += 1
    
    694
    +        self.__request_size += blob_digest.size_bytes
    
    695
    +
    
    696
    +        return blob_digest
    
    697
    +
    
    698
    +    def _send_blob_batch(self, batch):
    
    208 699
             """Sends queued data using ContentAddressableStorage.BatchUpdateBlobs()"""
    
    209
    -        batch_request = remote_execution_pb2.BatchUpdateBlobsRequest()
    
    210
    -        batch_request.requests.extend(self.__requests.values())
    
    211
    -        if self.instance_name is not None:
    
    212
    -            batch_request.instance_name = self.instance_name
    
    700
    +        batch_sent = False
    
    701
    +        written_digests = list()
    
    213 702
     
    
    214
    -        batch_response = self.__cas_stub.BatchUpdateBlobs(batch_request)
    
    703
    +        # First, try BatchUpdateBlobs(), unless it is already known to be unimplemented:
    
    704
    +        if not CallCache.unimplemented(self.channel, 'BatchUpdateBlobs'):
    
    705
    +            batch_request = remote_execution_pb2.BatchUpdateBlobsRequest()
    
    706
    +            if self.instance_name is not None:
    
    707
    +                batch_request.instance_name = self.instance_name
    
    215 708
     
    
    216
    -        for response in batch_response.responses:
    
    217
    -            assert response.digest.hash in self.__requests
    
    218
    -            assert response.status.code is 0
    
    709
    +            for blob, digest in batch.values():
    
    710
    +                request = batch_request.requests.add()
    
    711
    +                request.digest.CopyFrom(digest)
    
    712
    +                request.data = blob
    
    219 713
     
    
    220
    -        self.__requests.clear()
    
    221
    -        self.__request_size = 0
    714
    +            try:
    
    715
    +                batch_response = self.__cas_stub.BatchUpdateBlobs(batch_request)
    
    716
    +                for response in batch_response.responses:
    
    717
    +                    assert response.digest.hash in batch
    
    718
    +
    
    719
    +                    written_digests.append(response.digest)
    
    720
    +                    if response.status.code != code_pb2.OK:
    
    721
    +                        response.digest.Clear()
    
    722
    +
    
    723
    +                batch_sent = True
    
    724
    +
    
    725
    +            except grpc.RpcError as e:
    
    726
    +                status_code = e.code()
    
    727
    +                if status_code == grpc.StatusCode.UNIMPLEMENTED:
    
    728
    +                    CallCache.mark_unimplemented(self.channel, 'BatchUpdateBlobs')
    
    729
    +
    
    730
    +                else:
    
    731
    +                    raise  # unexpected RPC error: let it propagate
    
    732
    +
    
    733
    +        # Fall back to Write() if BatchUpdateBlobs() is not available:

    734
    +        if not batch_sent:
    
    735
    +            for blob, digest in batch.values():
    
    736
    +                written_digests.append(self._send_blob(blob, digest=digest))
    
    737
    +
    
    738
    +        return written_digests

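
    Taken together, the reworked Uploader queues blobs up to its size and count
    limits, ships them through BatchUpdateBlobs() where the server supports it,
    and falls back to individual ByteStream.Write() calls otherwise. A hedged
    usage sketch, assuming an existing grpc.Channel named 'channel':

        from buildgrid.client.cas import upload

        blobs = [b'first-blob', b'second-blob', b'third-blob']

        with upload(channel) as cas:
            # queue=True defers the transfer; batches are sent when the size
            # or count limits are hit, and any remainder is flushed when the
            # context exits (close() calls flush() first).
            digests = [cas.put_blob(blob, queue=True) for blob in blobs]

        for blob_digest in digests:
            print(blob_digest.hash, blob_digest.size_bytes)
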
  • buildgrid/server/_exceptions.py deleted
    1
    -# Copyright (C) 2018 Bloomberg LP
    
    2
    -#
    
    3
    -# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    -# you may not use this file except in compliance with the License.
    
    5
    -# You may obtain a copy of the License at
    
    6
    -#
    
    7
    -#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    -#
    
    9
    -# Unless required by applicable law or agreed to in writing, software
    
    10
    -# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    -# See the License for the specific language governing permissions and
    
    13
    -# limitations under the License.
    
    14
    -
    
    15
    -
    
    16
    -from .._exceptions import BgdError, ErrorDomain
    
    17
    -
    
    18
    -
    
    19
    -class InvalidArgumentError(BgdError):
    
    20
    -    """A bad argument was passed, such as a name which doesn't exist.
    
    21
    -    """
    
    22
    -
    
    23
    -    def __init__(self, message, detail=None, reason=None):
    
    24
    -        super().__init__(message, detail=detail, domain=ErrorDomain.SERVER, reason=reason)
    
    25
    -
    
    26
    -
    
    27
    -class NotFoundError(BgdError):
    
    28
    -    """Requested resource not found.
    
    29
    -    """
    
    30
    -
    
    31
    -    def __init__(self, message, detail=None, reason=None):
    
    32
    -        super().__init__(message, detail=detail, domain=ErrorDomain.SERVER, reason=reason)
    
    33
    -
    
    34
    -
    
    35
    -class OutofSyncError(BgdError):
    
    36
    -    """The worker is out of sync with the server, such as having a differing number of leases.
    
    37
    -    """
    
    38
    -
    
    39
    -    def __init__(self, message, detail=None, reason=None):
    
    40
    -        super().__init__(message, detail=detail, domain=ErrorDomain.SERVER, reason=reason)
    
    41
    -
    
    42
    -
    
    43
    -class OutOfRangeError(BgdError):
    
    44
    -    """ ByteStream service read data out of range
    
    45
    -    """
    
    46
    -
    
    47
    -    def __init__(self, message, detail=None, reason=None):
    
    48
    -        super().__init__(message, detail=detail, domain=ErrorDomain.SERVER, reason=reason)

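
    The server-private exception module is dropped in favour of the shared
    top-level buildgrid._exceptions (note that OutofSyncError also becomes
    OutOfSyncError). Every import fix-up below follows the same pattern:

        # Before: relative import of the server-private module
        # from .._exceptions import InvalidArgumentError, NotFoundError

        # After: the shared top-level module
        from buildgrid._exceptions import InvalidArgumentError, NotFoundError
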
  • buildgrid/server/actioncache/service.py
    ... ... @@ -24,11 +24,10 @@ import logging
    24 24
     
    
    25 25
     import grpc
    
    26 26
     
    
    27
    +from buildgrid._exceptions import InvalidArgumentError, NotFoundError
    
    27 28
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    28 29
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
    
    29 30
     
    
    30
    -from .._exceptions import InvalidArgumentError, NotFoundError
    
    31
    -
    
    32 31
     
    
    33 32
     class ActionCacheService(remote_execution_pb2_grpc.ActionCacheServicer):
    
    34 33
     
    

  • buildgrid/server/bots/instance.py
    ... ... @@ -23,7 +23,8 @@ Instance of the Remote Workers interface.
    23 23
     import logging
    
    24 24
     import uuid
    
    25 25
     
    
    26
    -from .._exceptions import InvalidArgumentError, OutofSyncError
    
    26
    +from buildgrid._exceptions import InvalidArgumentError, OutOfSyncError
    
    27
    +
    
    27 28
     from ..job import LeaseState
    
    28 29
     
    
    29 30
     
    
    ... ... @@ -105,7 +106,7 @@ class BotsInterface:
    105 106
                     # TODO: Lease was rejected
    
    106 107
                     raise NotImplementedError("'Not Accepted' is unsupported")
    
    107 108
                 else:
    
    108
    -                raise OutofSyncError("Server lease: {}. Client lease: {}".format(server_lease, client_lease))
    
    109
    +                raise OutOfSyncError("Server lease: {}. Client lease: {}".format(server_lease, client_lease))
    
    109 110
     
    
    110 111
             elif server_state == LeaseState.ACTIVE:
    
    111 112
     
    
    ... ... @@ -118,17 +119,17 @@ class BotsInterface:
    118 119
                     return None
    
    119 120
     
    
    120 121
                 else:
    
    121
    -                raise OutofSyncError("Server lease: {}. Client lease: {}".format(server_lease, client_lease))
    
    122
    +                raise OutOfSyncError("Server lease: {}. Client lease: {}".format(server_lease, client_lease))
    
    122 123
     
    
    123 124
             elif server_state == LeaseState.COMPLETED:
    
    124
    -            raise OutofSyncError("Server lease: {}. Client lease: {}".format(server_lease, client_lease))
    
    125
    +            raise OutOfSyncError("Server lease: {}. Client lease: {}".format(server_lease, client_lease))
    
    125 126
     
    
    126 127
             elif server_state == LeaseState.CANCELLED:
    
    127 128
                 raise NotImplementedError("Cancelled states not supported yet")
    
    128 129
     
    
    129 130
             else:
    
    130 131
            # Should never get here
    
    131
    -            raise OutofSyncError("State now allowed: {}".format(server_state))
    
    132
    +            raise OutOfSyncError("State now allowed: {}".format(server_state))
    
    132 133
     
    
    133 134
             return client_lease
    
    134 135
     
    

  • buildgrid/server/bots/service.py
    ... ... @@ -25,11 +25,10 @@ import grpc
    25 25
     
    
    26 26
     from google.protobuf.empty_pb2 import Empty
    
    27 27
     
    
    28
    +from buildgrid._exceptions import InvalidArgumentError, OutOfSyncError
    
    28 29
     from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
    
    29 30
     from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2_grpc
    
    30 31
     
    
    31
    -from .._exceptions import InvalidArgumentError, OutofSyncError
    
    32
    -
    
    33 32
     
    
    34 33
     class BotsService(bots_pb2_grpc.BotsServicer):
    
    35 34
     
    
    ... ... @@ -69,7 +68,7 @@ class BotsService(bots_pb2_grpc.BotsServicer):
    69 68
                 context.set_details(str(e))
    
    70 69
                 context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
    
    71 70
     
    
    72
    -        except OutofSyncError as e:
    
    71
    +        except OutOfSyncError as e:
    
    73 72
                 self.logger.error(e)
    
    74 73
                 context.set_details(str(e))
    
    75 74
                 context.set_code(grpc.StatusCode.DATA_LOSS)
    

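
    The servicer keeps the usual pattern of translating domain exceptions into
    gRPC status codes. A condensed, hypothetical sketch of that mapping, where
    _do_update() stands in for the real session-handling logic:

        def UpdateBotSession(self, request, context):
            try:
                return self._do_update(request)

            except InvalidArgumentError as e:
                self.logger.error(e)
                context.set_details(str(e))
                context.set_code(grpc.StatusCode.INVALID_ARGUMENT)

            except OutOfSyncError as e:
                self.logger.error(e)
                context.set_details(str(e))
                context.set_code(grpc.StatusCode.DATA_LOSS)
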
  • buildgrid/server/cas/instance.py
    ... ... @@ -19,11 +19,10 @@ Storage Instances
    19 19
     Instances of CAS and ByteStream
    
    20 20
     """
    
    21 21
     
    
    22
    +from buildgrid._exceptions import InvalidArgumentError, NotFoundError, OutOfRangeError
    
    22 23
     from buildgrid._protos.google.bytestream import bytestream_pb2
    
    23 24
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 as re_pb2
    
    24
    -
    
    25
    -from .._exceptions import InvalidArgumentError, NotFoundError, OutOfRangeError
    
    26
    -from ...settings import HASH
    
    25
    +from buildgrid.settings import HASH
    
    27 26
     
    
    28 27
     
    
    29 28
     class ContentAddressableStorageInstance:
    

  • buildgrid/server/cas/service.py
    ... ... @@ -26,12 +26,11 @@ import logging
    26 26
     
    
    27 27
     import grpc
    
    28 28
     
    
    29
    +from buildgrid._exceptions import InvalidArgumentError, NotFoundError, OutOfRangeError
    
    29 30
     from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
    
    30 31
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    31 32
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
    
    32 33
     
    
    33
    -from .._exceptions import InvalidArgumentError, NotFoundError, OutOfRangeError
    
    34
    -
    
    35 34
     
    
    36 35
     class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
    
    37 36
     
    

  • buildgrid/server/cas/storage/remote.py
    ... ... @@ -23,11 +23,10 @@ Forwards storage requests to a remote storage.
    23 23
     import io
    
    24 24
     import logging
    
    25 25
     
    
    26
    -import grpc
    
    27
    -
    
    28
    -from buildgrid.utils import gen_fetch_blob, gen_write_request_blob
    
    29
    -from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
    
    26
    +from buildgrid.client.cas import download, upload
    
    30 27
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    
    28
    +from buildgrid._protos.google.rpc import code_pb2
    
    29
    +from buildgrid._protos.google.rpc import status_pb2
    
    31 30
     
    
    32 31
     from .storage_abc import StorageABC
    
    33 32
     
    
    ... ... @@ -36,8 +35,10 @@ class RemoteStorage(StorageABC):
    36 35
     
    
    37 36
         def __init__(self, channel, instance_name):
    
    38 37
             self.logger = logging.getLogger(__name__)
    
    39
    -        self._instance_name = instance_name
    
    40
    -        self._stub_bs = bytestream_pb2_grpc.ByteStreamStub(channel)
    
    38
    +
    
    39
    +        self.instance_name = instance_name
    
    40
    +        self.channel = channel
    
    41
    +
    
    41 42
             self._stub_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(channel)
    
    42 43
     
    
    43 44
         def has_blob(self, digest):
    
    ... ... @@ -46,41 +47,18 @@ class RemoteStorage(StorageABC):
    46 47
             return False
    
    47 48
     
    
    48 49
         def get_blob(self, digest):
    
    49
    -        try:
    
    50
    -            fetched_data = io.BytesIO()
    
    51
    -            length = 0
    
    52
    -
    
    53
    -            for data in gen_fetch_blob(self._stub_bs, digest, self._instance_name):
    
    54
    -                length += fetched_data.write(data)
    
    55
    -
    
    56
    -            if length:
    
    57
    -                assert digest.size_bytes == length
    
    58
    -                fetched_data.seek(0)
    
    59
    -                return fetched_data
    
    60
    -
    
    61
    -            else:
    
    62
    -                return None
    
    63
    -
    
    64
    -        except grpc.RpcError as e:
    
    65
    -            if e.code() == grpc.StatusCode.NOT_FOUND:
    
    66
    -                pass
    
    67
    -            else:
    
    68
    -                self.logger.error(e.details())
    
    69
    -                raise
    
    70
    -
    
    71
    -        return None
    
    50
    +        with download(self.channel, instance=self.instance_name) as cas:
    
    51
    +            return io.BytesIO(cas.get_blob(digest))
    
    72 52
     
    
    73 53
         def begin_write(self, digest):
    
    74 54
             return io.BytesIO(digest.SerializeToString())
    
    75 55
     
    
    76 56
         def commit_write(self, digest, write_session):
    
    77
    -        write_session.seek(0)
    
    78
    -
    
    79
    -        for request in gen_write_request_blob(write_session, digest, self._instance_name):
    
    80
    -            self._stub_bs.Write(request)
    
    57
    +        with upload(self.channel, instance=self.instance_name) as cas:
    
    58
    +            cas.put_blob(write_session.getvalue())
    
    81 59
     
    
    82 60
         def missing_blobs(self, blobs):
    
    83
    -        request = remote_execution_pb2.FindMissingBlobsRequest(instance_name=self._instance_name)
    
    61
    +        request = remote_execution_pb2.FindMissingBlobsRequest(instance_name=self.instance_name)
    
    84 62
     
    
    85 63
             for blob in blobs:
    
    86 64
                 request_digest = request.blob_digests.add()
    
    ... ... @@ -92,19 +70,12 @@ class RemoteStorage(StorageABC):
    92 70
             return [x for x in response.missing_blob_digests]
    
    93 71
     
    
    94 72
         def bulk_update_blobs(self, blobs):
    
    95
    -        request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=self._instance_name)
    
    96
    -
    
    97
    -        for digest, data in blobs:
    
    98
    -            reqs = request.requests.add()
    
    99
    -            reqs.digest.CopyFrom(digest)
    
    100
    -            reqs.data = data
    
    101
    -
    
    102
    -        response = self._stub_cas.BatchUpdateBlobs(request)
    
    103
    -
    
    104
    -        responses = response.responses
    
    73
    +        sent_digests = list()
    
    74
    +        with upload(self.channel, instance=self.instance_name) as cas:
    
    75
    +            for digest, blob in blobs:
    
    76
    +                sent_digests.append(cas.put_blob(blob, digest=digest, queue=True))
    
    105 77
     
    
    106
    -        # Check everything was sent back, even if order changed
    
    107
    -        assert ([x.digest for x in request.requests].sort(key=lambda x: x.hash)) == \
    
    108
    -            ([x.digest for x in responses].sort(key=lambda x: x.hash))
    
    78
    +        assert len(sent_digests) == len(blobs)
    
    109 79
     
    
    110
    -        return [x.status for x in responses]
    80
    +        return [status_pb2.Status(code=code_pb2.OK) if d.ByteSize() > 0
    
    81
    +                else status_pb2.Status(code=code_pb2.UNKNOWN) for d in sent_digests]

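
    With this rewrite RemoteStorage becomes a thin shim over the client helpers.
    Roughly equivalent direct usage, assuming an existing grpc.Channel named
    'channel', a populated Digest named 'digest', and 'main' as an illustrative
    instance name:

        from buildgrid.client.cas import download, upload

        with download(channel, instance='main') as cas:
            data = cas.get_blob(digest)

        with upload(channel, instance='main') as cas:
            new_digest = cas.put_blob(data)
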
  • buildgrid/server/execution/instance.py
    ... ... @@ -21,10 +21,10 @@ An instance of the Remote Execution Service.
    21 21
     
    
    22 22
     import logging
    
    23 23
     
    
    24
    +from buildgrid._exceptions import InvalidArgumentError
    
    24 25
     from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action
    
    25 26
     
    
    26 27
     from ..job import Job
    
    27
    -from .._exceptions import InvalidArgumentError
    
    28 28
     
    
    29 29
     
    
    30 30
     class ExecutionInstance:
    

  • buildgrid/server/execution/service.py
    ... ... @@ -26,12 +26,10 @@ from functools import partial
    26 26
     
    
    27 27
     import grpc
    
    28 28
     
    
    29
    +from buildgrid._exceptions import InvalidArgumentError
    
    29 30
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
    
    30
    -
    
    31 31
     from buildgrid._protos.google.longrunning import operations_pb2
    
    32 32
     
    
    33
    -from .._exceptions import InvalidArgumentError
    
    34
    -
    
    35 33
     
    
    36 34
     class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    
    37 35
     
    

  • buildgrid/server/operations/instance.py
    ... ... @@ -21,7 +21,7 @@ An instance of the LongRunningOperations Service.
    21 21
     
    
    22 22
     import logging
    
    23 23
     
    
    24
    -from .._exceptions import InvalidArgumentError
    
    24
    +from buildgrid._exceptions import InvalidArgumentError
    
    25 25
     
    
    26 26
     
    
    27 27
     class OperationsInstance:
    

  • buildgrid/server/operations/service.py
    ... ... @@ -25,10 +25,9 @@ import grpc
    25 25
     
    
    26 26
     from google.protobuf.empty_pb2 import Empty
    
    27 27
     
    
    28
    +from buildgrid._exceptions import InvalidArgumentError
    
    28 29
     from buildgrid._protos.google.longrunning import operations_pb2_grpc, operations_pb2
    
    29 30
     
    
    30
    -from .._exceptions import InvalidArgumentError
    
    31
    -
    
    32 31
     
    
    33 32
     class OperationsService(operations_pb2_grpc.OperationsServicer):
    
    34 33
     
    

  • buildgrid/server/referencestorage/service.py
    ... ... @@ -17,11 +17,10 @@ import logging
    17 17
     
    
    18 18
     import grpc
    
    19 19
     
    
    20
    +from buildgrid._exceptions import InvalidArgumentError, NotFoundError
    
    20 21
     from buildgrid._protos.buildstream.v2 import buildstream_pb2
    
    21 22
     from buildgrid._protos.buildstream.v2 import buildstream_pb2_grpc
    
    22 23
     
    
    23
    -from .._exceptions import InvalidArgumentError, NotFoundError
    
    24
    -
    
    25 24
     
    
    26 25
     class ReferenceStorageService(buildstream_pb2_grpc.ReferenceStorageServicer):
    
    27 26
     
    

  • buildgrid/server/referencestorage/storage.py
    ... ... @@ -24,10 +24,9 @@ For a given key, it
    24 24
     
    
    25 25
     import collections
    
    26 26
     
    
    27
    +from buildgrid._exceptions import NotFoundError
    
    27 28
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    28 29
     
    
    29
    -from .._exceptions import NotFoundError
    
    30
    -
    
    31 30
     
    
    32 31
     class ReferenceCache:
    
    33 32
     
    

  • buildgrid/server/scheduler.py
    ... ... @@ -26,7 +26,7 @@ from collections import deque
    26 26
     from google.protobuf import any_pb2
    
    27 27
     
    
    28 28
     
    
    29
    -from buildgrid.server._exceptions import NotFoundError
    
    29
    +from buildgrid._exceptions import NotFoundError
    
    30 30
     from buildgrid._protos.google.longrunning import operations_pb2
    
    31 31
     
    
    32 32
     from .job import ExecuteStage, LeaseState
    

  • buildgrid/utils.py
    ... ... @@ -15,117 +15,9 @@
    15 15
     
    
    16 16
     from operator import attrgetter
    
    17 17
     import os
    
    18
    -import uuid
    
    19 18
     
    
    20 19
     from buildgrid.settings import HASH
    
    21 20
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    22
    -from buildgrid._protos.google.bytestream import bytestream_pb2
    
    23
    -
    
    24
    -
    
    25
    -def gen_fetch_blob(stub, digest, instance_name=""):
    
    26
    -    """ Generates byte stream from a fetch blob request
    
    27
    -    """
    
    28
    -
    
    29
    -    resource_name = os.path.join(instance_name, 'blobs', digest.hash, str(digest.size_bytes))
    
    30
    -    request = bytestream_pb2.ReadRequest(resource_name=resource_name,
    
    31
    -                                         read_offset=0)
    
    32
    -
    
    33
    -    for response in stub.Read(request):
    
    34
    -        yield response.data
    
    35
    -
    
    36
    -
    
    37
    -def gen_write_request_blob(digest_bytes, digest, instance_name=""):
    
    38
    -    """ Generates a bytestream write request
    
    39
    -    """
    
    40
    -    resource_name = os.path.join(instance_name, 'uploads', str(uuid.uuid4()),
    
    41
    -                                 'blobs', digest.hash, str(digest.size_bytes))
    
    42
    -
    
    43
    -    offset = 0
    
    44
    -    finished = False
    
    45
    -    remaining = digest.size_bytes
    
    46
    -
    
    47
    -    while not finished:
    
    48
    -        chunk_size = min(remaining, 64 * 1024)
    
    49
    -        remaining -= chunk_size
    
    50
    -        finished = remaining <= 0
    
    51
    -
    
    52
    -        request = bytestream_pb2.WriteRequest()
    
    53
    -        request.resource_name = resource_name
    
    54
    -        request.write_offset = offset
    
    55
    -        request.data = digest_bytes.read(chunk_size)
    
    56
    -        request.finish_write = finished
    
    57
    -
    
    58
    -        yield request
    
    59
    -
    
    60
    -        offset += chunk_size
    
    61
    -
    
    62
    -
    
    63
    -def write_fetch_directory(root_directory, stub, digest, instance_name=None):
    
    64
    -    """Locally replicates a directory from CAS.
    
    65
    -
    
    66
    -    Args:
    
    67
    -        root_directory (str): local directory to populate.
    
    68
    -        stub (): gRPC stub for CAS communication.
    
    69
    -        digest (Digest): digest for the directory to fetch from CAS.
    
    70
    -        instance_name (str, optional): farm instance name to query data from.
    
    71
    -    """
    
    72
    -
    
    73
    -    if not os.path.isabs(root_directory):
    
    74
    -        root_directory = os.path.abspath(root_directory)
    
    75
    -    if not os.path.exists(root_directory):
    
    76
    -        os.makedirs(root_directory, exist_ok=True)
    
    77
    -
    
    78
    -    directory = parse_to_pb2_from_fetch(remote_execution_pb2.Directory(),
    
    79
    -                                        stub, digest, instance_name)
    
    80
    -
    
    81
    -    for directory_node in directory.directories:
    
    82
    -        child_path = os.path.join(root_directory, directory_node.name)
    
    83
    -
    
    84
    -        write_fetch_directory(child_path, stub, directory_node.digest, instance_name)
    
    85
    -
    
    86
    -    for file_node in directory.files:
    
    87
    -        child_path = os.path.join(root_directory, file_node.name)
    
    88
    -
    
    89
    -        with open(child_path, 'wb') as child_file:
    
    90
    -            write_fetch_blob(child_file, stub, file_node.digest, instance_name)
    
    91
    -
    
    92
    -    for symlink_node in directory.symlinks:
    
    93
    -        child_path = os.path.join(root_directory, symlink_node.name)
    
    94
    -
    
    95
    -        if os.path.isabs(symlink_node.target):
    
    96
    -            continue  # No out of temp-directory links for now.
    
    97
    -        target_path = os.path.join(root_directory, symlink_node.target)
    
    98
    -
    
    99
    -        os.symlink(child_path, target_path)
    
    100
    -
    
    101
    -
    
    102
    -def write_fetch_blob(target_file, stub, digest, instance_name=None):
    
    103
    -    """Extracts a blob from CAS into a local file.
    
    104
    -
    
    105
    -    Args:
    
    106
    -        target_file (str): local file to write.
    
    107
    -        stub (): gRPC stub for CAS communication.
    
    108
    -        digest (Digest): digest for the blob to fetch from CAS.
    
    109
    -        instance_name (str, optional): farm instance name to query data from.
    
    110
    -    """
    
    111
    -
    
    112
    -    for stream in gen_fetch_blob(stub, digest, instance_name):
    
    113
    -        target_file.write(stream)
    
    114
    -    target_file.flush()
    
    115
    -
    
    116
    -    assert digest.size_bytes == os.fstat(target_file.fileno()).st_size
    
    117
    -
    
    118
    -
    
    119
    -def parse_to_pb2_from_fetch(pb2, stub, digest, instance_name=""):
    
    120
    -    """ Fetches stream and parses it into given pb2
    
    121
    -    """
    
    122
    -
    
    123
    -    stream_bytes = b''
    
    124
    -    for stream in gen_fetch_blob(stub, digest, instance_name):
    
    125
    -        stream_bytes += stream
    
    126
    -
    
    127
    -    pb2.ParseFromString(stream_bytes)
    
    128
    -    return pb2
    
    129 21
     
    
    130 22
     
    
    131 23
     def create_digest(bytes_to_digest):
    
    ... ... @@ -280,8 +172,12 @@ def tree_maker(directory_path, cas=None):
    280 172
         tree.children.extend(child_directories)
    
    281 173
         tree.root.CopyFrom(directory)
    
    282 174
     
    
    175
    +    # Ensure that we've uploaded the tree's children first
    
    176
    +    if cas is not None:
    
    177
    +        cas.flush()
    
    178
    +
    
    283 179
         if cas is not None:
    
    284
    -        tree_digest = cas.send_message(tree)
    
    180
    +        tree_digest = cas.put_message(tree)
    
    285 181
         else:
    
    286 182
             tree_digest = create_digest(tree.SerializeToString())
    
    287 183
     
    

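
    Interleaved with the diff markers, the ordering in tree_maker() is easy to
    miss; consolidated, the hunk above reads:

        # Ensure that we've uploaded the tree's children first:
        if cas is not None:
            cas.flush()

        if cas is not None:
            tree_digest = cas.put_message(tree)
        else:
            tree_digest = create_digest(tree.SerializeToString())

    The two 'if cas is not None:' checks could be merged, but the behaviour is
    the same either way: queued child directories are written out before the
    Tree message that references them.
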
  • tests/action_cache.py
    ... ... @@ -17,10 +17,10 @@
    17 17
     
    
    18 18
     import pytest
    
    19 19
     
    
    20
    +from buildgrid._exceptions import NotFoundError
    
    21
    +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    20 22
     from buildgrid.server.actioncache.storage import ActionCache
    
    21 23
     from buildgrid.server.cas.storage import lru_memory_cache
    
    22
    -from buildgrid.server._exceptions import NotFoundError
    
    23
    -from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    24 24
     
    
    25 25
     
    
    26 26
     @pytest.fixture
    

  • tests/integration/operations_service.py
    ... ... @@ -24,12 +24,10 @@ import grpc
    24 24
     from grpc._server import _Context
    
    25 25
     import pytest
    
    26 26
     
    
    27
    +from buildgrid._exceptions import InvalidArgumentError
    
    27 28
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    28 29
     from buildgrid._protos.google.longrunning import operations_pb2
    
    29
    -
    
    30 30
     from buildgrid.server.controller import ExecutionController
    
    31
    -from buildgrid.server._exceptions import InvalidArgumentError
    
    32
    -
    
    33 31
     from buildgrid.server.operations import service
    
    34 32
     from buildgrid.server.operations.service import OperationsService
    
    35 33
     
    


