[Notes] [Git][BuildGrid/buildgrid][mablanch/61-bazel-support] 5 commits: client/cas.py: Introduce CAS uploader helper class



Martin Blanchard pushed to branch mablanch/61-bazel-support at BuildGrid / buildgrid

Commits:

4 changed files:

Changes:

  • buildgrid/_app/bots/temp_directory.py
    ... ... @@ -19,71 +19,94 @@ import tempfile
     
     from google.protobuf import any_pb2
     
    -from buildgrid.utils import read_file, create_digest, write_fetch_directory, parse_to_pb2_from_fetch
    -from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    +from buildgrid.client.cas import upload
    +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
     from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
    +from buildgrid.utils import write_fetch_directory, parse_to_pb2_from_fetch
    +from buildgrid.utils import output_file_maker, output_directory_maker
     
     
     def work_temp_directory(context, lease):
    -    """ Bot downloads directories and files into a temp directory,
    -    then uploads results back to CAS
    +    """Executes a lease for a build action, using host tools.
         """
     
    -    parent = context.parent
         stub_bytestream = bytestream_pb2_grpc.ByteStreamStub(context.cas_channel)
    +    instance_name = context.parent
    +    logger = context.logger
     
         action_digest = remote_execution_pb2.Digest()
         lease.payload.Unpack(action_digest)
     
    -    action = remote_execution_pb2.Action()
    +    action = parse_to_pb2_from_fetch(remote_execution_pb2.Action(),
    +                                     stub_bytestream, action_digest, instance_name)
     
    -    action = parse_to_pb2_from_fetch(action, stub_bytestream, action_digest, parent)
    +    with tempfile.TemporaryDirectory() as temp_directory:
    +        command = parse_to_pb2_from_fetch(remote_execution_pb2.Command(),
    +                                          stub_bytestream, action.command_digest, instance_name)
     
    -    with tempfile.TemporaryDirectory() as temp_dir:
    +        write_fetch_directory(temp_directory, stub_bytestream,
    +                              action.input_root_digest, instance_name)
     
    -        command = remote_execution_pb2.Command()
    -        command = parse_to_pb2_from_fetch(command, stub_bytestream, action.command_digest, parent)
    -
    -        arguments = "cd {} &&".format(temp_dir)
    +        environment = os.environ.copy()
    +        for variable in command.environment_variables:
    +            if variable.name not in ['PATH', 'PWD']:
    +                environment[variable.name] = variable.value
     
    +        command_line = list()
             for argument in command.arguments:
    -            arguments += " {}".format(argument)
    -
    -        context.logger.info(arguments)
    -
    -        write_fetch_directory(temp_dir, stub_bytestream, action.input_root_digest, parent)
    -
    -        proc = subprocess.Popen(arguments,
    -                                shell=True,
    -                                stdin=subprocess.PIPE,
    -                                stdout=subprocess.PIPE)
    -
    -        # TODO: Should return the std_out to the user
    -        proc.communicate()
    -
    -        result = remote_execution_pb2.ActionResult()
    -        requests = []
    -        for output_file in command.output_files:
    -            path = os.path.join(temp_dir, output_file)
    -            chunk = read_file(path)
    -
    -            digest = create_digest(chunk)
    -
    -            result.output_files.extend([remote_execution_pb2.OutputFile(path=output_file,
    -                                                                        digest=digest)])
    -
    -            requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
    -                digest=digest, data=chunk))
    -
    -        request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=parent,
    -                                                               requests=requests)
    -
    -        stub_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(context.cas_channel)
    -        stub_cas.BatchUpdateBlobs(request)
    -
    -        result_any = any_pb2.Any()
    -        result_any.Pack(result)
    -
    -        lease.result.CopyFrom(result_any)
    +            command_line.append(argument.strip())
    +
    +        working_directory = None
    +        if command.working_directory:
    +            working_directory = os.path.join(temp_directory,
    +                                             command.working_directory)
    +            os.makedirs(working_directory, exist_ok=True)
    +        else:
    +            working_directory = temp_directory
    +
    +        # Ensure that the output files structure exists:
    +        for output_path in command.output_files:
    +            directory_path = os.path.join(working_directory,
    +                                          os.path.dirname(output_path))
    +            os.makedirs(directory_path, exist_ok=True)
    +
    +        logger.debug(' '.join(command_line))
    +
    +        process = subprocess.Popen(command_line,
    +                                   cwd=working_directory,
    +                                   universal_newlines=True,
    +                                   env=environment,
    +                                   stdin=subprocess.PIPE,
    +                                   stdout=subprocess.PIPE)
    +        # TODO: Should return the stdout and stderr in the ActionResult.
    +        process.communicate()
    +
    +        action_result = remote_execution_pb2.ActionResult()
    +
    +        with upload(context.cas_channel, instance=instance_name) as cas:
    +            for output_path in command.output_files:
    +                file_path = os.path.join(working_directory, output_path)
    +                # Missing outputs should simply be omitted in ActionResult:
    +                if not os.path.isfile(file_path):
    +                    continue
    +
    +                output_file = output_file_maker(file_path, working_directory, cas=cas)
    +                action_result.output_files.extend([output_file])
    +
    +            for output_path in command.output_directories:
    +                directory_path = os.path.join(working_directory, output_path)
    +                # Missing outputs should simply be omitted in ActionResult:
    +                if not os.path.isdir(directory_path):
    +                    continue
    +
    +                # OutputDirectory.path should be relative to the working directory:
    +                output_directory = output_directory_maker(directory_path, working_directory, cas=cas)
    +
    +                action_result.output_directories.extend([output_directory])
    +
    +        action_result_any = any_pb2.Any()
    +        action_result_any.Pack(action_result)
    +
    +        lease.result.CopyFrom(action_result_any)
     
         return lease
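
    For context, a minimal sketch of how the new worker-side flow fits together, using the
    upload helper with an output maker (the endpoint address and paths here are hypothetical,
    not part of the diff):

        import grpc

        from buildgrid.client.cas import upload
        from buildgrid.utils import output_file_maker

        # Hypothetical CAS endpoint and working directory:
        channel = grpc.insecure_channel('localhost:50051')

        with upload(channel, instance='main') as cas:
            # The file is queued for upload; queued requests are flushed
            # when the context exits:
            output_file = output_file_maker('/tmp/work/out/app.log', '/tmp/work', cas=cas)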

  • buildgrid/client/cas.py
    ... ... @@ -0,0 +1,221 @@
    +# Copyright (C) 2018 Bloomberg LP
    +#
    +# Licensed under the Apache License, Version 2.0 (the "License");
    +# you may not use this file except in compliance with the License.
    +# You may obtain a copy of the License at
    +#
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    +#
    +# Unless required by applicable law or agreed to in writing, software
    +# distributed under the License is distributed on an "AS IS" BASIS,
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +# See the License for the specific language governing permissions and
    +# limitations under the License.
    +
    +
    +from contextlib import contextmanager
    +import uuid
    +import os
    +
    +from buildgrid.settings import HASH
    +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    +from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
    +
    +
    +@contextmanager
    +def upload(channel, instance=None, u_uid=None):
    +    uploader = Uploader(channel, instance=instance, u_uid=u_uid)
    +    try:
    +        yield uploader
    +    finally:
    +        uploader.flush()
    +
    +
    +class Uploader:
    +    """Remote CAS files, directories and messages upload helper.
    +
    +    The :class:`Uploader` class comes with a generator factory function that can
    +    be used together with the `with` statement for context management::
    +
    +        with upload(channel, instance='build') as cas:
    +            cas.upload_file('/path/to/local/file')
    +
    +    Attributes:
    +        FILE_SIZE_THRESHOLD (int): maximum size for a queueable file.
    +        MAX_REQUEST_SIZE (int): maximum size for a single gRPC request.
    +    """
    +
    +    FILE_SIZE_THRESHOLD = 1 * 1024 * 1024
    +    MAX_REQUEST_SIZE = 2 * 1024 * 1024
    +
    +    def __init__(self, channel, instance=None, u_uid=None):
    +        """Initializes a new :class:`Uploader` instance.
    +
    +        Args:
    +            channel (grpc.Channel): A gRPC channel to the CAS endpoint.
    +            instance (str, optional): the targeted instance's name.
    +            u_uid (str, optional): a UUID for CAS transactions.
    +        """
    +        self.channel = channel
    +
    +        self.instance_name = instance
    +        if u_uid is not None:
    +            self.u_uid = u_uid
    +        else:
    +            self.u_uid = str(uuid.uuid4())
    +
    +        self.__bytestream_stub = bytestream_pb2_grpc.ByteStreamStub(self.channel)
    +        self.__cas_stub = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel)
    +
    +        self.__requests = dict()
    +        self.__request_size = 0
    +
    +    def upload_file(self, file_path, queue=True):
    +        """Stores a local file into the remote CAS storage.
    +
    +        If queuing is allowed (`queue=True`), the upload request **may** be
    +        deferred. An explicit call to :method:`flush` can force the request to
    +        be sent immediately (along with the rest of the queued batch).
    +
    +        Args:
    +            file_path (str): absolute or relative path to a local file.
    +            queue (bool, optional): whether or not the upload request may be
    +                queued and submitted as part of a batch upload request. Defaults
    +                to True.
    +
    +        Returns:
    +            :obj:`Digest`: The digest of the file's content.
    +
    +        Raises:
    +            OSError: If `file_path` does not exist or is not readable.
    +        """
    +        if not os.path.isabs(file_path):
    +            file_path = os.path.abspath(file_path)
    +
    +        with open(file_path, 'rb') as bytes_stream:
    +            file_bytes = bytes_stream.read()
    +
    +        if not queue or len(file_bytes) > Uploader.FILE_SIZE_THRESHOLD:
    +            blob_digest = self._send_blob(file_bytes)
    +        else:
    +            blob_digest = self._queue_blob(file_bytes)
    +
    +        return blob_digest
    +
    +    def upload_directory(self, directory, queue=True):
    +        """Stores a :obj:`Directory` into the remote CAS storage.
    +
    +        If queuing is allowed (`queue=True`), the upload request **may** be
    +        deferred. An explicit call to :method:`flush` can force the request to
    +        be sent immediately (along with the rest of the queued batch).
    +
    +        Args:
    +            directory (:obj:`Directory`): a :obj:`Directory` object.
    +            queue (bool, optional): whether or not the upload request may be
    +                queued and submitted as part of a batch upload request. Defaults
    +                to True.
    +
    +        Returns:
    +            :obj:`Digest`: The digest of the :obj:`Directory`.
    +        """
    +        if not isinstance(directory, remote_execution_pb2.Directory):
    +            raise TypeError
    +
    +        if not queue:
    +            return self._send_blob(directory.SerializeToString())
    +        else:
    +            return self._queue_blob(directory.SerializeToString())
    +
    +    def send_message(self, message):
    +        """Stores a message into the remote CAS storage.
    +
    +        Args:
    +            message (:obj:`Message`): a protobuf message object.
    +
    +        Returns:
    +            :obj:`Digest`: The digest of the message.
    +        """
    +        return self._send_blob(message.SerializeToString())
    +
    +    def flush(self):
    +        """Ensures any queued request gets sent."""
    +        if self.__requests:
    +            self._send_batch()
    +
    +    def _queue_blob(self, blob):
    +        """Queues a memory block for later batch upload"""
    +        blob_digest = remote_execution_pb2.Digest()
    +        blob_digest.hash = HASH(blob).hexdigest()
    +        blob_digest.size_bytes = len(blob)
    +
    +        if self.__request_size + len(blob) > Uploader.MAX_REQUEST_SIZE:
    +            self._send_batch()
    +
    +        update_request = remote_execution_pb2.BatchUpdateBlobsRequest.Request()
    +        update_request.digest.CopyFrom(blob_digest)
    +        update_request.data = blob
    +
    +        update_request_size = update_request.ByteSize()
    +        if self.__request_size + update_request_size > Uploader.MAX_REQUEST_SIZE:
    +            self._send_batch()
    +
    +        self.__requests[update_request.digest.hash] = update_request
    +        self.__request_size += update_request_size
    +
    +        return blob_digest
    +
    +    def _send_blob(self, blob):
    +        """Sends a memory block using ByteStream.Write()"""
    +        blob_digest = remote_execution_pb2.Digest()
    +        blob_digest.hash = HASH(blob).hexdigest()
    +        blob_digest.size_bytes = len(blob)
    +
    +        if self.instance_name is not None:
    +            resource_name = '/'.join([self.instance_name, 'uploads', self.u_uid, 'blobs',
    +                                      blob_digest.hash, str(blob_digest.size_bytes)])
    +        else:
    +            resource_name = '/'.join(['uploads', self.u_uid, 'blobs',
    +                                      blob_digest.hash, str(blob_digest.size_bytes)])
    +
    +        def __write_request_stream(resource, content):
    +            offset = 0
    +            finished = False
    +            remaining = len(content)
    +            while not finished:
    +                chunk_size = min(remaining, 64 * 1024)
    +                remaining -= chunk_size
    +
    +                request = bytestream_pb2.WriteRequest()
    +                request.resource_name = resource
    +                request.data = content[offset:offset + chunk_size]
    +                request.write_offset = offset
    +                request.finish_write = remaining <= 0
    +
    +                yield request
    +
    +                offset += chunk_size
    +                finished = request.finish_write
    +
    +        write_requests = __write_request_stream(resource_name, blob)
    +        # TODO: Handle connection loss/recovery using QueryWriteStatus()
    +        write_response = self.__bytestream_stub.Write(write_requests)
    +
    +        assert write_response.committed_size == blob_digest.size_bytes
    +
    +        return blob_digest
    +
    +    def _send_batch(self):
    +        """Sends queued data using ContentAddressableStorage.BatchUpdateBlobs()"""
    +        batch_request = remote_execution_pb2.BatchUpdateBlobsRequest()
    +        batch_request.requests.extend(self.__requests.values())
    +        if self.instance_name is not None:
    +            batch_request.instance_name = self.instance_name
    +
    +        batch_response = self.__cas_stub.BatchUpdateBlobs(batch_request)
    +
    +        for response in batch_response.responses:
    +            assert response.digest.hash in self.__requests
    +            assert response.status.code == 0
    +
    +        self.__requests.clear()
    +        self.__request_size = 0
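
    A quick sketch of the new Uploader class used directly, without the context manager
    (the endpoint address and file paths are hypothetical):

        import grpc

        from buildgrid.client.cas import Uploader

        channel = grpc.insecure_channel('localhost:50051')
        uploader = Uploader(channel, instance='main')

        # Small blobs get queued for a single BatchUpdateBlobs() round-trip:
        small_digest = uploader.upload_file('/tmp/small.txt')

        # queue=False (or any file above FILE_SIZE_THRESHOLD) goes straight
        # through ByteStream.Write():
        big_digest = uploader.upload_file('/tmp/big.bin', queue=False)

        # Send whatever is still queued:
        uploader.flush()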

  • buildgrid/server/execution/execution_service.py
    ... ... @@ -86,6 +86,11 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
                 yield operations_pb2.Operation()
     
         def _get_instance(self, name):
    +        # If the client does not support multiple instances, it may omit the
    +        # instance name request parameter, so better map it to our default:
    +        if not name and len(self._instances) == 1:
    +            name = next(iter(self._instances))
    +
             try:
                 return self._instances[name]
     
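
    The fallback only triggers for single-instance deployments; a self-contained
    illustration of the mapping logic (standalone, with a dummy registry):

        # Dummy single-entry instance registry, mirroring self._instances:
        instances = {'main': object()}

        def get_instance(name):
            # An empty name maps to the only registered instance:
            if not name and len(instances) == 1:
                name = next(iter(instances))
            return instances[name]

        assert get_instance('') is instances['main']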
     
    

  • buildgrid/utils.py
    ... ... @@ -13,6 +13,7 @@
     # limitations under the License.
     
     
    +from operator import attrgetter
     import os
     
     from buildgrid.settings import HASH
    ... ... @@ -99,7 +100,15 @@ def parse_to_pb2_from_fetch(pb2, stub, digest, instance_name=""):
     
     
     def create_digest(bytes_to_digest):
    -    """ Creates a hash based on the hex digest and returns the digest
    +    """Computes the :obj:`Digest` of a piece of data.
    +
    +    The :obj:`Digest` of a piece of data is a function of its hash **and** size.
    +
    +    Args:
    +        bytes_to_digest (bytes): byte data to digest.
    +
    +    Returns:
    +        :obj:`Digest`: The gRPC :obj:`Digest` for the given byte data.
         """
         return remote_execution_pb2.Digest(hash=HASH(bytes_to_digest).hexdigest(),
                                            size_bytes=len(bytes_to_digest))
    ... ... @@ -136,6 +145,200 @@ def file_maker(file_path, file_digest):
                                              is_executable=os.access(file_path, os.X_OK))
     
     
    -def read_file(read):
    -    with open(read, 'rb') as f:
    -        return f.read()
    +def directory_maker(directory_path, child_directories=None, cas=None, upload_directories=True):
    +    """Creates a :obj:`Directory` from a local directory and possibly uploads it.
    +
    +    Args:
    +        directory_path (str): absolute or relative path to a local directory.
    +        child_directories (list): output list of children :obj:`Directory`
    +            objects.
    +        cas (:obj:`Uploader`): a CAS client uploader.
    +        upload_directories (bool): whether or not to upload the :obj:`Directory`
    +            objects along with the files.
    +
    +    Returns:
    +        :obj:`Directory`, :obj:`Digest`: Tuple of a new gRPC :obj:`Directory`
    +        for the local directory pointed to by `directory_path` and the digest
    +        for that object.
    +    """
    +    if not os.path.isabs(directory_path):
    +        directory_path = os.path.abspath(directory_path)
    +
    +    files, directories, symlinks = list(), list(), list()
    +    for directory_entry in os.scandir(directory_path):
    +        # Create a FileNode and corresponding BatchUpdateBlobsRequest:
    +        if directory_entry.is_file(follow_symlinks=False):
    +            if cas is not None:
    +                node_digest = cas.upload_file(directory_entry.path)
    +            else:
    +                node_digest = create_digest(read_file(directory_entry.path))
    +
    +            node = remote_execution_pb2.FileNode()
    +            node.name = directory_entry.name
    +            node.digest.CopyFrom(node_digest)
    +            node.is_executable = os.access(directory_entry.path, os.X_OK)
    +
    +            files.append(node)
    +
    +        # Create a DirectoryNode and corresponding BatchUpdateBlobsRequest:
    +        elif directory_entry.is_dir(follow_symlinks=False):
    +            _, node_digest = directory_maker(directory_entry.path,
    +                                             child_directories=child_directories,
    +                                             upload_directories=upload_directories,
    +                                             cas=cas)
    +
    +            node = remote_execution_pb2.DirectoryNode()
    +            node.name = directory_entry.name
    +            node.digest.CopyFrom(node_digest)
    +
    +            directories.append(node)
    +
    +        # Create a SymlinkNode if necessary:
    +        elif os.path.islink(directory_entry.path):
    +            node_target = os.readlink(directory_entry.path)
    +
    +            node = remote_execution_pb2.SymlinkNode()
    +            node.name = directory_entry.name
    +            node.target = node_target
    +
    +            symlinks.append(node)
    +
    +    files.sort(key=attrgetter('name'))
    +    directories.sort(key=attrgetter('name'))
    +    symlinks.sort(key=attrgetter('name'))
    +
    +    directory = remote_execution_pb2.Directory()
    +    directory.files.extend(files)
    +    directory.directories.extend(directories)
    +    directory.symlinks.extend(symlinks)
    +
    +    if child_directories is not None:
    +        child_directories.append(directory)
    +
    +    if cas is not None and upload_directories:
    +        directory_digest = cas.upload_directory(directory)
    +    else:
    +        directory_digest = create_digest(directory.SerializeToString())
    +
    +    return directory, directory_digest
    +
    +
    +def tree_maker(directory_path, cas=None):
    +    """Creates a :obj:`Tree` from a local directory and possibly uploads it.
    +
    +    If `cas` is specified, the local directory content will be uploaded/stored
    +    in remote CAS (the :obj:`Tree` message won't).
    +
    +    Args:
    +        directory_path (str): absolute or relative path to a local directory.
    +        cas (:obj:`Uploader`): a CAS client uploader.
    +
    +    Returns:
    +        :obj:`Tree`, :obj:`Digest`: Tuple of a new gRPC :obj:`Tree` for the
    +        local directory pointed to by `directory_path` and the digest for that
    +        object.
    +    """
    +    if not os.path.isabs(directory_path):
    +        directory_path = os.path.abspath(directory_path)
    +
    +    child_directories = list()
    +    directory, _ = directory_maker(directory_path,
    +                                   child_directories=child_directories,
    +                                   upload_directories=False,
    +                                   cas=cas)
    +
    +    tree = remote_execution_pb2.Tree()
    +    tree.children.extend(child_directories)
    +    tree.root.CopyFrom(directory)
    +
    +    if cas is not None:
    +        tree_digest = cas.send_message(tree)
    +    else:
    +        tree_digest = create_digest(tree.SerializeToString())
    +
    +    return tree, tree_digest
    +
    +
    +def read_file(file_path):
    +    """Loads raw file content in memory.
    +
    +    Args:
    +        file_path (str): path to the target file.
    +
    +    Returns:
    +        bytes: Raw file's content until EOF.
    +
    +    Raises:
    +        OSError: If `file_path` does not exist or is not readable.
    +    """
    +    with open(file_path, 'rb') as byte_file:
    +        return byte_file.read()
    +
    +
    +def output_file_maker(file_path, input_path, cas=None):
    +    """Creates an :obj:`OutputFile` from a local file and possibly uploads it.
    +
    +    If `cas` is specified, the local file will be uploaded/stored in remote CAS
    +    (the :obj:`OutputFile` message won't).
    +
    +    Note:
    +        `file_path` **must** point inside or be relative to `input_path`.
    +
    +    Args:
    +        file_path (str): absolute or relative path to a local file.
    +        input_path (str): absolute or relative path to the input root directory.
    +        cas (:obj:`Uploader`): a CAS client uploader.
    +
    +    Returns:
    +        :obj:`OutputFile`: a new gRPC :obj:`OutputFile` object for the file
    +        pointed to by `file_path`.
    +    """
    +    if not os.path.isabs(file_path):
    +        file_path = os.path.abspath(file_path)
    +    if not os.path.isabs(input_path):
    +        input_path = os.path.abspath(input_path)
    +
    +    if cas is not None:
    +        file_digest = cas.upload_file(file_path)
    +    else:
    +        file_digest = create_digest(read_file(file_path))
    +
    +    output_file = remote_execution_pb2.OutputFile()
    +    output_file.digest.CopyFrom(file_digest)
    +    # OutputFile.path should be relative to the working directory:
    +    output_file.path = os.path.relpath(file_path, start=input_path)
    +    output_file.is_executable = os.access(file_path, os.X_OK)
    +
    +    return output_file
    +
    +
    +def output_directory_maker(directory_path, working_path, cas=None):
    +    """Creates an :obj:`OutputDirectory` from a local directory.
    +
    +    If `cas` is specified, the local directory content will be uploaded/stored
    +    in remote CAS (the :obj:`OutputDirectory` message won't).
    +
    +    Note:
    +        `directory_path` **must** point inside or be relative to `working_path`.
    +
    +    Args:
    +        directory_path (str): absolute or relative path to a local directory.
    +        working_path (str): absolute or relative path to the working directory.
    +        cas (:obj:`Uploader`): a CAS client uploader.
    +
    +    Returns:
    +        :obj:`OutputDirectory`: a new gRPC :obj:`OutputDirectory` for the
    +        directory pointed to by `directory_path`.
    +    """
    +    if not os.path.isabs(directory_path):
    +        directory_path = os.path.abspath(directory_path)
    +    if not os.path.isabs(working_path):
    +        working_path = os.path.abspath(working_path)
    +
    +    _, tree_digest = tree_maker(directory_path, cas=cas)
    +
    +    output_directory = remote_execution_pb2.OutputDirectory()
    +    output_directory.tree_digest.CopyFrom(tree_digest)
    +    output_directory.path = os.path.relpath(directory_path, start=working_path)
    +
    +    return output_directory
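
    Each maker also works without a CAS uploader, computing digests locally instead of
    uploading; a short sketch (the directory layout is hypothetical):

        from buildgrid.utils import tree_maker, output_directory_maker

        # With cas=None, digests are computed locally and nothing is uploaded:
        tree, tree_digest = tree_maker('/tmp/work/out')
        print(tree_digest.hash, tree_digest.size_bytes)

        # OutputDirectory.path ends up relative to the working directory:
        output_directory = output_directory_maker('/tmp/work/out', '/tmp/work')
        print(output_directory.path)  # 'out'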


