[Notes] [Git][BuildStream/buildstream][master] 5 commits: _scheduler/resources.py: Dont error out in unregister_exclusive_interest()



Title: GitLab

Tristan Van Berkom pushed to branch master at BuildStream / buildstream

Commits:

5 changed files:

Changes:

  • buildstream/_frontend/widget.py
    ... ... @@ -175,29 +175,22 @@ class TypeName(Widget):
    175 175
     # A widget for displaying the Element name
    
    176 176
     class ElementName(Widget):
    
    177 177
     
    
    178
    -    def __init__(self, context, content_profile, format_profile):
    
    179
    -        super(ElementName, self).__init__(context, content_profile, format_profile)
    
    180
    -
    
    181
    -        # Pre initialization format string, before we know the length of
    
    182
    -        # element names in the pipeline
    
    183
    -        self._fmt_string = '{: <30}'
    
    184
    -
    
    185 178
         def render(self, message):
    
    179
    +        action_name = message.action_name
    
    186 180
             element_id = message.task_id or message.unique_id
    
    187
    -        if element_id is None:
    
    188
    -            return ""
    
    189
    -
    
    190
    -        plugin = _plugin_lookup(element_id)
    
    191
    -        name = plugin._get_full_name()
    
    181
    +        if element_id is not None:
    
    182
    +            plugin = _plugin_lookup(element_id)
    
    183
    +            name = plugin._get_full_name()
    
    184
    +            name = '{: <30}'.format(name)
    
    185
    +        else:
    
    186
    +            name = 'core activity'
    
    187
    +            name = '{: <30}'.format(name)
    
    192 188
     
    
    193
    -        # Sneak the action name in with the element name
    
    194
    -        action_name = message.action_name
    
    195 189
             if not action_name:
    
    196 190
                 action_name = "Main"
    
    197 191
     
    
    198 192
             return self.content_profile.fmt("{: >5}".format(action_name.lower())) + \
    
    199
    -            self.format_profile.fmt(':') + \
    
    200
    -            self.content_profile.fmt(self._fmt_string.format(name))
    
    193
    +            self.format_profile.fmt(':') + self.content_profile.fmt(name)
    
    201 194
     
    
    202 195
     
    
    203 196
     # A widget for displaying the primary message text
    
    ... ... @@ -219,9 +212,12 @@ class CacheKey(Widget):
    219 212
         def render(self, message):
    
    220 213
     
    
    221 214
             element_id = message.task_id or message.unique_id
    
    222
    -        if element_id is None or not self._key_length:
    
    215
    +        if not self._key_length:
    
    223 216
                 return ""
    
    224 217
     
    
    218
    +        if element_id is None:
    
    219
    +            return ' ' * self._key_length
    
    220
    +
    
    225 221
             missing = False
    
    226 222
             key = ' ' * self._key_length
    
    227 223
             plugin = _plugin_lookup(element_id)
    

  • buildstream/_scheduler/resources.py
    ... ... @@ -163,4 +163,4 @@ class Resources():
    163 163
         def unregister_exclusive_interest(self, resources, source):
    
    164 164
     
    
    165 165
             for resource in resources:
    
    166
    -            self._exclusive_resources[resource].remove(source)
    166
    +            self._exclusive_resources[resource].discard(source)

  • buildstream/_scheduler/scheduler.py
    ... ... @@ -40,8 +40,8 @@ class SchedStatus():
    40 40
     
    
    41 41
     # Some action names for the internal jobs we launch
    
    42 42
     #
    
    43
    -_ACTION_NAME_CLEANUP = 'cleanup'
    
    44
    -_ACTION_NAME_CACHE_SIZE = 'cache_size'
    
    43
    +_ACTION_NAME_CLEANUP = 'clean'
    
    44
    +_ACTION_NAME_CACHE_SIZE = 'size'
    
    45 45
     
    
    46 46
     
    
    47 47
     # Scheduler()
    
    ... ... @@ -151,6 +151,9 @@ class Scheduler():
    151 151
             # Handle unix signals while running
    
    152 152
             self._connect_signals()
    
    153 153
     
    
    154
    +        # Check if we need to start with some cache maintenance
    
    155
    +        self._check_cache_management()
    
    156
    +
    
    154 157
             # Run the queues
    
    155 158
             self._sched()
    
    156 159
             self.loop.run_forever()
    
    ... ... @@ -272,6 +275,31 @@ class Scheduler():
    272 275
         #                  Local Private Methods              #
    
    273 276
         #######################################################
    
    274 277
     
    
    278
    +    # _check_cache_management()
    
    279
    +    #
    
    280
    +    # Run an initial check if we need to lock the cache
    
    281
    +    # resource and check the size and possibly launch
    
    282
    +    # a cleanup.
    
    283
    +    #
    
    284
    +    # Sessions which do not add to the cache are not affected.
    
    285
    +    #
    
    286
    +    def _check_cache_management(self):
    
    287
    +
    
    288
    +        # Only trigger the check for a scheduler run which has
    
    289
    +        # queues which require the CACHE resource.
    
    290
    +        if not any(q for q in self.queues
    
    291
    +                   if ResourceType.CACHE in q.resources):
    
    292
    +            return
    
    293
    +
    
    294
    +        # If the estimated size outgrows the quota, queue a job to
    
    295
    +        # actually check the real cache size initially, this one
    
    296
    +        # should have exclusive access to the cache to ensure nothing
    
    297
    +        # starts while we are checking the cache.
    
    298
    +        #
    
    299
    +        artifacts = self.context.artifactcache
    
    300
    +        if artifacts.has_quota_exceeded():
    
    301
    +            self._sched_cache_size_job(exclusive=True)
    
    302
    +
    
    275 303
         # _spawn_job()
    
    276 304
         #
    
    277 305
         # Spanws a job
    
    ... ... @@ -292,6 +320,11 @@ class Scheduler():
    292 320
             self._cache_size_running = None
    
    293 321
             self.resources.release([ResourceType.CACHE, ResourceType.PROCESS])
    
    294 322
     
    
    323
    +        # Unregister the exclusive interest if there was any
    
    324
    +        self.resources.unregister_exclusive_interest(
    
    325
    +            [ResourceType.CACHE], 'cache-size'
    
    326
    +        )
    
    327
    +
    
    295 328
             # Schedule a cleanup job if we've hit the threshold
    
    296 329
             if status != JobStatus.OK:
    
    297 330
                 return
    
    ... ... @@ -344,11 +377,35 @@ class Scheduler():
    344 377
         # Runs a cache size job if one is scheduled to run now and
    
    345 378
         # sufficient recources are available.
    
    346 379
         #
    
    347
    -    def _sched_cache_size_job(self):
    
    380
    +    # Args:
    
    381
    +    #    exclusive (bool): Run a cache size job immediately and
    
    382
    +    #                      hold the ResourceType.CACHE resource
    
    383
    +    #                      exclusively (used at startup).
    
    384
    +    #
    
    385
    +    def _sched_cache_size_job(self, *, exclusive=False):
    
    386
    +
    
    387
    +        # The exclusive argument is not intended (or safe) for arbitrary use.
    
    388
    +        if exclusive:
    
    389
    +            assert not self._cache_size_scheduled
    
    390
    +            assert not self._cache_size_running
    
    391
    +            assert not self._active_jobs
    
    392
    +            self._cache_size_scheduled = True
    
    348 393
     
    
    349 394
             if self._cache_size_scheduled and not self._cache_size_running:
    
    350 395
     
    
    351
    -            if self.resources.reserve([ResourceType.CACHE, ResourceType.PROCESS]):
    
    396
    +            # Handle the exclusive launch
    
    397
    +            exclusive_resources = set()
    
    398
    +            if exclusive:
    
    399
    +                exclusive_resources.add(ResourceType.CACHE)
    
    400
    +                self.resources.register_exclusive_interest(
    
    401
    +                    exclusive_resources, 'cache-size'
    
    402
    +                )
    
    403
    +
    
    404
    +            # Reserve the resources (with the possible exclusive cache resource)
    
    405
    +            if self.resources.reserve([ResourceType.CACHE, ResourceType.PROCESS],
    
    406
    +                                      exclusive_resources):
    
    407
    +
    
    408
    +                # Update state and launch
    
    352 409
                     self._cache_size_scheduled = False
    
    353 410
                     self._cache_size_running = \
    
    354 411
                         CacheSizeJob(self, _ACTION_NAME_CACHE_SIZE,
    

  • tests/artifactcache/expiry.py
    ... ... @@ -18,6 +18,7 @@
    18 18
     #
    
    19 19
     
    
    20 20
     import os
    
    21
    +import re
    
    21 22
     from unittest import mock
    
    22 23
     
    
    23 24
     import pytest
    
    ... ... @@ -425,3 +426,66 @@ def test_extract_expiry(cli, datafiles, tmpdir):
    425 426
     
    
    426 427
         assert os.path.isdir(refsdirtarget2)
    
    427 428
         assert not os.path.exists(refsdirtarget)
    
    429
    +
    
    430
    +
    
    431
    +# Ensures that when launching BuildStream with a full artifact cache,
    
    432
    +# the cache size and cleanup jobs are run before any other jobs.
    
    433
    +#
    
    434
    +@pytest.mark.datafiles(DATA_DIR)
    
    435
    +def test_cleanup_first(cli, datafiles, tmpdir):
    
    436
    +    project = os.path.join(datafiles.dirname, datafiles.basename)
    
    437
    +    element_path = 'elements'
    
    438
    +    cache_location = os.path.join(project, 'cache', 'artifacts', 'ostree')
    
    439
    +    checkout = os.path.join(project, 'checkout')
    
    440
    +
    
    441
    +    cli.configure({
    
    442
    +        'cache': {
    
    443
    +            'quota': 10000000,
    
    444
    +        }
    
    445
    +    })
    
    446
    +
    
    447
    +    # Create an element that uses almost the entire cache (an empty
    
    448
    +    # ostree cache starts at about ~10KiB, so we need a bit of a
    
    449
    +    # buffer)
    
    450
    +    create_element_size('target.bst', project, element_path, [], 8000000)
    
    451
    +    res = cli.run(project=project, args=['build', 'target.bst'])
    
    452
    +    res.assert_success()
    
    453
    +
    
    454
    +    assert cli.get_element_state(project, 'target.bst') == 'cached'
    
    455
    +
    
    456
    +    # Now configure with a smaller quota, create a situation
    
    457
    +    # where the cache must be cleaned up before building anything else.
    
    458
    +    #
    
    459
    +    # Fix the fetchers and builders just to ensure a predictable
    
    460
    +    # sequence of events (although it does not effect this test)
    
    461
    +    cli.configure({
    
    462
    +        'cache': {
    
    463
    +            'quota': 5000000,
    
    464
    +        },
    
    465
    +        'scheduler': {
    
    466
    +            'fetchers': 1,
    
    467
    +            'builders': 1
    
    468
    +        }
    
    469
    +    })
    
    470
    +
    
    471
    +    # Our cache is now more than full, BuildStream
    
    472
    +    create_element_size('target2.bst', project, element_path, [], 4000000)
    
    473
    +    res = cli.run(project=project, args=['build', 'target2.bst'])
    
    474
    +    res.assert_success()
    
    475
    +
    
    476
    +    # Find all of the activity (like push, pull, fetch) lines
    
    477
    +    results = re.findall(r'\[.*\]\[.*\]\[\s*(\S+):.*\]\s*START\s*.*\.log', res.stderr)
    
    478
    +
    
    479
    +    # Don't bother checking the order of 'fetch', it is allowed to start
    
    480
    +    # before or after the initial cache size job, runs in parallel, and does
    
    481
    +    # not require ResourceType.CACHE.
    
    482
    +    results.remove('fetch')
    
    483
    +    print(results)
    
    484
    +
    
    485
    +    # Assert the expected sequence of events
    
    486
    +    assert results == ['size', 'clean', 'build']
    
    487
    +
    
    488
    +    # Check that the correct element remains in the cache
    
    489
    +    states = cli.get_element_states(project, ['target.bst', 'target2.bst'])
    
    490
    +    assert states['target.bst'] != 'cached'
    
    491
    +    assert states['target2.bst'] == 'cached'

  • tests/frontend/logging.py
    ... ... @@ -41,7 +41,7 @@ def test_default_logging(cli, tmpdir, datafiles):
    41 41
         result = cli.run(project=project, args=['source', 'fetch', element_name])
    
    42 42
         result.assert_success()
    
    43 43
     
    
    44
    -    m = re.search(r"\[\d\d:\d\d:\d\d\]\[\]\[\] SUCCESS Checking sources", result.stderr)
    
    44
    +    m = re.search(r"\[\d\d:\d\d:\d\d\]\[\s*\]\[.*\] SUCCESS Checking sources", result.stderr)
    
    45 45
         assert(m is not None)
    
    46 46
     
    
    47 47
     
    
    ... ... @@ -77,7 +77,7 @@ def test_custom_logging(cli, tmpdir, datafiles):
    77 77
         result = cli.run(project=project, args=['source', 'fetch', element_name])
    
    78 78
         result.assert_success()
    
    79 79
     
    
    80
    -    m = re.search(r"\d\d:\d\d:\d\d,\d\d:\d\d:\d\d.\d{6},\d\d:\d\d:\d\d,,,SUCCESS,Checking sources", result.stderr)
    
    80
    +    m = re.search(r"\d\d:\d\d:\d\d,\d\d:\d\d:\d\d.\d{6},\d\d:\d\d:\d\d,\s*,.*,SUCCESS,Checking sources", result.stderr)
    
    81 81
         assert(m is not None)
    
    82 82
     
    
    83 83
     
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]