[Notes] [Git][BuildStream/buildstream][tristan/element-processing-order] 3 commits: _scheduler: Refactor of queues and resources.



Title: GitLab

Tristan Van Berkom pushed to branch tristan/element-processing-order at BuildStream / buildstream

Commits:

6 changed files:

Changes:

  • buildstream/_artifactcache/artifactcache.py
    ... ... @@ -249,7 +249,7 @@ class ArtifactCache():
    249 249
                     # FIXME: Asking the user what to do may be neater
    
    250 250
                     default_conf = os.path.join(os.environ['XDG_CONFIG_HOME'],
    
    251 251
                                                 'buildstream.conf')
    
    252
    -                detail = ("There is not enough space to build the given element.\n"
    
    252
    +                detail = ("There is not enough space to complete the build.\n"
    
    253 253
                               "Please increase the cache-quota in {}."
    
    254 254
                               .format(self.context.config_origin or default_conf))
    
    255 255
     
    

  • buildstream/_scheduler/jobs/job.py
    ... ... @@ -85,28 +85,11 @@ class Process(multiprocessing.Process):
    85 85
     #    action_name (str): The queue action name
    
    86 86
     #    logfile (str): A template string that points to the logfile
    
    87 87
     #                   that should be used - should contain {pid}.
    
    88
    -#    resources (iter(ResourceType)) - A set of resources this job
    
    89
    -#                                     wants to use.
    
    90
    -#    exclusive_resources (iter(ResourceType)) - A set of resources
    
    91
    -#                                               this job wants to use
    
    92
    -#                                               exclusively.
    
    93 88
     #    max_retries (int): The maximum number of retries
    
    94 89
     #
    
    95 90
     class Job():
    
    96 91
     
    
    97
    -    def __init__(self, scheduler, action_name, logfile, *,
    
    98
    -                 resources=None, exclusive_resources=None, max_retries=0):
    
    99
    -
    
    100
    -        if resources is None:
    
    101
    -            resources = set()
    
    102
    -        else:
    
    103
    -            resources = set(resources)
    
    104
    -        if exclusive_resources is None:
    
    105
    -            exclusive_resources = set()
    
    106
    -        else:
    
    107
    -            exclusive_resources = set(resources)
    
    108
    -
    
    109
    -        assert exclusive_resources <= resources, "All exclusive resources must also be resources!"
    
    92
    +    def __init__(self, scheduler, action_name, logfile, *, max_retries=0):
    
    110 93
     
    
    111 94
             #
    
    112 95
             # Public members
    
    ... ... @@ -114,12 +97,6 @@ class Job():
    114 97
             self.action_name = action_name   # The action name for the Queue
    
    115 98
             self.child_data = None           # Data to be sent to the main process
    
    116 99
     
    
    117
    -        # The resources this job wants to access
    
    118
    -        self.resources = resources
    
    119
    -        # Resources this job needs to access exclusively, i.e., no
    
    120
    -        # other job should be allowed to access them
    
    121
    -        self.exclusive_resources = exclusive_resources
    
    122
    -
    
    123 100
             #
    
    124 101
             # Private members
    
    125 102
             #
    

  • buildstream/_scheduler/queues/queue.py
    ... ... @@ -72,8 +72,9 @@ class Queue():
    72 72
             # Private members
    
    73 73
             #
    
    74 74
             self._scheduler = scheduler
    
    75
    -        self._wait_queue = deque()
    
    76
    -        self._done_queue = deque()
    
    75
    +        self._resources = scheduler.resources  # Shared resource pool
    
    76
    +        self._wait_queue = deque()             # Ready / Waiting elements
    
    77
    +        self._done_queue = deque()             # Processed / Skipped elements
    
    77 78
             self._max_retries = 0
    
    78 79
     
    
    79 80
             # Assert the subclass has setup class data
    
    ... ... @@ -115,16 +116,6 @@ class Queue():
    115 116
         def status(self, element):
    
    116 117
             return QueueStatus.READY
    
    117 118
     
    
    118
    -    # prepare()
    
    119
    -    #
    
    120
    -    # Abstract method for handling job preparation in the main process.
    
    121
    -    #
    
    122
    -    # Args:
    
    123
    -    #    element (Element): The element which is scheduled
    
    124
    -    #
    
    125
    -    def prepare(self, element):
    
    126
    -        pass
    
    127
    -
    
    128 119
         # done()
    
    129 120
         #
    
    130 121
         # Abstract method for handling a successful job completion.
    
    ... ... @@ -153,26 +144,18 @@ class Queue():
    153 144
             if not elts:
    
    154 145
                 return
    
    155 146
     
    
    156
    -        # Note: The internal lists work with jobs. This is not
    
    157
    -        #       reflected in any external methods (except
    
    158
    -        #       pop/peek_ready_jobs).
    
    159
    -        def create_job(element):
    
    160
    -            logfile = self._element_log_path(element)
    
    161
    -            return ElementJob(self._scheduler, self.action_name,
    
    162
    -                              logfile, element=element, queue=self,
    
    163
    -                              resources=self.resources,
    
    164
    -                              action_cb=self.process,
    
    165
    -                              complete_cb=self._job_done,
    
    166
    -                              max_retries=self._max_retries)
    
    167
    -
    
    168
    -        # Place skipped elements directly on the done queue
    
    169
    -        jobs = [create_job(elt) for elt in elts]
    
    170
    -        skip = [job for job in jobs if self.status(job.element) == QueueStatus.SKIP]
    
    171
    -        wait = [job for job in jobs if job not in skip]
    
    172
    -
    
    173
    -        self.skipped_elements.extend([job.element for job in skip])
    
    174
    -        self._wait_queue.extend(wait)
    
    175
    -        self._done_queue.extend(skip)
    
    147
    +        # Place skipped elements on the done queue right away.
    
    148
    +        #
    
    149
    +        # The remaining ready and waiting elements must remain in the
    
    150
    +        # same queue, and ready status must be determined at the moment
    
    151
    +        # at which the scheduler is asking for the next job.
    
    152
    +        #
    
    153
    +        skip = [elt for elt in elts if self.status(elt) == QueueStatus.SKIP]
    
    154
    +        wait = [elt for elt in elts if elt not in skip]
    
    155
    +
    
    156
    +        self.skipped_elements.extend(skip)  # Public record of skipped elements
    
    157
    +        self._done_queue.extend(skip)       # Elements eligible to be dequeued
    
    158
    +        self._wait_queue.extend(wait)       # Elements to be processed
    
    176 159
     
    
    177 160
         # dequeue()
    
    178 161
         #
    
    ... ... @@ -184,69 +167,59 @@ class Queue():
    184 167
         #
    
    185 168
         def dequeue(self):
    
    186 169
             while self._done_queue:
    
    187
    -            yield self._done_queue.popleft().element
    
    170
    +            yield self._done_queue.popleft()
    
    188 171
     
    
    189 172
         # dequeue_ready()
    
    190 173
         #
    
    191
    -    # Reports whether there are any elements to dequeue
    
    174
    +    # Reports whether any elements can be promoted to other queues
    
    192 175
         #
    
    193 176
         # Returns:
    
    194
    -    #    (bool): Whether there are elements to dequeue
    
    177
    +    #    (bool): Whether there are elements ready
    
    195 178
         #
    
    196 179
         def dequeue_ready(self):
    
    197 180
             return any(self._done_queue)
    
    198 181
     
    
    199
    -    # pop_ready_jobs()
    
    200
    -    #
    
    201
    -    # Returns:
    
    202
    -    #     ([Job]): A list of jobs to run
    
    182
    +    # harvest_jobs()
    
    203 183
         #
    
    204 184
         # Process elements in the queue, moving elements which were enqueued
    
    205
    -    # into the dequeue pool, and processing them if necessary.
    
    206
    -    #
    
    207
    -    # This will have different results for elements depending
    
    208
    -    # on the Queue.status() implementation.
    
    209
    -    #
    
    210
    -    #   o Elements which are QueueStatus.WAIT will not be affected
    
    185
    +    # into the dequeue pool, and creating as many jobs for which resources
    
    186
    +    # can be reserved.
    
    211 187
         #
    
    212
    -    #   o Elements which are QueueStatus.SKIP will move directly
    
    213
    -    #     to the dequeue pool
    
    214
    -    #
    
    215
    -    #   o For Elements which are QueueStatus.READY a Job will be
    
    216
    -    #     created and returned to the caller, given that the scheduler
    
    217
    -    #     allows the Queue enough resources for the given job
    
    188
    +    # Returns:
    
    189
    +    #     ([Job]): A list of jobs which can be run now
    
    218 190
         #
    
    219
    -    def pop_ready_jobs(self):
    
    191
    +    def harvest_jobs(self):
    
    220 192
             unready = []
    
    221 193
             ready = []
    
    222 194
     
    
    223 195
             while self._wait_queue:
    
    224
    -            job = self._wait_queue.popleft()
    
    225
    -            element = job.element
    
    196
    +            if not self._resources.reserve(self.resources, peek=True):
    
    197
    +                break
    
    226 198
     
    
    199
    +            element = self._wait_queue.popleft()
    
    227 200
                 status = self.status(element)
    
    201
    +
    
    228 202
                 if status == QueueStatus.WAIT:
    
    229
    -                unready.append(job)
    
    230
    -                continue
    
    203
    +                unready.append(element)
    
    231 204
                 elif status == QueueStatus.SKIP:
    
    232
    -                self._done_queue.append(job)
    
    205
    +                self._done_queue.append(element)
    
    233 206
                     self.skipped_elements.append(element)
    
    234
    -                continue
    
    235
    -
    
    236
    -            self.prepare(element)
    
    237
    -            ready.append(job)
    
    207
    +            else:
    
    208
    +                reserved = self._resources.reserve(self.resources)
    
    209
    +                assert reserved
    
    210
    +                ready.append(element)
    
    238 211
     
    
    239
    -        # These were not ready but were in the beginning, give em
    
    240
    -        # first priority again next time around
    
    241 212
             self._wait_queue.extendleft(unready)
    
    242 213
     
    
    243
    -        return ready
    
    244
    -
    
    245
    -    def peek_ready_jobs(self):
    
    246
    -        def ready(job):
    
    247
    -            return self.status(job.element) == QueueStatus.READY
    
    248
    -
    
    249
    -        yield from (job for job in self._wait_queue if ready(job))
    
    214
    +        return [
    
    215
    +            ElementJob(self._scheduler, self.action_name,
    
    216
    +                       self._element_log_path(element),
    
    217
    +                       element=element, queue=self,
    
    218
    +                       action_cb=self.process,
    
    219
    +                       complete_cb=self._job_done,
    
    220
    +                       max_retries=self._max_retries)
    
    221
    +            for element in ready
    
    222
    +        ]
    
    250 223
     
    
    251 224
         #####################################################
    
    252 225
         #                 Private Methods                   #
    
    ... ... @@ -292,6 +265,10 @@ class Queue():
    292 265
         #
    
    293 266
         def _job_done(self, job, element, status, result):
    
    294 267
     
    
    268
    +        # Now release the resources we reserved
    
    269
    +        #
    
    270
    +        self._resources.release(self.resources)
    
    271
    +
    
    295 272
             # Update values that need to be synchronized in the main task
    
    296 273
             # before calling any queue implementation
    
    297 274
             self._update_workspaces(element, job)
    
    ... ... @@ -324,12 +301,8 @@ class Queue():
    324 301
                               detail=traceback.format_exc())
    
    325 302
                 self.failed_elements.append(element)
    
    326 303
             else:
    
    327
    -            #
    
    328
    -            # No exception occured in post processing
    
    329
    -            #
    
    330
    -
    
    331
    -            # All jobs get placed on the done queue for later processing.
    
    332
    -            self._done_queue.append(job)
    
    304
    +            # All elements get placed on the done queue for later processing.
    
    305
    +            self._done_queue.append(element)
    
    333 306
     
    
    334 307
                 # These lists are for bookkeeping purposes for the UI and logging.
    
    335 308
                 if status == JobStatus.SKIPPED:
    

  • buildstream/_scheduler/resources.py
    ... ... @@ -34,28 +34,25 @@ class Resources():
    34 34
                 ResourceType.UPLOAD: set()
    
    35 35
             }
    
    36 36
     
    
    37
    -    def clear_job_resources(self, job):
    
    38
    -        for resource in job.exclusive_resources:
    
    39
    -            self._exclusive_resources[resource].remove(hash(job))
    
    37
    +    # reserve()
    
    38
    +    #
    
    39
    +    # Reserves a set of resources
    
    40
    +    #
    
    41
    +    # Args:
    
    42
    +    #    resources (set): A set of ResourceTypes
    
    43
    +    #    exclusive (set): Another set of ResourceTypes
    
    44
    +    #    peek (bool): Whether to only peek at whether the resource is available
    
    45
    +    #
    
    46
    +    # Returns:
    
    47
    +    #    (bool): True if the resources could be reserved
    
    48
    +    #
    
    49
    +    def reserve(self, resources, exclusive=None, *, peek=False):
    
    50
    +        if exclusive is None:
    
    51
    +            exclusive = set()
    
    40 52
     
    
    41
    -        for resource in job.resources:
    
    42
    -            self._used_resources[resource] -= 1
    
    43
    -
    
    44
    -    def reserve_exclusive_resources(self, job):
    
    45
    -        exclusive = job.exclusive_resources
    
    46
    -
    
    47
    -        # The very first thing we do is to register any exclusive
    
    48
    -        # resources this job may want. Even if the job is not yet
    
    49
    -        # allowed to run (because another job is holding the resource
    
    50
    -        # it wants), we can still set this - it just means that any
    
    51
    -        # job *currently* using these resources has to finish first,
    
    52
    -        # and no new jobs wanting these can be launched (except other
    
    53
    -        # exclusive-access jobs).
    
    54
    -        #
    
    55
    -        for resource in exclusive:
    
    56
    -            self._exclusive_resources[resource].add(hash(job))
    
    53
    +        resources = set(resources)
    
    54
    +        exclusive = set(exclusive)
    
    57 55
     
    
    58
    -    def reserve_job_resources(self, job):
    
    59 56
             # First, we check if the job wants to access a resource that
    
    60 57
             # another job wants exclusive access to. If so, it cannot be
    
    61 58
             # scheduled.
    
    ... ... @@ -68,7 +65,8 @@ class Resources():
    68 65
             #        is currently not possible, but may be worth thinking
    
    69 66
             #        about.
    
    70 67
             #
    
    71
    -        for resource in job.resources - job.exclusive_resources:
    
    68
    +        for resource in resources - exclusive:
    
    69
    +
    
    72 70
                 # If our job wants this resource exclusively, we never
    
    73 71
                 # check this, so we can get away with not (temporarily)
    
    74 72
                 # removing it from the set.
    
    ... ... @@ -84,14 +82,14 @@ class Resources():
    84 82
             # at a time, despite being allowed to be part of the exclusive
    
    85 83
             # set.
    
    86 84
             #
    
    87
    -        for exclusive in job.exclusive_resources:
    
    88
    -            if self._used_resources[exclusive] != 0:
    
    85
    +        for resource in exclusive:
    
    86
    +            if self._used_resources[resource] != 0:
    
    89 87
                     return False
    
    90 88
     
    
    91 89
             # Finally, we check if we have enough of each resource
    
    92 90
             # available. If we don't have enough, the job cannot be
    
    93 91
             # scheduled.
    
    94
    -        for resource in job.resources:
    
    92
    +        for resource in resources:
    
    95 93
                 if (self._max_resources[resource] > 0 and
    
    96 94
                         self._used_resources[resource] >= self._max_resources[resource]):
    
    97 95
                     return False
    
    ... ... @@ -99,7 +97,70 @@ class Resources():
    99 97
             # Now we register the fact that our job is using the resources
    
    100 98
             # it asked for, and tell the scheduler that it is allowed to
    
    101 99
             # continue.
    
    102
    -        for resource in job.resources:
    
    103
    -            self._used_resources[resource] += 1
    
    100
    +        if not peek:
    
    101
    +            for resource in resources:
    
    102
    +                self._used_resources[resource] += 1
    
    104 103
     
    
    105 104
             return True
    
    105
    +
    
    106
    +    # release()
    
    107
    +    #
    
    108
    +    # Release resources previously reserved with Resources.reserve()
    
    109
    +    #
    
    110
    +    # Args:
    
    111
    +    #    resources (set): A set of resources to release
    
    112
    +    #
    
    113
    +    def release(self, resources):
    
    114
    +        for resource in resources:
    
    115
    +            assert self._used_resources[resource] > 0, "Scheduler resource imbalance"
    
    116
    +            self._used_resources[resource] -= 1
    
    117
    +
    
    118
    +    # register_exclusive_interest()
    
    119
    +    #
    
    120
    +    # Inform the resources pool that `source` has an interest in
    
    121
    +    # reserving this resource exclusively.
    
    122
    +    #
    
    123
    +    # The source parameter is used to identify the caller, it
    
    124
    +    # must be ensured to be unique for the time that the
    
    125
    +    # interest is registered.
    
    126
    +    #
    
    127
    +    # This function may be called multiple times, and subsequent
    
    128
    +    # calls will simply have no effect until clear_exclusive_interest()
    
    129
    +    # is used to clear the interest.
    
    130
    +    #
    
    131
    +    # This must be called in advance of reserve()
    
    132
    +    #
    
    133
    +    # Args:
    
    134
    +    #    resources (set): Set of resources to reserve exclusively
    
    135
    +    #    source (any): Source identifier, to be used again when unregistering
    
    136
    +    #                  the interest.
    
    137
    +    #
    
    138
    +    def register_exclusive_interest(self, resources, source):
    
    139
    +
    
    140
    +        # The very first thing we do is to register any exclusive
    
    141
    +        # resources this job may want. Even if the job is not yet
    
    142
    +        # allowed to run (because another job is holding the resource
    
    143
    +        # it wants), we can still set this - it just means that any
    
    144
    +        # job *currently* using these resources has to finish first,
    
    145
    +        # and no new jobs wanting these can be launched (except other
    
    146
    +        # exclusive-access jobs).
    
    147
    +        #
    
    148
    +        for resource in resources:
    
    149
    +            self._exclusive_resources[resource].add(source)
    
    150
    +
    
    151
    +    # unregister_exclusive_interest()
    
    152
    +    #
    
    153
    +    # Clear the exclusive interest in these resources.
    
    154
    +    #
    
    155
    +    # This should be called by the given source which registered
    
    156
    +    # an exclusive interest.
    
    157
    +    #
    
    158
    +    # Args:
    
    159
    +    #    resources (set): Set of resources to clear the exclusive interest in
    
    160
    +    #    source (any): The same source identifier that was passed to
    
    161
    +    #                  register_exclusive_interest().
    
    162
    +    #
    
    163
    +    def unregister_exclusive_interest(self, resources, source):
    
    164
    +
    
    165
    +        for resource in resources:
    
    166
    +            self._exclusive_resources[resource].remove(source)

  • buildstream/_scheduler/scheduler.py
    ... ... @@ -38,14 +38,10 @@ class SchedStatus():
    38 38
         TERMINATED = 1
    
    39 39
     
    
    40 40
     
    
    41
    -# Our _REDUNDANT_EXCLUSIVE_ACTIONS jobs are special ones
    
    42
    -# which we launch dynamically, they have the property of being
    
    43
    -# meaningless to queue if one is already queued, and it also
    
    44
    -# doesnt make sense to run them in parallel
    
    41
    +# Some action names for the internal jobs we launch
    
    45 42
     #
    
    46 43
     _ACTION_NAME_CLEANUP = 'cleanup'
    
    47 44
     _ACTION_NAME_CACHE_SIZE = 'cache_size'
    
    48
    -_REDUNDANT_EXCLUSIVE_ACTIONS = [_ACTION_NAME_CLEANUP, _ACTION_NAME_CACHE_SIZE]
    
    49 45
     
    
    50 46
     
    
    51 47
     # Scheduler()
    
    ... ... @@ -81,8 +77,6 @@ class Scheduler():
    81 77
             #
    
    82 78
             # Public members
    
    83 79
             #
    
    84
    -        self.active_jobs = []       # Jobs currently being run in the scheduler
    
    85
    -        self.waiting_jobs = []      # Jobs waiting for resources
    
    86 80
             self.queues = None          # Exposed for the frontend to print summaries
    
    87 81
             self.context = context      # The Context object shared with Queues
    
    88 82
             self.terminated = False     # Whether the scheduler was asked to terminate or has terminated
    
    ... ... @@ -95,15 +89,23 @@ class Scheduler():
    95 89
             #
    
    96 90
             # Private members
    
    97 91
             #
    
    92
    +        self._active_jobs = []                # Jobs currently being run in the scheduler
    
    93
    +        self._starttime = start_time          # Initial application start time
    
    94
    +        self._suspendtime = None              # Session time compensation for suspended state
    
    95
    +        self._queue_jobs = True               # Whether we should continue to queue jobs
    
    96
    +
    
    97
    +        # State of cache management related jobs
    
    98
    +        self._cache_size_scheduled = False    # Whether we have a cache size job scheduled
    
    99
    +        self._cache_size_running = None       # A running CacheSizeJob, or None
    
    100
    +        self._cleanup_scheduled = False       # Whether we have a cleanup job scheduled
    
    101
    +        self._cleanup_running = None          # A running CleanupJob, or None
    
    102
    +
    
    103
    +        # Callbacks to report back to the Scheduler owner
    
    98 104
             self._interrupt_callback = interrupt_callback
    
    99 105
             self._ticker_callback = ticker_callback
    
    100 106
             self._job_start_callback = job_start_callback
    
    101 107
             self._job_complete_callback = job_complete_callback
    
    102 108
     
    
    103
    -        self._starttime = start_time
    
    104
    -        self._suspendtime = None
    
    105
    -        self._queue_jobs = True      # Whether we should continue to queue jobs
    
    106
    -
    
    107 109
             # Whether our exclusive jobs, like 'cleanup' are currently already
    
    108 110
             # waiting or active.
    
    109 111
             #
    
    ... ... @@ -113,9 +115,9 @@ class Scheduler():
    113 115
             self._exclusive_waiting = set()
    
    114 116
             self._exclusive_active = set()
    
    115 117
     
    
    116
    -        self._resources = Resources(context.sched_builders,
    
    117
    -                                    context.sched_fetchers,
    
    118
    -                                    context.sched_pushers)
    
    118
    +        self.resources = Resources(context.sched_builders,
    
    119
    +                                   context.sched_fetchers,
    
    120
    +                                   context.sched_pushers)
    
    119 121
     
    
    120 122
         # run()
    
    121 123
         #
    
    ... ... @@ -150,7 +152,7 @@ class Scheduler():
    150 152
             self._connect_signals()
    
    151 153
     
    
    152 154
             # Run the queues
    
    153
    -        self._schedule_queue_jobs()
    
    155
    +        self._sched()
    
    154 156
             self.loop.run_forever()
    
    155 157
             self.loop.close()
    
    156 158
     
    
    ... ... @@ -240,12 +242,32 @@ class Scheduler():
    240 242
         #    status (JobStatus): The status of the completed job
    
    241 243
         #
    
    242 244
         def job_completed(self, job, status):
    
    243
    -        self._resources.clear_job_resources(job)
    
    244
    -        self.active_jobs.remove(job)
    
    245
    -        if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS:
    
    246
    -            self._exclusive_active.remove(job.action_name)
    
    245
    +
    
    246
    +        #
    
    247
    +        # Deallocate resources for our own internal jobs here
    
    248
    +        #
    
    249
    +        if job is self._cache_size_running:
    
    250
    +            self._cache_size_running = None
    
    251
    +            self.resources.release([ResourceType.CACHE, ResourceType.PROCESS])
    
    252
    +
    
    253
    +        elif job is self._cleanup_running:
    
    254
    +            self._cleanup_running = None
    
    255
    +            self.resources.release([ResourceType.CACHE, ResourceType.PROCESS])
    
    256
    +
    
    257
    +            # Unregister the exclusive interest when we're done with it
    
    258
    +            if not self._cleanup_scheduled:
    
    259
    +                self.resources.unregister_exclusive_interest(
    
    260
    +                    [ResourceType.CACHE],
    
    261
    +                    "cache-cleanup"
    
    262
    +                )
    
    263
    +
    
    264
    +        # Remove from the active jobs list
    
    265
    +        self._active_jobs.remove(job)
    
    266
    +
    
    267
    +        # Scheduler owner facing callback
    
    247 268
             self._job_complete_callback(job, status)
    
    248
    -        self._schedule_queue_jobs()
    
    269
    +
    
    270
    +        # Now check for more jobs
    
    249 271
             self._sched()
    
    250 272
     
    
    251 273
         # check_cache_size():
    
    ... ... @@ -255,78 +277,87 @@ class Scheduler():
    255 277
         # if needed.
    
    256 278
         #
    
    257 279
         def check_cache_size(self):
    
    258
    -        job = CacheSizeJob(self, _ACTION_NAME_CACHE_SIZE,
    
    259
    -                           'cache_size/cache_size',
    
    260
    -                           resources=[ResourceType.CACHE,
    
    261
    -                                      ResourceType.PROCESS],
    
    262
    -                           complete_cb=self._run_cleanup)
    
    263
    -        self._schedule_jobs([job])
    
    280
    +
    
    281
    +        # Here we assume we are called in response to a job
    
    282
    +        # completion callback, or before entering the scheduler.
    
    283
    +        #
    
    284
    +        # As such there is no need to call `_sched()` from here,
    
    285
    +        # and we prefer to run it once at the last moment.
    
    286
    +        #
    
    287
    +        self._cache_size_scheduled = True
    
    264 288
     
    
    265 289
         #######################################################
    
    266 290
         #                  Local Private Methods              #
    
    267 291
         #######################################################
    
    268 292
     
    
    269
    -    # _sched()
    
    293
    +    # _spawn_job()
    
    270 294
         #
    
    271
    -    # The main driving function of the scheduler, it will be called
    
    272
    -    # automatically when Scheduler.run() is called initially,
    
    295
    +    # Spawns a job
    
    273 296
         #
    
    274
    -    def _sched(self):
    
    275
    -        for job in self.waiting_jobs:
    
    276
    -            self._resources.reserve_exclusive_resources(job)
    
    277
    -
    
    278
    -        for job in self.waiting_jobs:
    
    279
    -            if not self._resources.reserve_job_resources(job):
    
    280
    -                continue
    
    281
    -
    
    282
    -            # Postpone these jobs if one is already running
    
    283
    -            if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS and \
    
    284
    -               job.action_name in self._exclusive_active:
    
    285
    -                continue
    
    286
    -
    
    287
    -            job.spawn()
    
    288
    -            self.waiting_jobs.remove(job)
    
    289
    -            self.active_jobs.append(job)
    
    290
    -
    
    291
    -            if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS:
    
    292
    -                self._exclusive_waiting.remove(job.action_name)
    
    293
    -                self._exclusive_active.add(job.action_name)
    
    297
    +    # Args:
    
    298
    +    #    job (Job): The job to spawn
    
    299
    +    #
    
    300
    +    def _spawn_job(self, job):
    
    301
    +        job.spawn()
    
    302
    +        self._active_jobs.append(job)
    
    303
    +        if self._job_start_callback:
    
    304
    +            self._job_start_callback(job)
    
    294 305
     
    
    295
    -            if self._job_start_callback:
    
    296
    -                self._job_start_callback(job)
    
    306
    +    # Custom complete callback for the cache size job
    
    307
    +    def _cache_size_job_complete(self, cache_size):
    
    308
    +        context = self.context
    
    309
    +        artifacts = context.artifactcache
    
    310
    +        if not artifacts.has_quota_exceeded():
    
    311
    +            return
    
    297 312
     
    
    298
    -        # If nothings ticking, time to bail out
    
    299
    -        if not self.active_jobs and not self.waiting_jobs:
    
    300
    -            self.loop.stop()
    
    313
    +        # Schedule a cleanup job if we've hit the threshold
    
    314
    +        self._cleanup_scheduled = True
    
    301 315
     
    
    302
    -    # _schedule_jobs()
    
    303
    -    #
    
    304
    -    # The main entry point for jobs to be scheduled.
    
    316
    +    # _spawn_internal_jobs()
    
    305 317
         #
    
    306
    -    # This is called either as a result of scanning the queues
    
    307
    -    # in _schedule_queue_jobs(), or directly by the Scheduler
    
    308
    -    # to insert special jobs like cleanups.
    
    318
    +    # Spawn jobs that have to do with internal business logic, such as
    
    319
    +    # cache management related jobs.
    
    309 320
         #
    
    310
    -    # Args:
    
    311
    -    #     jobs ([Job]): A list of jobs to schedule
    
    312
    -    #
    
    313
    -    def _schedule_jobs(self, jobs):
    
    314
    -        for job in jobs:
    
    321
    +    def _spawn_internal_jobs(self):
    
    322
    +
    
    323
    +        #
    
    324
    +        # Try to run a cleanup job if we can
    
    325
    +        #
    
    326
    +        if self._cleanup_scheduled and self._cleanup_running is None:
    
    315 327
     
    
    316
    -            # Special treatment of our redundant exclusive jobs
    
    317 328
                 #
    
    318
    -            if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS:
    
    329
    +            # Ensure we have an exclusive interest in the resources
    
    330
    +            #
    
    331
    +            self.resources.register_exclusive_interest(
    
    332
    +                [ResourceType.CACHE],
    
    333
    +                "cache-cleanup"
    
    334
    +            )
    
    335
    +
    
    336
    +            if self.resources.reserve([ResourceType.CACHE, ResourceType.PROCESS],
    
    337
    +                                      [ResourceType.CACHE]):
    
    319 338
     
    
    320
    -                # Drop the job if one is already queued
    
    321
    -                if job.action_name in self._exclusive_waiting:
    
    322
    -                    continue
    
    339
    +                # Update state and launch
    
    340
    +                self._cleanup_scheduled = False
    
    341
    +                self._cleanup_running = \
    
    342
    +                    CleanupJob(self, _ACTION_NAME_CLEANUP, 'cleanup/cleanup')
    
    343
    +                self._spawn_job(self._cleanup_running)
    
    323 344
     
    
    324
    -                # Mark this action type as queued
    
    325
    -                self._exclusive_waiting.add(job.action_name)
    
    345
    +        #
    
    346
    +        # Try to run a cache size job if we can
    
    347
    +        #
    
    348
    +        if self._cache_size_scheduled and not self._cache_size_running:
    
    326 349
     
    
    327
    -            self.waiting_jobs.append(job)
    
    350
    +            if self.resources.reserve([ResourceType.CACHE, ResourceType.PROCESS]):
    
    328 351
     
    
    329
    -    # _schedule_queue_jobs()
    
    352
    +                # Update state and launch
    
    353
    +                self._cache_size_scheduled = False
    
    354
    +                self._cache_size_running = \
    
    355
    +                    CacheSizeJob(self, _ACTION_NAME_CACHE_SIZE,
    
    356
    +                                 'cache_size/cache_size',
    
    357
    +                                 complete_cb=self._cache_size_job_complete)
    
    358
    +                self._spawn_job(self._cache_size_running)
    
    359
    +
    
    360
    +    # _spawn_queue_jobs()
    
    330 361
         #
    
    331 362
         # Ask the queues what jobs they want to schedule and schedule
    
    332 363
         # them. This is done here so we can ask for new jobs when jobs
    
    ... ... @@ -335,7 +366,7 @@ class Scheduler():
    335 366
         # This will process the Queues, pull elements through the Queues
    
    336 367
         # and process anything that is ready.
    
    337 368
         #
    
    338
    -    def _schedule_queue_jobs(self):
    
    369
    +    def _spawn_queue_jobs(self):
    
    339 370
             ready = []
    
    340 371
             process_queues = True
    
    341 372
     
    
    ... ... @@ -344,10 +375,7 @@ class Scheduler():
    344 375
                 # Pull elements forward through queues
    
    345 376
                 elements = []
    
    346 377
                 for queue in self.queues:
    
    347
    -                # Enqueue elements complete from the last queue
    
    348 378
                     queue.enqueue(elements)
    
    349
    -
    
    350
    -                # Dequeue processed elements for the next queue
    
    351 379
                     elements = list(queue.dequeue())
    
    352 380
     
    
    353 381
                 # Kickoff whatever processes can be processed at this time
    
    ... ... @@ -362,41 +390,51 @@ class Scheduler():
    362 390
                 # thus need all the pulls to complete before ever starting
    
    363 391
                 # a build
    
    364 392
                 ready.extend(chain.from_iterable(
    
    365
    -                queue.pop_ready_jobs() for queue in reversed(self.queues)
    
    393
    +                q.harvest_jobs() for q in reversed(self.queues)
    
    366 394
                 ))
    
    367 395
     
    
    368
    -            # pop_ready_jobs() may have skipped jobs, adding them to
    
    369
    -            # the done_queue.  Pull these skipped elements forward to
    
    370
    -            # the next queue and process them.
    
    396
    +            # harvest_jobs() may have decided to skip some jobs, making
    
    397
    +            # them eligible for promotion to the next queue as a side effect.
    
    398
    +            #
    
    399
    +            # If that happens, do another round.
    
    371 400
                 process_queues = any(q.dequeue_ready() for q in self.queues)
    
    372 401
     
    
    373
    -        self._schedule_jobs(ready)
    
    374
    -        self._sched()
    
    402
    +        # Spawn the jobs
    
    403
    +        #
    
    404
    +        for job in ready:
    
    405
    +            self._spawn_job(job)
    
    375 406
     
    
    376
    -    # _run_cleanup()
    
    377
    -    #
    
    378
    -    # Schedules the cache cleanup job if the passed size
    
    379
    -    # exceeds the cache quota.
    
    407
    +    # _sched()
    
    380 408
         #
    
    381
    -    # Args:
    
    382
    -    #    cache_size (int): The calculated cache size (ignored)
    
    409
    +    # Run any jobs which are ready to run, or quit the main loop
    
    410
    +    # when nothing is running or is ready to run.
    
    383 411
         #
    
    384
    -    # NOTE: This runs in response to completion of the cache size
    
    385
    -    #       calculation job lauched by Scheduler.check_cache_size(),
    
    386
    -    #       which will report the calculated cache size.
    
    412
    +    # This is the main driving function of the scheduler, it is called
    
    413
    +    # initially when we enter Scheduler.run(), and again whenever
    
    414
    +    # any job completes, after any business logic has occurred and before
    
    415
    +    # going back to sleep.
    
    387 416
         #
    
    388
    -    def _run_cleanup(self, cache_size):
    
    389
    -        context = self.context
    
    390
    -        artifacts = context.artifactcache
    
    417
    +    def _sched(self):
    
    391 418
     
    
    392
    -        if not artifacts.has_quota_exceeded():
    
    393
    -            return
    
    419
    +        if not self.terminated:
    
    420
    +
    
    421
    +            #
    
    422
    +            # Process our internal business related jobs first,
    
    423
    +            # they have overall priority over queues.
    
    424
    +            #
    
    425
    +            self._spawn_internal_jobs()
    
    394 426
     
    
    395
    -        job = CleanupJob(self, _ACTION_NAME_CLEANUP, 'cleanup/cleanup',
    
    396
    -                         resources=[ResourceType.CACHE,
    
    397
    -                                    ResourceType.PROCESS],
    
    398
    -                         exclusive_resources=[ResourceType.CACHE])
    
    399
    -        self._schedule_jobs([job])
    
    427
    +            #
    
    428
    +            # Run as many jobs as the queues can handle for the
    
    429
    +            # available resources
    
    430
    +            #
    
    431
    +            self._spawn_queue_jobs()
    
    432
    +
    
    433
    +        #
    
    434
    +        # If nothing is ticking then bail out
    
    435
    +        #
    
    436
    +        if not self._active_jobs:
    
    437
    +            self.loop.stop()
    
    400 438
     
    
    401 439
         # _suspend_jobs()
    
    402 440
         #
    
    ... ... @@ -406,7 +444,7 @@ class Scheduler():
    406 444
             if not self.suspended:
    
    407 445
                 self._suspendtime = datetime.datetime.now()
    
    408 446
                 self.suspended = True
    
    409
    -            for job in self.active_jobs:
    
    447
    +            for job in self._active_jobs:
    
    410 448
                     job.suspend()
    
    411 449
     
    
    412 450
         # _resume_jobs()
    
    ... ... @@ -415,7 +453,7 @@ class Scheduler():
    415 453
         #
    
    416 454
         def _resume_jobs(self):
    
    417 455
             if self.suspended:
    
    418
    -            for job in self.active_jobs:
    
    456
    +            for job in self._active_jobs:
    
    419 457
                     job.resume()
    
    420 458
                 self.suspended = False
    
    421 459
                 self._starttime += (datetime.datetime.now() - self._suspendtime)
    
    ... ... @@ -488,19 +526,16 @@ class Scheduler():
    488 526
             wait_limit = 20.0
    
    489 527
     
    
    490 528
             # First tell all jobs to terminate
    
    491
    -        for job in self.active_jobs:
    
    529
    +        for job in self._active_jobs:
    
    492 530
                 job.terminate()
    
    493 531
     
    
    494 532
             # Now wait for them to really terminate
    
    495
    -        for job in self.active_jobs:
    
    533
    +        for job in self._active_jobs:
    
    496 534
                 elapsed = datetime.datetime.now() - wait_start
    
    497 535
                 timeout = max(wait_limit - elapsed.total_seconds(), 0.0)
    
    498 536
                 if not job.terminate_wait(timeout):
    
    499 537
                     job.kill()
    
    500 538
     
    
    501
    -        # Clear out the waiting jobs
    
    502
    -        self.waiting_jobs = []
    
    503
    -
    
    504 539
         # Regular timeout for driving status in the UI
    
    505 540
         def _tick(self):
    
    506 541
             elapsed = self.elapsed_time()
    

  • tests/frontend/order.py
    ... ... @@ -12,7 +12,21 @@ DATA_DIR = os.path.join(
    12 12
     )
    
    13 13
     
    
    14 14
     
    
    15
    -def create_element(repo, name, path, dependencies, ref=None):
    
    15
    +# create_element()
    
    16
    +#
    
    17
    +# Args:
    
    18
    +#    project (str): The project directory where testing is happening
    
    19
    +#    name (str): The element name to create
    
    20
    +#    dependencies (list): The list of dependencies to dump into YAML format
    
    21
    +#
    
    22
    +# Returns:
    
    23
    +#    (Repo): The corresponding git repository created for the element
    
    24
    +def create_element(project, name, dependencies):
    
    25
    +    dev_files_path = os.path.join(project, 'files', 'dev-files')
    
    26
    +    element_path = os.path.join(project, 'elements')
    
    27
    +    repo = create_repo('git', project, "{}-repo".format(name))
    
    28
    +    ref = repo.create(dev_files_path)
    
    29
    +
    
    16 30
         element = {
    
    17 31
             'kind': 'import',
    
    18 32
             'sources': [
    
    ... ... @@ -20,7 +34,9 @@ def create_element(repo, name, path, dependencies, ref=None):
    20 34
             ],
    
    21 35
             'depends': dependencies
    
    22 36
         }
    
    23
    -    _yaml.dump(element, os.path.join(path, name))
    
    37
    +    _yaml.dump(element, os.path.join(element_path, name))
    
    38
    +
    
    39
    +    return repo
    
    24 40
     
    
    25 41
     
    
    26 42
     # This tests a variety of scenarios and checks that the order in
    
    ... ... @@ -59,18 +75,6 @@ def create_element(repo, name, path, dependencies, ref=None):
    59 75
     @pytest.mark.parametrize("operation", [('show'), ('fetch'), ('build')])
    
    60 76
     def test_order(cli, datafiles, tmpdir, operation, target, template, expected):
    
    61 77
         project = os.path.join(datafiles.dirname, datafiles.basename)
    
    62
    -    dev_files_path = os.path.join(project, 'files', 'dev-files')
    
    63
    -    element_path = os.path.join(project, 'elements')
    
    64
    -
    
    65
    -    # FIXME: Remove this when the test passes reliably.
    
    66
    -    #
    
    67
    -    #        There is no reason why the order should not
    
    68
    -    #        be preserved when the builders is set to 1,
    
    69
    -    #        the scheduler queue processing still seems to
    
    70
    -    #        be losing the order.
    
    71
    -    #
    
    72
    -    if operation == 'build':
    
    73
    -        pytest.skip("FIXME: This still only sometimes passes")
    
    74 78
     
    
    75 79
         # Configure to only allow one fetcher at a time, make it easy to
    
    76 80
         # determine what is being planned in what order.
    
    ... ... @@ -84,11 +88,8 @@ def test_order(cli, datafiles, tmpdir, operation, target, template, expected):
    84 88
         # Build the project from the template, make import elements
    
    85 89
         # all with the same repo
    
    86 90
         #
    
    87
    -    repo = create_repo('git', str(tmpdir))
    
    88
    -    ref = repo.create(dev_files_path)
    
    89 91
         for element, dependencies in template.items():
    
    90
    -        create_element(repo, element, element_path, dependencies, ref=ref)
    
    91
    -        repo.add_commit()
    
    92
    +        create_element(project, element, dependencies)
    
    92 93
     
    
    93 94
         # Run test and collect results
    
    94 95
         if operation == 'show':
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]