Chandan Singh pushed to branch chandan/sourcetransform at BuildStream / buildstream
Commits:
-
39c37a64
by Chandan Singh at 2018-07-30T10:55:56Z
9 changed files:
- buildstream/_loader/loader.py
- buildstream/_scheduler/queues/fetchqueue.py
- buildstream/element.py
- buildstream/source.py
- + tests/sources/previous_source_access.py
- + tests/sources/previous_source_access/elements/target.bst
- + tests/sources/previous_source_access/files/file
- + tests/sources/previous_source_access/plugins/sources/foo_transform.py
- + tests/sources/previous_source_access/project.conf
Changes:
| ... | ... | @@ -506,14 +506,15 @@ class Loader(): |
| 506 | 506 |
element = Element._new_from_meta(meta_element, platform.artifactcache)
|
| 507 | 507 |
element._preflight()
|
| 508 | 508 |
|
| 509 |
- for source in element.sources():
|
|
| 509 |
+ sources = list(element.sources())
|
|
| 510 |
+ for idx, source in enumerate(sources):
|
|
| 510 | 511 |
# Handle the case where a subproject needs to be fetched
|
| 511 | 512 |
#
|
| 512 | 513 |
if source.get_consistency() == Consistency.RESOLVED:
|
| 513 | 514 |
if self._fetch_subprojects:
|
| 514 | 515 |
if ticker:
|
| 515 | 516 |
ticker(filename, 'Fetching subproject from {} source'.format(source.get_kind()))
|
| 516 |
- source._fetch()
|
|
| 517 |
+ source._fetch(sources[0:idx])
|
|
| 517 | 518 |
else:
|
| 518 | 519 |
detail = "Try fetching the project with `bst fetch {}`".format(filename)
|
| 519 | 520 |
raise LoadError(LoadErrorReason.SUBPROJECT_FETCH_NEEDED,
|
| ... | ... | @@ -40,8 +40,10 @@ class FetchQueue(Queue): |
| 40 | 40 |
self._skip_cached = skip_cached
|
| 41 | 41 |
|
| 42 | 42 |
def process(self, element):
|
| 43 |
+ previous_sources = []
|
|
| 43 | 44 |
for source in element.sources():
|
| 44 |
- source._fetch()
|
|
| 45 |
+ source._fetch(previous_sources=previous_sources)
|
|
| 46 |
+ previous_sources.append(source)
|
|
| 45 | 47 |
|
| 46 | 48 |
def status(self, element):
|
| 47 | 49 |
# state of dependencies may have changed, recalculate element state
|
| ... | ... | @@ -1195,6 +1195,11 @@ class Element(Plugin): |
| 1195 | 1195 |
# Prepend provenance to the error
|
| 1196 | 1196 |
raise ElementError("{}: {}".format(self, e), reason=e.reason) from e
|
| 1197 | 1197 |
|
| 1198 |
+ # Ensure that the first source does not need access to previous sources
|
|
| 1199 |
+ if self.__sources and self.__sources[0]._is_transform():
|
|
| 1200 |
+ raise ElementError("{}: A Source Transform plugin cannot be the first source"
|
|
| 1201 |
+ .format(self))
|
|
| 1202 |
+ |
|
| 1198 | 1203 |
# Preflight the sources
|
| 1199 | 1204 |
for source in self.sources():
|
| 1200 | 1205 |
source._preflight()
|
| ... | ... | @@ -1238,9 +1243,9 @@ class Element(Plugin): |
| 1238 | 1243 |
#
|
| 1239 | 1244 |
def _track(self):
|
| 1240 | 1245 |
refs = []
|
| 1241 |
- for source in self.__sources:
|
|
| 1246 |
+ for index, source in enumerate(self.__sources):
|
|
| 1242 | 1247 |
old_ref = source.get_ref()
|
| 1243 |
- new_ref = source._track()
|
|
| 1248 |
+ new_ref = source._track(previous_sources=self.__sources[0:index])
|
|
| 1244 | 1249 |
refs.append((source._get_unique_id(), new_ref))
|
| 1245 | 1250 |
|
| 1246 | 1251 |
# Complimentary warning that the new ref will be unused.
|
| ... | ... | @@ -156,7 +156,7 @@ class SourceFetcher(): |
| 156 | 156 |
#############################################################
|
| 157 | 157 |
# Abstract Methods #
|
| 158 | 158 |
#############################################################
|
| 159 |
- def fetch(self, alias_override=None):
|
|
| 159 |
+ def fetch(self, alias_override=None, **kwargs):
|
|
| 160 | 160 |
"""Fetch remote sources and mirror them locally, ensuring at least
|
| 161 | 161 |
that the specific reference is cached locally.
|
| 162 | 162 |
|
| ... | ... | @@ -209,6 +209,28 @@ class Source(Plugin): |
| 209 | 209 |
__defaults = {} # The defaults from the project
|
| 210 | 210 |
__defaults_set = False # Flag, in case there are not defaults at all
|
| 211 | 211 |
|
| 212 |
requires_previous_sources_track = False
"""Whether access to previous sources is required during track

When set to True:
  * all sources listed before this source in the given element will be
    tracked (if they have no ref yet) and fetched before this source is
    tracked
  * Source.track() will be called with an additional keyword argument
    `previous_sources_dir`: the path to a temporary directory into which
    all previous sources of the element have been staged
  * this source can not be the first source for an element
"""

requires_previous_sources_fetch = False
"""Whether access to previous sources is required during fetch

When set to True:
  * all sources listed before this source in the given element will be
    tracked (if they have no ref yet) and fetched before this source is
    fetched
  * Source.fetch() will be called with an additional keyword argument
    `previous_sources_dir`: the path to a temporary directory into which
    all previous sources of the element have been staged
  * this source can not be the first source for an element
"""
|
|
| 233 |
+ |
|
| 212 | 234 |
def __init__(self, context, project, meta, *, alias_override=None):
|
| 213 | 235 |
provenance = _yaml.node_get_provenance(meta.config)
|
| 214 | 236 |
super().__init__("{}-{}".format(meta.element_name, meta.element_index),
|
| ... | ... | @@ -303,7 +325,7 @@ class Source(Plugin): |
| 303 | 325 |
"""
|
| 304 | 326 |
raise ImplError("Source plugin '{}' does not implement set_ref()".format(self.get_kind()))
|
| 305 | 327 |
|
| 306 |
- def track(self):
|
|
| 328 |
+ def track(self, **kwargs):
|
|
| 307 | 329 |
"""Resolve a new ref from the plugin's track option
|
| 308 | 330 |
|
| 309 | 331 |
Returns:
|
| ... | ... | @@ -324,7 +346,7 @@ class Source(Plugin): |
| 324 | 346 |
# Allow a non implementation
|
| 325 | 347 |
return None
|
| 326 | 348 |
|
| 327 |
- def fetch(self):
|
|
| 349 |
+ def fetch(self, **kwargs):
|
|
| 328 | 350 |
"""Fetch remote sources and mirror them locally, ensuring at least
|
| 329 | 351 |
that the specific reference is cached locally.
|
| 330 | 352 |
|
| ... | ... | @@ -517,46 +539,59 @@ class Source(Plugin): |
| 517 | 539 |
|
| 518 | 540 |
# Wrapper function around plugin provided fetch method
|
| 519 | 541 |
#
|
| 520 |
- def _fetch(self):
|
|
| 521 |
- project = self._get_project()
|
|
| 522 |
- source_fetchers = self.get_source_fetchers()
|
|
| 523 |
- if source_fetchers:
|
|
| 524 |
- for fetcher in source_fetchers:
|
|
| 525 |
- alias = fetcher._get_alias()
|
|
| 526 |
- success = False
|
|
| 542 |
+ # Args:
|
|
| 543 |
+ # previous_sources (list): List of Sources listed prior to this source
|
|
| 544 |
+ #
|
|
| 545 |
+ def _fetch(self, previous_sources):
|
|
| 546 |
+ def __fetch(**kwargs):
|
|
| 547 |
+ project = self._get_project()
|
|
| 548 |
+ source_fetchers = self.get_source_fetchers()
|
|
| 549 |
+ if source_fetchers:
|
|
| 550 |
+ for fetcher in source_fetchers:
|
|
| 551 |
+ alias = fetcher._get_alias()
|
|
| 552 |
+ success = False
|
|
| 553 |
+ for uri in project.get_alias_uris(alias):
|
|
| 554 |
+ try:
|
|
| 555 |
+ fetcher.fetch(uri)
|
|
| 556 |
+ # FIXME: Need to consider temporary vs. permanent failures,
|
|
| 557 |
+ # and how this works with retries.
|
|
| 558 |
+ except BstError as e:
|
|
| 559 |
+ last_error = e
|
|
| 560 |
+ continue
|
|
| 561 |
+ success = True
|
|
| 562 |
+ break
|
|
| 563 |
+ if not success:
|
|
| 564 |
+ raise last_error
|
|
| 565 |
+ else:
|
|
| 566 |
+ alias = self._get_alias()
|
|
| 567 |
+ if not project.mirrors or not alias:
|
|
| 568 |
+ self.fetch(**kwargs)
|
|
| 569 |
+ return
|
|
| 570 |
+ |
|
| 571 |
+ context = self._get_context()
|
|
| 572 |
+ source_kind = type(self)
|
|
| 527 | 573 |
for uri in project.get_alias_uris(alias):
|
| 574 |
+ new_source = source_kind(context, project, self.__meta,
|
|
| 575 |
+ alias_override=(alias, uri))
|
|
| 576 |
+ new_source._preflight()
|
|
| 528 | 577 |
try:
|
| 529 |
- fetcher.fetch(uri)
|
|
| 578 |
+ new_source.fetch(**kwargs)
|
|
| 530 | 579 |
# FIXME: Need to consider temporary vs. permanent failures,
|
| 531 | 580 |
# and how this works with retries.
|
| 532 | 581 |
except BstError as e:
|
| 533 | 582 |
last_error = e
|
| 534 | 583 |
continue
|
| 535 |
- success = True
|
|
| 536 |
- break
|
|
| 537 |
- if not success:
|
|
| 538 |
- raise last_error
|
|
| 584 |
+ return
|
|
| 585 |
+ raise last_error
|
|
| 586 |
+ |
|
| 587 |
+ if self.requires_previous_sources_fetch:
|
|
| 588 |
+ self.__ensure_previous_sources(previous_sources)
|
|
| 589 |
+ with self.tempdir() as staging_directory:
|
|
| 590 |
+ for src in previous_sources:
|
|
| 591 |
+ src._stage(staging_directory)
|
|
| 592 |
+ __fetch(previous_sources_dir=staging_directory)
|
|
| 539 | 593 |
else:
|
| 540 |
- alias = self._get_alias()
|
|
| 541 |
- if not project.mirrors or not alias:
|
|
| 542 |
- self.fetch()
|
|
| 543 |
- return
|
|
| 544 |
- |
|
| 545 |
- context = self._get_context()
|
|
| 546 |
- source_kind = type(self)
|
|
| 547 |
- for uri in project.get_alias_uris(alias):
|
|
| 548 |
- new_source = source_kind(context, project, self.__meta,
|
|
| 549 |
- alias_override=(alias, uri))
|
|
| 550 |
- new_source._preflight()
|
|
| 551 |
- try:
|
|
| 552 |
- new_source.fetch()
|
|
| 553 |
- # FIXME: Need to consider temporary vs. permanent failures,
|
|
| 554 |
- # and how this works with retries.
|
|
| 555 |
- except BstError as e:
|
|
| 556 |
- last_error = e
|
|
| 557 |
- continue
|
|
| 558 |
- return
|
|
| 559 |
- raise last_error
|
|
| 594 |
+ __fetch()
|
|
| 560 | 595 |
|
| 561 | 596 |
# Wrapper for stage() api which gives the source
|
| 562 | 597 |
# plugin a fully constructed path considering the
|
| ... | ... | @@ -762,8 +797,19 @@ class Source(Plugin): |
| 762 | 797 |
|
| 763 | 798 |
# Wrapper for track()
|
| 764 | 799 |
#
|
| 765 |
- def _track(self):
|
|
| 766 |
- new_ref = self.__do_track()
|
|
| 800 |
+ # Args:
|
|
| 801 |
+ # previous_sources (list): List of Sources listed prior to this source
|
|
| 802 |
+ #
|
|
| 803 |
+ def _track(self, previous_sources):
|
|
| 804 |
+ if self.requires_previous_sources_track:
|
|
| 805 |
+ self.__ensure_previous_sources(previous_sources)
|
|
| 806 |
+ with self.tempdir() as staging_directory:
|
|
| 807 |
+ for src in previous_sources:
|
|
| 808 |
+ src._stage(staging_directory)
|
|
| 809 |
+ new_ref = self.__do_track(previous_sources_dir=staging_directory)
|
|
| 810 |
+ else:
|
|
| 811 |
+ new_ref = self.__do_track()
|
|
| 812 |
+ |
|
| 767 | 813 |
current_ref = self.get_ref()
|
| 768 | 814 |
|
| 769 | 815 |
if new_ref is None:
|
| ... | ... | @@ -775,6 +821,18 @@ class Source(Plugin): |
| 775 | 821 |
|
| 776 | 822 |
return new_ref
|
| 777 | 823 |
|
| 824 |
# _is_transform()
#
# Tell whether this source is a "source transform" plugin, i.e. one that
# declares it needs the element's previous sources while tracking and/or
# fetching. Such a source may never be the first source of an element.
#
# Returns:
#    (bool): Whether this is a source transform plugin.
#
def _is_transform(self):
    if self.requires_previous_sources_track:
        return self.requires_previous_sources_track
    return self.requires_previous_sources_fetch
|
|
| 835 |
+ |
|
| 778 | 836 |
# Returns the alias if it's defined in the project
|
| 779 | 837 |
def _get_alias(self):
|
| 780 | 838 |
alias = self.__expected_alias
|
| ... | ... | @@ -791,12 +849,12 @@ class Source(Plugin): |
| 791 | 849 |
#############################################################
|
| 792 | 850 |
|
| 793 | 851 |
# Tries to call track for every mirror, stopping once it succeeds
|
| 794 |
- def __do_track(self):
|
|
| 852 |
+ def __do_track(self, **kwargs):
|
|
| 795 | 853 |
project = self._get_project()
|
| 796 | 854 |
# If there are no mirrors, or no aliases to replace, there's nothing to do here.
|
| 797 | 855 |
alias = self._get_alias()
|
| 798 | 856 |
if not project.mirrors or not alias:
|
| 799 |
- return self.track()
|
|
| 857 |
+ return self.track(**kwargs)
|
|
| 800 | 858 |
|
| 801 | 859 |
context = self._get_context()
|
| 802 | 860 |
source_kind = type(self)
|
| ... | ... | @@ -808,7 +866,7 @@ class Source(Plugin): |
| 808 | 866 |
alias_override=(alias, uri))
|
| 809 | 867 |
new_source._preflight()
|
| 810 | 868 |
try:
|
| 811 |
- ref = new_source.track()
|
|
| 869 |
+ ref = new_source.track(**kwargs)
|
|
| 812 | 870 |
# FIXME: Need to consider temporary vs. permanent failures,
|
| 813 | 871 |
# and how this works with retries.
|
| 814 | 872 |
except BstError as e:
|
| ... | ... | @@ -849,3 +907,14 @@ class Source(Plugin): |
| 849 | 907 |
_yaml.node_final_assertions(config)
|
| 850 | 908 |
|
| 851 | 909 |
return config
|
| 910 |
+ |
|
| 911 |
# __ensure_previous_sources()
#
# Ensures that previous sources have been tracked and fetched, so that a
# source transform can stage them.
#
# Sources that are INCONSISTENT are tracked first and their new ref is
# saved, then fetched. Sources that are RESOLVED are fetched. Any other
# state (e.g. already cached) needs no work.
#
# Args:
#    previous_sources (list): List of Sources listed prior to this source
#
def __ensure_previous_sources(self, previous_sources):
    for index, src in enumerate(previous_sources):
        # Each source only sees the sources listed before it, matching how
        # the element-level track and fetch loops call _track()/_fetch().
        earlier_sources = previous_sources[0:index]
        consistency = src.get_consistency()

        if consistency == Consistency.INCONSISTENT:
            # _track() requires previous_sources; calling it without the
            # argument raised a TypeError for every inconsistent source.
            new_ref = src._track(previous_sources=earlier_sources)
            src._save_ref(new_ref)
            src._fetch(previous_sources=earlier_sources)
        elif consistency == Consistency.RESOLVED:
            src._fetch(previous_sources=earlier_sources)
|
| 1 |
import os
import pytest

from tests.testutils import cli

DATA_DIR = os.path.join(
    os.path.dirname(os.path.realpath(__file__)),
    'previous_source_access'
)


##################################################################
#                              Tests                             #
##################################################################
# Test that plugins can access data from previous sources
@pytest.mark.datafiles(DATA_DIR)
def test_custom_transform_source(cli, tmpdir, datafiles):
    project = os.path.join(datafiles.dirname, datafiles.basename)

    # Ensure we can track
    result = cli.run(project=project, args=[
        'track', 'target.bst'
    ])
    result.assert_success()

    # Ensure we can fetch
    result = cli.run(project=project, args=[
        'fetch', 'target.bst'
    ])
    result.assert_success()

    # Ensure we can build; assert here so a build failure is reported
    # directly rather than as a confusing checkout failure below.
    result = cli.run(project=project, args=[
        'build', 'target.bst'
    ])
    result.assert_success()

    # Ensure we get correct output from foo_transform
    destpath = os.path.join(cli.directory, 'checkout')
    result = cli.run(project=project, args=[
        'checkout', 'target.bst', destpath
    ])
    result.assert_success()

    # Assert that files from both sources exist, and that they have
    # the same content
    assert os.path.exists(os.path.join(destpath, 'file'))
    assert os.path.exists(os.path.join(destpath, 'filetransform'))
    with open(os.path.join(destpath, 'file')) as file1:
        with open(os.path.join(destpath, 'filetransform')) as file2:
            assert file1.read() == file2.read()
|
| 1 |
+kind: import
|
|
| 2 |
+ |
|
| 3 |
+sources:
|
|
| 4 |
+- kind: local
|
|
| 5 |
+ path: files/file
|
|
| 6 |
+- kind: foo_transform
|
| 1 |
+Hello World!
|
| 1 |
"""
foo_transform - transform "file" from previous sources into "filetransform"
===========================================================================

This is a test source plugin that looks for a file named "file" staged by
previous sources, and copies its contents to a file called "filetransform".

"""

import os
import hashlib

from buildstream import Consistency, Source, SourceError, utils


def _sha256sum(path):
    """Return the hex sha256 digest of the file at *path*."""
    with open(path, 'rb') as f:
        return hashlib.sha256(f.read()).hexdigest()


class FooTransformSource(Source):

    # We need access to previous sources both at track time and fetch time
    requires_previous_sources_track = True
    requires_previous_sources_fetch = True

    @property
    def mirror(self):
        """Directory where this source keeps its fetched "filetransform" file

        The directory is specific to the current ref, and is created on
        first access.
        """
        path = os.path.join(self.get_mirror_directory(), self.name,
                            self.ref.strip())
        os.makedirs(path, exist_ok=True)
        return path

    def configure(self, node):
        self.node_validate(node, ['ref'] + Source.COMMON_CONFIG_KEYS)
        self.ref = self.node_get_member(node, str, 'ref', None)

    def preflight(self):
        pass

    def get_unique_key(self):
        return (self.ref,)

    def get_consistency(self):
        if self.ref is None:
            return Consistency.INCONSISTENT
        # If we have a file called "filetransform", verify that its checksum
        # matches our ref. Otherwise, it is resolved but not cached.
        fpath = os.path.join(self.mirror, 'filetransform')
        try:
            if _sha256sum(fpath) == self.ref.strip():
                return Consistency.CACHED
        except OSError:
            # Not fetched yet (or unreadable): treat as not cached. Only
            # filesystem errors are expected here; anything else is a bug
            # and should not be silently swallowed.
            pass
        return Consistency.RESOLVED

    def get_ref(self):
        return self.ref

    def set_ref(self, ref, node):
        self.ref = node['ref'] = ref

    def track(self, previous_sources_dir):
        # Store the checksum of the file from previous source as our ref
        fpath = os.path.join(previous_sources_dir, 'file')
        return _sha256sum(fpath)

    def fetch(self, previous_sources_dir):
        fpath = os.path.join(previous_sources_dir, 'file')
        # Verify that the checksum of the file from previous source matches
        # our ref
        if _sha256sum(fpath) != self.ref.strip():
            raise SourceError("Element references do not match")

        # Copy "file" as "filetransform"
        newfpath = os.path.join(self.mirror, 'filetransform')
        utils.safe_copy(fpath, newfpath)

    def stage(self, directory):
        # Simply stage the "filetransform" file
        utils.safe_copy(os.path.join(self.mirror, 'filetransform'),
                        os.path.join(directory, 'filetransform'))


def setup():
    return FooTransformSource
|
| 1 |
+# Project with local source plugins
|
|
| 2 |
+name: foo
|
|
| 3 |
+ |
|
| 4 |
+element-path: elements
|
|
| 5 |
+ |
|
| 6 |
+plugins:
|
|
| 7 |
+- origin: local
|
|
| 8 |
+ path: plugins/sources
|
|
| 9 |
+ sources:
|
|
| 10 |
+ foo_transform: 0
|
