[Notes] [Git][BuildStream/buildstream][bst-1.2] 6 commits: Rework Skipped usage



Title: GitLab

Tristan Van Berkom pushed to branch bst-1.2 at BuildStream / buildstream

Commits:

13 changed files:

Changes:

  • buildstream/_artifactcache/cascache.py
    ... ... @@ -225,8 +225,8 @@ class CASCache(ArtifactCache):
    225 225
             for remote in self._remotes[project]:
    
    226 226
                 try:
    
    227 227
                     remote.init()
    
    228
    -
    
    229
    -                element.info("Pulling {} <- {}".format(element._get_brief_display_key(), remote.spec.url))
    
    228
    +                display_key = element._get_brief_display_key()
    
    229
    +                element.status("Pulling artifact {} <- {}".format(display_key, remote.spec.url))
    
    230 230
     
    
    231 231
                     request = buildstream_pb2.GetReferenceRequest()
    
    232 232
                     request.key = ref
    
    ... ... @@ -240,6 +240,7 @@ class CASCache(ArtifactCache):
    240 240
     
    
    241 241
                     self.set_ref(ref, tree)
    
    242 242
     
    
    243
    +                element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url))
    
    243 244
                     # no need to pull from additional remotes
    
    244 245
                     return True
    
    245 246
     
    
    ... ... @@ -248,11 +249,8 @@ class CASCache(ArtifactCache):
    248 249
                         raise ArtifactError("Failed to pull artifact {}: {}".format(
    
    249 250
                             element._get_brief_display_key(), e)) from e
    
    250 251
                     else:
    
    251
    -                    self.context.message(Message(
    
    252
    -                        None,
    
    253
    -                        MessageType.SKIPPED,
    
    254
    -                        "Remote ({}) does not have {} cached".format(
    
    255
    -                            remote.spec.url, element._get_brief_display_key())
    
    252
    +                    element.info("Remote ({}) does not have {} cached".format(
    
    253
    +                        remote.spec.url, element._get_brief_display_key()
    
    256 254
                         ))
    
    257 255
     
    
    258 256
             return False
    
    ... ... @@ -273,11 +271,11 @@ class CASCache(ArtifactCache):
    273 271
             push_remotes = [r for r in self._remotes[project] if r.spec.push]
    
    274 272
     
    
    275 273
             pushed = False
    
    276
    -
    
    274
    +        display_key = element._get_brief_display_key()
    
    277 275
             for remote in push_remotes:
    
    278 276
                 remote.init()
    
    279 277
                 skipped_remote = True
    
    280
    -            element.info("Pushing {} -> {}".format(element._get_brief_display_key(), remote.spec.url))
    
    278
    +            element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url))
    
    281 279
     
    
    282 280
                 try:
    
    283 281
                     for ref in refs:
    
    ... ... @@ -354,6 +352,9 @@ class CASCache(ArtifactCache):
    354 352
     
    
    355 353
                         pushed = True
    
    356 354
     
    
    355
    +                if not skipped_remote:
    
    356
    +                    element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
    
    357
    +
    
    357 358
                 except grpc.RpcError as e:
    
    358 359
                     if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
    
    359 360
                         raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
    
    ... ... @@ -361,7 +362,7 @@ class CASCache(ArtifactCache):
    361 362
                 if skipped_remote:
    
    362 363
                     self.context.message(Message(
    
    363 364
                         None,
    
    364
    -                    MessageType.SKIPPED,
    
    365
    +                    MessageType.INFO,
    
    365 366
                         "Remote ({}) already has {} cached".format(
    
    366 367
                             remote.spec.url, element._get_brief_display_key())
    
    367 368
                     ))
    

  • buildstream/_exceptions.py
    ... ... @@ -311,3 +311,12 @@ class StreamError(BstError):
    311 311
     class AppError(BstError):
    
    312 312
         def __init__(self, message, detail=None, reason=None):
    
    313 313
             super().__init__(message, detail=detail, domain=ErrorDomain.APP, reason=reason)
    
    314
    +
    
    315
    +
    
    316
    +# SkipJob
    
    317
    +#
    
    318
    +# Raised from a child process within a job when the job should be
    
    319
    +# considered skipped by the parent process.
    
    320
    +#
    
    321
    +class SkipJob(Exception):
    
    322
    +    pass

  • buildstream/_scheduler/jobs/job.py
    ... ... @@ -31,7 +31,7 @@ import multiprocessing
    31 31
     import psutil
    
    32 32
     
    
    33 33
     # BuildStream toplevel imports
    
    34
    -from ..._exceptions import ImplError, BstError, set_last_task_error
    
    34
    +from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
    
    35 35
     from ..._message import Message, MessageType, unconditional_messages
    
    36 36
     from ... import _signals, utils
    
    37 37
     
    
    ... ... @@ -40,6 +40,7 @@ from ... import _signals, utils
    40 40
     RC_OK = 0
    
    41 41
     RC_FAIL = 1
    
    42 42
     RC_PERM_FAIL = 2
    
    43
    +RC_SKIPPED = 3
    
    43 44
     
    
    44 45
     
    
    45 46
     # Used to distinguish between status messages and return values
    
    ... ... @@ -117,7 +118,7 @@ class Job():
    117 118
             self._max_retries = max_retries        # Maximum number of automatic retries
    
    118 119
             self._result = None                    # Return value of child action in the parent
    
    119 120
             self._tries = 0                        # Try count, for retryable jobs
    
    120
    -
    
    121
    +        self._skipped_flag = False             # Indicate whether the job was skipped.
    
    121 122
             # If False, a retry will not be attempted regardless of whether _tries is less than _max_retries.
    
    122 123
             #
    
    123 124
             self._retry_flag = True
    
    ... ... @@ -275,6 +276,14 @@ class Job():
    275 276
         def set_task_id(self, task_id):
    
    276 277
             self._task_id = task_id
    
    277 278
     
    
    279
    +    # skipped
    
    280
    +    #
    
    281
    +    # Returns:
    
    282
    +    #    bool: True if the job was skipped while processing.
    
    283
    +    @property
    
    284
    +    def skipped(self):
    
    285
    +        return self._skipped_flag
    
    286
    +
    
    278 287
         #######################################################
    
    279 288
         #                  Abstract Methods                   #
    
    280 289
         #######################################################
    
    ... ... @@ -396,6 +405,13 @@ class Job():
    396 405
                 try:
    
    397 406
                     # Try the task action
    
    398 407
                     result = self.child_process()
    
    408
    +            except SkipJob as e:
    
    409
    +                elapsed = datetime.datetime.now() - starttime
    
    410
    +                self.message(MessageType.SKIPPED, str(e),
    
    411
    +                             elapsed=elapsed, logfile=filename)
    
    412
    +
    
    413
    +                # Alert parent of skip by return code
    
    414
    +                self._child_shutdown(RC_SKIPPED)
    
    399 415
                 except BstError as e:
    
    400 416
                     elapsed = datetime.datetime.now() - starttime
    
    401 417
                     self._retry_flag = e.temporary
    
    ... ... @@ -543,14 +559,18 @@ class Job():
    543 559
             # We don't want to retry if we got OK or a permanent fail.
    
    544 560
             # This is set in _child_action but must also be set for the parent.
    
    545 561
             #
    
    546
    -        self._retry_flag = returncode not in (RC_OK, RC_PERM_FAIL)
    
    562
    +        self._retry_flag = returncode == RC_FAIL
    
    563
    +
    
    564
    +        # Set the flag to alert Queue that this job skipped.
    
    565
    +        self._skipped_flag = returncode == RC_SKIPPED
    
    547 566
     
    
    548 567
             if self._retry_flag and (self._tries <= self._max_retries) and not self._scheduler.terminated:
    
    549 568
                 self.spawn()
    
    550 569
                 return
    
    551 570
     
    
    552
    -        self.parent_complete(returncode == RC_OK, self._result)
    
    553
    -        self._scheduler.job_completed(self, returncode == RC_OK)
    
    571
    +        success = returncode in (RC_OK, RC_SKIPPED)
    
    572
    +        self.parent_complete(success, self._result)
    
    573
    +        self._scheduler.job_completed(self, success)
    
    554 574
     
    
    555 575
         # _parent_process_envelope()
    
    556 576
         #
    

  • buildstream/_scheduler/queues/buildqueue.py
    ... ... @@ -77,5 +77,3 @@ class BuildQueue(Queue):
    77 77
                 # This has to be done after _assemble_done, such that the
    
    78 78
                 # element may register its cache key as required
    
    79 79
                 self._check_cache_size(job, element, result)
    80
    -
    
    81
    -        return True

  • buildstream/_scheduler/queues/fetchqueue.py
    ... ... @@ -70,11 +70,9 @@ class FetchQueue(Queue):
    70 70
         def done(self, _, element, result, success):
    
    71 71
     
    
    72 72
             if not success:
    
    73
    -            return False
    
    73
    +            return
    
    74 74
     
    
    75 75
             element._update_state()
    
    76 76
     
    
    77 77
             # Successful fetch, we must be CACHED now
    
    78 78
             assert element._get_consistency() == Consistency.CACHED
    79
    -
    
    80
    -        return True

  • buildstream/_scheduler/queues/pullqueue.py
    ... ... @@ -21,6 +21,7 @@
    21 21
     # Local imports
    
    22 22
     from . import Queue, QueueStatus
    
    23 23
     from ..resources import ResourceType
    
    24
    +from ..._exceptions import SkipJob
    
    24 25
     
    
    25 26
     
    
    26 27
     # A queue which pulls element artifacts
    
    ... ... @@ -33,7 +34,8 @@ class PullQueue(Queue):
    33 34
     
    
    34 35
         def process(self, element):
    
    35 36
             # returns whether an artifact was downloaded or not
    
    36
    -        return element._pull()
    
    37
    +        if not element._pull():
    
    38
    +            raise SkipJob(self.action_name)
    
    37 39
     
    
    38 40
         def status(self, element):
    
    39 41
             # state of dependencies may have changed, recalculate element state
    
    ... ... @@ -63,7 +65,3 @@ class PullQueue(Queue):
    63 65
             # do not get an artifact size from pull jobs, we have to
    
    64 66
             # actually check the cache size.
    
    65 67
             self._scheduler.check_cache_size()
    66
    -
    
    67
    -        # Element._pull() returns True if it downloaded an artifact,
    
    68
    -        # here we want to appear skipped if we did not download.
    
    69
    -        return result

  • buildstream/_scheduler/queues/pushqueue.py
    ... ... @@ -21,6 +21,7 @@
    21 21
     # Local imports
    
    22 22
     from . import Queue, QueueStatus
    
    23 23
     from ..resources import ResourceType
    
    24
    +from ..._exceptions import SkipJob
    
    24 25
     
    
    25 26
     
    
    26 27
     # A queue which pushes element artifacts
    
    ... ... @@ -33,20 +34,11 @@ class PushQueue(Queue):
    33 34
     
    
    34 35
         def process(self, element):
    
    35 36
             # returns whether an artifact was uploaded or not
    
    36
    -        return element._push()
    
    37
    +        if not element._push():
    
    38
    +            raise SkipJob(self.action_name)
    
    37 39
     
    
    38 40
         def status(self, element):
    
    39 41
             if element._skip_push():
    
    40 42
                 return QueueStatus.SKIP
    
    41 43
     
    
    42 44
             return QueueStatus.READY
    43
    -
    
    44
    -    def done(self, _, element, result, success):
    
    45
    -
    
    46
    -        if not success:
    
    47
    -            return False
    
    48
    -
    
    49
    -        # Element._push() returns True if it uploaded an artifact,
    
    50
    -        # here we want to appear skipped if the remote already had
    
    51
    -        # the artifact.
    
    52
    -        return result

  • buildstream/_scheduler/queues/queue.py
    ... ... @@ -136,10 +136,6 @@ class Queue():
    136 136
         #    success (bool): True if the process() implementation did not
    
    137 137
         #                    raise any exception
    
    138 138
         #
    
    139
    -    # Returns:
    
    140
    -    #    (bool): True if the element should appear to be processsed,
    
    141
    -    #            Otherwise False will count the element as "skipped"
    
    142
    -    #
    
    143 139
         def done(self, job, element, result, success):
    
    144 140
             pass
    
    145 141
     
    
    ... ... @@ -305,8 +301,7 @@ class Queue():
    305 301
             # and determine if it should be considered as processed
    
    306 302
             # or skipped.
    
    307 303
             try:
    
    308
    -            processed = self.done(job, element, result, success)
    
    309
    -
    
    304
    +            self.done(job, element, result, success)
    
    310 305
             except BstError as e:
    
    311 306
     
    
    312 307
                 # Report error and mark as failed
    
    ... ... @@ -335,7 +330,7 @@ class Queue():
    335 330
                 #
    
    336 331
                 if success:
    
    337 332
                     self._done_queue.append(job)
    
    338
    -                if processed:
    
    333
    +                if not job.skipped:
    
    339 334
                         self.processed_elements.append(element)
    
    340 335
                     else:
    
    341 336
                         self.skipped_elements.append(element)
    

  • buildstream/_scheduler/queues/trackqueue.py
    ... ... @@ -51,18 +51,11 @@ class TrackQueue(Queue):
    51 51
         def done(self, _, element, result, success):
    
    52 52
     
    
    53 53
             if not success:
    
    54
    -            return False
    
    55
    -
    
    56
    -        changed = False
    
    54
    +            return
    
    57 55
     
    
    58 56
             # Set the new refs in the main process one by one as they complete
    
    59 57
             for unique_id, new_ref in result:
    
    60 58
                 source = _plugin_lookup(unique_id)
    
    61
    -            # We appear processed if at least one source has changed
    
    62
    -            if source._save_ref(new_ref):
    
    63
    -                changed = True
    
    59
    +            source._save_ref(new_ref)
    
    64 60
     
    
    65 61
             element._tracking_done()
    66
    -
    
    67
    -        # We'll appear as a skipped element if tracking resulted in no change
    
    68
    -        return changed

  • buildstream/element.py
    ... ... @@ -1672,8 +1672,6 @@ class Element(Plugin):
    1672 1672
                 return False
    
    1673 1673
     
    
    1674 1674
             # Notify successfull download
    
    1675
    -        display_key = self._get_brief_display_key()
    
    1676
    -        self.info("Downloaded artifact {}".format(display_key))
    
    1677 1675
             return True
    
    1678 1676
     
    
    1679 1677
         # _skip_push():
    
    ... ... @@ -1712,16 +1710,13 @@ class Element(Plugin):
    1712 1710
                 self.warn("Not pushing tainted artifact.")
    
    1713 1711
                 return False
    
    1714 1712
     
    
    1715
    -        display_key = self._get_brief_display_key()
    
    1716
    -        with self.timed_activity("Pushing artifact {}".format(display_key)):
    
    1717
    -            # Push all keys used for local commit
    
    1718
    -            pushed = self.__artifacts.push(self, self.__get_cache_keys_for_commit())
    
    1719
    -            if not pushed:
    
    1720
    -                return False
    
    1713
    +        # Push all keys used for local commit
    
    1714
    +        pushed = self.__artifacts.push(self, self.__get_cache_keys_for_commit())
    
    1715
    +        if not pushed:
    
    1716
    +            return False
    
    1721 1717
     
    
    1722
    -            # Notify successful upload
    
    1723
    -            self.info("Pushed artifact {}".format(display_key))
    
    1724
    -            return True
    
    1718
    +        # Notify successful upload
    
    1719
    +        return True
    
    1725 1720
     
    
    1726 1721
         # _shell():
    
    1727 1722
         #
    

  • tests/frontend/pull.py
    ... ... @@ -356,4 +356,5 @@ def test_pull_missing_notifies_user(caplog, cli, tmpdir, datafiles):
    356 356
             assert not result.get_pulled_elements(), \
    
    357 357
                 "No elements should have been pulled since the cache was empty"
    
    358 358
     
    
    359
    -        assert "SKIPPED Remote ({}) does not have".format(share.repo) in result.stderr
    359
    +        assert "INFO    Remote ({}) does not have".format(share.repo) in result.stderr
    
    360
    +        assert "SKIPPED Pull" in result.stderr

  • tests/frontend/push.py
    ... ... @@ -362,3 +362,26 @@ def test_push_cross_junction(cli, tmpdir, datafiles):
    362 362
     
    
    363 363
             cache_key = cli.get_element_key(project, 'junction.bst:import-etc.bst')
    
    364 364
             assert share.has_artifact('subtest', 'import-etc.bst', cache_key)
    
    365
    +
    
    366
    +
    
    367
    +@pytest.mark.datafiles(DATA_DIR)
    
    368
    +def test_push_already_cached(caplog, cli, tmpdir, datafiles):
    
    369
    +    project = os.path.join(datafiles.dirname, datafiles.basename)
    
    370
    +    caplog.set_level(1)
    
    371
    +
    
    372
    +    with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
    
    373
    +
    
    374
    +        cli.configure({
    
    375
    +            'artifacts': {'url': share.repo, 'push': True}
    
    376
    +        })
    
    377
    +        result = cli.run(project=project, args=['build', 'target.bst'])
    
    378
    +
    
    379
    +        result.assert_success()
    
    380
    +        assert "SKIPPED Push" not in result.stderr
    
    381
    +
    
    382
    +        result = cli.run(project=project, args=['push', 'target.bst'])
    
    383
    +
    
    384
    +        result.assert_success()
    
    385
    +        assert not result.get_pushed_elements(), "No elements should have been pushed since the cache was populated"
    
    386
    +        assert "INFO    Remote ({}) already has ".format(share.repo) in result.stderr
    
    387
    +        assert "SKIPPED Push" in result.stderr

  • tests/testutils/runcli.py
    ... ... @@ -178,7 +178,7 @@ class Result():
    178 178
             return list(pushed)
    
    179 179
     
    
    180 180
         def get_pulled_elements(self):
    
    181
    -        pulled = re.findall(r'\[\s*pull:(\S+)\s*\]\s*INFO\s*Downloaded artifact', self.stderr)
    
    181
    +        pulled = re.findall(r'\[\s*pull:(\S+)\s*\]\s*INFO\s*Pulled artifact', self.stderr)
    
    182 182
             if pulled is None:
    
    183 183
                 return []
    
    184 184
     
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]