Tristan Van Berkom pushed to branch tristan/element-processing-order at BuildStream / buildstream
Commits:
-
0d423e1a
by Tristan Van Berkom at 2019-01-14T19:55:07Z
-
5768ede9
by Tristan Van Berkom at 2019-01-14T19:55:07Z
-
db8931ed
by Tristan Van Berkom at 2019-01-14T19:55:07Z
6 changed files:
- buildstream/_artifactcache/artifactcache.py
- buildstream/_scheduler/jobs/job.py
- buildstream/_scheduler/queues/queue.py
- buildstream/_scheduler/resources.py
- buildstream/_scheduler/scheduler.py
- tests/frontend/order.py
Changes:
... | ... | @@ -249,7 +249,7 @@ class ArtifactCache(): |
249 | 249 |
# FIXME: Asking the user what to do may be neater
|
250 | 250 |
default_conf = os.path.join(os.environ['XDG_CONFIG_HOME'],
|
251 | 251 |
'buildstream.conf')
|
252 |
- detail = ("There is not enough space to build the given element.\n"
|
|
252 |
+ detail = ("There is not enough space to complete the build.\n"
|
|
253 | 253 |
"Please increase the cache-quota in {}."
|
254 | 254 |
.format(self.context.config_origin or default_conf))
|
255 | 255 |
|
... | ... | @@ -85,28 +85,11 @@ class Process(multiprocessing.Process): |
85 | 85 |
# action_name (str): The queue action name
|
86 | 86 |
# logfile (str): A template string that points to the logfile
|
87 | 87 |
# that should be used - should contain {pid}.
|
88 |
-# resources (iter(ResourceType)) - A set of resources this job
|
|
89 |
-# wants to use.
|
|
90 |
-# exclusive_resources (iter(ResourceType)) - A set of resources
|
|
91 |
-# this job wants to use
|
|
92 |
-# exclusively.
|
|
93 | 88 |
# max_retries (int): The maximum number of retries
|
94 | 89 |
#
|
95 | 90 |
class Job():
|
96 | 91 |
|
97 |
- def __init__(self, scheduler, action_name, logfile, *,
|
|
98 |
- resources=None, exclusive_resources=None, max_retries=0):
|
|
99 |
- |
|
100 |
- if resources is None:
|
|
101 |
- resources = set()
|
|
102 |
- else:
|
|
103 |
- resources = set(resources)
|
|
104 |
- if exclusive_resources is None:
|
|
105 |
- exclusive_resources = set()
|
|
106 |
- else:
|
|
107 |
- exclusive_resources = set(resources)
|
|
108 |
- |
|
109 |
- assert exclusive_resources <= resources, "All exclusive resources must also be resources!"
|
|
92 |
+ def __init__(self, scheduler, action_name, logfile, *, max_retries=0):
|
|
110 | 93 |
|
111 | 94 |
#
|
112 | 95 |
# Public members
|
... | ... | @@ -114,12 +97,6 @@ class Job(): |
114 | 97 |
self.action_name = action_name # The action name for the Queue
|
115 | 98 |
self.child_data = None # Data to be sent to the main process
|
116 | 99 |
|
117 |
- # The resources this job wants to access
|
|
118 |
- self.resources = resources
|
|
119 |
- # Resources this job needs to access exclusively, i.e., no
|
|
120 |
- # other job should be allowed to access them
|
|
121 |
- self.exclusive_resources = exclusive_resources
|
|
122 |
- |
|
123 | 100 |
#
|
124 | 101 |
# Private members
|
125 | 102 |
#
|
... | ... | @@ -72,8 +72,9 @@ class Queue(): |
72 | 72 |
# Private members
|
73 | 73 |
#
|
74 | 74 |
self._scheduler = scheduler
|
75 |
- self._wait_queue = deque()
|
|
76 |
- self._done_queue = deque()
|
|
75 |
+ self._resources = scheduler.resources # Shared resource pool
|
|
76 |
+ self._wait_queue = deque() # Ready / Waiting elements
|
|
77 |
+ self._done_queue = deque() # Processed / Skipped elements
|
|
77 | 78 |
self._max_retries = 0
|
78 | 79 |
|
79 | 80 |
# Assert the subclass has setup class data
|
... | ... | @@ -115,16 +116,6 @@ class Queue(): |
115 | 116 |
def status(self, element):
|
116 | 117 |
return QueueStatus.READY
|
117 | 118 |
|
118 |
- # prepare()
|
|
119 |
- #
|
|
120 |
- # Abstract method for handling job preparation in the main process.
|
|
121 |
- #
|
|
122 |
- # Args:
|
|
123 |
- # element (Element): The element which is scheduled
|
|
124 |
- #
|
|
125 |
- def prepare(self, element):
|
|
126 |
- pass
|
|
127 |
- |
|
128 | 119 |
# done()
|
129 | 120 |
#
|
130 | 121 |
# Abstract method for handling a successful job completion.
|
... | ... | @@ -153,26 +144,18 @@ class Queue(): |
153 | 144 |
if not elts:
|
154 | 145 |
return
|
155 | 146 |
|
156 |
- # Note: The internal lists work with jobs. This is not
|
|
157 |
- # reflected in any external methods (except
|
|
158 |
- # pop/peek_ready_jobs).
|
|
159 |
- def create_job(element):
|
|
160 |
- logfile = self._element_log_path(element)
|
|
161 |
- return ElementJob(self._scheduler, self.action_name,
|
|
162 |
- logfile, element=element, queue=self,
|
|
163 |
- resources=self.resources,
|
|
164 |
- action_cb=self.process,
|
|
165 |
- complete_cb=self._job_done,
|
|
166 |
- max_retries=self._max_retries)
|
|
167 |
- |
|
168 |
- # Place skipped elements directly on the done queue
|
|
169 |
- jobs = [create_job(elt) for elt in elts]
|
|
170 |
- skip = [job for job in jobs if self.status(job.element) == QueueStatus.SKIP]
|
|
171 |
- wait = [job for job in jobs if job not in skip]
|
|
172 |
- |
|
173 |
- self.skipped_elements.extend([job.element for job in skip])
|
|
174 |
- self._wait_queue.extend(wait)
|
|
175 |
- self._done_queue.extend(skip)
|
|
147 |
+ # Place skipped elements on the done queue right away.
|
|
148 |
+ #
|
|
149 |
+ # The remaining ready and waiting elements must remain in the
|
|
150 |
+ # same queue, and ready status must be determined at the moment
|
|
151 |
+ # which the scheduler is asking for the next job.
|
|
152 |
+ #
|
|
153 |
+ skip = [elt for elt in elts if self.status(elt) == QueueStatus.SKIP]
|
|
154 |
+ wait = [elt for elt in elts if elt not in skip]
|
|
155 |
+ |
|
156 |
+ self.skipped_elements.extend(skip) # Public record of skipped elements
|
|
157 |
+ self._done_queue.extend(skip) # Elements eligible to be dequeued
|
|
158 |
+ self._wait_queue.extend(wait) # Elements to be processed
|
|
176 | 159 |
|
177 | 160 |
# dequeue()
|
178 | 161 |
#
|
... | ... | @@ -184,69 +167,59 @@ class Queue(): |
184 | 167 |
#
|
185 | 168 |
def dequeue(self):
|
186 | 169 |
while self._done_queue:
|
187 |
- yield self._done_queue.popleft().element
|
|
170 |
+ yield self._done_queue.popleft()
|
|
188 | 171 |
|
189 | 172 |
# dequeue_ready()
|
190 | 173 |
#
|
191 |
- # Reports whether there are any elements to dequeue
|
|
174 |
+ # Reports whether any elements can be promoted to other queues
|
|
192 | 175 |
#
|
193 | 176 |
# Returns:
|
194 |
- # (bool): Whether there are elements to dequeue
|
|
177 |
+ # (bool): Whether there are elements ready
|
|
195 | 178 |
#
|
196 | 179 |
def dequeue_ready(self):
|
197 | 180 |
return any(self._done_queue)
|
198 | 181 |
|
199 |
- # pop_ready_jobs()
|
|
200 |
- #
|
|
201 |
- # Returns:
|
|
202 |
- # ([Job]): A list of jobs to run
|
|
182 |
+ # harvest_jobs()
|
|
203 | 183 |
#
|
204 | 184 |
# Process elements in the queue, moving elements which were enqueued
|
205 |
- # into the dequeue pool, and processing them if necessary.
|
|
206 |
- #
|
|
207 |
- # This will have different results for elements depending
|
|
208 |
- # on the Queue.status() implementation.
|
|
209 |
- #
|
|
210 |
- # o Elements which are QueueStatus.WAIT will not be affected
|
|
185 |
+ # into the dequeue pool, and creating as many jobs for which resources
|
|
186 |
+ # can be reserved.
|
|
211 | 187 |
#
|
212 |
- # o Elements which are QueueStatus.SKIP will move directly
|
|
213 |
- # to the dequeue pool
|
|
214 |
- #
|
|
215 |
- # o For Elements which are QueueStatus.READY a Job will be
|
|
216 |
- # created and returned to the caller, given that the scheduler
|
|
217 |
- # allows the Queue enough resources for the given job
|
|
188 |
+ # Returns:
|
|
189 |
+ # ([Job]): A list of jobs which can be run now
|
|
218 | 190 |
#
|
219 |
- def pop_ready_jobs(self):
|
|
191 |
+ def harvest_jobs(self):
|
|
220 | 192 |
unready = []
|
221 | 193 |
ready = []
|
222 | 194 |
|
223 | 195 |
while self._wait_queue:
|
224 |
- job = self._wait_queue.popleft()
|
|
225 |
- element = job.element
|
|
196 |
+ if not self._resources.reserve(self.resources, peek=True):
|
|
197 |
+ break
|
|
226 | 198 |
|
199 |
+ element = self._wait_queue.popleft()
|
|
227 | 200 |
status = self.status(element)
|
201 |
+ |
|
228 | 202 |
if status == QueueStatus.WAIT:
|
229 |
- unready.append(job)
|
|
230 |
- continue
|
|
203 |
+ unready.append(element)
|
|
231 | 204 |
elif status == QueueStatus.SKIP:
|
232 |
- self._done_queue.append(job)
|
|
205 |
+ self._done_queue.append(element)
|
|
233 | 206 |
self.skipped_elements.append(element)
|
234 |
- continue
|
|
235 |
- |
|
236 |
- self.prepare(element)
|
|
237 |
- ready.append(job)
|
|
207 |
+ else:
|
|
208 |
+ reserved = self._resources.reserve(self.resources)
|
|
209 |
+ assert reserved
|
|
210 |
+ ready.append(element)
|
|
238 | 211 |
|
239 |
- # These were not ready but were in the beginning, give em
|
|
240 |
- # first priority again next time around
|
|
241 | 212 |
self._wait_queue.extendleft(unready)
|
242 | 213 |
|
243 |
- return ready
|
|
244 |
- |
|
245 |
- def peek_ready_jobs(self):
|
|
246 |
- def ready(job):
|
|
247 |
- return self.status(job.element) == QueueStatus.READY
|
|
248 |
- |
|
249 |
- yield from (job for job in self._wait_queue if ready(job))
|
|
214 |
+ return [
|
|
215 |
+ ElementJob(self._scheduler, self.action_name,
|
|
216 |
+ self._element_log_path(element),
|
|
217 |
+ element=element, queue=self,
|
|
218 |
+ action_cb=self.process,
|
|
219 |
+ complete_cb=self._job_done,
|
|
220 |
+ max_retries=self._max_retries)
|
|
221 |
+ for element in ready
|
|
222 |
+ ]
|
|
250 | 223 |
|
251 | 224 |
#####################################################
|
252 | 225 |
# Private Methods #
|
... | ... | @@ -292,6 +265,10 @@ class Queue(): |
292 | 265 |
#
|
293 | 266 |
def _job_done(self, job, element, status, result):
|
294 | 267 |
|
268 |
+ # Now release the resources we reserved
|
|
269 |
+ #
|
|
270 |
+ self._resources.release(self.resources)
|
|
271 |
+ |
|
295 | 272 |
# Update values that need to be synchronized in the main task
|
296 | 273 |
# before calling any queue implementation
|
297 | 274 |
self._update_workspaces(element, job)
|
... | ... | @@ -324,12 +301,8 @@ class Queue(): |
324 | 301 |
detail=traceback.format_exc())
|
325 | 302 |
self.failed_elements.append(element)
|
326 | 303 |
else:
|
327 |
- #
|
|
328 |
- # No exception occured in post processing
|
|
329 |
- #
|
|
330 |
- |
|
331 |
- # All jobs get placed on the done queue for later processing.
|
|
332 |
- self._done_queue.append(job)
|
|
304 |
+ # All elements get placed on the done queue for later processing.
|
|
305 |
+ self._done_queue.append(element)
|
|
333 | 306 |
|
334 | 307 |
# These lists are for bookkeeping purposes for the UI and logging.
|
335 | 308 |
if status == JobStatus.SKIPPED:
|
... | ... | @@ -34,28 +34,25 @@ class Resources(): |
34 | 34 |
ResourceType.UPLOAD: set()
|
35 | 35 |
}
|
36 | 36 |
|
37 |
- def clear_job_resources(self, job):
|
|
38 |
- for resource in job.exclusive_resources:
|
|
39 |
- self._exclusive_resources[resource].remove(hash(job))
|
|
37 |
+ # reserve()
|
|
38 |
+ #
|
|
39 |
+ # Reserves a set of resources
|
|
40 |
+ #
|
|
41 |
+ # Args:
|
|
42 |
+ # resources (set): A set of ResourceTypes
|
|
43 |
+ # exclusive (set): A set of ResourceTypes to be reserved exclusively
|
|
44 |
+ # peek (bool): Whether to only peek at whether the resource is available
|
|
45 |
+ #
|
|
46 |
+ # Returns:
|
|
47 |
+ # (bool): True if the resources could be reserved
|
|
48 |
+ #
|
|
49 |
+ def reserve(self, resources, exclusive=None, *, peek=False):
|
|
50 |
+ if exclusive is None:
|
|
51 |
+ exclusive = set()
|
|
40 | 52 |
|
41 |
- for resource in job.resources:
|
|
42 |
- self._used_resources[resource] -= 1
|
|
43 |
- |
|
44 |
- def reserve_exclusive_resources(self, job):
|
|
45 |
- exclusive = job.exclusive_resources
|
|
46 |
- |
|
47 |
- # The very first thing we do is to register any exclusive
|
|
48 |
- # resources this job may want. Even if the job is not yet
|
|
49 |
- # allowed to run (because another job is holding the resource
|
|
50 |
- # it wants), we can still set this - it just means that any
|
|
51 |
- # job *currently* using these resources has to finish first,
|
|
52 |
- # and no new jobs wanting these can be launched (except other
|
|
53 |
- # exclusive-access jobs).
|
|
54 |
- #
|
|
55 |
- for resource in exclusive:
|
|
56 |
- self._exclusive_resources[resource].add(hash(job))
|
|
53 |
+ resources = set(resources)
|
|
54 |
+ exclusive = set(exclusive)
|
|
57 | 55 |
|
58 |
- def reserve_job_resources(self, job):
|
|
59 | 56 |
# First, we check if the job wants to access a resource that
|
60 | 57 |
# another job wants exclusive access to. If so, it cannot be
|
61 | 58 |
# scheduled.
|
... | ... | @@ -68,7 +65,8 @@ class Resources(): |
68 | 65 |
# is currently not possible, but may be worth thinking
|
69 | 66 |
# about.
|
70 | 67 |
#
|
71 |
- for resource in job.resources - job.exclusive_resources:
|
|
68 |
+ for resource in resources - exclusive:
|
|
69 |
+ |
|
72 | 70 |
# If our job wants this resource exclusively, we never
|
73 | 71 |
# check this, so we can get away with not (temporarily)
|
74 | 72 |
# removing it from the set.
|
... | ... | @@ -84,14 +82,14 @@ class Resources(): |
84 | 82 |
# at a time, despite being allowed to be part of the exclusive
|
85 | 83 |
# set.
|
86 | 84 |
#
|
87 |
- for exclusive in job.exclusive_resources:
|
|
88 |
- if self._used_resources[exclusive] != 0:
|
|
85 |
+ for resource in exclusive:
|
|
86 |
+ if self._used_resources[resource] != 0:
|
|
89 | 87 |
return False
|
90 | 88 |
|
91 | 89 |
# Finally, we check if we have enough of each resource
|
92 | 90 |
# available. If we don't have enough, the job cannot be
|
93 | 91 |
# scheduled.
|
94 |
- for resource in job.resources:
|
|
92 |
+ for resource in resources:
|
|
95 | 93 |
if (self._max_resources[resource] > 0 and
|
96 | 94 |
self._used_resources[resource] >= self._max_resources[resource]):
|
97 | 95 |
return False
|
... | ... | @@ -99,7 +97,70 @@ class Resources(): |
99 | 97 |
# Now we register the fact that our job is using the resources
|
100 | 98 |
# it asked for, and tell the scheduler that it is allowed to
|
101 | 99 |
# continue.
|
102 |
- for resource in job.resources:
|
|
103 |
- self._used_resources[resource] += 1
|
|
100 |
+ if not peek:
|
|
101 |
+ for resource in resources:
|
|
102 |
+ self._used_resources[resource] += 1
|
|
104 | 103 |
|
105 | 104 |
return True
|
105 |
+ |
|
106 |
+ # release()
|
|
107 |
+ #
|
|
108 |
+ # Release resources previously reserved with Resources.reserve()
|
|
109 |
+ #
|
|
110 |
+ # Args:
|
|
111 |
+ # resources (set): A set of resources to release
|
|
112 |
+ #
|
|
113 |
+ def release(self, resources):
|
|
114 |
+ for resource in resources:
|
|
115 |
+ assert self._used_resources[resource] > 0, "Scheduler resource imbalance"
|
|
116 |
+ self._used_resources[resource] -= 1
|
|
117 |
+ |
|
118 |
+ # register_exclusive_interest()
|
|
119 |
+ #
|
|
120 |
+ # Inform the resources pool that `source` has an interest in
|
|
121 |
+ # reserving this resource exclusively.
|
|
122 |
+ #
|
|
123 |
+ # The source parameter is used to identify the caller, it
|
|
124 |
+ # must be ensured to be unique for the time that the
|
|
125 |
+ # interest is registered.
|
|
126 |
+ #
|
|
127 |
+ # This function may be called multiple times, and subsequent
|
|
128 |
+ # calls will simply have no effect until clear_exclusive_interest()
|
|
129 |
+ # is used to clear the interest.
|
|
130 |
+ #
|
|
131 |
+ # This must be called in advance of reserve()
|
|
132 |
+ #
|
|
133 |
+ # Args:
|
|
134 |
+ # resources (set): Set of resources to reserve exclusively
|
|
135 |
+ # source (any): Source identifier, to be used again when unregistering
|
|
136 |
+ # the interest.
|
|
137 |
+ #
|
|
138 |
+ def register_exclusive_interest(self, resources, source):
|
|
139 |
+ |
|
140 |
+ # The very first thing we do is to register any exclusive
|
|
141 |
+ # resources this job may want. Even if the job is not yet
|
|
142 |
+ # allowed to run (because another job is holding the resource
|
|
143 |
+ # it wants), we can still set this - it just means that any
|
|
144 |
+ # job *currently* using these resources has to finish first,
|
|
145 |
+ # and no new jobs wanting these can be launched (except other
|
|
146 |
+ # exclusive-access jobs).
|
|
147 |
+ #
|
|
148 |
+ for resource in resources:
|
|
149 |
+ self._exclusive_resources[resource].add(source)
|
|
150 |
+ |
|
151 |
+ # unregister_exclusive_interest()
|
|
152 |
+ #
|
|
153 |
+ # Clear the exclusive interest in these resources.
|
|
154 |
+ #
|
|
155 |
+ # This should be called by the given source which registered
|
|
156 |
+ # an exclusive interest.
|
|
157 |
+ #
|
|
158 |
+ # Args:
|
|
159 |
+ # resources (set): Set of resources to clear the exclusive interest in
|
|
160 |
+ # source (str): Source identifier, as previously given when registering
|
|
161 |
+ # the interest.
|
|
162 |
+ #
|
|
163 |
+ def unregister_exclusive_interest(self, resources, source):
|
|
164 |
+ |
|
165 |
+ for resource in resources:
|
|
166 |
+ self._exclusive_resources[resource].remove(source)
|
... | ... | @@ -38,14 +38,10 @@ class SchedStatus(): |
38 | 38 |
TERMINATED = 1
|
39 | 39 |
|
40 | 40 |
|
41 |
-# Our _REDUNDANT_EXCLUSIVE_ACTIONS jobs are special ones
|
|
42 |
-# which we launch dynamically, they have the property of being
|
|
43 |
-# meaningless to queue if one is already queued, and it also
|
|
44 |
-# doesnt make sense to run them in parallel
|
|
41 |
+# Some action names for the internal jobs we launch
|
|
45 | 42 |
#
|
46 | 43 |
_ACTION_NAME_CLEANUP = 'cleanup'
|
47 | 44 |
_ACTION_NAME_CACHE_SIZE = 'cache_size'
|
48 |
-_REDUNDANT_EXCLUSIVE_ACTIONS = [_ACTION_NAME_CLEANUP, _ACTION_NAME_CACHE_SIZE]
|
|
49 | 45 |
|
50 | 46 |
|
51 | 47 |
# Scheduler()
|
... | ... | @@ -81,8 +77,6 @@ class Scheduler(): |
81 | 77 |
#
|
82 | 78 |
# Public members
|
83 | 79 |
#
|
84 |
- self.active_jobs = [] # Jobs currently being run in the scheduler
|
|
85 |
- self.waiting_jobs = [] # Jobs waiting for resources
|
|
86 | 80 |
self.queues = None # Exposed for the frontend to print summaries
|
87 | 81 |
self.context = context # The Context object shared with Queues
|
88 | 82 |
self.terminated = False # Whether the scheduler was asked to terminate or has terminated
|
... | ... | @@ -95,15 +89,23 @@ class Scheduler(): |
95 | 89 |
#
|
96 | 90 |
# Private members
|
97 | 91 |
#
|
92 |
+ self._active_jobs = [] # Jobs currently being run in the scheduler
|
|
93 |
+ self._starttime = start_time # Initial application start time
|
|
94 |
+ self._suspendtime = None # Session time compensation for suspended state
|
|
95 |
+ self._queue_jobs = True # Whether we should continue to queue jobs
|
|
96 |
+ |
|
97 |
+ # State of cache management related jobs
|
|
98 |
+ self._cache_size_scheduled = False # Whether we have a cache size job scheduled
|
|
99 |
+ self._cache_size_running = None # A running CacheSizeJob, or None
|
|
100 |
+ self._cleanup_scheduled = False # Whether we have a cleanup job scheduled
|
|
101 |
+ self._cleanup_running = None # A running CleanupJob, or None
|
|
102 |
+ |
|
103 |
+ # Callbacks to report back to the Scheduler owner
|
|
98 | 104 |
self._interrupt_callback = interrupt_callback
|
99 | 105 |
self._ticker_callback = ticker_callback
|
100 | 106 |
self._job_start_callback = job_start_callback
|
101 | 107 |
self._job_complete_callback = job_complete_callback
|
102 | 108 |
|
103 |
- self._starttime = start_time
|
|
104 |
- self._suspendtime = None
|
|
105 |
- self._queue_jobs = True # Whether we should continue to queue jobs
|
|
106 |
- |
|
107 | 109 |
# Whether our exclusive jobs, like 'cleanup' are currently already
|
108 | 110 |
# waiting or active.
|
109 | 111 |
#
|
... | ... | @@ -113,9 +115,9 @@ class Scheduler(): |
113 | 115 |
self._exclusive_waiting = set()
|
114 | 116 |
self._exclusive_active = set()
|
115 | 117 |
|
116 |
- self._resources = Resources(context.sched_builders,
|
|
117 |
- context.sched_fetchers,
|
|
118 |
- context.sched_pushers)
|
|
118 |
+ self.resources = Resources(context.sched_builders,
|
|
119 |
+ context.sched_fetchers,
|
|
120 |
+ context.sched_pushers)
|
|
119 | 121 |
|
120 | 122 |
# run()
|
121 | 123 |
#
|
... | ... | @@ -150,7 +152,7 @@ class Scheduler(): |
150 | 152 |
self._connect_signals()
|
151 | 153 |
|
152 | 154 |
# Run the queues
|
153 |
- self._schedule_queue_jobs()
|
|
155 |
+ self._sched()
|
|
154 | 156 |
self.loop.run_forever()
|
155 | 157 |
self.loop.close()
|
156 | 158 |
|
... | ... | @@ -240,12 +242,32 @@ class Scheduler(): |
240 | 242 |
# status (JobStatus): The status of the completed job
|
241 | 243 |
#
|
242 | 244 |
def job_completed(self, job, status):
|
243 |
- self._resources.clear_job_resources(job)
|
|
244 |
- self.active_jobs.remove(job)
|
|
245 |
- if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS:
|
|
246 |
- self._exclusive_active.remove(job.action_name)
|
|
245 |
+ |
|
246 |
+ #
|
|
247 |
+ # Deallocate resources for our own internal jobs here
|
|
248 |
+ #
|
|
249 |
+ if job is self._cache_size_running:
|
|
250 |
+ self._cache_size_running = None
|
|
251 |
+ self.resources.release([ResourceType.CACHE, ResourceType.PROCESS])
|
|
252 |
+ |
|
253 |
+ elif job is self._cleanup_running:
|
|
254 |
+ self._cleanup_running = None
|
|
255 |
+ self.resources.release([ResourceType.CACHE, ResourceType.PROCESS])
|
|
256 |
+ |
|
257 |
+ # Unregister the exclusive interest when we're done with it
|
|
258 |
+ if not self._cleanup_scheduled:
|
|
259 |
+ self.resources.unregister_exclusive_interest(
|
|
260 |
+ [ResourceType.CACHE],
|
|
261 |
+ "cache-cleanup"
|
|
262 |
+ )
|
|
263 |
+ |
|
264 |
+ # Remove from the active jobs list
|
|
265 |
+ self._active_jobs.remove(job)
|
|
266 |
+ |
|
267 |
+ # Scheduler owner facing callback
|
|
247 | 268 |
self._job_complete_callback(job, status)
|
248 |
- self._schedule_queue_jobs()
|
|
269 |
+ |
|
270 |
+ # Now check for more jobs
|
|
249 | 271 |
self._sched()
|
250 | 272 |
|
251 | 273 |
# check_cache_size():
|
... | ... | @@ -255,78 +277,87 @@ class Scheduler(): |
255 | 277 |
# if needed.
|
256 | 278 |
#
|
257 | 279 |
def check_cache_size(self):
|
258 |
- job = CacheSizeJob(self, _ACTION_NAME_CACHE_SIZE,
|
|
259 |
- 'cache_size/cache_size',
|
|
260 |
- resources=[ResourceType.CACHE,
|
|
261 |
- ResourceType.PROCESS],
|
|
262 |
- complete_cb=self._run_cleanup)
|
|
263 |
- self._schedule_jobs([job])
|
|
280 |
+ |
|
281 |
+ # Here we assume we are called in response to a job
|
|
282 |
+ # completion callback, or before entering the scheduler.
|
|
283 |
+ #
|
|
284 |
+ # As such there is no need to call `_sched()` from here,
|
|
285 |
+ # and we prefer to run it once at the last moment.
|
|
286 |
+ #
|
|
287 |
+ self._cache_size_scheduled = True
|
|
264 | 288 |
|
265 | 289 |
#######################################################
|
266 | 290 |
# Local Private Methods #
|
267 | 291 |
#######################################################
|
268 | 292 |
|
269 |
- # _sched()
|
|
293 |
+ # _spawn_job()
|
|
270 | 294 |
#
|
271 |
- # The main driving function of the scheduler, it will be called
|
|
272 |
- # automatically when Scheduler.run() is called initially,
|
|
295 |
+ # Spawns a job
|
|
273 | 296 |
#
|
274 |
- def _sched(self):
|
|
275 |
- for job in self.waiting_jobs:
|
|
276 |
- self._resources.reserve_exclusive_resources(job)
|
|
277 |
- |
|
278 |
- for job in self.waiting_jobs:
|
|
279 |
- if not self._resources.reserve_job_resources(job):
|
|
280 |
- continue
|
|
281 |
- |
|
282 |
- # Postpone these jobs if one is already running
|
|
283 |
- if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS and \
|
|
284 |
- job.action_name in self._exclusive_active:
|
|
285 |
- continue
|
|
286 |
- |
|
287 |
- job.spawn()
|
|
288 |
- self.waiting_jobs.remove(job)
|
|
289 |
- self.active_jobs.append(job)
|
|
290 |
- |
|
291 |
- if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS:
|
|
292 |
- self._exclusive_waiting.remove(job.action_name)
|
|
293 |
- self._exclusive_active.add(job.action_name)
|
|
297 |
+ # Args:
|
|
298 |
+ # job (Job): The job to spawn
|
|
299 |
+ #
|
|
300 |
+ def _spawn_job(self, job):
|
|
301 |
+ job.spawn()
|
|
302 |
+ self._active_jobs.append(job)
|
|
303 |
+ if self._job_start_callback:
|
|
304 |
+ self._job_start_callback(job)
|
|
294 | 305 |
|
295 |
- if self._job_start_callback:
|
|
296 |
- self._job_start_callback(job)
|
|
306 |
+ # Custom complete callback for the cache size job
|
|
307 |
+ def _cache_size_job_complete(self, cache_size):
|
|
308 |
+ context = self.context
|
|
309 |
+ artifacts = context.artifactcache
|
|
310 |
+ if not artifacts.has_quota_exceeded():
|
|
311 |
+ return
|
|
297 | 312 |
|
298 |
- # If nothings ticking, time to bail out
|
|
299 |
- if not self.active_jobs and not self.waiting_jobs:
|
|
300 |
- self.loop.stop()
|
|
313 |
+ # Schedule a cleanup job if we've hit the threshold
|
|
314 |
+ self._cleanup_scheduled = True
|
|
301 | 315 |
|
302 |
- # _schedule_jobs()
|
|
303 |
- #
|
|
304 |
- # The main entry point for jobs to be scheduled.
|
|
316 |
+ # _spawn_internal_jobs()
|
|
305 | 317 |
#
|
306 |
- # This is called either as a result of scanning the queues
|
|
307 |
- # in _schedule_queue_jobs(), or directly by the Scheduler
|
|
308 |
- # to insert special jobs like cleanups.
|
|
318 |
+ # Spawn jobs that have to do with internal business logic, such as
|
|
319 |
+ # cache management related jobs.
|
|
309 | 320 |
#
|
310 |
- # Args:
|
|
311 |
- # jobs ([Job]): A list of jobs to schedule
|
|
312 |
- #
|
|
313 |
- def _schedule_jobs(self, jobs):
|
|
314 |
- for job in jobs:
|
|
321 |
+ def _spawn_internal_jobs(self):
|
|
322 |
+ |
|
323 |
+ #
|
|
324 |
+ # Try to run a cleanup job if we can
|
|
325 |
+ #
|
|
326 |
+ if self._cleanup_scheduled and self._cleanup_running is None:
|
|
315 | 327 |
|
316 |
- # Special treatment of our redundant exclusive jobs
|
|
317 | 328 |
#
|
318 |
- if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS:
|
|
329 |
+ # Ensure we have an exclusive interest in the resources
|
|
330 |
+ #
|
|
331 |
+ self.resources.register_exclusive_interest(
|
|
332 |
+ [ResourceType.CACHE],
|
|
333 |
+ "cache-cleanup"
|
|
334 |
+ )
|
|
335 |
+ |
|
336 |
+ if self.resources.reserve([ResourceType.CACHE, ResourceType.PROCESS],
|
|
337 |
+ [ResourceType.CACHE]):
|
|
319 | 338 |
|
320 |
- # Drop the job if one is already queued
|
|
321 |
- if job.action_name in self._exclusive_waiting:
|
|
322 |
- continue
|
|
339 |
+ # Update state and launch
|
|
340 |
+ self._cleanup_scheduled = False
|
|
341 |
+ self._cleanup_running = \
|
|
342 |
+ CleanupJob(self, _ACTION_NAME_CLEANUP, 'cleanup/cleanup')
|
|
343 |
+ self._spawn_job(self._cleanup_running)
|
|
323 | 344 |
|
324 |
- # Mark this action type as queued
|
|
325 |
- self._exclusive_waiting.add(job.action_name)
|
|
345 |
+ #
|
|
346 |
+ # Try to run a cache size job if we can
|
|
347 |
+ #
|
|
348 |
+ if self._cache_size_scheduled and not self._cache_size_running:
|
|
326 | 349 |
|
327 |
- self.waiting_jobs.append(job)
|
|
350 |
+ if self.resources.reserve([ResourceType.CACHE, ResourceType.PROCESS]):
|
|
328 | 351 |
|
329 |
- # _schedule_queue_jobs()
|
|
352 |
+ # Update state and launch
|
|
353 |
+ self._cache_size_scheduled = False
|
|
354 |
+ self._cache_size_running = \
|
|
355 |
+ CacheSizeJob(self, _ACTION_NAME_CACHE_SIZE,
|
|
356 |
+ 'cache_size/cache_size',
|
|
357 |
+ complete_cb=self._cache_size_job_complete)
|
|
358 |
+ self._spawn_job(self._cache_size_running)
|
|
359 |
+ |
|
360 |
+ # _spawn_queue_jobs()
|
|
330 | 361 |
#
|
331 | 362 |
# Ask the queues what jobs they want to schedule and schedule
|
332 | 363 |
# them. This is done here so we can ask for new jobs when jobs
|
... | ... | @@ -335,7 +366,7 @@ class Scheduler(): |
335 | 366 |
# This will process the Queues, pull elements through the Queues
|
336 | 367 |
# and process anything that is ready.
|
337 | 368 |
#
|
338 |
- def _schedule_queue_jobs(self):
|
|
369 |
+ def _spawn_queue_jobs(self):
|
|
339 | 370 |
ready = []
|
340 | 371 |
process_queues = True
|
341 | 372 |
|
... | ... | @@ -344,10 +375,7 @@ class Scheduler(): |
344 | 375 |
# Pull elements forward through queues
|
345 | 376 |
elements = []
|
346 | 377 |
for queue in self.queues:
|
347 |
- # Enqueue elements complete from the last queue
|
|
348 | 378 |
queue.enqueue(elements)
|
349 |
- |
|
350 |
- # Dequeue processed elements for the next queue
|
|
351 | 379 |
elements = list(queue.dequeue())
|
352 | 380 |
|
353 | 381 |
# Kickoff whatever processes can be processed at this time
|
... | ... | @@ -362,41 +390,51 @@ class Scheduler(): |
362 | 390 |
# thus need all the pulls to complete before ever starting
|
363 | 391 |
# a build
|
364 | 392 |
ready.extend(chain.from_iterable(
|
365 |
- queue.pop_ready_jobs() for queue in reversed(self.queues)
|
|
393 |
+ q.harvest_jobs() for q in reversed(self.queues)
|
|
366 | 394 |
))
|
367 | 395 |
|
368 |
- # pop_ready_jobs() may have skipped jobs, adding them to
|
|
369 |
- # the done_queue. Pull these skipped elements forward to
|
|
370 |
- # the next queue and process them.
|
|
396 |
+ # harvest_jobs() may have decided to skip some jobs, making
|
|
397 |
+ # them eligible for promotion to the next queue as a side effect.
|
|
398 |
+ #
|
|
399 |
+ # If that happens, do another round.
|
|
371 | 400 |
process_queues = any(q.dequeue_ready() for q in self.queues)
|
372 | 401 |
|
373 |
- self._schedule_jobs(ready)
|
|
374 |
- self._sched()
|
|
402 |
+ # Spawn the jobs
|
|
403 |
+ #
|
|
404 |
+ for job in ready:
|
|
405 |
+ self._spawn_job(job)
|
|
375 | 406 |
|
376 |
- # _run_cleanup()
|
|
377 |
- #
|
|
378 |
- # Schedules the cache cleanup job if the passed size
|
|
379 |
- # exceeds the cache quota.
|
|
407 |
+ # _sched()
|
|
380 | 408 |
#
|
381 |
- # Args:
|
|
382 |
- # cache_size (int): The calculated cache size (ignored)
|
|
409 |
+ # Run any jobs which are ready to run, or quit the main loop
|
|
410 |
+ # when nothing is running or is ready to run.
|
|
383 | 411 |
#
|
384 |
- # NOTE: This runs in response to completion of the cache size
|
|
385 |
- # calculation job lauched by Scheduler.check_cache_size(),
|
|
386 |
- # which will report the calculated cache size.
|
|
412 |
+ # This is the main driving function of the scheduler, it is called
|
|
413 |
+ # initially when we enter Scheduler.run(), and at the end of whenever
|
|
414 |
+ # any job completes, after any business logic has occurred and before
|
|
415 |
+ # going back to sleep.
|
|
387 | 416 |
#
|
388 |
- def _run_cleanup(self, cache_size):
|
|
389 |
- context = self.context
|
|
390 |
- artifacts = context.artifactcache
|
|
417 |
+ def _sched(self):
|
|
391 | 418 |
|
392 |
- if not artifacts.has_quota_exceeded():
|
|
393 |
- return
|
|
419 |
+ if not self.terminated:
|
|
420 |
+ |
|
421 |
+ #
|
|
422 |
+ # Process our internal business related jobs first,
|
|
423 |
+ # they have overall priority over queues.
|
|
424 |
+ #
|
|
425 |
+ self._spawn_internal_jobs()
|
|
394 | 426 |
|
395 |
- job = CleanupJob(self, _ACTION_NAME_CLEANUP, 'cleanup/cleanup',
|
|
396 |
- resources=[ResourceType.CACHE,
|
|
397 |
- ResourceType.PROCESS],
|
|
398 |
- exclusive_resources=[ResourceType.CACHE])
|
|
399 |
- self._schedule_jobs([job])
|
|
427 |
+ #
|
|
428 |
+ # Run as many jobs as the queues can handle for the
|
|
429 |
+ # available resources
|
|
430 |
+ #
|
|
431 |
+ self._spawn_queue_jobs()
|
|
432 |
+ |
|
433 |
+ #
|
|
434 |
+ # If nothing is ticking then bail out
|
|
435 |
+ #
|
|
436 |
+ if not self._active_jobs:
|
|
437 |
+ self.loop.stop()
|
|
400 | 438 |
|
401 | 439 |
# _suspend_jobs()
|
402 | 440 |
#
|
... | ... | @@ -406,7 +444,7 @@ class Scheduler(): |
406 | 444 |
if not self.suspended:
|
407 | 445 |
self._suspendtime = datetime.datetime.now()
|
408 | 446 |
self.suspended = True
|
409 |
- for job in self.active_jobs:
|
|
447 |
+ for job in self._active_jobs:
|
|
410 | 448 |
job.suspend()
|
411 | 449 |
|
412 | 450 |
# _resume_jobs()
|
... | ... | @@ -415,7 +453,7 @@ class Scheduler(): |
415 | 453 |
#
|
416 | 454 |
def _resume_jobs(self):
|
417 | 455 |
if self.suspended:
|
418 |
- for job in self.active_jobs:
|
|
456 |
+ for job in self._active_jobs:
|
|
419 | 457 |
job.resume()
|
420 | 458 |
self.suspended = False
|
421 | 459 |
self._starttime += (datetime.datetime.now() - self._suspendtime)
|
... | ... | @@ -488,19 +526,16 @@ class Scheduler(): |
488 | 526 |
wait_limit = 20.0
|
489 | 527 |
|
490 | 528 |
# First tell all jobs to terminate
|
491 |
- for job in self.active_jobs:
|
|
529 |
+ for job in self._active_jobs:
|
|
492 | 530 |
job.terminate()
|
493 | 531 |
|
494 | 532 |
# Now wait for them to really terminate
|
495 |
- for job in self.active_jobs:
|
|
533 |
+ for job in self._active_jobs:
|
|
496 | 534 |
elapsed = datetime.datetime.now() - wait_start
|
497 | 535 |
timeout = max(wait_limit - elapsed.total_seconds(), 0.0)
|
498 | 536 |
if not job.terminate_wait(timeout):
|
499 | 537 |
job.kill()
|
500 | 538 |
|
501 |
- # Clear out the waiting jobs
|
|
502 |
- self.waiting_jobs = []
|
|
503 |
- |
|
504 | 539 |
# Regular timeout for driving status in the UI
|
505 | 540 |
def _tick(self):
|
506 | 541 |
elapsed = self.elapsed_time()
|
... | ... | @@ -12,7 +12,21 @@ DATA_DIR = os.path.join( |
12 | 12 |
)
|
13 | 13 |
|
14 | 14 |
|
15 |
-def create_element(repo, name, path, dependencies, ref=None):
|
|
15 |
+# create_element()
|
|
16 |
+#
|
|
17 |
+# Args:
|
|
18 |
+# project (str): The project directory where testing is happening
|
|
19 |
+# name (str): The element name to create
|
|
20 |
+# dependencies (list): The list of dependencies to dump into YAML format
|
|
21 |
+#
|
|
22 |
+# Returns:
|
|
23 |
+# (Repo): The corresponding git repository created for the element
|
|
24 |
+def create_element(project, name, dependencies):
|
|
25 |
+ dev_files_path = os.path.join(project, 'files', 'dev-files')
|
|
26 |
+ element_path = os.path.join(project, 'elements')
|
|
27 |
+ repo = create_repo('git', project, "{}-repo".format(name))
|
|
28 |
+ ref = repo.create(dev_files_path)
|
|
29 |
+ |
|
16 | 30 |
element = {
|
17 | 31 |
'kind': 'import',
|
18 | 32 |
'sources': [
|
... | ... | @@ -20,7 +34,9 @@ def create_element(repo, name, path, dependencies, ref=None): |
20 | 34 |
],
|
21 | 35 |
'depends': dependencies
|
22 | 36 |
}
|
23 |
- _yaml.dump(element, os.path.join(path, name))
|
|
37 |
+ _yaml.dump(element, os.path.join(element_path, name))
|
|
38 |
+ |
|
39 |
+ return repo
|
|
24 | 40 |
|
25 | 41 |
|
26 | 42 |
# This tests a variety of scenarios and checks that the order in
|
... | ... | @@ -59,18 +75,6 @@ def create_element(repo, name, path, dependencies, ref=None): |
59 | 75 |
@pytest.mark.parametrize("operation", [('show'), ('fetch'), ('build')])
|
60 | 76 |
def test_order(cli, datafiles, tmpdir, operation, target, template, expected):
|
61 | 77 |
project = os.path.join(datafiles.dirname, datafiles.basename)
|
62 |
- dev_files_path = os.path.join(project, 'files', 'dev-files')
|
|
63 |
- element_path = os.path.join(project, 'elements')
|
|
64 |
- |
|
65 |
- # FIXME: Remove this when the test passes reliably.
|
|
66 |
- #
|
|
67 |
- # There is no reason why the order should not
|
|
68 |
- # be preserved when the builders is set to 1,
|
|
69 |
- # the scheduler queue processing still seems to
|
|
70 |
- # be losing the order.
|
|
71 |
- #
|
|
72 |
- if operation == 'build':
|
|
73 |
- pytest.skip("FIXME: This still only sometimes passes")
|
|
74 | 78 |
|
75 | 79 |
# Configure to only allow one fetcher at a time, make it easy to
|
76 | 80 |
# determine what is being planned in what order.
|
... | ... | @@ -84,11 +88,8 @@ def test_order(cli, datafiles, tmpdir, operation, target, template, expected): |
84 | 88 |
# Build the project from the template, make import elements
|
85 | 89 |
# all with the same repo
|
86 | 90 |
#
|
87 |
- repo = create_repo('git', str(tmpdir))
|
|
88 |
- ref = repo.create(dev_files_path)
|
|
89 | 91 |
for element, dependencies in template.items():
|
90 |
- create_element(repo, element, element_path, dependencies, ref=ref)
|
|
91 |
- repo.add_commit()
|
|
92 |
+ create_element(project, element, dependencies)
|
|
92 | 93 |
|
93 | 94 |
# Run test and collect results
|
94 | 95 |
if operation == 'show':
|