[Notes] [Git][BuildGrid/buildgrid][mablanch/61-bazel-support] 12 commits: Added instance support to BuildGrid



Martin Blanchard pushed to branch mablanch/61-bazel-support at BuildGrid / buildgrid

Commits:

18 changed files:

Changes:

  • buildgrid/_app/bots/buildbox.py

    @@ -21,16 +21,16 @@ import grpc
     from google.protobuf import any_pb2
     
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    -from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
    -from buildgrid.utils import read_file
    +from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
    +from buildgrid.utils import read_file, parse_to_pb2_from_fetch
     
     
     def work_buildbox(context, lease):
         logger = context.logger
     
    -    action_any = lease.payload
    -    action = remote_execution_pb2.Action()
    -    action_any.Unpack(action)
    +    action_digest_any = lease.payload
    +    action_digest = remote_execution_pb2.Digest()
    +    action_digest_any.Unpack(action_digest)
     
         cert_server = read_file(context.server_cert)
         cert_client = read_file(context.client_cert)
    @@ -45,38 +45,57 @@ def work_buildbox(context, lease):
     
         stub = bytestream_pb2_grpc.ByteStreamStub(channel)
     
    -    remote_command = _buildstream_fetch_command(context.local_cas, stub, action.command_digest)
    +    action = remote_execution_pb2.Action()
    +    parse_to_pb2_from_fetch(action, stub, action_digest)
    +
    +    casdir = context.local_cas
    +    remote_command = remote_execution_pb2.Command()
    +    parse_to_pb2_from_fetch(remote_command, stub, action.command_digest)
    +
         environment = dict((x.name, x.value) for x in remote_command.environment_variables)
         logger.debug("command hash: {}".format(action.command_digest.hash))
         logger.debug("vdir hash: {}".format(action.input_root_digest.hash))
         logger.debug("\n{}".format(' '.join(remote_command.arguments)))
     
    -    command = ['buildbox',
    -               '--remote={}'.format('https://{}:{}'.format(context.remote, context.port)),
    -               '--server-cert={}'.format(context.server_cert),
    -               '--client-key={}'.format(context.client_key),
    -               '--client-cert={}'.format(context.client_cert),
    -               '--local={}'.format(context.local_cas),
    -               '--chdir={}'.format(environment['PWD']),
    -               context.fuse_dir]
    -
    -    command.extend(remote_command.arguments)
    -
    -    logger.debug(' '.join(command))
    -    logger.debug("Input root digest:\n{}".format(action.input_root_digest))
    -    logger.info("Launching process")
    -
    -    proc = subprocess.Popen(command,
    -                            stdin=subprocess.PIPE,
    -                            stdout=subprocess.PIPE)
    -    std_send = action.input_root_digest.SerializeToString()
    -    std_out, _ = proc.communicate(std_send)
    -
    -    output_root_digest = remote_execution_pb2.Digest()
    -    output_root_digest.ParseFromString(std_out)
    -    logger.debug("Output root digest: {}".format(output_root_digest))
    -
    -    output_file = remote_execution_pb2.OutputDirectory(tree_digest=output_root_digest)
    +    # Input hash must be written to disk for buildbox.
    +    os.makedirs(os.path.join(casdir, 'tmp'), exist_ok=True)
    +    with tempfile.NamedTemporaryFile(dir=os.path.join(casdir, 'tmp')) as input_digest_file:
    +        with open(input_digest_file.name, 'wb') as f:
    +            f.write(action.input_root_digest.SerializeToString())
    +            f.flush()
    +
    +        with tempfile.NamedTemporaryFile(dir=os.path.join(casdir, 'tmp')) as output_digest_file:
    +            command = ['buildbox',
    +                       '--remote={}'.format('https://{}:{}'.format(context.remote, context.port)),
    +                       '--server-cert={}'.format(context.server_cert),
    +                       '--client-key={}'.format(context.client_key),
    +                       '--client-cert={}'.format(context.client_cert),
    +                       '--input-digest={}'.format(input_digest_file.name),
    +                       '--output-digest={}'.format(output_digest_file.name),
    +                       '--local={}'.format(casdir)]
    +            if 'PWD' in environment and environment['PWD']:
    +                command.append('--chdir={}'.format(environment['PWD']))
    +
    +            command.append(context.fuse_dir)
    +            command.extend(remote_command.arguments)
    +
    +            logger.debug(' '.join(command))
    +            logger.debug("Input root digest:\n{}".format(action.input_root_digest))
    +            logger.info("Launching process")
    +
    +            proc = subprocess.Popen(command,
    +                                    stdin=subprocess.PIPE,
    +                                    stdout=subprocess.PIPE)
    +            proc.communicate()
    +
    +            output_root_digest = remote_execution_pb2.Digest()
    +            with open(output_digest_file.name, 'rb') as f:
    +                output_root_digest.ParseFromString(f.read())
    +            logger.debug("Output root digest: {}".format(output_root_digest))
    +
    +            if len(output_root_digest.hash) < 64:
    +                logger.warning("Buildbox command failed - no output root digest present.")
    +            output_file = remote_execution_pb2.OutputDirectory(tree_digest=output_root_digest)
     
         action_result = remote_execution_pb2.ActionResult()
         action_result.output_directories.extend([output_file])
    @@ -87,33 +106,3 @@ def work_buildbox(context, lease):
         lease.result.CopyFrom(action_result_any)
     
         return lease
    -
    -
    -def _buildstream_fetch_blob(remote, digest, out):
    -    resource_name = os.path.join(digest.hash, str(digest.size_bytes))
    -    request = bytestream_pb2.ReadRequest()
    -    request.resource_name = resource_name
    -    request.read_offset = 0
    -    for response in remote.Read(request):
    -        out.write(response.data)
    -
    -    out.flush()
    -    assert digest.size_bytes == os.fstat(out.fileno()).st_size
    -
    -
    -def _buildstream_fetch_command(casdir, remote, digest):
    -    with tempfile.NamedTemporaryFile(dir=os.path.join(casdir, 'tmp')) as out:
    -        _buildstream_fetch_blob(remote, digest, out)
    -        remote_command = remote_execution_pb2.Command()
    -        with open(out.name, 'rb') as f:
    -            remote_command.ParseFromString(f.read())
    -        return remote_command
    -
    -
    -def _buildstream_fetch_action(casdir, remote, digest):
    -    with tempfile.NamedTemporaryFile(dir=os.path.join(casdir, 'tmp')) as out:
    -        _buildstream_fetch_blob(remote, digest, out)
    -        remote_action = remote_execution_pb2.Action()
    -        with open(out.name, 'rb') as f:
    -            remote_action.ParseFromString(f.read())
    -        return remote_action

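    Note: work_buildbox() now relies on parse_to_pb2_from_fetch() from
    buildgrid/utils.py, whose definition is not part of this excerpt. A minimal
    sketch of what it presumably does, assuming the gen_fetch_blob() helper
    visible in the buildgrid/utils.py hunk at the end of this mail:

        def parse_to_pb2_from_fetch(pb2_object, stub, digest, instance_name=""):
            # Stream the blob out of CAS, then deserialize it into the
            # caller-supplied protobuf message and hand it back.
            data = b''.join(gen_fetch_blob(stub, digest, instance_name))
            pb2_object.ParseFromString(data)
            return pb2_object
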
  • buildgrid/_app/bots/temp_directory.py

    @@ -19,7 +19,7 @@ import tempfile
     
     from google.protobuf import any_pb2
     
    -from buildgrid.utils import read_file, create_digest, write_fetch_directory, parse_to_pb2_from_fetch
    +from buildgrid.utils import output_file_maker, write_fetch_directory, parse_to_pb2_from_fetch
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
     from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
     
    @@ -29,60 +29,86 @@ def work_temp_directory(context, lease):
         then uploads results back to CAS
         """
     
    -    instance_name = context.instance_name
    +    instance_name = context.parent
         stub_bytestream = bytestream_pb2_grpc.ByteStreamStub(context.channel)
     
         action_digest = remote_execution_pb2.Digest()
         lease.payload.Unpack(action_digest)
     
    -    action = remote_execution_pb2.Action()
    +    action = parse_to_pb2_from_fetch(remote_execution_pb2.Action(),
    +                                     stub_bytestream, action_digest, instance_name)
     
    -    action = parse_to_pb2_from_fetch(action, stub_bytestream, action_digest, instance_name)
    +    with tempfile.TemporaryDirectory() as temp_directory:
    +        command = parse_to_pb2_from_fetch(remote_execution_pb2.Command(),
    +                                          stub_bytestream, action.command_digest, instance_name)
     
    -    with tempfile.TemporaryDirectory() as temp_dir:
    +        write_fetch_directory(temp_directory, stub_bytestream,
    +                              action.input_root_digest, instance_name)
     
    -        command = remote_execution_pb2.Command()
    -        command = parse_to_pb2_from_fetch(command, stub_bytestream, action.command_digest, instance_name)
    -
    -        arguments = "cd {} &&".format(temp_dir)
    +        execution_envionment = os.environ.copy()
    +        for variable in command.environment_variables:
    +            if variable.name not in ['PATH', 'PWD']:
    +                execution_envionment[variable.name] = variable.value
     
    +        command_arguments = list()
             for argument in command.arguments:
    -            arguments += " {}".format(argument)
    -
    -        context.logger.info(arguments)
    -
    -        write_fetch_directory(temp_dir, stub_bytestream, action.input_root_digest, instance_name)
    -
    -        proc = subprocess.Popen(arguments,
    -                                shell=True,
    -                                stdin=subprocess.PIPE,
    -                                stdout=subprocess.PIPE)
    -
    -        # TODO: Should return the std_out to the user
    -        proc.communicate()
    -
    -        result = remote_execution_pb2.ActionResult()
    -        requests = []
    -        for output_file in command.output_files:
    -            path = os.path.join(temp_dir, output_file)
    -            chunk = read_file(path)
    -
    -            digest = create_digest(chunk)
    -
    -            result.output_files.extend([remote_execution_pb2.OutputFile(path=output_file,
    -                                                                        digest=digest)])
    -
    -            requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
    -                digest=digest, data=chunk))
    -
    -        request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=instance_name,
    -                                                               requests=requests)
    +            command_arguments.append(argument.strip())
    +
    +        working_directory = None
    +        if command.working_directory:
    +            working_directory = os.path.join(temp_directory,
    +                                             command.working_directory)
    +            os.makedirs(working_directory, exist_ok=True)
    +        else:
    +            working_directory = temp_directory
    +
    +        # Ensure that output files structure exists:
    +        for output_path in command.output_files:
    +            directory_path = os.path.join(working_directory,
    +                                          os.path.dirname(output_path))
    +            os.makedirs(directory_path, exist_ok=True)
    +
    +        process = subprocess.Popen(command_arguments,
    +                                   cwd=working_directory,
    +                                   universal_newlines=True,
    +                                   env=execution_envionment,
    +                                   stdin=subprocess.PIPE,
    +                                   stdout=subprocess.PIPE)
    +        # TODO: Should return the stdout and stderr to the user.
    +        process.communicate()
    +
    +        update_requests = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=instance_name)
    +        action_result = remote_execution_pb2.ActionResult()
    +
    +        for output_path in command.output_files:
    +            file_path = os.path.join(working_directory, output_path)
    +            # Missing outputs should simply be omitted in ActionResult:
    +            if not os.path.isfile(file_path):
    +                continue
    +
    +            # OutputFile.path should be relative to the working direcory:
    +            output_file, update_request = output_file_maker(file_path, working_directory)
    +
    +            action_result.output_files.extend([output_file])
    +            update_requests.requests.extend([update_request])
    +
    +        for output_path in command.output_directories:
    +            directory_path = os.path.join(working_directory, output_path)
    +            # Missing outputs should simply be omitted in ActionResult:
    +            if not os.path.isdir(directory_path):
    +                continue
    +
    +            # OutputDirectory.path should be relative to the working direcory:
    +            output_directory, update_request = output_directory_maker(directory_path, working_directory)
    +
    +            action_result.output_directories.extend([output_directory])
    +            update_requests.requests.extend(update_request)
     
             stub_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(context.channel)
    -        stub_cas.BatchUpdateBlobs(request)
    +        stub_cas.BatchUpdateBlobs(update_requests)
     
             result_any = any_pb2.Any()
    -        result_any.Pack(result)
    +        result_any.Pack(action_result)
     
             lease.result.CopyFrom(result_any)
     

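    Note: output_file_maker() (imported above) and output_directory_maker()
    (called for output directories) live in buildgrid/utils.py and are not shown
    in this push. A hedged sketch of the file variant, inferred from its call
    site; the exact signature and relative-path handling are assumptions:

        def output_file_maker(file_path, working_directory):
            data = read_file(file_path)   # helper from buildgrid.utils
            digest = create_digest(data)  # hash plus size_bytes, for CAS
            output_file = remote_execution_pb2.OutputFile(
                path=os.path.relpath(file_path, start=working_directory),
                digest=digest)
            update_request = remote_execution_pb2.BatchUpdateBlobsRequest.Request(
                digest=digest, data=data)
            return output_file, update_request
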
  • buildgrid/_app/commands/cmd_bot.py

    @@ -35,7 +35,7 @@ from ..cli import pass_context
     
     
     @click.group(name='bot', short_help="Create and register bot clients.")
    -@click.option('--parent', type=click.STRING, default='bgd_test', show_default=True,
    +@click.option('--parent', type=click.STRING, default='main', show_default=True,
                   help="Targeted farm resource.")
     @click.option('--port', type=click.INT, default='50051', show_default=True,
                   help="Remote server's port number.")
    @@ -49,6 +49,7 @@ def cli(context, host, port, parent):
         context.logger = logging.getLogger(__name__)
         context.logger.info("Starting on port {}".format(port))
         context.channel = channel
    +    context.parent = parent
     
         worker = Worker()
         worker.add_device(Device())
    @@ -75,14 +76,11 @@ def run_dummy(context):
     
     
     @cli.command('temp-directory', short_help="Runs commands in temp directory and uploads results.")
    -@click.option('--instance-name', type=click.STRING, default='testing', show_default=True,
    -              help="Targeted farm instance name.")
     @pass_context
    -def run_temp_directory(context, instance_name):
    +def run_temp_directory(context):
         """ Downloads files and command from CAS and runs
         in a temp directory, uploading result back to CAS
         """
    -    context.instance_name = instance_name
         try:
             b = bot.Bot(context.bot_session)
             b.session(temp_directory.work_temp_directory,

  • buildgrid/_app/commands/cmd_cas.py

    @@ -31,25 +31,26 @@ from ..cli import pass_context
     
     
     @click.group(name='cas', short_help="Interact with the CAS server.")
    +@click.option('--instance-name', type=click.STRING, default='main', show_default=True,
    +              help="Targeted farm instance name.")
     @click.option('--port', type=click.INT, default='50051', show_default=True,
                   help="Remote server's port number.")
     @click.option('--host', type=click.STRING, default='localhost', show_default=True,
                   help="Remote server's hostname.")
     @pass_context
    -def cli(context, host, port):
    +def cli(context, instance_name, host, port):
         context.logger = logging.getLogger(__name__)
         context.logger.info("Starting on port {}".format(port))
     
    +    context.instance_name = instance_name
         context.channel = grpc.insecure_channel('{}:{}'.format(host, port))
         context.port = port
     
     
     @cli.command('upload-files', short_help="Upload files to the CAS server.")
    -@click.option('--instance-name', type=click.STRING, default='testing', show_default=True,
    -              help="Targeted farm instance name.")
     @click.argument('files', nargs=-1, type=click.File('rb'), required=True)
     @pass_context
    -def upload_files(context, files, instance_name):
    +def upload_files(context, files):
         stub = remote_execution_pb2_grpc.ContentAddressableStorageStub(context.channel)
     
         requests = []
    @@ -58,7 +59,7 @@ def upload_files(context, files, instance_name):
             requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
                 digest=create_digest(chunk), data=chunk))
     
    -    request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=instance_name,
    +    request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=context.instance_name,
                                                                requests=requests)
     
         context.logger.info("Sending: {}".format(request))
    @@ -67,11 +68,9 @@ def upload_files(context, files, instance_name):
     
     
     @cli.command('upload-dir', short_help="Upload a directory to the CAS server.")
    -@click.option('--instance-name', type=click.STRING, default='testing', show_default=True,
    -              help="Targeted farm instance name.")
     @click.argument('directory', nargs=1, type=click.Path(), required=True)
     @pass_context
    -def upload_dir(context, directory, instance_name):
    +def upload_dir(context, directory):
         context.logger.info("Uploading directory to cas")
         stub = remote_execution_pb2_grpc.ContentAddressableStorageStub(context.channel)
     
    @@ -81,7 +80,7 @@ def upload_dir(context, directory, instance_name):
             requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
                 digest=file_digest, data=chunk))
     
    -    request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=instance_name,
    +    request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=context.instance_name,
                                                                requests=requests)
     
         context.logger.info("Request:\n{}".format(request))

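    The same pattern repeats across the CLI groups in this push: --instance-name
    moves from the individual subcommands up to the group, and the chosen value
    rides on the click context. A standalone sketch of that pattern (plain click
    here, not BuildGrid's own pass_context helper):

        import click

        @click.group()
        @click.option('--instance-name', type=click.STRING, default='main',
                      show_default=True, help="Targeted farm instance name.")
        @click.pass_context
        def cli(context, instance_name):
            context.obj = instance_name

        @cli.command('show')
        @click.pass_context
        def show(context):
            click.echo(context.obj)  # every subcommand sees the group's value
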
  • buildgrid/_app/commands/cmd_execute.py

    @@ -36,34 +36,35 @@ from ..cli import pass_context
     
     
     @click.group(name='execute', short_help="Execute simple operations.")
    +@click.option('--instance-name', type=click.STRING, default='main',
    +              show_default=True, help="Targeted farm instance name.")
     @click.option('--port', type=click.INT, default='50051', show_default=True,
                   help="Remote server's port number.")
     @click.option('--host', type=click.STRING, default='localhost', show_default=True,
                   help="Remote server's hostname.")
     @pass_context
    -def cli(context, host, port):
    +def cli(context, instance_name, host, port):
         context.logger = logging.getLogger(__name__)
         context.logger.info("Starting on port {}".format(port))
     
    +    context.instance_name = instance_name
         context.channel = grpc.insecure_channel('{}:{}'.format(host, port))
         context.port = port
     
     
     @cli.command('request-dummy', short_help="Send a dummy action.")
    -@click.option('--instance-name', type=click.STRING, default='testing', show_default=True,
    -              help="Targeted farm instance name.")
     @click.option('--number', type=click.INT, default=1, show_default=True,
                   help="Number of request to send.")
     @click.option('--wait-for-completion', is_flag=True,
                   help="Stream updates until jobs are completed.")
     @pass_context
    -def request_dummy(context, number, instance_name, wait_for_completion):
    +def request_dummy(context, number, wait_for_completion):
         action_digest = remote_execution_pb2.Digest()
     
         context.logger.info("Sending execution request...")
         stub = remote_execution_pb2_grpc.ExecutionStub(context.channel)
     
    -    request = remote_execution_pb2.ExecuteRequest(instance_name=instance_name,
    +    request = remote_execution_pb2.ExecuteRequest(instance_name=context.instance_name,
                                                       action_digest=action_digest,
                                                       skip_cache_lookup=True)
     
    @@ -98,7 +99,7 @@ def list_operations(context):
         context.logger.info("Getting list of operations")
         stub = operations_pb2_grpc.OperationsStub(context.channel)
     
    -    request = operations_pb2.ListOperationsRequest()
    +    request = operations_pb2.ListOperationsRequest(name=context.instance_name)
     
         response = stub.ListOperations(request)
     
    @@ -115,7 +116,8 @@ def list_operations(context):
     @pass_context
     def wait_execution(context, operation_name):
         stub = remote_execution_pb2_grpc.ExecutionStub(context.channel)
    -    request = remote_execution_pb2.WaitExecutionRequest(name=operation_name)
    +    request = remote_execution_pb2.WaitExecutionRequest(instance_name=context.instance_name,
    +                                                        name=operation_name)
     
         response = stub.WaitExecution(request)
     
    @@ -124,8 +126,6 @@ def wait_execution(context, operation_name):
     
     
     @cli.command('command', short_help="Send a command to be executed.")
    -@click.option('--instance-name', type=click.STRING, default='testing', show_default=True,
    -              help="Targeted farm instance name.")
     @click.option('--output-file', nargs=2, type=(click.STRING, click.BOOL), multiple=True,
                   help="Tuple of expected output file and is-executeable flag.")
     @click.option('--output-directory', default='testing', show_default=True,
    @@ -133,7 +133,7 @@ def wait_execution(context, operation_name):
     @click.argument('input-root', nargs=1, type=click.Path(), required=True)
     @click.argument('commands', nargs=-1, type=click.STRING, required=True)
     @pass_context
    -def command(context, input_root, commands, output_file, output_directory, instance_name):
    +def command(context, input_root, commands, output_file, output_directory):
         stub = remote_execution_pb2_grpc.ExecutionStub(context.channel)
     
         execute_command = remote_execution_pb2.Command()
    @@ -170,11 +170,11 @@ def command(context, input_root, commands, output_file, output_directory, instan
         requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
             digest=action_digest, data=action.SerializeToString()))
     
    -    request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=instance_name,
    +    request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=context.instance_name,
                                                                requests=requests)
         remote_execution_pb2_grpc.ContentAddressableStorageStub(context.channel).BatchUpdateBlobs(request)
     
    -    request = remote_execution_pb2.ExecuteRequest(instance_name=instance_name,
    +    request = remote_execution_pb2.ExecuteRequest(instance_name=context.instance_name,
                                                       action_digest=action_digest,
                                                       skip_cache_lookup=True)
         response = stub.Execute(request)
    @@ -201,7 +201,7 @@ def command(context, input_root, commands, output_file, output_directory, instan
                     raise
     
             with open(path, 'wb+') as f:
    -            write_fetch_blob(f, stub, output_file_response.digest, instance_name)
    +            write_fetch_blob(f, stub, output_file_response.digest, context.instance_name)
     
             if output_file_response.path in output_executeables:
                 st = os.stat(path)

  • buildgrid/_app/commands/cmd_server.py

    @@ -25,7 +25,7 @@ import logging
     
     import click
     
    -from buildgrid.server import build_grid_server
    +from buildgrid.server import buildgrid_server
     from buildgrid.server.cas.storage.disk import DiskStorage
     from buildgrid.server.cas.storage.lru_memory_cache import LRUMemoryCache
     from buildgrid.server.cas.storage.s3 import S3Storage
    @@ -45,6 +45,7 @@ def cli(context):
     
     
     @cli.command('start', short_help="Setup a new server instance.")
    +@click.argument('instances', nargs=-1, type=click.STRING)
     @click.option('--port', type=click.INT, default='50051', show_default=True,
                   help="The port number to be listened.")
     @click.option('--max-cached-actions', type=click.INT, default=50, show_default=True,
    @@ -67,7 +68,9 @@ def cli(context):
     @click.option('--cas-disk-directory', type=click.Path(file_okay=False, dir_okay=True, writable=True),
                   help="For --cas=disk, the folder to store CAS blobs in.")
     @pass_context
    -def start(context, port, max_cached_actions, allow_uar, cas, **cas_args):
    +def start(context, instances, port, max_cached_actions, allow_uar, cas, **cas_args):
    +    """ Starts a BuildGrid server.
    +    """
         context.logger.info("Starting on port {}".format(port))
     
         cas_storage = _make_cas_storage(context, cas, cas_args)
    @@ -79,9 +82,13 @@ def start(context, port, max_cached_actions, allow_uar, cas, **cas_args):
         else:
             action_cache = ActionCache(cas_storage, max_cached_actions, allow_uar)
     
    -    server = build_grid_server.BuildGridServer(port,
    -                                               cas_storage=cas_storage,
    -                                               action_cache=action_cache)
    +    if instances is None:
    +        instances = ['main']
    +
    +    server = buildgrid_server.BuildGridServer(port,
    +                                              instances,
    +                                              cas_storage=cas_storage,
    +                                              action_cache=action_cache)
         loop = asyncio.get_event_loop()
         try:
             server.start()

  • buildgrid/server/buildgrid_instance.py (new file)

    +# Copyright (C) 2018 Bloomberg LP
    +#
    +# Licensed under the Apache License, Version 2.0 (the "License");
    +# you may not use this file except in compliance with the License.
    +# You may obtain a copy of the License at
    +#
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    +#
    +# Unless required by applicable law or agreed to in writing, software
    +# distributed under the License is distributed on an "AS IS" BASIS,
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +# See the License for the specific language governing permissions and
    +# limitations under the License.
    +
    +
    +"""
    +BuildGrid Instance
    +==================
    +
    +An instance of the BuildGrid server.
    +
    +Contains scheduler, execution instance and an interface to the bots.
    +"""
    +
    +
    +import logging
    +
    +from .execution.execution_instance import ExecutionInstance
    +from .scheduler import Scheduler
    +from .worker.bots_interface import BotsInterface
    +
    +
    +class BuildGridInstance(ExecutionInstance, BotsInterface):
    +
    +    def __init__(self, action_cache=None, cas_storage=None):
    +        scheduler = Scheduler(action_cache)
    +
    +        self.logger = logging.getLogger(__name__)
    +
    +        ExecutionInstance.__init__(self, scheduler, cas_storage)
    +        BotsInterface.__init__(self, scheduler)
    +
    +    def stream_operation_updates(self, message_queue, operation_name):
    +        operation = message_queue.get()
    +        while not operation.done:
    +            yield operation
    +            operation = message_queue.get()
    +        yield operation
    +
    +    def cancel_operation(self, name):
    +        # TODO: Cancel leases
    +        raise NotImplementedError("Cancelled operations not supported")

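    BuildGridInstance bundles one Scheduler per named instance and exposes both
    the execution and the bots surface through multiple inheritance, so the two
    services share state via that scheduler. A minimal usage sketch (storage and
    cache wiring elided):

        from buildgrid.server.buildgrid_instance import BuildGridInstance

        # One self-contained farm instance; ExecutionService and BotsService
        # can both be pointed at this same object.
        instance = BuildGridInstance(action_cache=None, cas_storage=None)
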
  • buildgrid/server/build_grid_server.py → buildgrid/server/buildgrid_server.py (renamed)

    @@ -29,35 +29,23 @@ from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_p
     from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2_grpc
     from buildgrid._protos.google.longrunning import operations_pb2_grpc
     
    +from .buildgrid_instance import BuildGridInstance
     from .cas.bytestream_service import ByteStreamService
     from .cas.content_addressable_storage_service import ContentAddressableStorageService
     from .execution.action_cache_service import ActionCacheService
     from .execution.execution_service import ExecutionService
     from .execution.operations_service import OperationsService
    -from .execution.execution_instance import ExecutionInstance
    -from .scheduler import Scheduler
     from .worker.bots_service import BotsService
    -from .worker.bots_interface import BotsInterface
     
     
     class BuildGridServer:
     
    -    def __init__(self, port='50051', max_workers=10, cas_storage=None, action_cache=None):
    +    def __init__(self, port='50051', instances=None, max_workers=10, action_cache=None, cas_storage=None):
             port = '[::]:{0}'.format(port)
    -        scheduler = Scheduler(action_cache)
    -        bots_interface = BotsInterface(scheduler)
    -        execution_instance = ExecutionInstance(scheduler, cas_storage)
     
             self._server = grpc.server(futures.ThreadPoolExecutor(max_workers))
             self._server.add_insecure_port(port)
     
    -        bots_pb2_grpc.add_BotsServicer_to_server(BotsService(bots_interface),
    -                                                 self._server)
    -        remote_execution_pb2_grpc.add_ExecutionServicer_to_server(ExecutionService(execution_instance),
    -                                                                  self._server)
    -        operations_pb2_grpc.add_OperationsServicer_to_server(OperationsService(execution_instance),
    -                                                             self._server)
    -
             if cas_storage is not None:
                 cas_service = ContentAddressableStorageService(cas_storage)
                 remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(cas_service,
    @@ -69,6 +57,20 @@ class BuildGridServer:
                 remote_execution_pb2_grpc.add_ActionCacheServicer_to_server(action_cache_service,
                                                                             self._server)
     
    +        buildgrid_instances = {}
    +        if not instances:
    +            buildgrid_instances["main"] = BuildGridInstance(action_cache, cas_storage)
    +        else:
    +            for name in instances:
    +                buildgrid_instances[name] = BuildGridInstance(action_cache, cas_storage)
    +
    +        bots_pb2_grpc.add_BotsServicer_to_server(BotsService(buildgrid_instances),
    +                                                 self._server)
    +        remote_execution_pb2_grpc.add_ExecutionServicer_to_server(ExecutionService(buildgrid_instances),
    +                                                                  self._server)
    +        operations_pb2_grpc.add_OperationsServicer_to_server(OperationsService(buildgrid_instances),
    +                                                             self._server)
    +
         def start(self):
             self._server.start()
     

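    With this rework, BuildGridServer maps instance names to BuildGridInstance
    objects and registers the name-aware services once. A hedged startup sketch
    (the instance names are illustrative; storage and cache are left unset):

        from buildgrid.server import buildgrid_server

        # Two named farm instances behind a single gRPC endpoint; clients
        # select one via the instance_name field of their requests.
        server = buildgrid_server.BuildGridServer('50051',
                                                  instances=['main', 'staging'],
                                                  cas_storage=None,
                                                  action_cache=None)
        server.start()
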
  • buildgrid/server/execution/execution_instance.py

    @@ -56,12 +56,14 @@ class ExecutionInstance:
     
         def get_operation(self, name):
             operation = self._scheduler.jobs.get(name)
    +
             if operation is None:
                 raise InvalidArgumentError("Operation name does not exist: {}".format(name))
    +
             else:
                 return operation.get_operation()
     
    -    def list_operations(self, name, list_filter, page_size, page_token):
    +    def list_operations(self, list_filter, page_size, page_token):
             # TODO: Pages
             # Spec says number of pages and length of a page are optional
             return self._scheduler.get_operations()
    @@ -72,10 +74,6 @@ class ExecutionInstance:
             except KeyError:
                 raise InvalidArgumentError("Operation name does not exist: {}".format(name))
     
    -    def cancel_operation(self, name):
    -        # TODO: Cancel leases
    -        raise NotImplementedError("Cancelled operations not supported")
    -
         def register_message_client(self, name, queue):
             try:
                 self._scheduler.register_client(name, queue)

  • buildgrid/server/execution/execution_service.py

    @@ -35,23 +35,23 @@ from .._exceptions import InvalidArgumentError
     
     class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
     
    -    def __init__(self, instance):
    +    def __init__(self, instances):
             self.logger = logging.getLogger(__name__)
    -        self._instance = instance
    +        self._instances = instances
     
         def Execute(self, request, context):
    -        # Ignore request.instance_name for now
    -        # Have only one instance
             try:
                 message_queue = queue.Queue()
    -            operation = self._instance.execute(request.action_digest,
    -                                               request.skip_cache_lookup,
    -                                               message_queue)
    +            instance = self._get_instance(request.instance_name)
    +            operation = instance.execute(request.action_digest,
    +                                         request.skip_cache_lookup,
    +                                         message_queue)
     
    -            context.add_callback(partial(self._remove_client, operation.name, message_queue))
    +            context.add_callback(partial(instance.unregister_message_client,
    +                                         operation.name, message_queue))
     
    -            yield from self._stream_operation_updates(message_queue,
    -                                                      operation.name)
    +            yield from instance.stream_operation_updates(message_queue,
    +                                                         operation.name)
     
             except InvalidArgumentError as e:
                 self.logger.error(e)
    @@ -59,23 +59,25 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
                 context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
                 yield operations_pb2.Operation()
     
    -        except NotImplementedError as e:
    -            self.logger.error(e)
    -            context.set_details(str(e))
    -            context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    -            yield operations_pb2.Operation()
    -
         def WaitExecution(self, request, context):
             try:
    +            names = request.name.split("/")
    +
    +            # Operation name should be in format:
    +            # {instance/name}/{operation_id}
    +            instance_name = ''.join(names[0:-1])
    +
                 message_queue = queue.Queue()
    -            operation_name = request.name
    +            operation_name = names[-1]
    +            instance = self._get_instance(instance_name)
     
    -            self._instance.register_message_client(operation_name, message_queue)
    +            instance.register_message_client(operation_name, message_queue)
     
    -            context.add_callback(partial(self._remove_client, operation_name, message_queue))
    +            context.add_callback(partial(instance.unregister_message_client,
    +                                         operation_name, message_queue))
     
    -            yield from self._stream_operation_updates(message_queue,
    -                                                      operation_name)
    +            yield from instance.stream_operation_updates(message_queue,
    +                                                         operation_name)
     
             except InvalidArgumentError as e:
                 self.logger.error(e)
    @@ -83,12 +85,14 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
                 context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
                 yield operations_pb2.Operation()
     
    -    def _remove_client(self, operation_name, message_queue):
    -        self._instance.unregister_message_client(operation_name, message_queue)
    +    def _get_instance(self, name):
    +        # If client does not support multiple instances, it may omit the
    +        # instance name request parameter, so better map our default:
    +        if not name and len(self._instances) == 1:
    +            name = 'main'
    +
    +        try:
    +            return self._instances[name]
     
    -    def _stream_operation_updates(self, message_queue, operation_name):
    -        operation = message_queue.get()
    -        while not operation.done:
    -            yield operation
    -            operation = message_queue.get()
    -        yield operation
    +        except KeyError:
    +            raise InvalidArgumentError("Instance doesn't exist on server: {}".format(name))

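    Worked example of the {instance_name}/{operation_id} naming convention that
    WaitExecution now unpacks (the id below is a made-up UUID):

        name = "main/717dca45-d3d5-4a25-b4b1-177482cf6e3f"

        names = name.split("/")
        instance_name = ''.join(names[0:-1])  # -> "main"
        operation_name = names[-1]            # -> "717dca45-d3d5-4a25-b4b1-177482cf6e3f"
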
  • buildgrid/server/execution/operations_service.py

    @@ -23,6 +23,8 @@ import logging
     
     import grpc
     
    +from google.protobuf.empty_pb2 import Empty
    +
     from buildgrid._protos.google.longrunning import operations_pb2_grpc, operations_pb2
     
     from .._exceptions import InvalidArgumentError
    @@ -30,42 +32,102 @@ from .._exceptions import InvalidArgumentError
     
     class OperationsService(operations_pb2_grpc.OperationsServicer):
     
    -    def __init__(self, instance):
    -        self._instance = instance
    +    def __init__(self, instances):
    +        self._instances = instances
             self.logger = logging.getLogger(__name__)
     
         def GetOperation(self, request, context):
             try:
    -            return self._instance.get_operation(request.name)
    +            name = request.name
    +            operation_name = self._get_operation_name(name)
    +
    +            instance = self._get_instance(name)
    +
    +            operation = instance.get_operation(operation_name)
    +            operation.name = name
    +            return operation
     
             except InvalidArgumentError as e:
                 self.logger.error(e)
                 context.set_details(str(e))
                 context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
    -            return operations_pb2.Operation()
    +
    +        return operations_pb2.Operation()
     
         def ListOperations(self, request, context):
    -        return self._instance.list_operations(request.name,
    -                                              request.filter,
    +        try:
    +            # Name should be the collection name
    +            # Or in this case, the instance_name
    +            name = request.name
    +            instance = self._get_instance(name)
    +
    +            result = instance.list_operations(request.filter,
                                                   request.page_size,
                                                   request.page_token)
     
    +            for operation in result.operations:
    +                operation.name = "{}/{}".format(name, operation.name)
    +
    +            return result
    +
    +        except InvalidArgumentError as e:
    +            self.logger.error(e)
    +            context.set_details(str(e))
    +            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
    +
    +        return operations_pb2.ListOperationsResponse()
    +
         def DeleteOperation(self, request, context):
             try:
    -            return self._instance.delete_operation(request.name)
    +            name = request.name
    +            operation_name = self._get_operation_name(name)
    +
    +            instance = self._get_instance(name)
    +
    +            instance.delete_operation(operation_name)
     
             except InvalidArgumentError as e:
                 self.logger.error(e)
                 context.set_details(str(e))
                 context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
    -            return operations_pb2.Operation()
    +
    +        return Empty()
     
         def CancelOperation(self, request, context):
             try:
    -            return self._instance.cancel_operation(request.name)
    +            name = request.name
    +            operation_name = self._get_operation_name(name)
    +
    +            instance = self._get_instance(name)
    +
    +            instance.cancel_operation(operation_name)
     
             except NotImplementedError as e:
                 self.logger.error(e)
                 context.set_details(str(e))
                 context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    -            return operations_pb2.Operation()
    +
    +        except InvalidArgumentError as e:
    +            self.logger.error(e)
    +            context.set_details(str(e))
    +            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
    +
    +        return Empty()
    +
    +    def _get_operation_name(self, name):
    +        return name.split("/")[-1]
    +
    +    def _get_instance(self, name):
    +        try:
    +            names = name.split("/")
    +
    +            # Operation name should be in format:
    +            # {instance/name}/{operation_id}
    +            instance_name = ''.join(names[0:-1])
    +            if not instance_name:
    +                return self._instances[name]
    +
    +            return self._instances[instance_name]
    +
    +        except KeyError:
    +            raise InvalidArgumentError("Instance doesn't exist on server: {}".format(name))

  • buildgrid/server/scheduler.py

    @@ -90,7 +90,7 @@ class Scheduler:
             job.update_execute_stage(ExecuteStage.COMPLETED)
             self.jobs[name] = job
             if not job.do_not_cache and self._action_cache is not None:
    -            self._action_cache.put_action_result(job.action_digest, result)
    +            self._action_cache.update_action_result(job.action_digest, result)
     
         def get_operations(self):
             response = operations_pb2.ListOperationsResponse()

  • buildgrid/server/worker/bots_interface.py

    @@ -54,7 +54,8 @@ class BotsInterface:
                 pass
     
             # Bot session name, selected by the server
    -        name = str(uuid.uuid4())
    +        name = "{}/{}".format(parent, str(uuid.uuid4()))
    +
             bot_session.name = name
     
             self._bot_ids[name] = bot_id

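    Bot session names now embed the parent instance, which is what lets
    UpdateBotSession (below) route a session back to the instance that created
    it. Illustration with a made-up UUID:

        import uuid

        parent = "main"  # instance the bot registered against
        name = "{}/{}".format(parent, str(uuid.uuid4()))
        # e.g. "main/8c6c2f66-4c3a-4d5e-9f2b-0a1b2c3d4e5f"
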
  • buildgrid/server/worker/bots_service.py
    ... ... @@ -33,14 +33,17 @@ from .._exceptions import InvalidArgumentError, OutofSyncError
    33 33
     
    
    34 34
     class BotsService(bots_pb2_grpc.BotsServicer):
    
    35 35
     
    
    36
    -    def __init__(self, instance):
    
    37
    -        self._instance = instance
    
    36
    +    def __init__(self, instances):
    
    37
    +        self._instances = instances
    
    38 38
             self.logger = logging.getLogger(__name__)
    
    39 39
     
    
    40 40
         def CreateBotSession(self, request, context):
    
    41 41
             try:
    
    42
    -            return self._instance.create_bot_session(request.parent,
    
    43
    -                                                     request.bot_session)
    
    42
    +            parent = request.parent
    
    43
    +            instance = self._get_instance(request.parent)
    
    44
    +            return instance.create_bot_session(parent,
    
    45
    +                                               request.bot_session)
    
    46
    +
    
    44 47
             except InvalidArgumentError as e:
    
    45 48
                 self.logger.error(e)
    
    46 49
                 context.set_details(str(e))
    
    ... ... @@ -50,8 +53,15 @@ class BotsService(bots_pb2_grpc.BotsServicer):
    50 53
     
    
    51 54
         def UpdateBotSession(self, request, context):
    
    52 55
             try:
    
    53
    -            return self._instance.update_bot_session(request.name,
    
    54
    -                                                     request.bot_session)
    
    56
    +            names = request.name.split("/")
    
    57
    +            # Bot session name should be in the format:
    
    58
    +            # {instance/name}/{uuid}
    
    59
    +            instance_name = '/'.join(names[0:-1])
    
    60
    +
    
    61
    +            instance = self._get_instance(instance_name)
    
    62
    +            return instance.update_bot_session(request.name,
    
    63
    +                                               request.bot_session)
    
    64
    +
    
    55 65
             except InvalidArgumentError as e:
    
    56 66
                 self.logger.error(e)
    
    57 67
                 context.set_details(str(e))
    
    ... ... @@ -72,3 +82,10 @@ class BotsService(bots_pb2_grpc.BotsServicer):
    72 82
         def PostBotEventTemp(self, request, context):
    
    73 83
             context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    
    74 84
             return Empty()
    
    85
    +
    
    86
    +    def _get_instance(self, name):
    
    87
    +        try:
    
    88
    +            return self._instances[name]
    
    89
    +
    
    90
    +        except KeyError:
    
    91
    +            raise InvalidArgumentError("Instance doesn't exist on server: {}".format(name))
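
    Note: with this change a server wires one BotsService over a map of
    named instances; a sketch mirroring the test fixtures below, assuming
    a single unnamed (default) instance:

        from buildgrid.server import buildgrid_instance
        from buildgrid.server.worker import bots_service

        instances = {"": buildgrid_instance.BuildGridInstance()}
        servicer = bots_service.BotsService(instances)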

  • buildgrid/utils.py
    ... ... @@ -13,6 +13,7 @@
    13 13
     # limitations under the License.
    
    14 14
     
    
    15 15
     
    
    16
    +from operator import attrgetter
    
    16 17
     import os
    
    17 18
     
    
    18 19
     from buildgrid.settings import HASH
    
    ... ... @@ -31,30 +32,59 @@ def gen_fetch_blob(stub, digest, instance_name=""):
    31 32
             yield response.data
    
    32 33
     
    
    33 34
     
    
    34
    -def write_fetch_directory(directory, stub, digest, instance_name=""):
    
    35
    -    """ Given a directory digest, fetches files and writes them to a directory
    
    35
    +def write_fetch_directory(root_directory, stub, digest, instance_name=None):
    
    36
    +    """Locally replicates a directory from CAS.
    
    37
    +
    
    38
    +    Args:
    
    39
    +        root_directory (str): local directory to populate.
    
    40
    +        stub (ByteStreamStub): gRPC stub for CAS communication.
    
    41
    +        digest (Digest): digest for the directory to fetch from CAS.
    
    42
    +        instance_name (str, optional): farm instance name to query data from.
    
    36 43
         """
    
    37
    -    # TODO: Extend to symlinks and inner directories
    
    38
    -    # pathlib.Path('/my/directory').mkdir(parents=True, exist_ok=True)
    
    44
    +    if not os.path.isabs(root_directory):
    
    45
    +        root_directory = os.path.abspath(root_directory)
    
    46
    +    if not os.path.exists(root_directory):
    
    47
    +        os.makedirs(root_directory, exist_ok=True)
    
    39 48
     
    
    40
    -    directory_pb2 = remote_execution_pb2.Directory()
    
    41
    -    directory_pb2 = parse_to_pb2_from_fetch(directory_pb2, stub, digest, instance_name)
    
    49
    +    directory = parse_to_pb2_from_fetch(remote_execution_pb2.Directory(),
    
    50
    +                                        stub, digest, instance_name)
    
    51
    +
    
    52
    +    for directory_node in directory.directories:
    
    53
    +        child_path = os.path.join(root_directory, directory_node.name)
    
    54
    +
    
    55
    +        write_fetch_directory(child_path, stub, directory_node.digest, instance_name)
    
    56
    +
    
    57
    +    for file_node in directory.files:
    
    58
    +        child_path = os.path.join(root_directory, file_node.name)
    
    59
    +
    
    60
    +        with open(child_path, 'wb') as child_file:
    
    61
    +            write_fetch_blob(child_file, stub, file_node.digest, instance_name)
    
    62
    +
    
    63
    +    for symlink_node in directory.symlinks:
    
    64
    +        child_path = os.path.join(root_directory, symlink_node.name)
    
    65
    +
    
    66
    +        if os.path.isabs(symlink_node.target):
    
    67
    +            continue  # No out-of-directory links for now.
    
    68
    +        target_path = os.path.join(root_directory, symlink_node.target)
    
    69
    +
    
    70
    +        os.symlink(target_path, child_path)
    
    42 71
     
    
    43
    -    for file_node in directory_pb2.files:
    
    44
    -        path = os.path.join(directory, file_node.name)
    
    45
    -        with open(path, 'wb') as f:
    
    46
    -            write_fetch_blob(f, stub, file_node.digest, instance_name)
    
    47 72
     
    
    73
    +def write_fetch_blob(target_file, stub, digest, instance_name=None):
    
    74
    +    """Extracts a blob from CAS into a local file.
    
    48 75
     
    
    49
    -def write_fetch_blob(out, stub, digest, instance_name=""):
    
    50
    -    """ Given an output buffer, fetches blob and writes to buffer
    
    76
    +    Args:
    
    77
    +        target_file (file object): open binary file to write to.
    
    78
    +        stub (ByteStreamStub): gRPC stub for CAS communication.
    
    79
    +        digest (Digest): digest for the blob to fetch from CAS.
    
    80
    +        instance_name (str, optional): farm instance name to query data from.
    
    51 81
         """
    
    52 82
     
    
    53 83
         for stream in gen_fetch_blob(stub, digest, instance_name):
    
    54
    -        out.write(stream)
    
    84
    +        target_file.write(stream)
    
    85
    +    target_file.flush()
    
    55 86
     
    
    56
    -    out.flush()
    
    57
    -    assert digest.size_bytes == os.fstat(out.fileno()).st_size
    
    87
    +    assert digest.size_bytes == os.fstat(target_file.fileno()).st_size
    
    58 88
     
    
    59 89
     
    
    60 90
     def parse_to_pb2_from_fetch(pb2, stub, digest, instance_name=""):
    
    ... ... @@ -70,7 +100,15 @@ def parse_to_pb2_from_fetch(pb2, stub, digest, instance_name=""):
    70 100
     
    
    71 101
     
    
    72 102
     def create_digest(bytes_to_digest):
    
    73
    -    """ Creates a hash based on the hex digest and returns the digest
    
    103
    +    """Computes the :obj:`Digest` of a piece of data.
    
    104
    +
    
    105
    +    The :obj:`Digest` of a piece of data is a function of its hash **and** size.
    
    106
    +
    
    107
    +    Args:
    
    108
    +        bytes_to_digest (bytes): byte data to digest.
    
    109
    +
    
    110
    +    Returns:
    
    111
    +        :obj:`Digest`: The gRPC :obj:`Digest` for the given byte data.
    
    74 112
         """
    
    75 113
         return remote_execution_pb2.Digest(hash=HASH(bytes_to_digest).hexdigest(),
    
    76 114
                                            size_bytes=len(bytes_to_digest))
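
    Note: a worked example of the hash-and-size pairing described above,
    assuming buildgrid.settings.HASH is SHA-256:

        digest = create_digest(b'hello')
        assert digest.size_bytes == 5
        assert digest.hash == '2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824'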
    
    ... ... @@ -107,6 +145,183 @@ def file_maker(file_path, file_digest):
    107 145
                                              is_executable=os.access(file_path, os.X_OK))
    
    108 146
     
    
    109 147
     
    
    110
    -def read_file(read):
    
    111
    -    with open(read, 'rb') as f:
    
    112
    -        return f.read()
    148
    +def directory_maker(directory_path):
    
    149
    +    """
    
    150
    +    """
    
    151
    +    if not os.path.isabs(directory_path):
    
    152
    +        directory_path = os.path.abspath(directory_path)
    
    153
    +
    
    154
    +    child_directories = list()
    
    155
    +    update_requests = list()
    
    156
    +
    
    157
    +    files, directories, symlinks = list(), list(), list()
    
    158
    +    for directory_entry in os.scandir(directory_path):
    
    159
    +        # Create a FileNode and corresponding BatchUpdateBlobsRequest:
    
    160
    +        if directory_entry.is_file(follow_symlinks=False):
    
    161
    +            node_blob = read_file(directory_entry.path)
    
    162
    +            node_digest = create_digest(node_blob)
    
    163
    +
    
    164
    +            node = remote_execution_pb2.FileNode()
    
    165
    +            node.name = directory_entry.name
    
    166
    +            node.digest.CopyFrom(node_digest)
    
    167
    +            node.is_executable = os.access(directory_entry.path, os.X_OK)
    
    168
    +
    
    169
    +            node_request = remote_execution_pb2.BatchUpdateBlobsRequest.Request(digest=node_digest)
    
    170
    +            node_request.data = node_blob
    
    171
    +
    
    172
    +            update_requests.append(node_request)
    
    173
    +            files.append(node)
    
    174
    +
    
    175
    +        # Create a DirectoryNode and corresponding BatchUpdateBlobsRequest:
    
    176
    +        elif directory_entry.is_dir(follow_symlinks=False):
    
    177
    +            node_directory, node_children, node_requests = directory_maker(directory_entry.path)
    
    178
    +
    
    179
    +            node = remote_execution_pb2.DirectoryNode()
    
    180
    +            node.name = directory_entry.name
    
    181
    +            node.digest.CopyFrom(node_requests[-1].digest)
    
    182
    +
    
    183
    +            child_directories.extend(node_children)
    
    184
    +            child_directories.append(node_directory)
    
    185
    +            update_requests.extend(node_requests)
    
    186
    +            directories.append(node)
    
    187
    +
    
    188
    +        # Create a SymlinkNode if necessary:
    
    189
    +        elif os.path.islink(directory_entry.path):
    
    190
    +            node_target = os.readlink(directory_entry.path)
    
    191
    +
    
    192
    +            node = remote_execution_pb2.SymlinkNode()
    
    193
    +            node.name = directory_entry.name
    
    194
    +            node.target = node_target
    
    195
    +
    
    196
    +            symlinks.append(node)
    
    197
    +
    
    198
    +    directory = remote_execution_pb2.Directory()
    
    199
    +    directory.files.extend(sorted(files, key=attrgetter('name')))
    
    200
    +    directory.directories.extend(sorted(directories, key=attrgetter('name')))
    
    201
    +    directory.symlinks.extend(sorted(symlinks, key=attrgetter('name')))
    
    202
    +
    
    203
    +    directory_blob = directory.SerializeToString()
    
    204
    +    directory_digest = create_digest(directory_blob)
    
    205
    +
    
    206
    +    update_request = remote_execution_pb2.BatchUpdateBlobsRequest.Request(digest=directory_digest)
    
    207
    +    update_request.data = directory_blob
    
    208
    +
    
    209
    +    update_requests.append(update_request)
    
    210
    +
    
    211
    +    return directory, child_directories, update_requests
    
    212
    +
    
    213
    +
    
    214
    +def read_file(file_path):
    
    215
    +    """Loads raw file content in memory.
    
    216
    +
    
    217
    +    Returns:
    
    218
    +        bytes: Raw file's content until EOF.
    
    219
    +
    
    220
    +    Raises:
    
    221
    +        OSError: If `file_path` does not exist or is not readable.
    
    222
    +    """
    
    223
    +    with open(file_path, 'rb') as byte_file:
    
    224
    +        return byte_file.read()
    
    225
    +
    
    226
    +
    
    227
    +def output_file_maker(file_path, input_path):
    
    228
    +    """Creates an :obj:`OutputFile` from a local file.
    
    229
    +
    
    230
    +    `file_path` **must** point inside or be relative to `input_path`.
    
    231
    +
    
    232
    +    Args:
    
    233
    +        file_path (str): absolute or relative path to a local file.
    
    234
    +        input_path (str): absolute or relative path to the input root directory.
    
    235
    +
    
    236
    +    Returns:
    
    237
    +        :obj:`OutputFile`, :obj:`BatchUpdateBlobsRequest`: Tuple of a new gRPC
    
    238
    +        :obj:`OutputFile` object for the file pointed by `file_path` and the
    
    239
    +        corresponding :obj:`BatchUpdateBlobsRequest` for CAS upload.
    
    240
    +    """
    
    241
    +    if not os.path.isabs(file_path):
    
    242
    +        file_path = os.path.abspath(file_path)
    
    243
    +    if not os.path.isabs(input_path):
    
    244
    +        input_path = os.path.abspath(input_path)
    
    245
    +
    
    246
    +    file_blob = read_file(file_path)
    
    247
    +    file_digest = create_digest(file_blob)
    
    248
    +
    
    249
    +    output_file = remote_execution_pb2.OutputFile(digest=file_digest)
    
    250
    +    output_file.path = os.path.relpath(file_path, start=input_path)
    
    251
    +    output_file.is_executable = os.access(file_path, os.X_OK)
    
    252
    +
    
    253
    +    update_request = remote_execution_pb2.BatchUpdateBlobsRequest.Request(digest=file_digest)
    
    254
    +    update_request.data = file_blob
    
    255
    +
    
    256
    +    return output_file, update_request
    
    257
    +
    
    258
    +
    
    259
    +def output_directory_maker(directory_path, working_path):
    
    260
    +    """Creates a gRPC :obj:`OutputDirectory` from a local directory.
    
    261
    +
    
    262
    +    `directory_path` **must** point inside or be relative to `working_path`.
    
    263
    +
    
    264
    +    Args:
    
    265
    +        directory_path (str): absolute or relative path to a local directory.
    
    266
    +        working_path (str): absolute or relative path to the working directory.
    
    267
    +
    
    268
    +    Returns:
    
    269
    +        :obj:`OutputDirectory`, :obj:`BatchUpdateBlobsRequest`: Tuple of a new
    
    270
    +        gRPC :obj:`OutputDirectory` for the directory pointed by
    
    271
    +        `directory_path` and the corresponding list of
    
    272
    +        :obj:`BatchUpdateBlobsRequest` for CAS upload.
    
    273
    +    """
    
    274
    +    if not os.path.isabs(directory_path):
    
    275
    +        directory_path = os.path.abspath(directory_path)
    
    276
    +    if not os.path.isabs(working_path):
    
    277
    +        working_path = os.path.abspath(working_path)
    
    278
    +
    
    279
    +    tree, update_requests = tree_maker(directory_path)
    
    280
    +
    
    281
    +    output_directory = remote_execution_pb2.OutputDirectory()
    
    282
    +    output_directory.tree_digest = update_requests[-1].digest
    
    283
    +    output_directory.path = os.path.relpath(directory_path, start=working_path)
    
    284
    +
    
    285
    +    output_directory_blob = output_directory.SerializeToString()
    
    286
    +    output_directory_digest = create_digest(output_directory_blob)
    
    287
    +
    
    288
    +    update_request = remote_execution_pb2.BatchUpdateBlobsRequest.Request(digest=output_directory_digest)
    
    289
    +    update_request.data = output_directory_blob
    
    290
    +
    
    291
    +    update_requests.append(update_request)
    
    292
    +
    
    293
    +    return output_directory, update_requests
    
    294
    +
    
    295
    +
    
    296
    +def tree_maker(directory_path):
    
    297
    +    """Creates a gRPC :obj:`Tree` from a local directory.
    
    298
    +
    
    299
    +    Args:
    
    300
    +        directory_path (str): absolute or relative path to a local directory.
    
    301
    +
    
    302
    +    Returns:
    
    303
    +        :obj:`Tree`, :obj:`BatchUpdateBlobsRequest`: Tuple of a new
    
    304
    +        gRPC :obj:`Tree` for the directory pointed by `directory_path` and the
    
    305
    +        corresponding list of :obj:`BatchUpdateBlobsRequest` for CAS upload.
    
    306
    +
    
    307
    +        The :obj:`BatchUpdateBlobsRequest` list may come in any order. However,
    
    308
    +        its last element is guaranteed to be the :obj:`Tree`'s request.
    
    309
    +    """
    
    310
    +    if not os.path.isabs(directory_path):
    
    311
    +        directory_path = os.path.abspath(directory_path)
    
    312
    +
    
    313
    +    directory, child_directories, update_requests = directory_maker(directory_path)
    
    314
    +
    
    315
    +    tree = remote_execution_pb2.Tree()
    
    316
    +    tree.children.extend(child_directories)
    
    317
    +    tree.root.CopyFrom(directory)
    
    318
    +
    
    319
    +    tree_blob = tree.SerializeToString()
    
    320
    +    tree_digest = create_digest(tree_blob)
    
    321
    +
    
    322
    +    update_request = remote_execution_pb2.BatchUpdateBlobsRequest.Request(digest=tree_digest)
    
    323
    +    update_request.data = tree_blob
    
    324
    +
    
    325
    +    update_requests.append(update_request)
    
    326
    +
    
    327
    +    return tree, update_requests
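
    Note: a usage sketch tying the new helpers together: capture a local
    output directory and push every resulting blob to CAS in a single
    batch. The channel handling and upload_output_directory() itself are
    assumptions, not part of this commit:

        from buildgrid._protos.build.bazel.remote.execution.v2 import (
            remote_execution_pb2, remote_execution_pb2_grpc)

        def upload_output_directory(channel, directory_path, working_path,
                                    instance_name=""):
            output_directory, update_requests = output_directory_maker(directory_path,
                                                                       working_path)

            request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=instance_name)
            request.requests.extend(update_requests)

            stub = remote_execution_pb2_grpc.ContentAddressableStorageStub(channel)
            stub.BatchUpdateBlobs(request)

            return output_directory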

  • tests/integration/bots_service.py
    ... ... @@ -18,7 +18,6 @@
    18 18
     # pylint: disable=redefined-outer-name
    
    19 19
     
    
    20 20
     import copy
    
    21
    -import uuid
    
    22 21
     from unittest import mock
    
    23 22
     
    
    24 23
     import grpc
    
    ... ... @@ -27,7 +26,7 @@ import pytest
    27 26
     
    
    28 27
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    29 28
     from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
    
    30
    -from buildgrid.server import scheduler, job
    
    29
    +from buildgrid.server import job, buildgrid_instance
    
    31 30
     from buildgrid.server.job import LeaseState
    
    32 31
     from buildgrid.server.worker import bots_interface, bots_service
    
    33 32
     
    
    ... ... @@ -53,8 +52,8 @@ def bot_session():
    53 52
     
    
    54 53
     
    
    55 54
     @pytest.fixture
    
    56
    -def schedule():
    
    57
    -    yield scheduler.Scheduler()
    
    55
    +def buildgrid():
    
    56
    +    yield buildgrid_instance.BuildGridInstance()
    
    58 57
     
    
    59 58
     
    
    60 59
     @pytest.fixture
    
    ... ... @@ -64,19 +63,17 @@ def bots(schedule):
    64 63
     
    
    65 64
     # Instance to test
    
    66 65
     @pytest.fixture
    
    67
    -def instance(bots):
    
    68
    -    yield bots_service.BotsService(bots)
    
    66
    +def instance(buildgrid):
    
    67
    +    instances = {"": buildgrid}
    
    68
    +    yield bots_service.BotsService(instances)
    
    69 69
     
    
    70 70
     
    
    71 71
     def test_create_bot_session(bot_session, context, instance):
    
    72
    -    parent = 'rach'
    
    73
    -    request = bots_pb2.CreateBotSessionRequest(parent=parent,
    
    74
    -                                               bot_session=bot_session)
    
    72
    +    request = bots_pb2.CreateBotSessionRequest(bot_session=bot_session)
    
    75 73
     
    
    76 74
         response = instance.CreateBotSession(request, context)
    
    77 75
     
    
    78 76
         assert isinstance(response, bots_pb2.BotSession)
    
    79
    -    assert uuid.UUID(response.name, version=4)
    
    80 77
         assert bot_session.bot_id == response.bot_id
    
    81 78
     
    
    82 79
     
    
    ... ... @@ -92,8 +89,7 @@ def test_create_bot_session_bot_id_fail(context, instance):
    92 89
     
    
    93 90
     
    
    94 91
     def test_update_bot_session(bot_session, context, instance):
    
    95
    -    request = bots_pb2.CreateBotSessionRequest(parent='',
    
    96
    -                                               bot_session=bot_session)
    
    92
    +    request = bots_pb2.CreateBotSessionRequest(bot_session=bot_session)
    
    97 93
         bot = instance.CreateBotSession(request, context)
    
    98 94
     
    
    99 95
         request = bots_pb2.UpdateBotSessionRequest(name=bot.name,
    
    ... ... @@ -106,8 +102,7 @@ def test_update_bot_session(bot_session, context, instance):
    106 102
     
    
    107 103
     
    
    108 104
     def test_update_bot_session_zombie(bot_session, context, instance):
    
    109
    -    request = bots_pb2.CreateBotSessionRequest(parent='',
    
    110
    -                                               bot_session=bot_session)
    
    105
    +    request = bots_pb2.CreateBotSessionRequest(bot_session=bot_session)
    
    111 106
         bot = instance.CreateBotSession(request, context)
    
    112 107
         # Update server with incorrect UUID by rotating it
    
    113 108
         bot.name = bot.name[len(bot.name): 0]
    
    ... ... @@ -121,8 +116,7 @@ def test_update_bot_session_zombie(bot_session, context, instance):
    121 116
     
    
    122 117
     
    
    123 118
     def test_update_bot_session_bot_id_fail(bot_session, context, instance):
    
    124
    -    request = bots_pb2.UpdateBotSessionRequest(name='ana',
    
    125
    -                                               bot_session=bot_session)
    
    119
    +    request = bots_pb2.UpdateBotSessionRequest(bot_session=bot_session)
    
    126 120
     
    
    127 121
         instance.UpdateBotSession(request, context)
    
    128 122
     
    
    ... ... @@ -131,17 +125,15 @@ def test_update_bot_session_bot_id_fail(bot_session, context, instance):
    131 125
     
    
    132 126
     @pytest.mark.parametrize("number_of_jobs", [0, 1, 3, 500])
    
    133 127
     def test_number_of_leases(number_of_jobs, bot_session, context, instance):
    
    134
    -    request = bots_pb2.CreateBotSessionRequest(parent='',
    
    135
    -                                               bot_session=bot_session)
    
    128
    +    request = bots_pb2.CreateBotSessionRequest(bot_session=bot_session)
    
    136 129
         # Inject work
    
    137 130
         for _ in range(0, number_of_jobs):
    
    138 131
             action_digest = remote_execution_pb2.Digest()
    
    139
    -        instance._instance._scheduler.append_job(job.Job(action_digest))
    
    132
    +        instance._instances[""].execute(action_digest, True)
    
    140 133
     
    
    141 134
         response = instance.CreateBotSession(request, context)
    
    142 135
     
    
    143 136
         assert len(response.leases) == number_of_jobs
    
    144
    -    assert isinstance(response, bots_pb2.BotSession)
    
    145 137
     
    
    146 138
     
    
    147 139
     def test_update_leases_with_work(bot_session, context, instance):
    
    ... ... @@ -149,7 +141,7 @@ def test_update_leases_with_work(bot_session, context, instance):
    149 141
                                                    bot_session=bot_session)
    
    150 142
         # Inject work
    
    151 143
         action_digest = remote_execution_pb2.Digest(hash='gaff')
    
    152
    -    instance._instance._scheduler.append_job(job.Job(action_digest))
    
    144
    +    instance._instances[""].execute(action_digest, True)
    
    153 145
     
    
    154 146
         response = instance.CreateBotSession(request, context)
    
    155 147
     
    
    ... ... @@ -159,7 +151,6 @@ def test_update_leases_with_work(bot_session, context, instance):
    159 151
     
    
    160 152
         assert isinstance(response, bots_pb2.BotSession)
    
    161 153
         assert response.leases[0].state == LeaseState.PENDING.value
    
    162
    -    assert uuid.UUID(response.leases[0].id, version=4)
    
    163 154
         assert response_action == action_digest
    
    164 155
     
    
    165 156
     
    
    ... ... @@ -172,7 +163,7 @@ def test_update_leases_work_complete(bot_session, context, instance):
    172 163
     
    
    173 164
         # Inject work
    
    174 165
         action_digest = remote_execution_pb2.Digest(hash='gaff')
    
    175
    -    instance._instance._scheduler.append_job(job.Job(action_digest))
    
    166
    +    instance._instances[""].execute(action_digest, True)
    
    176 167
     
    
    177 168
         request = bots_pb2.UpdateBotSessionRequest(name=response.name,
    
    178 169
                                                    bot_session=response)
    
    ... ... @@ -200,7 +191,7 @@ def test_work_rejected_by_bot(bot_session, context, instance):
    200 191
                                                    bot_session=bot_session)
    
    201 192
         # Inject work
    
    202 193
         action_digest = remote_execution_pb2.Digest(hash='gaff')
    
    203
    -    instance._instance._scheduler.append_job(job.Job(action_digest))
    
    194
    +    instance._instances[""].execute(action_digest, True)
    
    204 195
     
    
    205 196
         # Simulated the severed binding between client and server
    
    206 197
         response = copy.deepcopy(instance.CreateBotSession(request, context))
    
    ... ... @@ -222,7 +213,8 @@ def test_work_out_of_sync_from_pending(state, bot_session, context, instance):
    222 213
                                                    bot_session=bot_session)
    
    223 214
         # Inject work
    
    224 215
         action_digest = remote_execution_pb2.Digest(hash='gaff')
    
    225
    -    instance._instance._scheduler.append_job(job.Job(action_digest))
    
    216
    +    instance._instances[""].execute(action_digest, True)
    
    217
    +
    
    226 218
         # Simulated the severed binding between client and server
    
    227 219
         response = copy.deepcopy(instance.CreateBotSession(request, context))
    
    228 220
     
    
    ... ... @@ -242,7 +234,8 @@ def test_work_out_of_sync_from_active(state, bot_session, context, instance):
    242 234
                                                    bot_session=bot_session)
    
    243 235
         # Inject work
    
    244 236
         action_digest = remote_execution_pb2.Digest(hash='gaff')
    
    245
    -    instance._instance._scheduler.append_job(job.Job(action_digest))
    
    237
    +    instance._instances[""].execute(action_digest, True)
    
    238
    +
    
    246 239
         # Simulated the severed binding between client and server
    
    247 240
         response = copy.deepcopy(instance.CreateBotSession(request, context))
    
    248 241
     
    
    ... ... @@ -268,7 +261,8 @@ def test_work_active_to_active(bot_session, context, instance):
    268 261
                                                    bot_session=bot_session)
    
    269 262
         # Inject work
    
    270 263
         action_digest = remote_execution_pb2.Digest(hash='gaff')
    
    271
    -    instance._instance._scheduler.append_job(job.Job(action_digest))
    
    264
    +    instance._instances[""].execute(action_digest, True)
    
    265
    +
    
    272 266
         # Simulated the severed binding between client and server
    
    273 267
         response = copy.deepcopy(instance.CreateBotSession(request, context))
    
    274 268
     
    

  • tests/integration/execution_service.py
    ... ... @@ -20,15 +20,17 @@
    20 20
     import uuid
    
    21 21
     from unittest import mock
    
    22 22
     
    
    23
    +import grpc
    
    23 24
     from grpc._server import _Context
    
    24 25
     import pytest
    
    26
    +from google.protobuf import any_pb2
    
    25 27
     
    
    26 28
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    27 29
     from buildgrid._protos.google.longrunning import operations_pb2
    
    28 30
     
    
    29
    -from buildgrid.server import scheduler, job
    
    31
    +from buildgrid.server import job, buildgrid_instance
    
    30 32
     from buildgrid.server.cas.storage import lru_memory_cache
    
    31
    -from buildgrid.server.execution import action_cache, execution_instance, execution_service
    
    33
    +from buildgrid.server.execution import action_cache, execution_service
    
    32 34
     
    
    33 35
     
    
    34 36
     @pytest.fixture
    
    ... ... @@ -38,19 +40,21 @@ def context():
    38 40
     
    
    39 41
     
    
    40 42
     @pytest.fixture(params=["action-cache", "no-action-cache"])
    
    41
    -def execution(request):
    
    43
    +def buildgrid(request):
    
    42 44
         if request.param == "action-cache":
    
    43 45
             storage = lru_memory_cache.LRUMemoryCache(1024 * 1024)
    
    44 46
             cache = action_cache.ActionCache(storage, 50)
    
    45
    -        schedule = scheduler.Scheduler(cache)
    
    46
    -        return execution_instance.ExecutionInstance(schedule, storage)
    
    47
    -    return execution_instance.ExecutionInstance(scheduler.Scheduler())
    
    47
    +
    
    48
    +        return buildgrid_instance.BuildGridInstance(action_cache=cache,
    
    49
    +                                                    cas_storage=storage)
    
    50
    +    return buildgrid_instance.BuildGridInstance()
    
    48 51
     
    
    49 52
     
    
    50 53
     # Instance to test
    
    51 54
     @pytest.fixture
    
    52
    -def instance(execution):
    
    53
    -    yield execution_service.ExecutionService(execution)
    
    55
    +def instance(buildgrid):
    
    56
    +    instances = {"": buildgrid}
    
    57
    +    yield execution_service.ExecutionService(instances)
    
    54 58
     
    
    55 59
     
    
    56 60
     @pytest.mark.parametrize("skip_cache_lookup", [True, False])
    
    ... ... @@ -72,23 +76,45 @@ def test_execute(skip_cache_lookup, instance, context):
    72 76
         assert result.done is False
    
    73 77
     
    
    74 78
     
    
    75
    -# def test_wait_execution(instance, context):
    
    76
    -    # TODO: Figure out why next(response) hangs on the .get()
    
    77
    -    # method when running in pytest.
    
    78
    -#     action_digest = remote_execution_pb2.Digest()
    
    79
    -#     action_digest.hash = 'zhora'
    
    79
    +def test_wrong_execute_instance(instance, context):
    
    80
    +    request = remote_execution_pb2.ExecuteRequest(instance_name='blade')
    
    81
    +    response = instance.Execute(request, context)
    
    82
    +
    
    83
    +    next(response)
    
    84
    +    context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
    
    85
    +
    
    86
    +
    
    87
    +def test_wait_execution(instance, buildgrid, context):
    
    88
    +    action_digest = remote_execution_pb2.Digest()
    
    89
    +    action_digest.hash = 'zhora'
    
    90
    +
    
    91
    +    j = job.Job(action_digest, None)
    
    92
    +    j._operation.done = True
    
    93
    +
    
    94
    +    request = remote_execution_pb2.WaitExecutionRequest(name="{}/{}".format('', j.name))
    
    80 95
     
    
    81
    -#     j = job.Job(action_digest, None)
    
    82
    -#     j._operation.done = True
    
    96
    +    buildgrid._scheduler.jobs[j.name] = j
    
    83 97
     
    
    84
    -#     request = remote_execution_pb2.WaitExecutionRequest(name=j.name)
    
    98
    +    action_result_any = any_pb2.Any()
    
    99
    +    action_result = remote_execution_pb2.ActionResult()
    
    100
    +    action_result_any.Pack(action_result)
    
    85 101
     
    
    86
    -#     instance._instance._scheduler.jobs[j.name] = j
    
    102
    +    j.update_execute_stage(job.ExecuteStage.COMPLETED)
    
    103
    +
    
    104
    +    response = instance.WaitExecution(request, context)
    
    105
    +
    
    106
    +    result = next(response)
    
    107
    +
    
    108
    +    assert isinstance(result, operations_pb2.Operation)
    
    109
    +    metadata = remote_execution_pb2.ExecuteOperationMetadata()
    
    110
    +    result.metadata.Unpack(metadata)
    
    111
    +    assert metadata.stage == job.ExecuteStage.COMPLETED.value
    
    112
    +    assert uuid.UUID(result.name, version=4)
    
    113
    +    assert result.done is True
    
    87 114
     
    
    88
    -#     action_result_any = any_pb2.Any()
    
    89
    -#     action_result = remote_execution_pb2.ActionResult()
    
    90
    -#     action_result_any.Pack(action_result)
    
    91 115
     
    
    92
    -#     instance._instance._scheduler._update_execute_stage(j, job.ExecuteStage.COMPLETED)
    
    116
    +def test_wrong_instance_wait_execution(instance, buildgrid, context):
    
    117
    +    request = remote_execution_pb2.WaitExecutionRequest(name="blade")
    
    118
    +    next(instance.WaitExecution(request, context))
    
    93 119
     
    
    94
    -#     response = instance.WaitExecution(request, context)
    120
    +    context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)

  • tests/integration/operations_service.py
    ... ... @@ -28,10 +28,13 @@ from google.protobuf import any_pb2
    28 28
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    29 29
     from buildgrid._protos.google.longrunning import operations_pb2
    
    30 30
     
    
    31
    -from buildgrid.server import scheduler
    
    31
    +from buildgrid.server import buildgrid_instance
    
    32 32
     from buildgrid.server._exceptions import InvalidArgumentError
    
    33 33
     
    
    34
    -from buildgrid.server.execution import execution_instance, operations_service
    
    34
    +from buildgrid.server.execution import operations_service
    
    35
    +
    
    36
    +
    
    37
    +instance_name = "blade"
    
    35 38
     
    
    36 39
     
    
    37 40
     # Can mock this
    
    ... ... @@ -52,65 +55,80 @@ def execute_request():
    52 55
     
    
    53 56
     
    
    54 57
     @pytest.fixture
    
    55
    -def schedule():
    
    56
    -    yield scheduler.Scheduler()
    
    57
    -
    
    58
    -
    
    59
    -@pytest.fixture
    
    60
    -def execution(schedule):
    
    61
    -    yield execution_instance.ExecutionInstance(schedule)
    
    58
    +def buildgrid():
    
    59
    +    yield buildgrid_instance.BuildGridInstance()
    
    62 60
     
    
    63 61
     
    
    64 62
     # Instance to test
    
    65 63
     @pytest.fixture
    
    66
    -def instance(execution):
    
    67
    -    yield operations_service.OperationsService(execution)
    
    64
    +def instance(buildgrid):
    
    65
    +    instances = {instance_name: buildgrid}
    
    66
    +    yield operations_service.OperationsService(instances)
    
    68 67
     
    
    69 68
     
    
    70 69
     # Queue an execution, get operation corresponding to that request
    
    71
    -def test_get_operation(instance, execute_request, context):
    
    72
    -    response_execute = instance._instance.execute(execute_request.action_digest,
    
    73
    -                                                  execute_request.skip_cache_lookup)
    
    70
    +def test_get_operation(instance, buildgrid, execute_request, context):
    
    71
    +    response_execute = buildgrid.execute(execute_request.action_digest,
    
    72
    +                                         execute_request.skip_cache_lookup)
    
    74 73
     
    
    75 74
         request = operations_pb2.GetOperationRequest()
    
    76 75
     
    
    77
    -    request.name = response_execute.name
    
    76
    +    request.name = "{}/{}".format(instance_name, response_execute.name)
    
    78 77
     
    
    79 78
         response = instance.GetOperation(request, context)
    
    80 79
         assert response is response_execute
    
    81 80
     
    
    82 81
     
    
    83 82
     def test_get_operation_fail(instance, context):
    
    83
    +    request = operations_pb2.GetOperationRequest()
    
    84
    +    request.name = "{}/{}".format(instance_name, "runner")
    
    85
    +    instance.GetOperation(request, context)
    
    86
    +
    
    87
    +    context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
    
    88
    +
    
    89
    +
    
    90
    +def test_get_operation_instance_fail(instance, context):
    
    84 91
         request = operations_pb2.GetOperationRequest()
    
    85 92
         instance.GetOperation(request, context)
    
    86 93
     
    
    87 94
         context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
    
    88 95
     
    
    89 96
     
    
    90
    -def test_list_operations(instance, execute_request, context):
    
    91
    -    response_execute = instance._instance.execute(execute_request.action_digest,
    
    92
    -                                                  execute_request.skip_cache_lookup)
    
    97
    +def test_list_operations(instance, buildgrid, execute_request, context):
    
    98
    +    response_execute = buildgrid.execute(execute_request.action_digest,
    
    99
    +                                         execute_request.skip_cache_lookup)
    
    93 100
     
    
    94
    -    request = operations_pb2.ListOperationsRequest()
    
    101
    +    request = operations_pb2.ListOperationsRequest(name=instance_name)
    
    95 102
         response = instance.ListOperations(request, context)
    
    96 103
     
    
    97
    -    assert response.operations[0].name == response_execute.name
    
    104
    +    assert response.operations[0].name.split('/')[-1] == response_execute.name
    
    105
    +
    
    98 106
     
    
    107
    +def test_list_operations_instance_fail(instance, buildgrid, execute_request, context):
    
    108
    +    buildgrid.execute(execute_request.action_digest,
    
    109
    +                      execute_request.skip_cache_lookup)
    
    99 110
     
    
    100
    -def test_list_operations_with_result(instance, execute_request, context):
    
    101
    -    response_execute = instance._instance.execute(execute_request.action_digest,
    
    102
    -                                                  execute_request.skip_cache_lookup)
    
    111
    +    request = operations_pb2.ListOperationsRequest()
    
    112
    +    instance.ListOperations(request, context)
    
    113
    +
    
    114
    +    context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
    
    115
    +
    
    116
    +
    
    117
    +def test_list_operations_with_result(instance, buildgrid, execute_request, context):
    
    118
    +    response_execute = buildgrid.execute(execute_request.action_digest,
    
    119
    +                                         execute_request.skip_cache_lookup)
    
    103 120
     
    
    104 121
         action_result = remote_execution_pb2.ActionResult()
    
    105 122
         output_file = remote_execution_pb2.OutputFile(path='unicorn')
    
    106 123
         action_result.output_files.extend([output_file])
    
    107 124
     
    
    108
    -    instance._instance._scheduler.job_complete(response_execute.name, _pack_any(action_result))
    
    125
    +    buildgrid._scheduler.job_complete(response_execute.name,
    
    126
    +                                      _pack_any(action_result))
    
    109 127
     
    
    110
    -    request = operations_pb2.ListOperationsRequest()
    
    128
    +    request = operations_pb2.ListOperationsRequest(name=instance_name)
    
    111 129
         response = instance.ListOperations(request, context)
    
    112 130
     
    
    113
    -    assert response.operations[0].name == response_execute.name
    
    131
    +    assert response.operations[0].name.split('/')[-1] == response_execute.name
    
    114 132
     
    
    115 133
         execute_response = remote_execution_pb2.ExecuteResponse()
    
    116 134
         response.operations[0].response.Unpack(execute_response)
    
    ... ... @@ -118,7 +136,7 @@ def test_list_operations_with_result(instance, execute_request, context):
    118 136
     
    
    119 137
     
    
    120 138
     def test_list_operations_empty(instance, context):
    
    121
    -    request = operations_pb2.ListOperationsRequest()
    
    139
    +    request = operations_pb2.ListOperationsRequest(name=instance_name)
    
    122 140
     
    
    123 141
         response = instance.ListOperations(request, context)
    
    124 142
     
    
    ... ... @@ -126,21 +144,23 @@ def test_list_operations_empty(instance, context):
    126 144
     
    
    127 145
     
    
    128 146
     # Send execution off, delete, try to find operation should fail
    
    129
    -def test_delete_operation(instance, execute_request, context):
    
    130
    -    response_execute = instance._instance.execute(execute_request.action_digest,
    
    131
    -                                                  execute_request.skip_cache_lookup)
    
    147
    +def test_delete_operation(instance, buildgrid, execute_request, context):
    
    148
    +    response_execute = buildgrid.execute(execute_request.action_digest,
    
    149
    +                                         execute_request.skip_cache_lookup)
    
    132 150
         request = operations_pb2.DeleteOperationRequest()
    
    133
    -    request.name = response_execute.name
    
    151
    +    request.name = "{}/{}".format(instance_name, response_execute.name)
    
    134 152
         instance.DeleteOperation(request, context)
    
    135 153
     
    
    136 154
         request = operations_pb2.GetOperationRequest()
    
    137
    -    request.name = response_execute.name
    
    155
    +    request.name = "{}/{}".format(instance_name, response_execute.name)
    
    156
    +
    
    138 157
         with pytest.raises(InvalidArgumentError):
    
    139
    -        instance._instance.get_operation(response_execute.name)
    
    158
    +        buildgrid.get_operation(response_execute.name)
    
    140 159
     
    
    141 160
     
    
    142
    -def test_delete_operation_fail(instance, execute_request, context):
    
    161
    +def test_delete_operation_fail(instance, context):
    
    143 162
         request = operations_pb2.DeleteOperationRequest()
    
    163
    +    request.name = "{}/{}".format(instance_name, "runner")
    
    144 164
         instance.DeleteOperation(request, context)
    
    145 165
     
    
    146 166
         context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
    
    ... ... @@ -148,11 +168,19 @@ def test_delete_operation_fail(instance, execute_request, context):
    148 168
     
    
    149 169
     def test_cancel_operation(instance, context):
    
    150 170
         request = operations_pb2.CancelOperationRequest()
    
    171
    +    request.name = "{}/{}".format(instance_name, "runner")
    
    151 172
         instance.CancelOperation(request, context)
    
    152 173
     
    
    153 174
         context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
    
    154 175
     
    
    155 176
     
    
    177
    +def test_cancel_operation_instance_fail(instance, context):
    
    178
    +    request = operations_pb2.CancelOperationRequest()
    
    179
    +    instance.CancelOperation(request, context)
    
    180
    +
    
    181
    +    context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
    
    182
    +
    
    183
    +
    
    156 184
     def _pack_any(pack):
    
    157 185
         some_any = any_pb2.Any()
    
    158 186
         some_any.Pack(pack)
    


