[Notes] [Git][BuildGrid/buildgrid][mablanch/71-buildstream-support] 4 commits: Fixed race condition.



Title: GitLab

Martin Blanchard pushed to branch mablanch/71-buildstream-support at BuildGrid / buildgrid

Commits:

4 changed files:

Changes:

  • buildgrid/_app/bots/buildbox.py
    ... ... @@ -19,83 +19,125 @@ import tempfile
    19 19
     
    
    20 20
     from google.protobuf import any_pb2
    
    21 21
     
    
    22
    +from buildgrid.client.cas import upload
    
    22 23
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    23 24
     from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
    
    24
    -from buildgrid.utils import parse_to_pb2_from_fetch
    
    25
    +from buildgrid.utils import read_file, write_file, parse_to_pb2_from_fetch
    
    25 26
     
    
    26 27
     
    
    27 28
     def work_buildbox(context, lease):
    
    29
    +    """Executes a lease for a build action, using buildbox.
    
    30
    +    """
    
    31
    +
    
    32
    +    stub_bytestream = bytestream_pb2_grpc.ByteStreamStub(context.cas_channel)
    
    33
    +    local_cas_directory = context.local_cas
    
    28 34
         logger = context.logger
    
    29 35
     
    
    30
    -    action_digest_any = lease.payload
    
    31 36
         action_digest = remote_execution_pb2.Digest()
    
    32
    -    action_digest_any.Unpack(action_digest)
    
    37
    +    lease.payload.Unpack(action_digest)
    
    38
    +
    
    39
    +    action = parse_to_pb2_from_fetch(remote_execution_pb2.Action(),
    
    40
    +                                     stub_bytestream, action_digest)
    
    33 41
     
    
    34
    -    stub = bytestream_pb2_grpc.ByteStreamStub(context.cas_channel)
    
    42
    +    command = parse_to_pb2_from_fetch(remote_execution_pb2.Command(),
    
    43
    +                                      stub_bytestream, action.command_digest)
    
    35 44
     
    
    36
    -    action = remote_execution_pb2.Action()
    
    37
    -    parse_to_pb2_from_fetch(action, stub, action_digest)
    
    45
    +    environment = dict()
    
    46
    +    for variable in command.environment_variables:
    
    47
    +        if variable.name not in ['PWD']:
    
    48
    +            environment[variable.name] = variable.value
    
    38 49
     
    
    39
    -    casdir = context.local_cas
    
    40
    -    remote_command = remote_execution_pb2.Command()
    
    41
    -    parse_to_pb2_from_fetch(remote_command, stub, action.command_digest)
    
    50
    +    if command.working_directory:
    
    51
    +        working_directory = command.working_directory
    
    52
    +    else:
    
    53
    +        working_directory = '/'
    
    42 54
     
    
    43
    -    environment = dict((x.name, x.value) for x in remote_command.environment_variables)
    
    44 55
         logger.debug("command hash: {}".format(action.command_digest.hash))
    
    45 56
         logger.debug("vdir hash: {}".format(action.input_root_digest.hash))
    
    46
    -    logger.debug("\n{}".format(' '.join(remote_command.arguments)))
    
    47
    -
    
    48
    -    # Input hash must be written to disk for buildbox.
    
    49
    -    os.makedirs(os.path.join(casdir, 'tmp'), exist_ok=True)
    
    50
    -    with tempfile.NamedTemporaryFile(dir=os.path.join(casdir, 'tmp')) as input_digest_file:
    
    51
    -        with open(input_digest_file.name, 'wb') as f:
    
    52
    -            f.write(action.input_root_digest.SerializeToString())
    
    53
    -            f.flush()
    
    54
    -
    
    55
    -        with tempfile.NamedTemporaryFile(dir=os.path.join(casdir, 'tmp')) as output_digest_file:
    
    56
    -            command = ['buildbox',
    
    57
    -                       '--remote={}'.format(context.remote_cas_url),
    
    58
    -                       '--input-digest={}'.format(input_digest_file.name),
    
    59
    -                       '--output-digest={}'.format(output_digest_file.name),
    
    60
    -                       '--local={}'.format(casdir)]
    
    57
    +    logger.debug("\n{}".format(' '.join(command.arguments)))
    
    58
    +
    
    59
    +    os.makedirs(os.path.join(local_cas_directory, 'tmp'), exist_ok=True)
    
    60
    +    os.makedirs(context.fuse_dir, exist_ok=True)
    
    61
    +
    
    62
    +    with tempfile.NamedTemporaryFile(dir=os.path.join(local_cas_directory, 'tmp')) as input_digest_file:
    
    63
    +        # Input hash must be written to disk for BuildBox
    
    64
    +        write_file(input_digest_file.name, action.input_root_digest.SerializeToString())
    
    65
    +
    
    66
    +        with tempfile.NamedTemporaryFile(dir=os.path.join(local_cas_directory, 'tmp')) as output_digest_file:
    
    67
    +            command_line = ['buildbox',
    
    68
    +                            '--remote={}'.format(context.remote_cas_url),
    
    69
    +                            '--input-digest={}'.format(input_digest_file.name),
    
    70
    +                            '--output-digest={}'.format(output_digest_file.name),
    
    71
    +                            '--chdir={}'.format(working_directory),
    
    72
    +                            '--local={}'.format(local_cas_directory)]
    
    61 73
     
    
    62 74
                 if context.cas_client_key:
    
    63
    -                command.append('--client-key={}'.format(context.cas_client_key))
    
    75
    +                command_line.append('--client-key={}'.format(context.cas_client_key))
    
    64 76
                 if context.cas_client_cert:
    
    65
    -                command.append('--client-cert={}'.format(context.cas_client_cert))
    
    77
    +                command_line.append('--client-cert={}'.format(context.cas_client_cert))
    
    66 78
                 if context.cas_server_cert:
    
    67
    -                command.append('--server-cert={}'.format(context.cas_server_cert))
    
    68
    -
    
    69
    -            if 'PWD' in environment and environment['PWD']:
    
    70
    -                command.append('--chdir={}'.format(environment['PWD']))
    
    79
    +                command_line.append('--server-cert={}'.format(context.cas_server_cert))
    
    71 80
     
    
    72
    -            command.append(context.fuse_dir)
    
    73
    -            command.extend(remote_command.arguments)
    
    81
    +            command_line.append(context.fuse_dir)
    
    82
    +            command_line.extend(command.arguments)
    
    74 83
     
    
    75
    -            logger.debug(' '.join(command))
    
    84
    +            logger.debug(' '.join(command_line))
    
    76 85
                 logger.debug("Input root digest:\n{}".format(action.input_root_digest))
    
    77 86
                 logger.info("Launching process")
    
    78 87
     
    
    79
    -            proc = subprocess.Popen(command,
    
    80
    -                                    stdin=subprocess.PIPE,
    
    81
    -                                    stdout=subprocess.PIPE)
    
    82
    -            proc.communicate()
    
    88
    +            command_line = subprocess.Popen(command_line,
    
    89
    +                                            stdin=subprocess.PIPE,
    
    90
    +                                            stdout=subprocess.PIPE)
    
    91
    +            # TODO: Should return the stdout and stderr to the user.
    
    92
    +            command_line.communicate()
    
    83 93
     
    
    84
    -            output_root_digest = remote_execution_pb2.Digest()
    
    85
    -            with open(output_digest_file.name, 'rb') as f:
    
    86
    -                output_root_digest.ParseFromString(f.read())
    
    87
    -            logger.debug("Output root digest: {}".format(output_root_digest))
    
    94
    +            output_digest = remote_execution_pb2.Digest()
    
    95
    +            output_digest.ParseFromString(read_file(output_digest_file.name))
    
    88 96
     
    
    89
    -            if len(output_root_digest.hash) < 64:
    
    97
    +            logger.debug("Output root digest: {}".format(output_digest))
    
    98
    +
    
    99
    +            if len(output_digest.hash) < 64:
    
    90 100
                     logger.warning("Buildbox command failed - no output root digest present.")
    
    91
    -            output_file = remote_execution_pb2.OutputDirectory(tree_digest=output_root_digest)
    
    92 101
     
    
    93
    -    action_result = remote_execution_pb2.ActionResult()
    
    94
    -    action_result.output_directories.extend([output_file])
    
    102
    +            # TODO: Have BuildBox helping us creating the Tree instance here
    
    103
    +            # See https://gitlab.com/BuildStream/buildbox/issues/7 for details
    
    104
    +            output_tree = _cas_tree_maker(stub_bytestream, output_digest)
    
    105
    +
    
    106
    +            with upload(context.cas_channel) as cas:
    
    107
    +                output_tree_digest = cas.send_message(output_tree)
    
    95 108
     
    
    96
    -    action_result_any = any_pb2.Any()
    
    97
    -    action_result_any.Pack(action_result)
    
    109
    +            output_directory = remote_execution_pb2.OutputDirectory()
    
    110
    +            output_directory.tree_digest.CopyFrom(output_tree_digest)
    
    111
    +            output_directory.path = os.path.relpath(working_directory, start='/')
    
    98 112
     
    
    99
    -    lease.result.CopyFrom(action_result_any)
    
    113
    +            action_result = remote_execution_pb2.ActionResult()
    
    114
    +            action_result.output_directories.extend([output_directory])
    
    115
    +
    
    116
    +            action_result_any = any_pb2.Any()
    
    117
    +            action_result_any.Pack(action_result)
    
    118
    +
    
    119
    +            lease.result.CopyFrom(action_result_any)
    
    100 120
     
    
    101 121
         return lease
    
    122
    +
    
    123
    +
    
    124
    +def _cas_tree_maker(stub_bytestream, directory_digest):
    
    125
    +    # Generates and stores a Tree for a given Directory
    
    126
    +    output_tree = remote_execution_pb2.Tree()
    
    127
    +
    
    128
    +    def list_directories(parent_directory):
    
    129
    +        directory_list = list()
    
    130
    +        for directory_node in parent_directory.directories:
    
    131
    +            directory = parse_to_pb2_from_fetch(remote_execution_pb2.Directory(),
    
    132
    +                                                stub_bytestream, directory_node.digest)
    
    133
    +            directory_list.extend(list_directories(directory))
    
    134
    +            directory_list.append(directory)
    
    135
    +
    
    136
    +        return directory_list
    
    137
    +
    
    138
    +    root_directory = parse_to_pb2_from_fetch(remote_execution_pb2.Directory(),
    
    139
    +                                             stub_bytestream, directory_digest)
    
    140
    +    output_tree.children.extend(list_directories(root_directory))
    
    141
    +    output_tree.root.CopyFrom(root_directory)
    
    142
    +
    
    143
    +    return output_tree

  • buildgrid/server/scheduler.py
    ... ... @@ -82,13 +82,10 @@ class Scheduler:
    82 82
                     job.n_tries += 1
    
    83 83
                     self.queue.appendleft(job)
    
    84 84
     
    
    85
    -            self.jobs[name] = job
    
    86
    -
    
    87 85
         def job_complete(self, name, result):
    
    88 86
             job = self.jobs[name]
    
    89 87
             job.result = result
    
    90 88
             job.update_execute_stage(ExecuteStage.COMPLETED)
    
    91
    -        self.jobs[name] = job
    
    92 89
             if not job.do_not_cache and self._action_cache is not None:
    
    93 90
                 self._action_cache.update_action_result(job.action_digest, result)
    
    94 91
     
    
    ... ... @@ -101,7 +98,6 @@ class Scheduler:
    101 98
         def update_job_lease_state(self, name, state):
    
    102 99
             job = self.jobs[name]
    
    103 100
             job.lease.state = state
    
    104
    -        self.jobs[name] = job
    
    105 101
     
    
    106 102
         def get_job_lease(self, name):
    
    107 103
             return self.jobs[name].lease
    
    ... ... @@ -118,5 +114,4 @@ class Scheduler:
    118 114
                 job.update_execute_stage(ExecuteStage.EXECUTING)
    
    119 115
                 job.lease = job.create_lease()
    
    120 116
                 job.lease.state = LeaseState.PENDING.value
    
    121
    -            self.jobs[job.name] = job
    
    122 117
                 yield job.lease

  • buildgrid/server/worker/bots_interface.py
    ... ... @@ -113,8 +113,8 @@ class BotsInterface:
    113 113
                     pass
    
    114 114
     
    
    115 115
                 elif client_state == LeaseState.COMPLETED:
    
    116
    -                self._scheduler.job_complete(client_lease.id, client_lease.result)
    
    117 116
                     self._scheduler.update_job_lease_state(client_lease.id, client_lease.state)
    
    117
    +                self._scheduler.job_complete(client_lease.id, client_lease.result)
    
    118 118
                     return None
    
    119 119
     
    
    120 120
                 else:
    

  • buildgrid/utils.py
    ... ... @@ -275,6 +275,21 @@ def read_file(file_path):
    275 275
             return byte_file.read()
    
    276 276
     
    
    277 277
     
    
    278
    +def write_file(file_path, content):
    
    279
    +    """Dumps raw memory content to a file.
    
    280
    +
    
    281
    +    Args:
    
    282
    +        file_path (str): path to the target file.
    
    283
    +        content (bytes): raw file's content.
    
    284
    +
    
    285
    +    Raises:
    
    286
    +        OSError: If `file_path` does not exist or is not writable.
    
    287
    +    """
    
    288
    +    with open(file_path, 'wb') as byte_file:
    
    289
    +        byte_file.write(content)
    
    290
    +        byte_file.flush()
    
    291
    +
    
    292
    +
    
    278 293
     def output_file_maker(file_path, input_path, cas=None):
    
    279 294
         """Creates an :obj:`OutputFile` from a local file and possibly upload it.
    
    280 295
     
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]