Chandan Singh pushed to branch chandan/fix-checkout-none-1.2 at BuildStream / buildstream
Commits:
- 092038d2 by Tiago Gomes at 2018-09-26T08:16:33Z
- 075ffefa by Valentin David at 2018-09-28T09:23:19Z
- 621bd236 by Tiago Gomes at 2018-09-28T09:52:15Z
- 34e81ae1 by Tiago Gomes at 2018-09-30T06:33:34Z
- b842658c by Tiago Gomes at 2018-09-30T06:33:46Z
- e3ff069e by Tiago Gomes at 2018-09-30T06:33:49Z
- 9bca9183 by Tiago Gomes at 2018-09-30T06:34:21Z
- 2d025076 by Jürg Billeter at 2018-09-30T06:39:57Z
- 01831afe by Jürg Billeter at 2018-09-30T06:41:13Z
- 1b7245da by Jürg Billeter at 2018-09-30T06:41:18Z
- fd46a9f9 by Jürg Billeter at 2018-09-30T06:41:25Z
- 764b7517 by Jürg Billeter at 2018-10-01T15:42:08Z
- a009dcbe by Tristan Van Berkom at 2018-10-02T11:51:19Z
- 7da5104b by Tristan Van Berkom at 2018-10-02T13:18:54Z
- 6e820362 by Tristan Van Berkom at 2018-10-03T07:35:03Z
- 9568824f by Jim MacArthur at 2018-10-03T07:35:51Z
- 4a67e4e3 by Jürg Billeter at 2018-10-03T07:35:51Z
- f585b233 by Jürg Billeter at 2018-10-03T07:35:51Z
- 244e3c7c by Tristan Van Berkom at 2018-10-03T08:05:47Z
- 3f4587ab by Tristan Van Berkom at 2018-10-03T09:36:34Z
- a33fd160 by Tristan Van Berkom at 2018-10-03T10:00:50Z
- e80f435a by Tristan Van Berkom at 2018-10-03T12:06:46Z
- 013a8ad4 by Tristan Van Berkom at 2018-10-03T12:37:24Z
- 262e789f by Tristan Van Berkom at 2018-10-03T13:03:52Z
- 10d69988 by Tristan Van Berkom at 2018-10-03T13:03:56Z
- c02a1ae8 by Tristan Van Berkom at 2018-10-03T13:04:03Z
- eb92e8e9 by Tristan Van Berkom at 2018-10-03T13:44:31Z
- 26d48cc9 by Valentin David at 2018-10-04T14:55:13Z
- 96f09d48 by Valentin David at 2018-10-04T15:16:46Z
- 10abe77f by Tristan Van Berkom at 2018-10-05T07:01:57Z
- ed4a2641 by Chandan Singh at 2018-10-17T00:28:25Z
18 changed files:
- .gitlab-ci.yml
- NEWS
- buildstream/_artifactcache/artifactcache.py
- buildstream/_artifactcache/cascache.py
- buildstream/_artifactcache/casserver.py
- buildstream/_scheduler/jobs/job.py
- buildstream/_scheduler/queues/buildqueue.py
- buildstream/_scheduler/queues/queue.py
- buildstream/_scheduler/scheduler.py
- buildstream/_yaml.py
- buildstream/element.py
- buildstream/plugins/sources/git.py
- buildstream/source.py
- buildstream/utils.py
- tests/format/project.py
- tests/frontend/buildcheckout.py
- + tests/frontend/project/elements/checkout-deps.bst
- + tests/frontend/project/files/etc-files/etc/buildstream/config
Changes:
| ... | ... | @@ -168,7 +168,7 @@ docs: | 
| 168 | 168 |    variables:
 | 
| 169 | 169 |      bst_ext_url: git+https://gitlab.com/BuildStream/bst-external.git
 | 
| 170 | 170 |      bst_ext_ref: 1d6ab71151b93c8cbc0a91a36ffe9270f3b835f1 # 0.5.1
 | 
| 171 | -    fd_sdk_ref: 718ea88089644a1ea5b488de0b90c2c565cb75f8 # 18.08.12
 | |
| 171 | +    fd_sdk_ref: 88d7c22c2281b987faa02edd57df80d430eecf1f # 18.08.12
 | |
| 172 | 172 |    before_script:
 | 
| 173 | 173 |    - (cd dist && ./unpack.sh && cd buildstream && pip3 install .)
 | 
| 174 | 174 |    - pip3 install --user -e ${bst_ext_url}@${bst_ext_ref}#egg=bst_ext
 | 
| 1 | +=================
 | |
| 2 | +buildstream 1.2.3
 | |
| 3 | +=================
 | |
| 4 | + | |
| 5 | + o Fixed an unhandled exception when cleaning up a build sandbox (#153)
 | |
| 6 | + | |
| 7 | + o Fixed race condition when calculating cache size and committing artifacts
 | |
| 8 | + | |
| 9 | + o Fixed regression where terminating with `^C` results in a double user interrogation (#693)
 | |
| 10 | + | |
| 11 | + o Fixed regression in summary when builds are terminated (#479)
 | |
| 12 | + | |
| 13 | + o Fixed regression where irrelevant status messages appear from git sources
 | |
| 14 | + | |
| 15 | + o Improved performance of artifact uploads by batching file transfers (#676/#677)
 | |
| 16 | + | |
| 17 | + o Fixed performance of artifact downloads by batching file transfers (#554)
 | |
| 18 | + | |
| 19 | + o Fixed checks for paths which escape the project directory (#673)
 | |
| 20 | + | |
| 1 | 21 |  =================
 | 
| 2 | 22 |  buildstream 1.2.2
 | 
| 3 | 23 |  =================
 | 
| ... | ... | @@ -277,7 +277,7 @@ class ArtifactCache(): | 
| 277 | 277 |                            "Please increase the cache-quota in {}."
 | 
| 278 | 278 |                            .format(self.context.config_origin or default_conf))
 | 
| 279 | 279 |  | 
| 280 | -                if self.get_quota_exceeded():
 | |
| 280 | +                if self.has_quota_exceeded():
 | |
| 281 | 281 |                      raise ArtifactError("Cache too full. Aborting.",
 | 
| 282 | 282 |                                          detail=detail,
 | 
| 283 | 283 |                                          reason="cache-too-full")
 | 
| ... | ... | @@ -364,14 +364,14 @@ class ArtifactCache(): | 
| 364 | 364 |          self._cache_size = cache_size
 | 
| 365 | 365 |          self._write_cache_size(self._cache_size)
 | 
| 366 | 366 |  | 
| 367 | -    # get_quota_exceeded()
 | |
| 367 | +    # has_quota_exceeded()
 | |
| 368 | 368 |      #
 | 
| 369 | 369 |      # Checks if the current artifact cache size exceeds the quota.
 | 
| 370 | 370 |      #
 | 
| 371 | 371 |      # Returns:
 | 
| 372 | 372 |      #    (bool): True of the quota is exceeded
 | 
| 373 | 373 |      #
 | 
| 374 | -    def get_quota_exceeded(self):
 | |
| 374 | +    def has_quota_exceeded(self):
 | |
| 375 | 375 |          return self.get_cache_size() > self._cache_quota
 | 
| 376 | 376 |  | 
| 377 | 377 |      ################################################
 | 
| ... | ... | @@ -43,6 +43,11 @@ from .._exceptions import ArtifactError | 
| 43 | 43 |  from . import ArtifactCache
 | 
| 44 | 44 |  | 
| 45 | 45 |  | 
| 46 | +# The default limit for gRPC messages is 4 MiB.
 | |
| 47 | +# Limit payload to 1 MiB to leave sufficient headroom for metadata.
 | |
| 48 | +_MAX_PAYLOAD_BYTES = 1024 * 1024
 | |
| 49 | + | |
| 50 | + | |
| 46 | 51 |  # A CASCache manages artifacts in a CAS repository as specified in the
 | 
| 47 | 52 |  # Remote Execution API.
 | 
| 48 | 53 |  #
 | 
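The new _MAX_PAYLOAD_BYTES constant caps each individual gRPC payload at 1 MiB, comfortably inside the 4 MiB default message limit, and both the streamed ByteStream writes and the batch requests introduced later in this diff respect it. As a rough, standalone illustration of the chunking this implies (the helper name and file handling below are invented for illustration; the real logic lives in _send_blob() further down):

    import os

    _MAX_PAYLOAD_BYTES = 1024 * 1024  # 1 MiB payload ceiling, as in the patch


    def iter_payload_chunks(path, chunk_size=_MAX_PAYLOAD_BYTES):
        # Yield (offset, data, finished) tuples carrying at most chunk_size bytes
        # each, in the same spirit as the WriteRequest stream built by _send_blob().
        remaining = os.path.getsize(path)
        offset = 0
        with open(path, 'rb') as f:
            while remaining > 0:
                data = f.read(min(remaining, chunk_size))
                remaining -= len(data)
                yield offset, data, remaining == 0
                offset += len(data)
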
| ... | ... | @@ -76,6 +81,7 @@ class CASCache(ArtifactCache): | 
| 76 | 81 |      ################################################
 | 
| 77 | 82 |      #     Implementation of abstract methods       #
 | 
| 78 | 83 |      ################################################
 | 
| 84 | + | |
| 79 | 85 |      def contains(self, element, key):
 | 
| 80 | 86 |          refpath = self._refpath(self.get_artifact_fullname(element, key))
 | 
| 81 | 87 |  | 
| ... | ... | @@ -115,7 +121,7 @@ class CASCache(ArtifactCache): | 
| 115 | 121 |      def commit(self, element, content, keys):
 | 
| 116 | 122 |          refs = [self.get_artifact_fullname(element, key) for key in keys]
 | 
| 117 | 123 |  | 
| 118 | -        tree = self._create_tree(content)
 | |
| 124 | +        tree = self._commit_directory(content)
 | |
| 119 | 125 |  | 
| 120 | 126 |          for ref in refs:
 | 
| 121 | 127 |              self.set_ref(ref, tree)
 | 
| ... | ... | @@ -151,6 +157,7 @@ class CASCache(ArtifactCache): | 
| 151 | 157 |          q = multiprocessing.Queue()
 | 
| 152 | 158 |          for remote_spec in remote_specs:
 | 
| 153 | 159 |              # Use subprocess to avoid creation of gRPC threads in main BuildStream process
 | 
| 160 | +            # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
 | |
| 154 | 161 |              p = multiprocessing.Process(target=self._initialize_remote, args=(remote_spec, q))
 | 
| 155 | 162 |  | 
| 156 | 163 |              try:
 | 
| ... | ... | @@ -263,109 +270,69 @@ class CASCache(ArtifactCache): | 
| 263 | 270 |  | 
| 264 | 271 |          self.set_ref(newref, tree)
 | 
| 265 | 272 |  | 
| 273 | +    def _push_refs_to_remote(self, refs, remote):
 | |
| 274 | +        skipped_remote = True
 | |
| 275 | +        try:
 | |
| 276 | +            for ref in refs:
 | |
| 277 | +                tree = self.resolve_ref(ref)
 | |
| 278 | + | |
| 279 | +                # Check whether ref is already on the server in which case
 | |
| 280 | +                # there is no need to push the artifact
 | |
| 281 | +                try:
 | |
| 282 | +                    request = buildstream_pb2.GetReferenceRequest()
 | |
| 283 | +                    request.key = ref
 | |
| 284 | +                    response = remote.ref_storage.GetReference(request)
 | |
| 285 | + | |
| 286 | +                    if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
 | |
| 287 | +                        # ref is already on the server with the same tree
 | |
| 288 | +                        continue
 | |
| 289 | + | |
| 290 | +                except grpc.RpcError as e:
 | |
| 291 | +                    if e.code() != grpc.StatusCode.NOT_FOUND:
 | |
| 292 | +                        # Intentionally re-raise RpcError for outer except block.
 | |
| 293 | +                        raise
 | |
| 294 | + | |
| 295 | +                self._send_directory(remote, tree)
 | |
| 296 | + | |
| 297 | +                request = buildstream_pb2.UpdateReferenceRequest()
 | |
| 298 | +                request.keys.append(ref)
 | |
| 299 | +                request.digest.hash = tree.hash
 | |
| 300 | +                request.digest.size_bytes = tree.size_bytes
 | |
| 301 | +                remote.ref_storage.UpdateReference(request)
 | |
| 302 | + | |
| 303 | +                skipped_remote = False
 | |
| 304 | +        except grpc.RpcError as e:
 | |
| 305 | +            if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
 | |
| 306 | +                raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
 | |
| 307 | + | |
| 308 | +        return not skipped_remote
 | |
| 309 | + | |
| 266 | 310 |      def push(self, element, keys):
 | 
| 267 | -        refs = [self.get_artifact_fullname(element, key) for key in keys]
 | |
| 311 | + | |
| 312 | +        refs = [self.get_artifact_fullname(element, key) for key in list(keys)]
 | |
| 268 | 313 |  | 
| 269 | 314 |          project = element._get_project()
 | 
| 270 | 315 |  | 
| 271 | 316 |          push_remotes = [r for r in self._remotes[project] if r.spec.push]
 | 
| 272 | 317 |  | 
| 273 | 318 |          pushed = False
 | 
| 274 | -        display_key = element._get_brief_display_key()
 | |
| 319 | + | |
| 275 | 320 |          for remote in push_remotes:
 | 
| 276 | 321 |              remote.init()
 | 
| 277 | -            skipped_remote = True
 | |
| 322 | +            display_key = element._get_brief_display_key()
 | |
| 278 | 323 |              element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url))
 | 
| 279 | 324 |  | 
| 280 | -            try:
 | |
| 281 | -                for ref in refs:
 | |
| 282 | -                    tree = self.resolve_ref(ref)
 | |
| 283 | - | |
| 284 | -                    # Check whether ref is already on the server in which case
 | |
| 285 | -                    # there is no need to push the artifact
 | |
| 286 | -                    try:
 | |
| 287 | -                        request = buildstream_pb2.GetReferenceRequest()
 | |
| 288 | -                        request.key = ref
 | |
| 289 | -                        response = remote.ref_storage.GetReference(request)
 | |
| 290 | - | |
| 291 | -                        if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
 | |
| 292 | -                            # ref is already on the server with the same tree
 | |
| 293 | -                            continue
 | |
| 294 | - | |
| 295 | -                    except grpc.RpcError as e:
 | |
| 296 | -                        if e.code() != grpc.StatusCode.NOT_FOUND:
 | |
| 297 | -                            # Intentionally re-raise RpcError for outer except block.
 | |
| 298 | -                            raise
 | |
| 299 | - | |
| 300 | -                    missing_blobs = {}
 | |
| 301 | -                    required_blobs = self._required_blobs(tree)
 | |
| 302 | - | |
| 303 | -                    # Limit size of FindMissingBlobs request
 | |
| 304 | -                    for required_blobs_group in _grouper(required_blobs, 512):
 | |
| 305 | -                        request = remote_execution_pb2.FindMissingBlobsRequest()
 | |
| 306 | - | |
| 307 | -                        for required_digest in required_blobs_group:
 | |
| 308 | -                            d = request.blob_digests.add()
 | |
| 309 | -                            d.hash = required_digest.hash
 | |
| 310 | -                            d.size_bytes = required_digest.size_bytes
 | |
| 311 | - | |
| 312 | -                        response = remote.cas.FindMissingBlobs(request)
 | |
| 313 | -                        for digest in response.missing_blob_digests:
 | |
| 314 | -                            d = remote_execution_pb2.Digest()
 | |
| 315 | -                            d.hash = digest.hash
 | |
| 316 | -                            d.size_bytes = digest.size_bytes
 | |
| 317 | -                            missing_blobs[d.hash] = d
 | |
| 318 | - | |
| 319 | -                    # Upload any blobs missing on the server
 | |
| 320 | -                    skipped_remote = False
 | |
| 321 | -                    for digest in missing_blobs.values():
 | |
| 322 | -                        uuid_ = uuid.uuid4()
 | |
| 323 | -                        resource_name = '/'.join(['uploads', str(uuid_), 'blobs',
 | |
| 324 | -                                                  digest.hash, str(digest.size_bytes)])
 | |
| 325 | - | |
| 326 | -                        def request_stream(resname):
 | |
| 327 | -                            with open(self.objpath(digest), 'rb') as f:
 | |
| 328 | -                                assert os.fstat(f.fileno()).st_size == digest.size_bytes
 | |
| 329 | -                                offset = 0
 | |
| 330 | -                                finished = False
 | |
| 331 | -                                remaining = digest.size_bytes
 | |
| 332 | -                                while not finished:
 | |
| 333 | -                                    chunk_size = min(remaining, 64 * 1024)
 | |
| 334 | -                                    remaining -= chunk_size
 | |
| 335 | - | |
| 336 | -                                    request = bytestream_pb2.WriteRequest()
 | |
| 337 | -                                    request.write_offset = offset
 | |
| 338 | -                                    # max. 64 kB chunks
 | |
| 339 | -                                    request.data = f.read(chunk_size)
 | |
| 340 | -                                    request.resource_name = resname
 | |
| 341 | -                                    request.finish_write = remaining <= 0
 | |
| 342 | -                                    yield request
 | |
| 343 | -                                    offset += chunk_size
 | |
| 344 | -                                    finished = request.finish_write
 | |
| 345 | -                        response = remote.bytestream.Write(request_stream(resource_name))
 | |
| 346 | - | |
| 347 | -                    request = buildstream_pb2.UpdateReferenceRequest()
 | |
| 348 | -                    request.keys.append(ref)
 | |
| 349 | -                    request.digest.hash = tree.hash
 | |
| 350 | -                    request.digest.size_bytes = tree.size_bytes
 | |
| 351 | -                    remote.ref_storage.UpdateReference(request)
 | |
| 352 | - | |
| 353 | -                    pushed = True
 | |
| 354 | - | |
| 355 | -                if not skipped_remote:
 | |
| 356 | -                    element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
 | |
| 357 | - | |
| 358 | -            except grpc.RpcError as e:
 | |
| 359 | -                if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
 | |
| 360 | -                    raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
 | |
| 361 | - | |
| 362 | -            if skipped_remote:
 | |
| 325 | +            if self._push_refs_to_remote(refs, remote):
 | |
| 326 | +                element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
 | |
| 327 | +                pushed = True
 | |
| 328 | +            else:
 | |
| 363 | 329 |                  self.context.message(Message(
 | 
| 364 | 330 |                      None,
 | 
| 365 | 331 |                      MessageType.INFO,
 | 
| 366 | 332 |                      "Remote ({}) already has {} cached".format(
 | 
| 367 | 333 |                          remote.spec.url, element._get_brief_display_key())
 | 
| 368 | 334 |                  ))
 | 
| 335 | + | |
| 369 | 336 |          return pushed
 | 
| 370 | 337 |  | 
| 371 | 338 |      ################################################
 | 
| ... | ... | @@ -451,7 +418,7 @@ class CASCache(ArtifactCache): | 
| 451 | 418 |      def set_ref(self, ref, tree):
 | 
| 452 | 419 |          refpath = self._refpath(ref)
 | 
| 453 | 420 |          os.makedirs(os.path.dirname(refpath), exist_ok=True)
 | 
| 454 | -        with utils.save_file_atomic(refpath, 'wb') as f:
 | |
| 421 | +        with utils.save_file_atomic(refpath, 'wb', tempdir=self.tmpdir) as f:
 | |
| 455 | 422 |              f.write(tree.SerializeToString())
 | 
| 456 | 423 |  | 
| 457 | 424 |      # resolve_ref():
 | 
| ... | ... | @@ -594,6 +561,7 @@ class CASCache(ArtifactCache): | 
| 594 | 561 |      ################################################
 | 
| 595 | 562 |      #             Local Private Methods            #
 | 
| 596 | 563 |      ################################################
 | 
| 564 | + | |
| 597 | 565 |      def _checkout(self, dest, tree):
 | 
| 598 | 566 |          os.makedirs(dest, exist_ok=True)
 | 
| 599 | 567 |  | 
| ... | ... | @@ -623,7 +591,21 @@ class CASCache(ArtifactCache): | 
| 623 | 591 |      def _refpath(self, ref):
 | 
| 624 | 592 |          return os.path.join(self.casdir, 'refs', 'heads', ref)
 | 
| 625 | 593 |  | 
| 626 | -    def _create_tree(self, path, *, digest=None):
 | |
| 594 | +    # _commit_directory():
 | |
| 595 | +    #
 | |
| 596 | +    # Adds local directory to content addressable store.
 | |
| 597 | +    #
 | |
| 598 | +    # Adds files, symbolic links and recursively other directories in
 | |
| 599 | +    # a local directory to the content addressable store.
 | |
| 600 | +    #
 | |
| 601 | +    # Args:
 | |
| 602 | +    #     path (str): Path to the directory to add.
 | |
| 603 | +    #     dir_digest (Digest): An optional Digest object to use.
 | |
| 604 | +    #
 | |
| 605 | +    # Returns:
 | |
| 606 | +    #     (Digest): Digest object for the directory added.
 | |
| 607 | +    #
 | |
| 608 | +    def _commit_directory(self, path, *, dir_digest=None):
 | |
| 627 | 609 |          directory = remote_execution_pb2.Directory()
 | 
| 628 | 610 |  | 
| 629 | 611 |          for name in sorted(os.listdir(path)):
 | 
| ... | ... | @@ -632,7 +614,7 @@ class CASCache(ArtifactCache): | 
| 632 | 614 |              if stat.S_ISDIR(mode):
 | 
| 633 | 615 |                  dirnode = directory.directories.add()
 | 
| 634 | 616 |                  dirnode.name = name
 | 
| 635 | -                self._create_tree(full_path, digest=dirnode.digest)
 | |
| 617 | +                self._commit_directory(full_path, dir_digest=dirnode.digest)
 | |
| 636 | 618 |              elif stat.S_ISREG(mode):
 | 
| 637 | 619 |                  filenode = directory.files.add()
 | 
| 638 | 620 |                  filenode.name = name
 | 
| ... | ... | @@ -645,7 +627,8 @@ class CASCache(ArtifactCache): | 
| 645 | 627 |              else:
 | 
| 646 | 628 |                  raise ArtifactError("Unsupported file type for {}".format(full_path))
 | 
| 647 | 629 |  | 
| 648 | -        return self.add_object(digest=digest, buffer=directory.SerializeToString())
 | |
| 630 | +        return self.add_object(digest=dir_digest,
 | |
| 631 | +                               buffer=directory.SerializeToString())
 | |
| 649 | 632 |  | 
| 650 | 633 |      def _get_subdir(self, tree, subdir):
 | 
| 651 | 634 |          head, name = os.path.split(subdir)
 | 
| ... | ... | @@ -756,16 +739,16 @@ class CASCache(ArtifactCache): | 
| 756 | 739 |              #
 | 
| 757 | 740 |              q.put(str(e))
 | 
| 758 | 741 |  | 
| 759 | -    def _required_blobs(self, tree):
 | |
| 742 | +    def _required_blobs(self, directory_digest):
 | |
| 760 | 743 |          # parse directory, and recursively add blobs
 | 
| 761 | 744 |          d = remote_execution_pb2.Digest()
 | 
| 762 | -        d.hash = tree.hash
 | |
| 763 | -        d.size_bytes = tree.size_bytes
 | |
| 745 | +        d.hash = directory_digest.hash
 | |
| 746 | +        d.size_bytes = directory_digest.size_bytes
 | |
| 764 | 747 |          yield d
 | 
| 765 | 748 |  | 
| 766 | 749 |          directory = remote_execution_pb2.Directory()
 | 
| 767 | 750 |  | 
| 768 | -        with open(self.objpath(tree), 'rb') as f:
 | |
| 751 | +        with open(self.objpath(directory_digest), 'rb') as f:
 | |
| 769 | 752 |              directory.ParseFromString(f.read())
 | 
| 770 | 753 |  | 
| 771 | 754 |          for filenode in directory.files:
 | 
| ... | ... | @@ -777,50 +760,203 @@ class CASCache(ArtifactCache): | 
| 777 | 760 |          for dirnode in directory.directories:
 | 
| 778 | 761 |              yield from self._required_blobs(dirnode.digest)
 | 
| 779 | 762 |  | 
| 780 | -    def _fetch_blob(self, remote, digest, out):
 | |
| 763 | +    def _fetch_blob(self, remote, digest, stream):
 | |
| 781 | 764 |          resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)])
 | 
| 782 | 765 |          request = bytestream_pb2.ReadRequest()
 | 
| 783 | 766 |          request.resource_name = resource_name
 | 
| 784 | 767 |          request.read_offset = 0
 | 
| 785 | 768 |          for response in remote.bytestream.Read(request):
 | 
| 786 | -            out.write(response.data)
 | |
| 769 | +            stream.write(response.data)
 | |
| 770 | +        stream.flush()
 | |
| 787 | 771 |  | 
| 788 | -        out.flush()
 | |
| 789 | -        assert digest.size_bytes == os.fstat(out.fileno()).st_size
 | |
| 772 | +        assert digest.size_bytes == os.fstat(stream.fileno()).st_size
 | |
| 790 | 773 |  | 
| 791 | -    def _fetch_directory(self, remote, tree):
 | |
| 792 | -        objpath = self.objpath(tree)
 | |
| 774 | +    # _ensure_blob():
 | |
| 775 | +    #
 | |
| 776 | +    # Fetch and add blob if it's not already local.
 | |
| 777 | +    #
 | |
| 778 | +    # Args:
 | |
| 779 | +    #     remote (Remote): The remote to use.
 | |
| 780 | +    #     digest (Digest): Digest object for the blob to fetch.
 | |
| 781 | +    #
 | |
| 782 | +    # Returns:
 | |
| 783 | +    #     (str): The path of the object
 | |
| 784 | +    #
 | |
| 785 | +    def _ensure_blob(self, remote, digest):
 | |
| 786 | +        objpath = self.objpath(digest)
 | |
| 793 | 787 |          if os.path.exists(objpath):
 | 
| 794 | -            # already in local cache
 | |
| 795 | -            return
 | |
| 788 | +            # already in local repository
 | |
| 789 | +            return objpath
 | |
| 796 | 790 |  | 
| 797 | -        with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
 | |
| 798 | -            self._fetch_blob(remote, tree, out)
 | |
| 791 | +        with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
 | |
| 792 | +            self._fetch_blob(remote, digest, f)
 | |
| 799 | 793 |  | 
| 800 | -            directory = remote_execution_pb2.Directory()
 | |
| 794 | +            added_digest = self.add_object(path=f.name)
 | |
| 795 | +            assert added_digest.hash == digest.hash
 | |
| 801 | 796 |  | 
| 802 | -            with open(out.name, 'rb') as f:
 | |
| 803 | -                directory.ParseFromString(f.read())
 | |
| 797 | +        return objpath
 | |
| 804 | 798 |  | 
| 805 | -            for filenode in directory.files:
 | |
| 806 | -                fileobjpath = self.objpath(tree)
 | |
| 807 | -                if os.path.exists(fileobjpath):
 | |
| 808 | -                    # already in local cache
 | |
| 809 | -                    continue
 | |
| 799 | +    def _batch_download_complete(self, batch):
 | |
| 800 | +        for digest, data in batch.send():
 | |
| 801 | +            with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
 | |
| 802 | +                f.write(data)
 | |
| 803 | +                f.flush()
 | |
| 804 | + | |
| 805 | +                added_digest = self.add_object(path=f.name)
 | |
| 806 | +                assert added_digest.hash == digest.hash
 | |
| 807 | + | |
| 808 | +    # Helper function for _fetch_directory().
 | |
| 809 | +    def _fetch_directory_batch(self, remote, batch, fetch_queue, fetch_next_queue):
 | |
| 810 | +        self._batch_download_complete(batch)
 | |
| 811 | + | |
| 812 | +        # All previously scheduled directories are now locally available,
 | |
| 813 | +        # move them to the processing queue.
 | |
| 814 | +        fetch_queue.extend(fetch_next_queue)
 | |
| 815 | +        fetch_next_queue.clear()
 | |
| 816 | +        return _CASBatchRead(remote)
 | |
| 817 | + | |
| 818 | +    # Helper function for _fetch_directory().
 | |
| 819 | +    def _fetch_directory_node(self, remote, digest, batch, fetch_queue, fetch_next_queue, *, recursive=False):
 | |
| 820 | +        in_local_cache = os.path.exists(self.objpath(digest))
 | |
| 810 | 821 |  | 
| 811 | -                with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
 | |
| 812 | -                    self._fetch_blob(remote, filenode.digest, f)
 | |
| 822 | +        if in_local_cache:
 | |
| 823 | +            # Skip download, already in local cache.
 | |
| 824 | +            pass
 | |
| 825 | +        elif (digest.size_bytes >= remote.max_batch_total_size_bytes or
 | |
| 826 | +                not remote.batch_read_supported):
 | |
| 827 | +            # Too large for batch request, download in independent request.
 | |
| 828 | +            self._ensure_blob(remote, digest)
 | |
| 829 | +            in_local_cache = True
 | |
| 830 | +        else:
 | |
| 831 | +            if not batch.add(digest):
 | |
| 832 | +                # Not enough space left in batch request.
 | |
| 833 | +                # Complete pending batch first.
 | |
| 834 | +                batch = self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
 | |
| 835 | +                batch.add(digest)
 | |
| 836 | + | |
| 837 | +        if recursive:
 | |
| 838 | +            if in_local_cache:
 | |
| 839 | +                # Add directory to processing queue.
 | |
| 840 | +                fetch_queue.append(digest)
 | |
| 841 | +            else:
 | |
| 842 | +                # Directory will be available after completing pending batch.
 | |
| 843 | +                # Add directory to deferred processing queue.
 | |
| 844 | +                fetch_next_queue.append(digest)
 | |
| 845 | + | |
| 846 | +        return batch
 | |
| 847 | + | |
| 848 | +    # _fetch_directory():
 | |
| 849 | +    #
 | |
| 850 | +    # Fetches remote directory and adds it to content addressable store.
 | |
| 851 | +    #
 | |
| 852 | +    # Fetches files, symbolic links and recursively other directories in
 | |
| 853 | +    # the remote directory and adds them to the content addressable
 | |
| 854 | +    # store.
 | |
| 855 | +    #
 | |
| 856 | +    # Args:
 | |
| 857 | +    #     remote (Remote): The remote to use.
 | |
| 858 | +    #     dir_digest (Digest): Digest object for the directory to fetch.
 | |
| 859 | +    #
 | |
| 860 | +    def _fetch_directory(self, remote, dir_digest):
 | |
| 861 | +        fetch_queue = [dir_digest]
 | |
| 862 | +        fetch_next_queue = []
 | |
| 863 | +        batch = _CASBatchRead(remote)
 | |
| 864 | + | |
| 865 | +        while len(fetch_queue) + len(fetch_next_queue) > 0:
 | |
| 866 | +            if len(fetch_queue) == 0:
 | |
| 867 | +                batch = self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
 | |
| 868 | + | |
| 869 | +            dir_digest = fetch_queue.pop(0)
 | |
| 813 | 870 |  | 
| 814 | -                    digest = self.add_object(path=f.name)
 | |
| 815 | -                    assert digest.hash == filenode.digest.hash
 | |
| 871 | +            objpath = self._ensure_blob(remote, dir_digest)
 | |
| 872 | + | |
| 873 | +            directory = remote_execution_pb2.Directory()
 | |
| 874 | +            with open(objpath, 'rb') as f:
 | |
| 875 | +                directory.ParseFromString(f.read())
 | |
| 816 | 876 |  | 
| 817 | 877 |              for dirnode in directory.directories:
 | 
| 818 | -                self._fetch_directory(remote, dirnode.digest)
 | |
| 878 | +                batch = self._fetch_directory_node(remote, dirnode.digest, batch,
 | |
| 879 | +                                                   fetch_queue, fetch_next_queue, recursive=True)
 | |
| 880 | + | |
| 881 | +            for filenode in directory.files:
 | |
| 882 | +                batch = self._fetch_directory_node(remote, filenode.digest, batch,
 | |
| 883 | +                                                   fetch_queue, fetch_next_queue)
 | |
| 884 | + | |
| 885 | +        # Fetch final batch
 | |
| 886 | +        self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
 | |
| 887 | + | |
| 888 | +    def _send_blob(self, remote, digest, stream, u_uid=uuid.uuid4()):
 | |
| 889 | +        resource_name = '/'.join(['uploads', str(u_uid), 'blobs',
 | |
| 890 | +                                  digest.hash, str(digest.size_bytes)])
 | |
| 891 | + | |
| 892 | +        def request_stream(resname, instream):
 | |
| 893 | +            offset = 0
 | |
| 894 | +            finished = False
 | |
| 895 | +            remaining = digest.size_bytes
 | |
| 896 | +            while not finished:
 | |
| 897 | +                chunk_size = min(remaining, _MAX_PAYLOAD_BYTES)
 | |
| 898 | +                remaining -= chunk_size
 | |
| 899 | + | |
| 900 | +                request = bytestream_pb2.WriteRequest()
 | |
| 901 | +                request.write_offset = offset
 | |
| 902 | +                # max. _MAX_PAYLOAD_BYTES chunks
 | |
| 903 | +                request.data = instream.read(chunk_size)
 | |
| 904 | +                request.resource_name = resname
 | |
| 905 | +                request.finish_write = remaining <= 0
 | |
| 906 | + | |
| 907 | +                yield request
 | |
| 908 | + | |
| 909 | +                offset += chunk_size
 | |
| 910 | +                finished = request.finish_write
 | |
| 911 | + | |
| 912 | +        response = remote.bytestream.Write(request_stream(resource_name, stream))
 | |
| 913 | + | |
| 914 | +        assert response.committed_size == digest.size_bytes
 | |
| 915 | + | |
| 916 | +    def _send_directory(self, remote, digest, u_uid=uuid.uuid4()):
 | |
| 917 | +        required_blobs = self._required_blobs(digest)
 | |
| 918 | + | |
| 919 | +        missing_blobs = dict()
 | |
| 920 | +        # Limit size of FindMissingBlobs request
 | |
| 921 | +        for required_blobs_group in _grouper(required_blobs, 512):
 | |
| 922 | +            request = remote_execution_pb2.FindMissingBlobsRequest()
 | |
| 923 | + | |
| 924 | +            for required_digest in required_blobs_group:
 | |
| 925 | +                d = request.blob_digests.add()
 | |
| 926 | +                d.hash = required_digest.hash
 | |
| 927 | +                d.size_bytes = required_digest.size_bytes
 | |
| 928 | + | |
| 929 | +            response = remote.cas.FindMissingBlobs(request)
 | |
| 930 | +            for missing_digest in response.missing_blob_digests:
 | |
| 931 | +                d = remote_execution_pb2.Digest()
 | |
| 932 | +                d.hash = missing_digest.hash
 | |
| 933 | +                d.size_bytes = missing_digest.size_bytes
 | |
| 934 | +                missing_blobs[d.hash] = d
 | |
| 935 | + | |
| 936 | +        # Upload any blobs missing on the server
 | |
| 937 | +        self._send_blobs(remote, missing_blobs.values(), u_uid)
 | |
| 938 | + | |
| 939 | +    def _send_blobs(self, remote, digests, u_uid=uuid.uuid4()):
 | |
| 940 | +        batch = _CASBatchUpdate(remote)
 | |
| 941 | + | |
| 942 | +        for digest in digests:
 | |
| 943 | +            with open(self.objpath(digest), 'rb') as f:
 | |
| 944 | +                assert os.fstat(f.fileno()).st_size == digest.size_bytes
 | |
| 945 | + | |
| 946 | +                if (digest.size_bytes >= remote.max_batch_total_size_bytes or
 | |
| 947 | +                        not remote.batch_update_supported):
 | |
| 948 | +                    # Too large for batch request, upload in independent request.
 | |
| 949 | +                    self._send_blob(remote, digest, f, u_uid=u_uid)
 | |
| 950 | +                else:
 | |
| 951 | +                    if not batch.add(digest, f):
 | |
| 952 | +                        # Not enough space left in batch request.
 | |
| 953 | +                        # Complete pending batch first.
 | |
| 954 | +                        batch.send()
 | |
| 955 | +                        batch = _CASBatchUpdate(remote)
 | |
| 956 | +                        batch.add(digest, f)
 | |
| 819 | 957 |  | 
| 820 | -            # place directory blob only in final location when we've downloaded
 | |
| 821 | -            # all referenced blobs to avoid dangling references in the repository
 | |
| 822 | -            digest = self.add_object(path=out.name)
 | |
| 823 | -            assert digest.hash == tree.hash
 | |
| 958 | +        # Send final batch
 | |
| 959 | +        batch.send()
 | |
| 824 | 960 |  | 
| 825 | 961 |  | 
| 826 | 962 |  # Represents a single remote CAS cache.
 | 
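Taken together, the methods above replace the old per-file ByteStream reads with a queue-driven walk: _fetch_directory() parses each Directory it already has locally, queues small child blobs into a _CASBatchRead, falls back to an independent _ensure_blob() request for anything at or above the remote's max_batch_total_size_bytes, and parks subdirectories whose contents are still in a pending batch on a deferred queue until that batch is flushed. A simplified, dependency-free sketch of that two-queue pattern (store, remote and their methods are stand-ins invented for illustration, not the real classes):

    # Simplified model of the batched fetch implemented by _fetch_directory() above.
    # 'store' and 'remote' are illustrative stand-ins, not real BuildStream APIs.
    def fetch_tree(store, remote, root_digest, max_batch_bytes):
        fetch_queue = [root_digest]   # directories whose contents can be parsed now
        deferred = []                 # directories waiting on the pending batch
        batch, batch_size = [], 0

        def flush():
            nonlocal batch, batch_size
            if batch:
                for digest, data in remote.batch_read(batch):   # one batched round trip
                    store.add(digest, data)
            fetch_queue.extend(deferred)   # everything batched is now local
            deferred.clear()
            batch, batch_size = [], 0

        while fetch_queue or deferred:
            if not fetch_queue:
                flush()
            digest = fetch_queue.pop(0)
            if not store.has(digest):
                store.add(digest, remote.read(digest))   # independent request, cf. _ensure_blob()
            for child in store.load_directory(digest):   # files and subdirectories alike
                if store.has(child.digest):
                    if child.is_dir:
                        fetch_queue.append(child.digest)
                elif child.digest.size >= max_batch_bytes:
                    store.add(child.digest, remote.read(child.digest))   # too large to batch
                    if child.is_dir:
                        fetch_queue.append(child.digest)
                else:
                    if batch_size + child.digest.size > max_batch_bytes:
                        flush()   # complete the pending batch first
                    batch.append(child.digest)
                    batch_size += child.digest.size
                    if child.is_dir:
                        deferred.append(child.digest)

        flush()   # final partial batch

The upload side (_send_directory() and _send_blobs()) follows the same flush-when-full shape using _CASBatchUpdate.
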
| ... | ... | @@ -870,11 +1006,129 @@ class _CASRemote(): | 
| 870 | 1006 |  | 
| 871 | 1007 |              self.bytestream = bytestream_pb2_grpc.ByteStreamStub(self.channel)
 | 
| 872 | 1008 |              self.cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel)
 | 
| 1009 | +            self.capabilities = remote_execution_pb2_grpc.CapabilitiesStub(self.channel)
 | |
| 873 | 1010 |              self.ref_storage = buildstream_pb2_grpc.ReferenceStorageStub(self.channel)
 | 
| 874 | 1011 |  | 
| 1012 | +            self.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES
 | |
| 1013 | +            try:
 | |
| 1014 | +                request = remote_execution_pb2.GetCapabilitiesRequest()
 | |
| 1015 | +                response = self.capabilities.GetCapabilities(request)
 | |
| 1016 | +                server_max_batch_total_size_bytes = response.cache_capabilities.max_batch_total_size_bytes
 | |
| 1017 | +                if 0 < server_max_batch_total_size_bytes < self.max_batch_total_size_bytes:
 | |
| 1018 | +                    self.max_batch_total_size_bytes = server_max_batch_total_size_bytes
 | |
| 1019 | +            except grpc.RpcError as e:
 | |
| 1020 | +                # Simply use the defaults for servers that don't implement GetCapabilities()
 | |
| 1021 | +                if e.code() != grpc.StatusCode.UNIMPLEMENTED:
 | |
| 1022 | +                    raise
 | |
| 1023 | + | |
| 1024 | +            # Check whether the server supports BatchReadBlobs()
 | |
| 1025 | +            self.batch_read_supported = False
 | |
| 1026 | +            try:
 | |
| 1027 | +                request = remote_execution_pb2.BatchReadBlobsRequest()
 | |
| 1028 | +                response = self.cas.BatchReadBlobs(request)
 | |
| 1029 | +                self.batch_read_supported = True
 | |
| 1030 | +            except grpc.RpcError as e:
 | |
| 1031 | +                if e.code() != grpc.StatusCode.UNIMPLEMENTED:
 | |
| 1032 | +                    raise
 | |
| 1033 | + | |
| 1034 | +            # Check whether the server supports BatchUpdateBlobs()
 | |
| 1035 | +            self.batch_update_supported = False
 | |
| 1036 | +            try:
 | |
| 1037 | +                request = remote_execution_pb2.BatchUpdateBlobsRequest()
 | |
| 1038 | +                response = self.cas.BatchUpdateBlobs(request)
 | |
| 1039 | +                self.batch_update_supported = True
 | |
| 1040 | +            except grpc.RpcError as e:
 | |
| 1041 | +                if (e.code() != grpc.StatusCode.UNIMPLEMENTED and
 | |
| 1042 | +                        e.code() != grpc.StatusCode.PERMISSION_DENIED):
 | |
| 1043 | +                    raise
 | |
| 1044 | + | |
| 875 | 1045 |              self._initialized = True
 | 
| 876 | 1046 |  | 
| 877 | 1047 |  | 
| 1048 | +# Represents a batch of blobs queued for fetching.
 | |
| 1049 | +#
 | |
| 1050 | +class _CASBatchRead():
 | |
| 1051 | +    def __init__(self, remote):
 | |
| 1052 | +        self._remote = remote
 | |
| 1053 | +        self._max_total_size_bytes = remote.max_batch_total_size_bytes
 | |
| 1054 | +        self._request = remote_execution_pb2.BatchReadBlobsRequest()
 | |
| 1055 | +        self._size = 0
 | |
| 1056 | +        self._sent = False
 | |
| 1057 | + | |
| 1058 | +    def add(self, digest):
 | |
| 1059 | +        assert not self._sent
 | |
| 1060 | + | |
| 1061 | +        new_batch_size = self._size + digest.size_bytes
 | |
| 1062 | +        if new_batch_size > self._max_total_size_bytes:
 | |
| 1063 | +            # Not enough space left in current batch
 | |
| 1064 | +            return False
 | |
| 1065 | + | |
| 1066 | +        request_digest = self._request.digests.add()
 | |
| 1067 | +        request_digest.hash = digest.hash
 | |
| 1068 | +        request_digest.size_bytes = digest.size_bytes
 | |
| 1069 | +        self._size = new_batch_size
 | |
| 1070 | +        return True
 | |
| 1071 | + | |
| 1072 | +    def send(self):
 | |
| 1073 | +        assert not self._sent
 | |
| 1074 | +        self._sent = True
 | |
| 1075 | + | |
| 1076 | +        if len(self._request.digests) == 0:
 | |
| 1077 | +            return
 | |
| 1078 | + | |
| 1079 | +        batch_response = self._remote.cas.BatchReadBlobs(self._request)
 | |
| 1080 | + | |
| 1081 | +        for response in batch_response.responses:
 | |
| 1082 | +            if response.status.code != grpc.StatusCode.OK.value[0]:
 | |
| 1083 | +                raise ArtifactError("Failed to download blob {}: {}".format(
 | |
| 1084 | +                    response.digest.hash, response.status.code))
 | |
| 1085 | +            if response.digest.size_bytes != len(response.data):
 | |
| 1086 | +                raise ArtifactError("Failed to download blob {}: expected {} bytes, received {} bytes".format(
 | |
| 1087 | +                    response.digest.hash, response.digest.size_bytes, len(response.data)))
 | |
| 1088 | + | |
| 1089 | +            yield (response.digest, response.data)
 | |
| 1090 | + | |
| 1091 | + | |
| 1092 | +# Represents a batch of blobs queued for upload.
 | |
| 1093 | +#
 | |
| 1094 | +class _CASBatchUpdate():
 | |
| 1095 | +    def __init__(self, remote):
 | |
| 1096 | +        self._remote = remote
 | |
| 1097 | +        self._max_total_size_bytes = remote.max_batch_total_size_bytes
 | |
| 1098 | +        self._request = remote_execution_pb2.BatchUpdateBlobsRequest()
 | |
| 1099 | +        self._size = 0
 | |
| 1100 | +        self._sent = False
 | |
| 1101 | + | |
| 1102 | +    def add(self, digest, stream):
 | |
| 1103 | +        assert not self._sent
 | |
| 1104 | + | |
| 1105 | +        new_batch_size = self._size + digest.size_bytes
 | |
| 1106 | +        if new_batch_size > self._max_total_size_bytes:
 | |
| 1107 | +            # Not enough space left in current batch
 | |
| 1108 | +            return False
 | |
| 1109 | + | |
| 1110 | +        blob_request = self._request.requests.add()
 | |
| 1111 | +        blob_request.digest.hash = digest.hash
 | |
| 1112 | +        blob_request.digest.size_bytes = digest.size_bytes
 | |
| 1113 | +        blob_request.data = stream.read(digest.size_bytes)
 | |
| 1114 | +        self._size = new_batch_size
 | |
| 1115 | +        return True
 | |
| 1116 | + | |
| 1117 | +    def send(self):
 | |
| 1118 | +        assert not self._sent
 | |
| 1119 | +        self._sent = True
 | |
| 1120 | + | |
| 1121 | +        if len(self._request.requests) == 0:
 | |
| 1122 | +            return
 | |
| 1123 | + | |
| 1124 | +        batch_response = self._remote.cas.BatchUpdateBlobs(self._request)
 | |
| 1125 | + | |
| 1126 | +        for response in batch_response.responses:
 | |
| 1127 | +            if response.status.code != grpc.StatusCode.OK.value[0]:
 | |
| 1128 | +                raise ArtifactError("Failed to upload blob {}: {}".format(
 | |
| 1129 | +                    response.digest.hash, response.status.code))
 | |
| 1130 | + | |
| 1131 | + | |
| 878 | 1132 |  def _grouper(iterable, n):
 | 
| 879 | 1133 |      while True:
 | 
| 880 | 1134 |          try:
 | 
| ... | ... | @@ -38,8 +38,9 @@ from .._context import Context | 
| 38 | 38 |  from .cascache import CASCache
 | 
| 39 | 39 |  | 
| 40 | 40 |  | 
| 41 | -# The default limit for gRPC messages is 4 MiB
 | |
| 42 | -_MAX_BATCH_TOTAL_SIZE_BYTES = 4 * 1024 * 1024
 | |
| 41 | +# The default limit for gRPC messages is 4 MiB.
 | |
| 42 | +# Limit payload to 1 MiB to leave sufficient headroom for metadata.
 | |
| 43 | +_MAX_PAYLOAD_BYTES = 1024 * 1024
 | |
| 43 | 44 |  | 
| 44 | 45 |  | 
| 45 | 46 |  # Trying to push an artifact that is too large
 | 
| ... | ... | @@ -69,7 +70,7 @@ def create_server(repo, *, enable_push): | 
| 69 | 70 |          _ByteStreamServicer(artifactcache, enable_push=enable_push), server)
 | 
| 70 | 71 |  | 
| 71 | 72 |      remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
 | 
| 72 | -        _ContentAddressableStorageServicer(artifactcache), server)
 | |
| 73 | +        _ContentAddressableStorageServicer(artifactcache, enable_push=enable_push), server)
 | |
| 73 | 74 |  | 
| 74 | 75 |      remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
 | 
| 75 | 76 |          _CapabilitiesServicer(), server)
 | 
| ... | ... | @@ -158,7 +159,7 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer): | 
| 158 | 159 |  | 
| 159 | 160 |                  remaining = client_digest.size_bytes - request.read_offset
 | 
| 160 | 161 |                  while remaining > 0:
 | 
| 161 | -                    chunk_size = min(remaining, 64 * 1024)
 | |
| 162 | +                    chunk_size = min(remaining, _MAX_PAYLOAD_BYTES)
 | |
| 162 | 163 |                      remaining -= chunk_size
 | 
| 163 | 164 |  | 
| 164 | 165 |                      response = bytestream_pb2.ReadResponse()
 | 
| ... | ... | @@ -223,9 +224,10 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer): | 
| 223 | 224 |  | 
| 224 | 225 |  | 
| 225 | 226 |  class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
 | 
| 226 | -    def __init__(self, cas):
 | |
| 227 | +    def __init__(self, cas, *, enable_push):
 | |
| 227 | 228 |          super().__init__()
 | 
| 228 | 229 |          self.cas = cas
 | 
| 230 | +        self.enable_push = enable_push
 | |
| 229 | 231 |  | 
| 230 | 232 |      def FindMissingBlobs(self, request, context):
 | 
| 231 | 233 |          response = remote_execution_pb2.FindMissingBlobsResponse()
 | 
| ... | ... | @@ -242,7 +244,7 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres | 
| 242 | 244 |  | 
| 243 | 245 |          for digest in request.digests:
 | 
| 244 | 246 |              batch_size += digest.size_bytes
 | 
| 245 | -            if batch_size > _MAX_BATCH_TOTAL_SIZE_BYTES:
 | |
| 247 | +            if batch_size > _MAX_PAYLOAD_BYTES:
 | |
| 246 | 248 |                  context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
 | 
| 247 | 249 |                  return response
 | 
| 248 | 250 |  | 
| ... | ... | @@ -261,6 +263,46 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres | 
| 261 | 263 |  | 
| 262 | 264 |          return response
 | 
| 263 | 265 |  | 
| 266 | +    def BatchUpdateBlobs(self, request, context):
 | |
| 267 | +        response = remote_execution_pb2.BatchUpdateBlobsResponse()
 | |
| 268 | + | |
| 269 | +        if not self.enable_push:
 | |
| 270 | +            context.set_code(grpc.StatusCode.PERMISSION_DENIED)
 | |
| 271 | +            return response
 | |
| 272 | + | |
| 273 | +        batch_size = 0
 | |
| 274 | + | |
| 275 | +        for blob_request in request.requests:
 | |
| 276 | +            digest = blob_request.digest
 | |
| 277 | + | |
| 278 | +            batch_size += digest.size_bytes
 | |
| 279 | +            if batch_size > _MAX_PAYLOAD_BYTES:
 | |
| 280 | +                context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
 | |
| 281 | +                return response
 | |
| 282 | + | |
| 283 | +            blob_response = response.responses.add()
 | |
| 284 | +            blob_response.digest.hash = digest.hash
 | |
| 285 | +            blob_response.digest.size_bytes = digest.size_bytes
 | |
| 286 | + | |
| 287 | +            if len(blob_request.data) != digest.size_bytes:
 | |
| 288 | +                blob_response.status.code = grpc.StatusCode.FAILED_PRECONDITION
 | |
| 289 | +                continue
 | |
| 290 | + | |
| 291 | +            try:
 | |
| 292 | +                _clean_up_cache(self.cas, digest.size_bytes)
 | |
| 293 | + | |
| 294 | +                with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
 | |
| 295 | +                    out.write(blob_request.data)
 | |
| 296 | +                    out.flush()
 | |
| 297 | +                    server_digest = self.cas.add_object(path=out.name)
 | |
| 298 | +                    if server_digest.hash != digest.hash:
 | |
| 299 | +                        blob_response.status.code = grpc.StatusCode.FAILED_PRECONDITION
 | |
| 300 | + | |
| 301 | +            except ArtifactTooLargeException:
 | |
| 302 | +                blob_response.status.code = grpc.StatusCode.RESOURCE_EXHAUSTED
 | |
| 303 | + | |
| 304 | +        return response
 | |
| 305 | + | |
| 264 | 306 |  | 
| 265 | 307 |  class _CapabilitiesServicer(remote_execution_pb2_grpc.CapabilitiesServicer):
 | 
| 266 | 308 |      def GetCapabilities(self, request, context):
 | 
| ... | ... | @@ -269,7 +311,7 @@ class _CapabilitiesServicer(remote_execution_pb2_grpc.CapabilitiesServicer): | 
| 269 | 311 |          cache_capabilities = response.cache_capabilities
 | 
| 270 | 312 |          cache_capabilities.digest_function.append(remote_execution_pb2.SHA256)
 | 
| 271 | 313 |          cache_capabilities.action_cache_update_capabilities.update_enabled = False
 | 
| 272 | -        cache_capabilities.max_batch_total_size_bytes = _MAX_BATCH_TOTAL_SIZE_BYTES
 | |
| 314 | +        cache_capabilities.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES
 | |
| 273 | 315 |          cache_capabilities.symlink_absolute_path_strategy = remote_execution_pb2.CacheCapabilities.ALLOWED
 | 
| 274 | 316 |  | 
| 275 | 317 |          response.deprecated_api_version.major = 2
 | 
| ... | ... | @@ -119,6 +119,8 @@ class Job(): | 
| 119 | 119 |          self._result = None                    # Return value of child action in the parent
 | 
| 120 | 120 |          self._tries = 0                        # Try count, for retryable jobs
 | 
| 121 | 121 |          self._skipped_flag = False             # Indicate whether the job was skipped.
 | 
| 122 | +        self._terminated = False               # Whether this job has been explicitly terminated
 | |
| 123 | + | |
| 122 | 124 |          # If False, a retry will not be attempted regardless of whether _tries is less than _max_retries.
 | 
| 123 | 125 |          #
 | 
| 124 | 126 |          self._retry_flag = True
 | 
| ... | ... | @@ -188,6 +190,8 @@ class Job(): | 
| 188 | 190 |          # Terminate the process using multiprocessing API pathway
 | 
| 189 | 191 |          self._process.terminate()
 | 
| 190 | 192 |  | 
| 193 | +        self._terminated = True
 | |
| 194 | + | |
| 191 | 195 |      # terminate_wait()
 | 
| 192 | 196 |      #
 | 
| 193 | 197 |      # Wait for terminated jobs to complete
 | 
| ... | ... | @@ -271,18 +275,22 @@ class Job(): | 
| 271 | 275 |      # running the integration commands).
 | 
| 272 | 276 |      #
 | 
| 273 | 277 |      # Args:
 | 
| 274 | -    #     (int): The plugin identifier for this task
 | |
| 278 | +    #     task_id (int): The plugin identifier for this task
 | |
| 275 | 279 |      #
 | 
| 276 | 280 |      def set_task_id(self, task_id):
 | 
| 277 | 281 |          self._task_id = task_id
 | 
| 278 | 282 |  | 
| 279 | 283 |      # skipped
 | 
| 280 | 284 |      #
 | 
| 285 | +    # This will evaluate to True if the job was skipped
 | |
| 286 | +    # during processing, or if it was forcefully terminated.
 | |
| 287 | +    #
 | |
| 281 | 288 |      # Returns:
 | 
| 282 | -    #    bool: True if the job was skipped while processing.
 | |
| 289 | +    #    (bool): Whether the job should appear as skipped
 | |
| 290 | +    #
 | |
| 283 | 291 |      @property
 | 
| 284 | 292 |      def skipped(self):
 | 
| 285 | -        return self._skipped_flag
 | |
| 293 | +        return self._skipped_flag or self._terminated
 | |
| 286 | 294 |  | 
| 287 | 295 |      #######################################################
 | 
| 288 | 296 |      #                  Abstract Methods                   #
 | 
| ... | ... | @@ -65,7 +65,7 @@ class BuildQueue(Queue): | 
| 65 | 65 |          # If the estimated size outgrows the quota, ask the scheduler
 | 
| 66 | 66 |          # to queue a job to actually check the real cache size.
 | 
| 67 | 67 |          #
 | 
| 68 | -        if artifacts.get_quota_exceeded():
 | |
| 68 | +        if artifacts.has_quota_exceeded():
 | |
| 69 | 69 |              self._scheduler.check_cache_size()
 | 
| 70 | 70 |  | 
| 71 | 71 |      def done(self, job, element, result, success):
 | 
| ... | ... | @@ -325,15 +325,22 @@ class Queue(): | 
| 325 | 325 |                            detail=traceback.format_exc())
 | 
| 326 | 326 |              self.failed_elements.append(element)
 | 
| 327 | 327 |          else:
 | 
| 328 | - | |
| 329 | -            # No exception occured, handle the success/failure state in the normal way
 | |
| 330 | 328 |              #
 | 
| 329 | +            # No exception occured in post processing
 | |
| 330 | +            #
 | |
| 331 | + | |
| 332 | +            # Only place in the output done queue if the job
 | |
| 333 | +            # was considered successful
 | |
| 331 | 334 |              if success:
 | 
| 332 | 335 |                  self._done_queue.append(job)
 | 
| 333 | -                if not job.skipped:
 | |
| 334 | -                    self.processed_elements.append(element)
 | |
| 335 | -                else:
 | |
| 336 | -                    self.skipped_elements.append(element)
 | |
| 336 | + | |
| 337 | +            # A Job can be skipped whether or not it has failed,
 | |
| 338 | +            # we want to only bookkeep them as processed or failed
 | |
| 339 | +            # if they are not skipped.
 | |
| 340 | +            if job.skipped:
 | |
| 341 | +                self.skipped_elements.append(element)
 | |
| 342 | +            elif success:
 | |
| 343 | +                self.processed_elements.append(element)
 | |
| 337 | 344 |              else:
 | 
| 338 | 345 |                  self.failed_elements.append(element)
 | 
| 339 | 346 |  | 
| ... | ... | @@ -349,7 +349,7 @@ class Scheduler(): | 
| 349 | 349 |          platform = Platform.get_platform()
 | 
| 350 | 350 |          artifacts = platform.artifactcache
 | 
| 351 | 351 |  | 
| 352 | -        if not artifacts.get_quota_exceeded():
 | |
| 352 | +        if not artifacts.has_quota_exceeded():
 | |
| 353 | 353 |              return
 | 
| 354 | 354 |  | 
| 355 | 355 |          job = CleanupJob(self, 'cleanup', 'cleanup/cleanup',
 | 
| ... | ... | @@ -387,6 +387,15 @@ class Scheduler(): | 
| 387 | 387 |      # A loop registered event callback for keyboard interrupts
 | 
| 388 | 388 |      #
 | 
| 389 | 389 |      def _interrupt_event(self):
 | 
| 390 | + | |
| 391 | +        # FIXME: This should not be needed, but for some reason we receive an
 | |
| 392 | +        #        additional SIGINT event when the user hits ^C a second time
 | |
| 393 | +        #        to inform us that they really intend to terminate; even though
 | |
| 394 | +        #        we have disconnected our handlers at this time.
 | |
| 395 | +        #
 | |
| 396 | +        if self.terminated:
 | |
| 397 | +            return
 | |
| 398 | + | |
| 390 | 399 |          # Leave this to the frontend to decide, if no
 | 
| 391 | 400 |          # interrrupt callback was specified, then just terminate.
 | 
| 392 | 401 |          if self._interrupt_callback:
 | 
| ... | ... | @@ -467,7 +467,7 @@ def node_get_project_path(node, key, project_dir, *, | 
| 467 | 467 |                          "{}: Specified path '{}' does not exist"
 | 
| 468 | 468 |                          .format(provenance, path_str))
 | 
| 469 | 469 |  | 
| 470 | -    is_inside = project_dir_path in full_resolved_path.parents or (
 | |
| 470 | +    is_inside = project_dir_path.resolve() in full_resolved_path.parents or (
 | |
| 471 | 471 |          full_resolved_path == project_dir_path)
 | 
| 472 | 472 |  | 
| 473 | 473 |      if path.is_absolute() or not is_inside:
 | 
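The _yaml.py change above resolves the project directory before the containment check, which appears to be the fix behind the "checks for paths which escape the project directory (#673)" entry in NEWS: when the project directory is reached through a symlink, the fully resolved candidate path never lists the unresolved project path among its parents, so legitimate in-project paths were rejected. A standalone pathlib illustration of that behaviour (directory names invented):

    import tempfile
    from pathlib import Path

    with tempfile.TemporaryDirectory() as tmp:
        real_project = Path(tmp, "real-project")
        (real_project / "files").mkdir(parents=True)

        link = Path(tmp, "project-link")
        link.symlink_to(real_project)

        target = (link / "files").resolve()       # .../real-project/files

        print(link in target.parents)             # False: unresolved project dir
        print(link.resolve() in target.parents)   # True: both sides resolved
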
| ... | ... | @@ -434,7 +434,7 @@ class Element(Plugin): | 
| 434 | 434 |                                                  visited=visited, recursed=True)
 | 
| 435 | 435 |  | 
| 436 | 436 |          # Yeild self only at the end, after anything needed has been traversed
 | 
| 437 | -        if should_yield and (recurse or recursed) and (scope == Scope.ALL or scope == Scope.RUN):
 | |
| 437 | +        if should_yield and (recurse or recursed) and scope != Scope.BUILD:
 | |
| 438 | 438 |              yield self
 | 
| 439 | 439 |  | 
| 440 | 440 |      def search(self, scope, name):
 | 
| ... | ... | @@ -1289,17 +1289,21 @@ class Element(Plugin): | 
| 1289 | 1289 |                  if scope == Scope.BUILD:
 | 
| 1290 | 1290 |                      self.stage(sandbox)
 | 
| 1291 | 1291 |                  elif scope == Scope.RUN:
 | 
| 1292 | -                    # Stage deps in the sandbox root
 | |
| 1293 | 1292 |                      if deps == 'run':
 | 
| 1294 | -                        with self.timed_activity("Staging dependencies", silent_nested=True):
 | |
| 1295 | -                            self.stage_dependency_artifacts(sandbox, scope)
 | |
| 1296 | - | |
| 1297 | -                        # Run any integration commands provided by the dependencies
 | |
| 1298 | -                        # once they are all staged and ready
 | |
| 1299 | -                        if integrate:
 | |
| 1300 | -                            with self.timed_activity("Integrating sandbox"):
 | |
| 1301 | -                                for dep in self.dependencies(scope):
 | |
| 1302 | -                                    dep.integrate(sandbox)
 | |
| 1293 | +                        dependency_scope = Scope.RUN
 | |
| 1294 | +                    else:
 | |
| 1295 | +                        dependency_scope = None
 | |
| 1296 | + | |
| 1297 | +                    # Stage deps in the sandbox root
 | |
| 1298 | +                    with self.timed_activity("Staging dependencies", silent_nested=True):
 | |
| 1299 | +                        self.stage_dependency_artifacts(sandbox, dependency_scope)
 | |
| 1300 | + | |
| 1301 | +                    # Run any integration commands provided by the dependencies
 | |
| 1302 | +                    # once they are all staged and ready
 | |
| 1303 | +                    if integrate:
 | |
| 1304 | +                        with self.timed_activity("Integrating sandbox"):
 | |
| 1305 | +                            for dep in self.dependencies(dependency_scope):
 | |
| 1306 | +                                dep.integrate(sandbox)
 | |
| 1303 | 1307 |  | 
| 1304 | 1308 |              yield sandbox
 | 
| 1305 | 1309 |  | 
| ... | ... | @@ -164,10 +164,18 @@ class GitMirror(SourceFetcher): | 
| 164 | 164 |                           cwd=self.mirror)
 | 
| 165 | 165 |  | 
| 166 | 166 |      def fetch(self, alias_override=None):
 | 
| 167 | -        self.ensure(alias_override)
 | |
| 168 | -        if not self.has_ref():
 | |
| 169 | -            self._fetch(alias_override)
 | |
| 170 | -        self.assert_ref()
 | |
| 167 | +        # Resolve the URL for the message
 | |
| 168 | +        resolved_url = self.source.translate_url(self.url,
 | |
| 169 | +                                                 alias_override=alias_override,
 | |
| 170 | +                                                 primary=self.primary)
 | |
| 171 | + | |
| 172 | +        with self.source.timed_activity("Fetching from {}"
 | |
| 173 | +                                        .format(resolved_url),
 | |
| 174 | +                                        silent_nested=True):
 | |
| 175 | +            self.ensure(alias_override)
 | |
| 176 | +            if not self.has_ref():
 | |
| 177 | +                self._fetch(alias_override)
 | |
| 178 | +            self.assert_ref()
 | |
| 171 | 179 |  | 
| 172 | 180 |      def has_ref(self):
 | 
| 173 | 181 |          if not self.ref:
 | 
| ... | ... | @@ -585,28 +585,48 @@ class Source(Plugin): | 
| 585 | 585 |      #
 | 
| 586 | 586 |      def _fetch(self):
 | 
| 587 | 587 |          project = self._get_project()
 | 
| 588 | -        source_fetchers = self.get_source_fetchers()
 | |
| 588 | +        context = self._get_context()
 | |
| 589 | + | |
| 590 | +        # Silence the STATUS messages which might happen as a result
 | |
| 591 | +        # of checking the source fetchers.
 | |
| 592 | +        with context.silence():
 | |
| 593 | +            source_fetchers = self.get_source_fetchers()
 | |
| 589 | 594 |  | 
| 590 | 595 |          # Use the source fetchers if they are provided
 | 
| 591 | 596 |          #
 | 
| 592 | 597 |          if source_fetchers:
 | 
| 593 | -            for fetcher in source_fetchers:
 | |
| 594 | -                alias = fetcher._get_alias()
 | |
| 595 | -                for uri in project.get_alias_uris(alias, first_pass=self.__first_pass):
 | |
| 596 | -                    try:
 | |
| 597 | -                        fetcher.fetch(uri)
 | |
| 598 | -                    # FIXME: Need to consider temporary vs. permanent failures,
 | |
| 599 | -                    #        and how this works with retries.
 | |
| 600 | -                    except BstError as e:
 | |
| 601 | -                        last_error = e
 | |
| 602 | -                        continue
 | |
| 603 | - | |
| 604 | -                    # No error, we're done with this fetcher
 | |
| 605 | -                    break
 | |
| 606 | 598 |  | 
| 607 | -                else:
 | |
| 608 | -                    # No break occurred, raise the last detected error
 | |
| 609 | -                    raise last_error
 | |
| 599 | +            # Use a contorted loop here; this allows us to
 | |
| 600 | +            # silence the messages which can result from consuming
 | |
| 601 | +            # the items of source_fetchers, if it happens to be a generator.
 | |
| 602 | +            #
 | |
| 603 | +            source_fetchers = iter(source_fetchers)
 | |
| 604 | +            try:
 | |
| 605 | + | |
| 606 | +                while True:
 | |
| 607 | + | |
| 608 | +                    with context.silence():
 | |
| 609 | +                        fetcher = next(source_fetchers)
 | |
| 610 | + | |
| 611 | +                    alias = fetcher._get_alias()
 | |
| 612 | +                    for uri in project.get_alias_uris(alias, first_pass=self.__first_pass):
 | |
| 613 | +                        try:
 | |
| 614 | +                            fetcher.fetch(uri)
 | |
| 615 | +                        # FIXME: Need to consider temporary vs. permanent failures,
 | |
| 616 | +                        #        and how this works with retries.
 | |
| 617 | +                        except BstError as e:
 | |
| 618 | +                            last_error = e
 | |
| 619 | +                            continue
 | |
| 620 | + | |
| 621 | +                        # No error, we're done with this fetcher
 | |
| 622 | +                        break
 | |
| 623 | + | |
| 624 | +                    else:
 | |
| 625 | +                        # No break occurred, raise the last detected error
 | |
| 626 | +                        raise last_error
 | |
| 627 | + | |
| 628 | +            except StopIteration:
 | |
| 629 | +                pass
 | |
| 610 | 630 |  | 
| 611 | 631 |          # Default codepath is to reinstantiate the Source
 | 
| 612 | 632 |          #
 | 
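
The comment about the "contorted loop" is subtle enough that a standalone illustration may help. The sketch below does not use BuildStream's Context.silence(); it uses a toy context manager and a toy generator to show why a context wrapped only around creation of the iterator would not cover the generator code that runs on each step, while wrapping each next() call individually does:

    from contextlib import contextmanager

    @contextmanager
    def silence(state):
        # Toy replacement for Context.silence(): flip a flag while active
        state['silenced'] = True
        try:
            yield
        finally:
            state['silenced'] = False

    def get_source_fetchers(state, log):
        # Toy generator: its body only executes when next() is called,
        # i.e. inside whichever context is active at that moment
        for name in ('first', 'second'):
            log.append((name, state['silenced']))
            yield name

    state = {'silenced': False}
    log = []
    fetchers = iter(get_source_fetchers(state, log))
    try:
        while True:
            with silence(state):
                fetcher = next(fetchers)
            # ... the actual fetching would happen here, unsilenced ...
    except StopIteration:
        pass

    # Both generator steps ran while the silence() context was active
    assert log == [('first', True), ('second', True)]
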
| ... | ... | @@ -496,7 +496,7 @@ def get_bst_version(): | 
| 496 | 496 |  | 
| 497 | 497 |  @contextmanager
 | 
| 498 | 498 |  def save_file_atomic(filename, mode='w', *, buffering=-1, encoding=None,
 | 
| 499 | -                     errors=None, newline=None, closefd=True, opener=None):
 | |
| 499 | +                     errors=None, newline=None, closefd=True, opener=None, tempdir=None):
 | |
| 500 | 500 |      """Save a file with a temporary name and rename it into place when ready.
 | 
| 501 | 501 |  | 
| 502 | 502 |      This is a context manager which is meant for saving data to files.
 | 
| ... | ... | @@ -523,8 +523,9 @@ def save_file_atomic(filename, mode='w', *, buffering=-1, encoding=None, | 
| 523 | 523 |      # https://bugs.python.org/issue8604
 | 
| 524 | 524 |  | 
| 525 | 525 |      assert os.path.isabs(filename), "The utils.save_file_atomic() parameter ``filename`` must be an absolute path"
 | 
| 526 | -    dirname = os.path.dirname(filename)
 | |
| 527 | -    fd, tempname = tempfile.mkstemp(dir=dirname)
 | |
| 526 | +    if tempdir is None:
 | |
| 527 | +        tempdir = os.path.dirname(filename)
 | |
| 528 | +    fd, tempname = tempfile.mkstemp(dir=tempdir)
 | |
| 528 | 529 |      os.close(fd)
 | 
| 529 | 530 |  | 
| 530 | 531 |      f = open(tempname, mode=mode, buffering=buffering, encoding=encoding,
 | 
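
A short usage sketch of the extended signature, assuming a BuildStream with this change installed (the destination path and contents are made up for illustration). Keeping the temporary file on the same filesystem as the destination is what keeps the final rename atomic, which is presumably why a caller would want to pick the temporary directory explicitly:

    import os
    import tempfile
    from buildstream.utils import save_file_atomic

    # Hypothetical destination; save_file_atomic() requires an absolute path
    destination_dir = tempfile.mkdtemp()
    destination = os.path.join(destination_dir, 'config.yaml')

    # tempdir is keyword-only; reusing the destination directory here matches
    # the default behaviour when tempdir is not given
    with save_file_atomic(destination, 'w', tempdir=destination_dir) as f:
        f.write('key: value\n')

    assert os.path.exists(destination)
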
| ... | ... | @@ -556,6 +557,9 @@ def save_file_atomic(filename, mode='w', *, buffering=-1, encoding=None, | 
| 556 | 557 |  #
 | 
| 557 | 558 |  # Get the disk usage of a given directory in bytes.
 | 
| 558 | 559 |  #
 | 
| 560 | +# This function assumes that files do not inadvertently
 | |
| 561 | +# disappear while this function is running.
 | |
| 562 | +#
 | |
| 559 | 563 |  # Arguments:
 | 
| 560 | 564 |  #     (str) The path whose size to check.
 | 
| 561 | 565 |  #
 | 
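
For context, a hedged sketch of the kind of directory walk that comment refers to (this is not the actual implementation of the function documented above): if an entry disappears between being listed and being stat'ed, the walk raises FileNotFoundError, hence the stated assumption.

    import os

    def dir_size(path):
        # Hypothetical recursive disk-usage walk; an entry vanishing between
        # scandir() and stat() surfaces here as FileNotFoundError
        total = 0
        for entry in os.scandir(path):
            if entry.is_dir(follow_symlinks=False):
                total += dir_size(entry.path)
            elif entry.is_file(follow_symlinks=False):
                total += entry.stat(follow_symlinks=False).st_size
        return total
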
| ... | ... | @@ -675,7 +679,7 @@ def _force_rmtree(rootpath, **kwargs): | 
| 675 | 679 |  | 
| 676 | 680 |      try:
 | 
| 677 | 681 |          shutil.rmtree(rootpath, **kwargs)
 | 
| 678 | -    except shutil.Error as e:
 | |
| 682 | +    except OSError as e:
 | |
| 679 | 683 |          raise UtilError("Failed to remove cache directory '{}': {}"
 | 
| 680 | 684 |                          .format(rootpath, e))
 | 
| 681 | 685 |  | 
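
A minimal standalone illustration of why the broader except clause matters (the path is made up): shutil.rmtree() propagates ordinary OSError subclasses such as FileNotFoundError or PermissionError, which an `except shutil.Error` clause does not catch, since shutil.Error is a subclass of OSError rather than the other way around.

    import shutil

    try:
        shutil.rmtree('/no/such/cache/dir')   # hypothetical path
    except OSError as e:
        # Catches FileNotFoundError, PermissionError, etc., as well as
        # shutil.Error itself
        print("Failed to remove cache directory: {}".format(e))
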
| ... | ... | @@ -181,3 +181,15 @@ def test_project_refs_options(cli, datafiles): | 
| 181 | 181 |  | 
| 182 | 182 |      # Assert that the cache keys are different
 | 
| 183 | 183 |      assert result1.output != result2.output
 | 
| 184 | + | |
| 185 | + | |
| 186 | +@pytest.mark.datafiles(os.path.join(DATA_DIR, 'element-path'))
 | |
| 187 | +def test_element_path_project_path_contains_symlinks(cli, datafiles, tmpdir):
 | |
| 188 | +    real_project = str(datafiles)
 | |
| 189 | +    linked_project = os.path.join(str(tmpdir), 'linked')
 | |
| 190 | +    os.symlink(real_project, linked_project)
 | |
| 191 | +    os.makedirs(os.path.join(real_project, 'elements'), exist_ok=True)
 | |
| 192 | +    with open(os.path.join(real_project, 'elements', 'element.bst'), 'w') as f:
 | |
| 193 | +        f.write("kind: manual\n")
 | |
| 194 | +    result = cli.run(project=linked_project, args=['show', 'element.bst'])
 | |
| 195 | +    result.assert_success() | 
| ... | ... | @@ -65,9 +65,10 @@ def test_build_checkout(datafiles, cli, strict, hardlinks): | 
| 65 | 65 |  def test_build_checkout_deps(datafiles, cli, deps):
 | 
| 66 | 66 |      project = os.path.join(datafiles.dirname, datafiles.basename)
 | 
| 67 | 67 |      checkout = os.path.join(cli.directory, 'checkout')
 | 
| 68 | +    element_name = "checkout-deps.bst"
 | |
| 68 | 69 |  | 
| 69 | 70 |      # First build it
 | 
| 70 | -    result = cli.run(project=project, args=['build', 'target.bst'])
 | |
| 71 | +    result = cli.run(project=project, args=['build', element_name])
 | |
| 71 | 72 |      result.assert_success()
 | 
| 72 | 73 |  | 
| 73 | 74 |      # Assert that after a successful build, the builddir is empty
 | 
| ... | ... | @@ -76,20 +77,15 @@ def test_build_checkout_deps(datafiles, cli, deps): | 
| 76 | 77 |      assert not os.listdir(builddir)
 | 
| 77 | 78 |  | 
| 78 | 79 |      # Now check it out
 | 
| 79 | -    result = cli.run(project=project, args=['checkout', 'target.bst', '--deps', deps, checkout])
 | |
| 80 | +    result = cli.run(project=project, args=['checkout', element_name, '--deps', deps, checkout])
 | |
| 80 | 81 |      result.assert_success()
 | 
| 81 | 82 |  | 
| 82 | -    # Check that the executable hello file is found in the checkout
 | |
| 83 | -    filename = os.path.join(checkout, 'usr', 'bin', 'hello')
 | |
| 84 | - | |
| 85 | -    if deps == "run":
 | |
| 86 | -        assert os.path.exists(filename)
 | |
| 87 | -    else:
 | |
| 88 | -        assert not os.path.exists(filename)
 | |
| 89 | - | |
| 90 | -    # Check that the executable hello file is found in the checkout
 | |
| 91 | -    filename = os.path.join(checkout, 'usr', 'include', 'pony.h')
 | |
| 83 | +    # Verify output of this element
 | |
| 84 | +    filename = os.path.join(checkout, 'etc', 'buildstream', 'config')
 | |
| 85 | +    assert os.path.exists(filename)
 | |
| 92 | 86 |  | 
| 87 | +    # Verify output of this element's runtime dependencies
 | |
| 88 | +    filename = os.path.join(checkout, 'usr', 'bin', 'hello')
 | |
| 93 | 89 |      if deps == "run":
 | 
| 94 | 90 |          assert os.path.exists(filename)
 | 
| 95 | 91 |      else:
 | 
| 1 | +kind: import
 | |
| 2 | +description: It is important for this element to have both build and runtime dependencies
 | |
| 3 | +sources:
 | |
| 4 | +- kind: local
 | |
| 5 | +  path: files/etc-files
 | |
| 6 | +depends:
 | |
| 7 | +- filename: import-dev.bst
 | |
| 8 | +  type: build
 | |
| 9 | +- filename: import-bin.bst
 | |
| 10 | +  type: runtime | 
| 1 | +config | 
