Jim MacArthur pushed to branch jmac/fix-test-hangs-2 at BuildStream / buildstream
Commits:
- 5bf4ce54 by Jim MacArthur at 2018-11-22T15:48:07Z
3 changed files:
Changes:
... | ... | @@ -25,6 +25,17 @@ def message_handler(message, context): |
25 | 25 |
pass
|
26 | 26 |
|
27 | 27 |
|
28 |
+# Since parent processes wait for queue events, we need
|
|
29 |
+# to put something on it if the called process raises an
|
|
30 |
+# exception.
|
|
31 |
+def _queue_wrapper(target, queue, *args):
|
|
32 |
+ try:
|
|
33 |
+ target(*args, queue=queue)
|
|
34 |
+ except Exception as e:
|
|
35 |
+ queue.put(str(e))
|
|
36 |
+ raise
|
|
37 |
+ |
|
38 |
+ |
|
28 | 39 |
def tree_maker(cas, tree, directory):
|
29 | 40 |
if tree.root.ByteSize() == 0:
|
30 | 41 |
tree.root.CopyFrom(directory)
|
... | ... | @@ -97,9 +108,9 @@ def test_pull(cli, tmpdir, datafiles): |
97 | 108 |
queue = multiprocessing.Queue()
|
98 | 109 |
# Use subprocess to avoid creation of gRPC threads in main BuildStream process
|
99 | 110 |
# See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
|
100 |
- process = multiprocessing.Process(target=_test_pull,
|
|
101 |
- args=(user_config_file, project_dir, artifact_dir,
|
|
102 |
- 'target.bst', element_key, queue))
|
|
111 |
+ process = multiprocessing.Process(target=_queue_wrapper,
|
|
112 |
+ args=(_test_pull, queue, user_config_file, project_dir,
|
|
113 |
+ artifact_dir, 'target.bst', element_key))
|
|
103 | 114 |
|
104 | 115 |
try:
|
105 | 116 |
# Keep SIGINT blocked in the child process
|
... | ... | @@ -205,9 +216,9 @@ def test_pull_tree(cli, tmpdir, datafiles): |
205 | 216 |
queue = multiprocessing.Queue()
|
206 | 217 |
# Use subprocess to avoid creation of gRPC threads in main BuildStream process
|
207 | 218 |
# See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
|
208 |
- process = multiprocessing.Process(target=_test_push_tree,
|
|
209 |
- args=(user_config_file, project_dir, artifact_dir,
|
|
210 |
- artifact_digest, queue))
|
|
219 |
+ process = multiprocessing.Process(target=_queue_wrapper,
|
|
220 |
+ args=(_test_push_tree, queue, user_config_file, project_dir,
|
|
221 |
+ artifact_dir, artifact_digest))
|
|
211 | 222 |
|
212 | 223 |
try:
|
213 | 224 |
# Keep SIGINT blocked in the child process
|
... | ... | @@ -233,9 +244,9 @@ def test_pull_tree(cli, tmpdir, datafiles): |
233 | 244 |
|
234 | 245 |
queue = multiprocessing.Queue()
|
235 | 246 |
# Use subprocess to avoid creation of gRPC threads in main BuildStream process
|
236 |
- process = multiprocessing.Process(target=_test_pull_tree,
|
|
237 |
- args=(user_config_file, project_dir, artifact_dir,
|
|
238 |
- tree_digest, queue))
|
|
247 |
+ process = multiprocessing.Process(target=_queue_wrapper,
|
|
248 |
+ args=(_test_pull_tree, queue, user_config_file, project_dir,
|
|
249 |
+ artifact_dir, tree_digest))
|
|
239 | 250 |
|
240 | 251 |
try:
|
241 | 252 |
# Keep SIGINT blocked in the child process
|
... | ... | @@ -26,6 +26,17 @@ def message_handler(message, context): |
26 | 26 |
pass
|
27 | 27 |
|
28 | 28 |
|
29 |
+# Since parent processes wait for queue events, we need
|
|
30 |
+# to put something on it if the called process raises an
|
|
31 |
+# exception.
|
|
32 |
+def _queue_wrapper(target, queue, *args):
|
|
33 |
+ try:
|
|
34 |
+ target(*args, queue=queue)
|
|
35 |
+ except Exception as e:
|
|
36 |
+ queue.put(str(e))
|
|
37 |
+ raise
|
|
38 |
+ |
|
39 |
+ |
|
29 | 40 |
@pytest.mark.datafiles(DATA_DIR)
|
30 | 41 |
def test_push(cli, tmpdir, datafiles):
|
31 | 42 |
project_dir = str(datafiles)
|
... | ... | @@ -76,9 +87,9 @@ def test_push(cli, tmpdir, datafiles): |
76 | 87 |
queue = multiprocessing.Queue()
|
77 | 88 |
# Use subprocess to avoid creation of gRPC threads in main BuildStream process
|
78 | 89 |
# See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
|
79 |
- process = multiprocessing.Process(target=_test_push,
|
|
80 |
- args=(user_config_file, project_dir, artifact_dir,
|
|
81 |
- 'target.bst', element_key, queue))
|
|
90 |
+ process = multiprocessing.Process(target=_queue_wrapper,
|
|
91 |
+ args=(_test_push, queue, user_config_file, project_dir,
|
|
92 |
+ artifact_dir, 'target.bst', element_key))
|
|
82 | 93 |
|
83 | 94 |
try:
|
84 | 95 |
# Keep SIGINT blocked in the child process
|
... | ... | @@ -185,9 +196,9 @@ def test_push_directory(cli, tmpdir, datafiles): |
185 | 196 |
queue = multiprocessing.Queue()
|
186 | 197 |
# Use subprocess to avoid creation of gRPC threads in main BuildStream process
|
187 | 198 |
# See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
|
188 |
- process = multiprocessing.Process(target=_test_push_directory,
|
|
189 |
- args=(user_config_file, project_dir, artifact_dir,
|
|
190 |
- artifact_digest, queue))
|
|
199 |
+ process = multiprocessing.Process(target=_queue_wrapper,
|
|
200 |
+ args=(_test_push_directory, queue, user_config_file,
|
|
201 |
+ project_dir, artifact_dir, artifact_digest))
|
|
191 | 202 |
|
192 | 203 |
try:
|
193 | 204 |
# Keep SIGINT blocked in the child process
|
... | ... | @@ -260,8 +271,9 @@ def test_push_message(cli, tmpdir, datafiles): |
260 | 271 |
queue = multiprocessing.Queue()
|
261 | 272 |
# Use subprocess to avoid creation of gRPC threads in main BuildStream process
|
262 | 273 |
# See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
|
263 |
- process = multiprocessing.Process(target=_test_push_message,
|
|
264 |
- args=(user_config_file, project_dir, artifact_dir, queue))
|
|
274 |
+ process = multiprocessing.Process(target=_queue_wrapper,
|
|
275 |
+ args=(_test_push_message, queue, user_config_file,
|
|
276 |
+ project_dir, artifact_dir))
|
|
265 | 277 |
|
266 | 278 |
try:
|
267 | 279 |
# Keep SIGINT blocked in the child process
|
... | ... | @@ -67,22 +67,26 @@ class ArtifactShare(): |
67 | 67 |
def run(self, q):
|
68 | 68 |
pytest_cov.embed.cleanup_on_sigterm()
|
69 | 69 |
|
70 |
- # Optionally mock statvfs
|
|
71 |
- if self.total_space:
|
|
72 |
- if self.free_space is None:
|
|
73 |
- self.free_space = self.total_space
|
|
74 |
- os.statvfs = self._mock_statvfs
|
|
70 |
+ try:
|
|
71 |
+ # Optionally mock statvfs
|
|
72 |
+ if self.total_space:
|
|
73 |
+ if self.free_space is None:
|
|
74 |
+ self.free_space = self.total_space
|
|
75 |
+ os.statvfs = self._mock_statvfs
|
|
75 | 76 |
|
76 |
- server = create_server(self.repodir, enable_push=True)
|
|
77 |
- port = server.add_insecure_port('localhost:0')
|
|
77 |
+ server = create_server(self.repodir, enable_push=True)
|
|
78 |
+ port = server.add_insecure_port('localhost:0')
|
|
78 | 79 |
|
79 |
- server.start()
|
|
80 |
+ server.start()
|
|
80 | 81 |
|
81 |
- # Send port to parent
|
|
82 |
- q.put(port)
|
|
82 |
+ # Send port to parent
|
|
83 |
+ q.put(port)
|
|
83 | 84 |
|
84 |
- # Sleep until termination by signal
|
|
85 |
- signal.pause()
|
|
85 |
+ # Sleep until termination by signal
|
|
86 |
+ signal.pause()
|
|
87 |
+ except Exception as e:
|
|
88 |
+ q.put(None)
|
|
89 |
+ raise
|
|
86 | 90 |
|
87 | 91 |
# has_object():
|
88 | 92 |
#
|