[Notes] [Git][BuildStream/buildstream][master] 6 commits: Rework Skipped usage



Title: GitLab

Qinusty pushed to branch master at BuildStream / buildstream

Commits:

13 changed files:

Changes:

  • buildstream/_artifactcache/cascache.py
    ... ... @@ -228,8 +228,8 @@ class CASCache(ArtifactCache):
    228 228
             for remote in self._remotes[project]:
    
    229 229
                 try:
    
    230 230
                     remote.init()
    
    231
    -
    
    232
    -                element.info("Pulling {} <- {}".format(element._get_brief_display_key(), remote.spec.url))
    
    231
    +                display_key = element._get_brief_display_key()
    
    232
    +                element.status("Pulling artifact {} <- {}".format(display_key, remote.spec.url))
    
    233 233
     
    
    234 234
                     request = buildstream_pb2.GetReferenceRequest()
    
    235 235
                     request.key = ref
    
    ... ... @@ -243,6 +243,7 @@ class CASCache(ArtifactCache):
    243 243
     
    
    244 244
                     self.set_ref(ref, tree)
    
    245 245
     
    
    246
    +                element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url))
    
    246 247
                     # no need to pull from additional remotes
    
    247 248
                     return True
    
    248 249
     
    
    ... ... @@ -251,11 +252,8 @@ class CASCache(ArtifactCache):
    251 252
                         raise ArtifactError("Failed to pull artifact {}: {}".format(
    
    252 253
                             element._get_brief_display_key(), e)) from e
    
    253 254
                     else:
    
    254
    -                    self.context.message(Message(
    
    255
    -                        None,
    
    256
    -                        MessageType.SKIPPED,
    
    257
    -                        "Remote ({}) does not have {} cached".format(
    
    258
    -                            remote.spec.url, element._get_brief_display_key())
    
    255
    +                    element.info("Remote ({}) does not have {} cached".format(
    
    256
    +                        remote.spec.url, element._get_brief_display_key()
    
    259 257
                         ))
    
    260 258
     
    
    261 259
             return False
    
    ... ... @@ -336,17 +334,15 @@ class CASCache(ArtifactCache):
    336 334
     
    
    337 335
             for remote in push_remotes:
    
    338 336
                 remote.init()
    
    339
    -
    
    340
    -            element.info("Pushing {} -> {}".format(element._get_brief_display_key(), remote.spec.url))
    
    337
    +            display_key = element._get_brief_display_key()
    
    338
    +            element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url))
    
    341 339
     
    
    342 340
                 if self._push_refs_to_remote(refs, remote):
    
    341
    +                element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
    
    343 342
                     pushed = True
    
    344 343
                 else:
    
    345
    -                self.context.message(Message(
    
    346
    -                    None,
    
    347
    -                    MessageType.SKIPPED,
    
    348
    -                    "Remote ({}) already has {} cached".format(
    
    349
    -                        remote.spec.url, element._get_brief_display_key())
    
    344
    +                element.info("Remote ({}) already has {} cached".format(
    
    345
    +                    remote.spec.url, element._get_brief_display_key()
    
    350 346
                     ))
    
    351 347
     
    
    352 348
             return pushed
    

  • buildstream/_exceptions.py
    ... ... @@ -312,3 +312,12 @@ class StreamError(BstError):
    312 312
     class AppError(BstError):
    
    313 313
         def __init__(self, message, detail=None, reason=None):
    
    314 314
             super().__init__(message, detail=detail, domain=ErrorDomain.APP, reason=reason)
    
    315
    +
    
    316
    +
    
    317
    +# SkipJob
    
    318
    +#
    
    319
    +# Raised from a child process within a job when the job should be
    
    320
    +# considered skipped by the parent process.
    
    321
    +#
    
    322
    +class SkipJob(Exception):
    
    323
    +    pass

  • buildstream/_scheduler/jobs/job.py
    ... ... @@ -31,7 +31,7 @@ import multiprocessing
    31 31
     import psutil
    
    32 32
     
    
    33 33
     # BuildStream toplevel imports
    
    34
    -from ..._exceptions import ImplError, BstError, set_last_task_error
    
    34
    +from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
    
    35 35
     from ..._message import Message, MessageType, unconditional_messages
    
    36 36
     from ... import _signals, utils
    
    37 37
     
    
    ... ... @@ -40,6 +40,7 @@ from ... import _signals, utils
    40 40
     RC_OK = 0
    
    41 41
     RC_FAIL = 1
    
    42 42
     RC_PERM_FAIL = 2
    
    43
    +RC_SKIPPED = 3
    
    43 44
     
    
    44 45
     
    
    45 46
     # Used to distinguish between status messages and return values
    
    ... ... @@ -117,7 +118,7 @@ class Job():
    117 118
             self._max_retries = max_retries        # Maximum number of automatic retries
    
    118 119
             self._result = None                    # Return value of child action in the parent
    
    119 120
             self._tries = 0                        # Try count, for retryable jobs
    
    120
    -
    
    121
    +        self._skipped_flag = False             # Indicate whether the job was skipped.
    
    121 122
             # If False, a retry will not be attempted regardless of whether _tries is less than _max_retries.
    
    122 123
             #
    
    123 124
             self._retry_flag = True
    
    ... ... @@ -277,6 +278,14 @@ class Job():
    277 278
         def set_task_id(self, task_id):
    
    278 279
             self._task_id = task_id
    
    279 280
     
    
    281
    +    # skipped
    
    282
    +    #
    
    283
    +    # Returns:
    
    284
    +    #    bool: True if the job was skipped while processing.
    
    285
    +    @property
    
    286
    +    def skipped(self):
    
    287
    +        return self._skipped_flag
    
    288
    +
    
    280 289
         #######################################################
    
    281 290
         #                  Abstract Methods                   #
    
    282 291
         #######################################################
    
    ... ... @@ -398,6 +407,13 @@ class Job():
    398 407
                 try:
    
    399 408
                     # Try the task action
    
    400 409
                     result = self.child_process()
    
    410
    +            except SkipJob as e:
    
    411
    +                elapsed = datetime.datetime.now() - starttime
    
    412
    +                self.message(MessageType.SKIPPED, str(e),
    
    413
    +                             elapsed=elapsed, logfile=filename)
    
    414
    +
    
    415
    +                # Alert parent of skip by return code
    
    416
    +                self._child_shutdown(RC_SKIPPED)
    
    401 417
                 except BstError as e:
    
    402 418
                     elapsed = datetime.datetime.now() - starttime
    
    403 419
                     self._retry_flag = e.temporary
    
    ... ... @@ -545,14 +561,18 @@ class Job():
    545 561
             # We don't want to retry if we got OK or a permanent fail.
    
    546 562
             # This is set in _child_action but must also be set for the parent.
    
    547 563
             #
    
    548
    -        self._retry_flag = returncode not in (RC_OK, RC_PERM_FAIL)
    
    564
    +        self._retry_flag = returncode == RC_FAIL
    
    565
    +
    
    566
    +        # Set the flag to alert Queue that this job skipped.
    
    567
    +        self._skipped_flag = returncode == RC_SKIPPED
    
    549 568
     
    
    550 569
             if self._retry_flag and (self._tries <= self._max_retries) and not self._scheduler.terminated:
    
    551 570
                 self.spawn()
    
    552 571
                 return
    
    553 572
     
    
    554
    -        self.parent_complete(returncode == RC_OK, self._result)
    
    555
    -        self._scheduler.job_completed(self, returncode == RC_OK)
    
    573
    +        success = returncode in (RC_OK, RC_SKIPPED)
    
    574
    +        self.parent_complete(success, self._result)
    
    575
    +        self._scheduler.job_completed(self, success)
    
    556 576
     
    
    557 577
             # Force the deletion of the queue and process objects to try and clean up FDs
    
    558 578
             self._queue = self._process = None
    

  • buildstream/_scheduler/queues/buildqueue.py
    ... ... @@ -47,6 +47,7 @@ class BuildQueue(Queue):
    47 47
                     to_queue.append(element)
    
    48 48
                     continue
    
    49 49
     
    
    50
    +            # XXX: Fix this, See https://mail.gnome.org/archives/buildstream-list/2018-September/msg00029.html
    
    50 51
                 # Bypass queue processing entirely the first time it's tried.
    
    51 52
                 self._tried.add(element)
    
    52 53
                 _, description, detail = element._get_build_result()
    
    ... ... @@ -113,5 +114,3 @@ class BuildQueue(Queue):
    113 114
                 # This has to be done after _assemble_done, such that the
    
    114 115
                 # element may register its cache key as required
    
    115 116
                 self._check_cache_size(job, element, result)
    116
    -
    
    117
    -        return True

  • buildstream/_scheduler/queues/fetchqueue.py
    ... ... @@ -72,11 +72,9 @@ class FetchQueue(Queue):
    72 72
         def done(self, _, element, result, success):
    
    73 73
     
    
    74 74
             if not success:
    
    75
    -            return False
    
    75
    +            return
    
    76 76
     
    
    77 77
             element._update_state()
    
    78 78
     
    
    79 79
             # Successful fetch, we must be CACHED now
    
    80 80
             assert element._get_consistency() == Consistency.CACHED
    81
    -
    
    82
    -        return True

  • buildstream/_scheduler/queues/pullqueue.py
    ... ... @@ -21,6 +21,7 @@
    21 21
     # Local imports
    
    22 22
     from . import Queue, QueueStatus
    
    23 23
     from ..resources import ResourceType
    
    24
    +from ..._exceptions import SkipJob
    
    24 25
     
    
    25 26
     
    
    26 27
     # A queue which pulls element artifacts
    
    ... ... @@ -33,7 +34,8 @@ class PullQueue(Queue):
    33 34
     
    
    34 35
         def process(self, element):
    
    35 36
             # returns whether an artifact was downloaded or not
    
    36
    -        return element._pull()
    
    37
    +        if not element._pull():
    
    38
    +            raise SkipJob(self.action_name)
    
    37 39
     
    
    38 40
         def status(self, element):
    
    39 41
             # state of dependencies may have changed, recalculate element state
    
    ... ... @@ -63,7 +65,3 @@ class PullQueue(Queue):
    63 65
             # do not get an artifact size from pull jobs, we have to
    
    64 66
             # actually check the cache size.
    
    65 67
             self._scheduler.check_cache_size()
    66
    -
    
    67
    -        # Element._pull() returns True if it downloaded an artifact,
    
    68
    -        # here we want to appear skipped if we did not download.
    
    69
    -        return result

  • buildstream/_scheduler/queues/pushqueue.py
    ... ... @@ -21,6 +21,7 @@
    21 21
     # Local imports
    
    22 22
     from . import Queue, QueueStatus
    
    23 23
     from ..resources import ResourceType
    
    24
    +from ..._exceptions import SkipJob
    
    24 25
     
    
    25 26
     
    
    26 27
     # A queue which pushes element artifacts
    
    ... ... @@ -33,20 +34,11 @@ class PushQueue(Queue):
    33 34
     
    
    34 35
         def process(self, element):
    
    35 36
             # returns whether an artifact was uploaded or not
    
    36
    -        return element._push()
    
    37
    +        if not element._push():
    
    38
    +            raise SkipJob(self.action_name)
    
    37 39
     
    
    38 40
         def status(self, element):
    
    39 41
             if element._skip_push():
    
    40 42
                 return QueueStatus.SKIP
    
    41 43
     
    
    42 44
             return QueueStatus.READY
    43
    -
    
    44
    -    def done(self, _, element, result, success):
    
    45
    -
    
    46
    -        if not success:
    
    47
    -            return False
    
    48
    -
    
    49
    -        # Element._push() returns True if it uploaded an artifact,
    
    50
    -        # here we want to appear skipped if the remote already had
    
    51
    -        # the artifact.
    
    52
    -        return result

  • buildstream/_scheduler/queues/queue.py
    ... ... @@ -136,10 +136,6 @@ class Queue():
    136 136
         #    success (bool): True if the process() implementation did not
    
    137 137
         #                    raise any exception
    
    138 138
         #
    
    139
    -    # Returns:
    
    140
    #    (bool): True if the element should appear to be processed,
    
    141
    -    #            Otherwise False will count the element as "skipped"
    
    142
    -    #
    
    143 139
         def done(self, job, element, result, success):
    
    144 140
             pass
    
    145 141
     
    
    ... ... @@ -306,8 +302,7 @@ class Queue():
    306 302
             # and determine if it should be considered as processed
    
    307 303
             # or skipped.
    
    308 304
             try:
    
    309
    -            processed = self.done(job, element, result, success)
    
    310
    -
    
    305
    +            self.done(job, element, result, success)
    
    311 306
             except BstError as e:
    
    312 307
     
    
    313 308
                 # Report error and mark as failed
    
    ... ... @@ -337,7 +332,7 @@ class Queue():
    337 332
                 self._done_queue.append(job)
    
    338 333
     
    
    339 334
                 if success:
    
    340
    -                if processed:
    
    335
    +                if not job.skipped:
    
    341 336
                         self.processed_elements.append(element)
    
    342 337
                     else:
    
    343 338
                         self.skipped_elements.append(element)
    

  • buildstream/_scheduler/queues/trackqueue.py
    ... ... @@ -51,18 +51,11 @@ class TrackQueue(Queue):
    51 51
         def done(self, _, element, result, success):
    
    52 52
     
    
    53 53
             if not success:
    
    54
    -            return False
    
    55
    -
    
    56
    -        changed = False
    
    54
    +            return
    
    57 55
     
    
    58 56
             # Set the new refs in the main process one by one as they complete
    
    59 57
             for unique_id, new_ref in result:
    
    60 58
                 source = _plugin_lookup(unique_id)
    
    61
    -            # We appear processed if at least one source has changed
    
    62
    -            if source._save_ref(new_ref):
    
    63
    -                changed = True
    
    59
    +            source._save_ref(new_ref)
    
    64 60
     
    
    65 61
             element._tracking_done()
    66
    -
    
    67
    -        # We'll appear as a skipped element if tracking resulted in no change
    
    68
    -        return changed

  • buildstream/element.py
    ... ... @@ -1760,8 +1760,6 @@ class Element(Plugin):
    1760 1760
                 return False
    
    1761 1761
     
    
    1762 1762
        # Notify successful download
    
    1763
    -        display_key = self._get_brief_display_key()
    
    1764
    -        self.info("Downloaded artifact {}".format(display_key))
    
    1765 1763
             return True
    
    1766 1764
     
    
    1767 1765
         # _skip_push():
    
    ... ... @@ -1800,16 +1798,13 @@ class Element(Plugin):
    1800 1798
                 self.warn("Not pushing tainted artifact.")
    
    1801 1799
                 return False
    
    1802 1800
     
    
    1803
    -        display_key = self._get_brief_display_key()
    
    1804
    -        with self.timed_activity("Pushing artifact {}".format(display_key)):
    
    1805
    -            # Push all keys used for local commit
    
    1806
    -            pushed = self.__artifacts.push(self, self.__get_cache_keys_for_commit())
    
    1807
    -            if not pushed:
    
    1808
    -                return False
    
    1801
    +        # Push all keys used for local commit
    
    1802
    +        pushed = self.__artifacts.push(self, self.__get_cache_keys_for_commit())
    
    1803
    +        if not pushed:
    
    1804
    +            return False
    
    1809 1805
     
    
    1810
    -            # Notify successful upload
    
    1811
    -            self.info("Pushed artifact {}".format(display_key))
    
    1812
    -            return True
    
    1806
    +        # Notify successful upload
    
    1807
    +        return True
    
    1813 1808
     
    
    1814 1809
         # _shell():
    
    1815 1810
         #
    

  • tests/frontend/pull.py
    ... ... @@ -356,4 +356,5 @@ def test_pull_missing_notifies_user(caplog, cli, tmpdir, datafiles):
    356 356
             assert not result.get_pulled_elements(), \
    
    357 357
                 "No elements should have been pulled since the cache was empty"
    
    358 358
     
    
    359
    -        assert "SKIPPED Remote ({}) does not have".format(share.repo) in result.stderr
    359
    +        assert "INFO    Remote ({}) does not have".format(share.repo) in result.stderr
    
    360
    +        assert "SKIPPED Pull" in result.stderr

  • tests/frontend/push.py
    ... ... @@ -386,3 +386,26 @@ def test_push_cross_junction(cli, tmpdir, datafiles):
    386 386
     
    
    387 387
             cache_key = cli.get_element_key(project, 'junction.bst:import-etc.bst')
    
    388 388
             assert share.has_artifact('subtest', 'import-etc.bst', cache_key)
    
    389
    +
    
    390
    +
    
    391
    +@pytest.mark.datafiles(DATA_DIR)
    
    392
    +def test_push_already_cached(caplog, cli, tmpdir, datafiles):
    
    393
    +    project = os.path.join(datafiles.dirname, datafiles.basename)
    
    394
    +    caplog.set_level(1)
    
    395
    +
    
    396
    +    with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
    
    397
    +
    
    398
    +        cli.configure({
    
    399
    +            'artifacts': {'url': share.repo, 'push': True}
    
    400
    +        })
    
    401
    +        result = cli.run(project=project, args=['build', 'target.bst'])
    
    402
    +
    
    403
    +        result.assert_success()
    
    404
    +        assert "SKIPPED Push" not in result.stderr
    
    405
    +
    
    406
    +        result = cli.run(project=project, args=['push', 'target.bst'])
    
    407
    +
    
    408
    +        result.assert_success()
    
    409
    +        assert not result.get_pushed_elements(), "No elements should have been pushed since the cache was populated"
    
    410
    +        assert "INFO    Remote ({}) already has ".format(share.repo) in result.stderr
    
    411
    +        assert "SKIPPED Push" in result.stderr

  • tests/testutils/runcli.py
    ... ... @@ -178,7 +178,7 @@ class Result():
    178 178
             return list(pushed)
    
    179 179
     
    
    180 180
         def get_pulled_elements(self):
    
    181
    -        pulled = re.findall(r'\[\s*pull:(\S+)\s*\]\s*INFO\s*Downloaded artifact', self.stderr)
    
    181
    +        pulled = re.findall(r'\[\s*pull:(\S+)\s*\]\s*INFO\s*Pulled artifact', self.stderr)
    
    182 182
             if pulled is None:
    
    183 183
                 return []
    
    184 184
     
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]