[Notes] [Git][BuildStream/buildstream][raoul/628-RE-flow-optimisation] 9 commits: _scheduler/queues: Don't call update state outside of error handling harness



Title: GitLab

Raoul Hidalgo Charman pushed to branch raoul/628-RE-flow-optimisation at BuildStream / buildstream

Commits:

15 changed files:

Changes:

  • NEWS
    ... ... @@ -74,6 +74,10 @@ buildstream 1.3.1
    74 74
       o Add sandbox API for command batching and use it for build, script, and
    
    75 75
         compose elements.
    
    76 76
     
    
    77
    +  o BREAKING CHANGE: The `git` plugin does not create a local `.git`
    
    78
    +    repository by default.  If `git describe` is required to work, the
    
    79
     +    plugin now has a tag tracking feature instead. This can be enabled
    
    80
    +    by setting 'track-tags'.
    
    77 81
     
    
    78 82
     =================
    
    79 83
     buildstream 1.1.5
    

  • buildstream/_artifactcache/cascache.py
    ... ... @@ -427,10 +427,7 @@ class CASCache():
    427 427
         def push_message(self, remote, message):
    
    428 428
     
    
    429 429
             message_buffer = message.SerializeToString()
    
    430
    -        message_sha = hashlib.sha256(message_buffer)
    
    431
    -        message_digest = remote_execution_pb2.Digest()
    
    432
    -        message_digest.hash = message_sha.hexdigest()
    
    433
    -        message_digest.size_bytes = len(message_buffer)
    
    430
    +        message_digest = utils._message_digest(message_buffer)
    
    434 431
     
    
    435 432
             remote.init()
    
    436 433
     
    

  • buildstream/_scheduler/queues/buildqueue.py
    ... ... @@ -106,10 +106,16 @@ class BuildQueue(Queue):
    106 106
     
    
    107 107
         def done(self, job, element, result, success):
    
    108 108
     
    
    109
    -        if success:
    
    110
    -            # Inform element in main process that assembly is done
    
    111
    -            element._assemble_done()
    
    109
    +        # Inform element in main process that assembly is done
    
    110
    +        element._assemble_done()
    
    112 111
     
    
    113
    -            # This has to be done after _assemble_done, such that the
    
    114
    -            # element may register its cache key as required
    
    112
    +        # This has to be done after _assemble_done, such that the
    
    113
    +        # element may register its cache key as required
    
    114
    +        #
    
    115
    +        # FIXME: Element._assemble() does not report both the failure state and the
    
    116
    +        #        size of the newly cached failed artifact, so we can only adjust the
    
    117
    +        #        artifact cache size for a successful build even though we know a
    
    118
    +        #        failed build also grows the artifact cache size.
    
    119
    +        #
    
    120
    +        if success:
    
    115 121
                 self._check_cache_size(job, element, result)

  • buildstream/_scheduler/queues/queue.py
    ... ... @@ -292,7 +292,6 @@ class Queue():
    292 292
         # See the Job object for an explanation of the call signature
    
    293 293
         #
    
    294 294
         def _job_done(self, job, element, success, result):
    
    295
    -        element._update_state()
    
    296 295
     
    
    297 296
             # Update values that need to be synchronized in the main task
    
    298 297
             # before calling any queue implementation
    

  • buildstream/_versions.py
    ... ... @@ -23,7 +23,7 @@
    23 23
     # This version is bumped whenever enhancements are made
    
    24 24
     # to the `project.conf` format or the core element format.
    
    25 25
     #
    
    26
    -BST_FORMAT_VERSION = 18
    
    26
    +BST_FORMAT_VERSION = 19
    
    27 27
     
    
    28 28
     
    
    29 29
     # The base BuildStream artifact version
    

  • buildstream/plugins/sources/git.py
    ... ... @@ -76,6 +76,56 @@ git - stage files from a git repository
    76 76
            url: upstream:baz.git
    
    77 77
            checkout: False
    
    78 78
     
    
    79
    +   # Enable tag tracking.
    
    80
    +   #
    
    81
    +   # This causes the `tags` metadata to be populated automatically
    
    82
    +   # as a result of tracking the git source.
    
    83
    +   #
    
    84
    +   # By default this is 'False'.
    
    85
    +   #
    
    86
    +   track-tags: True
    
    87
    +
    
    88
    +   # If the list of tags below is set, then a lightweight dummy
    
    89
    +   # git repository will be staged along with the content at
    
    90
    +   # build time.
    
    91
    +   #
    
    92
    +   # This is useful for a growing number of modules which use
    
    93
    +   # `git describe` at build time in order to determine the version
    
    94
    +   # which will be encoded into the built software.
    
    95
    +   #
    
    96
    +   # The 'tags' below is considered as a part of the git source
    
    97
    +   # reference and will be stored in the 'project.refs' file if
    
    98
    +   # that has been selected as your project's ref-storage.
    
    99
    +   #
    
    100
    +   # Migration notes:
    
    101
    +   #
    
    102
    +   #   If you are upgrading from BuildStream 1.2, which used to
    
    103
    +   #   stage the entire repository by default, you will notice that
    
    104
    +   #   some modules which use `git describe` are broken, and will
    
    105
    +   #   need to enable this feature in order to fix them.
    
    106
    +   #
    
    107
    +   #   If you need to enable this feature without changing the
    
    108
     +   #   specific commit that you are building, then we recommend
    
    109
    +   #   the following migration steps for any git sources where
    
    110
    +   #   `git describe` is required:
    
    111
    +   #
    
    112
    +   #     o Enable `track-tags` feature
    
    113
    +   #     o Set the `track` parameter to the desired commit sha which
    
    114
    +   #       the current `ref` points to
    
    115
    +   #     o Run `bst track` for these elements, this will result in
    
    116
    +   #       populating the `tags` portion of the refs without changing
    
    117
    +   #       the refs
    
    118
    +   #     o Restore the `track` parameter to the branches which you have
    
    119
    +   #       previously been tracking afterwards.
    
    120
    +   #
    
    121
    +   tags:
    
    122
    +   - tag: lightweight-example
    
    123
    +     commit: 04ad0dc656cb7cc6feb781aa13bdbf1d67d0af78
    
    124
    +     annotated: false
    
    125
    +   - tag: annotated-example
    
    126
    +     commit: 10abe77fe8d77385d86f225b503d9185f4ef7f3a
    
    127
    +     annotated: true
    
    128
    +
    
    79 129
     See :ref:`built-in functionality doumentation <core_source_builtins>` for
    
    80 130
     details on common configuration options for sources.
    
    81 131
     
    
    ... ... @@ -95,6 +145,7 @@ import re
    95 145
     import shutil
    
    96 146
     from collections.abc import Mapping
    
    97 147
     from io import StringIO
    
    148
    +from tempfile import TemporaryFile
    
    98 149
     
    
    99 150
     from configparser import RawConfigParser
    
    100 151
     
    
    ... ... @@ -115,13 +166,14 @@ INCONSISTENT_SUBMODULE = "inconsistent-submodules"
    115 166
     #
    
    116 167
     class GitMirror(SourceFetcher):
    
    117 168
     
    
    118
    -    def __init__(self, source, path, url, ref, *, primary=False):
    
    169
    +    def __init__(self, source, path, url, ref, *, primary=False, tags=[]):
    
    119 170
     
    
    120 171
             super().__init__()
    
    121 172
             self.source = source
    
    122 173
             self.path = path
    
    123 174
             self.url = url
    
    124 175
             self.ref = ref
    
    176
    +        self.tags = tags
    
    125 177
             self.primary = primary
    
    126 178
             self.mirror = os.path.join(source.get_mirror_directory(), utils.url_directory_name(url))
    
    127 179
             self.mark_download_url(url)
    
    ... ... @@ -214,7 +266,7 @@ class GitMirror(SourceFetcher):
    214 266
                 raise SourceError("{}: expected ref '{}' was not found in git repository: '{}'"
    
    215 267
                                   .format(self.source, self.ref, self.url))
    
    216 268
     
    
    217
    -    def latest_commit(self, tracking):
    
    269
    +    def latest_commit_with_tags(self, tracking, track_tags=False):
    
    218 270
             _, output = self.source.check_output(
    
    219 271
                 [self.source.host_git, 'rev-parse', tracking],
    
    220 272
                 fail="Unable to find commit for specified branch name '{}'".format(tracking),
    
    ... ... @@ -230,7 +282,28 @@ class GitMirror(SourceFetcher):
    230 282
                 if exit_code == 0:
    
    231 283
                     ref = output.rstrip('\n')
    
    232 284
     
    
    233
    -        return ref
    
    285
    +        if not track_tags:
    
    286
    +            return ref, []
    
    287
    +
    
    288
    +        tags = set()
    
    289
    +        for options in [[], ['--first-parent'], ['--tags'], ['--tags', '--first-parent']]:
    
    290
    +            exit_code, output = self.source.check_output(
    
    291
    +                [self.source.host_git, 'describe', '--abbrev=0', ref] + options,
    
    292
    +                cwd=self.mirror)
    
    293
    +            if exit_code == 0:
    
    294
    +                tag = output.strip()
    
    295
    +                _, commit_ref = self.source.check_output(
    
    296
    +                    [self.source.host_git, 'rev-parse', tag + '^{commit}'],
    
    297
    +                    fail="Unable to resolve tag '{}'".format(tag),
    
    298
    +                    cwd=self.mirror)
    
    299
    +                exit_code = self.source.call(
    
    300
    +                    [self.source.host_git, 'cat-file', 'tag', tag],
    
    301
    +                    cwd=self.mirror)
    
    302
    +                annotated = (exit_code == 0)
    
    303
    +
    
    304
    +                tags.add((tag, commit_ref.strip(), annotated))
    
    305
    +
    
    306
    +        return ref, list(tags)
    
    234 307
     
    
    235 308
         def stage(self, directory, track=None):
    
    236 309
             fullpath = os.path.join(directory, self.path)
    
    ... ... @@ -246,13 +319,15 @@ class GitMirror(SourceFetcher):
    246 319
                              fail="Failed to checkout git ref {}".format(self.ref),
    
    247 320
                              cwd=fullpath)
    
    248 321
     
    
    322
    +        # Remove .git dir
    
    323
    +        shutil.rmtree(os.path.join(fullpath, ".git"))
    
    324
    +
    
    325
    +        self._rebuild_git(fullpath)
    
    326
    +
    
    249 327
             # Check that the user specified ref exists in the track if provided & not already tracked
    
    250 328
             if track:
    
    251 329
                 self.assert_ref_in_track(fullpath, track)
    
    252 330
     
    
    253
    -        # Remove .git dir
    
    254
    -        shutil.rmtree(os.path.join(fullpath, ".git"))
    
    255
    -
    
    256 331
         def init_workspace(self, directory, track=None):
    
    257 332
             fullpath = os.path.join(directory, self.path)
    
    258 333
             url = self.source.translate_url(self.url)
    
    ... ... @@ -359,6 +434,78 @@ class GitMirror(SourceFetcher):
    359 434
                              .format(self.source, self.ref, track, self.url),
    
    360 435
                              detail=detail, warning_token=CoreWarnings.REF_NOT_IN_TRACK)
    
    361 436
     
    
    437
    +    def _rebuild_git(self, fullpath):
    
    438
    +        if not self.tags:
    
    439
    +            return
    
    440
    +
    
    441
    +        with self.source.tempdir() as tmpdir:
    
    442
    +            included = set()
    
    443
    +            shallow = set()
    
    444
    +            for _, commit_ref, _ in self.tags:
    
    445
    +
    
    446
    +                _, out = self.source.check_output([self.source.host_git, 'rev-list',
    
    447
    +                                                   '--boundary', '{}..{}'.format(commit_ref, self.ref)],
    
    448
    +                                                  fail="Failed to get git history {}..{} in directory: {}"
    
    449
    +                                                  .format(commit_ref, self.ref, fullpath),
    
    450
    +                                                  fail_temporarily=True,
    
    451
    +                                                  cwd=self.mirror)
    
    452
    +                for line in out.splitlines():
    
    453
    +                    rev = line.lstrip('-')
    
    454
    +                    if line[0] == '-':
    
    455
    +                        shallow.add(rev)
    
    456
    +                    else:
    
    457
    +                        included.add(rev)
    
    458
    +
    
    459
    +            shallow -= included
    
    460
    +            included |= shallow
    
    461
    +
    
    462
    +            self.source.call([self.source.host_git, 'init'],
    
    463
    +                             fail="Cannot initialize git repository: {}".format(fullpath),
    
    464
    +                             cwd=fullpath)
    
    465
    +
    
    466
    +            for rev in included:
    
    467
    +                with TemporaryFile(dir=tmpdir) as commit_file:
    
    468
    +                    self.source.call([self.source.host_git, 'cat-file', 'commit', rev],
    
    469
    +                                     stdout=commit_file,
    
    470
    +                                     fail="Failed to get commit {}".format(rev),
    
    471
    +                                     cwd=self.mirror)
    
    472
    +                    commit_file.seek(0, 0)
    
    473
    +                    self.source.call([self.source.host_git, 'hash-object', '-w', '-t', 'commit', '--stdin'],
    
    474
    +                                     stdin=commit_file,
    
    475
    +                                     fail="Failed to add commit object {}".format(rev),
    
    476
    +                                     cwd=fullpath)
    
    477
    +
    
    478
    +            with open(os.path.join(fullpath, '.git', 'shallow'), 'w') as shallow_file:
    
    479
    +                for rev in shallow:
    
    480
    +                    shallow_file.write('{}\n'.format(rev))
    
    481
    +
    
    482
    +            for tag, commit_ref, annotated in self.tags:
    
    483
    +                if annotated:
    
    484
    +                    with TemporaryFile(dir=tmpdir) as tag_file:
    
    485
    +                        tag_data = 'object {}\ntype commit\ntag {}\n'.format(commit_ref, tag)
    
    486
    +                        tag_file.write(tag_data.encode('ascii'))
    
    487
    +                        tag_file.seek(0, 0)
    
    488
    +                        _, tag_ref = self.source.check_output(
    
    489
    +                            [self.source.host_git, 'hash-object', '-w', '-t',
    
    490
    +                             'tag', '--stdin'],
    
    491
    +                            stdin=tag_file,
    
    492
    +                            fail="Failed to add tag object {}".format(tag),
    
    493
    +                            cwd=fullpath)
    
    494
    +
    
    495
    +                    self.source.call([self.source.host_git, 'tag', tag, tag_ref.strip()],
    
    496
    +                                     fail="Failed to tag: {}".format(tag),
    
    497
    +                                     cwd=fullpath)
    
    498
    +                else:
    
    499
    +                    self.source.call([self.source.host_git, 'tag', tag, commit_ref],
    
    500
    +                                     fail="Failed to tag: {}".format(tag),
    
    501
    +                                     cwd=fullpath)
    
    502
    +
    
    503
    +            with open(os.path.join(fullpath, '.git', 'HEAD'), 'w') as head:
    
    504
    +                self.source.call([self.source.host_git, 'rev-parse', self.ref],
    
    505
    +                                 stdout=head,
    
    506
    +                                 fail="Failed to parse commit {}".format(self.ref),
    
    507
    +                                 cwd=self.mirror)
    
    508
    +
    
    362 509
     
    
    363 510
     class GitSource(Source):
    
    364 511
         # pylint: disable=attribute-defined-outside-init
    
    ... ... @@ -366,11 +513,20 @@ class GitSource(Source):
    366 513
         def configure(self, node):
    
    367 514
             ref = self.node_get_member(node, str, 'ref', None)
    
    368 515
     
    
    369
    -        config_keys = ['url', 'track', 'ref', 'submodules', 'checkout-submodules', 'ref-format']
    
    516
    +        config_keys = ['url', 'track', 'ref', 'submodules',
    
    517
    +                       'checkout-submodules', 'ref-format',
    
    518
    +                       'track-tags', 'tags']
    
    370 519
             self.node_validate(node, config_keys + Source.COMMON_CONFIG_KEYS)
    
    371 520
     
    
    521
    +        tags_node = self.node_get_member(node, list, 'tags', [])
    
    522
    +        for tag_node in tags_node:
    
    523
    +            self.node_validate(tag_node, ['tag', 'commit', 'annotated'])
    
    524
    +
    
    525
    +        tags = self._load_tags(node)
    
    526
    +        self.track_tags = self.node_get_member(node, bool, 'track-tags', False)
    
    527
    +
    
    372 528
             self.original_url = self.node_get_member(node, str, 'url')
    
    373
    -        self.mirror = GitMirror(self, '', self.original_url, ref, primary=True)
    
    529
    +        self.mirror = GitMirror(self, '', self.original_url, ref, tags=tags, primary=True)
    
    374 530
             self.tracking = self.node_get_member(node, str, 'track', None)
    
    375 531
     
    
    376 532
             self.ref_format = self.node_get_member(node, str, 'ref-format', 'sha1')
    
    ... ... @@ -417,6 +573,9 @@ class GitSource(Source):
    417 573
             # the ref, if the user changes the alias to fetch the same sources
    
    418 574
             # from another location, it should not affect the cache key.
    
    419 575
             key = [self.original_url, self.mirror.ref]
    
    576
    +        if self.mirror.tags:
    
    577
    +            tags = {tag: (commit, annotated) for tag, commit, annotated in self.mirror.tags}
    
    578
    +            key.append({'tags': tags})
    
    420 579
     
    
    421 580
             # Only modify the cache key with checkout_submodules if it's something
    
    422 581
             # other than the default behaviour.
    
    ... ... @@ -442,12 +601,33 @@ class GitSource(Source):
    442 601
     
    
    443 602
         def load_ref(self, node):
    
    444 603
             self.mirror.ref = self.node_get_member(node, str, 'ref', None)
    
    604
    +        self.mirror.tags = self._load_tags(node)
    
    445 605
     
    
    446 606
         def get_ref(self):
    
    447
    -        return self.mirror.ref
    
    448
    -
    
    449
    -    def set_ref(self, ref, node):
    
    450
    -        node['ref'] = self.mirror.ref = ref
    
    607
    +        return self.mirror.ref, self.mirror.tags
    
    608
    +
    
    609
    +    def set_ref(self, ref_data, node):
    
    610
    +        if not ref_data:
    
    611
    +            self.mirror.ref = None
    
    612
    +            if 'ref' in node:
    
    613
    +                del node['ref']
    
    614
    +            self.mirror.tags = []
    
    615
    +            if 'tags' in node:
    
    616
    +                del node['tags']
    
    617
    +        else:
    
    618
    +            ref, tags = ref_data
    
    619
    +            node['ref'] = self.mirror.ref = ref
    
    620
    +            self.mirror.tags = tags
    
    621
    +            if tags:
    
    622
    +                node['tags'] = []
    
    623
    +                for tag, commit_ref, annotated in tags:
    
    624
    +                    data = {'tag': tag,
    
    625
    +                            'commit': commit_ref,
    
    626
    +                            'annotated': annotated}
    
    627
    +                    node['tags'].append(data)
    
    628
    +            else:
    
    629
    +                if 'tags' in node:
    
    630
    +                    del node['tags']
    
    451 631
     
    
    452 632
         def track(self):
    
    453 633
     
    
    ... ... @@ -470,7 +650,7 @@ class GitSource(Source):
    470 650
                 self.mirror._fetch()
    
    471 651
     
    
    472 652
                 # Update self.mirror.ref and node.ref from the self.tracking branch
    
    473
    -            ret = self.mirror.latest_commit(self.tracking)
    
    653
    +            ret = self.mirror.latest_commit_with_tags(self.tracking, self.track_tags)
    
    474 654
     
    
    475 655
             # Set tracked attribute, parameter for if self.mirror.assert_ref_in_track is needed
    
    476 656
             self.tracked = True
    
    ... ... @@ -556,6 +736,16 @@ class GitSource(Source):
    556 736
     
    
    557 737
             self.submodules = submodules
    
    558 738
     
    
    739
    +    def _load_tags(self, node):
    
    740
    +        tags = []
    
    741
    +        tags_node = self.node_get_member(node, list, 'tags', [])
    
    742
    +        for tag_node in tags_node:
    
    743
    +            tag = self.node_get_member(tag_node, str, 'tag')
    
    744
    +            commit_ref = self.node_get_member(tag_node, str, 'commit')
    
    745
    +            annotated = self.node_get_member(tag_node, bool, 'annotated')
    
    746
    +            tags.append((tag, commit_ref, annotated))
    
    747
    +        return tags
    
    748
    +
    
    559 749
     
    
    560 750
     # Plugin entry point
    
    561 751
     def setup():
    

  • buildstream/sandbox/_sandboxremote.py
    ... ... @@ -26,6 +26,8 @@ from functools import partial
    26 26
     
    
    27 27
     import grpc
    
    28 28
     
    
    29
    +from .. import utils
    
    30
    +from .._message import Message, MessageType
    
    29 31
     from . import Sandbox, SandboxCommandError
    
    30 32
     from .sandbox import _SandboxBatch
    
    31 33
     from ..storage._filebaseddirectory import FileBasedDirectory
    
    ... ... @@ -39,7 +41,7 @@ from .._protos.google.longrunning import operations_pb2, operations_pb2_grpc
    39 41
     from .._artifactcache.cascache import CASRemote, CASRemoteSpec
    
    40 42
     
    
    41 43
     
    
    42
    -class RemoteExecutionSpec(namedtuple('RemoteExecutionSpec', 'exec_service storage_service')):
    
    44
    +class RemoteExecutionSpec(namedtuple('RemoteExecutionSpec', 'exec_service storage_service action_service')):
    
    43 45
         pass
    
    44 46
     
    
    45 47
     
    
    ... ... @@ -59,6 +61,10 @@ class SandboxRemote(Sandbox):
    59 61
     
    
    60 62
             self.storage_url = config.storage_service['url']
    
    61 63
             self.exec_url = config.exec_service['url']
    
    64
    +        if config.action_service:
    
    65
    +            self.action_url = config.action_service['url']
    
    66
    +        else:
    
    67
    +            self.action_url = None
    
    62 68
     
    
    63 69
             self.storage_remote_spec = CASRemoteSpec(self.storage_url, push=True,
    
    64 70
                                                      server_cert=config.storage_service['server-cert'],
    
    ... ... @@ -66,6 +72,9 @@ class SandboxRemote(Sandbox):
    66 72
                                                      client_cert=config.storage_service['client-cert'])
    
    67 73
             self.operation_name = None
    
    68 74
     
    
    75
    +    def info(self, msg):
    
    76
    +        self._get_context().message(Message(None, MessageType.INFO, msg))
    
    77
    +
    
    69 78
         @staticmethod
    
    70 79
         def specs_from_config_node(config_node, basedir):
    
    71 80
     
    
    ... ... @@ -88,12 +97,19 @@ class SandboxRemote(Sandbox):
    88 97
     
    
    89 98
             tls_keys = ['client-key', 'client-cert', 'server-cert']
    
    90 99
     
    
    91
    -        _yaml.node_validate(remote_config, ['execution-service', 'storage-service', 'url'])
    
    100
    +        _yaml.node_validate(
    
    101
    +            remote_config,
    
    102
    +            ['execution-service', 'storage-service', 'url', 'action-cache-service'])
    
    92 103
             remote_exec_service_config = require_node(remote_config, 'execution-service')
    
    93 104
             remote_exec_storage_config = require_node(remote_config, 'storage-service')
    
    105
    +        remote_exec_action_config = remote_config.get('action-cache-service')
    
    94 106
     
    
    95 107
             _yaml.node_validate(remote_exec_service_config, ['url'])
    
    96 108
             _yaml.node_validate(remote_exec_storage_config, ['url'] + tls_keys)
    
    109
    +        if remote_exec_action_config:
    
    110
    +            _yaml.node_validate(remote_exec_action_config, ['url'])
    
    111
    +        else:
    
    112
    +            remote_config['action-service'] = None
    
    97 113
     
    
    98 114
             if 'url' in remote_config:
    
    99 115
                 if 'execution-service' not in remote_config:
    
    ... ... @@ -114,52 +130,17 @@ class SandboxRemote(Sandbox):
    114 130
                                           "remote-execution configuration. Your config is missing '{}'."
    
    115 131
                                           .format(str(provenance), tls_keys, key))
    
    116 132
     
    
    117
    -        spec = RemoteExecutionSpec(remote_config['execution-service'], remote_config['storage-service'])
    
    133
    +        spec = RemoteExecutionSpec(remote_config['execution-service'],
    
    134
    +                                   remote_config['storage-service'],
    
    135
    +                                   remote_config['action-cache-service'])
    
    118 136
             return spec
    
    119 137
     
    
    120
    -    def run_remote_command(self, command, input_root_digest, working_directory, environment):
    
    138
    +    def run_remote_command(self, channel, action_digest):
    
    121 139
             # Sends an execution request to the remote execution server.
    
    122 140
             #
    
    123 141
             # This function blocks until it gets a response from the server.
    
    124
    -        #
    
    125
    -        environment_variables = [remote_execution_pb2.Command.
    
    126
    -                                 EnvironmentVariable(name=k, value=v)
    
    127
    -                                 for (k, v) in environment.items()]
    
    128
    -
    
    129
    -        # Create and send the Command object.
    
    130
    -        remote_command = remote_execution_pb2.Command(arguments=command,
    
    131
    -                                                      working_directory=working_directory,
    
    132
    -                                                      environment_variables=environment_variables,
    
    133
    -                                                      output_files=[],
    
    134
    -                                                      output_directories=[self._output_directory],
    
    135
    -                                                      platform=None)
    
    136
    -        context = self._get_context()
    
    137
    -        cascache = context.get_cascache()
    
    138
    -        casremote = CASRemote(self.storage_remote_spec)
    
    139
    -
    
    140
    -        # Upload the Command message to the remote CAS server
    
    141
    -        command_digest = cascache.push_message(casremote, remote_command)
    
    142
    -
    
    143
    -        # Create and send the action.
    
    144
    -        action = remote_execution_pb2.Action(command_digest=command_digest,
    
    145
    -                                             input_root_digest=input_root_digest,
    
    146
    -                                             timeout=None,
    
    147
    -                                             do_not_cache=False)
    
    148
    -
    
    149
    -        # Upload the Action message to the remote CAS server
    
    150
    -        action_digest = cascache.push_message(casremote, action)
    
    151
    -
    
    152
    -        # Next, try to create a communication channel to the BuildGrid server.
    
    153
    -        url = urlparse(self.exec_url)
    
    154
    -        if not url.port:
    
    155
    -            raise SandboxError("You must supply a protocol and port number in the execution-service url, "
    
    156
    -                               "for example: http://buildservice:50051.")
    
    157
    -        if url.scheme == 'http':
    
    158
    -            channel = grpc.insecure_channel('{}:{}'.format(url.hostname, url.port))
    
    159
    -        else:
    
    160
    -            raise SandboxError("Remote execution currently only supports the 'http' protocol "
    
    161
    -                               "and '{}' was supplied.".format(url.scheme))
    
    162 142
     
    
    143
    +        # Try to create a communication channel to the BuildGrid server.
    
    163 144
             stub = remote_execution_pb2_grpc.ExecutionStub(channel)
    
    164 145
             request = remote_execution_pb2.ExecuteRequest(action_digest=action_digest,
    
    165 146
                                                           skip_cache_lookup=False)
    
    ... ... @@ -279,13 +260,12 @@ class SandboxRemote(Sandbox):
    279 260
             # to replace the sandbox's virtual directory with that. Creating a new virtual directory object
    
    280 261
             # from another hash will be interesting, though...
    
    281 262
     
    
    282
    -        new_dir = CasBasedDirectory(self._get_context().artifactcache.cas, ref=dir_digest)
    
    263
    +        new_dir = CasBasedDirectory(context.artifactcache.cas, ref=dir_digest)
    
    283 264
             self._set_virtual_directory(new_dir)
    
    284 265
     
    
    285 266
         def _run(self, command, flags, *, cwd, env):
    
    286
    -        # Upload sources
    
    267
     +        # set up virtual directory
    
    287 268
             upload_vdir = self.get_virtual_directory()
    
    288
    -
    
    289 269
             cascache = self._get_context().get_cascache()
    
    290 270
             if isinstance(upload_vdir, FileBasedDirectory):
    
    291 271
                 # Make a new temporary directory to put source in
    
    ... ... @@ -294,16 +274,111 @@ class SandboxRemote(Sandbox):
    294 274
     
    
    295 275
             upload_vdir.recalculate_hash()
    
    296 276
     
    
    297
    -        casremote = CASRemote(self.storage_remote_spec)
    
    298
    -        # Now, push that key (without necessarily needing a ref) to the remote.
    
    277
    +        # Generate action_digest first
    
    278
    +        input_root_digest = upload_vdir.ref
    
    279
    +        command_proto = self._create_command(command, cwd, env)
    
    280
    +        command_digest = utils._message_digest(command_proto.SerializeToString())
    
    281
    +        action = remote_execution_pb2.Action(command_digest=command_digest,
    
    282
    +                                             input_root_digest=input_root_digest)
    
    283
    +        action_digest = utils._message_digest(action.SerializeToString())
    
    284
    +
    
    285
    +        # Next, try to create a communication channel to the BuildGrid server.
    
    286
    +        url = urlparse(self.exec_url)
    
    287
    +        if not url.port:
    
    288
    +            raise SandboxError("You must supply a protocol and port number in the execution-service url, "
    
    289
    +                               "for example: http://buildservice:50051.")
    
    290
    +        if url.scheme == 'http':
    
    291
    +            channel = grpc.insecure_channel('{}:{}'.format(url.hostname, url.port))
    
    292
    +        else:
    
    293
    +            raise SandboxError("Remote execution currently only supports the 'http' protocol "
    
    294
    +                               "and '{}' was supplied.".format(url.scheme))
    
    295
    +
    
    296
    +        # check action cache download and download if there
    
    297
    +        action_result = self._check_action_cache(action_digest)
    
    298
    +
    
    299
    +        if not action_result:
    
    300
    +            casremote = CASRemote(self.storage_remote_spec)
    
    301
    +
    
    302
    +            # Now, push that key (without necessarily needing a ref) to the remote.
    
    303
    +            try:
    
    304
    +                cascache.push_directory(casremote, upload_vdir)
    
    305
    +            except grpc.RpcError as e:
    
    306
    +                raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e
    
    307
    +
    
    308
    +            if not cascache.verify_digest_on_remote(casremote, upload_vdir.ref):
    
    309
    +                raise SandboxError("Failed to verify that source has been pushed to the remote artifact cache.")
    
    310
    +
    
    311
    +            # Push command and action
    
    312
    +            try:
    
    313
    +                cascache.push_message(casremote, command_proto)
    
    314
    +            except grpc.RpcError as e:
    
    315
    +                raise SandboxError("Failed to push command to remote: {}".format(e))
    
    316
    +
    
    317
    +            try:
    
    318
    +                cascache.push_message(casremote, action)
    
    319
    +            except grpc.RpcError as e:
    
    320
    +                raise SandboxError("Failed to push action to remote: {}".format(e))
    
    321
    +
    
    322
    +            # Now request to execute the action
    
    323
    +            operation = self.run_remote_command(channel, action_digest)
    
    324
    +            action_result = self._extract_action_result(operation)
    
    325
    +
    
    326
    +        if action_result.exit_code != 0:
    
    327
    +            # A normal error during the build: the remote execution system
    
    328
    +            # has worked correctly but the command failed.
    
    329
    +            # action_result.stdout and action_result.stderr also contains
    
    330
    +            # build command outputs which we ignore at the moment.
    
    331
    +            return action_result.exit_code
    
    332
    +
    
    333
    +        # Get output of build
    
    334
    +        self.process_job_output(action_result.output_directories, action_result.output_files)
    
    335
    +
    
    336
    +        return 0
    
    337
    +
    
    338
    +    def _check_action_cache(self, action_digest):
    
    339
    +        # Checks the action cache to see if this artifact has already been built
    
    340
    +        #
    
    341
    +        # Should return either the action response or None if not found, raise
    
    342
    +        # SandboxError if another grpc error was raised
    
    343
    +        if not self.action_url:
    
    344
    +            return None
    
    345
    +        url = urlparse(self.action_url)
    
    346
    +        if not url.port:
    
    347
    +            raise SandboxError("You must supply a protocol and port number in the action-cache-service url, "
    
    348
    +                               "for example: http://buildservice:50051.")
    
    349
    +        if not url.scheme == "http":
    
    350
    +            raise SandboxError("Currently only support http for the action cache"
    
    351
    +                               "and {} was supplied".format(url.scheme))
    
    352
    +
    
    353
    +        channel = grpc.insecure_channel('{}:{}'.format(url.hostname, url.port))
    
    354
    +        request = remote_execution_pb2.GetActionResultRequest(action_digest=action_digest)
    
    355
    +        stub = remote_execution_pb2_grpc.ActionCacheStub(channel)
    
    299 356
             try:
    
    300
    -            cascache.push_directory(casremote, upload_vdir)
    
    357
    +            result = stub.GetActionResult(request)
    
    301 358
             except grpc.RpcError as e:
    
    302
    -            raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e
    
    359
    +            if e.code() != grpc.StatusCode.NOT_FOUND:
    
    360
    +                raise SandboxError("Failed to query action cache: {} ({})"
    
    361
    +                                   .format(e.code(), e.details()))
    
    362
    +            else:
    
    363
    +                return None
    
    364
    +        else:
    
    365
    +            self.info("Action result found in action cache")
    
    366
    +            return result
    
    303 367
     
    
    304
    -        # Now transmit the command to execute
    
    305
    -        operation = self.run_remote_command(command, upload_vdir.ref, cwd, env)
    
    368
    +    def _create_command(self, command, working_directory, environment):
    
    369
    +        # Creates a command proto
    
    370
    +        environment_variables = [remote_execution_pb2.Command.
    
    371
    +                                 EnvironmentVariable(name=k, value=v)
    
    372
    +                                 for (k, v) in environment.items()]
    
    373
    +        return remote_execution_pb2.Command(arguments=command,
    
    374
    +                                            working_directory=working_directory,
    
    375
    +                                            environment_variables=environment_variables,
    
    376
    +                                            output_files=[],
    
    377
    +                                            output_directories=[self._output_directory],
    
    378
    +                                            platform=None)
    
    306 379
     
    
    380
    +    @staticmethod
    
    381
    +    def _extract_action_result(operation):
    
    307 382
             if operation is None:
    
    308 383
                 # Failure of remote execution, usually due to an error in BuildStream
    
    309 384
                 raise SandboxError("No response returned from server")
    
    ... ... @@ -324,18 +399,7 @@ class SandboxRemote(Sandbox):
    324 399
                 else:
    
    325 400
                     raise SandboxError("Remote server failed at executing the build request.")
    
    326 401
     
    
    327
    -        action_result = execution_response.result
    
    328
    -
    
    329
    -        if action_result.exit_code != 0:
    
    330
    -            # A normal error during the build: the remote execution system
    
    331
    -            # has worked correctly but the command failed.
    
    332
    -            # action_result.stdout and action_result.stderr also contains
    
    333
    -            # build command outputs which we ignore at the moment.
    
    334
    -            return action_result.exit_code
    
    335
    -
    
    336
    -        self.process_job_output(action_result.output_directories, action_result.output_files)
    
    337
    -
    
    338
    -        return 0
    
    402
    +        return execution_response.result
    
    339 403
     
    
    340 404
         def _create_batch(self, main_group, flags, *, collect=None):
    
    341 405
             return _SandboxRemoteBatch(self, main_group, flags, collect=collect)
    

  • buildstream/utils.py
    ... ... @@ -41,6 +41,7 @@ import psutil
    41 41
     
    
    42 42
     from . import _signals
    
    43 43
     from ._exceptions import BstError, ErrorDomain
    
    44
    +from ._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    44 45
     
    
    45 46
     # The magic number for timestamps: 2011-11-11 11:11:11
    
    46 47
     _magic_timestamp = calendar.timegm([2011, 11, 11, 11, 11, 11])
    
    ... ... @@ -1242,3 +1243,19 @@ def _deduplicate(iterable, key=None):
    1242 1243
     def _get_link_mtime(path):
    
    1243 1244
         path_stat = os.lstat(path)
    
    1244 1245
         return path_stat.st_mtime
    
    1246
    +
    
    1247
    +
    
    1248
    +# _message_digest()
    
    1249
    +#
    
    1250
    +# Args:
    
    1251
    +#    message_buffer (str): String to create digest of
    
    1252
    +#
    
    1253
    +# Returns:
    
    1254
    +#    (remote_execution_pb2.Digest): Content digest
    
    1255
    +#
    
    1256
    +def _message_digest(message_buffer):
    
    1257
    +    sha = hashlib.sha256(message_buffer)
    
    1258
    +    digest = remote_execution_pb2.Digest()
    
    1259
    +    digest.hash = sha.hexdigest()
    
    1260
    +    digest.size_bytes = len(message_buffer)
    
    1261
    +    return digest

  • doc/source/format_project.rst
    ... ... @@ -238,6 +238,8 @@ using the `remote-execution` option:
    238 238
           server-cert: server.crt
    
    239 239
           client-cert: client.crt
    
    240 240
           client-key: client.key
    
    241
    +    action-cache-service:
    
    242
    +      url: http://bar.action.com:50052
    
    241 243
     
    
    242 244
     The execution-service part of remote execution does not support encrypted
    
    243 245
     connections yet, so the protocol must always be http.
    
    ... ... @@ -245,6 +247,11 @@ connections yet, so the protocol must always be http.
    245 247
     storage-service specifies a remote CAS store and the parameters are the
    
    246 248
     same as those used to specify an :ref:`artifact server <artifacts>`.
    
    247 249
     
    
    250
    +The action-cache-service specifies where built actions are cached, allowing
    
    251
    +buildstream to check whether an action has already been executed and download it
    
    252
    +if so. This is similar to the artifact cache, but is specified by the REAPI, and is
    
    253
    +optional for remote execution to work.
    
    254
    +
    
    248 255
     The storage service may be the same endpoint used for artifact
    
    249 256
     caching. Remote execution cannot work without push access to the
    
    250 257
     storage endpoint, so you must specify a client certificate and key,
    

  • tests/cachekey/project/sources/git3.bst
    1
    +kind: import
    
    2
    +sources:
    
    3
    +- kind: git
    
    4
    +  url: https://example.com/git/repo.git
    
    5
    +  ref: 6ac68af3e80b7b17c23a3c65233043550a7fa685
    
    6
    +  tags:
    
    7
    +  - tag: lightweight
    
    8
    +    commit: 0a3917d57477ee9afe7be49a0e8a76f56d176df1
    
    9
    +    annotated: false
    
    10
    +  - tag: annotated
    
    11
    +    commit: 68c7f0bd386684742c41ec2a54ce2325e3922f6c
    
    12
    +    annotated: true

  • tests/cachekey/project/sources/git3.expected
    1
    +6a25f539bd8629a36399c58efd2f5c9c117feb845076a37dc321b55d456932b6
    \ No newline at end of file

  • tests/cachekey/project/target.bst
    ... ... @@ -7,6 +7,7 @@ depends:
    7 7
     - sources/bzr1.bst
    
    8 8
     - sources/git1.bst
    
    9 9
     - sources/git2.bst
    
    10
    +- sources/git3.bst
    
    10 11
     - sources/local1.bst
    
    11 12
     - sources/local2.bst
    
    12 13
     - sources/ostree1.bst
    

  • tests/cachekey/project/target.expected
    1
    -125d9e7dcf4f49e5f80d85b7f144b43ed43186064afc2e596e57f26cce679cf5
    \ No newline at end of file
    1
    +bc99c288f855ac2619787f0067223f7812d2e10a9d2c7f2bf47de7113c0fd25c
    \ No newline at end of file

  • tests/sources/git.py
    ... ... @@ -22,6 +22,7 @@
    22 22
     
    
    23 23
     import os
    
    24 24
     import pytest
    
    25
    +import subprocess
    
    25 26
     
    
    26 27
     from buildstream._exceptions import ErrorDomain
    
    27 28
     from buildstream import _yaml
    
    ... ... @@ -523,3 +524,155 @@ def test_track_fetch(cli, tmpdir, datafiles, ref_format, tag, extra_commit):
    523 524
         # Fetch it
    
    524 525
         result = cli.run(project=project, args=['fetch', 'target.bst'])
    
    525 526
         result.assert_success()
    
    527
    +
    
    528
    +
    
    529
    +@pytest.mark.skipif(HAVE_GIT is False, reason="git is not available")
    
    530
    +@pytest.mark.datafiles(os.path.join(DATA_DIR, 'template'))
    
    531
    +@pytest.mark.parametrize("ref_storage", [('inline'), ('project.refs')])
    
    532
    +@pytest.mark.parametrize("tag_type", [('annotated'), ('lightweight')])
    
    533
    +def test_git_describe(cli, tmpdir, datafiles, ref_storage, tag_type):
    
    534
    +    project = str(datafiles)
    
    535
    +
    
    536
    +    project_config = _yaml.load(os.path.join(project, 'project.conf'))
    
    537
    +    project_config['ref-storage'] = ref_storage
    
    538
    +    _yaml.dump(_yaml.node_sanitize(project_config), os.path.join(project, 'project.conf'))
    
    539
    +
    
    540
    +    repofiles = os.path.join(str(tmpdir), 'repofiles')
    
    541
    +    os.makedirs(repofiles, exist_ok=True)
    
    542
    +    file0 = os.path.join(repofiles, 'file0')
    
    543
    +    with open(file0, 'w') as f:
    
    544
    +        f.write('test\n')
    
    545
    +
    
    546
    +    repo = create_repo('git', str(tmpdir))
    
    547
    +
    
    548
    +    def tag(name):
    
    549
    +        if tag_type == 'annotated':
    
    550
    +            repo.add_annotated_tag(name, name)
    
    551
    +        else:
    
    552
    +            repo.add_tag(name)
    
    553
    +
    
    554
    +    ref = repo.create(repofiles)
    
    555
    +    tag('uselesstag')
    
    556
    +
    
    557
    +    file1 = os.path.join(str(tmpdir), 'file1')
    
    558
    +    with open(file1, 'w') as f:
    
    559
    +        f.write('test\n')
    
    560
    +    repo.add_file(file1)
    
    561
    +    tag('tag1')
    
    562
    +
    
    563
    +    file2 = os.path.join(str(tmpdir), 'file2')
    
    564
    +    with open(file2, 'w') as f:
    
    565
    +        f.write('test\n')
    
    566
    +    repo.branch('branch2')
    
    567
    +    repo.add_file(file2)
    
    568
    +    tag('tag2')
    
    569
    +
    
    570
    +    repo.checkout('master')
    
    571
    +    file3 = os.path.join(str(tmpdir), 'file3')
    
    572
    +    with open(file3, 'w') as f:
    
    573
    +        f.write('test\n')
    
    574
    +    repo.add_file(file3)
    
    575
    +
    
    576
    +    repo.merge('branch2')
    
    577
    +
    
    578
    +    config = repo.source_config()
    
    579
    +    config['track'] = repo.latest_commit()
    
    580
    +    config['track-tags'] = True
    
    581
    +
    
    582
    +    # Write out our test target
    
    583
    +    element = {
    
    584
    +        'kind': 'import',
    
    585
    +        'sources': [
    
    586
    +            config
    
    587
    +        ],
    
    588
    +    }
    
    589
    +    element_path = os.path.join(project, 'target.bst')
    
    590
    +    _yaml.dump(element, element_path)
    
    591
    +
    
    592
    +    if ref_storage == 'inline':
    
    593
    +        result = cli.run(project=project, args=['track', 'target.bst'])
    
    594
    +        result.assert_success()
    
    595
    +    else:
    
    596
    +        result = cli.run(project=project, args=['track', 'target.bst', '--deps', 'all'])
    
    597
    +        result.assert_success()
    
    598
    +
    
    599
    +    if ref_storage == 'inline':
    
    600
    +        element = _yaml.load(element_path)
    
    601
    +        tags = _yaml.node_sanitize(element['sources'][0]['tags'])
    
    602
    +        assert len(tags) == 2
    
    603
    +        for tag in tags:
    
    604
    +            assert 'tag' in tag
    
    605
    +            assert 'commit' in tag
    
    606
    +            assert 'annotated' in tag
    
    607
    +            assert tag['annotated'] == (tag_type == 'annotated')
    
    608
    +
    
    609
    +        assert set([(tag['tag'], tag['commit']) for tag in tags]) == set([('tag1', repo.rev_parse('tag1^{commit}')),
    
    610
    +                                                                          ('tag2', repo.rev_parse('tag2^{commit}'))])
    
    611
    +
    
    612
    +    checkout = os.path.join(str(tmpdir), 'checkout')
    
    613
    +
    
    614
    +    result = cli.run(project=project, args=['build', 'target.bst'])
    
    615
    +    result.assert_success()
    
    616
    +    result = cli.run(project=project, args=['checkout', 'target.bst', checkout])
    
    617
    +    result.assert_success()
    
    618
    +
    
    619
    +    if tag_type == 'annotated':
    
    620
    +        options = []
    
    621
    +    else:
    
    622
    +        options = ['--tags']
    
    623
    +    describe = subprocess.check_output(['git', 'describe'] + options,
    
    624
    +                                       cwd=checkout).decode('ascii')
    
    625
    +    assert describe.startswith('tag2-2-')
    
    626
    +
    
    627
    +    describe_fp = subprocess.check_output(['git', 'describe', '--first-parent'] + options,
    
    628
    +                                          cwd=checkout).decode('ascii')
    
    629
    +    assert describe_fp.startswith('tag1-2-')
    
    630
    +
    
    631
    +    tags = subprocess.check_output(['git', 'tag'],
    
    632
    +                                   cwd=checkout).decode('ascii')
    
    633
    +    tags = set(tags.splitlines())
    
    634
    +    assert tags == set(['tag1', 'tag2'])
    
    635
    +
    
    636
    +    p = subprocess.run(['git', 'log', repo.rev_parse('uselesstag')],
    
    637
    +                       cwd=checkout)
    
    638
    +    assert p.returncode != 0
    
    639
    +
    
    640
    +
    
    641
    +@pytest.mark.skipif(HAVE_GIT is False, reason="git is not available")
    
    642
    +@pytest.mark.datafiles(os.path.join(DATA_DIR, 'template'))
    
    643
    +def test_default_do_not_track_tags(cli, tmpdir, datafiles):
    
    644
    +    project = str(datafiles)
    
    645
    +
    
    646
    +    project_config = _yaml.load(os.path.join(project, 'project.conf'))
    
    647
    +    project_config['ref-storage'] = 'inline'
    
    648
    +    _yaml.dump(_yaml.node_sanitize(project_config), os.path.join(project, 'project.conf'))
    
    649
    +
    
    650
    +    repofiles = os.path.join(str(tmpdir), 'repofiles')
    
    651
    +    os.makedirs(repofiles, exist_ok=True)
    
    652
    +    file0 = os.path.join(repofiles, 'file0')
    
    653
    +    with open(file0, 'w') as f:
    
    654
    +        f.write('test\n')
    
    655
    +
    
    656
    +    repo = create_repo('git', str(tmpdir))
    
    657
    +
    
    658
    +    ref = repo.create(repofiles)
    
    659
    +    repo.add_tag('tag')
    
    660
    +
    
    661
    +    config = repo.source_config()
    
    662
    +    config['track'] = repo.latest_commit()
    
    663
    +
    
    664
    +    # Write out our test target
    
    665
    +    element = {
    
    666
    +        'kind': 'import',
    
    667
    +        'sources': [
    
    668
    +            config
    
    669
    +        ],
    
    670
    +    }
    
    671
    +    element_path = os.path.join(project, 'target.bst')
    
    672
    +    _yaml.dump(element, element_path)
    
    673
    +
    
    674
    +    result = cli.run(project=project, args=['track', 'target.bst'])
    
    675
    +    result.assert_success()
    
    676
    +
    
    677
    +    element = _yaml.load(element_path)
    
    678
    +    assert 'tags' not in element['sources'][0]

  • tests/testutils/repo/git.py
    ... ... @@ -45,6 +45,9 @@ class Git(Repo):
    45 45
         def add_tag(self, tag):
    
    46 46
             self._run_git('tag', tag)
    
    47 47
     
    
    48
    +    def add_annotated_tag(self, tag, message):
    
    49
    +        self._run_git('tag', '-a', tag, '-m', message)
    
    50
    +
    
    48 51
         def add_commit(self):
    
    49 52
             self._run_git('commit', '--allow-empty', '-m', 'Additional commit')
    
    50 53
             return self.latest_commit()
    
    ... ... @@ -95,3 +98,14 @@ class Git(Repo):
    95 98
     
    
    96 99
         def branch(self, branch_name):
    
    97 100
             self._run_git('checkout', '-b', branch_name)
    
    101
    +
    
    102
    +    def checkout(self, commit):
    
    103
    +        self._run_git('checkout', commit)
    
    104
    +
    
    105
    +    def merge(self, commit):
    
    106
    +        self._run_git('merge', '-m', 'Merge', commit)
    
    107
    +        return self.latest_commit()
    
    108
    +
    
    109
    +    def rev_parse(self, rev):
    
    110
    +        output = self._run_git('rev-parse', rev, stdout=subprocess.PIPE).stdout
    
    111
    +        return output.decode('UTF-8').strip()



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]