Tristan Van Berkom pushed to branch master at BuildStream / buildstream
Commits:
-
6991d6d6
by Tristan Van Berkom at 2019-01-16T19:28:14Z
-
04ba59bc
by Tristan Van Berkom at 2019-01-16T19:28:14Z
-
32bdded8
by Tristan Van Berkom at 2019-01-16T19:28:14Z
-
17f6e5a8
by Tristan Van Berkom at 2019-01-16T20:10:30Z
9 changed files:
- buildstream/_artifactcache.py
- buildstream/_scheduler/jobs/cachesizejob.py
- buildstream/_scheduler/jobs/cleanupjob.py
- buildstream/_scheduler/jobs/job.py
- buildstream/_scheduler/queues/buildqueue.py
- buildstream/_scheduler/queues/queue.py
- buildstream/_scheduler/resources.py
- buildstream/_scheduler/scheduler.py
- tests/frontend/order.py
Changes:
| ... | ... | @@ -247,7 +247,7 @@ class ArtifactCache(): |
| 247 | 247 |
# FIXME: Asking the user what to do may be neater
|
| 248 | 248 |
default_conf = os.path.join(os.environ['XDG_CONFIG_HOME'],
|
| 249 | 249 |
'buildstream.conf')
|
| 250 |
- detail = ("There is not enough space to build the given element.\n"
|
|
| 250 |
+ detail = ("There is not enough space to complete the build.\n"
|
|
| 251 | 251 |
"Please increase the cache-quota in {}."
|
| 252 | 252 |
.format(self.context.config_origin or default_conf))
|
| 253 | 253 |
|
| ... | ... | @@ -34,8 +34,8 @@ class CacheSizeJob(Job): |
| 34 | 34 |
if status == JobStatus.OK:
|
| 35 | 35 |
self._artifacts.set_cache_size(result)
|
| 36 | 36 |
|
| 37 |
- if self._complete_cb:
|
|
| 38 |
- self._complete_cb(result)
|
|
| 37 |
+ if self._complete_cb:
|
|
| 38 |
+ self._complete_cb(status, result)
|
|
| 39 | 39 |
|
| 40 | 40 |
def child_process_data(self):
|
| 41 | 41 |
return {}
|
| ... | ... | @@ -20,8 +20,9 @@ from .job import Job, JobStatus |
| 20 | 20 |
|
| 21 | 21 |
|
| 22 | 22 |
class CleanupJob(Job):
|
| 23 |
- def __init__(self, *args, **kwargs):
|
|
| 23 |
+ def __init__(self, *args, complete_cb, **kwargs):
|
|
| 24 | 24 |
super().__init__(*args, **kwargs)
|
| 25 |
+ self._complete_cb = complete_cb
|
|
| 25 | 26 |
|
| 26 | 27 |
context = self._scheduler.context
|
| 27 | 28 |
self._artifacts = context.artifactcache
|
| ... | ... | @@ -32,3 +33,6 @@ class CleanupJob(Job): |
| 32 | 33 |
def parent_complete(self, status, result):
|
| 33 | 34 |
if status == JobStatus.OK:
|
| 34 | 35 |
self._artifacts.set_cache_size(result)
|
| 36 |
+ |
|
| 37 |
+ if self._complete_cb:
|
|
| 38 |
+ self._complete_cb(status, result)
|
| ... | ... | @@ -85,28 +85,11 @@ class Process(multiprocessing.Process): |
| 85 | 85 |
# action_name (str): The queue action name
|
| 86 | 86 |
# logfile (str): A template string that points to the logfile
|
| 87 | 87 |
# that should be used - should contain {pid}.
|
| 88 |
-# resources (iter(ResourceType)) - A set of resources this job
|
|
| 89 |
-# wants to use.
|
|
| 90 |
-# exclusive_resources (iter(ResourceType)) - A set of resources
|
|
| 91 |
-# this job wants to use
|
|
| 92 |
-# exclusively.
|
|
| 93 | 88 |
# max_retries (int): The maximum number of retries
|
| 94 | 89 |
#
|
| 95 | 90 |
class Job():
|
| 96 | 91 |
|
| 97 |
- def __init__(self, scheduler, action_name, logfile, *,
|
|
| 98 |
- resources=None, exclusive_resources=None, max_retries=0):
|
|
| 99 |
- |
|
| 100 |
- if resources is None:
|
|
| 101 |
- resources = set()
|
|
| 102 |
- else:
|
|
| 103 |
- resources = set(resources)
|
|
| 104 |
- if exclusive_resources is None:
|
|
| 105 |
- exclusive_resources = set()
|
|
| 106 |
- else:
|
|
| 107 |
- exclusive_resources = set(resources)
|
|
| 108 |
- |
|
| 109 |
- assert exclusive_resources <= resources, "All exclusive resources must also be resources!"
|
|
| 92 |
+ def __init__(self, scheduler, action_name, logfile, *, max_retries=0):
|
|
| 110 | 93 |
|
| 111 | 94 |
#
|
| 112 | 95 |
# Public members
|
| ... | ... | @@ -114,12 +97,6 @@ class Job(): |
| 114 | 97 |
self.action_name = action_name # The action name for the Queue
|
| 115 | 98 |
self.child_data = None # Data to be sent to the main process
|
| 116 | 99 |
|
| 117 |
- # The resources this job wants to access
|
|
| 118 |
- self.resources = resources
|
|
| 119 |
- # Resources this job needs to access exclusively, i.e., no
|
|
| 120 |
- # other job should be allowed to access them
|
|
| 121 |
- self.exclusive_resources = exclusive_resources
|
|
| 122 |
- |
|
| 123 | 100 |
#
|
| 124 | 101 |
# Private members
|
| 125 | 102 |
#
|
| ... | ... | @@ -57,11 +57,10 @@ class BuildQueue(Queue): |
| 57 | 57 |
logfile=logfile)
|
| 58 | 58 |
job = ElementJob(self._scheduler, self.action_name,
|
| 59 | 59 |
logfile, element=element, queue=self,
|
| 60 |
- resources=self.resources,
|
|
| 61 | 60 |
action_cb=self.process,
|
| 62 | 61 |
complete_cb=self._job_done,
|
| 63 | 62 |
max_retries=self._max_retries)
|
| 64 |
- self._done_queue.append(job)
|
|
| 63 |
+ self._done_queue.append(element)
|
|
| 65 | 64 |
self.failed_elements.append(element)
|
| 66 | 65 |
self._scheduler._job_complete_callback(job, False)
|
| 67 | 66 |
|
| ... | ... | @@ -72,8 +72,9 @@ class Queue(): |
| 72 | 72 |
# Private members
|
| 73 | 73 |
#
|
| 74 | 74 |
self._scheduler = scheduler
|
| 75 |
- self._wait_queue = deque()
|
|
| 76 |
- self._done_queue = deque()
|
|
| 75 |
+ self._resources = scheduler.resources # Shared resource pool
|
|
| 76 |
+ self._wait_queue = deque() # Ready / Waiting elements
|
|
| 77 |
+ self._done_queue = deque() # Processed / Skipped elements
|
|
| 77 | 78 |
self._max_retries = 0
|
| 78 | 79 |
|
| 79 | 80 |
# Assert the subclass has setup class data
|
| ... | ... | @@ -115,16 +116,6 @@ class Queue(): |
| 115 | 116 |
def status(self, element):
|
| 116 | 117 |
return QueueStatus.READY
|
| 117 | 118 |
|
| 118 |
- # prepare()
|
|
| 119 |
- #
|
|
| 120 |
- # Abstract method for handling job preparation in the main process.
|
|
| 121 |
- #
|
|
| 122 |
- # Args:
|
|
| 123 |
- # element (Element): The element which is scheduled
|
|
| 124 |
- #
|
|
| 125 |
- def prepare(self, element):
|
|
| 126 |
- pass
|
|
| 127 |
- |
|
| 128 | 119 |
# done()
|
| 129 | 120 |
#
|
| 130 | 121 |
# Abstract method for handling a successful job completion.
|
| ... | ... | @@ -153,26 +144,18 @@ class Queue(): |
| 153 | 144 |
if not elts:
|
| 154 | 145 |
return
|
| 155 | 146 |
|
| 156 |
- # Note: The internal lists work with jobs. This is not
|
|
| 157 |
- # reflected in any external methods (except
|
|
| 158 |
- # pop/peek_ready_jobs).
|
|
| 159 |
- def create_job(element):
|
|
| 160 |
- logfile = self._element_log_path(element)
|
|
| 161 |
- return ElementJob(self._scheduler, self.action_name,
|
|
| 162 |
- logfile, element=element, queue=self,
|
|
| 163 |
- resources=self.resources,
|
|
| 164 |
- action_cb=self.process,
|
|
| 165 |
- complete_cb=self._job_done,
|
|
| 166 |
- max_retries=self._max_retries)
|
|
| 167 |
- |
|
| 168 |
- # Place skipped elements directly on the done queue
|
|
| 169 |
- jobs = [create_job(elt) for elt in elts]
|
|
| 170 |
- skip = [job for job in jobs if self.status(job.element) == QueueStatus.SKIP]
|
|
| 171 |
- wait = [job for job in jobs if job not in skip]
|
|
| 172 |
- |
|
| 173 |
- self.skipped_elements.extend([job.element for job in skip])
|
|
| 174 |
- self._wait_queue.extend(wait)
|
|
| 175 |
- self._done_queue.extend(skip)
|
|
| 147 |
+ # Place skipped elements on the done queue right away.
|
|
| 148 |
+ #
|
|
| 149 |
+ # The remaining ready and waiting elements must remain in the
|
|
| 150 |
+ # same queue, and ready status must be determined at the moment
|
|
| 151 |
+ # at which the scheduler is asking for the next job.
|
|
| 152 |
+ #
|
|
| 153 |
+ skip = [elt for elt in elts if self.status(elt) == QueueStatus.SKIP]
|
|
| 154 |
+ wait = [elt for elt in elts if elt not in skip]
|
|
| 155 |
+ |
|
| 156 |
+ self.skipped_elements.extend(skip) # Public record of skipped elements
|
|
| 157 |
+ self._done_queue.extend(skip) # Elements to be processed
|
|
| 158 |
+ self._wait_queue.extend(wait) # Elements eligible to be dequeued
|
|
| 176 | 159 |
|
| 177 | 160 |
# dequeue()
|
| 178 | 161 |
#
|
| ... | ... | @@ -184,69 +167,59 @@ class Queue(): |
| 184 | 167 |
#
|
| 185 | 168 |
def dequeue(self):
|
| 186 | 169 |
while self._done_queue:
|
| 187 |
- yield self._done_queue.popleft().element
|
|
| 170 |
+ yield self._done_queue.popleft()
|
|
| 188 | 171 |
|
| 189 | 172 |
# dequeue_ready()
|
| 190 | 173 |
#
|
| 191 |
- # Reports whether there are any elements to dequeue
|
|
| 174 |
+ # Reports whether any elements can be promoted to other queues
|
|
| 192 | 175 |
#
|
| 193 | 176 |
# Returns:
|
| 194 |
- # (bool): Whether there are elements to dequeue
|
|
| 177 |
+ # (bool): Whether there are elements ready
|
|
| 195 | 178 |
#
|
| 196 | 179 |
def dequeue_ready(self):
|
| 197 | 180 |
return any(self._done_queue)
|
| 198 | 181 |
|
| 199 |
- # pop_ready_jobs()
|
|
| 200 |
- #
|
|
| 201 |
- # Returns:
|
|
| 202 |
- # ([Job]): A list of jobs to run
|
|
| 182 |
+ # harvest_jobs()
|
|
| 203 | 183 |
#
|
| 204 | 184 |
# Process elements in the queue, moving elements which were enqueued
|
| 205 |
- # into the dequeue pool, and processing them if necessary.
|
|
| 206 |
- #
|
|
| 207 |
- # This will have different results for elements depending
|
|
| 208 |
- # on the Queue.status() implementation.
|
|
| 209 |
- #
|
|
| 210 |
- # o Elements which are QueueStatus.WAIT will not be affected
|
|
| 185 |
+ # into the dequeue pool, and creating as many jobs for which resources
|
|
| 186 |
+ # can be reserved.
|
|
| 211 | 187 |
#
|
| 212 |
- # o Elements which are QueueStatus.SKIP will move directly
|
|
| 213 |
- # to the dequeue pool
|
|
| 214 |
- #
|
|
| 215 |
- # o For Elements which are QueueStatus.READY a Job will be
|
|
| 216 |
- # created and returned to the caller, given that the scheduler
|
|
| 217 |
- # allows the Queue enough resources for the given job
|
|
| 188 |
+ # Returns:
|
|
| 189 |
+ # ([Job]): A list of jobs which can be run now
|
|
| 218 | 190 |
#
|
| 219 |
- def pop_ready_jobs(self):
|
|
| 191 |
+ def harvest_jobs(self):
|
|
| 220 | 192 |
unready = []
|
| 221 | 193 |
ready = []
|
| 222 | 194 |
|
| 223 | 195 |
while self._wait_queue:
|
| 224 |
- job = self._wait_queue.popleft()
|
|
| 225 |
- element = job.element
|
|
| 196 |
+ if not self._resources.reserve(self.resources, peek=True):
|
|
| 197 |
+ break
|
|
| 226 | 198 |
|
| 199 |
+ element = self._wait_queue.popleft()
|
|
| 227 | 200 |
status = self.status(element)
|
| 201 |
+ |
|
| 228 | 202 |
if status == QueueStatus.WAIT:
|
| 229 |
- unready.append(job)
|
|
| 230 |
- continue
|
|
| 203 |
+ unready.append(element)
|
|
| 231 | 204 |
elif status == QueueStatus.SKIP:
|
| 232 |
- self._done_queue.append(job)
|
|
| 205 |
+ self._done_queue.append(element)
|
|
| 233 | 206 |
self.skipped_elements.append(element)
|
| 234 |
- continue
|
|
| 235 |
- |
|
| 236 |
- self.prepare(element)
|
|
| 237 |
- ready.append(job)
|
|
| 207 |
+ else:
|
|
| 208 |
+ reserved = self._resources.reserve(self.resources)
|
|
| 209 |
+ assert reserved
|
|
| 210 |
+ ready.append(element)
|
|
| 238 | 211 |
|
| 239 |
- # These were not ready but were in the beginning, give em
|
|
| 240 |
- # first priority again next time around
|
|
| 241 | 212 |
self._wait_queue.extendleft(unready)
|
| 242 | 213 |
|
| 243 |
- return ready
|
|
| 244 |
- |
|
| 245 |
- def peek_ready_jobs(self):
|
|
| 246 |
- def ready(job):
|
|
| 247 |
- return self.status(job.element) == QueueStatus.READY
|
|
| 248 |
- |
|
| 249 |
- yield from (job for job in self._wait_queue if ready(job))
|
|
| 214 |
+ return [
|
|
| 215 |
+ ElementJob(self._scheduler, self.action_name,
|
|
| 216 |
+ self._element_log_path(element),
|
|
| 217 |
+ element=element, queue=self,
|
|
| 218 |
+ action_cb=self.process,
|
|
| 219 |
+ complete_cb=self._job_done,
|
|
| 220 |
+ max_retries=self._max_retries)
|
|
| 221 |
+ for element in ready
|
|
| 222 |
+ ]
|
|
| 250 | 223 |
|
| 251 | 224 |
#####################################################
|
| 252 | 225 |
# Private Methods #
|
| ... | ... | @@ -292,6 +265,10 @@ class Queue(): |
| 292 | 265 |
#
|
| 293 | 266 |
def _job_done(self, job, element, status, result):
|
| 294 | 267 |
|
| 268 |
+ # Now release the resources we reserved
|
|
| 269 |
+ #
|
|
| 270 |
+ self._resources.release(self.resources)
|
|
| 271 |
+ |
|
| 295 | 272 |
# Update values that need to be synchronized in the main task
|
| 296 | 273 |
# before calling any queue implementation
|
| 297 | 274 |
self._update_workspaces(element, job)
|
| ... | ... | @@ -324,12 +301,8 @@ class Queue(): |
| 324 | 301 |
detail=traceback.format_exc())
|
| 325 | 302 |
self.failed_elements.append(element)
|
| 326 | 303 |
else:
|
| 327 |
- #
|
|
| 328 |
- # No exception occured in post processing
|
|
| 329 |
- #
|
|
| 330 |
- |
|
| 331 |
- # All jobs get placed on the done queue for later processing.
|
|
| 332 |
- self._done_queue.append(job)
|
|
| 304 |
+ # All elements get placed on the done queue for later processing.
|
|
| 305 |
+ self._done_queue.append(element)
|
|
| 333 | 306 |
|
| 334 | 307 |
# These lists are for bookkeeping purposes for the UI and logging.
|
| 335 | 308 |
if status == JobStatus.SKIPPED:
|
| ... | ... | @@ -34,28 +34,25 @@ class Resources(): |
| 34 | 34 |
ResourceType.UPLOAD: set()
|
| 35 | 35 |
}
|
| 36 | 36 |
|
| 37 |
- def clear_job_resources(self, job):
|
|
| 38 |
- for resource in job.exclusive_resources:
|
|
| 39 |
- self._exclusive_resources[resource].remove(hash(job))
|
|
| 37 |
+ # reserve()
|
|
| 38 |
+ #
|
|
| 39 |
+ # Reserves a set of resources
|
|
| 40 |
+ #
|
|
| 41 |
+ # Args:
|
|
| 42 |
+ # resources (set): A set of ResourceTypes
|
|
| 43 |
+ # exclusive (set): Another set of ResourceTypes
|
|
| 44 |
+ # peek (bool): Whether to only peek at whether the resource is available
|
|
| 45 |
+ #
|
|
| 46 |
+ # Returns:
|
|
| 47 |
+ # (bool): True if the resources could be reserved
|
|
| 48 |
+ #
|
|
| 49 |
+ def reserve(self, resources, exclusive=None, *, peek=False):
|
|
| 50 |
+ if exclusive is None:
|
|
| 51 |
+ exclusive = set()
|
|
| 40 | 52 |
|
| 41 |
- for resource in job.resources:
|
|
| 42 |
- self._used_resources[resource] -= 1
|
|
| 43 |
- |
|
| 44 |
- def reserve_exclusive_resources(self, job):
|
|
| 45 |
- exclusive = job.exclusive_resources
|
|
| 46 |
- |
|
| 47 |
- # The very first thing we do is to register any exclusive
|
|
| 48 |
- # resources this job may want. Even if the job is not yet
|
|
| 49 |
- # allowed to run (because another job is holding the resource
|
|
| 50 |
- # it wants), we can still set this - it just means that any
|
|
| 51 |
- # job *currently* using these resources has to finish first,
|
|
| 52 |
- # and no new jobs wanting these can be launched (except other
|
|
| 53 |
- # exclusive-access jobs).
|
|
| 54 |
- #
|
|
| 55 |
- for resource in exclusive:
|
|
| 56 |
- self._exclusive_resources[resource].add(hash(job))
|
|
| 53 |
+ resources = set(resources)
|
|
| 54 |
+ exclusive = set(exclusive)
|
|
| 57 | 55 |
|
| 58 |
- def reserve_job_resources(self, job):
|
|
| 59 | 56 |
# First, we check if the job wants to access a resource that
|
| 60 | 57 |
# another job wants exclusive access to. If so, it cannot be
|
| 61 | 58 |
# scheduled.
|
| ... | ... | @@ -68,7 +65,8 @@ class Resources(): |
| 68 | 65 |
# is currently not possible, but may be worth thinking
|
| 69 | 66 |
# about.
|
| 70 | 67 |
#
|
| 71 |
- for resource in job.resources - job.exclusive_resources:
|
|
| 68 |
+ for resource in resources - exclusive:
|
|
| 69 |
+ |
|
| 72 | 70 |
# If our job wants this resource exclusively, we never
|
| 73 | 71 |
# check this, so we can get away with not (temporarily)
|
| 74 | 72 |
# removing it from the set.
|
| ... | ... | @@ -84,14 +82,14 @@ class Resources(): |
| 84 | 82 |
# at a time, despite being allowed to be part of the exclusive
|
| 85 | 83 |
# set.
|
| 86 | 84 |
#
|
| 87 |
- for exclusive in job.exclusive_resources:
|
|
| 88 |
- if self._used_resources[exclusive] != 0:
|
|
| 85 |
+ for resource in exclusive:
|
|
| 86 |
+ if self._used_resources[resource] != 0:
|
|
| 89 | 87 |
return False
|
| 90 | 88 |
|
| 91 | 89 |
# Finally, we check if we have enough of each resource
|
| 92 | 90 |
# available. If we don't have enough, the job cannot be
|
| 93 | 91 |
# scheduled.
|
| 94 |
- for resource in job.resources:
|
|
| 92 |
+ for resource in resources:
|
|
| 95 | 93 |
if (self._max_resources[resource] > 0 and
|
| 96 | 94 |
self._used_resources[resource] >= self._max_resources[resource]):
|
| 97 | 95 |
return False
|
| ... | ... | @@ -99,7 +97,70 @@ class Resources(): |
| 99 | 97 |
# Now we register the fact that our job is using the resources
|
| 100 | 98 |
# it asked for, and tell the scheduler that it is allowed to
|
| 101 | 99 |
# continue.
|
| 102 |
- for resource in job.resources:
|
|
| 103 |
- self._used_resources[resource] += 1
|
|
| 100 |
+ if not peek:
|
|
| 101 |
+ for resource in resources:
|
|
| 102 |
+ self._used_resources[resource] += 1
|
|
| 104 | 103 |
|
| 105 | 104 |
return True
|
| 105 |
+ |
|
| 106 |
+ # release()
|
|
| 107 |
+ #
|
|
| 108 |
+ # Release resources previously reserved with Resources.reserve()
|
|
| 109 |
+ #
|
|
| 110 |
+ # Args:
|
|
| 111 |
+ # resources (set): A set of resources to release
|
|
| 112 |
+ #
|
|
| 113 |
+ def release(self, resources):
|
|
| 114 |
+ for resource in resources:
|
|
| 115 |
+ assert self._used_resources[resource] > 0, "Scheduler resource imbalance"
|
|
| 116 |
+ self._used_resources[resource] -= 1
|
|
| 117 |
+ |
|
| 118 |
+ # register_exclusive_interest()
|
|
| 119 |
+ #
|
|
| 120 |
+ # Inform the resources pool that `source` has an interest in
|
|
| 121 |
+ # reserving this resource exclusively.
|
|
| 122 |
+ #
|
|
| 123 |
+ # The source parameter is used to identify the caller, it
|
|
| 124 |
+ # must be ensured to be unique for the time that the
|
|
| 125 |
+ # interest is registered.
|
|
| 126 |
+ #
|
|
| 127 |
+ # This function may be called multiple times, and subsequent
|
|
| 128 |
+ # calls will simply have no effect until unregister_exclusive_interest()
|
|
| 129 |
+ # is used to clear the interest.
|
|
| 130 |
+ #
|
|
| 131 |
+ # This must be called in advance of reserve()
|
|
| 132 |
+ #
|
|
| 133 |
+ # Args:
|
|
| 134 |
+ # resources (set): Set of resources to reserve exclusively
|
|
| 135 |
+ # source (any): Source identifier, to be used again when unregistering
|
|
| 136 |
+ # the interest.
|
|
| 137 |
+ #
|
|
| 138 |
+ def register_exclusive_interest(self, resources, source):
|
|
| 139 |
+ |
|
| 140 |
+ # The very first thing we do is to register any exclusive
|
|
| 141 |
+ # resources this job may want. Even if the job is not yet
|
|
| 142 |
+ # allowed to run (because another job is holding the resource
|
|
| 143 |
+ # it wants), we can still set this - it just means that any
|
|
| 144 |
+ # job *currently* using these resources has to finish first,
|
|
| 145 |
+ # and no new jobs wanting these can be launched (except other
|
|
| 146 |
+ # exclusive-access jobs).
|
|
| 147 |
+ #
|
|
| 148 |
+ for resource in resources:
|
|
| 149 |
+ self._exclusive_resources[resource].add(source)
|
|
| 150 |
+ |
|
| 151 |
+ # unregister_exclusive_interest()
|
|
| 152 |
+ #
|
|
| 153 |
+ # Clear the exclusive interest in these resources.
|
|
| 154 |
+ #
|
|
| 155 |
+ # This should be called by the given source which registered
|
|
| 156 |
+ # an exclusive interest.
|
|
| 157 |
+ #
|
|
| 158 |
+ # Args:
|
|
| 159 |
+ # resources (set): Set of resources to clear the exclusive interest in
|
|
| 160 |
+ # source (any): Source identifier, the same one used when registering
|
|
| 161 |
+ # the interest.
|
|
| 162 |
+ #
|
|
| 163 |
+ def unregister_exclusive_interest(self, resources, source):
|
|
| 164 |
+ |
|
| 165 |
+ for resource in resources:
|
|
| 166 |
+ self._exclusive_resources[resource].remove(source)
|
| ... | ... | @@ -28,7 +28,7 @@ from contextlib import contextmanager |
| 28 | 28 |
|
| 29 | 29 |
# Local imports
|
| 30 | 30 |
from .resources import Resources, ResourceType
|
| 31 |
-from .jobs import CacheSizeJob, CleanupJob
|
|
| 31 |
+from .jobs import JobStatus, CacheSizeJob, CleanupJob
|
|
| 32 | 32 |
|
| 33 | 33 |
|
| 34 | 34 |
# A decent return code for Scheduler.run()
|
| ... | ... | @@ -38,14 +38,10 @@ class SchedStatus(): |
| 38 | 38 |
TERMINATED = 1
|
| 39 | 39 |
|
| 40 | 40 |
|
| 41 |
-# Our _REDUNDANT_EXCLUSIVE_ACTIONS jobs are special ones
|
|
| 42 |
-# which we launch dynamically, they have the property of being
|
|
| 43 |
-# meaningless to queue if one is already queued, and it also
|
|
| 44 |
-# doesnt make sense to run them in parallel
|
|
| 41 |
+# Some action names for the internal jobs we launch
|
|
| 45 | 42 |
#
|
| 46 | 43 |
_ACTION_NAME_CLEANUP = 'cleanup'
|
| 47 | 44 |
_ACTION_NAME_CACHE_SIZE = 'cache_size'
|
| 48 |
-_REDUNDANT_EXCLUSIVE_ACTIONS = [_ACTION_NAME_CLEANUP, _ACTION_NAME_CACHE_SIZE]
|
|
| 49 | 45 |
|
| 50 | 46 |
|
| 51 | 47 |
# Scheduler()
|
| ... | ... | @@ -81,8 +77,6 @@ class Scheduler(): |
| 81 | 77 |
#
|
| 82 | 78 |
# Public members
|
| 83 | 79 |
#
|
| 84 |
- self.active_jobs = [] # Jobs currently being run in the scheduler
|
|
| 85 |
- self.waiting_jobs = [] # Jobs waiting for resources
|
|
| 86 | 80 |
self.queues = None # Exposed for the frontend to print summaries
|
| 87 | 81 |
self.context = context # The Context object shared with Queues
|
| 88 | 82 |
self.terminated = False # Whether the scheduler was asked to terminate or has terminated
|
| ... | ... | @@ -95,15 +89,23 @@ class Scheduler(): |
| 95 | 89 |
#
|
| 96 | 90 |
# Private members
|
| 97 | 91 |
#
|
| 92 |
+ self._active_jobs = [] # Jobs currently being run in the scheduler
|
|
| 93 |
+ self._starttime = start_time # Initial application start time
|
|
| 94 |
+ self._suspendtime = None # Session time compensation for suspended state
|
|
| 95 |
+ self._queue_jobs = True # Whether we should continue to queue jobs
|
|
| 96 |
+ |
|
| 97 |
+ # State of cache management related jobs
|
|
| 98 |
+ self._cache_size_scheduled = False # Whether we have a cache size job scheduled
|
|
| 99 |
+ self._cache_size_running = None # A running CacheSizeJob, or None
|
|
| 100 |
+ self._cleanup_scheduled = False # Whether we have a cleanup job scheduled
|
|
| 101 |
+ self._cleanup_running = None # A running CleanupJob, or None
|
|
| 102 |
+ |
|
| 103 |
+ # Callbacks to report back to the Scheduler owner
|
|
| 98 | 104 |
self._interrupt_callback = interrupt_callback
|
| 99 | 105 |
self._ticker_callback = ticker_callback
|
| 100 | 106 |
self._job_start_callback = job_start_callback
|
| 101 | 107 |
self._job_complete_callback = job_complete_callback
|
| 102 | 108 |
|
| 103 |
- self._starttime = start_time
|
|
| 104 |
- self._suspendtime = None
|
|
| 105 |
- self._queue_jobs = True # Whether we should continue to queue jobs
|
|
| 106 |
- |
|
| 107 | 109 |
# Whether our exclusive jobs, like 'cleanup' are currently already
|
| 108 | 110 |
# waiting or active.
|
| 109 | 111 |
#
|
| ... | ... | @@ -113,9 +115,9 @@ class Scheduler(): |
| 113 | 115 |
self._exclusive_waiting = set()
|
| 114 | 116 |
self._exclusive_active = set()
|
| 115 | 117 |
|
| 116 |
- self._resources = Resources(context.sched_builders,
|
|
| 117 |
- context.sched_fetchers,
|
|
| 118 |
- context.sched_pushers)
|
|
| 118 |
+ self.resources = Resources(context.sched_builders,
|
|
| 119 |
+ context.sched_fetchers,
|
|
| 120 |
+ context.sched_pushers)
|
|
| 119 | 121 |
|
| 120 | 122 |
# run()
|
| 121 | 123 |
#
|
| ... | ... | @@ -150,7 +152,7 @@ class Scheduler(): |
| 150 | 152 |
self._connect_signals()
|
| 151 | 153 |
|
| 152 | 154 |
# Run the queues
|
| 153 |
- self._schedule_queue_jobs()
|
|
| 155 |
+ self._sched()
|
|
| 154 | 156 |
self.loop.run_forever()
|
| 155 | 157 |
self.loop.close()
|
| 156 | 158 |
|
| ... | ... | @@ -240,12 +242,14 @@ class Scheduler(): |
| 240 | 242 |
# status (JobStatus): The status of the completed job
|
| 241 | 243 |
#
|
| 242 | 244 |
def job_completed(self, job, status):
|
| 243 |
- self._resources.clear_job_resources(job)
|
|
| 244 |
- self.active_jobs.remove(job)
|
|
| 245 |
- if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS:
|
|
| 246 |
- self._exclusive_active.remove(job.action_name)
|
|
| 245 |
+ |
|
| 246 |
+ # Remove from the active jobs list
|
|
| 247 |
+ self._active_jobs.remove(job)
|
|
| 248 |
+ |
|
| 249 |
+ # Scheduler owner facing callback
|
|
| 247 | 250 |
self._job_complete_callback(job, status)
|
| 248 |
- self._schedule_queue_jobs()
|
|
| 251 |
+ |
|
| 252 |
+ # Now check for more jobs
|
|
| 249 | 253 |
self._sched()
|
| 250 | 254 |
|
| 251 | 255 |
# check_cache_size():
|
| ... | ... | @@ -255,78 +259,104 @@ class Scheduler(): |
| 255 | 259 |
# if needed.
|
| 256 | 260 |
#
|
| 257 | 261 |
def check_cache_size(self):
|
| 258 |
- job = CacheSizeJob(self, _ACTION_NAME_CACHE_SIZE,
|
|
| 259 |
- 'cache_size/cache_size',
|
|
| 260 |
- resources=[ResourceType.CACHE,
|
|
| 261 |
- ResourceType.PROCESS],
|
|
| 262 |
- complete_cb=self._run_cleanup)
|
|
| 263 |
- self._schedule_jobs([job])
|
|
| 262 |
+ |
|
| 263 |
+ # Here we assume we are called in response to a job
|
|
| 264 |
+ # completion callback, or before entering the scheduler.
|
|
| 265 |
+ #
|
|
| 266 |
+ # As such there is no need to call `_sched()` from here,
|
|
| 267 |
+ # and we prefer to run it once at the last moment.
|
|
| 268 |
+ #
|
|
| 269 |
+ self._cache_size_scheduled = True
|
|
| 264 | 270 |
|
| 265 | 271 |
#######################################################
|
| 266 | 272 |
# Local Private Methods #
|
| 267 | 273 |
#######################################################
|
| 268 | 274 |
|
| 269 |
- # _sched()
|
|
| 275 |
+ # _spawn_job()
|
|
| 270 | 276 |
#
|
| 271 |
- # The main driving function of the scheduler, it will be called
|
|
| 272 |
- # automatically when Scheduler.run() is called initially,
|
|
| 277 |
+ # Spawns a job
|
|
| 273 | 278 |
#
|
| 274 |
- def _sched(self):
|
|
| 275 |
- for job in self.waiting_jobs:
|
|
| 276 |
- self._resources.reserve_exclusive_resources(job)
|
|
| 279 |
+ # Args:
|
|
| 280 |
+ # job (Job): The job to spawn
|
|
| 281 |
+ #
|
|
| 282 |
+ def _spawn_job(self, job):
|
|
| 283 |
+ job.spawn()
|
|
| 284 |
+ self._active_jobs.append(job)
|
|
| 285 |
+ if self._job_start_callback:
|
|
| 286 |
+ self._job_start_callback(job)
|
|
| 277 | 287 |
|
| 278 |
- for job in self.waiting_jobs:
|
|
| 279 |
- if not self._resources.reserve_job_resources(job):
|
|
| 280 |
- continue
|
|
| 288 |
+ # Callback for the cache size job
|
|
| 289 |
+ def _cache_size_job_complete(self, status, cache_size):
|
|
| 281 | 290 |
|
| 282 |
- # Postpone these jobs if one is already running
|
|
| 283 |
- if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS and \
|
|
| 284 |
- job.action_name in self._exclusive_active:
|
|
| 285 |
- continue
|
|
| 291 |
+ # Deallocate cache size job resources
|
|
| 292 |
+ self._cache_size_running = None
|
|
| 293 |
+ self.resources.release([ResourceType.CACHE, ResourceType.PROCESS])
|
|
| 286 | 294 |
|
| 287 |
- job.spawn()
|
|
| 288 |
- self.waiting_jobs.remove(job)
|
|
| 289 |
- self.active_jobs.append(job)
|
|
| 295 |
+ # Schedule a cleanup job if we've hit the threshold
|
|
| 296 |
+ if status != JobStatus.OK:
|
|
| 297 |
+ return
|
|
| 290 | 298 |
|
| 291 |
- if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS:
|
|
| 292 |
- self._exclusive_waiting.remove(job.action_name)
|
|
| 293 |
- self._exclusive_active.add(job.action_name)
|
|
| 299 |
+ context = self.context
|
|
| 300 |
+ artifacts = context.artifactcache
|
|
| 294 | 301 |
|
| 295 |
- if self._job_start_callback:
|
|
| 296 |
- self._job_start_callback(job)
|
|
| 302 |
+ if artifacts.has_quota_exceeded():
|
|
| 303 |
+ self._cleanup_scheduled = True
|
|
| 297 | 304 |
|
| 298 |
- # If nothings ticking, time to bail out
|
|
| 299 |
- if not self.active_jobs and not self.waiting_jobs:
|
|
| 300 |
- self.loop.stop()
|
|
| 305 |
+ # Callback for the cleanup job
|
|
| 306 |
+ def _cleanup_job_complete(self, status, cache_size):
|
|
| 301 | 307 |
|
| 302 |
- # _schedule_jobs()
|
|
| 303 |
- #
|
|
| 304 |
- # The main entry point for jobs to be scheduled.
|
|
| 305 |
- #
|
|
| 306 |
- # This is called either as a result of scanning the queues
|
|
| 307 |
- # in _schedule_queue_jobs(), or directly by the Scheduler
|
|
| 308 |
- # to insert special jobs like cleanups.
|
|
| 308 |
+ # Deallocate cleanup job resources
|
|
| 309 |
+ self._cleanup_running = None
|
|
| 310 |
+ self.resources.release([ResourceType.CACHE, ResourceType.PROCESS])
|
|
| 311 |
+ |
|
| 312 |
+ # Unregister the exclusive interest when we're done with it
|
|
| 313 |
+ if not self._cleanup_scheduled:
|
|
| 314 |
+ self.resources.unregister_exclusive_interest(
|
|
| 315 |
+ [ResourceType.CACHE], 'cache-cleanup'
|
|
| 316 |
+ )
|
|
| 317 |
+ |
|
| 318 |
+ # _sched_cleanup_job()
|
|
| 309 | 319 |
#
|
| 310 |
- # Args:
|
|
| 311 |
- # jobs ([Job]): A list of jobs to schedule
|
|
| 320 |
+ # Runs a cleanup job if one is scheduled to run now and
|
|
| 321 |
+ # sufficient resources are available.
|
|
| 312 | 322 |
#
|
| 313 |
- def _schedule_jobs(self, jobs):
|
|
| 314 |
- for job in jobs:
|
|
| 323 |
+ def _sched_cleanup_job(self):
|
|
| 315 | 324 |
|
| 316 |
- # Special treatment of our redundant exclusive jobs
|
|
| 317 |
- #
|
|
| 318 |
- if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS:
|
|
| 325 |
+ if self._cleanup_scheduled and self._cleanup_running is None:
|
|
| 326 |
+ |
|
| 327 |
+ # Ensure we have an exclusive interest in the resources
|
|
| 328 |
+ self.resources.register_exclusive_interest(
|
|
| 329 |
+ [ResourceType.CACHE], 'cache-cleanup'
|
|
| 330 |
+ )
|
|
| 331 |
+ |
|
| 332 |
+ if self.resources.reserve([ResourceType.CACHE, ResourceType.PROCESS],
|
|
| 333 |
+ [ResourceType.CACHE]):
|
|
| 319 | 334 |
|
| 320 |
- # Drop the job if one is already queued
|
|
| 321 |
- if job.action_name in self._exclusive_waiting:
|
|
| 322 |
- continue
|
|
| 335 |
+ # Update state and launch
|
|
| 336 |
+ self._cleanup_scheduled = False
|
|
| 337 |
+ self._cleanup_running = \
|
|
| 338 |
+ CleanupJob(self, _ACTION_NAME_CLEANUP, 'cleanup/cleanup',
|
|
| 339 |
+ complete_cb=self._cleanup_job_complete)
|
|
| 340 |
+ self._spawn_job(self._cleanup_running)
|
|
| 323 | 341 |
|
| 324 |
- # Mark this action type as queued
|
|
| 325 |
- self._exclusive_waiting.add(job.action_name)
|
|
| 342 |
+ # _sched_cache_size_job()
|
|
| 343 |
+ #
|
|
| 344 |
+ # Runs a cache size job if one is scheduled to run now and
|
|
| 345 |
+ # sufficient resources are available.
|
|
| 346 |
+ #
|
|
| 347 |
+ def _sched_cache_size_job(self):
|
|
| 348 |
+ |
|
| 349 |
+ if self._cache_size_scheduled and not self._cache_size_running:
|
|
| 326 | 350 |
|
| 327 |
- self.waiting_jobs.append(job)
|
|
| 351 |
+ if self.resources.reserve([ResourceType.CACHE, ResourceType.PROCESS]):
|
|
| 352 |
+ self._cache_size_scheduled = False
|
|
| 353 |
+ self._cache_size_running = \
|
|
| 354 |
+ CacheSizeJob(self, _ACTION_NAME_CACHE_SIZE,
|
|
| 355 |
+ 'cache_size/cache_size',
|
|
| 356 |
+ complete_cb=self._cache_size_job_complete)
|
|
| 357 |
+ self._spawn_job(self._cache_size_running)
|
|
| 328 | 358 |
|
| 329 |
- # _schedule_queue_jobs()
|
|
| 359 |
+ # _sched_queue_jobs()
|
|
| 330 | 360 |
#
|
| 331 | 361 |
# Ask the queues what jobs they want to schedule and schedule
|
| 332 | 362 |
# them. This is done here so we can ask for new jobs when jobs
|
| ... | ... | @@ -335,7 +365,7 @@ class Scheduler(): |
| 335 | 365 |
# This will process the Queues, pull elements through the Queues
|
| 336 | 366 |
# and process anything that is ready.
|
| 337 | 367 |
#
|
| 338 |
- def _schedule_queue_jobs(self):
|
|
| 368 |
+ def _sched_queue_jobs(self):
|
|
| 339 | 369 |
ready = []
|
| 340 | 370 |
process_queues = True
|
| 341 | 371 |
|
| ... | ... | @@ -344,10 +374,7 @@ class Scheduler(): |
| 344 | 374 |
# Pull elements forward through queues
|
| 345 | 375 |
elements = []
|
| 346 | 376 |
for queue in self.queues:
|
| 347 |
- # Enqueue elements complete from the last queue
|
|
| 348 | 377 |
queue.enqueue(elements)
|
| 349 |
- |
|
| 350 |
- # Dequeue processed elements for the next queue
|
|
| 351 | 378 |
elements = list(queue.dequeue())
|
| 352 | 379 |
|
| 353 | 380 |
# Kickoff whatever processes can be processed at this time
|
| ... | ... | @@ -362,41 +389,51 @@ class Scheduler(): |
| 362 | 389 |
# thus need all the pulls to complete before ever starting
|
| 363 | 390 |
# a build
|
| 364 | 391 |
ready.extend(chain.from_iterable(
|
| 365 |
- queue.pop_ready_jobs() for queue in reversed(self.queues)
|
|
| 392 |
+ q.harvest_jobs() for q in reversed(self.queues)
|
|
| 366 | 393 |
))
|
| 367 | 394 |
|
| 368 |
- # pop_ready_jobs() may have skipped jobs, adding them to
|
|
| 369 |
- # the done_queue. Pull these skipped elements forward to
|
|
| 370 |
- # the next queue and process them.
|
|
| 395 |
+ # harvest_jobs() may have decided to skip some jobs, making
|
|
| 396 |
+ # them eligible for promotion to the next queue as a side effect.
|
|
| 397 |
+ #
|
|
| 398 |
+ # If that happens, do another round.
|
|
| 371 | 399 |
process_queues = any(q.dequeue_ready() for q in self.queues)
|
| 372 | 400 |
|
| 373 |
- self._schedule_jobs(ready)
|
|
| 374 |
- self._sched()
|
|
| 401 |
+ # Spawn the jobs
|
|
| 402 |
+ #
|
|
| 403 |
+ for job in ready:
|
|
| 404 |
+ self._spawn_job(job)
|
|
| 375 | 405 |
|
| 376 |
- # _run_cleanup()
|
|
| 377 |
- #
|
|
| 378 |
- # Schedules the cache cleanup job if the passed size
|
|
| 379 |
- # exceeds the cache quota.
|
|
| 406 |
+ # _sched()
|
|
| 380 | 407 |
#
|
| 381 |
- # Args:
|
|
| 382 |
- # cache_size (int): The calculated cache size (ignored)
|
|
| 408 |
+ # Run any jobs which are ready to run, or quit the main loop
|
|
| 409 |
+ # when nothing is running or is ready to run.
|
|
| 383 | 410 |
#
|
| 384 |
- # NOTE: This runs in response to completion of the cache size
|
|
| 385 |
- # calculation job lauched by Scheduler.check_cache_size(),
|
|
| 386 |
- # which will report the calculated cache size.
|
|
| 411 |
+ # This is the main driving function of the scheduler, it is called
|
|
| 412 |
+ # initially when we enter Scheduler.run(), and at the end of whenever
|
|
| 413 |
+ # any job completes, after any business logic has occurred and before
|
|
| 414 |
+ # going back to sleep.
|
|
| 387 | 415 |
#
|
| 388 |
- def _run_cleanup(self, cache_size):
|
|
| 389 |
- context = self.context
|
|
| 390 |
- artifacts = context.artifactcache
|
|
| 416 |
+ def _sched(self):
|
|
| 391 | 417 |
|
| 392 |
- if not artifacts.has_quota_exceeded():
|
|
| 393 |
- return
|
|
| 418 |
+ if not self.terminated:
|
|
| 419 |
+ |
|
| 420 |
+ #
|
|
| 421 |
+ # Try the cache management jobs
|
|
| 422 |
+ #
|
|
| 423 |
+ self._sched_cleanup_job()
|
|
| 424 |
+ self._sched_cache_size_job()
|
|
| 425 |
+ |
|
| 426 |
+ #
|
|
| 427 |
+ # Run as many jobs as the queues can handle for the
|
|
| 428 |
+ # available resources
|
|
| 429 |
+ #
|
|
| 430 |
+ self._sched_queue_jobs()
|
|
| 394 | 431 |
|
| 395 |
- job = CleanupJob(self, _ACTION_NAME_CLEANUP, 'cleanup/cleanup',
|
|
| 396 |
- resources=[ResourceType.CACHE,
|
|
| 397 |
- ResourceType.PROCESS],
|
|
| 398 |
- exclusive_resources=[ResourceType.CACHE])
|
|
| 399 |
- self._schedule_jobs([job])
|
|
| 432 |
+ #
|
|
| 433 |
+ # If nothing is ticking then bail out
|
|
| 434 |
+ #
|
|
| 435 |
+ if not self._active_jobs:
|
|
| 436 |
+ self.loop.stop()
|
|
| 400 | 437 |
|
| 401 | 438 |
# _suspend_jobs()
|
| 402 | 439 |
#
|
| ... | ... | @@ -406,7 +443,7 @@ class Scheduler(): |
| 406 | 443 |
if not self.suspended:
|
| 407 | 444 |
self._suspendtime = datetime.datetime.now()
|
| 408 | 445 |
self.suspended = True
|
| 409 |
- for job in self.active_jobs:
|
|
| 446 |
+ for job in self._active_jobs:
|
|
| 410 | 447 |
job.suspend()
|
| 411 | 448 |
|
| 412 | 449 |
# _resume_jobs()
|
| ... | ... | @@ -415,7 +452,7 @@ class Scheduler(): |
| 415 | 452 |
#
|
| 416 | 453 |
def _resume_jobs(self):
|
| 417 | 454 |
if self.suspended:
|
| 418 |
- for job in self.active_jobs:
|
|
| 455 |
+ for job in self._active_jobs:
|
|
| 419 | 456 |
job.resume()
|
| 420 | 457 |
self.suspended = False
|
| 421 | 458 |
self._starttime += (datetime.datetime.now() - self._suspendtime)
|
| ... | ... | @@ -488,19 +525,16 @@ class Scheduler(): |
| 488 | 525 |
wait_limit = 20.0
|
| 489 | 526 |
|
| 490 | 527 |
# First tell all jobs to terminate
|
| 491 |
- for job in self.active_jobs:
|
|
| 528 |
+ for job in self._active_jobs:
|
|
| 492 | 529 |
job.terminate()
|
| 493 | 530 |
|
| 494 | 531 |
# Now wait for them to really terminate
|
| 495 |
- for job in self.active_jobs:
|
|
| 532 |
+ for job in self._active_jobs:
|
|
| 496 | 533 |
elapsed = datetime.datetime.now() - wait_start
|
| 497 | 534 |
timeout = max(wait_limit - elapsed.total_seconds(), 0.0)
|
| 498 | 535 |
if not job.terminate_wait(timeout):
|
| 499 | 536 |
job.kill()
|
| 500 | 537 |
|
| 501 |
- # Clear out the waiting jobs
|
|
| 502 |
- self.waiting_jobs = []
|
|
| 503 |
- |
|
| 504 | 538 |
# Regular timeout for driving status in the UI
|
| 505 | 539 |
def _tick(self):
|
| 506 | 540 |
elapsed = self.elapsed_time()
|
| ... | ... | @@ -12,7 +12,21 @@ DATA_DIR = os.path.join( |
| 12 | 12 |
)
|
| 13 | 13 |
|
| 14 | 14 |
|
| 15 |
-def create_element(repo, name, path, dependencies, ref=None):
|
|
| 15 |
+# create_element()
|
|
| 16 |
+#
|
|
| 17 |
+# Args:
|
|
| 18 |
+# project (str): The project directory where testing is happening
|
|
| 19 |
+# name (str): The element name to create
|
|
| 20 |
+# dependencies (list): The list of dependencies to dump into YAML format
|
|
| 21 |
+#
|
|
| 22 |
+# Returns:
|
|
| 23 |
+# (Repo): The corresponding git repository created for the element
|
|
| 24 |
+def create_element(project, name, dependencies):
|
|
| 25 |
+ dev_files_path = os.path.join(project, 'files', 'dev-files')
|
|
| 26 |
+ element_path = os.path.join(project, 'elements')
|
|
| 27 |
+ repo = create_repo('git', project, "{}-repo".format(name))
|
|
| 28 |
+ ref = repo.create(dev_files_path)
|
|
| 29 |
+ |
|
| 16 | 30 |
element = {
|
| 17 | 31 |
'kind': 'import',
|
| 18 | 32 |
'sources': [
|
| ... | ... | @@ -20,7 +34,9 @@ def create_element(repo, name, path, dependencies, ref=None): |
| 20 | 34 |
],
|
| 21 | 35 |
'depends': dependencies
|
| 22 | 36 |
}
|
| 23 |
- _yaml.dump(element, os.path.join(path, name))
|
|
| 37 |
+ _yaml.dump(element, os.path.join(element_path, name))
|
|
| 38 |
+ |
|
| 39 |
+ return repo
|
|
| 24 | 40 |
|
| 25 | 41 |
|
| 26 | 42 |
# This tests a variety of scenarios and checks that the order in
|
| ... | ... | @@ -59,18 +75,6 @@ def create_element(repo, name, path, dependencies, ref=None): |
| 59 | 75 |
@pytest.mark.parametrize("operation", [('show'), ('fetch'), ('build')])
|
| 60 | 76 |
def test_order(cli, datafiles, tmpdir, operation, target, template, expected):
|
| 61 | 77 |
project = os.path.join(datafiles.dirname, datafiles.basename)
|
| 62 |
- dev_files_path = os.path.join(project, 'files', 'dev-files')
|
|
| 63 |
- element_path = os.path.join(project, 'elements')
|
|
| 64 |
- |
|
| 65 |
- # FIXME: Remove this when the test passes reliably.
|
|
| 66 |
- #
|
|
| 67 |
- # There is no reason why the order should not
|
|
| 68 |
- # be preserved when the builders is set to 1,
|
|
| 69 |
- # the scheduler queue processing still seems to
|
|
| 70 |
- # be losing the order.
|
|
| 71 |
- #
|
|
| 72 |
- if operation == 'build':
|
|
| 73 |
- pytest.skip("FIXME: This still only sometimes passes")
|
|
| 74 | 78 |
|
| 75 | 79 |
# Configure to only allow one fetcher at a time, make it easy to
|
| 76 | 80 |
# determine what is being planned in what order.
|
| ... | ... | @@ -84,11 +88,8 @@ def test_order(cli, datafiles, tmpdir, operation, target, template, expected): |
| 84 | 88 |
# Build the project from the template, make import elements
|
| 85 | 89 |
# all with the same repo
|
| 86 | 90 |
#
|
| 87 |
- repo = create_repo('git', str(tmpdir))
|
|
| 88 |
- ref = repo.create(dev_files_path)
|
|
| 89 | 91 |
for element, dependencies in template.items():
|
| 90 |
- create_element(repo, element, element_path, dependencies, ref=ref)
|
|
| 91 |
- repo.add_commit()
|
|
| 92 |
+ create_element(project, element, dependencies)
|
|
| 92 | 93 |
|
| 93 | 94 |
# Run test and collect results
|
| 94 | 95 |
if operation == 'show':
|
