[Notes] [Git][BuildGrid/buildgrid][master] 3 commits: buildbox.py: Unpack a digest from lease.payload



Title: GitLab

Martin Blanchard pushed to branch master at BuildGrid / buildgrid

Commits:

1 changed file:

Changes:

  • buildgrid/_app/bots/buildbox.py
    ... ... @@ -21,16 +21,16 @@ import grpc
    21 21
     from google.protobuf import any_pb2
    
    22 22
     
    
    23 23
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    24
    -from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
    
    25
    -from buildgrid.utils import read_file
    
    24
    +from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
    
    25
    +from buildgrid.utils import read_file, parse_to_pb2_from_fetch
    
    26 26
     
    
    27 27
     
    
    28 28
     def work_buildbox(context, lease):
    
    29 29
         logger = context.logger
    
    30 30
     
    
    31
    -    action_any = lease.payload
    
    32
    -    action = remote_execution_pb2.Action()
    
    33
    -    action_any.Unpack(action)
    
    31
    +    action_digest_any = lease.payload
    
    32
    +    action_digest = remote_execution_pb2.Digest()
    
    33
    +    action_digest_any.Unpack(action_digest)
    
    34 34
     
    
    35 35
         cert_server = read_file(context.server_cert)
    
    36 36
         cert_client = read_file(context.client_cert)
    
    ... ... @@ -45,38 +45,57 @@ def work_buildbox(context, lease):
    45 45
     
    
    46 46
         stub = bytestream_pb2_grpc.ByteStreamStub(channel)
    
    47 47
     
    
    48
    -    remote_command = _buildstream_fetch_command(context.local_cas, stub, action.command_digest)
    
    48
    +    action = remote_execution_pb2.Action()
    
    49
    +    parse_to_pb2_from_fetch(action, stub, action_digest)
    
    50
    +
    
    51
    +    casdir = context.local_cas
    
    52
    +    remote_command = remote_execution_pb2.Command()
    
    53
    +    parse_to_pb2_from_fetch(remote_command, stub, action.command_digest)
    
    54
    +
    
    49 55
         environment = dict((x.name, x.value) for x in remote_command.environment_variables)
    
    50 56
         logger.debug("command hash: {}".format(action.command_digest.hash))
    
    51 57
         logger.debug("vdir hash: {}".format(action.input_root_digest.hash))
    
    52 58
         logger.debug("\n{}".format(' '.join(remote_command.arguments)))
    
    53 59
     
    
    54
    -    command = ['buildbox',
    
    55
    -               '--remote={}'.format('https://{}:{}'.format(context.remote, context.port)),
    
    56
    -               '--server-cert={}'.format(context.server_cert),
    
    57
    -               '--client-key={}'.format(context.client_key),
    
    58
    -               '--client-cert={}'.format(context.client_cert),
    
    59
    -               '--local={}'.format(context.local_cas),
    
    60
    -               '--chdir={}'.format(environment['PWD']),
    
    61
    -               context.fuse_dir]
    
    62
    -
    
    63
    -    command.extend(remote_command.arguments)
    
    64
    -
    
    65
    -    logger.debug(' '.join(command))
    
    66
    -    logger.debug("Input root digest:\n{}".format(action.input_root_digest))
    
    67
    -    logger.info("Launching process")
    
    68
    -
    
    69
    -    proc = subprocess.Popen(command,
    
    70
    -                            stdin=subprocess.PIPE,
    
    71
    -                            stdout=subprocess.PIPE)
    
    72
    -    std_send = action.input_root_digest.SerializeToString()
    
    73
    -    std_out, _ = proc.communicate(std_send)
    
    74
    -
    
    75
    -    output_root_digest = remote_execution_pb2.Digest()
    
    76
    -    output_root_digest.ParseFromString(std_out)
    
    77
    -    logger.debug("Output root digest: {}".format(output_root_digest))
    
    78
    -
    
    79
    -    output_file = remote_execution_pb2.OutputDirectory(tree_digest=output_root_digest)
    
    60
    +    # Input hash must be written to disk for buildbox.
    
    61
    +    os.makedirs(os.path.join(casdir, 'tmp'), exist_ok=True)
    
    62
    +    with tempfile.NamedTemporaryFile(dir=os.path.join(casdir, 'tmp')) as input_digest_file:
    
    63
    +        with open(input_digest_file.name, 'wb') as f:
    
    64
    +            f.write(action.input_root_digest.SerializeToString())
    
    65
    +            f.flush()
    
    66
    +
    
    67
    +        with tempfile.NamedTemporaryFile(dir=os.path.join(casdir, 'tmp')) as output_digest_file:
    
    68
    +            command = ['buildbox',
    
    69
    +                       '--remote={}'.format('https://{}:{}'.format(context.remote, context.port)),
    
    70
    +                       '--server-cert={}'.format(context.server_cert),
    
    71
    +                       '--client-key={}'.format(context.client_key),
    
    72
    +                       '--client-cert={}'.format(context.client_cert),
    
    73
    +                       '--input-digest={}'.format(input_digest_file.name),
    
    74
    +                       '--output-digest={}'.format(output_digest_file.name),
    
    75
    +                       '--local={}'.format(casdir)]
    
    76
    +            if 'PWD' in environment and environment['PWD']:
    
    77
    +                command.append('--chdir={}'.format(environment['PWD']))
    
    78
    +
    
    79
    +            command.append(context.fuse_dir)
    
    80
    +            command.extend(remote_command.arguments)
    
    81
    +
    
    82
    +            logger.debug(' '.join(command))
    
    83
    +            logger.debug("Input root digest:\n{}".format(action.input_root_digest))
    
    84
    +            logger.info("Launching process")
    
    85
    +
    
    86
    +            proc = subprocess.Popen(command,
    
    87
    +                                    stdin=subprocess.PIPE,
    
    88
    +                                    stdout=subprocess.PIPE)
    
    89
    +            proc.communicate()
    
    90
    +
    
    91
    +            output_root_digest = remote_execution_pb2.Digest()
    
    92
    +            with open(output_digest_file.name, 'rb') as f:
    
    93
    +                output_root_digest.ParseFromString(f.read())
    
    94
    +            logger.debug("Output root digest: {}".format(output_root_digest))
    
    95
    +
    
    96
    +            if len(output_root_digest.hash) < 64:
    
    97
    +                logger.warning("Buildbox command failed - no output root digest present.")
    
    98
    +            output_file = remote_execution_pb2.OutputDirectory(tree_digest=output_root_digest)
    
    80 99
     
    
    81 100
         action_result = remote_execution_pb2.ActionResult()
    
    82 101
         action_result.output_directories.extend([output_file])
    
    ... ... @@ -87,33 +106,3 @@ def work_buildbox(context, lease):
    87 106
         lease.result.CopyFrom(action_result_any)
    
    88 107
     
    
    89 108
         return lease
    90
    -
    
    91
    -
    
    92
    -def _buildstream_fetch_blob(remote, digest, out):
    
    93
    -    resource_name = os.path.join(digest.hash, str(digest.size_bytes))
    
    94
    -    request = bytestream_pb2.ReadRequest()
    
    95
    -    request.resource_name = resource_name
    
    96
    -    request.read_offset = 0
    
    97
    -    for response in remote.Read(request):
    
    98
    -        out.write(response.data)
    
    99
    -
    
    100
    -    out.flush()
    
    101
    -    assert digest.size_bytes == os.fstat(out.fileno()).st_size
    
    102
    -
    
    103
    -
    
    104
    -def _buildstream_fetch_command(casdir, remote, digest):
    
    105
    -    with tempfile.NamedTemporaryFile(dir=os.path.join(casdir, 'tmp')) as out:
    
    106
    -        _buildstream_fetch_blob(remote, digest, out)
    
    107
    -        remote_command = remote_execution_pb2.Command()
    
    108
    -        with open(out.name, 'rb') as f:
    
    109
    -            remote_command.ParseFromString(f.read())
    
    110
    -        return remote_command
    
    111
    -
    
    112
    -
    
    113
    -def _buildstream_fetch_action(casdir, remote, digest):
    
    114
    -    with tempfile.NamedTemporaryFile(dir=os.path.join(casdir, 'tmp')) as out:
    
    115
    -        _buildstream_fetch_blob(remote, digest, out)
    
    116
    -        remote_action = remote_execution_pb2.Action()
    
    117
    -        with open(out.name, 'rb') as f:
    
    118
    -            remote_action.ParseFromString(f.read())
    
    119
    -        return remote_action



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]