[Notes] [Git][BuildStream/buildstream][chandan/sourcetransform] Allow source plugins to access previous sources



Title: GitLab

Chandan Singh pushed to branch chandan/sourcetransform at BuildStream / buildstream

Commits:

9 changed files:

Changes:

  • buildstream/_loader/loader.py
    ... ... @@ -506,14 +506,15 @@ class Loader():
    506 506
             element = Element._new_from_meta(meta_element, platform.artifactcache)
    
    507 507
             element._preflight()
    
    508 508
     
    
    509
    -        for source in element.sources():
    
    509
    +        sources = list(element.sources())
    
    510
    +        for idx, source in enumerate(sources):
    
    510 511
                 # Handle the case where a subproject needs to be fetched
    
    511 512
                 #
    
    512 513
                 if source.get_consistency() == Consistency.RESOLVED:
    
    513 514
                     if self._fetch_subprojects:
    
    514 515
                         if ticker:
    
    515 516
                             ticker(filename, 'Fetching subproject from {} source'.format(source.get_kind()))
    
    516
    -                    source._fetch()
    
    517
    +                    source._fetch(sources[0:idx])
    
    517 518
                     else:
    
    518 519
                         detail = "Try fetching the project with `bst fetch {}`".format(filename)
    
    519 520
                         raise LoadError(LoadErrorReason.SUBPROJECT_FETCH_NEEDED,
    

  • buildstream/_scheduler/queues/fetchqueue.py
    ... ... @@ -40,8 +40,10 @@ class FetchQueue(Queue):
    40 40
             self._skip_cached = skip_cached
    
    41 41
     
    
    42 42
         def process(self, element):
    
    43
    +        previous_sources = []
    
    43 44
             for source in element.sources():
    
    44
    -            source._fetch()
    
    45
    +            source._fetch(previous_sources)
    
    46
    +            previous_sources.append(source)
    
    45 47
     
    
    46 48
         def status(self, element):
    
    47 49
             # state of dependencies may have changed, recalculate element state
    

  • buildstream/element.py
    ... ... @@ -1195,6 +1195,11 @@ class Element(Plugin):
    1195 1195
                 # Prepend provenance to the error
    
    1196 1196
                 raise ElementError("{}: {}".format(self, e), reason=e.reason) from e
    
    1197 1197
     
    
    1198
    +        # Ensure that the first source does not need access to previous sources
    
    1199
    +        if self.__sources and self.__sources[0]._is_transform():
    
    1200
    +            raise ElementError("{}: A Source Transform plugin cannot be the first source"
    
    1201
    +                               .format(self))
    
    1202
    +
    
    1198 1203
             # Preflight the sources
    
    1199 1204
             for source in self.sources():
    
    1200 1205
                 source._preflight()
    
    ... ... @@ -1238,9 +1243,9 @@ class Element(Plugin):
    1238 1243
         #
    
    1239 1244
         def _track(self):
    
    1240 1245
             refs = []
    
    1241
    -        for source in self.__sources:
    
    1246
    +        for index, source in enumerate(self.__sources):
    
    1242 1247
                 old_ref = source.get_ref()
    
    1243
    -            new_ref = source._track()
    
    1248
    +            new_ref = source._track(self.__sources[0:index])
    
    1244 1249
                 refs.append((source._get_unique_id(), new_ref))
    
    1245 1250
     
    
    1246 1251
                 # Complimentary warning that the new ref will be unused.
    

  • buildstream/source.py
    ... ... @@ -156,7 +156,7 @@ class SourceFetcher():
    156 156
         #############################################################
    
    157 157
         #                      Abstract Methods                     #
    
    158 158
         #############################################################
    
    159
    -    def fetch(self, alias_override=None):
    
    159
    +    def fetch(self, alias_override=None, **kwargs):
    
    160 160
             """Fetch remote sources and mirror them locally, ensuring at least
    
    161 161
             that the specific reference is cached locally.
    
    162 162
     
    
    ... ... @@ -209,6 +209,28 @@ class Source(Plugin):
    209 209
         __defaults = {}          # The defaults from the project
    
    210 210
         __defaults_set = False   # Flag, in case there are not defaults at all
    
    211 211
     
    
    212
    +    requires_previous_sources_track = False
    
    213
    +    """Whether access to previous sources is required during track
    
    214
    +
    
    215
    +    When set to True:
    
    216
    +      * all sources listed before this source in the given element will be
    
    217
    +        fetched before this source is tracked
    
    218
    +      * Source.track() will be called with an additional keyword argument
    
    219
    +        `previous_sources_dir`: path to a directory where previous sources are staged
    
    220
    +      * this source cannot be the first source for an element
    
    221
    +    """
    
    222
    +
    
    223
    +    requires_previous_sources_fetch = False
    
    224
    +    """Whether access to previous sources is required during fetch
    
    225
    +
    
    226
    +    When set to True:
    
    227
    +      * all sources listed before this source in the given element will be
    
    228
    +        fetched before this source is fetched
    
    229
    +      * Source.fetch() will be called with an additional keyword argument
    
    230
    +        `previous_sources_dir`: path to a directory where previous sources are staged
    
    231
    +      * this source cannot be the first source for an element
    
    232
    +    """
    
    233
    +
    
    212 234
         def __init__(self, context, project, meta, *, alias_override=None):
    
    213 235
             provenance = _yaml.node_get_provenance(meta.config)
    
    214 236
             super().__init__("{}-{}".format(meta.element_name, meta.element_index),
    
    ... ... @@ -303,7 +325,7 @@ class Source(Plugin):
    303 325
             """
    
    304 326
             raise ImplError("Source plugin '{}' does not implement set_ref()".format(self.get_kind()))
    
    305 327
     
    
    306
    -    def track(self):
    
    328
    +    def track(self, **kwargs):
    
    307 329
             """Resolve a new ref from the plugin's track option
    
    308 330
     
    
    309 331
             Returns:
    
    ... ... @@ -324,7 +346,7 @@ class Source(Plugin):
    324 346
             # Allow a non implementation
    
    325 347
             return None
    
    326 348
     
    
    327
    -    def fetch(self):
    
    349
    +    def fetch(self, **kwargs):
    
    328 350
             """Fetch remote sources and mirror them locally, ensuring at least
    
    329 351
             that the specific reference is cached locally.
    
    330 352
     
    
    ... ... @@ -517,46 +539,59 @@ class Source(Plugin):
    517 539
     
    
    518 540
         # Wrapper function around plugin provided fetch method
    
    519 541
         #
    
    520
    -    def _fetch(self):
    
    521
    -        project = self._get_project()
    
    522
    -        source_fetchers = self.get_source_fetchers()
    
    523
    -        if source_fetchers:
    
    524
    -            for fetcher in source_fetchers:
    
    525
    -                alias = fetcher._get_alias()
    
    526
    -                success = False
    
    542
    +    # Args:
    
    543
    +    #   previous_sources (list): List of Sources listed prior to this source
    
    544
    +    #
    
    545
    +    def _fetch(self, previous_sources):
    
    546
    +        def __fetch(**kwargs):
    
    547
    +            project = self._get_project()
    
    548
    +            source_fetchers = self.get_source_fetchers()
    
    549
    +            if source_fetchers:
    
    550
    +                for fetcher in source_fetchers:
    
    551
    +                    alias = fetcher._get_alias()
    
    552
    +                    success = False
    
    553
    +                    for uri in project.get_alias_uris(alias):
    
    554
    +                        try:
    
    555
    +                            fetcher.fetch(uri)
    
    556
    +                        # FIXME: Need to consider temporary vs. permanent failures,
    
    557
    +                        #        and how this works with retries.
    
    558
    +                        except BstError as e:
    
    559
    +                            last_error = e
    
    560
    +                            continue
    
    561
    +                        success = True
    
    562
    +                        break
    
    563
    +                    if not success:
    
    564
    +                        raise last_error
    
    565
    +            else:
    
    566
    +                alias = self._get_alias()
    
    567
    +                if not project.mirrors or not alias:
    
    568
    +                    self.fetch(**kwargs)
    
    569
    +                    return
    
    570
    +
    
    571
    +                context = self._get_context()
    
    572
    +                source_kind = type(self)
    
    527 573
                     for uri in project.get_alias_uris(alias):
    
    574
    +                    new_source = source_kind(context, project, self.__meta,
    
    575
    +                                             alias_override=(alias, uri))
    
    576
    +                    new_source._preflight()
    
    528 577
                         try:
    
    529
    -                        fetcher.fetch(uri)
    
    578
    +                        new_source.fetch(**kwargs)
    
    530 579
                         # FIXME: Need to consider temporary vs. permanent failures,
    
    531 580
                         #        and how this works with retries.
    
    532 581
                         except BstError as e:
    
    533 582
                             last_error = e
    
    534 583
                             continue
    
    535
    -                    success = True
    
    536
    -                    break
    
    537
    -                if not success:
    
    538
    -                    raise last_error
    
    584
    +                    return
    
    585
    +                raise last_error
    
    586
    +
    
    587
    +        if self.requires_previous_sources_fetch:
    
    588
    +            self.__ensure_previous_sources(previous_sources)
    
    589
    +            with self.tempdir() as staging_directory:
    
    590
    +                for src in previous_sources:
    
    591
    +                    src._stage(staging_directory)
    
    592
    +                __fetch(previous_sources_dir=staging_directory)
    
    539 593
             else:
    
    540
    -            alias = self._get_alias()
    
    541
    -            if not project.mirrors or not alias:
    
    542
    -                self.fetch()
    
    543
    -                return
    
    544
    -
    
    545
    -            context = self._get_context()
    
    546
    -            source_kind = type(self)
    
    547
    -            for uri in project.get_alias_uris(alias):
    
    548
    -                new_source = source_kind(context, project, self.__meta,
    
    549
    -                                         alias_override=(alias, uri))
    
    550
    -                new_source._preflight()
    
    551
    -                try:
    
    552
    -                    new_source.fetch()
    
    553
    -                # FIXME: Need to consider temporary vs. permanent failures,
    
    554
    -                #        and how this works with retries.
    
    555
    -                except BstError as e:
    
    556
    -                    last_error = e
    
    557
    -                    continue
    
    558
    -                return
    
    559
    -            raise last_error
    
    594
    +            __fetch()
    
    560 595
     
    
    561 596
         # Wrapper for stage() api which gives the source
    
    562 597
         # plugin a fully constructed path considering the
    
    ... ... @@ -762,8 +797,19 @@ class Source(Plugin):
    762 797
     
    
    763 798
         # Wrapper for track()
    
    764 799
         #
    
    765
    -    def _track(self):
    
    766
    -        new_ref = self.__do_track()
    
    800
    +    # Args:
    
    801
    +    #   previous_sources (list): List of Sources listed prior to this source
    
    802
    +    #
    
    803
    +    def _track(self, previous_sources):
    
    804
    +        if self.requires_previous_sources_track:
    
    805
    +            self.__ensure_previous_sources(previous_sources)
    
    806
    +            with self.tempdir() as staging_directory:
    
    807
    +                for src in previous_sources:
    
    808
    +                    src._stage(staging_directory)
    
    809
    +                new_ref = self.__do_track(previous_sources_dir=staging_directory)
    
    810
    +        else:
    
    811
    +            new_ref = self.__do_track()
    
    812
    +
    
    767 813
             current_ref = self.get_ref()
    
    768 814
     
    
    769 815
             if new_ref is None:
    
    ... ... @@ -775,6 +821,18 @@ class Source(Plugin):
    775 821
     
    
    776 822
             return new_ref
    
    777 823
     
    
    824
    +    # _is_transform()
    
    825
    +    #
    
    826
    +    # A plugin is considered a source transform plugin if it requires access to
    
    827
    +    # previous sources for its tracking or fetching. Such sources cannot be the
    
    828
    +    # first source of any element.
    
    829
    +    #
    
    830
    +    # Returns:
    
    831
    +    #   (bool): Whether this is a source transform plugin.
    
    832
    +    #
    
    833
    +    def _is_transform(self):
    
    834
    +        return self.requires_previous_sources_track or self.requires_previous_sources_fetch
    
    835
    +
    
    778 836
         # Returns the alias if it's defined in the project
    
    779 837
         def _get_alias(self):
    
    780 838
             alias = self.__expected_alias
    
    ... ... @@ -791,12 +849,12 @@ class Source(Plugin):
    791 849
         #############################################################
    
    792 850
     
    
    793 851
         # Tries to call track for every mirror, stopping once it succeeds
    
    794
    -    def __do_track(self):
    
    852
    +    def __do_track(self, **kwargs):
    
    795 853
             project = self._get_project()
    
    796 854
             # If there are no mirrors, or no aliases to replace, there's nothing to do here.
    
    797 855
             alias = self._get_alias()
    
    798 856
             if not project.mirrors or not alias:
    
    799
    -            return self.track()
    
    857
    +            return self.track(**kwargs)
    
    800 858
     
    
    801 859
             context = self._get_context()
    
    802 860
             source_kind = type(self)
    
    ... ... @@ -808,7 +866,7 @@ class Source(Plugin):
    808 866
                                          alias_override=(alias, uri))
    
    809 867
                 new_source._preflight()
    
    810 868
                 try:
    
    811
    -                ref = new_source.track()
    
    869
    +                ref = new_source.track(**kwargs)
    
    812 870
                 # FIXME: Need to consider temporary vs. permanent failures,
    
    813 871
                 #        and how this works with retries.
    
    814 872
                 except BstError as e:
    
    ... ... @@ -849,3 +907,14 @@ class Source(Plugin):
    849 907
             _yaml.node_final_assertions(config)
    
    850 908
     
    
    851 909
             return config
    
    910
    +
    
    911
    +    # Ensures that previous sources have been tracked and fetched.
    
    912
    +    #
    
    913
    +    def __ensure_previous_sources(self, previous_sources):
    
    914
    +        for index, src in enumerate(previous_sources):
    
    915
    +            if src.get_consistency() == Consistency.RESOLVED:
    
    916
    +                src._fetch(previous_sources[0:index])
    
    917
    +            elif src.get_consistency() == Consistency.INCONSISTENT:
    
    918
    +                new_ref = src._track(previous_sources[0:index])
    
    919
    +                src._save_ref(new_ref)
    
    920
    +                src._fetch(previous_sources[0:index])

  • tests/sources/previous_source_access.py
    1
    +import os
    
    2
    +import pytest
    
    3
    +
    
    4
    +from tests.testutils import cli
    
    5
    +
    
    6
    +DATA_DIR = os.path.join(
    
    7
    +    os.path.dirname(os.path.realpath(__file__)),
    
    8
    +    'previous_source_access'
    
    9
    +)
    
    10
    +
    
    11
    +
    
    12
    +##################################################################
    
    13
    +#                              Tests                             #
    
    14
    +##################################################################
    
    15
    +# Test that plugins can access data from previous sources
    
    16
    +@pytest.mark.datafiles(DATA_DIR)
    
    17
    +def test_custom_transform_source(cli, tmpdir, datafiles):
    
    18
    +    project = os.path.join(datafiles.dirname, datafiles.basename)
    
    19
    +
    
    20
    +    # Ensure we can track
    
    21
    +    result = cli.run(project=project, args=[
    
    22
    +        'track', 'target.bst'
    
    23
    +    ])
    
    24
    +    result.assert_success()
    
    25
    +
    
    26
    +    # Ensure we can fetch
    
    27
    +    result = cli.run(project=project, args=[
    
    28
    +        'fetch', 'target.bst'
    
    29
    +    ])
    
    30
    +    result.assert_success()
    
    31
    +
    
    32
    +    # Ensure we get correct output from foo_transform
    
    33
    +    result = cli.run(project=project, args=[
    
    34
    +        'build', 'target.bst'
    
    35
    +    ])
    
    36
    +    destpath = os.path.join(cli.directory, 'checkout')
    
    37
    +    result = cli.run(project=project, args=[
    
    38
    +        'checkout', 'target.bst', destpath
    
    39
    +    ])
    
    40
    +    result.assert_success()
    
    41
    +    # Assert that files from both sources exist, and that they have
    
    42
    +    # the same content
    
    43
    +    assert os.path.exists(os.path.join(destpath, 'file'))
    
    44
    +    assert os.path.exists(os.path.join(destpath, 'filetransform'))
    
    45
    +    with open(os.path.join(destpath, 'file')) as file1:
    
    46
    +        with open(os.path.join(destpath, 'filetransform')) as file2:
    
    47
    +            assert file1.read() == file2.read()

  • tests/sources/previous_source_access/elements/target.bst
    1
    +kind: import
    
    2
    +
    
    3
    +sources:
    
    4
    +- kind: local
    
    5
    +  path: files/file
    
    6
    +- kind: foo_transform

  • tests/sources/previous_source_access/files/file
    1
    +Hello World!

  • tests/sources/previous_source_access/plugins/sources/foo_transform.py
    1
    +"""
    
    2
    +foo_transform - transform "file" from previous sources into "filetransform"
    
    3
    +===========================================================================
    
    4
    +
    
    5
    +This is a test source plugin that looks for a file named "file" staged by
    
    6
    +previous sources, and copies its contents to a file called "filetransform".
    
    7
    +
    
    8
    +"""
    
    9
    +
    
    10
    +import os
    
    11
    +import hashlib
    
    12
    +
    
    13
    +from buildstream import Consistency, Source, SourceError, utils
    
    14
    +
    
    15
    +
    
    16
    +class FooTransformSource(Source):
    
    17
    +
    
    18
    +    # We need access to previous sources both at track time and fetch time
    
    19
    +    requires_previous_sources_track = True
    
    20
    +    requires_previous_sources_fetch = True
    
    21
    +
    
    22
    +    @property
    
    23
    +    def mirror(self):
    
    24
    +        """Directory where this source stores its fetched files
    
    25
    +
    
    26
    +        """
    
    27
    +        path = os.path.join(self.get_mirror_directory(), self.name,
    
    28
    +                            self.ref.strip())
    
    29
    +        os.makedirs(path, exist_ok=True)
    
    30
    +        return path
    
    31
    +
    
    32
    +    def configure(self, node):
    
    33
    +        self.node_validate(node, ['ref'] + Source.COMMON_CONFIG_KEYS)
    
    34
    +        self.ref = self.node_get_member(node, str, 'ref', None)
    
    35
    +
    
    36
    +    def preflight(self):
    
    37
    +        pass
    
    38
    +
    
    39
    +    def get_unique_key(self):
    
    40
    +        return (self.ref,)
    
    41
    +
    
    42
    +    def get_consistency(self):
    
    43
    +        if self.ref is None:
    
    44
    +            return Consistency.INCONSISTENT
    
    45
    +        # If we have a file called "filetransform", verify that its checksum
    
    46
    +        # matches our ref. Otherwise, it is resolved but not cached.
    
    47
    +        fpath = os.path.join(self.mirror, 'filetransform')
    
    48
    +        try:
    
    49
    +            with open(fpath, 'rb') as f:
    
    50
    +                if hashlib.sha256(f.read()).hexdigest() == self.ref.strip():
    
    51
    +                    return Consistency.CACHED
    
    52
    +        except Exception:
    
    53
    +            pass
    
    54
    +        return Consistency.RESOLVED
    
    55
    +
    
    56
    +    def get_ref(self):
    
    57
    +        return self.ref
    
    58
    +
    
    59
    +    def set_ref(self, ref, node):
    
    60
    +        self.ref = node['ref'] = ref
    
    61
    +
    
    62
    +    def track(self, previous_sources_dir):
    
    63
    +        # Store the checksum of the file from previous source as our ref
    
    64
    +        fpath = os.path.join(previous_sources_dir, 'file')
    
    65
    +        with open(fpath, 'rb') as f:
    
    66
    +            return hashlib.sha256(f.read()).hexdigest()
    
    67
    +
    
    68
    +    def fetch(self, previous_sources_dir):
    
    69
    +        fpath = os.path.join(previous_sources_dir, 'file')
    
    70
    +        # Verify that the checksum of the file from previous source matches
    
    71
    +        # our ref
    
    72
    +        with open(fpath, 'rb') as f:
    
    73
    +            if hashlib.sha256(f.read()).hexdigest() != self.ref.strip():
    
    74
    +                raise SourceError("Element references do not match")
    
    75
    +
    
    76
    +        # Copy "file" as "filetransform"
    
    77
    +        newfpath = os.path.join(self.mirror, 'filetransform')
    
    78
    +        utils.safe_copy(fpath, newfpath)
    
    79
    +
    
    80
    +    def stage(self, directory):
    
    81
    +        # Simply stage the "filetransform" file
    
    82
    +        utils.safe_copy(os.path.join(self.mirror, 'filetransform'),
    
    83
    +                        os.path.join(directory, 'filetransform'))
    
    84
    +
    
    85
    +
    
    86
    +def setup():
    
    87
    +    return FooTransformSource

  • tests/sources/previous_source_access/project.conf
    1
    +# Project with local source plugins
    
    2
    +name: foo
    
    3
    +
    
    4
    +element-path: elements
    
    5
    +
    
    6
    +plugins:
    
    7
    +- origin: local
    
    8
    +  path: plugins/sources
    
    9
    +  sources:
    
    10
    +    foo_transform: 0



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]