[Notes] [Git][BuildGrid/buildgrid][mablanch/79-cas-downloader] 12 commits: Added remote parser.

Martin Blanchard pushed to branch mablanch/79-cas-downloader at BuildGrid / buildgrid

Commits:

26 changed files:

Changes:

  • buildgrid/_app/bots/buildbox.py
    @@ -19,28 +19,31 @@ import tempfile
     
     from google.protobuf import any_pb2
     
    -from buildgrid.client.cas import upload
    +from buildgrid.client.cas import download, upload
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
     from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
    -from buildgrid.utils import read_file, write_file, parse_to_pb2_from_fetch
    +from buildgrid.utils import read_file, write_file
     
     
     def work_buildbox(context, lease):
         """Executes a lease for a build action, using buildbox.
         """
     
    -    stub_bytestream = bytestream_pb2_grpc.ByteStreamStub(context.cas_channel)
         local_cas_directory = context.local_cas
    +    # instance_name = context.parent
         logger = context.logger
     
         action_digest = remote_execution_pb2.Digest()
         lease.payload.Unpack(action_digest)
     
    -    action = parse_to_pb2_from_fetch(remote_execution_pb2.Action(),
    -                                     stub_bytestream, action_digest)
    +    with download(context.cas_channel) as cas:
    +        action = cas.get_message(action_digest,
    +                                 remote_execution_pb2.Action())
     
    -    command = parse_to_pb2_from_fetch(remote_execution_pb2.Command(),
    -                                      stub_bytestream, action.command_digest)
    +        assert action.command_digest.hash
    +
    +        command = cas.get_message(action.command_digest,
    +                                  remote_execution_pb2.Command())
     
         environment = dict()
         for variable in command.environment_variables:
    @@ -101,7 +104,8 @@ def work_buildbox(context, lease):
     
                 # TODO: Have BuildBox helping us creating the Tree instance here
                 # See https://gitlab.com/BuildStream/buildbox/issues/7 for details
    -            output_tree = _cas_tree_maker(stub_bytestream, output_digest)
    +            with download(context.cas_channel) as cas:
    +                output_tree = _cas_tree_maker(cas, output_digest)
     
                 with upload(context.cas_channel) as cas:
                     output_tree_digest = cas.send_message(output_tree)
    @@ -121,24 +125,28 @@ def work_buildbox(context, lease):
         return lease
     
     
    -def _cas_tree_maker(stub_bytestream, directory_digest):
    +def _cas_tree_maker(cas, directory_digest):
         # Generates and stores a Tree for a given Directory. This is very inefficient
         # and only temporary. See https://gitlab.com/BuildStream/buildbox/issues/7.
         output_tree = remote_execution_pb2.Tree()
     
    -    def list_directories(parent_directory):
    -        directory_list = list()
    +    def __cas_tree_maker(cas, parent_directory):
    +        digests, directories = list(), list()
             for directory_node in parent_directory.directories:
    -            directory = parse_to_pb2_from_fetch(remote_execution_pb2.Directory(),
    -                                                stub_bytestream, directory_node.digest)
    -            directory_list.extend(list_directories(directory))
    -            directory_list.append(directory)
    +            directories.append(remote_execution_pb2.Directory())
    +            digests.append(directory_node.digest)
    +
    +        cas.get_messages(digests, directories)
    +
    +        for directory in directories[:]:
    +            directories.extend(__cas_tree_maker(cas, directory))
    +
    +        return directories
     
    -        return directory_list
    +    root_directory = cas.get_message(directory_digest,
    +                                     remote_execution_pb2.Directory())
     
    -    root_directory = parse_to_pb2_from_fetch(remote_execution_pb2.Directory(),
    -                                             stub_bytestream, directory_digest)
    -    output_tree.children.extend(list_directories(root_directory))
    +    output_tree.children.extend(__cas_tree_maker(cas, root_directory))
         output_tree.root.CopyFrom(root_directory)
     
         return output_tree

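    A minimal sketch of the new fetch pattern used above, for reference. The
    channel address and digest value are illustrative, not part of the commit:

        import grpc

        from buildgrid.client.cas import download
        from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2

        channel = grpc.insecure_channel('localhost:50051')  # hypothetical CAS endpoint

        # Hypothetical digest for an Action already stored in CAS:
        action_digest = remote_execution_pb2.Digest(hash='0a1b2c', size_bytes=142)

        with download(channel) as cas:
            # Fetch the Action message, then chase its command digest:
            action = cas.get_message(action_digest, remote_execution_pb2.Action())
            command = cas.get_message(action.command_digest, remote_execution_pb2.Command())
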
  • buildgrid/_app/bots/temp_directory.py
    @@ -19,10 +19,9 @@ import tempfile
     
     from google.protobuf import any_pb2
     
    -from buildgrid.client.cas import upload
    +from buildgrid.client.cas import download, upload
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
     from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
    -from buildgrid.utils import write_fetch_directory, parse_to_pb2_from_fetch
     from buildgrid.utils import output_file_maker, output_directory_maker
     
     
    @@ -30,22 +29,23 @@ def work_temp_directory(context, lease):
         """Executes a lease for a build action, using host tools.
         """
     
    -    stub_bytestream = bytestream_pb2_grpc.ByteStreamStub(context.cas_channel)
         instance_name = context.parent
         logger = context.logger
     
         action_digest = remote_execution_pb2.Digest()
         lease.payload.Unpack(action_digest)
     
    -    action = parse_to_pb2_from_fetch(remote_execution_pb2.Action(),
    -                                     stub_bytestream, action_digest, instance_name)
    -
         with tempfile.TemporaryDirectory() as temp_directory:
    -        command = parse_to_pb2_from_fetch(remote_execution_pb2.Command(),
    -                                          stub_bytestream, action.command_digest, instance_name)
    +        with download(context.cas_channel, instance=instance_name) as cas:
    +            action = cas.get_message(action_digest,
    +                                     remote_execution_pb2.Action())
    +
    +            assert action.command_digest.hash
    +
    +            command = cas.get_message(action.command_digest,
    +                                      remote_execution_pb2.Command())
     
    -        write_fetch_directory(temp_directory, stub_bytestream,
    -                              action.input_root_digest, instance_name)
    +            cas.download_directory(action.input_root_digest, temp_directory)
     
             environment = os.environ.copy()
             for variable in command.environment_variables:
    

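    The same helper also stages whole input trees. A sketch, assuming `channel`
    and `input_root_digest` are already in scope:

        import tempfile

        from buildgrid.client.cas import download

        with tempfile.TemporaryDirectory() as temp_directory:
            with download(channel, instance='main') as cas:
                # Writes the files, subdirectories and symlinks reachable from
                # input_root_digest under temp_directory:
                cas.download_directory(input_root_digest, temp_directory)
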
  • buildgrid/_app/commands/cmd_execute.py
    @@ -30,7 +30,7 @@ from urllib.parse import urlparse
     import click
     import grpc
     
    -from buildgrid.utils import merkle_maker, create_digest, write_fetch_blob
    +from buildgrid.client.cas import download
    +from buildgrid.utils import merkle_maker, create_digest
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
     from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
     
    @@ -165,8 +165,6 @@ def command(context, input_root, commands, output_file, output_directory):
                                                   skip_cache_lookup=True)
         response = stub.Execute(request)
     
    -    stub = bytestream_pb2_grpc.ByteStreamStub(context.channel)
    -
         stream = None
         for stream in response:
             context.logger.info(stream)
    @@ -174,21 +172,16 @@ def command(context, input_root, commands, output_file, output_directory):
         execute_response = remote_execution_pb2.ExecuteResponse()
         stream.response.Unpack(execute_response)
     
    -    for output_file_response in execute_response.result.output_files:
    -        path = os.path.join(output_directory, output_file_response.path)
    -
    -        if not os.path.exists(os.path.dirname(path)):
    -
    -            try:
    -                os.makedirs(os.path.dirname(path))
    -
    -            except OSError as exc:
    -                if exc.errno != errno.EEXIST:
    -                    raise
    -
    -        with open(path, 'wb+') as f:
    -            write_fetch_blob(f, stub, output_file_response.digest, context.instance_name)
    -
    -        if output_file_response.path in output_executeables:
    -            st = os.stat(path)
    -            os.chmod(path, st.st_mode | stat.S_IXUSR)
    +    with download(context.channel, instance=context.instance_name) as cas:
     
    +        for output_file_response in execute_response.result.output_files:
    +            path = os.path.join(output_directory, output_file_response.path)
     
    +            if not os.path.exists(os.path.dirname(path)):
    +                os.makedirs(os.path.dirname(path), exist_ok=True)
     
    +            cas.download_file(output_file_response.digest, path)
     
    +            if output_file_response.path in output_executeables:
    +                st = os.stat(path)
    +                os.chmod(path, st.st_mode | stat.S_IXUSR)

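    Output files now go through Downloader.download_file(), which may defer
    small blobs into one batch request. A sketch, with `channel` and an
    `output_files` list of (digest, path) pairs assumed:

        from buildgrid.client.cas import download

        with download(channel, instance='main') as cas:
            for digest, path in output_files:
                # Small files may be queued; the queue is flushed when the
                # context manager closes:
                cas.download_file(digest, path)
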
  • buildgrid/_app/commands/cmd_server.py
    @@ -49,41 +49,48 @@ def start(context, config):
         with open(config) as f:
             settings = parser.get_parser().safe_load(f)
     
    -    server_settings = settings['server']
    -    insecure_mode = server_settings['insecure-mode']
    -
    -    credentials = None
    -    if not insecure_mode:
    -        server_key = server_settings['tls-server-key']
    -        server_cert = server_settings['tls-server-cert']
    -        client_certs = server_settings['tls-client-certs']
    -        credentials = context.load_server_credentials(server_key, server_cert, client_certs)
    -
    -        if not credentials:
    -            click.echo("ERROR: no TLS keys were specified and no defaults could be found.\n" +
    -                       "Set `insecure-mode: false` in order to deactivate TLS encryption.\n", err=True)
    -            sys.exit(-1)
    -
    -    instances = settings['instances']
    -
    -    execution_controllers = _instance_maker(instances, ExecutionController)
    -
    -    execution_instances = {}
    -    bots_interfaces = {}
    -    operations_instances = {}
    -
    -    # TODO: map properly in parser
    -    for k, v in execution_controllers.items():
    -        execution_instances[k] = v.execution_instance
    -        bots_interfaces[k] = v.bots_interface
    -        operations_instances[k] = v.operations_instance
    -
    -    reference_caches = _instance_maker(instances, ReferenceCache)
    -    action_caches = _instance_maker(instances, ActionCache)
    -    cas = _instance_maker(instances, ContentAddressableStorageInstance)
    -    bytestreams = _instance_maker(instances, ByteStreamInstance)
    +    try:
    +        server_settings = settings['server']
    +        insecure_mode = server_settings['insecure-mode']
    +
    +        credentials = None
    +        if not insecure_mode:
    +            credential_settings = server_settings['credentials']
    +            server_key = credential_settings['tls-server-key']
    +            server_cert = credential_settings['tls-server-cert']
    +            client_certs = credential_settings['tls-client-certs']
    +            credentials = context.load_server_credentials(server_key, server_cert, client_certs)
    +
    +            if not credentials:
    +                click.echo("ERROR: no TLS keys were specified and no defaults could be found.\n" +
    +                           "Set `insecure-mode: false` in order to deactivate TLS encryption.\n", err=True)
    +                sys.exit(-1)
    +
    +        port = server_settings['port']
    +        instances = settings['instances']
    +
    +        execution_controllers = _instance_maker(instances, ExecutionController)
    +
    +        execution_instances = {}
    +        bots_interfaces = {}
    +        operations_instances = {}
    +
    +        # TODO: map properly in parser
    +        # Issue 82
    +        for k, v in execution_controllers.items():
    +            execution_instances[k] = v.execution_instance
    +            bots_interfaces[k] = v.bots_interface
    +            operations_instances[k] = v.operations_instance
    +
    +        reference_caches = _instance_maker(instances, ReferenceCache)
    +        action_caches = _instance_maker(instances, ActionCache)
    +        cas = _instance_maker(instances, ContentAddressableStorageInstance)
    +        bytestreams = _instance_maker(instances, ByteStreamInstance)
    +
    +    except KeyError as e:
    +        click.echo("ERROR: Could not parse config: {}.\n".format(str(e)), err=True)
    +        sys.exit(-1)
     
    -    port = server_settings['port']
         server = BuildGridServer(port=port,
                                  credentials=credentials,
                                  execution_instances=execution_instances,
    

  • buildgrid/_app/settings/__init__.py

  • buildgrid/_app/settings/cas.yml
    +server:
    +  port: 50052
    +  insecure-mode: true
    +  credentials:
    +    tls-server-key: null
    +    tls-server-cert: null
    +    tls-client-certs: null
    +
    +description: |
    +  Just a CAS.
    +
    +instances:
    +  - name: main
    +    description: |
    +      The main server
    +
    +    storages:
    +        - !disk-storage &main-storage
    +          path: ~/cas/
    +
    +    services:
    +      - !cas
    +        storage: *main-storage
    +
    +      - !bytestream
    +        storage: *main-storage

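    The new YAML tags are resolved by the settings parser while loading. A
    quick sanity check for a configuration like this one (a sketch; the file
    path assumes a checkout of the repository):

        from buildgrid._app.settings import parser

        with open('buildgrid/_app/settings/cas.yml') as f:
            # !disk-storage, !cas and !bytestream are constructed during load:
            settings = parser.get_parser().safe_load(f)

        print(settings['server']['port'])  # 50052
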
  • buildgrid/_app/settings/default.yml
     server:
       port: 50051
    -  tls-server-key: null
    -  tls-server-cert: null
    -  tls-client-certs: null
       insecure-mode: true
    +  credentials:
    +    tls-server-key: null
    +    tls-server-cert: null
    +    tls-client-certs: null
     
     description: |
       A single default instance
    

  • buildgrid/_app/settings/parser.py
    @@ -14,7 +14,11 @@
     
     
     import os
    +import sys
    +from urllib.parse import urlparse
     
    +import click
    +import grpc
     import yaml
     
     from buildgrid.server.controller import ExecutionController
    @@ -22,9 +26,12 @@ from buildgrid.server.actioncache.storage import ActionCache
     from buildgrid.server.cas.instance import ByteStreamInstance, ContentAddressableStorageInstance
     from buildgrid.server.cas.storage.disk import DiskStorage
     from buildgrid.server.cas.storage.lru_memory_cache import LRUMemoryCache
    +from buildgrid.server.cas.storage.remote import RemoteStorage
     from buildgrid.server.cas.storage.s3 import S3Storage
     from buildgrid.server.cas.storage.with_cache import WithCacheStorage
     
    +from ..cli import Context
    +
     
     class YamlFactory(yaml.YAMLObject):
         @classmethod
    @@ -58,6 +65,47 @@ class S3(YamlFactory):
             return S3Storage(bucket, endpoint_url=endpoint)
     
     
    +class Remote(YamlFactory):
    +
    +    yaml_tag = u'!remote-storage'
    +
    +    def __new__(cls, url, instance_name, credentials=None):
    +        # TODO: Context could be passed into the parser.
    +        # Also find way to get instance_name from parent
    +        # Issue 82
    +        context = Context()
    +
    +        url = urlparse(url)
    +        remote = '{}:{}'.format(url.hostname, url.port or 50051)
    +
    +        channel = None
    +        if url.scheme == 'http':
    +            channel = grpc.insecure_channel(remote)
    +
    +        else:
    +            if not credentials:
    +                click.echo("ERROR: no TLS keys were specified and no defaults could be found.\n" +
    +                           "Set remote url scheme to `http` in order to deactivate " +
    +                           "TLS encryption.\n", err=True)
    +                sys.exit(-1)
    +
    +            client_key = credentials['tls-client-key']
    +            client_cert = credentials['tls-client-cert']
    +            server_cert = credentials['tls-server-cert']
    +            credentials = context.load_client_credentials(client_key,
    +                                                          client_cert,
    +                                                          server_cert)
    +            if not credentials:
    +                click.echo("ERROR: no TLS keys were specified and no defaults could be found.\n" +
    +                           "Set remote url scheme to `http` in order to deactivate " +
    +                           "TLS encryption.\n", err=True)
    +                sys.exit(-1)
    +
    +            channel = grpc.secure_channel(remote, credentials)
    +
    +        return RemoteStorage(channel, instance_name)
    +
    +
     class WithCache(YamlFactory):
     
         yaml_tag = u'!with-cache-storage'
    @@ -118,6 +166,7 @@ def get_parser():
         yaml.SafeLoader.add_constructor(Disk.yaml_tag, Disk.from_yaml)
         yaml.SafeLoader.add_constructor(LRU.yaml_tag, LRU.from_yaml)
         yaml.SafeLoader.add_constructor(S3.yaml_tag, S3.from_yaml)
    +    yaml.SafeLoader.add_constructor(Remote.yaml_tag, Remote.from_yaml)
         yaml.SafeLoader.add_constructor(WithCache.yaml_tag, WithCache.from_yaml)
         yaml.SafeLoader.add_constructor(CAS.yaml_tag, CAS.from_yaml)
         yaml.SafeLoader.add_constructor(ByteStream.yaml_tag, ByteStream.from_yaml)
    

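    The Remote factory above plugs into PyYAML the same way as the existing
    storage factories: a tag, a constructor returning the configured object
    from __new__(), and a SafeLoader registration. A self-contained sketch of
    that mechanism, with an illustrative !example-storage tag that is not part
    of the commit:

        import yaml

        class Example(yaml.YAMLObject):
            yaml_tag = u'!example-storage'

            def __new__(cls, path):
                # Return the configured object instead of an Example instance:
                return {'path': path}

            @classmethod
            def from_yaml(cls, loader, node):
                kwargs = loader.construct_mapping(node)
                return cls(**kwargs)

        yaml.SafeLoader.add_constructor(Example.yaml_tag, Example.from_yaml)

        print(yaml.safe_load('storage: !example-storage {path: /tmp}'))
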
  • buildgrid/_app/settings/remote-storage.yml
    +server:
    +  port: 50051
    +  insecure-mode: true
    +  credentials:
    +    tls-server-key: null
    +    tls-server-cert: null
    +    tls-client-certs: null
    +
    +description: |
    +  A single default instance with remote storage.
    +
    +instances:
    +  - name: main
    +    description: |
    +      The main server
    +
    +    storages:
    +        - !remote-storage &main-storage
    +          url: "http://localhost:50052"
    +          instance_name: main
    +          credentials:
    +            tls-client-key: null
    +            tls-client-cert: null
    +            tls-server-cert: null
    +
    +    services:
    +      - !action-cache &main-action
    +        storage: *main-storage
    +        max_cached_refs: 256
    +        allow_updates: true
    +
    +      - !execution
    +        storage: *main-storage
    +        action_cache: *main-action
    +
    +      - !cas
    +        storage: *main-storage
    +
    +      - !bytestream
    +        storage: *main-storage

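    The !remote-storage entry above makes this instance proxy its CAS and
    ByteStream traffic to the standalone CAS server from cas.yml. For an http
    URL this reduces to roughly the following, per the parser code above:

        import grpc

        from buildgrid.server.cas.storage.remote import RemoteStorage

        # The http scheme selects an insecure channel; the port defaults to
        # 50051 when the URL does not carry one:
        channel = grpc.insecure_channel('localhost:50052')
        storage = RemoteStorage(channel, 'main')
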
  • buildgrid/_exceptions.py
    @@ -50,3 +50,27 @@ class ServerError(BgdError):
     class BotError(BgdError):
         def __init__(self, message, detail=None, reason=None):
             super().__init__(message, detail=detail, domain=ErrorDomain.BOT, reason=reason)
    +
    +
    +class InvalidArgumentError(BgdError):
    +    """A bad argument was passed, such as a name which doesn't exist."""
    +    def __init__(self, message, detail=None, reason=None):
    +        super().__init__(message, detail=detail, domain=ErrorDomain.SERVER, reason=reason)
    +
    +
    +class NotFoundError(BgdError):
    +    """Requested resource not found."""
    +    def __init__(self, message, detail=None, reason=None):
    +        super().__init__(message, detail=detail, domain=ErrorDomain.SERVER, reason=reason)
    +
    +
    +class OutOfSyncError(BgdError):
    +    """The worker is out of sync with the server, such as having a differing number of leases."""
    +    def __init__(self, message, detail=None, reason=None):
    +        super().__init__(message, detail=detail, domain=ErrorDomain.SERVER, reason=reason)
    +
    +
    +class OutOfRangeError(BgdError):
    +    """ByteStream service read data out of range."""
    +    def __init__(self, message, detail=None, reason=None):
    +        super().__init__(message, detail=detail, domain=ErrorDomain.SERVER, reason=reason)

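    Callers of the new client code catch these from the shared module. A
    sketch, with `cas` (a Downloader) and `digest` assumed in scope:

        from buildgrid._exceptions import NotFoundError

        try:
            cas.download_file(digest, '/tmp/output')
        except NotFoundError as e:
            # Raised when the digest is missing from the remote CAS:
            print(e)
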
  • buildgrid/client/cas.py
    @@ -14,12 +14,452 @@
     
     
     from contextlib import contextmanager
    +import io
     import uuid
     import os
    +import stat
     
    -from buildgrid.settings import HASH
    +import grpc
    +
    +from buildgrid._exceptions import NotFoundError
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
     from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
    +from buildgrid._protos.google.rpc import code_pb2
    +from buildgrid.settings import HASH
    +from buildgrid.utils import write_file
    +
    +
    +# Maximum size for a queueable file:
    +_FILE_SIZE_THRESHOLD = 1 * 1024 * 1024
    +
    +# Maximum size for a single gRPC request:
    +_MAX_REQUEST_SIZE = 2 * 1024 * 1024
    +
    +# Maximum number of elements per gRPC request:
    +_MAX_REQUEST_COUNT = 500
    +
    +
    +class CallCache:
    +    """Per remote grpc.StatusCode.UNIMPLEMENTED call cache."""
    +    __calls = dict()
    +
    +    @classmethod
    +    def mark_unimplemented(cls, channel, name):
    +        if channel not in cls.__calls:
    +            cls.__calls[channel] = set()
    +        cls.__calls[channel].add(name)
    +
    +    @classmethod
    +    def unimplemented(cls, channel, name):
    +        if channel not in cls.__calls:
    +            return False
    +        return name in cls.__calls[channel]
    +
    +
    +@contextmanager
    +def download(channel, instance=None, u_uid=None):
    +    downloader = Downloader(channel, instance=instance)
    +    try:
    +        yield downloader
    +    finally:
    +        downloader.close()
    +
    +
    +class Downloader:
    +    """Remote CAS files, directories and messages download helper.
    +
    +    The :class:`Downloader` class comes with a generator factory function that
    +    can be used together with the `with` statement for context management::
    +
    +        with download(channel, instance='build') as cas:
    +            cas.get_message(message_digest, message)
    +    """
    +
    +    def __init__(self, channel, instance=None):
    +        """Initializes a new :class:`Downloader` instance.
    +
    +        Args:
    +            channel (grpc.Channel): A gRPC channel to the CAS endpoint.
    +            instance (str, optional): the targeted instance's name.
    +        """
    +        self.channel = channel
    +
    +        self.instance_name = instance
    +
    +        self.__bytestream_stub = bytestream_pb2_grpc.ByteStreamStub(self.channel)
    +        self.__cas_stub = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel)
    +
    +        self.__file_requests = dict()
    +        self.__file_request_count = 0
    +        self.__file_request_size = 0
    +        self.__file_response_size = 0
    +
    +    ## Public API:
    +
    +    def get_blob(self, digest):
    +        """Retrieves a blob from the remote CAS server.
    +
    +        Args:
    +            digest (:obj:`Digest`): the blob's digest to fetch.
    +
    +        Returns:
    +            bytearray: the fetched blob data or None if not found.
    +        """
    +        try:
    +            blob = self._fetch_blob(digest)
    +        except NotFoundError:
    +            return None
    +
    +        return blob
    +
    +    def get_blobs(self, digests):
    +        """Retrieves a list of blobs from the remote CAS server.
    +
    +        Args:
    +            digests (list): list of :obj:`Digest`s for the blobs to fetch.
    +
    +        Returns:
    +            list: the fetched blob data list.
    +        """
    +        return self._fetch_blob_batch(digests)
    +
    +    def get_message(self, digest, message):
    +        """Retrieves a :obj:`Message` from the remote CAS server.
    +
    +        Args:
    +            digest (:obj:`Digest`): the message's digest to fetch.
    +            message (:obj:`Message`): an empty message to fill.
    +
    +        Returns:
    +            :obj:`Message`: `message` filled or emptied if not found.
    +        """
    +        try:
    +            message_blob = self._fetch_blob(digest)
    +        except NotFoundError:
    +            message_blob = None
    +
    +        if message_blob is not None:
    +            message.ParseFromString(message_blob)
    +        else:
    +            message.Clear()
    +
    +        return message
    +
    +    def get_messages(self, digests, messages):
    +        """Retrieves a list of :obj:`Message`s from the remote CAS server.
    +
    +        Note:
    +            The `digests` and `messages` list **must** contain the same number
    +            of elements.
    +
    +        Args:
    +            digests (list):  list of :obj:`Digest`s for the messages to fetch.
    +            messages (list): list of empty :obj:`Message`s to fill.
    +
    +        Returns:
    +            list: the fetched and filled message list.
    +        """
    +        assert len(digests) == len(messages)
    +
    +        message_blobs = self._fetch_blob_batch(digests)
    +
    +        assert len(message_blobs) == len(messages)
    +
    +        for message, message_blob in zip(messages, message_blobs):
    +            message.ParseFromString(message_blob)
    +
    +        return messages
    +
    +    def download_file(self, digest, file_path, queue=True):
    +        """Retrieves a file from the remote CAS server.
    +
    +        If queuing is allowed (`queue=True`), the download request **may** be
    +        deferred. An explicit call to :method:`flush` can force the request to
    +        be sent immediately (along with the rest of the queued batch).
    +
    +        Args:
    +            digest (:obj:`Digest`): the file's digest to fetch.
    +            file_path (str): absolute or relative path to the local file to write.
    +            queue (bool, optional): whether or not the download request may be
    +                queued and submitted as part of a batch download request.
    +                Defaults to True.
    +
    +        Raises:
    +            NotFoundError: if `digest` is not present in the remote CAS server.
    +            OSError: if `file_path` is not writable.
    +        """
    +        if not os.path.isabs(file_path):
    +            file_path = os.path.abspath(file_path)
    +
    +        if not queue or digest.size_bytes > _FILE_SIZE_THRESHOLD:
    +            self._fetch_file(digest, file_path)
    +        else:
    +            self._queue_file(digest, file_path)
    +
    +    def download_directory(self, digest, directory_path):
    +        """Retrieves a :obj:`Directory` from the remote CAS server.
    +
    +        Args:
    +            digest (:obj:`Digest`): the directory's digest to fetch.
    +            directory_path (str): absolute or relative path to the local
    +                directory to write.
    +
    +        Raises:
    +            NotFoundError: if `digest` is not present in the remote CAS server.
    +            FileExistsError: if `directory_path` already contains parts of the
    +                fetched directory's content.
    +        """
    +        if not os.path.isabs(directory_path):
    +            directory_path = os.path.abspath(directory_path)
    +
    +        # We want to start fresh here, the rest is very synchronous...
    +        self.flush()
    +
    +        self._fetch_directory(digest, directory_path)
    +
    +    def flush(self):
    +        """Ensures any queued request gets sent."""
    +        if self.__file_requests:
    +            self._fetch_file_batch(self.__file_requests.values())
    +
    +    def close(self):
    +        """Closes the underlying connection stubs.
    +
    +        Note:
    +            This will always send pending requests before closing connections,
    +            if any.
    +        """
    +        self.flush()
    +
    +        self.__bytestream_stub = None
    +        self.__cas_stub = None
    +
    +    ## Private API:
    +
    +    def _fetch_blob(self, digest):
    +        """Fetches a blob using ByteStream.Read()"""
    +        read_blob = bytearray()
    +
    +        if self.instance_name is not None:
    +            resource_name = '/'.join([self.instance_name, 'blobs',
    +                                      digest.hash, str(digest.size_bytes)])
    +        else:
    +            resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)])
    +
    +        read_request = bytestream_pb2.ReadRequest()
    +        read_request.resource_name = resource_name
    +        read_request.read_offset = 0
    +
    +        try:
    +            # TODO: Handle connection loss/recovery
    +            for read_response in self.__bytestream_stub.Read(read_request):
    +                read_blob += read_response.data
    +
    +            assert len(read_blob) == digest.size_bytes
    +
    +        except grpc.RpcError as e:
    +            status_code = e.code()
    +            if status_code == grpc.StatusCode.NOT_FOUND:
    +                raise NotFoundError("Requested data does not exist on the remote.")
    +
    +            else:
    +                assert False
    +
    +        return read_blob
    +
    +    def _fetch_blob_batch(self, digests):
    +        """Fetches blobs using ContentAddressableStorage.BatchReadBlobs()"""
    +        batch_fetched = False
    +        read_blobs = list()
    +
    +        # First, try BatchReadBlobs(), unless it is already known to be unimplemented:
    +        if not CallCache.unimplemented(self.channel, 'BatchReadBlobs'):
    +            batch_request = remote_execution_pb2.BatchReadBlobsRequest()
    +            batch_request.digests.extend(digests)
    +            if self.instance_name is not None:
    +                batch_request.instance_name = self.instance_name
    +
    +            try:
    +                batch_response = self.__cas_stub.BatchReadBlobs(batch_request)
    +                for response in batch_response.responses:
    +                    assert response.digest in digests
    +
    +                    read_blobs.append(response.data)
    +
    +                    if response.status.code != code_pb2.OK:
    +                        assert False
    +
    +                batch_fetched = True
    +
    +            except grpc.RpcError as e:
    +                status_code = e.code()
    +                if status_code == grpc.StatusCode.UNIMPLEMENTED:
    +                    CallCache.mark_unimplemented(self.channel, 'BatchReadBlobs')
    +
    +                else:
    +                    assert False
    +
    +        # Fallback to Read() if no BatchReadBlobs():
    +        if not batch_fetched:
    +            for digest in digests:
    +                read_blobs.append(self._fetch_blob(digest))
    +
    +        return read_blobs
    +
    +    def _fetch_file(self, digest, file_path):
    +        """Fetches a file using ByteStream.Read()"""
    +        if self.instance_name is not None:
    +            resource_name = '/'.join([self.instance_name, 'blobs',
    +                                      digest.hash, str(digest.size_bytes)])
    +        else:
    +            resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)])
    +
    +        read_request = bytestream_pb2.ReadRequest()
    +        read_request.resource_name = resource_name
    +        read_request.read_offset = 0
    +
    +        with open(file_path, 'wb') as byte_file:
    +            # TODO: Handle connection loss/recovery
    +            for read_response in self.__bytestream_stub.Read(read_request):
    +                byte_file.write(read_response.data)
    +
    +            assert byte_file.tell() == digest.size_bytes
    +
    +    def _queue_file(self, digest, file_path):
    +        """Queues a file for later batch download"""
    +        if self.__file_request_size + digest.ByteSize() > _MAX_REQUEST_SIZE:
    +            self._fetch_file_batch(self.__file_requests.values())
    +        elif self.__file_response_size + digest.size_bytes > _MAX_REQUEST_SIZE:
    +            self._fetch_file_batch(self.__file_requests.values())
    +        elif self.__file_request_count >= _MAX_REQUEST_COUNT:
    +            self._fetch_file_batch(self.__file_requests.values())
    +
    +        self.__file_requests[digest.hash] = (digest, file_path)
    +        self.__file_request_count += 1
    +        self.__file_request_size += digest.ByteSize()
    +        self.__file_response_size += digest.size_bytes
    +
    +    def _fetch_file_batch(self, digests_paths):
    +        """Sends queued data using ContentAddressableStorage.BatchReadBlobs()"""
    +        batch_digests = [digest for digest, _ in digests_paths]
    +        batch_blobs = self._fetch_blob_batch(batch_digests)
    +
    +        for (_, file_path), file_blob in zip(digests_paths, batch_blobs):
    +            self._write_file(file_blob, file_path)
    +
    +        self.__file_requests.clear()
    +        self.__file_request_count = 0
    +        self.__file_request_size = 0
    +        self.__file_response_size = 0
    +
    +    def _write_file(self, blob, file_path, create_parent=False):
    +        """Dumps a memory blob to a local file"""
    +        if create_parent:
    +            os.makedirs(os.path.dirname(file_path), exist_ok=True)
    +
    +        write_file(file_path, blob)
    +
    +    def _fetch_directory(self, digest, directory_path):
    +        """Fetches a directory using ContentAddressableStorage.GetTree()"""
    +        # Better fail early if the local root path cannot be created:
    +        os.makedirs(directory_path, exist_ok=True)
    +
    +        directories = dict()
    +        directory_fetched = False
    +        # First, try GetTree() if not known to be unimplemented yet:
    +        if not CallCache.unimplemented(self.channel, 'GetTree'):
    +            tree_request = remote_execution_pb2.GetTreeRequest()
    +            tree_request.root_digest.CopyFrom(digest)
    +            tree_request.page_size = _MAX_REQUEST_COUNT
    +            if self.instance_name is not None:
    +                tree_request.instance_name = self.instance_name
    +
    +            try:
    +                tree_fetched = False
    +                while not tree_fetched:
    +                    tree_response = self.__cas_stub.GetTree(tree_request)
    +                    for directory in tree_response.directories:
    +                        directory_blob = directory.SerializeToString()
    +                        directory_hash = HASH(directory_blob).hexdigest()
    +
    +                        directories[directory_hash] = directory
    +
    +                    if tree_response.next_page_token:
    +                        tree_request = remote_execution_pb2.GetTreeRequest()
    +                        tree_request.root_digest.CopyFrom(digest)
    +                        tree_request.page_size = _MAX_REQUEST_COUNT
    +                        tree_request.page_token = tree_response.next_page_token
    +
    +                    else:
    +                        tree_fetched = True
    +
    +                assert digest.hash in directories
    +
    +            except grpc.RpcError as e:
    +                status_code = e.code()
    +                if status_code == grpc.StatusCode.UNIMPLEMENTED:
    +                    CallCache.mark_unimplemented(self.channel, 'GetTree')
    +
    +                elif status_code == grpc.StatusCode.NOT_FOUND:
    +                    raise NotFoundError("Requested directory does not exist on the remote.")
    +
    +                else:
    +                    assert False
    +
    +            if digest.hash in directories:
    +                directory = directories[digest.hash]
    +
    +                self._write_directory(directory, directory_path,
    +                                      directories=directories, root_barrier=directory_path)
    +                directory_fetched = True
    +
    +        # TODO: Try with BatchReadBlobs().
    +
    +        # Fallback to Read() if no GetTree():
    +        if not directory_fetched:
    +            directory = remote_execution_pb2.Directory()
    +            directory.ParseFromString(self._fetch_blob(digest))
    +
    +            self._write_directory(directory, directory_path,
    +                                  root_barrier=directory_path)
    +
    +    def _write_directory(self, root_directory, root_path, directories=None, root_barrier=None):
    +        """Generates a local directory structure"""
    +        for file_node in root_directory.files:
    +            file_path = os.path.join(root_path, file_node.name)
    +
    +            self._queue_file(file_node.digest, file_path)
    +
    +        for directory_node in root_directory.directories:
    +            directory_path = os.path.join(root_path, directory_node.name)
    +            if directories and directory_node.digest.hash in directories:
    +                directory = directories[directory_node.digest.hash]
    +            else:
    +                directory = remote_execution_pb2.Directory()
    +                directory.ParseFromString(self._fetch_blob(directory_node.digest))
    +
    +            os.makedirs(directory_path, exist_ok=True)
    +
    +            self._write_directory(directory, directory_path,
    +                                  directories=directories, root_barrier=root_barrier)
    +
    +        for symlink_node in root_directory.symlinks:
    +            symlink_path = os.path.join(root_path, symlink_node.name)
    +            if not os.path.isabs(symlink_node.target):
    +                target_path = os.path.join(root_path, symlink_node.target)
    +            else:
    +                target_path = symlink_node.target
    +            target_path = os.path.normpath(target_path)
    +
    +            # Do not create links pointing outside the barrier:
    +            if root_barrier is not None:
    +                common_path = os.path.commonprefix([root_barrier, target_path])
    +                if not common_path.startswith(root_barrier):
    +                    continue
    +
    +            os.symlink(target_path, symlink_path)
     
     
     @contextmanager
    @@ -39,15 +479,8 @@ class Uploader:
     
             with upload(channel, instance='build') as cas:
                 cas.upload_file('/path/to/local/file')
    -
    -    Attributes:
    -        FILE_SIZE_THRESHOLD (int): maximum size for a queueable file.
    -        MAX_REQUEST_SIZE (int): maximum size for a single gRPC request.
         """
     
    -    FILE_SIZE_THRESHOLD = 1 * 1024 * 1024
    -    MAX_REQUEST_SIZE = 2 * 1024 * 1024
    -
         def __init__(self, channel, instance=None, u_uid=None):
             """Initializes a new :class:`Uploader` instance.
     
    @@ -95,7 +528,7 @@ class Uploader:
             with open(file_path, 'rb') as bytes_steam:
                 file_bytes = bytes_steam.read()
     
    -        if not queue or len(file_bytes) > Uploader.FILE_SIZE_THRESHOLD:
    +        if not queue or len(file_bytes) > _FILE_SIZE_THRESHOLD:
                 blob_digest = self._send_blob(file_bytes)
             else:
                 blob_digest = self._queue_blob(file_bytes)
    @@ -148,7 +581,7 @@ class Uploader:
             blob_digest.hash = HASH(blob).hexdigest()
             blob_digest.size_bytes = len(blob)
     
    -        if self.__request_size + len(blob) > Uploader.MAX_REQUEST_SIZE:
    +        if self.__request_size + len(blob) > _MAX_REQUEST_SIZE:
                 self._send_batch()
     
             update_request = remote_execution_pb2.BatchUpdateBlobsRequest.Request()
    @@ -156,7 +589,7 @@ class Uploader:
             update_request.data = blob
     
             update_request_size = update_request.ByteSize()
    -        if self.__request_size + update_request_size > Uploader.MAX_REQUEST_SIZE:
    +        if self.__request_size + update_request_size > _MAX_REQUEST_SIZE:
                 self._send_batch()
     
             self.__requests[update_request.digest.hash] = update_request
    

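    A sketch of the Downloader batching behaviour, assuming `channel` and a
    `small_files` list of (digest, path) pairs: blobs under the 1 MiB file-size
    threshold are queued and fetched through one BatchReadBlobs() call, with
    per-blob ByteStream.Read() as the fallback on servers that do not implement
    batching.

        from buildgrid.client.cas import download

        with download(channel, instance='main') as cas:
            for digest, file_path in small_files:
                cas.download_file(digest, file_path)  # may be queued, not yet fetched

            cas.flush()  # force the queued batch out now; close() also flushes
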
  • buildgrid/server/_exceptions.py deleted
    -# Copyright (C) 2018 Bloomberg LP
    -#
    -# Licensed under the Apache License, Version 2.0 (the "License");
    -# you may not use this file except in compliance with the License.
    -# You may obtain a copy of the License at
    -#
    -#  <http://www.apache.org/licenses/LICENSE-2.0>
    -#
    -# Unless required by applicable law or agreed to in writing, software
    -# distributed under the License is distributed on an "AS IS" BASIS,
    -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    -# See the License for the specific language governing permissions and
    -# limitations under the License.
    -
    -
    -from .._exceptions import BgdError, ErrorDomain
    -
    -
    -class InvalidArgumentError(BgdError):
    -    """A bad argument was passed, such as a name which doesn't exist.
    -    """
    -
    -    def __init__(self, message, detail=None, reason=None):
    -        super().__init__(message, detail=detail, domain=ErrorDomain.SERVER, reason=reason)
    -
    -
    -class NotFoundError(BgdError):
    -    """Requested resource not found.
    -    """
    -
    -    def __init__(self, message, detail=None, reason=None):
    -        super().__init__(message, detail=detail, domain=ErrorDomain.SERVER, reason=reason)
    -
    -
    -class OutofSyncError(BgdError):
    -    """The worker is out of sync with the server, such as having a differing number of leases.
    -    """
    -
    -    def __init__(self, message, detail=None, reason=None):
    -        super().__init__(message, detail=detail, domain=ErrorDomain.SERVER, reason=reason)
    -
    -
    -class OutOfRangeError(BgdError):
    -    """ ByteStream service read data out of range
    -    """
    -
    -    def __init__(self, message, detail=None, reason=None):
    -        super().__init__(message, detail=detail, domain=ErrorDomain.SERVER, reason=reason)

  • buildgrid/server/actioncache/service.py
    @@ -24,11 +24,10 @@ import logging
     
     import grpc
     
    +from buildgrid._exceptions import InvalidArgumentError, NotFoundError
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
     
    -from .._exceptions import InvalidArgumentError, NotFoundError
    -
     
     class ActionCacheService(remote_execution_pb2_grpc.ActionCacheServicer):
     

  • buildgrid/server/bots/instance.py
    ... ... @@ -23,7 +23,8 @@ Instance of the Remote Workers interface.
    23 23
     import logging
    
    24 24
     import uuid
    
    25 25
     
    
    26
    -from .._exceptions import InvalidArgumentError, OutofSyncError
    
    26
    +from buildgrid._exceptions import InvalidArgumentError, OutOfSyncError
    
    27
    +
    
    27 28
     from ..job import LeaseState
    
    28 29
     
    
    29 30
     
    
    ... ... @@ -105,7 +106,7 @@ class BotsInterface:
    105 106
                     # TODO: Lease was rejected
    
    106 107
                     raise NotImplementedError("'Not Accepted' is unsupported")
    
    107 108
                 else:
    
    108
    -                raise OutofSyncError("Server lease: {}. Client lease: {}".format(server_lease, client_lease))
    
    109
    +                raise OutOfSyncError("Server lease: {}. Client lease: {}".format(server_lease, client_lease))
    
    109 110
     
    
    110 111
             elif server_state == LeaseState.ACTIVE:
    
    111 112
     
    
    ... ... @@ -118,17 +119,17 @@ class BotsInterface:
    118 119
                     return None
    
    119 120
     
    
    120 121
                 else:
    
    121
    -                raise OutofSyncError("Server lease: {}. Client lease: {}".format(server_lease, client_lease))
    
    122
    +                raise OutOfSyncError("Server lease: {}. Client lease: {}".format(server_lease, client_lease))
    
    122 123
     
    
    123 124
             elif server_state == LeaseState.COMPLETED:
    
    124
    -            raise OutofSyncError("Server lease: {}. Client lease: {}".format(server_lease, client_lease))
    
    125
    +            raise OutOfSyncError("Server lease: {}. Client lease: {}".format(server_lease, client_lease))
    
    125 126
     
    
    126 127
             elif server_state == LeaseState.CANCELLED:
    
    127 128
                 raise NotImplementedError("Cancelled states not supported yet")
    
    128 129
     
    
    129 130
             else:
    
    130 131
                # Should never get here
    
    131
    -            raise OutofSyncError("State now allowed: {}".format(server_state))
    
    132
    +            raise OutOfSyncError("State not allowed: {}".format(server_state))
    
    132 133
     
    
    133 134
             return client_lease
    
    134 135
     
    

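The rename is applied consistently across the lease state machine above. A condensed sketch of the pattern these hunks establish; the standalone function is simplified from the real BotsInterface method:

    from buildgrid._exceptions import OutOfSyncError

    def _check_lease_state(server_lease, client_lease):
        # Simplified: any disagreement between the server's and the
        # worker's view of a lease is reported as an OutOfSyncError.
        if server_lease.state != client_lease.state:
            raise OutOfSyncError("Server lease: {}. Client lease: {}"
                                 .format(server_lease, client_lease))
        return client_lease
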
  • buildgrid/server/bots/service.py
    ... ... @@ -25,11 +25,10 @@ import grpc
    25 25
     
    
    26 26
     from google.protobuf.empty_pb2 import Empty
    
    27 27
     
    
    28
    +from buildgrid._exceptions import InvalidArgumentError, OutOfSyncError
    
    28 29
     from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
    
    29 30
     from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2_grpc
    
    30 31
     
    
    31
    -from .._exceptions import InvalidArgumentError, OutofSyncError
    
    32
    -
    
    33 32
     
    
    34 33
     class BotsService(bots_pb2_grpc.BotsServicer):
    
    35 34
     
    
    ... ... @@ -69,7 +68,7 @@ class BotsService(bots_pb2_grpc.BotsServicer):
    69 68
                 context.set_details(str(e))
    
    70 69
                 context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
    
    71 70
     
    
    72
    -        except OutofSyncError as e:
    
    71
    +        except OutOfSyncError as e:
    
    73 72
                 self.logger.error(e)
    
    74 73
                 context.set_details(str(e))
    
    75 74
                 context.set_code(grpc.StatusCode.DATA_LOSS)
    

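On the service side, the shared exceptions are translated into gRPC status codes. A hedged, standalone rendition of the handler pattern; the instance call signature is abridged from the real UpdateBotSession servicer method, not confirmed by this diff:

    import grpc

    from buildgrid._exceptions import InvalidArgumentError, OutOfSyncError

    def _update_bot_session(instance, request, context):
        # Abridged: translate shared exceptions into canonical codes.
        try:
            return instance.update_bot_session(request.name, request.bot_session)
        except InvalidArgumentError as e:
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
        except OutOfSyncError as e:
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.DATA_LOSS)
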
  • buildgrid/server/cas/instance.py
    ... ... @@ -19,11 +19,10 @@ Storage Instances
    19 19
     Instances of CAS and ByteStream
    
    20 20
     """
    
    21 21
     
    
    22
    +from buildgrid._exceptions import InvalidArgumentError, NotFoundError, OutOfRangeError
    
    22 23
     from buildgrid._protos.google.bytestream import bytestream_pb2
    
    23 24
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 as re_pb2
    
    24
    -
    
    25
    -from .._exceptions import InvalidArgumentError, NotFoundError, OutOfRangeError
    
    26
    -from ...settings import HASH
    
    25
    +from buildgrid.settings import HASH
    
    27 26
     
    
    28 27
     
    
    29 28
     class ContentAddressableStorageInstance:
    

  • buildgrid/server/cas/service.py
    ... ... @@ -26,12 +26,11 @@ import logging
    26 26
     
    
    27 27
     import grpc
    
    28 28
     
    
    29
    +from buildgrid._exceptions import InvalidArgumentError, NotFoundError, OutOfRangeError
    
    29 30
     from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
    
    30 31
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    31 32
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
    
    32 33
     
    
    33
    -from .._exceptions import InvalidArgumentError, NotFoundError, OutOfRangeError
    
    34
    -
    
    35 34
     
    
    36 35
     class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
    
    37 36
     
    
    ... ... @@ -89,15 +88,15 @@ class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer):
    89 88
                 # TODO: Decide on default instance name
    
    90 89
                 if path[0] == "blobs":
    
    91 90
                     if len(path) < 3 or not path[2].isdigit():
    
    92
    -                    raise InvalidArgumentError("Invalid resource name: {}".format(context.resource_name))
    
    91
    +                    raise InvalidArgumentError("Invalid resource name: {}".format(request.resource_name))
    
    93 92
                     instance_name = ""
    
    94 93
     
    
    95 94
                 elif path[1] == "blobs":
    
    96 95
                     if len(path) < 4 or not path[3].isdigit():
    
    97
    -                    raise InvalidArgumentError("Invalid resource name: {}".format(context.resource_name))
    
    96
    +                    raise InvalidArgumentError("Invalid resource name: {}".format(request.resource_name))
    
    98 97
     
    
    99 98
                 else:
    
    100
    -                raise InvalidArgumentError("Invalid resource name: {}".format(context.resource_name))
    
    99
    +                raise InvalidArgumentError("Invalid resource name: {}".format(request.resource_name))
    
    101 100
     
    
    102 101
                 instance = self._get_instance(instance_name)
    
    103 102
                 yield from instance.read(path,
    
    ... ... @@ -134,15 +133,15 @@ class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer):
    134 133
                 # TODO: Sort out no instance name
    
    135 134
                 if path[0] == "uploads":
    
    136 135
                     if len(path) < 5 or path[2] != "blobs" or not path[4].isdigit():
    
    137
    -                    raise InvalidArgumentError("Invalid resource name: {}".format(context.resource_name))
    
    136
    +                    raise InvalidArgumentError("Invalid resource name: {}".format(first_request.resource_name))
    
    138 137
                     instance_name = ""
    
    139 138
     
    
    140 139
                 elif path[1] == "uploads":
    
    141 140
                     if len(path) < 6 or path[3] != "blobs" or not path[5].isdigit():
    
    142
    -                    raise InvalidArgumentError("Invalid resource name: {}".format(context.resource_name))
    
    141
    +                    raise InvalidArgumentError("Invalid resource name: {}".format(first_request.resource_name))
    
    143 142
     
    
    144 143
                 else:
    
    145
    -                raise InvalidArgumentError("Invalid resource name: {}".format(context.resource_name))
    
    144
    +                raise InvalidArgumentError("Invalid resource name: {}".format(first_request.resource_name))
    
    146 145
     
    
    147 146
                 instance = self._get_instance(instance_name)
    
    148 147
                 return instance.write(requests)
    

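The substantive fix in this file: resource_name is a field on the ByteStream request messages (ReadRequest and the first WriteRequest of a stream), not on the gRPC ServicerContext, so the old error paths would have raised AttributeError before ever reporting the bad name. For reference, ByteStream resource names follow roughly these shapes, with the instance name optional:

    {instance_name}/blobs/{hash}/{size}                 (reads)
    {instance_name}/uploads/{uuid}/blobs/{hash}/{size}  (writes)

A sketch of the read-side validation these hunks perform; the standalone helper form is illustrative:

    from buildgrid._exceptions import InvalidArgumentError

    def _parse_read_resource_name(resource_name):
        # Accepts "blobs/{hash}/{size}" or "{instance}/blobs/{hash}/{size}"
        # and returns the instance name, raising on malformed input.
        path = resource_name.split("/")
        if path[0] == "blobs":
            if len(path) < 3 or not path[2].isdigit():
                raise InvalidArgumentError("Invalid resource name: {}".format(resource_name))
            return ""
        elif len(path) > 1 and path[1] == "blobs":
            if len(path) < 4 or not path[3].isdigit():
                raise InvalidArgumentError("Invalid resource name: {}".format(resource_name))
            return path[0]
        else:
            raise InvalidArgumentError("Invalid resource name: {}".format(resource_name))
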
  • buildgrid/server/cas/storage/remote.py
... ... @@ -23,18 +23,24 @@ Forwards storage requests to a remote storage.
23 23
 import io

24 24
 import logging

25 25
 

26
-from buildgrid.utils import gen_fetch_blob, gen_write_request_blob

26
+import grpc

27
+

28
+from buildgrid.client.cas import download

27 29
 from buildgrid._protos.google.bytestream import bytestream_pb2_grpc

28 30
 from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc

31
+from buildgrid.utils import gen_write_request_blob

29 32
 

30 33
 from .storage_abc import StorageABC

31 34
 

32 35
 

33 36
 class RemoteStorage(StorageABC):

34 37
 

35
-    def __init__(self, channel, instance_name=""):

38
+    def __init__(self, channel, instance_name):

36 39
         self.logger = logging.getLogger(__name__)

37
-        self._instance_name = instance_name

40
+

41
+        self.instance_name = instance_name

42
+        self.channel = channel

43
+

38 44
         self._stub_bs = bytestream_pb2_grpc.ByteStreamStub(channel)

39 45
         self._stub_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(channel)

40 46
 
    
... ... @@ -44,18 +50,8 @@ class RemoteStorage(StorageABC):
44 50
         return False

45 51
 

46 52
     def get_blob(self, digest):

47
-        fetched_data = io.BytesIO()

48
-        length = 0

49
-        for data in gen_fetch_blob(self._stub_bs, digest, self._instance_name):

50
-            length += fetched_data.write(data)

51
-

52
-        if length:

53
-            assert digest.size_bytes == length

54
-            fetched_data.seek(0)

55
-            return fetched_data

56
-

57
-        else:

58
-            return None

53
+        with download(self.channel, instance=self.instance_name) as cas:

54
+            return io.BytesIO(cas.get_blob(digest))

59 55
 

60 56
     def begin_write(self, digest):

61 57
         return io.BytesIO(digest.SerializeToString())
    

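With the port, RemoteStorage.get_blob delegates the ByteStream plumbing to the downloader helper instead of iterating a raw Read stream itself. A minimal standalone sketch of the same call sequence, mirroring the hunk above:

    import io

    from buildgrid.client.cas import download

    def fetch_blob(channel, digest, instance_name=""):
        # download() manages the CAS session on the given gRPC channel;
        # get_blob() returns the blob's bytes for the requested digest.
        with download(channel, instance=instance_name) as cas:
            return io.BytesIO(cas.get_blob(digest))
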
  • buildgrid/server/execution/instance.py
    ... ... @@ -21,10 +21,10 @@ An instance of the Remote Execution Service.
    21 21
     
    
    22 22
     import logging
    
    23 23
     
    
    24
    +from buildgrid._exceptions import InvalidArgumentError
    
    24 25
     from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action
    
    25 26
     
    
    26 27
     from ..job import Job
    
    27
    -from .._exceptions import InvalidArgumentError
    
    28 28
     
    
    29 29
     
    
    30 30
     class ExecutionInstance:
    

  • buildgrid/server/execution/service.py
    ... ... @@ -26,12 +26,10 @@ from functools import partial
    26 26
     
    
    27 27
     import grpc
    
    28 28
     
    
    29
    +from buildgrid._exceptions import InvalidArgumentError
    
    29 30
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
    
    30
    -
    
    31 31
     from buildgrid._protos.google.longrunning import operations_pb2
    
    32 32
     
    
    33
    -from .._exceptions import InvalidArgumentError
    
    34
    -
    
    35 33
     
    
    36 34
     class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    
    37 35
     
    

  • buildgrid/server/operations/instance.py
    ... ... @@ -21,7 +21,7 @@ An instance of the LongRunningOperations Service.
    21 21
     
    
    22 22
     import logging
    
    23 23
     
    
    24
    -from .._exceptions import InvalidArgumentError
    
    24
    +from buildgrid._exceptions import InvalidArgumentError
    
    25 25
     
    
    26 26
     
    
    27 27
     class OperationsInstance:
    

  • buildgrid/server/operations/service.py
    ... ... @@ -25,10 +25,9 @@ import grpc
    25 25
     
    
    26 26
     from google.protobuf.empty_pb2 import Empty
    
    27 27
     
    
    28
    +from buildgrid._exceptions import InvalidArgumentError
    
    28 29
     from buildgrid._protos.google.longrunning import operations_pb2_grpc, operations_pb2
    
    29 30
     
    
    30
    -from .._exceptions import InvalidArgumentError
    
    31
    -
    
    32 31
     
    
    33 32
     class OperationsService(operations_pb2_grpc.OperationsServicer):
    
    34 33
     
    

  • buildgrid/server/referencestorage/service.py
    ... ... @@ -17,11 +17,10 @@ import logging
    17 17
     
    
    18 18
     import grpc
    
    19 19
     
    
    20
    +from buildgrid._exceptions import InvalidArgumentError, NotFoundError
    
    20 21
     from buildgrid._protos.buildstream.v2 import buildstream_pb2
    
    21 22
     from buildgrid._protos.buildstream.v2 import buildstream_pb2_grpc
    
    22 23
     
    
    23
    -from .._exceptions import InvalidArgumentError, NotFoundError
    
    24
    -
    
    25 24
     
    
    26 25
     class ReferenceStorageService(buildstream_pb2_grpc.ReferenceStorageServicer):
    
    27 26
     
    

  • buildgrid/server/referencestorage/storage.py
    ... ... @@ -24,10 +24,9 @@ For a given key, it
    24 24
     
    
    25 25
     import collections
    
    26 26
     
    
    27
    +from buildgrid._exceptions import NotFoundError
    
    27 28
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    28 29
     
    
    29
    -from .._exceptions import NotFoundError
    
    30
    -
    
    31 30
     
    
    32 31
     class ReferenceCache:
    
    33 32
     
    

  • buildgrid/utils.py
    ... ... @@ -22,18 +22,6 @@ from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_p
    22 22
     from buildgrid._protos.google.bytestream import bytestream_pb2
    
    23 23
     
    
    24 24
     
    
    25
    -def gen_fetch_blob(stub, digest, instance_name=""):
    
    26
    -    """ Generates byte stream from a fetch blob request
    
    27
    -    """
    
    28
    -
    
    29
    -    resource_name = os.path.join(instance_name, 'blobs', digest.hash, str(digest.size_bytes))
    
    30
    -    request = bytestream_pb2.ReadRequest(resource_name=resource_name,
    
    31
    -                                         read_offset=0)
    
    32
    -
    
    33
    -    for response in stub.Read(request):
    
    34
    -        yield response.data
    
    35
    -
    
    36
    -
    
    37 25
     def gen_write_request_blob(digest_bytes, digest, instance_name=""):
    
    38 26
         """ Generates a bytestream write request
    
    39 27
         """
    
    ... ... @@ -60,74 +48,6 @@ def gen_write_request_blob(digest_bytes, digest, instance_name=""):
    60 48
             offset += chunk_size
    
    61 49
     
    
    62 50
     
    
    63
    -def write_fetch_directory(root_directory, stub, digest, instance_name=None):
    
    64
    -    """Locally replicates a directory from CAS.
    
    65
    -
    
    66
    -    Args:
    
    67
    -        root_directory (str): local directory to populate.
    
    68
    -        stub (): gRPC stub for CAS communication.
    
    69
    -        digest (Digest): digest for the directory to fetch from CAS.
    
    70
    -        instance_name (str, optional): farm instance name to query data from.
    
    71
    -    """
    
    72
    -
    
    73
    -    if not os.path.isabs(root_directory):
    
    74
    -        root_directory = os.path.abspath(root_directory)
    
    75
    -    if not os.path.exists(root_directory):
    
    76
    -        os.makedirs(root_directory, exist_ok=True)
    
    77
    -
    
    78
    -    directory = parse_to_pb2_from_fetch(remote_execution_pb2.Directory(),
    
    79
    -                                        stub, digest, instance_name)
    
    80
    -
    
    81
    -    for directory_node in directory.directories:
    
    82
    -        child_path = os.path.join(root_directory, directory_node.name)
    
    83
    -
    
    84
    -        write_fetch_directory(child_path, stub, directory_node.digest, instance_name)
    
    85
    -
    
    86
    -    for file_node in directory.files:
    
    87
    -        child_path = os.path.join(root_directory, file_node.name)
    
    88
    -
    
    89
    -        with open(child_path, 'wb') as child_file:
    
    90
    -            write_fetch_blob(child_file, stub, file_node.digest, instance_name)
    
    91
    -
    
    92
    -    for symlink_node in directory.symlinks:
    
    93
    -        child_path = os.path.join(root_directory, symlink_node.name)
    
    94
    -
    
    95
    -        if os.path.isabs(symlink_node.target):
    
    96
    -            continue  # No out of temp-directory links for now.
    
    97
    -        target_path = os.path.join(root_directory, symlink_node.target)
    
    98
    -
    
    99
    -        os.symlink(child_path, target_path)
    
    100
    -
    
    101
    -
    
    102
    -def write_fetch_blob(target_file, stub, digest, instance_name=None):
    
    103
    -    """Extracts a blob from CAS into a local file.
    
    104
    -
    
    105
    -    Args:
    
    106
    -        target_file (str): local file to write.
    
    107
    -        stub (): gRPC stub for CAS communication.
    
    108
    -        digest (Digest): digest for the blob to fetch from CAS.
    
    109
    -        instance_name (str, optional): farm instance name to query data from.
    
    110
    -    """
    
    111
    -
    
    112
    -    for stream in gen_fetch_blob(stub, digest, instance_name):
    
    113
    -        target_file.write(stream)
    
    114
    -    target_file.flush()
    
    115
    -
    
    116
    -    assert digest.size_bytes == os.fstat(target_file.fileno()).st_size
    
    117
    -
    
    118
    -
    
    119
    -def parse_to_pb2_from_fetch(pb2, stub, digest, instance_name=""):
    
    120
    -    """ Fetches stream and parses it into given pb2
    
    121
    -    """
    
    122
    -
    
    123
    -    stream_bytes = b''
    
    124
    -    for stream in gen_fetch_blob(stub, digest, instance_name):
    
    125
    -        stream_bytes += stream
    
    126
    -
    
    127
    -    pb2.ParseFromString(stream_bytes)
    
    128
    -    return pb2
    
    129
    -
    
    130
    -
    
    131 51
     def create_digest(bytes_to_digest):
    
    132 52
         """Computes the :obj:`Digest` of a piece of data.
    
    133 53
     
    

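The helpers deleted here are superseded by the download context manager from buildgrid.client.cas that the earlier hunks switch to: get_blob covers gen_fetch_blob/write_fetch_blob, and get_message covers parse_to_pb2_from_fetch (a directory-level replacement for write_fetch_directory is not visible in these hunks). A short sketch of the correspondence; the wrapper function is illustrative:

    from buildgrid.client.cas import download
    from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2

    def fetch_action(channel, digest, instance_name=""):
        # get_message() fetches the blob and parses it into the given
        # message instance, replacing parse_to_pb2_from_fetch().
        with download(channel, instance=instance_name) as cas:
            return cas.get_message(digest, remote_execution_pb2.Action())
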
  • tests/cas/test_storage.py
    ... ... @@ -98,17 +98,6 @@ def instance(params):
    98 98
         return {params, MockCASStorage()}
    
    99 99
     
    
    100 100
     
    
    101
    -@pytest.fixture()
    
    102
    -@mock.patch.object(remote, 'bytestream_pb2_grpc')
    
    103
    -@mock.patch.object(remote, 'remote_execution_pb2_grpc')
    
    104
    -def remote_storage(mock_bs_grpc, mock_re_pb2_grpc):
    
    105
    -    mock_server = MockStubServer()
    
    106
    -    storage = remote.RemoteStorage(instance)
    
    107
    -    storage._stub_bs = mock_server
    
    108
    -    storage._stub_cas = mock_server
    
    109
    -    yield storage
    
    110
    -
    
    111
    -
    
    112 101
     # General tests for all storage providers
    
    113 102
     
    
    114 103
     
    
    ... ... @@ -138,7 +127,7 @@ def any_storage(request):
    138 127
             with mock.patch.object(remote, 'bytestream_pb2_grpc'):
    
    139 128
                 with mock.patch.object(remote, 'remote_execution_pb2_grpc'):
    
    140 129
                     mock_server = MockStubServer()
    
    141
    -                storage = remote.RemoteStorage(instance)
    
    130
    +                storage = remote.RemoteStorage(instance, "")
    
    142 131
                     storage._stub_bs = mock_server
    
    143 132
                     storage._stub_cas = mock_server
    
    144 133
                     yield storage
    

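Since RemoteStorage.__init__ no longer defaults instance_name, callers must pass it explicitly, as the updated fixture does. A minimal usage sketch; the endpoint is illustrative:

    import grpc

    from buildgrid.server.cas.storage import remote

    channel = grpc.insecure_channel('localhost:50051')  # illustrative endpoint
    storage = remote.RemoteStorage(channel, '')  # empty instance name, now explicit
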

