Jim MacArthur pushed to branch jmac/remote_execution_client at BuildStream / buildstream
Commits:
- c15cb951 by Tristan Van Berkom at 2018-08-20T10:19:03Z
- 372abed5 by Tristan Van Berkom at 2018-08-20T11:47:53Z
- 3feefbf6 by Jim MacArthur at 2018-08-20T13:18:17Z
- 40ae8e1a by Jim MacArthur at 2018-08-20T13:18:17Z
- e8d7f098 by Jim MacArthur at 2018-08-20T13:18:17Z
- bb20b6d7 by Jim MacArthur at 2018-08-20T13:18:17Z
- fada50a5 by Jim MacArthur at 2018-08-20T13:18:17Z
- 78fc5499 by Jim MacArthur at 2018-08-20T13:18:17Z
- 6b83bf3b by Jim MacArthur at 2018-08-20T13:18:17Z
- 803189a7 by Jim MacArthur at 2018-08-20T13:18:17Z
- f0e077d2 by Jim MacArthur at 2018-08-20T13:18:17Z
- 13322752 by Jim MacArthur at 2018-08-20T13:18:17Z
15 changed files:
- buildstream/_artifactcache/cascache.py
- buildstream/_loader/loadelement.py
- buildstream/_loader/loader.py
- buildstream/_loader/metaelement.py
- buildstream/_loader/types.py
- buildstream/_project.py
- buildstream/buildelement.py
- buildstream/data/projectconfig.yaml
- buildstream/element.py
- buildstream/plugins/elements/autotools.py
- buildstream/sandbox/__init__.py
- + buildstream/sandbox/_sandboxremote.py
- buildstream/sandbox/sandbox.py
- buildstream/storage/_casbaseddirectory.py
- setup.cfg
Changes:
| ... | ... | @@ -215,6 +215,29 @@ class CASCache(ArtifactCache): | 
| 215 | 215 |              remotes_for_project = self._remotes[element._get_project()]
 | 
| 216 | 216 |              return any(remote.spec.push for remote in remotes_for_project)
 | 
| 217 | 217 |  | 
| 218 | +    def pull_key(self, key, size_bytes, project):
 | |
| 219 | +        """ Pull a single key rather than an artifact.
 | |
| 220 | +        Does not update local refs. """
 | |
| 221 | + | |
| 222 | +        for remote in self._remotes[project]:
 | |
| 223 | +            try:
 | |
| 224 | +                remote.init()
 | |
| 225 | + | |
| 226 | +                tree = remote_execution_pb2.Digest()
 | |
| 227 | +                tree.hash = key
 | |
| 228 | +                tree.size_bytes = size_bytes
 | |
| 229 | + | |
| 230 | +                self._fetch_directory(remote, tree)
 | |
| 231 | + | |
| 232 | +                # no need to pull from additional remotes
 | |
| 233 | +                return True
 | |
| 234 | + | |
| 235 | +            except grpc.RpcError as e:
 | |
| 236 | +                if e.code() != grpc.StatusCode.NOT_FOUND:
 | |
| 237 | +                    raise
 | |
| 238 | + | |
| 239 | +        return False
 | |
| 240 | + | |
| 218 | 241 |      def pull(self, element, key, *, progress=None):
 | 
| 219 | 242 |          ref = self.get_artifact_fullname(element, key)
 | 
| 220 | 243 |  | 
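For orientation: the new pull_key() above fetches a single directory tree by raw digest, without creating or updating a local ref; this is what the remote-execution sandbox later uses to retrieve build output (see _sandboxremote.py below). A minimal usage sketch, assuming an initialised CASCache with remotes configured and a remote_execution_pb2.Digest already in hand:

    # Sketch only: fetching a tree by digest with the new pull_key() API.
    # 'artifacts' is assumed to be an initialised CASCache with remotes set up,
    # and 'digest' a remote_execution_pb2.Digest naming the wanted tree.
    found = artifacts.pull_key(digest.hash, digest.size_bytes, project)
    if not found:
        raise ArtifactError("Tree {} not available on any remote".format(digest.hash))
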
| ... | ... | @@ -256,10 +279,94 @@ class CASCache(ArtifactCache): | 
| 256 | 279 |  | 
| 257 | 280 |          self.set_ref(newref, tree)
 | 
| 258 | 281 |  | 
| 282 | +    def _push_refs_to_remote(self, refs, remote, may_have_dependencies):
 | |
| 283 | +        skipped_remote = True
 | |
| 284 | +        try:
 | |
| 285 | +            for ref in refs:
 | |
| 286 | +                tree = self.resolve_ref(ref)
 | |
| 287 | + | |
| 288 | +                # Check whether ref is already on the server in which case
 | |
| 289 | +                # there is no need to push the artifact
 | |
| 290 | +                try:
 | |
| 291 | +                    request = buildstream_pb2.GetReferenceRequest()
 | |
| 292 | +                    request.key = ref
 | |
| 293 | +                    response = remote.ref_storage.GetReference(request)
 | |
| 294 | + | |
| 295 | +                    if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
 | |
| 296 | +                        # ref is already on the server with the same tree
 | |
| 297 | +                        continue
 | |
| 298 | + | |
| 299 | +                except grpc.RpcError as e:
 | |
| 300 | +                    if e.code() != grpc.StatusCode.NOT_FOUND:
 | |
| 301 | +                        # Intentionally re-raise RpcError for outer except block.
 | |
| 302 | +                        raise
 | |
| 303 | + | |
| 304 | +                missing_blobs = {}
 | |
| 305 | +                required_blobs = self._required_blobs(tree, may_have_dependencies)
 | |
| 306 | + | |
| 307 | +                # Limit size of FindMissingBlobs request
 | |
| 308 | +                for required_blobs_group in _grouper(required_blobs, 512):
 | |
| 309 | +                    request = remote_execution_pb2.FindMissingBlobsRequest()
 | |
| 310 | + | |
| 311 | +                    for required_digest in required_blobs_group:
 | |
| 312 | +                        d = request.blob_digests.add()
 | |
| 313 | +                        d.hash = required_digest.hash
 | |
| 314 | +                        d.size_bytes = required_digest.size_bytes
 | |
| 315 | + | |
| 316 | +                    response = remote.cas.FindMissingBlobs(request)
 | |
| 317 | +                    for digest in response.missing_blob_digests:
 | |
| 318 | +                        d = remote_execution_pb2.Digest()
 | |
| 319 | +                        d.hash = digest.hash
 | |
| 320 | +                        d.size_bytes = digest.size_bytes
 | |
| 321 | +                        missing_blobs[d.hash] = d
 | |
| 322 | + | |
| 323 | +                # Upload any blobs missing on the server
 | |
| 324 | +                skipped_remote = False
 | |
| 325 | +                for digest in missing_blobs.values():
 | |
| 326 | +                    uuid_ = uuid.uuid4()
 | |
| 327 | +                    resource_name = '/'.join(['uploads', str(uuid_), 'blobs',
 | |
| 328 | +                                              digest.hash, str(digest.size_bytes)])
 | |
| 329 | +                    def request_stream():
 | |
| 330 | +                        with open(self.objpath(digest), 'rb') as f:
 | |
| 331 | +                            assert os.fstat(f.fileno()).st_size == digest.size_bytes
 | |
| 332 | +                            offset = 0
 | |
| 333 | +                            finished = False
 | |
| 334 | +                            remaining = digest.size_bytes
 | |
| 335 | +                            while not finished:
 | |
| 336 | +                                chunk_size = min(remaining, 64 * 1024)
 | |
| 337 | +                                remaining -= chunk_size
 | |
| 338 | + | |
| 339 | +                                request = bytestream_pb2.WriteRequest()
 | |
| 340 | +                                request.write_offset = offset
 | |
| 341 | +                                # max. 64 kB chunks
 | |
| 342 | +                                request.data = f.read(chunk_size)
 | |
| 343 | +                                request.resource_name = resource_name
 | |
| 344 | +                                request.finish_write = remaining <= 0
 | |
| 345 | +                                yield request
 | |
| 346 | +                                offset += chunk_size
 | |
| 347 | +                                finished = request.finish_write
 | |
| 348 | +                    response = remote.bytestream.Write(request_stream())
 | |
| 349 | + | |
| 350 | +                request = buildstream_pb2.UpdateReferenceRequest()
 | |
| 351 | +                request.keys.append(ref)
 | |
| 352 | +                request.digest.hash = tree.hash
 | |
| 353 | +                request.digest.size_bytes = tree.size_bytes
 | |
| 354 | +                remote.ref_storage.UpdateReference(request)
 | |
| 355 | + | |
| 356 | +        except grpc.RpcError as e:
 | |
| 357 | +            if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
 | |
| 358 | +                raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
 | |
| 359 | + | |
| 360 | +        return not skipped_remote
 | |
| 361 | + | |
| 259 | 362 |      def push(self, element, keys):
 | 
| 363 | +        keys = list(keys)
 | |
| 260 | 364 |          refs = [self.get_artifact_fullname(element, key) for key in keys]
 | 
| 261 | 365 |  | 
| 262 | 366 |          project = element._get_project()
 | 
| 367 | +        return self.push_refs(refs, project, element=element)
 | |
| 368 | + | |
| 369 | +    def push_refs(self, refs, project, may_have_dependencies=True, element=None):
 | |
| 263 | 370 |  | 
| 264 | 371 |          push_remotes = [r for r in self._remotes[project] if r.spec.push]
 | 
| 265 | 372 |  | 
| ... | ... | @@ -267,97 +374,52 @@ class CASCache(ArtifactCache): | 
| 267 | 374 |  | 
| 268 | 375 |          for remote in push_remotes:
 | 
| 269 | 376 |              remote.init()
 | 
| 270 | -            skipped_remote = True
 | |
| 271 | -            element.info("Pushing {} -> {}".format(element._get_brief_display_key(), remote.spec.url))
 | |
| 272 | - | |
| 273 | -            try:
 | |
| 274 | -                for ref in refs:
 | |
| 275 | -                    tree = self.resolve_ref(ref)
 | |
| 276 | - | |
| 277 | -                    # Check whether ref is already on the server in which case
 | |
| 278 | -                    # there is no need to push the artifact
 | |
| 279 | -                    try:
 | |
| 280 | -                        request = buildstream_pb2.GetReferenceRequest()
 | |
| 281 | -                        request.key = ref
 | |
| 282 | -                        response = remote.ref_storage.GetReference(request)
 | |
| 283 | - | |
| 284 | -                        if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
 | |
| 285 | -                            # ref is already on the server with the same tree
 | |
| 286 | -                            continue
 | |
| 287 | - | |
| 288 | -                    except grpc.RpcError as e:
 | |
| 289 | -                        if e.code() != grpc.StatusCode.NOT_FOUND:
 | |
| 290 | -                            # Intentionally re-raise RpcError for outer except block.
 | |
| 291 | -                            raise
 | |
| 292 | - | |
| 293 | -                    missing_blobs = {}
 | |
| 294 | -                    required_blobs = self._required_blobs(tree)
 | |
| 295 | - | |
| 296 | -                    # Limit size of FindMissingBlobs request
 | |
| 297 | -                    for required_blobs_group in _grouper(required_blobs, 512):
 | |
| 298 | -                        request = remote_execution_pb2.FindMissingBlobsRequest()
 | |
| 299 | - | |
| 300 | -                        for required_digest in required_blobs_group:
 | |
| 301 | -                            d = request.blob_digests.add()
 | |
| 302 | -                            d.hash = required_digest.hash
 | |
| 303 | -                            d.size_bytes = required_digest.size_bytes
 | |
| 304 | - | |
| 305 | -                        response = remote.cas.FindMissingBlobs(request)
 | |
| 306 | -                        for digest in response.missing_blob_digests:
 | |
| 307 | -                            d = remote_execution_pb2.Digest()
 | |
| 308 | -                            d.hash = digest.hash
 | |
| 309 | -                            d.size_bytes = digest.size_bytes
 | |
| 310 | -                            missing_blobs[d.hash] = d
 | |
| 311 | - | |
| 312 | -                    # Upload any blobs missing on the server
 | |
| 313 | -                    skipped_remote = False
 | |
| 314 | -                    for digest in missing_blobs.values():
 | |
| 315 | -                        uuid_ = uuid.uuid4()
 | |
| 316 | -                        resource_name = '/'.join(['uploads', str(uuid_), 'blobs',
 | |
| 317 | -                                                  digest.hash, str(digest.size_bytes)])
 | |
| 318 | - | |
| 319 | -                        def request_stream():
 | |
| 320 | -                            with open(self.objpath(digest), 'rb') as f:
 | |
| 321 | -                                assert os.fstat(f.fileno()).st_size == digest.size_bytes
 | |
| 322 | -                                offset = 0
 | |
| 323 | -                                finished = False
 | |
| 324 | -                                remaining = digest.size_bytes
 | |
| 325 | -                                while not finished:
 | |
| 326 | -                                    chunk_size = min(remaining, 64 * 1024)
 | |
| 327 | -                                    remaining -= chunk_size
 | |
| 328 | - | |
| 329 | -                                    request = bytestream_pb2.WriteRequest()
 | |
| 330 | -                                    request.write_offset = offset
 | |
| 331 | -                                    # max. 64 kB chunks
 | |
| 332 | -                                    request.data = f.read(chunk_size)
 | |
| 333 | -                                    request.resource_name = resource_name
 | |
| 334 | -                                    request.finish_write = remaining <= 0
 | |
| 335 | -                                    yield request
 | |
| 336 | -                                    offset += chunk_size
 | |
| 337 | -                                    finished = request.finish_write
 | |
| 338 | -                        response = remote.bytestream.Write(request_stream())
 | |
| 339 | - | |
| 340 | -                    request = buildstream_pb2.UpdateReferenceRequest()
 | |
| 341 | -                    request.keys.append(ref)
 | |
| 342 | -                    request.digest.hash = tree.hash
 | |
| 343 | -                    request.digest.size_bytes = tree.size_bytes
 | |
| 344 | -                    remote.ref_storage.UpdateReference(request)
 | |
| 345 | - | |
| 346 | -                    pushed = True
 | |
| 347 | - | |
| 348 | -            except grpc.RpcError as e:
 | |
| 349 | -                if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
 | |
| 350 | -                    raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
 | |
| 351 | - | |
| 352 | -            if skipped_remote:
 | |
| 377 | +            if self._push_refs_to_remote(refs, remote, may_have_dependencies):
 | |
| 378 | +                pushed = True
 | |
| 379 | +            elif element:
 | |
| 353 | 380 |                  self.context.message(Message(
 | 
| 354 | 381 |                      None,
 | 
| 355 | 382 |                      MessageType.SKIPPED,
 | 
| 356 | 383 |                      "Remote ({}) already has {} cached".format(
 | 
| 357 | 384 |                          remote.spec.url, element._get_brief_display_key())
 | 
| 358 | 385 |                  ))
 | 
| 386 | + | |
| 359 | 387 |          return pushed
 | 
| 360 | 388 |  | 
| 389 | +    def verify_key_pushed(self, key, project):
 | |
| 390 | +        ref = key
 | |
| 391 | +        push_remotes = [r for r in self._remotes[project] if r.spec.push]
 | |
| 392 | + | |
| 393 | +        pushed = False
 | |
| 394 | + | |
| 395 | +        for remote in push_remotes:
 | |
| 396 | +            remote.init()
 | |
| 397 | + | |
| 398 | +            if self._verify_ref_on_remote(ref, remote):
 | |
| 399 | +                pushed = True
 | |
| 400 | + | |
| 401 | +        return pushed
 | |
| 402 | + | |
| 403 | +    def _verify_ref_on_remote(self, ref, remote):
 | |
| 404 | +        tree = self.resolve_ref(ref)
 | |
| 405 | + | |
| 406 | +        # Check whether ref is already on the server in which case
 | |
| 407 | +        # there is no need to push the artifact
 | |
| 408 | +        try:
 | |
| 409 | +            request = buildstream_pb2.GetReferenceRequest()
 | |
| 410 | +            request.key = ref
 | |
| 411 | +            response = remote.ref_storage.GetReference(request)
 | |
| 412 | + | |
| 413 | +            if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
 | |
| 414 | +                # ref is already on the server with the same tree
 | |
| 415 | +                return True
 | |
| 416 | + | |
| 417 | +        except grpc.RpcError as e:
 | |
| 418 | +            if e.code() != grpc.StatusCode.NOT_FOUND:
 | |
| 419 | +                raise
 | |
| 420 | + | |
| 421 | +        return False
 | |
| 422 | + | |
| 361 | 423 |      ################################################
 | 
| 362 | 424 |      #                API Private Methods           #
 | 
| 363 | 425 |      ################################################
 | 
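The push_refs()/verify_key_pushed() pair introduced above lets callers that have no element at hand (such as the remote sandbox) push arbitrary refs and, when the push is skipped, still confirm that the ref exists on a push remote. A sketch of that pattern, mirroring the calls made in _sandboxremote.py further down (the cache, project and ref are assumed to be set up already):

    # Sketch of the push-or-verify pattern used by the remote sandbox.
    # 'cascache' is an initialised CASCache, and 'ref' names an object that was
    # stored locally with set_ref() beforehand.
    pushed = cascache.push_refs([ref], project, may_have_dependencies=False)
    if not pushed and not cascache.verify_key_pushed(ref, project):
        # Neither freshly pushed nor already present on a push remote
        raise SandboxError("Could not make ref '{}' available on the remote".format(ref))
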
| ... | ... | @@ -731,26 +793,27 @@ class CASCache(ArtifactCache): | 
| 731 | 793 |              #
 | 
| 732 | 794 |              q.put(str(e))
 | 
| 733 | 795 |  | 
| 734 | -    def _required_blobs(self, tree):
 | |
| 796 | +    def _required_blobs(self, tree, may_have_dependencies=True):
 | |
| 735 | 797 |          # parse directory, and recursively add blobs
 | 
| 736 | 798 |          d = remote_execution_pb2.Digest()
 | 
| 737 | 799 |          d.hash = tree.hash
 | 
| 738 | 800 |          d.size_bytes = tree.size_bytes
 | 
| 739 | 801 |          yield d
 | 
| 740 | 802 |  | 
| 741 | -        directory = remote_execution_pb2.Directory()
 | |
| 803 | +        if may_have_dependencies:
 | |
| 804 | +            directory = remote_execution_pb2.Directory()
 | |
| 742 | 805 |  | 
| 743 | -        with open(self.objpath(tree), 'rb') as f:
 | |
| 744 | -            directory.ParseFromString(f.read())
 | |
| 806 | +            with open(self.objpath(tree), 'rb') as f:
 | |
| 807 | +                directory.ParseFromString(f.read())
 | |
| 745 | 808 |  | 
| 746 | -        for filenode in directory.files:
 | |
| 747 | -            d = remote_execution_pb2.Digest()
 | |
| 748 | -            d.hash = filenode.digest.hash
 | |
| 749 | -            d.size_bytes = filenode.digest.size_bytes
 | |
| 750 | -            yield d
 | |
| 809 | +            for filenode in directory.files:
 | |
| 810 | +                d = remote_execution_pb2.Digest()
 | |
| 811 | +                d.hash = filenode.digest.hash
 | |
| 812 | +                d.size_bytes = filenode.digest.size_bytes
 | |
| 813 | +                yield d
 | |
| 751 | 814 |  | 
| 752 | -        for dirnode in directory.directories:
 | |
| 753 | -            yield from self._required_blobs(dirnode.digest)
 | |
| 815 | +            for dirnode in directory.directories:
 | |
| 816 | +                yield from self._required_blobs(dirnode.digest)
 | |
| 754 | 817 |  | 
| 755 | 818 |      def _fetch_blob(self, remote, digest, out):
 | 
| 756 | 819 |          resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)])
 | 
| ... | ... | @@ -71,7 +71,7 @@ class LoadElement(): | 
| 71 | 71 |              'kind', 'depends', 'sources', 'sandbox',
 | 
| 72 | 72 |              'variables', 'environment', 'environment-nocache',
 | 
| 73 | 73 |              'config', 'public', 'description',
 | 
| 74 | -            'build-depends', 'runtime-depends',
 | |
| 74 | +            'build-depends', 'runtime-depends', 'remote-execution'
 | |
| 75 | 75 |          ])
 | 
| 76 | 76 |  | 
| 77 | 77 |          # Extract the Dependencies
 | 
| ... | ... | @@ -446,6 +446,7 @@ class Loader(): | 
| 446 | 446 |                                     _yaml.node_get(node, list, Symbol.ENV_NOCACHE, default_value=[]),
 | 
| 447 | 447 |                                     _yaml.node_get(node, Mapping, Symbol.PUBLIC, default_value={}),
 | 
| 448 | 448 |                                     _yaml.node_get(node, Mapping, Symbol.SANDBOX, default_value={}),
 | 
| 449 | +                                   _yaml.node_get(node, Mapping, Symbol.REMOTE_EXECUTION, default_value={}),
 | |
| 449 | 450 |                                     element_kind == 'junction')
 | 
| 450 | 451 |  | 
| 451 | 452 |          # Cache it now, make sure it's already there before recursing
 | 
| ... | ... | @@ -39,7 +39,7 @@ class MetaElement(): | 
| 39 | 39 |      #    first_pass: The element is to be loaded with first pass configuration (junction)
 | 
| 40 | 40 |      #
 | 
| 41 | 41 |      def __init__(self, project, name, kind, provenance, sources, config,
 | 
| 42 | -                 variables, environment, env_nocache, public, sandbox,
 | |
| 42 | +                 variables, environment, env_nocache, public, sandbox, remote_execution,
 | |
| 43 | 43 |                   first_pass):
 | 
| 44 | 44 |          self.project = project
 | 
| 45 | 45 |          self.name = name
 | 
| ... | ... | @@ -52,6 +52,7 @@ class MetaElement(): | 
| 52 | 52 |          self.env_nocache = env_nocache
 | 
| 53 | 53 |          self.public = public
 | 
| 54 | 54 |          self.sandbox = sandbox
 | 
| 55 | +        self.remote_execution = remote_execution
 | |
| 55 | 56 |          self.build_dependencies = []
 | 
| 56 | 57 |          self.dependencies = []
 | 
| 57 | 58 |          self.first_pass = first_pass | 
| ... | ... | @@ -41,6 +41,7 @@ class Symbol(): | 
| 41 | 41 |      DIRECTORY = "directory"
 | 
| 42 | 42 |      JUNCTION = "junction"
 | 
| 43 | 43 |      SANDBOX = "sandbox"
 | 
| 44 | +    REMOTE_EXECUTION = "remote-execution"
 | |
| 44 | 45 |  | 
| 45 | 46 |  | 
| 46 | 47 |  # Dependency()
 | 
| ... | ... | @@ -129,6 +129,7 @@ class Project(): | 
| 129 | 129 |  | 
| 130 | 130 |          self.artifact_cache_specs = None
 | 
| 131 | 131 |          self._sandbox = None
 | 
| 132 | +        self._remote_execution = None
 | |
| 132 | 133 |          self._splits = None
 | 
| 133 | 134 |  | 
| 134 | 135 |          self._context.add_project(self)
 | 
| ... | ... | @@ -460,7 +461,7 @@ class Project(): | 
| 460 | 461 |              'aliases', 'name',
 | 
| 461 | 462 |              'artifacts', 'options',
 | 
| 462 | 463 |              'fail-on-overlap', 'shell', 'fatal-warnings',
 | 
| 463 | -            'ref-storage', 'sandbox', 'mirrors'
 | |
| 464 | +            'ref-storage', 'sandbox', 'mirrors', 'remote-execution'
 | |
| 464 | 465 |          ])
 | 
| 465 | 466 |  | 
| 466 | 467 |          #
 | 
| ... | ... | @@ -478,6 +479,9 @@ class Project(): | 
| 478 | 479 |          # Load sandbox configuration
 | 
| 479 | 480 |          self._sandbox = _yaml.node_get(config, Mapping, 'sandbox')
 | 
| 480 | 481 |  | 
| 482 | +        # Load remote execution configuration
 | |
| 483 | +        self._remote_execution = _yaml.node_get(config, Mapping, 'remote-execution')
 | |
| 484 | + | |
| 481 | 485 |          # Load project split rules
 | 
| 482 | 486 |          self._splits = _yaml.node_get(config, Mapping, 'split-rules')
 | 
| 483 | 487 |  | 
| ... | ... | @@ -155,6 +155,9 @@ class BuildElement(Element): | 
| 155 | 155 |              command_dir = build_root
 | 
| 156 | 156 |          sandbox.set_work_directory(command_dir)
 | 
| 157 | 157 |  | 
| 158 | +        # Tell the sandbox which directory is preserved in the finished artifact
 | |
| 159 | +        sandbox.set_output_directory(install_root)
 | |
| 160 | + | |
| 158 | 161 |          # Setup environment
 | 
| 159 | 162 |          sandbox.set_environment(self.get_environment())
 | 
| 160 | 163 |  | 
| ... | ... | @@ -204,3 +204,6 @@ shell: | 
| 204 | 204 |    # Command to run when `bst shell` does not provide a command
 | 
| 205 | 205 |    #
 | 
| 206 | 206 |    command: [ 'sh', '-i' ]
 | 
| 207 | + | |
| 208 | +remote-execution:
 | |
| 209 | +  url: "" | |
| \ No newline at end of file | 
| ... | ... | @@ -95,6 +95,7 @@ from . import _site | 
| 95 | 95 |  from ._platform import Platform
 | 
| 96 | 96 |  from .plugin import CoreWarnings
 | 
| 97 | 97 |  from .sandbox._config import SandboxConfig
 | 
| 98 | +from .sandbox._sandboxremote import SandboxRemote
 | |
| 98 | 99 |  | 
| 99 | 100 |  from .storage.directory import Directory
 | 
| 100 | 101 |  from .storage._filebaseddirectory import FileBasedDirectory
 | 
| ... | ... | @@ -250,6 +251,9 @@ class Element(Plugin): | 
| 250 | 251 |          # Extract Sandbox config
 | 
| 251 | 252 |          self.__sandbox_config = self.__extract_sandbox_config(meta)
 | 
| 252 | 253 |  | 
| 254 | +        # Extract remote execution URL
 | |
| 255 | +        self.__remote_execution_url = self.__extract_remote_execution_config(meta)
 | |
| 256 | + | |
| 253 | 257 |      def __lt__(self, other):
 | 
| 254 | 258 |          return self.name < other.name
 | 
| 255 | 259 |  | 
| ... | ... | @@ -1545,6 +1549,8 @@ class Element(Plugin): | 
| 1545 | 1549 |                  finally:
 | 
| 1546 | 1550 |                      if collect is not None:
 | 
| 1547 | 1551 |                          try:
 | 
| 1552 | +                            # Sandbox will probably have replaced its virtual directory, so get it again
 | |
| 1553 | +                            sandbox_vroot = sandbox.get_virtual_directory()
 | |
| 1548 | 1554 |                              collectvdir = sandbox_vroot.descend(collect.lstrip(os.sep).split(os.sep))
 | 
| 1549 | 1555 |                          except VirtualDirectoryError:
 | 
| 1550 | 1556 |                              # No collect directory existed
 | 
| ... | ... | @@ -2117,7 +2123,24 @@ class Element(Plugin): | 
| 2117 | 2123 |          project = self._get_project()
 | 
| 2118 | 2124 |          platform = Platform.get_platform()
 | 
| 2119 | 2125 |  | 
| 2120 | -        if directory is not None and os.path.exists(directory):
 | |
| 2126 | +        if self.__remote_execution_url and self.BST_VIRTUAL_DIRECTORY:
 | |
| 2127 | +            if not self.__artifacts.has_push_remotes(element=self):
 | |
| 2128 | +                # Give an early warning if remote execution will not work
 | |
| 2129 | +                raise ElementError("Artifact {} is configured to use remote execution but has no push remotes. "
 | |
| 2130 | +                                   .format(self.name) +
 | |
| 2131 | +                                   "The remote artifact server(s) may not be correctly configured or contactable.")
 | |
| 2132 | + | |
| 2133 | +            self.info("Using a remote 'sandbox' for artifact {}".format(self.name))
 | |
| 2134 | +            sandbox = SandboxRemote(context, project,
 | |
| 2135 | +                                              directory,
 | |
| 2136 | +                                              stdout=stdout,
 | |
| 2137 | +                                              stderr=stderr,
 | |
| 2138 | +                                              config=config,
 | |
| 2139 | +                                              server_url=self.__remote_execution_url,
 | |
| 2140 | +                                              allow_real_directory=False)
 | |
| 2141 | +            yield sandbox
 | |
| 2142 | +        elif directory is not None and os.path.exists(directory):
 | |
| 2143 | +            self.info("Using a local sandbox for artifact {}".format(self.name))
 | |
| 2121 | 2144 |              sandbox = platform.create_sandbox(context, project,
 | 
| 2122 | 2145 |                                                directory,
 | 
| 2123 | 2146 |                                                stdout=stdout,
 | 
| ... | ... | @@ -2289,6 +2312,24 @@ class Element(Plugin): | 
| 2289 | 2312 |          return SandboxConfig(self.node_get_member(sandbox_config, int, 'build-uid'),
 | 
| 2290 | 2313 |                               self.node_get_member(sandbox_config, int, 'build-gid'))
 | 
| 2291 | 2314 |  | 
| 2315 | +    def __extract_remote_execution_config(self, meta):
 | |
| 2316 | +        project = self._get_project()
 | |
| 2317 | +        project.ensure_fully_loaded()
 | |
| 2318 | +        rexec_config = _yaml.node_chain_copy(project._remote_execution)
 | |
| 2319 | + | |
| 2320 | +        # The default config is already composited with the project overrides
 | |
| 2321 | +        rexec_defaults = _yaml.node_get(self.__defaults, Mapping, 'remote-execution', default_value={})
 | |
| 2322 | +        rexec_defaults = _yaml.node_chain_copy(rexec_defaults)
 | |
| 2323 | + | |
| 2324 | +        _yaml.composite(rexec_config, rexec_defaults)
 | |
| 2325 | +        _yaml.composite(rexec_config, meta.remote_execution)
 | |
| 2326 | +        _yaml.node_final_assertions(rexec_config)
 | |
| 2327 | + | |
| 2328 | +        # Rexec config, unlike others, has fixed members so we should validate them
 | |
| 2329 | +        _yaml.node_validate(rexec_config, ['url'])
 | |
| 2330 | + | |
| 2331 | +        return self.node_get_member(rexec_config, str, 'url')
 | |
| 2332 | + | |
| 2292 | 2333 |      # This makes a special exception for the split rules, which
 | 
| 2293 | 2334 |      # elements may extend but whose defaults are defined in the project.
 | 
| 2294 | 2335 |      #
 | 
| ... | ... | @@ -57,7 +57,7 @@ from buildstream import BuildElement | 
| 57 | 57 |  | 
| 58 | 58 |  # Element implementation for the 'autotools' kind.
 | 
| 59 | 59 |  class AutotoolsElement(BuildElement):
 | 
| 60 | -    pass
 | |
| 60 | +    BST_VIRTUAL_DIRECTORY = True
 | |
| 61 | 61 |  | 
| 62 | 62 |  | 
| 63 | 63 |  # Plugin entry point
 | 
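Note that only the autotools plugin is switched over here; element.py above only takes the remote-execution path when BST_VIRTUAL_DIRECTORY is set, so any other BuildElement plugin would need the same opt-in. A hypothetical example (MyToolElement is illustrative and not part of this branch):

    from buildstream import BuildElement

    # Hypothetical plugin opting in to the virtual-directory code path,
    # which the remote execution sandbox requires.
    class MyToolElement(BuildElement):
        BST_VIRTUAL_DIRECTORY = True

    # Plugin entry point
    def setup():
        return MyToolElement
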
| ... | ... | @@ -20,3 +20,4 @@ | 
| 20 | 20 |  from .sandbox import Sandbox, SandboxFlags
 | 
| 21 | 21 |  from ._sandboxchroot import SandboxChroot
 | 
| 22 | 22 |  from ._sandboxbwrap import SandboxBwrap
 | 
| 23 | +from ._sandboxremote import SandboxRemote | 
| 1 | +#!/usr/bin/env python3
 | |
| 2 | +#
 | |
| 3 | +#  Copyright (C) 2018 Codethink Limited
 | |
| 4 | +#
 | |
| 5 | +#  This program is free software; you can redistribute it and/or
 | |
| 6 | +#  modify it under the terms of the GNU Lesser General Public
 | |
| 7 | +#  License as published by the Free Software Foundation; either
 | |
| 8 | +#  version 2 of the License, or (at your option) any later version.
 | |
| 9 | +#
 | |
| 10 | +#  This library is distributed in the hope that it will be useful,
 | |
| 11 | +#  but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
| 12 | +#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.	 See the GNU
 | |
| 13 | +#  Lesser General Public License for more details.
 | |
| 14 | +#
 | |
| 15 | +#  You should have received a copy of the GNU Lesser General Public
 | |
| 16 | +#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
 | |
| 17 | +#
 | |
| 18 | +#  Authors:
 | |
| 19 | +#        Jim MacArthur <jim.macarthur@codethink.co.uk>
 | |
| 20 | + | |
| 21 | +import os
 | |
| 22 | +import re
 | |
| 23 | +import time
 | |
| 24 | + | |
| 25 | +import grpc
 | |
| 26 | + | |
| 27 | +from . import Sandbox
 | |
| 28 | +from ..storage._filebaseddirectory import FileBasedDirectory
 | |
| 29 | +from ..storage._casbaseddirectory import CasBasedDirectory
 | |
| 30 | +from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
 | |
| 31 | +from .._protos.google.longrunning import operations_pb2, operations_pb2_grpc
 | |
| 32 | + | |
| 33 | +from .._artifactcache.cascache import CASCache
 | |
| 34 | + | |
| 35 | + | |
| 36 | +class SandboxError(Exception):
 | |
| 37 | +    pass
 | |
| 38 | + | |
| 39 | + | |
| 40 | +# SandboxRemote()
 | |
| 41 | +#
 | |
| 42 | +# This isn't really a sandbox; it's a stub which sends all the source
 | |
| 43 | +# to a remote server and retrieves the results from it.
 | |
| 44 | +#
 | |
| 45 | +class SandboxRemote(Sandbox):
 | |
| 46 | + | |
| 47 | +    def __init__(self, *args, **kwargs):
 | |
| 48 | +        super().__init__(*args, **kwargs)
 | |
| 49 | +        self.cascache = None
 | |
| 50 | +        self.server_url = kwargs['server_url']
 | |
| 51 | +        # Check the format of the url ourselves to save the user from
 | |
| 52 | +        # whatever error messages grpc will produce
 | |
| 53 | +        m = re.match(r'^(.+):(\d+)$', self.server_url)
 | |
| 54 | +        if m is None:
 | |
| 55 | +            raise SandboxError("Configured remote URL '{}' does not match the expected layout. "
 | |
| 56 | +                               "It should be of the form <protocol>://<domain name>:<port>."
 | |
| 57 | +                               .format(self.server_url))
 | |
| 58 | + | |
| 59 | + | |
| 60 | +    def _get_cascache(self):
 | |
| 61 | +        if self.cascache is None:
 | |
| 62 | +            self.cascache = CASCache(self._get_context())
 | |
| 63 | +            self.cascache.setup_remotes(use_config=True)
 | |
| 64 | +        return self.cascache
 | |
| 65 | + | |
| 66 | +    def __run_remote_command(self, cascache, command, input_root_digest, environment):
 | |
| 67 | + | |
| 68 | +        environment_variables = [ remote_execution_pb2.Command.
 | |
| 69 | +                                  EnvironmentVariable(name=k, value=v)
 | |
| 70 | +                                  for (k,v) in environment.items() ]
 | |
| 71 | + | |
| 72 | +        # Create and send the Command object.
 | |
| 73 | +        remote_command = remote_execution_pb2.Command(arguments=command, environment_variables=environment_variables,
 | |
| 74 | +                                                      output_files=[],
 | |
| 75 | +                                                      output_directories=[self._output_directory],
 | |
| 76 | +                                                      platform=None)
 | |
| 77 | +        command_digest = cascache.add_object(buffer=remote_command.SerializeToString())
 | |
| 78 | +        command_ref = 'worker-command/{}'.format(command_digest.hash)
 | |
| 79 | +        cascache.set_ref(command_ref, command_digest)
 | |
| 80 | + | |
| 81 | +        command_push_successful = cascache.push_refs([command_ref], self._get_project(), may_have_dependencies=False)
 | |
| 82 | +        if not command_push_successful and not cascache.verify_key_pushed(command_ref, self._get_project()):
 | |
| 83 | +            # Command push failed
 | |
| 84 | +            return None
 | |
| 85 | + | |
| 86 | +        # Create and send the action.
 | |
| 87 | + | |
| 88 | +        action = remote_execution_pb2.Action(command_digest=command_digest,
 | |
| 89 | +                                             input_root_digest=input_root_digest,
 | |
| 90 | +                                             timeout=None,
 | |
| 91 | +                                             do_not_cache=True)
 | |
| 92 | + | |
| 93 | +        action_digest = cascache.add_object(buffer=action.SerializeToString())
 | |
| 94 | +        action_ref = 'worker-action/{}'.format(command_digest.hash)
 | |
| 95 | +        cascache.set_ref(action_ref, action_digest)
 | |
| 96 | +        action_push_successful = cascache.push_refs([action_ref], self._get_project(), may_have_dependencies=False)
 | |
| 97 | + | |
| 98 | +        if not action_push_successful and not cascache.verify_key_pushed(action_ref, self._get_project()):
 | |
| 99 | +            # Action push failed
 | |
| 100 | +            return None
 | |
| 101 | + | |
| 102 | +        # Next, try to create a communication channel to the BuildGrid server.
 | |
| 103 | + | |
| 104 | +        channel = grpc.insecure_channel(self.server_url)
 | |
| 105 | +        stub = remote_execution_pb2_grpc.ExecutionStub(channel)
 | |
| 106 | +        request = remote_execution_pb2.ExecuteRequest(instance_name='default',
 | |
| 107 | +                                                      action_digest=action_digest,
 | |
| 108 | +                                                      skip_cache_lookup=True)
 | |
| 109 | + | |
| 110 | + | |
| 111 | +        operation_iterator = stub.Execute(request)
 | |
| 112 | +        with self._get_context().timed_activity("Waiting for the remote build to complete"):
 | |
| 113 | +            # It is advantageous to check operation_iterator.code() is grpc.StatusCode.OK here,
 | |
| 114 | +            # which will check the server is actually contactable. However, calling it when the
 | |
| 115 | +            # server is available seems to cause .code() to hang forever.
 | |
| 116 | +            for operation in operation_iterator:
 | |
| 117 | +                if operation.done:
 | |
| 118 | +                    break
 | |
| 119 | +        return operation
 | |
| 120 | + | |
| 121 | +    def process_job_output(self, output_directories, output_files):
 | |
| 122 | +        # output_directories is an array of OutputDirectory objects.
 | |
| 123 | +        # output_files is an array of OutputFile objects.
 | |
| 124 | +        #
 | |
| 125 | +        # We only specify one output_directory, so it's an error
 | |
| 126 | +        # for there to be any output files or more than one directory at the moment.
 | |
| 127 | + | |
| 128 | +        if output_files:
 | |
| 129 | +            raise SandboxError("Output files were returned when we didn't request any.")
 | |
| 130 | +        elif len(output_directories) > 1:
 | |
| 131 | +            error_text = "More than one output directory was returned from the build server: {}"
 | |
| 132 | +            raise SandboxError(error_text.format(output_directories))
 | |
| 133 | +        elif len(output_directories) < 1:
 | |
| 134 | +            error_text = "No output directory was returned from the build server."
 | |
| 135 | +            raise SandboxError(error_text)
 | |
| 136 | + | |
| 137 | +        digest = output_directories[0].tree_digest
 | |
| 138 | +        if digest is None or digest.hash is None or digest.hash == "":
 | |
| 139 | +            raise SandboxError("Output directory structure had no digest attached.")
 | |
| 140 | + | |
| 141 | +        # Now do a pull to ensure we have the necessary parts.
 | |
| 142 | +        cascache = self._get_cascache()
 | |
| 143 | +        cascache.pull_key(digest.hash, digest.size_bytes, self._get_project())
 | |
| 144 | +        path_components = os.path.split(self._output_directory)
 | |
| 145 | + | |
| 146 | +        # Now what we have is a digest for the output. Once we return, the calling process will
 | |
| 147 | +        # attempt to descend into our directory and find that directory, so we need to overwrite
 | |
| 148 | +        # that.
 | |
| 149 | + | |
| 150 | +        if not path_components:
 | |
| 151 | +            # The artifact wants the whole directory; we could just return the returned hash in its
 | |
| 152 | +            # place, but we don't have a means to do that yet.
 | |
| 153 | +            raise SandboxError("Unimplemented: Output directory is empty or equal to the sandbox root.")
 | |
| 154 | + | |
| 155 | +        # At the moment, we will get the whole directory back in the first directory argument and we need
 | |
| 156 | +        # to replace the sandbox's virtual directory with that. Creating a new virtual directory object
 | |
| 157 | +        # from another hash will be interesting, though...
 | |
| 158 | + | |
| 159 | +        new_dir = CasBasedDirectory(self._get_context(), ref=digest)
 | |
| 160 | +        self._set_virtual_directory(new_dir)
 | |
| 161 | + | |
| 162 | +    def run(self, command, flags, *, cwd=None, env=None):
 | |
| 163 | +        # Upload sources
 | |
| 164 | +        upload_vdir = self.get_virtual_directory()
 | |
| 165 | + | |
| 166 | +        if isinstance(upload_vdir, FileBasedDirectory):
 | |
| 167 | +            # Make a new temporary directory to put source in
 | |
| 168 | +            upload_vdir = CasBasedDirectory(self._get_context(), ref=None)
 | |
| 169 | +            upload_vdir.import_files(self.get_virtual_directory()._get_underlying_directory())
 | |
| 170 | + | |
| 171 | + | |
| 172 | +        # Now, push that key (without necessarily needing a ref) to the remote.
 | |
| 173 | +        cascache = self._get_cascache()
 | |
| 174 | + | |
| 175 | +        ref = 'worker-source/{}'.format(upload_vdir.ref.hash)
 | |
| 176 | +        upload_vdir._save(ref)
 | |
| 177 | +        source_push_successful = cascache.push_refs([ref], self._get_project())
 | |
| 178 | + | |
| 179 | +        # Set up environment and PWD
 | |
| 180 | +        if env is None:
 | |
| 181 | +            env = self._get_environment()
 | |
| 182 | +        if 'PWD' not in env:
 | |
| 183 | +            env['PWD'] = self._get_work_directory()
 | |
| 184 | + | |
| 185 | +        # We want command args as a list of strings
 | |
| 186 | +        if isinstance(command, str):
 | |
| 187 | +            command = [command]
 | |
| 188 | + | |
| 189 | +        # Now transmit the command to execute
 | |
| 190 | +        if source_push_successful or cascache.verify_key_pushed(ref, self._get_project()):
 | |
| 191 | +            response = self.__run_remote_command(cascache, command, upload_vdir.ref, env)
 | |
| 192 | + | |
| 193 | +            if response is None:
 | |
| 194 | +                # Failure of remote execution, usually due to an error in BuildStream
 | |
| 195 | +                # NB This error could be raised in __run_remote_command
 | |
| 196 | +                raise SandboxError("No response returned from server")
 | |
| 197 | + | |
| 198 | +            assert(response.HasField("error") or response.HasField("response"))
 | |
| 199 | + | |
| 200 | +            if response.HasField("error"):
 | |
| 201 | +                # A normal error during the build
 | |
| 202 | +                error_message = response.error.message
 | |
| 203 | +                # response.error also contains 'details' (iterator of Any) which we ignore at the moment.
 | |
| 204 | +                return response.error.code
 | |
| 205 | +            else:
 | |
| 206 | + | |
| 207 | +                # At the moment, response can either be an
 | |
| 208 | +                # ExecutionResponse containing an ActionResult, or an
 | |
| 209 | +                # ActionResult directly.
 | |
| 210 | +                executeResponse = remote_execution_pb2.ExecuteResponse()
 | |
| 211 | +                if response.response.Is(executeResponse.DESCRIPTOR):
 | |
| 212 | +                    # Unpack ExecuteResponse and set response to its response
 | |
| 213 | +                    response.response.Unpack(executeResponse)
 | |
| 214 | +                    response = executeResponse
 | |
| 215 | + | |
| 216 | +                actionResult = remote_execution_pb2.ActionResult()
 | |
| 217 | +                if response.response.Is(actionResult.DESCRIPTOR):
 | |
| 218 | +                    response.response.Unpack(actionResult)
 | |
| 219 | +                    self.process_job_output(actionResult.output_directories, actionResult.output_files)
 | |
| 220 | +                else:
 | |
| 221 | +                    raise SandboxError("Received unknown message from server (expected ExecutionResponse).")
 | |
| 222 | +        else:
 | |
| 223 | +            raise SandboxError("Failed to verify that source has been pushed to the remote artifact cache.")
 | |
| 224 | +        return 0 | 
| ... | ... | @@ -99,9 +99,11 @@ class Sandbox(): | 
| 99 | 99 |          self.__stdout = kwargs['stdout']
 | 
| 100 | 100 |          self.__stderr = kwargs['stderr']
 | 
| 101 | 101 |  | 
| 102 | -        # Setup the directories. Root should be available to subclasses, hence
 | |
| 103 | -        # being single-underscore. The others are private to this class.
 | |
| 102 | +        # Setup the directories. Root and output_directory should be
 | |
| 103 | +        # available to subclasses, hence being single-underscore. The
 | |
| 104 | +        # others are private to this class.
 | |
| 104 | 105 |          self._root = os.path.join(directory, 'root')
 | 
| 106 | +        self._output_directory = None
 | |
| 105 | 107 |          self.__directory = directory
 | 
| 106 | 108 |          self.__scratch = os.path.join(self.__directory, 'scratch')
 | 
| 107 | 109 |          for directory_ in [self._root, self.__scratch]:
 | 
| ... | ... | @@ -144,11 +146,30 @@ class Sandbox(): | 
| 144 | 146 |                  self._vdir = FileBasedDirectory(self._root)
 | 
| 145 | 147 |          return self._vdir
 | 
| 146 | 148 |  | 
| 149 | +    def _set_virtual_directory(self, vdir):
 | |
| 150 | +        """ Sets virtual directory. Useful after remote execution
 | |
| 151 | +        has rewritten the working directory.
 | |
| 152 | +        """
 | |
| 153 | +        self._vdir = vdir
 | |
| 154 | + | |
| 155 | +    def get_virtual_toplevel_directory(self):
 | |
| 156 | +        """Fetches the sandbox's toplevel directory
 | |
| 157 | + | |
| 158 | +        The toplevel directory contains 'root', 'scratch' and later
 | |
| 159 | +        'artifact' where output is copied to.
 | |
| 160 | + | |
| 161 | +        Returns:
 | |
| 162 | +           (str): The sandbox toplevel directory
 | |
| 163 | + | |
| 164 | +        """
 | |
| 165 | +        # For now, just create a new Directory every time we're asked
 | |
| 166 | +        return FileBasedDirectory(self.__directory)
 | |
| 167 | + | |
| 147 | 168 |      def set_environment(self, environment):
 | 
| 148 | 169 |          """Sets the environment variables for the sandbox
 | 
| 149 | 170 |  | 
| 150 | 171 |          Args:
 | 
| 151 | -           directory (dict): The environment variables to use in the sandbox
 | |
| 172 | +           environment (dict): The environment variables to use in the sandbox
 | |
| 152 | 173 |          """
 | 
| 153 | 174 |          self.__env = environment
 | 
| 154 | 175 |  | 
| ... | ... | @@ -160,6 +181,15 @@ class Sandbox(): | 
| 160 | 181 |          """
 | 
| 161 | 182 |          self.__cwd = directory
 | 
| 162 | 183 |  | 
| 184 | +    def set_output_directory(self, directory):
 | |
| 185 | +        """Sets the output directory - the directory which is preserved
 | |
| 186 | +        as an artifact after assembly.
 | |
| 187 | + | |
| 188 | +        Args:
 | |
| 189 | +           directory (str): An absolute path within the sandbox
 | |
| 190 | +        """
 | |
| 191 | +        self._output_directory = directory
 | |
| 192 | + | |
| 163 | 193 |      def mark_directory(self, directory, *, artifact=False):
 | 
| 164 | 194 |          """Marks a sandbox directory and ensures it will exist
 | 
| 165 | 195 |  | 
| ... | ... | @@ -358,6 +358,20 @@ class CasBasedDirectory(Directory): | 
| 358 | 358 |                      result.files_written.append(relative_pathname)
 | 
| 359 | 359 |          return result
 | 
| 360 | 360 |  | 
| 361 | +    def _save(self, name):
 | |
| 362 | +        """ Saves this directory into the content cache as a named ref. The remote execution
 | |
| 363 | +        sandbox uses this to make its input directory available on the remote. """
 | |
| 364 | +        self._recalculate_recursing_up()
 | |
| 365 | +        self._recalculate_recursing_down()
 | |
| 366 | +        (rel_refpath, refname) = os.path.split(name)
 | |
| 367 | +        refdir = os.path.join(self.cas_directory, 'refs', 'heads', rel_refpath)
 | |
| 368 | +        refname = os.path.join(refdir, refname)
 | |
| 369 | + | |
| 370 | +        if not os.path.exists(refdir):
 | |
| 371 | +            os.makedirs(refdir)
 | |
| 372 | +        with open(refname, "wb") as f:
 | |
| 373 | +            f.write(self.ref.SerializeToString())
 | |
| 374 | + | |
| 361 | 375 |      def import_files(self, external_pathspec, *, files=None,
 | 
| 362 | 376 |                       report_written=True, update_utimes=False,
 | 
| 363 | 377 |                       can_link=False):
 | 
| ... | ... | @@ -11,7 +11,7 @@ test=pytest | 
| 11 | 11 |  | 
| 12 | 12 |  [tool:pytest]
 | 
| 13 | 13 |  addopts = --verbose --basetemp ./tmp --pep8 --pylint --pylint-rcfile=.pylintrc --cov=buildstream --cov-config .coveragerc
 | 
| 14 | -norecursedirs = integration-cache tmp __pycache__ .eggs
 | |
| 14 | +norecursedirs = tests/integration/project integration-cache tmp __pycache__ .eggs
 | |
| 15 | 15 |  python_files = tests/*/*.py
 | 
| 16 | 16 |  pep8maxlinelength = 119
 | 
| 17 | 17 |  pep8ignore =
 | 
