[Notes] [Git][BuildStream/buildstream][Qinusty/skipped-rework] 5 commits: Rework Skipped usage



Title: GitLab

Qinusty pushed to branch Qinusty/skipped-rework at BuildStream / buildstream

Commits:

13 changed files:

Changes:

  • buildstream/_artifactcache/cascache.py
    ... ... @@ -228,7 +228,7 @@ class CASCache(ArtifactCache):
    228 228
                 try:
    
    229 229
                     remote.init()
    
    230 230
     
    
    231
    -                element.info("Pulling {} <- {}".format(element._get_brief_display_key(), remote.spec.url))
    
    231
    +                element.status("Pulling {} <- {}".format(element._get_brief_display_key(), remote.spec.url))
    
    232 232
     
    
    233 233
                     request = buildstream_pb2.GetReferenceRequest()
    
    234 234
                     request.key = ref
    
    ... ... @@ -250,11 +250,8 @@ class CASCache(ArtifactCache):
    250 250
                         raise ArtifactError("Failed to pull artifact {}: {}".format(
    
    251 251
                             element._get_brief_display_key(), e)) from e
    
    252 252
                     else:
    
    253
    -                    self.context.message(Message(
    
    254
    -                        None,
    
    255
    -                        MessageType.SKIPPED,
    
    256
    -                        "Remote ({}) does not have {} cached".format(
    
    257
    -                            remote.spec.url, element._get_brief_display_key())
    
    253
    +                    element.info("Remote ({}) does not have {} cached".format(
    
    254
    +                        remote.spec.url, element._get_brief_display_key()
    
    258 255
                         ))
    
    259 256
     
    
    260 257
             return False
    
    ... ... @@ -279,7 +276,7 @@ class CASCache(ArtifactCache):
    279 276
             for remote in push_remotes:
    
    280 277
                 remote.init()
    
    281 278
                 skipped_remote = True
    
    282
    -            element.info("Pushing {} -> {}".format(element._get_brief_display_key(), remote.spec.url))
    
    279
    +            element.status("Pushing {} -> {}".format(element._get_brief_display_key(), remote.spec.url))
    
    283 280
     
    
    284 281
                 try:
    
    285 282
                     for ref in refs:
    
    ... ... @@ -361,11 +358,8 @@ class CASCache(ArtifactCache):
    361 358
                         raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
    
    362 359
     
    
    363 360
                 if skipped_remote:
    
    364
    -                self.context.message(Message(
    
    365
    -                    None,
    
    366
    -                    MessageType.SKIPPED,
    
    367
    -                    "Remote ({}) already has {} cached".format(
    
    368
    -                        remote.spec.url, element._get_brief_display_key())
    
    361
    +                element.info("Remote ({}) already has {} cached".format(
    
    362
    +                    remote.spec.url, element._get_brief_display_key()
    
    369 363
                     ))
    
    370 364
             return pushed
    
    371 365
     
    

  • buildstream/_exceptions.py
    ... ... @@ -309,3 +309,17 @@ class StreamError(BstError):
    309 309
     class AppError(BstError):
    
    310 310
         def __init__(self, message, detail=None, reason=None):
    
    311 311
             super().__init__(message, detail=detail, domain=ErrorDomain.APP, reason=reason)
    
    312
    +
    
    313
    +
    
    314
    +# SkipJob
    
    315
    +#
    
    316
    +# Raised from a child process within a job when the job should be
    
    317
    +# considered skipped by the parent process.
    
    318
    +#
    
    319
    +class SkipJob(Exception):
    
    320
    +    def __init__(self, *, detail=""):
    
    321
    +        super().__init__()
    
    322
    +        self._detail = detail
    
    323
    +
    
    324
    +    def __str__(self):
    
    325
    +        return self._detail

  • buildstream/_scheduler/jobs/job.py
    ... ... @@ -31,7 +31,7 @@ import multiprocessing
    31 31
     import psutil
    
    32 32
     
    
    33 33
     # BuildStream toplevel imports
    
    34
    -from ..._exceptions import ImplError, BstError, set_last_task_error
    
    34
    +from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
    
    35 35
     from ..._message import Message, MessageType, unconditional_messages
    
    36 36
     from ... import _signals, utils
    
    37 37
     
    
    ... ... @@ -40,6 +40,7 @@ from ... import _signals, utils
    40 40
     RC_OK = 0
    
    41 41
     RC_FAIL = 1
    
    42 42
     RC_PERM_FAIL = 2
    
    43
    +RC_SKIPPED = 3
    
    43 44
     
    
    44 45
     
    
    45 46
     # Used to distinguish between status messages and return values
    
    ... ... @@ -117,7 +118,7 @@ class Job():
    117 118
             self._max_retries = max_retries        # Maximum number of automatic retries
    
    118 119
             self._result = None                    # Return value of child action in the parent
    
    119 120
             self._tries = 0                        # Try count, for retryable jobs
    
    120
    -
    
    121
    +        self._skipped_flag = False             # Indicate whether the job was skipped.
    
    121 122
             # If False, a retry will not be attempted regardless of whether _tries is less than _max_retries.
    
    122 123
             #
    
    123 124
             self._retry_flag = True
    
    ... ... @@ -275,6 +276,10 @@ class Job():
    275 276
         def set_task_id(self, task_id):
    
    276 277
             self._task_id = task_id
    
    277 278
     
    
    279
    +    @property
    
    280
    +    def skipped(self):
    
    281
    +        return self._skipped_flag
    
    282
    +
    
    278 283
         #######################################################
    
    279 284
         #                  Abstract Methods                   #
    
    280 285
         #######################################################
    
    ... ... @@ -396,6 +401,13 @@ class Job():
    396 401
                 try:
    
    397 402
                     # Try the task action
    
    398 403
                     result = self.child_process()
    
    404
    +            except SkipJob as e:
    
    405
    +                elapsed = datetime.datetime.now() - starttime
    
    406
    +                self.message(MessageType.SKIPPED, str(e),
    
    407
    +                             elapsed=elapsed, logfile=filename)
    
    408
    +
    
    409
    +                # Alert parent of skip by return code
    
    410
    +                self._child_shutdown(RC_SKIPPED)
    
    399 411
                 except BstError as e:
    
    400 412
                     elapsed = datetime.datetime.now() - starttime
    
    401 413
                     self._retry_flag = e.temporary
    
    ... ... @@ -441,6 +453,7 @@ class Job():
    441 453
                     self.message(MessageType.SUCCESS, self.action_name, elapsed=elapsed,
    
    442 454
                                  logfile=filename)
    
    443 455
     
    
    456
    +                # XXX Verify below.
    
    444 457
                     # Shutdown needs to stay outside of the above context manager,
    
    445 458
                     # make sure we dont try to handle SIGTERM while the process
    
    446 459
                     # is already busy in sys.exit()
    
    ... ... @@ -547,14 +560,18 @@ class Job():
    547 560
             # We don't want to retry if we got OK or a permanent fail.
    
    548 561
             # This is set in _child_action but must also be set for the parent.
    
    549 562
             #
    
    550
    -        self._retry_flag = returncode not in (RC_OK, RC_PERM_FAIL)
    
    563
    +        self._retry_flag = (returncode == RC_FAIL)
    
    564
    +
    
    565
    +        # Set the flag to alert Queue that this job skipped.
    
    566
    +        self._skipped_flag = (returncode == RC_SKIPPED)
    
    551 567
     
    
    552 568
             if self._retry_flag and (self._tries <= self._max_retries) and not self._scheduler.terminated:
    
    553 569
                 self.spawn()
    
    554 570
                 return
    
    555 571
     
    
    556
    -        self.parent_complete(returncode == RC_OK, self._result)
    
    557
    -        self._scheduler.job_completed(self, returncode == RC_OK)
    
    572
    +        success = (returncode in (RC_OK, RC_SKIPPED))
    
    573
    +        self.parent_complete(success, self._result)
    
    574
    +        self._scheduler.job_completed(self, success)
    
    558 575
     
    
    559 576
         # _parent_process_envelope()
    
    560 577
         #
    

  • buildstream/_scheduler/queues/buildqueue.py
    ... ... @@ -54,12 +54,7 @@ class BuildQueue(Queue):
    54 54
                               detail=detail, action_name=self.action_name,
    
    55 55
                               elapsed=timedelta(seconds=0),
    
    56 56
                               logfile=logfile)
    
    57
    -            job = ElementJob(self._scheduler, self.action_name,
    
    58
    -                             logfile, element=element, queue=self,
    
    59
    -                             resources=self.resources,
    
    60
    -                             action_cb=self.process,
    
    61
    -                             complete_cb=self._job_done,
    
    62
    -                             max_retries=self._max_retries)
    
    57
    +            job = self._create_job(element)
    
    63 58
                 self._done_queue.append(job)
    
    64 59
                 self.failed_elements.append(element)
    
    65 60
                 self._scheduler._job_complete_callback(job, False)
    
    ... ... @@ -109,5 +104,3 @@ class BuildQueue(Queue):
    109 104
             # This has to be done after _assemble_done, such that the
    
    110 105
             # element may register its cache key as required
    
    111 106
             self._check_cache_size(job, element)
    112
    -
    
    113
    -        return True

  • buildstream/_scheduler/queues/fetchqueue.py
    ... ... @@ -70,13 +70,8 @@ class FetchQueue(Queue):
    70 70
             return QueueStatus.READY
    
    71 71
     
    
    72 72
         def done(self, _, element, result, success):
    
    73
    +        if success:
    
    74
    +            element._update_state()
    
    73 75
     
    
    74
    -        if not success:
    
    75
    -            return False
    
    76
    -
    
    77
    -        element._update_state()
    
    78
    -
    
    79
    -        # Successful fetch, we must be CACHED now
    
    80
    -        assert element._get_consistency() == Consistency.CACHED
    
    81
    -
    
    82
    -        return True
    76
    +            # Successful fetch, we must be CACHED now
    
    77
    +            assert element._get_consistency() == Consistency.CACHED

  • buildstream/_scheduler/queues/pullqueue.py
    ... ... @@ -21,6 +21,7 @@
    21 21
     # Local imports
    
    22 22
     from . import Queue, QueueStatus
    
    23 23
     from ..resources import ResourceType
    
    24
    +from ..._exceptions import SkipJob
    
    24 25
     
    
    25 26
     
    
    26 27
     # A queue which pulls element artifacts
    
    ... ... @@ -33,7 +34,12 @@ class PullQueue(Queue):
    33 34
     
    
    34 35
         def process(self, element):
    
    35 36
             # returns whether an artifact was downloaded or not
    
    36
    -        return element._pull()
    
    37
    +        pulled = element._pull()
    
    38
    +
    
    39
    +        if not pulled:
    
    40
    +            raise SkipJob(detail=self.action_name)
    
    41
    +
    
    42
    +        return pulled
    
    37 43
     
    
    38 44
         def status(self, element):
    
    39 45
             # state of dependencies may have changed, recalculate element state
    
    ... ... @@ -53,17 +59,10 @@ class PullQueue(Queue):
    53 59
                 return QueueStatus.SKIP
    
    54 60
     
    
    55 61
         def done(self, _, element, result, success):
    
    62
    +        if success:
    
    63
    +            element._pull_done()
    
    56 64
     
    
    57
    -        if not success:
    
    58
    -            return False
    
    59
    -
    
    60
    -        element._pull_done()
    
    61
    -
    
    62
    -        # Build jobs will check the "approximate" size first. Since we
    
    63
    -        # do not get an artifact size from pull jobs, we have to
    
    64
    -        # actually check the cache size.
    
    65
    -        self._scheduler._check_cache_size_real()
    
    66
    -
    
    67
    -        # Element._pull() returns True if it downloaded an artifact,
    
    68
    -        # here we want to appear skipped if we did not download.
    
    69
    -        return result
    65
    +            # Build jobs will check the "approximate" size first. Since we
    
    66
    +            # do not get an artifact size from pull jobs, we have to
    
    67
    +            # actually check the cache size.
    
    68
    +            self._scheduler._check_cache_size_real()

  • buildstream/_scheduler/queues/pushqueue.py
    ... ... @@ -21,6 +21,7 @@
    21 21
     # Local imports
    
    22 22
     from . import Queue, QueueStatus
    
    23 23
     from ..resources import ResourceType
    
    24
    +from ..._exceptions import SkipJob
    
    24 25
     
    
    25 26
     
    
    26 27
     # A queue which pushes element artifacts
    
    ... ... @@ -33,20 +34,15 @@ class PushQueue(Queue):
    33 34
     
    
    34 35
         def process(self, element):
    
    35 36
             # returns whether an artifact was uploaded or not
    
    36
    -        return element._push()
    
    37
    +        pushed = element._push()
    
    38
    +
    
    39
    +        if not pushed:
    
    40
    +            raise SkipJob(detail=self.action_name)
    
    41
    +
    
    42
    +        return pushed
    
    37 43
     
    
    38 44
         def status(self, element):
    
    39 45
             if element._skip_push():
    
    40 46
                 return QueueStatus.SKIP
    
    41 47
     
    
    42 48
             return QueueStatus.READY
    43
    -
    
    44
    -    def done(self, _, element, result, success):
    
    45
    -
    
    46
    -        if not success:
    
    47
    -            return False
    
    48
    -
    
    49
    -        # Element._push() returns True if it uploaded an artifact,
    
    50
    -        # here we want to appear skipped if the remote already had
    
    51
    -        # the artifact.
    
    52
    -        return result

  • buildstream/_scheduler/queues/queue.py
    ... ... @@ -136,10 +136,6 @@ class Queue():
    136 136
         #    success (bool): True if the process() implementation did not
    
    137 137
         #                    raise any exception
    
    138 138
         #
    
    139
    -    # Returns:
    
    140
    -    #    (bool): True if the element should appear to be processsed,
    
    141
    -    #            Otherwise False will count the element as "skipped"
    
    142
    -    #
    
    143 139
         def done(self, job, element, result, success):
    
    144 140
             pass
    
    145 141
     
    
    ... ... @@ -158,20 +154,8 @@ class Queue():
    158 154
             if not elts:
    
    159 155
                 return
    
    160 156
     
    
    161
    -        # Note: The internal lists work with jobs. This is not
    
    162
    -        #       reflected in any external methods (except
    
    163
    -        #       pop/peek_ready_jobs).
    
    164
    -        def create_job(element):
    
    165
    -            logfile = self._element_log_path(element)
    
    166
    -            return ElementJob(self._scheduler, self.action_name,
    
    167
    -                              logfile, element=element, queue=self,
    
    168
    -                              resources=self.resources,
    
    169
    -                              action_cb=self.process,
    
    170
    -                              complete_cb=self._job_done,
    
    171
    -                              max_retries=self._max_retries)
    
    172
    -
    
    173 157
             # Place skipped elements directly on the done queue
    
    174
    -        jobs = [create_job(elt) for elt in elts]
    
    158
    +        jobs = [self._create_job(elt) for elt in elts]
    
    175 159
             skip = [job for job in jobs if self.status(job.element) == QueueStatus.SKIP]
    
    176 160
             wait = [job for job in jobs if job not in skip]
    
    177 161
     
    
    ... ... @@ -308,8 +292,7 @@ class Queue():
    308 292
             # and determine if it should be considered as processed
    
    309 293
             # or skipped.
    
    310 294
             try:
    
    311
    -            processed = self.done(job, element, result, success)
    
    312
    -
    
    295
    +            self.done(job, element, result, success)
    
    313 296
             except BstError as e:
    
    314 297
     
    
    315 298
                 # Report error and mark as failed
    
    ... ... @@ -339,7 +322,7 @@ class Queue():
    339 322
                 self._done_queue.append(job)
    
    340 323
     
    
    341 324
                 if success:
    
    342
    -                if processed:
    
    325
    +                if not job.skipped:
    
    343 326
                         self.processed_elements.append(element)
    
    344 327
                     else:
    
    345 328
                         self.skipped_elements.append(element)
    
    ... ... @@ -360,3 +343,15 @@ class Queue():
    360 343
             logfile = "{key}-{action}".format(key=key, action=action)
    
    361 344
     
    
    362 345
             return os.path.join(project.name, element.normal_name, logfile)
    
    346
    +
    
    347
    +    # Note: The internal lists work with jobs. This is not
    
    348
    +    #       reflected in any external methods (except
    
    349
    +    #       pop/peek_ready_jobs).
    
    350
    +    def _create_job(self, element):
    
    351
    +        logfile = self._element_log_path(element)
    
    352
    +        return ElementJob(self._scheduler, self.action_name,
    
    353
    +                          logfile, element=element, queue=self,
    
    354
    +                          resources=self.resources,
    
    355
    +                          action_cb=self.process,
    
    356
    +                          complete_cb=self._job_done,
    
    357
    +                          max_retries=self._max_retries)

  • buildstream/_scheduler/queues/trackqueue.py
    ... ... @@ -49,20 +49,10 @@ class TrackQueue(Queue):
    49 49
             return QueueStatus.READY
    
    50 50
     
    
    51 51
         def done(self, _, element, result, success):
    
    52
    +        if success:
    
    53
    +            # Set the new refs in the main process one by one as they complete
    
    54
    +            for unique_id, new_ref in result:
    
    55
    +                source = _plugin_lookup(unique_id)
    
    56
    +                source._save_ref(new_ref)
    
    52 57
     
    
    53
    -        if not success:
    
    54
    -            return False
    
    55
    -
    
    56
    -        changed = False
    
    57
    -
    
    58
    -        # Set the new refs in the main process one by one as they complete
    
    59
    -        for unique_id, new_ref in result:
    
    60
    -            source = _plugin_lookup(unique_id)
    
    61
    -            # We appear processed if at least one source has changed
    
    62
    -            if source._save_ref(new_ref):
    
    63
    -                changed = True
    
    64
    -
    
    65
    -        element._tracking_done()
    
    66
    -
    
    67
    -        # We'll appear as a skipped element if tracking resulted in no change
    
    68
    -        return changed
    58
    +            element._tracking_done()

  • buildstream/element.py
    ... ... @@ -1746,7 +1746,7 @@ class Element(Plugin):
    1746 1746
     
    
    1747 1747
             # Notify successfull download
    
    1748 1748
             display_key = self._get_brief_display_key()
    
    1749
    -        self.info("Downloaded artifact {}".format(display_key))
    
    1749
    +        self.info("Pulled artifact {}".format(display_key))
    
    1750 1750
             return True
    
    1751 1751
     
    
    1752 1752
         # _skip_push():
    
    ... ... @@ -1785,16 +1785,15 @@ class Element(Plugin):
    1785 1785
                 self.warn("Not pushing tainted artifact.")
    
    1786 1786
                 return False
    
    1787 1787
     
    
    1788
    -        display_key = self._get_brief_display_key()
    
    1789
    -        with self.timed_activity("Pushing artifact {}".format(display_key)):
    
    1790
    -            # Push all keys used for local commit
    
    1791
    -            pushed = self.__artifacts.push(self, self.__get_cache_keys_for_commit())
    
    1792
    -            if not pushed:
    
    1793
    -                return False
    
    1788
    +        # Push all keys used for local commit
    
    1789
    +        pushed = self.__artifacts.push(self, self.__get_cache_keys_for_commit())
    
    1790
    +        if not pushed:
    
    1791
    +            return False
    
    1794 1792
     
    
    1795
    -            # Notify successful upload
    
    1796
    -            self.info("Pushed artifact {}".format(display_key))
    
    1797
    -            return True
    
    1793
    +        # Notify successful upload
    
    1794
    +        display_key = self._get_brief_display_key()
    
    1795
    +        self.info("Pushed artifact {}".format(display_key))
    
    1796
    +        return True
    
    1798 1797
     
    
    1799 1798
         # _shell():
    
    1800 1799
         #
    

  • tests/frontend/pull.py
    ... ... @@ -356,4 +356,5 @@ def test_pull_missing_notifies_user(caplog, cli, tmpdir, datafiles):
    356 356
             assert not result.get_pulled_elements(), \
    
    357 357
                 "No elements should have been pulled since the cache was empty"
    
    358 358
     
    
    359
    -        assert "SKIPPED Remote ({}) does not have".format(share.repo) in result.stderr
    359
    +        assert "INFO    Remote ({}) does not have".format(share.repo) in result.stderr
    
    360
    +        assert "SKIPPED Pull" in result.stderr

  • tests/frontend/push.py
    ... ... @@ -386,3 +386,26 @@ def test_push_cross_junction(cli, tmpdir, datafiles):
    386 386
     
    
    387 387
             cache_key = cli.get_element_key(project, 'junction.bst:import-etc.bst')
    
    388 388
             assert share.has_artifact('subtest', 'import-etc.bst', cache_key)
    
    389
    +
    
    390
    +
    
    391
    +@pytest.mark.datafiles(DATA_DIR)
    
    392
    +def test_push_already_cached(caplog, cli, tmpdir, datafiles):
    
    393
    +    project = os.path.join(datafiles.dirname, datafiles.basename)
    
    394
    +    caplog.set_level(1)
    
    395
    +
    
    396
    +    with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
    
    397
    +
    
    398
    +        cli.configure({
    
    399
    +            'artifacts': {'url': share.repo, 'push': True}
    
    400
    +        })
    
    401
    +        result = cli.run(project=project, args=['build', 'target.bst'])
    
    402
    +
    
    403
    +        result.assert_success()
    
    404
    +        assert "SKIPPED Push" not in result.stderr
    
    405
    +
    
    406
    +        result = cli.run(project=project, args=['push', 'target.bst'])
    
    407
    +
    
    408
    +        result.assert_success()
    
    409
    +        assert not result.get_pushed_elements(), "No elements should have been pushed since the cache was populated"
    
    410
    +        assert "INFO    Remote ({}) already has ".format(share.repo) in result.stderr
    
    411
    +        assert "SKIPPED Push" in result.stderr

  • tests/testutils/runcli.py
    ... ... @@ -178,7 +178,7 @@ class Result():
    178 178
             return list(pushed)
    
    179 179
     
    
    180 180
         def get_pulled_elements(self):
    
    181
    -        pulled = re.findall(r'\[\s*pull:(\S+)\s*\]\s*INFO\s*Downloaded artifact', self.stderr)
    
    181
    +        pulled = re.findall(r'\[\s*pull:(\S+)\s*\]\s*INFO\s*Pulled artifact', self.stderr)
    
    182 182
             if pulled is None:
    
    183 183
                 return []
    
    184 184
     
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]