[Notes] [Git][BuildGrid/buildgrid][mablanch/61-bazel-support] 4 commits: utils.py: Support symlinks and folders in fetcher helper



Title: GitLab

Martin Blanchard pushed to branch mablanch/61-bazel-support at BuildGrid / buildgrid

Commits:

4 changed files:

Changes:

  • buildgrid/_app/bots/temp_directory.py
    ... ... @@ -19,7 +19,7 @@ import tempfile
    19 19
     
    
    20 20
     from google.protobuf import any_pb2
    
    21 21
     
    
    22
    -from buildgrid.utils import read_file, create_digest, write_fetch_directory, parse_to_pb2_from_fetch
    
    22
    +from buildgrid.utils import output_file_maker, write_fetch_directory, parse_to_pb2_from_fetch
    
    23 23
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    
    24 24
     from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
    
    25 25
     
    
    ... ... @@ -35,54 +35,68 @@ def work_temp_directory(context, lease):
    35 35
         action_digest = remote_execution_pb2.Digest()
    
    36 36
         lease.payload.Unpack(action_digest)
    
    37 37
     
    
    38
    -    action = remote_execution_pb2.Action()
    
    38
    +    action = parse_to_pb2_from_fetch(remote_execution_pb2.Action(),
    
    39
    +                                     stub_bytestream, action_digest, instance_name)
    
    39 40
     
    
    40
    -    action = parse_to_pb2_from_fetch(action, stub_bytestream, action_digest, instance_name)
    
    41
    +    with tempfile.TemporaryDirectory() as temp_directory:
    
    41 42
     
    
    42
    -    with tempfile.TemporaryDirectory() as temp_dir:
    
    43
    +        command = parse_to_pb2_from_fetch(remote_execution_pb2.Command(),
    
    44
    +                                          stub_bytestream, action.command_digest, instance_name)
    
    43 45
     
    
    44
    -        command = remote_execution_pb2.Command()
    
    45
    -        command = parse_to_pb2_from_fetch(command, stub_bytestream, action.command_digest, instance_name)
    
    46
    +        write_fetch_directory(temp_directory, stub_bytestream,
    
    47
    +                              action.input_root_digest, instance_name)
    
    46 48
     
    
    47
    -        arguments = "cd {} &&".format(temp_dir)
    
    49
    +        execution_envionment = os.environ.copy()
    
    50
    +        for variable in command.environment_variables:
    
    51
    +            if variable.name not in ['PATH', 'PWD']:
    
    52
    +                execution_envionment[variable.name] = variable.value
    
    48 53
     
    
    54
    +        command_arguments = list()
    
    49 55
             for argument in command.arguments:
    
    50
    -            arguments += " {}".format(argument)
    
    51
    -
    
    52
    -        context.logger.info(arguments)
    
    53
    -
    
    54
    -        write_fetch_directory(temp_dir, stub_bytestream, action.input_root_digest, instance_name)
    
    55
    -
    
    56
    -        proc = subprocess.Popen(arguments,
    
    57
    -                                shell=True,
    
    58
    -                                stdin=subprocess.PIPE,
    
    59
    -                                stdout=subprocess.PIPE)
    
    60
    -
    
    61
    -        # TODO: Should return the std_out to the user
    
    62
    -        proc.communicate()
    
    63
    -
    
    64
    -        result = remote_execution_pb2.ActionResult()
    
    65
    -        requests = []
    
    66
    -        for output_file in command.output_files:
    
    67
    -            path = os.path.join(temp_dir, output_file)
    
    68
    -            chunk = read_file(path)
    
    69
    -
    
    70
    -            digest = create_digest(chunk)
    
    71
    -
    
    72
    -            result.output_files.extend([remote_execution_pb2.OutputFile(path=output_file,
    
    73
    -                                                                        digest=digest)])
    
    74
    -
    
    75
    -            requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
    
    76
    -                digest=digest, data=chunk))
    
    77
    -
    
    78
    -        request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=instance_name,
    
    79
    -                                                               requests=requests)
    
    56
    +            command_arguments.append(argument.strip())
    
    57
    +
    
    58
    +        working_directory = None
    
    59
    +        if command.working_directory:
    
    60
    +            working_directory = os.path.join(temp_directory,
    
    61
    +                                             command.working_directory)
    
    62
    +            os.makedirs(working_directory, exist_ok=True)
    
    63
    +        else:
    
    64
    +            working_directory = temp_directory
    
    65
    +
    
    66
    +        # Ensure that output files structure exists:
    
    67
    +        for output_path in command.output_files:
    
    68
    +            directory_path = os.path.join(working_directory,
    
    69
    +                                          os.path.dirname(output_path))
    
    70
    +            os.makedirs(directory_path, exist_ok=True)
    
    71
    +
    
    72
    +        process = subprocess.Popen(command_arguments,
    
    73
    +                                   cwd=working_directory,
    
    74
    +                                   universal_newlines=True,
    
    75
    +                                   env=execution_envionment,
    
    76
    +                                   stdin=subprocess.PIPE,
    
    77
    +                                   stdout=subprocess.PIPE)
    
    78
    +        # TODO: Should return the stdout and stderr to the user.
    
    79
    +        process.communicate()
    
    80
    +
    
    81
    +        update_requests = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=instance_name)
    
    82
    +        action_result = remote_execution_pb2.ActionResult()
    
    83
    +
    
    84
    +        for output_path in command.output_files:
    
    85
    +            file_path = os.path.join(working_directory, output_path)
    
    86
    +            # Missing outputs should simply be omitted in ActionResult:
    
    87
    +            if not os.path.isfile(file_path):
    
    88
    +                continue
    
    89
    +
    
    90
    +            output_file, update_request = output_file_maker(file_path, working_directory)
    
    91
    +
    
    92
    +            action_result.output_files.extend([output_file])
    
    93
    +            update_requests.requests.extend([update_request])
    
    80 94
     
    
    81 95
             stub_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(context.channel)
    
    82
    -        stub_cas.BatchUpdateBlobs(request)
    
    96
    +        stub_cas.BatchUpdateBlobs(update_requests)
    
    83 97
     
    
    84 98
             result_any = any_pb2.Any()
    
    85
    -        result_any.Pack(result)
    
    99
    +        result_any.Pack(action_result)
    
    86 100
     
    
    87 101
             lease.result.CopyFrom(result_any)
    
    88 102
     
    

  • buildgrid/server/action_cache.py deleted
    1
    -# Copyright (C) 2018 Bloomberg LP
    
    2
    -#
    
    3
    -# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    -# you may not use this file except in compliance with the License.
    
    5
    -# You may obtain a copy of the License at
    
    6
    -#
    
    7
    -#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    -#
    
    9
    -# Unless required by applicable law or agreed to in writing, software
    
    10
    -# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    -# See the License for the specific language governing permissions and
    
    13
    -# limitations under the License.
    
    14
    -
    
    15
    -
    
    16
    -"""
    
    17
    -ActionCache
    
    18
    -===========
    
    19
    -
    
    20
    -Implements a simple in-memory action cache.
    
    21
    -
    
    22
    -The action cache maps Action to their corresponding ActionResult. An
    
    23
    -ActionResult may be found in cache, for any given Action, if that action has
    
    24
    -already been executed.
    
    25
    -
    
    26
    -Note:
    
    27
    -    Action and ActionResult are referenced by their Digest and mapping is stored
    
    28
    -    in-memory.
    
    29
    -"""
    
    30
    -
    
    31
    -import collections
    
    32
    -
    
    33
    -from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 as re_pb2
    
    34
    -
    
    35
    -
    
    36
    -class ActionCache:
    
    37
    -    """In-memory Action to ActionResult associative array.
    
    38
    -    """
    
    39
    -
    
    40
    -    def __init__(self, storage, max_cached_actions):
    
    41
    -        """Initialises a new ActionCache instance.
    
    42
    -
    
    43
    -        Args:
    
    44
    -            storage (StorageABC): storage backend instance to be used.
    
    45
-            max_cached_actions (int): maximum number of entries to cache.
    
    46
    -        """
    
    47
    -        self._storage = storage
    
    48
    -        self._max_cached_actions = max_cached_actions
    
    49
    -        self._digest_map = collections.OrderedDict()
    
    50
    -
    
    51
    -    def get_action_result(self, action_digest):
    
    52
    -        """Retrieves the cached ActionResult for the given Action digest.
    
    53
    -
    
    54
    -        Args:
    
    55
    -            action_digest (Digest): digest of the Action to query.
    
    56
    -
    
    57
    -        Returns:
    
    58
    -            The cached ActionResult matching the given Action digest or None if
    
    59
-            nothing has been cached yet.
    
    60
    -        """
    
    61
    -        key = (action_digest.hash, action_digest.size_bytes)
    
    62
    -        if key in self._digest_map:
    
    63
    -            action_result = self._storage.get_message(self._digest_map[key],
    
    64
    -                                                      re_pb2.ActionResult)
    
    65
    -            if action_result is not None:
    
    66
    -                if self._blobs_still_exist(action_result):
    
    67
    -                    self._digest_map.move_to_end(key)
    
    68
    -                    return action_result
    
    69
    -            del self._digest_map[key]
    
    70
    -        return None
    
    71
    -
    
    72
    -    def put_action_result(self, action_digest, action_result):
    
    73
    -        """Stores an ActionResult in cache for the given Action digest.
    
    74
    -
    
    75
    -        If the cache size limit has been reached, the oldest cache entries will
    
    76
    -        be dropped before insertion so that the cache size never exceeds the
    
    77
    -        maximum numbers of entries allowed.
    
    78
    -
    
    79
    -        Args:
    
    80
    -            action_digest (Digest): digest of the Action to select.
    
    81
    -            action_result (ActionResult): result object to store.
    
    82
    -        """
    
    83
    -        if self._max_cached_actions == 0:
    
    84
    -            return
    
    85
    -
    
    86
    -        while len(self._digest_map) >= self._max_cached_actions:
    
    87
    -            self._digest_map.popitem(last=False)
    
    88
    -
    
    89
    -        key = (action_digest.hash, action_digest.size_bytes)
    
    90
    -        action_result_digest = self._storage.put_message(action_result)
    
    91
    -        self._digest_map[key] = action_result_digest
    
    92
    -
    
    93
    -    def _blobs_still_exist(self, action_result):
    
    94
-        """Checks CAS for ActionResult output blobs existence.
    
    95
    -
    
    96
    -        Args:
    
    97
    -            action_result (ActionResult): ActionResult to search referenced
    
    98
    -                output blobs for.
    
    99
    -
    
    100
    -        Returns:
    
    101
    -            True if all referenced blobs are present in CAS, False otherwise.
    
    102
    -        """
    
    103
    -        blobs_needed = []
    
    104
    -
    
    105
    -        for output_file in action_result.output_files:
    
    106
    -            blobs_needed.append(output_file.digest)
    
    107
    -
    
    108
    -        for output_directory in action_result.output_directories:
    
    109
    -            blobs_needed.append(output_directory.tree_digest)
    
    110
    -            tree = self._storage.get_message(output_directory.tree_digest,
    
    111
    -                                             re_pb2.Tree)
    
    112
    -            if tree is None:
    
    113
    -                return False
    
    114
    -            for file_node in tree.root.files:
    
    115
    -                blobs_needed.append(file_node.digest)
    
    116
    -            for child in tree.children:
    
    117
    -                for file_node in child.files:
    
    118
    -                    blobs_needed.append(file_node.digest)
    
    119
    -
    
    120
    -        if action_result.stdout_digest.hash and not action_result.stdout_raw:
    
    121
    -            blobs_needed.append(action_result.stdout_digest)
    
    122
    -        if action_result.stderr_digest.hash and not action_result.stderr_raw:
    
    123
    -            blobs_needed.append(action_result.stderr_digest)
    
    124
    -
    
    125
    -        missing = self._storage.missing_blobs(blobs_needed)
    
    126
    -        return len(missing) == 0

  • buildgrid/server/scheduler.py
    ... ... @@ -90,7 +90,7 @@ class Scheduler:
    90 90
             job.update_execute_stage(ExecuteStage.COMPLETED)
    
    91 91
             self.jobs[name] = job
    
    92 92
             if not job.do_not_cache and self._action_cache is not None:
    
    93
    -            self._action_cache.put_action_result(job.action_digest, result)
    
    93
    +            self._action_cache.update_action_result(job.action_digest, result)
    
    94 94
     
    
    95 95
         def get_operations(self):
    
    96 96
             response = operations_pb2.ListOperationsResponse()
    

  • buildgrid/utils.py
    ... ... @@ -31,30 +31,59 @@ def gen_fetch_blob(stub, digest, instance_name=""):
    31 31
             yield response.data
    
    32 32
     
    
    33 33
     
    
    34
    -def write_fetch_directory(directory, stub, digest, instance_name=""):
    
    35
    -    """ Given a directory digest, fetches files and writes them to a directory
    
    34
    +def write_fetch_directory(root_directory, stub, digest, instance_name=None):
    
    35
    +    """Locally replicates a directory from CAS.
    
    36
    +
    
    37
    +    Args:
    
    38
    +        root_directory (str): local directory to populate.
    
    39
    +        stub (): gRPC stub for CAS communication.
    
    40
    +        digest (Digest): digest for the directory to fetch from CAS.
    
    41
    +        instance_name (str, optional): farm instance name to query data from.
    
    36 42
         """
    
    37
    -    # TODO: Extend to symlinks and inner directories
    
    38
    -    # pathlib.Path('/my/directory').mkdir(parents=True, exist_ok=True)
    
    43
    +    if not os.path.isabs(root_directory):
    
    44
    +        root_directory = os.path.abspath(root_directory)
    
    45
    +    if not os.path.exists(root_directory):
    
    46
    +        os.makedirs(root_directory, exist_ok=True)
    
    39 47
     
    
    40
    -    directory_pb2 = remote_execution_pb2.Directory()
    
    41
    -    directory_pb2 = parse_to_pb2_from_fetch(directory_pb2, stub, digest, instance_name)
    
    48
    +    directory = parse_to_pb2_from_fetch(remote_execution_pb2.Directory(),
    
    49
    +                                        stub, digest, instance_name)
    
    50
    +
    
    51
    +    for directory_node in directory.directories:
    
    52
    +        child_path = os.path.join(root_directory, directory_node.name)
    
    53
    +
    
    54
    +        write_fetch_directory(child_path, stub, directory_node.digest, instance_name)
    
    55
    +
    
    56
    +    for file_node in directory.files:
    
    57
    +        child_path = os.path.join(root_directory, file_node.name)
    
    58
    +
    
    59
    +        with open(child_path, 'wb') as child_file:
    
    60
    +            write_fetch_blob(child_file, stub, file_node.digest, instance_name)
    
    61
    +
    
    62
    +    for symlink_node in directory.symlinks:
    
    63
    +        child_path = os.path.join(root_directory, symlink_node.name)
    
    42 64
     
    
    43
    -    for file_node in directory_pb2.files:
    
    44
    -        path = os.path.join(directory, file_node.name)
    
    45
    -        with open(path, 'wb') as f:
    
    46
    -            write_fetch_blob(f, stub, file_node.digest, instance_name)
    
    65
    +        if os.path.isabs(symlink_node.target):
    
    66
    +            continue  # No out of temp-directory links for now.
    
    67
    +        target_path = os.path.join(root_directory, symlink_node.target)
    
    47 68
     
    
    69
    +        os.symlink(child_path, target_path)
    
    48 70
     
    
    49
    -def write_fetch_blob(out, stub, digest, instance_name=""):
    
    50
    -    """ Given an output buffer, fetches blob and writes to buffer
    
    71
    +
    
    72
    +def write_fetch_blob(target_file, stub, digest, instance_name=None):
    
    73
    +    """Extracts a blob from CAS into a local file.
    
    74
    +
    
    75
    +    Args:
    
    76
    +        target_file (str): local file to write.
    
    77
    +        stub (): gRPC stub for CAS communication.
    
    78
    +        digest (Digest): digest for the blob to fetch from CAS.
    
    79
    +        instance_name (str, optional): farm instance name to query data from.
    
    51 80
         """
    
    52 81
     
    
    53 82
         for stream in gen_fetch_blob(stub, digest, instance_name):
    
    54
    -        out.write(stream)
    
    83
    +        target_file.write(stream)
    
    84
    +    target_file.flush()
    
    55 85
     
    
    56
    -    out.flush()
    
    57
    -    assert digest.size_bytes == os.fstat(out.fileno()).st_size
    
    86
    +    assert digest.size_bytes == os.fstat(target_file.fileno()).st_size
    
    58 87
     
    
    59 88
     
    
    60 89
     def parse_to_pb2_from_fetch(pb2, stub, digest, instance_name=""):
    
    ... ... @@ -107,6 +136,49 @@ def file_maker(file_path, file_digest):
    107 136
                                              is_executable=os.access(file_path, os.X_OK))
    
    108 137
     
    
    109 138
     
    
    110
    -def read_file(read):
    
    111
    -    with open(read, 'rb') as f:
    
    112
    -        return f.read()
    139
    +def read_file(file_path):
    
    140
    +    """Loads raw file content in memory.
    
    141
    +
    
    142
    +    Returns:
    
    143
    +        bytes: Raw file's content until EOF.
    
    144
    +
    
    145
    +    Raises:
    
    146
    +        OSError: If `file_path` does not exist or is not readable.
    
    147
    +    """
    
    148
    +    with open(file_path, 'rb') as byte_file:
    
    149
    +        return byte_file.read()
    
    150
    +
    
    151
    +
    
    152
    +def output_file_maker(file_path, input_path):
    
    153
    +    """Creates a gRPC :obj:`OutputFile` for a local file.
    
    154
    +
    
    155
    +    `file_path` **must** point inside or be relative to `input_path`.
    
    156
    +
    
    157
    +    Args:
    
    158
    +        file_path (str): absolute or relative path to a local file.
    
    159
    +        input_path (str): absolute or relative path to the input root directory.
    
    160
    +
    
    161
    +    Returns:
    
    162
    +        :obj:`OutputFile`, :obj:`BatchUpdateBlobsRequest`: Tuple of a new gRPC
    
    163
    +        :obj:`OutputFile` object for the file pointed by `file_path` and the
    
    164
    +        corresponding :obj:`BatchUpdateBlobsRequest` for CAS upload.
    
    165
    +    """
    
    166
    +    if not os.path.isabs(file_path):
    
    167
    +        file_path = os.path.abspath(file_path)
    
    168
    +    if not os.path.isabs(input_path):
    
    169
    +        input_path = os.path.abspath(input_path)
    
    170
    +
    
    171
    +    file_digest = remote_execution_pb2.Digest()
    
    172
    +
    
    173
    +    file_data = read_file(file_path)
    
    174
    +    file_digest.hash = HASH(file_data).hexdigest()
    
    175
    +    file_digest.size_bytes = len(file_data)
    
    176
    +
    
    177
    +    output_file = remote_execution_pb2.OutputFile(digest=file_digest)
    
    178
    +    output_file.path = os.path.relpath(file_path, start=input_path)
    
    179
    +    output_file.is_executable = os.access(file_path, os.X_OK)
    
    180
    +
    
    181
    +    update_request = remote_execution_pb2.BatchUpdateBlobsRequest.Request(digest=file_digest)
    
    182
    +    update_request.data = file_data
    
    183
    +
    
    184
    +    return output_file, update_request



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]