Chandan Singh pushed to branch chandan/sourcetransform at BuildStream / buildstream
Commits:
-
51532b33
by Chandan Singh at 2018-07-25T18:59:17Z
8 changed files:
- buildstream/_scheduler/queues/fetchqueue.py
- buildstream/element.py
- buildstream/source.py
- + tests/sources/previous_source_access.py
- + tests/sources/previous_source_access/elements/target.bst
- + tests/sources/previous_source_access/files/file
- + tests/sources/previous_source_access/plugins/sources/foo_transform.py
- + tests/sources/previous_source_access/project.conf
Changes:
... | ... | @@ -40,8 +40,10 @@ class FetchQueue(Queue): |
40 | 40 |
self._skip_cached = skip_cached
|
41 | 41 |
|
42 | 42 |
def process(self, element):
|
43 |
+ previous_sources = []
|
|
43 | 44 |
for source in element.sources():
|
44 |
- source._fetch()
|
|
45 |
+ source._fetch(previous_sources=previous_sources)
|
|
46 |
+ previous_sources.append(source)
|
|
45 | 47 |
|
46 | 48 |
def status(self, element):
|
47 | 49 |
# state of dependencies may have changed, recalculate element state
|
... | ... | @@ -1194,6 +1194,11 @@ class Element(Plugin): |
1194 | 1194 |
# Prepend provenance to the error
|
1195 | 1195 |
raise ElementError("{}: {}".format(self, e), reason=e.reason) from e
|
1196 | 1196 |
|
1197 |
+ # Ensure that the first source does not need access to previous soruces
|
|
1198 |
+ if self.__sources and self.__sources[0]._is_transform():
|
|
1199 |
+ raise ElementError("{}: A Source Transform plugin cannot be the first source"
|
|
1200 |
+ .format(self))
|
|
1201 |
+ |
|
1197 | 1202 |
# Preflight the sources
|
1198 | 1203 |
for source in self.sources():
|
1199 | 1204 |
source._preflight()
|
... | ... | @@ -1237,9 +1242,9 @@ class Element(Plugin): |
1237 | 1242 |
#
|
1238 | 1243 |
def _track(self):
|
1239 | 1244 |
refs = []
|
1240 |
- for source in self.__sources:
|
|
1245 |
+ for index, source in enumerate(self.__sources):
|
|
1241 | 1246 |
old_ref = source.get_ref()
|
1242 |
- new_ref = source._track()
|
|
1247 |
+ new_ref = source._track(previous_sources=self.__sources[0:index])
|
|
1243 | 1248 |
refs.append((source._get_unique_id(), new_ref))
|
1244 | 1249 |
|
1245 | 1250 |
# Complimentary warning that the new ref will be unused.
|
... | ... | @@ -124,6 +124,9 @@ class Source(Plugin): |
124 | 124 |
__defaults = {} # The defaults from the project
|
125 | 125 |
__defaults_set = False # Flag, in case there are not defaults at all
|
126 | 126 |
|
127 |
+ requires_previous_sources_track = False
|
|
128 |
+ requires_previous_sources_fetch = False
|
|
129 |
+ |
|
127 | 130 |
def __init__(self, context, project, meta):
|
128 | 131 |
provenance = _yaml.node_get_provenance(meta.config)
|
129 | 132 |
super().__init__("{}-{}".format(meta.element_name, meta.element_index),
|
... | ... | @@ -213,7 +216,7 @@ class Source(Plugin): |
213 | 216 |
"""
|
214 | 217 |
raise ImplError("Source plugin '{}' does not implement set_ref()".format(self.get_kind()))
|
215 | 218 |
|
216 |
- def track(self):
|
|
219 |
+ def track(self, **kwargs):
|
|
217 | 220 |
"""Resolve a new ref from the plugin's track option
|
218 | 221 |
|
219 | 222 |
Returns:
|
... | ... | @@ -234,7 +237,7 @@ class Source(Plugin): |
234 | 237 |
# Allow a non implementation
|
235 | 238 |
return None
|
236 | 239 |
|
237 |
- def fetch(self):
|
|
240 |
+ def fetch(self, **kwargs):
|
|
238 | 241 |
"""Fetch remote sources and mirror them locally, ensuring at least
|
239 | 242 |
that the specific reference is cached locally.
|
240 | 243 |
|
... | ... | @@ -373,8 +376,18 @@ class Source(Plugin): |
373 | 376 |
|
374 | 377 |
# Wrapper function around plugin provided fetch method
|
375 | 378 |
#
|
376 |
- def _fetch(self):
|
|
377 |
- self.fetch()
|
|
379 |
+ # Args:
|
|
380 |
+ # previous_sources (list): List of Sources listed prior to this source
|
|
381 |
+ #
|
|
382 |
+ def _fetch(self, previous_sources):
|
|
383 |
+ if self.requires_previous_sources_fetch:
|
|
384 |
+ self.__ensure_previous_sources(previous_sources)
|
|
385 |
+ with self.tempdir() as staging_directory:
|
|
386 |
+ for src in previous_sources:
|
|
387 |
+ src._stage(staging_directory)
|
|
388 |
+ self.fetch(previous_sources_dir=staging_directory)
|
|
389 |
+ else:
|
|
390 |
+ self.fetch()
|
|
378 | 391 |
|
379 | 392 |
# Wrapper for stage() api which gives the source
|
380 | 393 |
# plugin a fully constructed path considering the
|
... | ... | @@ -580,18 +593,43 @@ class Source(Plugin): |
580 | 593 |
|
581 | 594 |
# Wrapper for track()
|
582 | 595 |
#
|
583 |
- def _track(self):
|
|
584 |
- new_ref = self.track()
|
|
585 |
- current_ref = self.get_ref()
|
|
596 |
+ # Args:
|
|
597 |
+ # previous_sources (list): List of Sources listed prior to this source
|
|
598 |
+ #
|
|
599 |
+ def _track(self, previous_sources):
|
|
600 |
+ def __track(**kwargs):
|
|
601 |
+ new_ref = self.track(**kwargs)
|
|
602 |
+ current_ref = self.get_ref()
|
|
603 |
+ |
|
604 |
+ if new_ref is None:
|
|
605 |
+ # No tracking, keep current ref
|
|
606 |
+ new_ref = current_ref
|
|
586 | 607 |
|
587 |
- if new_ref is None:
|
|
588 |
- # No tracking, keep current ref
|
|
589 |
- new_ref = current_ref
|
|
608 |
+ if current_ref != new_ref:
|
|
609 |
+ self.info("Found new revision: {}".format(new_ref))
|
|
590 | 610 |
|
591 |
- if current_ref != new_ref:
|
|
592 |
- self.info("Found new revision: {}".format(new_ref))
|
|
611 |
+ return new_ref
|
|
593 | 612 |
|
594 |
- return new_ref
|
|
613 |
+ if self.requires_previous_sources_track:
|
|
614 |
+ self.__ensure_previous_sources(previous_sources)
|
|
615 |
+ with self.tempdir() as staging_directory:
|
|
616 |
+ for src in previous_sources:
|
|
617 |
+ src._stage(staging_directory)
|
|
618 |
+ return __track(previous_sources_dir=staging_directory)
|
|
619 |
+ |
|
620 |
+ return __track()
|
|
621 |
+ |
|
622 |
+ # _is_transform()
|
|
623 |
+ #
|
|
624 |
+ # A plugin is considered a source transform plugin if it requires access to
|
|
625 |
+ # previous sources for its tracking or fetching. Such sources cannot be the
|
|
626 |
+ # first source of any element.
|
|
627 |
+ #
|
|
628 |
+ # Returns:
|
|
629 |
+ # (bool): Whetner this is a source transform plugin.
|
|
630 |
+ #
|
|
631 |
+ def _is_transform(self):
|
|
632 |
+ return self.requires_previous_sources_track or self.requires_previous_sources_fetch
|
|
595 | 633 |
|
596 | 634 |
#############################################################
|
597 | 635 |
# Local Private Methods #
|
... | ... | @@ -629,3 +667,14 @@ class Source(Plugin): |
629 | 667 |
_yaml.node_final_assertions(config)
|
630 | 668 |
|
631 | 669 |
return config
|
670 |
+ |
|
671 |
+ # Ensures that previous sources have been tracked and fetched.
|
|
672 |
+ #
|
|
673 |
+ def __ensure_previous_sources(self, previous_sources):
|
|
674 |
+ for index, src in enumerate(previous_sources):
|
|
675 |
+ if src.get_consistency() == Consistency.RESOLVED:
|
|
676 |
+ src._fetch(previous_sources=previous_sources[0:index])
|
|
677 |
+ elif src.get_consistency() == Consistency.INCONSISTENT:
|
|
678 |
+ new_ref = src._track()
|
|
679 |
+ src._save_ref(new_ref)
|
|
680 |
+ src._fetch(previous_sources=previous_sources[0:index])
|
1 |
+import os
|
|
2 |
+import pytest
|
|
3 |
+ |
|
4 |
+from tests.testutils import cli
|
|
5 |
+ |
|
6 |
+DATA_DIR = os.path.join(
|
|
7 |
+ os.path.dirname(os.path.realpath(__file__)),
|
|
8 |
+ 'previous_source_access'
|
|
9 |
+)
|
|
10 |
+ |
|
11 |
+ |
|
12 |
+##################################################################
|
|
13 |
+# Tests #
|
|
14 |
+##################################################################
|
|
15 |
+# Test that plugins can access data from previous sources
|
|
16 |
+@pytest.mark.datafiles(DATA_DIR)
|
|
17 |
+def test_custom_transform_source(cli, tmpdir, datafiles):
|
|
18 |
+ project = os.path.join(datafiles.dirname, datafiles.basename)
|
|
19 |
+ |
|
20 |
+ # Ensure we can track
|
|
21 |
+ result = cli.run(project=project, args=[
|
|
22 |
+ 'track', 'target.bst'
|
|
23 |
+ ])
|
|
24 |
+ result.assert_success()
|
|
25 |
+ |
|
26 |
+ # Ensure we can fetch
|
|
27 |
+ result = cli.run(project=project, args=[
|
|
28 |
+ 'fetch', 'target.bst'
|
|
29 |
+ ])
|
|
30 |
+ result.assert_success()
|
|
31 |
+ |
|
32 |
+ # Ensure we get correct output from foo_transform
|
|
33 |
+ result = cli.run(project=project, args=[
|
|
34 |
+ 'build', 'target.bst'
|
|
35 |
+ ])
|
|
36 |
+ destpath = os.path.join(cli.directory, 'checkout')
|
|
37 |
+ result = cli.run(project=project, args=[
|
|
38 |
+ 'checkout', 'target.bst', destpath
|
|
39 |
+ ])
|
|
40 |
+ result.assert_success()
|
|
41 |
+ # Assert that files from both sources exist, and that they have
|
|
42 |
+ # the same content
|
|
43 |
+ assert os.path.exists(os.path.join(destpath, 'file'))
|
|
44 |
+ assert os.path.exists(os.path.join(destpath, 'filetransform'))
|
|
45 |
+ with open(os.path.join(destpath, 'file')) as file1:
|
|
46 |
+ with open(os.path.join(destpath, 'filetransform')) as file2:
|
|
47 |
+ assert file1.read() == file2.read()
|
1 |
+kind: import
|
|
2 |
+ |
|
3 |
+sources:
|
|
4 |
+- kind: local
|
|
5 |
+ path: files/file
|
|
6 |
+- kind: foo_transform
|
1 |
+Hello World!
|
1 |
+import os
|
|
2 |
+import hashlib
|
|
3 |
+ |
|
4 |
+from buildstream import Consistency, Source, SourceError, utils
|
|
5 |
+ |
|
6 |
+ |
|
7 |
+class FooTransformSource(Source):
|
|
8 |
+ """Test plugin that copies "file" from previous source as "filetransform".
|
|
9 |
+ |
|
10 |
+ """
|
|
11 |
+ requires_previous_sources_fetch = True
|
|
12 |
+ requires_previous_sources_track = True
|
|
13 |
+ |
|
14 |
+ def configure(self, node):
|
|
15 |
+ self.node_validate(node, ['ref'] + Source.COMMON_CONFIG_KEYS)
|
|
16 |
+ self.ref = self.node_get_member(node, str, 'ref', None)
|
|
17 |
+ |
|
18 |
+ @property
|
|
19 |
+ def mirror(self):
|
|
20 |
+ path = os.path.join(self.get_mirror_directory(), self.name,
|
|
21 |
+ self.ref.strip())
|
|
22 |
+ os.makedirs(path, exist_ok=True)
|
|
23 |
+ return path
|
|
24 |
+ |
|
25 |
+ def preflight(self):
|
|
26 |
+ pass
|
|
27 |
+ |
|
28 |
+ def get_unique_key(self):
|
|
29 |
+ return (self.ref,)
|
|
30 |
+ |
|
31 |
+ def get_consistency(self):
|
|
32 |
+ if self.ref is None:
|
|
33 |
+ return Consistency.INCONSISTENT
|
|
34 |
+ fpath = os.path.join(self.mirror, 'filetransform')
|
|
35 |
+ try:
|
|
36 |
+ with open(fpath, 'rb') as f:
|
|
37 |
+ if hashlib.sha256(f.read()).hexdigest() == self.ref.strip():
|
|
38 |
+ return Consistency.CACHED
|
|
39 |
+ except Exception:
|
|
40 |
+ pass
|
|
41 |
+ return Consistency.RESOLVED
|
|
42 |
+ |
|
43 |
+ def get_ref(self):
|
|
44 |
+ return self.ref
|
|
45 |
+ |
|
46 |
+ def set_ref(self, ref, node):
|
|
47 |
+ self.ref = node['ref'] = ref
|
|
48 |
+ |
|
49 |
+ def track(self, previous_sources_dir):
|
|
50 |
+ fpath = os.path.join(previous_sources_dir, 'file')
|
|
51 |
+ with open(fpath, 'rb') as f:
|
|
52 |
+ return hashlib.sha256(f.read()).hexdigest()
|
|
53 |
+ |
|
54 |
+ def fetch(self, previous_sources_dir):
|
|
55 |
+ fpath = os.path.join(previous_sources_dir, 'file')
|
|
56 |
+ with open(fpath, 'rb') as f:
|
|
57 |
+ if hashlib.sha256(f.read()).hexdigest() != self.ref.strip():
|
|
58 |
+ raise SourceError("Element references do not match")
|
|
59 |
+ newfpath = os.path.join(self.mirror, 'filetransform')
|
|
60 |
+ utils.safe_copy(fpath, newfpath)
|
|
61 |
+ |
|
62 |
+ def stage(self, directory):
|
|
63 |
+ utils.safe_copy(os.path.join(self.mirror, 'filetransform'),
|
|
64 |
+ os.path.join(directory, 'filetransform'))
|
|
65 |
+ |
|
66 |
+ |
|
67 |
+def setup():
|
|
68 |
+ return FooTransformSource
|
1 |
+# Project with local source plugins
|
|
2 |
+name: foo
|
|
3 |
+ |
|
4 |
+element-path: elements
|
|
5 |
+ |
|
6 |
+plugins:
|
|
7 |
+- origin: local
|
|
8 |
+ path: plugins/sources
|
|
9 |
+ sources:
|
|
10 |
+ foo_transform: 0
|