[Notes] [Git][BuildStream/buildstream][jmac/remote_execution_client] 2 commits: tests/artifactcache: Add push unit-tests



Title: GitLab

Martin Blanchard pushed to branch jmac/remote_execution_client at BuildStream / buildstream

Commits:

3 changed files:

Changes:

  • tests/artifactcache/pull.py
    1
    +import hashlib
    
    2
    +import multiprocessing
    
    3
    +import os
    
    4
    +import signal
    
    5
    +
    
    6
    +import pytest
    
    7
    +
    
    8
    +from buildstream import _yaml, _signals, utils
    
    9
    +from buildstream._artifactcache.artifactcache import ArtifactCacheSpec
    
    10
    +from buildstream._artifactcache.cascache import CASCache
    
    11
    +from buildstream._context import Context
    
    12
    +from buildstream._project import Project
    
    13
    +from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    14
    +
    
    15
    +from tests.testutils import cli, create_artifact_share
    
    16
    +
    
    17
    +
    
    18
    +# Project directory
    
    19
    +DATA_DIR = os.path.join(
    
    20
    +    os.path.dirname(os.path.realpath(__file__)),
    
    21
    +    "project",
    
    22
    +)
    
    23
    +
    
    24
    +
    
    25
    +# Handle messages from the pipeline
    
    26
    +def message_handler(message, context):
    
    27
    +    pass
    
    28
    +
    
    29
    +
    
    30
    +def tree_maker(cas, tree, directory):
    
    31
    +    if tree.root.ByteSize() == 0:
    
    32
    +        tree.root.CopyFrom(directory)
    
    33
    +
    
    34
    +    for directory_node in directory.directories:
    
    35
    +        child_directory = tree.children.add()
    
    36
    +
    
    37
    +        with open(cas.objpath(directory_node.digest), 'rb') as f:
    
    38
    +            child_directory.ParseFromString(f.read())
    
    39
    +
    
    40
    +        tree_maker(cas, tree, child_directory)
    
    41
    +
    
    42
    +
    
    43
    +@pytest.mark.datafiles(DATA_DIR)
    
    44
    +def test_pull(cli, tmpdir, datafiles):
    
    45
    +    project_dir = str(datafiles)
    
    46
    +
    
    47
    +    # Set up an artifact cache.
    
    48
    +    with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
    
    49
    +        # Configure artifact share
    
    50
    +        artifact_dir = os.path.join(str(tmpdir), 'cache', 'artifacts')
    
    51
    +        user_config_file = str(tmpdir.join('buildstream.conf'))
    
    52
    +        user_config = {
    
    53
    +            'scheduler': {
    
    54
    +                'pushers': 1
    
    55
    +            },
    
    56
    +            'artifacts': {
    
    57
    +                'url': share.repo,
    
    58
    +                'push': True,
    
    59
    +            }
    
    60
    +        }
    
    61
    +
    
    62
    +        _yaml.dump(_yaml.node_sanitize(user_config), filename=user_config_file)
    
    63
    +        cli.configure(user_config)
    
    64
    +
    
    65
    +        # First build the project with the artifact cache configured
    
    66
    +        result = cli.run(project=project_dir, args=['build', 'target.bst'])
    
    67
    +        result.assert_success()
    
    68
    +
    
    69
    +        # Assert that we are now cached locally
    
    70
    +        assert cli.get_element_state(project_dir, 'target.bst') == 'cached'
    
    71
    +        # Assert that we shared/pushed the cached artifact
    
    72
    +        element_key = cli.get_element_key(project_dir, 'target.bst')
    
    73
    +        assert share.has_artifact('test', 'target.bst', element_key)
    
    74
    +
    
    75
    +        # Delete the artifact locally
    
    76
    +        cli.remove_artifact_from_cache(project_dir, 'target.bst')
    
    77
    +
    
    78
    +        # Assert that we are not cached locally anymore
    
    79
    +        assert cli.get_element_state(project_dir, 'target.bst') != 'cached'
    
    80
    +
    
    81
    +        # Fake minimal context
    
    82
    +        context = Context()
    
    83
    +        context.load(config=user_config_file)
    
    84
    +        context.artifactdir = os.path.join(str(tmpdir), 'cache', 'artifacts')
    
    85
    +        context.set_message_handler(message_handler)
    
    86
    +
    
    87
    +        # Load the project and CAS cache
    
    88
    +        project = Project(project_dir, context)
    
    89
    +        project.ensure_fully_loaded()
    
    90
    +        cas = CASCache(context)
    
    91
    +
    
    92
    +        # Assert that the element's artifact is **not** cached
    
    93
    +        element = project.load_elements(['target.bst'], cas)[0]
    
    94
    +        element_key = cli.get_element_key(project_dir, 'target.bst')
    
    95
    +        assert not cas.contains(element, element_key)
    
    96
    +
    
    97
    +        queue = multiprocessing.Queue()
    
    98
    +        # Use subprocess to avoid creation of gRPC threads in main BuildStream process
    
    99
    +        process = multiprocessing.Process(target=_test_pull,
    
    100
    +                                          args=(user_config_file, project_dir, artifact_dir,
    
    101
    +                                                'target.bst', element_key, queue))
    
    102
    +
    
    103
    +        try:
    
    104
    +            # Keep SIGINT blocked in the child process
    
    105
    +            with _signals.blocked([signal.SIGINT], ignore=False):
    
    106
    +                process.start()
    
    107
    +
    
    108
    +            error = queue.get()
    
    109
    +            process.join()
    
    110
    +        except KeyboardInterrupt:
    
    111
    +            utils._kill_process_tree(process.pid)
    
    112
    +            raise
    
    113
    +
    
    114
    +        assert not error
    
    115
    +        assert cas.contains(element, element_key)
    
    116
    +
    
    117
    +
    
    118
    +def _test_pull(user_config_file, project_dir, artifact_dir,
    
    119
    +               element_name, element_key, queue):
    
    120
    +    # Fake minimal context
    
    121
    +    context = Context()
    
    122
    +    context.load(config=user_config_file)
    
    123
    +    context.artifactdir = artifact_dir
    
    124
    +    context.set_message_handler(message_handler)
    
    125
    +
    
    126
    +    # Load the project manually
    
    127
    +    project = Project(project_dir, context)
    
    128
    +    project.ensure_fully_loaded()
    
    129
    +
    
    130
    +    # Create a local CAS cache handle
    
    131
    +    cas = CASCache(context)
    
    132
    +
    
    133
    +    # Load the target element
    
    134
    +    element = project.load_elements([element_name], cas)[0]
    
    135
    +
    
    136
    +    # Manually setup the CAS remote
    
    137
    +    cas.setup_remotes(use_config=True)
    
    138
    +    cas.initialize_remotes()
    
    139
    +    if not cas.has_push_remotes(element=element):
    
    140
    +        queue.put("")
    
    141
    +
    
    142
    +    # Pull the element's artifact
    
    143
    +    if not cas.pull(element, element_key):
    
    144
    +        queue.put("")
    
    145
    +
    
    146
    +    queue.put(None)
    
    147
    +
    
    148
    +
    
    149
    +@pytest.mark.datafiles(DATA_DIR)
    
    150
    +def test_pull_tree(cli, tmpdir, datafiles):
    
    151
    +    project_dir = str(datafiles)
    
    152
    +
    
    153
    +    # Set up an artifact cache.
    
    154
    +    with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
    
    155
    +        # Configure artifact share
    
    156
    +        artifact_dir = os.path.join(str(tmpdir), 'cache', 'artifacts')
    
    157
    +        user_config_file = str(tmpdir.join('buildstream.conf'))
    
    158
    +        user_config = {
    
    159
    +            'scheduler': {
    
    160
    +                'pushers': 1
    
    161
    +            },
    
    162
    +            'artifacts': {
    
    163
    +                'url': share.repo,
    
    164
    +                'push': True,
    
    165
    +            }
    
    166
    +        }
    
    167
    +
    
    168
    +        _yaml.dump(_yaml.node_sanitize(user_config), filename=user_config_file)
    
    169
    +        cli.configure(user_config)
    
    170
    +
    
    171
    +        # First build the project with the artifact cache configured
    
    172
    +        result = cli.run(project=project_dir, args=['build', 'target.bst'])
    
    173
    +        result.assert_success()
    
    174
    +
    
    175
    +        # Assert that we are now cached locally
    
    176
    +        assert cli.get_element_state(project_dir, 'target.bst') == 'cached'
    
    177
    +        # Assert that we shared/pushed the cached artifact
    
    178
    +        element_key = cli.get_element_key(project_dir, 'target.bst')
    
    179
    +        assert share.has_artifact('test', 'target.bst', element_key)
    
    180
    +
    
    181
    +        # Fake minimal context
    
    182
    +        context = Context()
    
    183
    +        context.load(config=user_config_file)
    
    184
    +        context.artifactdir = os.path.join(str(tmpdir), 'cache', 'artifacts')
    
    185
    +        context.set_message_handler(message_handler)
    
    186
    +
    
    187
    +        # Load the project and CAS cache
    
    188
    +        project = Project(project_dir, context)
    
    189
    +        project.ensure_fully_loaded()
    
    190
    +        cas = CASCache(context)
    
    191
    +
    
    192
    +        # Assert that the element's artifact is cached
    
    193
    +        element = project.load_elements(['target.bst'], cas)[0]
    
    194
    +        element_key = cli.get_element_key(project_dir, 'target.bst')
    
    195
    +        assert cas.contains(element, element_key)
    
    196
    +
    
    197
    +        # Retrieve the Directory object from the cached artifact
    
    198
    +        artifact_ref = cas.get_artifact_fullname(element, element_key)
    
    199
    +        artifact_digest = cas.resolve_ref(artifact_ref)
    
    200
    +
    
    201
    +        queue = multiprocessing.Queue()
    
    202
    +        # Use subprocess to avoid creation of gRPC threads in main BuildStream process
    
    203
    +        process = multiprocessing.Process(target=_test_push_tree,
    
    204
    +                                          args=(user_config_file, project_dir, artifact_dir,
    
    205
    +                                                artifact_digest, queue))
    
    206
    +
    
    207
    +        try:
    
    208
    +            # Keep SIGINT blocked in the child process
    
    209
    +            with _signals.blocked([signal.SIGINT], ignore=False):
    
    210
    +                process.start()
    
    211
    +
    
    212
    +            tree_hash, tree_size = queue.get()
    
    213
    +            process.join()
    
    214
    +        except KeyboardInterrupt:
    
    215
    +            utils._kill_process_tree(process.pid)
    
    216
    +            raise
    
    217
    +
    
    218
    +        assert tree_hash and tree_size
    
    219
    +
    
    220
    +        # Now delete the artifact locally
    
    221
    +        cli.remove_artifact_from_cache(project_dir, 'target.bst')
    
    222
    +
    
    223
    +        # Assert that we are not cached locally anymore
    
    224
    +        assert cli.get_element_state(project_dir, 'target.bst') != 'cached'
    
    225
    +
    
    226
    +        tree_digest = remote_execution_pb2.Digest(hash=tree_hash,
    
    227
    +                                                  size_bytes=tree_size)
    
    228
    +
    
    229
    +        queue = multiprocessing.Queue()
    
    230
    +        # Use subprocess to avoid creation of gRPC threads in main BuildStream process
    
    231
    +        process = multiprocessing.Process(target=_test_pull_tree,
    
    232
    +                                          args=(user_config_file, project_dir, artifact_dir,
    
    233
    +                                                tree_digest, queue))
    
    234
    +
    
    235
    +        try:
    
    236
    +            # Keep SIGINT blocked in the child process
    
    237
    +            with _signals.blocked([signal.SIGINT], ignore=False):
    
    238
    +                process.start()
    
    239
    +
    
    240
    +            directory_hash, directory_size = queue.get()
    
    241
    +            process.join()
    
    242
    +        except KeyboardInterrupt:
    
    243
    +            utils._kill_process_tree(process.pid)
    
    244
    +            raise
    
    245
    +
    
    246
    +        assert directory_hash and directory_size
    
    247
    +
    
    248
    +        directory_digest = remote_execution_pb2.Digest(hash=directory_hash,
    
    249
    +                                                       size_bytes=directory_size)
    
    250
    +
    
    251
    +        # Ensure the entire Tree structure has been pulled
    
    252
    +        assert os.path.exists(cas.objpath(directory_digest))
    
    253
    +
    
    254
    +
    
    255
    +def _test_push_tree(user_config_file, project_dir, artifact_dir, artifact_digest, queue):
    
    256
    +    # Fake minimal context
    
    257
    +    context = Context()
    
    258
    +    context.load(config=user_config_file)
    
    259
    +    context.artifactdir = artifact_dir
    
    260
    +    context.set_message_handler(message_handler)
    
    261
    +
    
    262
    +    # Load the project manually
    
    263
    +    project = Project(project_dir, context)
    
    264
    +    project.ensure_fully_loaded()
    
    265
    +
    
    266
    +    # Create a local CAS cache handle
    
    267
    +    cas = CASCache(context)
    
    268
    +
    
    269
    +    # Manually setup the CAS remote
    
    270
    +    cas.setup_remotes(use_config=True)
    
    271
    +    cas.initialize_remotes()
    
    272
    +    if not cas.has_push_remotes():
    
    273
    +        queue.put("")
    
    274
    +
    
    275
    +    directory = remote_execution_pb2.Directory()
    
    276
    +
    
    277
    +    with open(cas.objpath(artifact_digest), 'rb') as f:
    
    278
    +        directory.ParseFromString(f.read())
    
    279
    +
    
    280
    +    # Build the Tree object while we are still cached
    
    281
    +    tree = remote_execution_pb2.Tree()
    
    282
    +    tree_maker(cas, tree, directory)
    
    283
    +
    
    284
    +    # Push the Tree as a regular message
    
    285
    +    tree_digest = cas.push_message(project, tree)
    
    286
    +
    
    287
    +    queue.put((tree_digest.hash, tree_digest.size_bytes))
    
    288
    +
    
    289
    +
    
    290
    +def _test_pull_tree(user_config_file, project_dir, artifact_dir, artifact_digest, queue):
    
    291
    +    # Fake minimal context
    
    292
    +    context = Context()
    
    293
    +    context.load(config=user_config_file)
    
    294
    +    context.artifactdir = artifact_dir
    
    295
    +    context.set_message_handler(message_handler)
    
    296
    +
    
    297
    +    # Load the project manually
    
    298
    +    project = Project(project_dir, context)
    
    299
    +    project.ensure_fully_loaded()
    
    300
    +
    
    301
    +    # Create a local CAS cache handle
    
    302
    +    cas = CASCache(context)
    
    303
    +
    
    304
    +    # Manually setup the CAS remote
    
    305
    +    cas.setup_remotes(use_config=True)
    
    306
    +    cas.initialize_remotes()
    
    307
    +    if not cas.has_push_remotes():
    
    308
    +        queue.put("")
    
    309
    +
    
    310
    +    # Pull the artifact using the Tree object
    
    311
    +    directory_digest = cas.pull_tree(project, artifact_digest)
    
    312
    +
    
    313
    +    queue.put((directory_digest.hash, directory_digest.size_bytes))

  • tests/artifactcache/push.py
    1
    +import multiprocessing
    
    2
    +import os
    
    3
    +import signal
    
    4
    +
    
    5
    +import pytest
    
    6
    +
    
    7
    +from pluginbase import PluginBase
    
    8
    +from buildstream import _yaml, _signals, utils
    
    9
    +from buildstream._artifactcache.cascache import CASCache
    
    10
    +from buildstream._context import Context
    
    11
    +from buildstream._project import Project
    
    12
    +from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    13
    +from buildstream.storage._casbaseddirectory import CasBasedDirectory
    
    14
    +
    
    15
    +from tests.testutils import cli, create_artifact_share
    
    16
    +
    
    17
    +
    
    18
    +# Project directory
    
    19
    +DATA_DIR = os.path.join(
    
    20
    +    os.path.dirname(os.path.realpath(__file__)),
    
    21
    +    "project",
    
    22
    +)
    
    23
    +
    
    24
    +
    
    25
    +# Handle messages from the pipeline
    
    26
    +def message_handler(message, context):
    
    27
    +    pass
    
    28
    +
    
    29
    +
    
    30
    +@pytest.mark.datafiles(DATA_DIR)
    
    31
    +def test_push(cli, tmpdir, datafiles):
    
    32
    +    project_dir = str(datafiles)
    
    33
    +
    
    34
    +    # First build the project without the artifact cache configured
    
    35
    +    result = cli.run(project=project_dir, args=['build', 'target.bst'])
    
    36
    +    result.assert_success()
    
    37
    +
    
    38
    +    # Assert that we are now cached locally
    
    39
    +    assert cli.get_element_state(project_dir, 'target.bst') == 'cached'
    
    40
    +
    
    41
    +    # Set up an artifact cache.
    
    42
    +    with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
    
    43
    +        # Configure artifact share
    
    44
    +        artifact_dir = os.path.join(str(tmpdir), 'cache', 'artifacts')
    
    45
    +        user_config_file = str(tmpdir.join('buildstream.conf'))
    
    46
    +        user_config = {
    
    47
    +            'scheduler': {
    
    48
    +                'pushers': 1
    
    49
    +            },
    
    50
    +            'artifacts': {
    
    51
    +                'url': share.repo,
    
    52
    +                'push': True,
    
    53
    +            }
    
    54
    +        }
    
    55
    +
    
    56
    +        _yaml.dump(_yaml.node_sanitize(user_config), filename=user_config_file)
    
    57
    +
    
    58
    +        # Fake minimal context
    
    59
    +        context = Context()
    
    60
    +        context.load(config=user_config_file)
    
    61
    +        context.artifactdir = artifact_dir
    
    62
    +        context.set_message_handler(message_handler)
    
    63
    +
    
    64
    +        # Load the project manually
    
    65
    +        project = Project(project_dir, context)
    
    66
    +        project.ensure_fully_loaded()
    
    67
    +
    
    68
    +        # Create a local CAS cache handle
    
    69
    +        cas = CASCache(context)
    
    70
    +
    
    71
    +        # Assert that the element's artifact is cached
    
    72
    +        element = project.load_elements(['target.bst'], cas)[0]
    
    73
    +        element_key = cli.get_element_key(project_dir, 'target.bst')
    
    74
    +        assert cas.contains(element, element_key)
    
    75
    +
    
    76
    +        queue = multiprocessing.Queue()
    
    77
    +        # Use subprocess to avoid creation of gRPC threads in main BuildStream process
    
    78
    +        process = multiprocessing.Process(target=_test_push,
    
    79
    +                                          args=(user_config_file, project_dir, artifact_dir,
    
    80
    +                                                'target.bst', element_key, queue))
    
    81
    +
    
    82
    +        try:
    
    83
    +            # Keep SIGINT blocked in the child process
    
    84
    +            with _signals.blocked([signal.SIGINT], ignore=False):
    
    85
    +                process.start()
    
    86
    +
    
    87
    +            error = queue.get()
    
    88
    +            process.join()
    
    89
    +        except KeyboardInterrupt:
    
    90
    +            utils._kill_process_tree(process.pid)
    
    91
    +            raise
    
    92
    +
    
    93
    +        assert not error
    
    94
    +        assert share.has_artifact('test', 'target.bst', element_key)
    
    95
    +
    
    96
    +
    
    97
    +def _test_push(user_config_file, project_dir, artifact_dir,
    
    98
    +               element_name, element_key, queue):
    
    99
    +    # Fake minimal context
    
    100
    +    context = Context()
    
    101
    +    context.load(config=user_config_file)
    
    102
    +    context.artifactdir = artifact_dir
    
    103
    +    context.set_message_handler(message_handler)
    
    104
    +
    
    105
    +    # Load the project manually
    
    106
    +    project = Project(project_dir, context)
    
    107
    +    project.ensure_fully_loaded()
    
    108
    +
    
    109
    +    # Create a local CAS cache handle
    
    110
    +    cas = CASCache(context)
    
    111
    +
    
    112
    +    # Load the target element
    
    113
    +    element = project.load_elements([element_name], cas)[0]
    
    114
    +
    
    115
    +    # Manually setup the CAS remote
    
    116
    +    cas.setup_remotes(use_config=True)
    
    117
    +    cas.initialize_remotes()
    
    118
    +    if not cas.has_push_remotes(element=element):
    
    119
    +        queue.put("")
    
    120
    +
    
    121
    +    # Push the element's artifact
    
    122
    +    if not cas.push(element, [element_key]):
    
    123
    +        queue.put("")
    
    124
    +
    
    125
    +    queue.put(None)
    
    126
    +
    
    127
    +
    
    128
    +@pytest.mark.datafiles(DATA_DIR)
    
    129
    +def test_push_directory(cli, tmpdir, datafiles):
    
    130
    +    project_dir = str(datafiles)
    
    131
    +
    
    132
    +    # First build the project without the artifact cache configured
    
    133
    +    result = cli.run(project=project_dir, args=['build', 'target.bst'])
    
    134
    +    result.assert_success()
    
    135
    +
    
    136
    +    # Assert that we are now cached locally
    
    137
    +    assert cli.get_element_state(project_dir, 'target.bst') == 'cached'
    
    138
    +
    
    139
    +    # Set up an artifact cache.
    
    140
    +    with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
    
    141
    +        # Configure artifact share
    
    142
    +        artifact_dir = os.path.join(str(tmpdir), 'cache', 'artifacts')
    
    143
    +        user_config_file = str(tmpdir.join('buildstream.conf'))
    
    144
    +        user_config = {
    
    145
    +            'scheduler': {
    
    146
    +                'pushers': 1
    
    147
    +            },
    
    148
    +            'artifacts': {
    
    149
    +                'url': share.repo,
    
    150
    +                'push': True,
    
    151
    +            }
    
    152
    +        }
    
    153
    +
    
    154
    +        _yaml.dump(_yaml.node_sanitize(user_config), filename=user_config_file)
    
    155
    +
    
    156
    +        # Fake minimal context
    
    157
    +        context = Context()
    
    158
    +        context.load(config=user_config_file)
    
    159
    +        context.artifactdir = os.path.join(str(tmpdir), 'cache', 'artifacts')
    
    160
    +        context.set_message_handler(message_handler)
    
    161
    +
    
    162
    +        # Load the project and CAS cache
    
    163
    +        project = Project(project_dir, context)
    
    164
    +        project.ensure_fully_loaded()
    
    165
    +        cas = CASCache(context)
    
    166
    +
    
    167
    +        # Assert that the element's artifact is cached
    
    168
    +        element = project.load_elements(['target.bst'], cas)[0]
    
    169
    +        element_key = cli.get_element_key(project_dir, 'target.bst')
    
    170
    +        assert cas.contains(element, element_key)
    
    171
    +
    
    172
    +        # Manually setup the CAS remote
    
    173
    +        cas.setup_remotes(use_config=True)
    
    174
    +        cas.initialize_remotes()
    
    175
    +        assert cas.has_push_remotes(element=element)
    
    176
    +
    
    177
    +        # Recreate the CasBasedDirectory object from the cached artifact
    
    178
    +        artifact_ref = cas.get_artifact_fullname(element, element_key)
    
    179
    +        artifact_digest = cas.resolve_ref(artifact_ref)
    
    180
    +
    
    181
    +        queue = multiprocessing.Queue()
    
    182
    +        # Use subprocess to avoid creation of gRPC threads in main BuildStream process
    
    183
    +        process = multiprocessing.Process(target=_test_push_directory,
    
    184
    +                                          args=(user_config_file, project_dir, artifact_dir,
    
    185
    +                                                artifact_digest, queue))
    
    186
    +
    
    187
    +        try:
    
    188
    +            # Keep SIGINT blocked in the child process
    
    189
    +            with _signals.blocked([signal.SIGINT], ignore=False):
    
    190
    +                process.start()
    
    191
    +
    
    192
    +            directory_hash = queue.get()
    
    193
    +            process.join()
    
    194
    +        except KeyboardInterrupt:
    
    195
    +            utils._kill_process_tree(process.pid)
    
    196
    +            raise
    
    197
    +
    
    198
    +        assert directory_hash
    
    199
    +        assert artifact_digest.hash == directory_hash
    
    200
    +        assert share.has_object(artifact_digest)
    
    201
    +
    
    202
    +
    
    203
    +def _test_push_directory(user_config_file, project_dir, artifact_dir, artifact_digest, queue):
    
    204
    +    # Fake minimal context
    
    205
    +    context = Context()
    
    206
    +    context.load(config=user_config_file)
    
    207
    +    context.artifactdir = artifact_dir
    
    208
    +    context.set_message_handler(message_handler)
    
    209
    +
    
    210
    +    # Load the project manually
    
    211
    +    project = Project(project_dir, context)
    
    212
    +    project.ensure_fully_loaded()
    
    213
    +
    
    214
    +    # Create a local CAS cache handle
    
    215
    +    cas = CASCache(context)
    
    216
    +
    
    217
    +    # Manually setup the CAS remote
    
    218
    +    cas.setup_remotes(use_config=True)
    
    219
    +    cas.initialize_remotes()
    
    220
    +    if not cas.has_push_remotes():
    
    221
    +        queue.put("")
    
    222
    +
    
    223
    +    # Create a CasBasedDirectory from local CAS cache content
    
    224
    +    directory = CasBasedDirectory(context, ref=artifact_digest)
    
    225
    +
    
    226
    +    # Push the CasBasedDirectory object
    
    227
    +    directory_digest = cas.push_directory(project, directory)
    
    228
    +
    
    229
    +    queue.put(directory_digest.hash)
    
    230
    +
    
    231
    +
    
    232
    +@pytest.mark.datafiles(DATA_DIR)
    
    233
    +def test_push_message(cli, tmpdir, datafiles):
    
    234
    +    project_dir = str(datafiles)
    
    235
    +
    
    236
    +    # Set up an artifact cache.
    
    237
    +    with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
    
    238
    +        # Configure artifact share
    
    239
    +        artifact_dir = os.path.join(str(tmpdir), 'cache', 'artifacts')
    
    240
    +        user_config_file = str(tmpdir.join('buildstream.conf'))
    
    241
    +        user_config = {
    
    242
    +            'scheduler': {
    
    243
    +                'pushers': 1
    
    244
    +            },
    
    245
    +            'artifacts': {
    
    246
    +                'url': share.repo,
    
    247
    +                'push': True,
    
    248
    +            }
    
    249
    +        }
    
    250
    +
    
    251
    +        _yaml.dump(_yaml.node_sanitize(user_config), filename=user_config_file)
    
    252
    +
    
    253
    +        queue = multiprocessing.Queue()
    
    254
    +        # Use subprocess to avoid creation of gRPC threads in main BuildStream process
    
    255
    +        process = multiprocessing.Process(target=_test_push_message,
    
    256
    +                                          args=(user_config_file, project_dir, artifact_dir, queue))
    
    257
    +
    
    258
    +        try:
    
    259
    +            # Keep SIGINT blocked in the child process
    
    260
    +            with _signals.blocked([signal.SIGINT], ignore=False):
    
    261
    +                process.start()
    
    262
    +
    
    263
    +            message_hash, message_size = queue.get()
    
    264
    +            process.join()
    
    265
    +        except KeyboardInterrupt:
    
    266
    +            utils._kill_process_tree(process.pid)
    
    267
    +            raise
    
    268
    +
    
    269
    +        assert message_hash and message_size
    
    270
    +        message_digest = remote_execution_pb2.Digest(hash=message_hash,
    
    271
    +                                                     size_bytes=message_size)
    
    272
    +        assert share.has_object(message_digest)
    
    273
    +
    
    274
    +
    
    275
    +def _test_push_message(user_config_file, project_dir, artifact_dir, queue):
    
    276
    +    # Fake minimal context
    
    277
    +    context = Context()
    
    278
    +    context.load(config=user_config_file)
    
    279
    +    context.artifactdir = artifact_dir
    
    280
    +    context.set_message_handler(message_handler)
    
    281
    +
    
    282
    +    # Load the project manually
    
    283
    +    project = Project(project_dir, context)
    
    284
    +    project.ensure_fully_loaded()
    
    285
    +
    
    286
    +    # Create a local CAS cache handle
    
    287
    +    cas = CASCache(context)
    
    288
    +
    
    289
    +    # Manually setup the CAS remote
    
    290
    +    cas.setup_remotes(use_config=True)
    
    291
    +    cas.initialize_remotes()
    
    292
    +    if not cas.has_push_remotes():
    
    293
    +        queue.put("")
    
    294
    +
    
    295
    +    # Create an example message object
    
    296
    +    command = remote_execution_pb2.Command(arguments=['/usr/bin/gcc', '--help'],
    
    297
    +                                           working_directory='/buildstream-build',
    
    298
    +                                           output_directories=['/buildstream-install'])
    
    299
    +
    
    300
    +    # Push the message object
    
    301
    +    command_digest = cas.push_message(project, command)
    
    302
    +
    
    303
    +    queue.put((command_digest.hash, command_digest.size_bytes))

  • tests/testutils/artifactshare.py
    ... ... @@ -15,6 +15,7 @@ from buildstream._artifactcache.cascache import CASCache
    15 15
     from buildstream._artifactcache.casserver import create_server
    
    16 16
     from buildstream._context import Context
    
    17 17
     from buildstream._exceptions import ArtifactError
    
    18
    +from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    18 19
     
    
    19 20
     
    
    20 21
     # ArtifactShare()
    
    ... ... @@ -87,6 +88,23 @@ class ArtifactShare():
    87 88
             # Sleep until termination by signal
    
    88 89
             signal.pause()
    
    89 90
     
    
    91
    +    # has_object():
    
    92
    +    #
    
    93
    +    # Checks whether the object is present in the share
    
    94
    +    #
    
    95
    +    # Args:
    
    96
    +    #    digest (str): The object's digest
    
    97
    +    #
    
    98
    +    # Returns:
    
    99
    +    #    (bool): True if the object exists in the share, otherwise false.
    
    100
    +    def has_object(self, digest):
    
    101
    +
    
    102
    +        assert isinstance(digest, remote_execution_pb2.Digest)
    
    103
    +
    
    104
    +        object_path = self.cas.objpath(digest)
    
    105
    +
    
    106
    +        return os.path.exists(object_path)
    
    107
    +
    
    90 108
         # has_artifact():
    
    91 109
         #
    
    92 110
         # Checks whether the artifact is present in the share
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]