Tristan Van Berkom pushed to branch tristan/fix-bzr-race at BuildStream / buildstream
Commits:
-
63b02f7c
by James Ennis at 2019-01-17T15:13:27Z
-
f64ee46f
by James Ennis at 2019-01-17T17:02:28Z
-
2cb37a7e
by James Ennis at 2019-01-18T09:31:24Z
-
56ec33cc
by Valentin David at 2019-01-18T11:14:41Z
-
ad2df651
by Javier Jardón at 2019-01-18T12:01:56Z
-
f874295f
by Tristan Van Berkom at 2019-01-18T15:59:28Z
-
42a2fe3c
by Tristan Van Berkom at 2019-01-18T15:59:28Z
-
a8713ed2
by Tristan Van Berkom at 2019-01-18T15:59:28Z
-
a895cb2a
by Tristan Van Berkom at 2019-01-18T15:59:28Z
5 changed files:
- .gitlab-ci.yml
- buildstream/_profile.py
- buildstream/plugins/sources/bzr.py
- tests/frontend/track.py
- tests/testutils/runcli.py
Changes:
| ... | ... | @@ -185,6 +185,9 @@ docs: |
| 185 | 185 |
- pip3 install --user -e ${BST_EXT_URL}@${BST_EXT_REF}#egg=bst_ext
|
| 186 | 186 |
- git clone https://gitlab.com/freedesktop-sdk/freedesktop-sdk.git
|
| 187 | 187 |
- git -C freedesktop-sdk checkout ${FD_SDK_REF}
|
| 188 |
+ artifacts:
|
|
| 189 |
+ paths:
|
|
| 190 |
+ - "${HOME}/.cache/buildstream/logs"
|
|
| 188 | 191 |
only:
|
| 189 | 192 |
- schedules
|
| 190 | 193 |
|
| ... | ... | @@ -62,15 +62,24 @@ class Profile(): |
| 62 | 62 |
def end(self):
|
| 63 | 63 |
self.profiler.disable()
|
| 64 | 64 |
|
| 65 |
+ dt = datetime.datetime.fromtimestamp(self.start)
|
|
| 66 |
+ timestamp = dt.strftime('%Y%m%dT%H%M%S')
|
|
| 67 |
+ |
|
| 65 | 68 |
filename = self.key.replace('/', '-')
|
| 66 | 69 |
filename = filename.replace('.', '-')
|
| 67 |
- filename = os.path.join(os.getcwd(), 'profile-' + filename + '.log')
|
|
| 70 |
+ filename = os.path.join(os.getcwd(), 'profile-' + timestamp + '-' + filename)
|
|
| 68 | 71 |
|
| 69 |
- with open(filename, "a", encoding="utf-8") as f:
|
|
| 72 |
+ time_ = dt.strftime('%Y-%m-%d %H:%M:%S') # Human friendly format
|
|
| 73 |
+ self.__write_log(filename + '.log', time_)
|
|
| 74 |
+ |
|
| 75 |
+ self.__write_binary(filename + '.cprofile')
|
|
| 70 | 76 |
|
| 71 |
- dt = datetime.datetime.fromtimestamp(self.start)
|
|
| 72 |
- time_ = dt.strftime('%Y-%m-%d %H:%M:%S')
|
|
| 77 |
+ ########################################
|
|
| 78 |
+ # Private Methods #
|
|
| 79 |
+ ########################################
|
|
| 73 | 80 |
|
| 81 |
+ def __write_log(self, filename, time_):
|
|
| 82 |
+ with open(filename, "a", encoding="utf-8") as f:
|
|
| 74 | 83 |
heading = '================================================================\n'
|
| 75 | 84 |
heading += 'Profile for key: {}\n'.format(self.key)
|
| 76 | 85 |
heading += 'Started at: {}\n'.format(time_)
|
| ... | ... | @@ -81,6 +90,9 @@ class Profile(): |
| 81 | 90 |
ps = pstats.Stats(self.profiler, stream=f).sort_stats('cumulative')
|
| 82 | 91 |
ps.print_stats()
|
| 83 | 92 |
|
| 93 |
+ def __write_binary(self, filename):
|
|
| 94 |
+ self.profiler.dump_stats(filename)
|
|
| 95 |
+ |
|
| 84 | 96 |
|
| 85 | 97 |
# profile_start()
|
| 86 | 98 |
#
|
| ... | ... | @@ -56,6 +56,7 @@ details on common configuration options for sources. |
| 56 | 56 |
|
| 57 | 57 |
import os
|
| 58 | 58 |
import shutil
|
| 59 |
+import fcntl
|
|
| 59 | 60 |
from contextlib import contextmanager
|
| 60 | 61 |
|
| 61 | 62 |
from buildstream import Source, SourceError, Consistency
|
| ... | ... | @@ -84,10 +85,12 @@ class BzrSource(Source): |
| 84 | 85 |
if self.ref is None or self.tracking is None:
|
| 85 | 86 |
return Consistency.INCONSISTENT
|
| 86 | 87 |
|
| 87 |
- if self._check_ref():
|
|
| 88 |
- return Consistency.CACHED
|
|
| 89 |
- else:
|
|
| 90 |
- return Consistency.RESOLVED
|
|
| 88 |
+ # Lock for the _check_ref()
|
|
| 89 |
+ with self._locked():
|
|
| 90 |
+ if self._check_ref():
|
|
| 91 |
+ return Consistency.CACHED
|
|
| 92 |
+ else:
|
|
| 93 |
+ return Consistency.RESOLVED
|
|
| 91 | 94 |
|
| 92 | 95 |
def load_ref(self, node):
|
| 93 | 96 |
self.ref = self.node_get_member(node, str, 'ref', None)
|
| ... | ... | @@ -100,7 +103,7 @@ class BzrSource(Source): |
| 100 | 103 |
|
| 101 | 104 |
def track(self):
|
| 102 | 105 |
with self.timed_activity("Tracking {}".format(self.url),
|
| 103 |
- silent_nested=True):
|
|
| 106 |
+ silent_nested=True), self._locked():
|
|
| 104 | 107 |
self._ensure_mirror(skip_ref_check=True)
|
| 105 | 108 |
ret, out = self.check_output([self.host_bzr, "version-info",
|
| 106 | 109 |
"--custom", "--template={revno}",
|
| ... | ... | @@ -114,7 +117,7 @@ class BzrSource(Source): |
| 114 | 117 |
|
| 115 | 118 |
def fetch(self):
|
| 116 | 119 |
with self.timed_activity("Fetching {}".format(self.url),
|
| 117 |
- silent_nested=True):
|
|
| 120 |
+ silent_nested=True), self._locked():
|
|
| 118 | 121 |
self._ensure_mirror()
|
| 119 | 122 |
|
| 120 | 123 |
def stage(self, directory):
|
| ... | ... | @@ -141,6 +144,26 @@ class BzrSource(Source): |
| 141 | 144 |
"--directory={}".format(directory), url],
|
| 142 | 145 |
fail="Failed to switch workspace's parent branch to {}".format(url))
|
| 143 | 146 |
|
| 147 |
+ # _locked()
|
|
| 148 |
+ #
|
|
| 149 |
+ # This context manager ensures exclusive access to the
|
|
| 150 |
+ # bzr repository.
|
|
| 151 |
+ #
|
|
| 152 |
+ @contextmanager
|
|
| 153 |
+ def _locked(self):
|
|
| 154 |
+ lockdir = os.path.join(self.get_mirror_directory(), 'locks')
|
|
| 155 |
+ lockfile = os.path.join(
|
|
| 156 |
+ lockdir,
|
|
| 157 |
+ utils.url_directory_name(self.original_url) + '.lock'
|
|
| 158 |
+ )
|
|
| 159 |
+ os.makedirs(lockdir, exist_ok=True)
|
|
| 160 |
+ with open(lockfile, 'w') as lock:
|
|
| 161 |
+ fcntl.flock(lock, fcntl.LOCK_EX)
|
|
| 162 |
+ try:
|
|
| 163 |
+ yield
|
|
| 164 |
+ finally:
|
|
| 165 |
+ fcntl.flock(lock, fcntl.LOCK_UN)
|
|
| 166 |
+ |
|
| 144 | 167 |
def _check_ref(self):
|
| 145 | 168 |
# If the mirror doesnt exist yet, then we dont have the ref
|
| 146 | 169 |
if not os.path.exists(self._get_branch_dir()):
|
| ... | ... | @@ -157,83 +180,27 @@ class BzrSource(Source): |
| 157 | 180 |
return os.path.join(self.get_mirror_directory(),
|
| 158 | 181 |
utils.url_directory_name(self.original_url))
|
| 159 | 182 |
|
| 160 |
- def _atomic_replace_mirrordir(self, srcdir):
|
|
| 161 |
- """Helper function to safely replace the mirror dir"""
|
|
| 183 |
+ def _ensure_mirror(self, skip_ref_check=False):
|
|
| 184 |
+ mirror_dir = self._get_mirror_dir()
|
|
| 185 |
+ bzr_metadata_dir = os.path.join(mirror_dir, ".bzr")
|
|
| 186 |
+ if not os.path.exists(bzr_metadata_dir):
|
|
| 187 |
+ self.call([self.host_bzr, "init-repo", "--no-trees", mirror_dir],
|
|
| 188 |
+ fail="Failed to initialize bzr repository")
|
|
| 189 |
+ |
|
| 190 |
+ branch_dir = os.path.join(mirror_dir, self.tracking)
|
|
| 191 |
+ branch_url = self.url + "/" + self.tracking
|
|
| 192 |
+ if not os.path.exists(branch_dir):
|
|
| 193 |
+ # `bzr branch` the branch if it doesn't exist
|
|
| 194 |
+ # to get the upstream code
|
|
| 195 |
+ self.call([self.host_bzr, "branch", branch_url, branch_dir],
|
|
| 196 |
+ fail="Failed to branch from {} to {}".format(branch_url, branch_dir))
|
|
| 162 | 197 |
|
| 163 |
- if not os.path.exists(self._get_mirror_dir()):
|
|
| 164 |
- # Just move the srcdir to the mirror dir
|
|
| 165 |
- try:
|
|
| 166 |
- os.rename(srcdir, self._get_mirror_dir())
|
|
| 167 |
- except OSError as e:
|
|
| 168 |
- raise SourceError("{}: Failed to move srcdir '{}' to mirror dir '{}'"
|
|
| 169 |
- .format(str(self), srcdir, self._get_mirror_dir())) from e
|
|
| 170 | 198 |
else:
|
| 171 |
- # Atomically swap the backup dir.
|
|
| 172 |
- backupdir = self._get_mirror_dir() + ".bak"
|
|
| 173 |
- try:
|
|
| 174 |
- os.rename(self._get_mirror_dir(), backupdir)
|
|
| 175 |
- except OSError as e:
|
|
| 176 |
- raise SourceError("{}: Failed to move mirrordir '{}' to backup dir '{}'"
|
|
| 177 |
- .format(str(self), self._get_mirror_dir(), backupdir)) from e
|
|
| 199 |
+ # `bzr pull` the branch if it does exist
|
|
| 200 |
+ # to get any changes to the upstream code
|
|
| 201 |
+ self.call([self.host_bzr, "pull", "--directory={}".format(branch_dir), branch_url],
|
|
| 202 |
+ fail="Failed to pull new changes for {}".format(branch_dir))
|
|
| 178 | 203 |
|
| 179 |
- try:
|
|
| 180 |
- os.rename(srcdir, self._get_mirror_dir())
|
|
| 181 |
- except OSError as e:
|
|
| 182 |
- # Attempt to put the backup back!
|
|
| 183 |
- os.rename(backupdir, self._get_mirror_dir())
|
|
| 184 |
- raise SourceError("{}: Failed to replace bzr repo '{}' with '{}"
|
|
| 185 |
- .format(str(self), srcdir, self._get_mirror_dir())) from e
|
|
| 186 |
- finally:
|
|
| 187 |
- if os.path.exists(backupdir):
|
|
| 188 |
- shutil.rmtree(backupdir)
|
|
| 189 |
- |
|
| 190 |
- @contextmanager
|
|
| 191 |
- def _atomic_repodir(self):
|
|
| 192 |
- """Context manager for working in a copy of the bzr repository
|
|
| 193 |
- |
|
| 194 |
- Yields:
|
|
| 195 |
- (str): A path to the copy of the bzr repo
|
|
| 196 |
- |
|
| 197 |
- This should be used because bzr does not give any guarantees of
|
|
| 198 |
- atomicity, and aborting an operation at the wrong time (or
|
|
| 199 |
- accidentally running multiple concurrent operations) can leave the
|
|
| 200 |
- repo in an inconsistent state.
|
|
| 201 |
- """
|
|
| 202 |
- with self.tempdir() as repodir:
|
|
| 203 |
- mirror_dir = self._get_mirror_dir()
|
|
| 204 |
- if os.path.exists(mirror_dir):
|
|
| 205 |
- try:
|
|
| 206 |
- # shutil.copytree doesn't like it if destination exists
|
|
| 207 |
- shutil.rmtree(repodir)
|
|
| 208 |
- shutil.copytree(mirror_dir, repodir)
|
|
| 209 |
- except (shutil.Error, OSError) as e:
|
|
| 210 |
- raise SourceError("{}: Failed to copy bzr repo from '{}' to '{}'"
|
|
| 211 |
- .format(str(self), mirror_dir, repodir)) from e
|
|
| 212 |
- |
|
| 213 |
- yield repodir
|
|
| 214 |
- self._atomic_replace_mirrordir(repodir)
|
|
| 215 |
- |
|
| 216 |
- def _ensure_mirror(self, skip_ref_check=False):
|
|
| 217 |
- with self._atomic_repodir() as repodir:
|
|
| 218 |
- # Initialize repo if no metadata
|
|
| 219 |
- bzr_metadata_dir = os.path.join(repodir, ".bzr")
|
|
| 220 |
- if not os.path.exists(bzr_metadata_dir):
|
|
| 221 |
- self.call([self.host_bzr, "init-repo", "--no-trees", repodir],
|
|
| 222 |
- fail="Failed to initialize bzr repository")
|
|
| 223 |
- |
|
| 224 |
- branch_dir = os.path.join(repodir, self.tracking)
|
|
| 225 |
- branch_url = self.url + "/" + self.tracking
|
|
| 226 |
- if not os.path.exists(branch_dir):
|
|
| 227 |
- # `bzr branch` the branch if it doesn't exist
|
|
| 228 |
- # to get the upstream code
|
|
| 229 |
- self.call([self.host_bzr, "branch", branch_url, branch_dir],
|
|
| 230 |
- fail="Failed to branch from {} to {}".format(branch_url, branch_dir))
|
|
| 231 |
- |
|
| 232 |
- else:
|
|
| 233 |
- # `bzr pull` the branch if it does exist
|
|
| 234 |
- # to get any changes to the upstream code
|
|
| 235 |
- self.call([self.host_bzr, "pull", "--directory={}".format(branch_dir), branch_url],
|
|
| 236 |
- fail="Failed to pull new changes for {}".format(branch_dir))
|
|
| 237 | 204 |
if not skip_ref_check and not self._check_ref():
|
| 238 | 205 |
raise SourceError("Failed to ensure ref '{}' was mirrored".format(self.ref),
|
| 239 | 206 |
reason="ref-not-mirrored")
|
| ... | ... | @@ -73,14 +73,36 @@ def test_track(cli, tmpdir, datafiles, ref_storage, kind): |
| 73 | 73 |
assert not os.path.exists(os.path.join(project, 'project.refs'))
|
| 74 | 74 |
|
| 75 | 75 |
|
| 76 |
+# NOTE:
|
|
| 77 |
+#
|
|
| 78 |
+# This test checks that recursive tracking works by observing
|
|
| 79 |
+# element states after running a recursive tracking operation.
|
|
| 80 |
+#
|
|
| 81 |
+# However, this test is ALSO valuable as it stresses the source
|
|
| 82 |
+# plugins in a situation where many source plugins are operating
|
|
| 83 |
+# at once on the same backing repository.
|
|
| 84 |
+#
|
|
| 85 |
+# Do not change this test to use a separate 'Repo' per element
|
|
| 86 |
+# as that would defeat the purpose of the stress test, otherwise
|
|
| 87 |
+# please refactor that aspect into another test.
|
|
| 88 |
+#
|
|
| 76 | 89 |
@pytest.mark.datafiles(DATA_DIR)
|
| 90 |
+@pytest.mark.parametrize("amount", [(1), (10)])
|
|
| 77 | 91 |
@pytest.mark.parametrize("kind", [(kind) for kind in ALL_REPO_KINDS])
|
| 78 |
-def test_track_recurse(cli, tmpdir, datafiles, kind):
|
|
| 92 |
+def test_track_recurse(cli, tmpdir, datafiles, kind, amount):
|
|
| 79 | 93 |
project = os.path.join(datafiles.dirname, datafiles.basename)
|
| 80 | 94 |
dev_files_path = os.path.join(project, 'files', 'dev-files')
|
| 81 | 95 |
element_path = os.path.join(project, 'elements')
|
| 82 |
- element_dep_name = 'track-test-dep-{}.bst'.format(kind)
|
|
| 83 |
- element_target_name = 'track-test-target-{}.bst'.format(kind)
|
|
| 96 |
+ |
|
| 97 |
+ # Try to actually launch as many fetch jobs as possible at the same time
|
|
| 98 |
+ #
|
|
| 99 |
+ # This stresses the Source plugins and helps to ensure that
|
|
| 100 |
+ # they handle concurrent access to the store correctly.
|
|
| 101 |
+ cli.configure({
|
|
| 102 |
+ 'scheduler': {
|
|
| 103 |
+ 'fetchers': amount,
|
|
| 104 |
+ }
|
|
| 105 |
+ })
|
|
| 84 | 106 |
|
| 85 | 107 |
# Create our repo object of the given source type with
|
| 86 | 108 |
# the dev files, and then collect the initial ref.
|
| ... | ... | @@ -89,18 +111,26 @@ def test_track_recurse(cli, tmpdir, datafiles, kind): |
| 89 | 111 |
ref = repo.create(dev_files_path)
|
| 90 | 112 |
|
| 91 | 113 |
# Write out our test targets
|
| 92 |
- generate_element(repo, os.path.join(element_path, element_dep_name))
|
|
| 93 |
- generate_element(repo, os.path.join(element_path, element_target_name),
|
|
| 94 |
- dep_name=element_dep_name)
|
|
| 114 |
+ element_names = []
|
|
| 115 |
+ last_element_name = None
|
|
| 116 |
+ for i in range(amount + 1):
|
|
| 117 |
+ element_name = 'track-test-{}-{}.bst'.format(kind, i + 1)
|
|
| 118 |
+ filename = os.path.join(element_path, element_name)
|
|
| 119 |
+ |
|
| 120 |
+ element_names.append(element_name)
|
|
| 121 |
+ |
|
| 122 |
+ generate_element(repo, filename, dep_name=last_element_name)
|
|
| 123 |
+ last_element_name = element_name
|
|
| 95 | 124 |
|
| 96 | 125 |
# Assert that a fetch is needed
|
| 97 |
- assert cli.get_element_state(project, element_dep_name) == 'no reference'
|
|
| 98 |
- assert cli.get_element_state(project, element_target_name) == 'no reference'
|
|
| 126 |
+ states = cli.get_element_states(project, last_element_name)
|
|
| 127 |
+ for element_name in element_names:
|
|
| 128 |
+ assert states[element_name] == 'no reference'
|
|
| 99 | 129 |
|
| 100 | 130 |
# Now first try to track it
|
| 101 | 131 |
result = cli.run(project=project, args=[
|
| 102 | 132 |
'source', 'track', '--deps', 'all',
|
| 103 |
- element_target_name])
|
|
| 133 |
+ last_element_name])
|
|
| 104 | 134 |
result.assert_success()
|
| 105 | 135 |
|
| 106 | 136 |
# And now fetch it: The Source has probably already cached the
|
| ... | ... | @@ -109,12 +139,16 @@ def test_track_recurse(cli, tmpdir, datafiles, kind): |
| 109 | 139 |
# is the job of fetch.
|
| 110 | 140 |
result = cli.run(project=project, args=[
|
| 111 | 141 |
'source', 'fetch', '--deps', 'all',
|
| 112 |
- element_target_name])
|
|
| 142 |
+ last_element_name])
|
|
| 113 | 143 |
result.assert_success()
|
| 114 | 144 |
|
| 115 |
- # Assert that the dependency is buildable and the target is waiting
|
|
| 116 |
- assert cli.get_element_state(project, element_dep_name) == 'buildable'
|
|
| 117 |
- assert cli.get_element_state(project, element_target_name) == 'waiting'
|
|
| 145 |
+ # Assert that the base is buildable and the rest are waiting
|
|
| 146 |
+ states = cli.get_element_states(project, last_element_name)
|
|
| 147 |
+ for element_name in element_names:
|
|
| 148 |
+ if element_name == element_names[0]:
|
|
| 149 |
+ assert states[element_name] == 'buildable'
|
|
| 150 |
+ else:
|
|
| 151 |
+ assert states[element_name] == 'waiting'
|
|
| 118 | 152 |
|
| 119 | 153 |
|
| 120 | 154 |
@pytest.mark.datafiles(DATA_DIR)
|
| ... | ... | @@ -375,6 +375,9 @@ class Cli(): |
| 375 | 375 |
# Fetch an element state by name by
|
| 376 | 376 |
# invoking bst show on the project with the CLI
|
| 377 | 377 |
#
|
| 378 |
+ # If you need to get the states of multiple elements,
|
|
| 379 |
+ # then use get_element_states() instead.
|
|
| 380 |
+ #
|
|
| 378 | 381 |
def get_element_state(self, project, element_name):
|
| 379 | 382 |
result = self.run(project=project, silent=True, args=[
|
| 380 | 383 |
'show',
|
| ... | ... | @@ -385,6 +388,25 @@ class Cli(): |
| 385 | 388 |
result.assert_success()
|
| 386 | 389 |
return result.output.strip()
|
| 387 | 390 |
|
| 391 |
+ # Fetch the states of elements for a given target / deps
|
|
| 392 |
+ #
|
|
| 393 |
+ # Returns a dictionary with the element names as keys
|
|
| 394 |
+ #
|
|
| 395 |
+ def get_element_states(self, project, target, deps='all'):
|
|
| 396 |
+ result = self.run(project=project, silent=True, args=[
|
|
| 397 |
+ 'show',
|
|
| 398 |
+ '--deps', deps,
|
|
| 399 |
+ '--format', '%{name}||%{state}',
|
|
| 400 |
+ target
|
|
| 401 |
+ ])
|
|
| 402 |
+ result.assert_success()
|
|
| 403 |
+ lines = result.output.splitlines()
|
|
| 404 |
+ states = {}
|
|
| 405 |
+ for line in lines:
|
|
| 406 |
+ split = line.split(sep='||')
|
|
| 407 |
+ states[split[0]] = split[1]
|
|
| 408 |
+ return states
|
|
| 409 |
+ |
|
| 388 | 410 |
# Fetch an element's cache key by invoking bst show
|
| 389 | 411 |
# on the project with the CLI
|
| 390 | 412 |
#
|
