[Notes] [Git][BuildStream/buildstream][juerg/sandbox] 9 commits: _platform: Fix get_cpu_count() with cap=None

Jürg Billeter pushed to branch juerg/sandbox at BuildStream / buildstream

Commits:

6 changed files:

Changes:

  • .gitlab-ci.yml

    @@ -161,14 +161,14 @@ docs:
     .overnight-tests: &overnight-tests-template
       stage: test
       variables:
    -    bst_ext_url: git+https://gitlab.com/BuildStream/bst-external.git
    -    bst_ext_ref: 1d6ab71151b93c8cbc0a91a36ffe9270f3b835f1 # 0.5.1
    -    fd_sdk_ref: 88d7c22c2281b987faa02edd57df80d430eecf1f # 18.08.12
    +    BST_EXT_URL: git+https://gitlab.com/BuildStream/bst-external.git
    +    BST_EXT_REF: 1d6ab71151b93c8cbc0a91a36ffe9270f3b835f1 # 0.5.1
    +    FD_SDK_REF: 88d7c22c2281b987faa02edd57df80d430eecf1f # 18.08.11-35-g88d7c22c
       before_script:
       - (cd dist && ./unpack.sh && cd buildstream && pip3 install .)
    -  - pip3 install --user -e ${bst_ext_url}@${bst_ext_ref}#egg=bst_ext
    +  - pip3 install --user -e ${BST_EXT_URL}@${BST_EXT_REF}#egg=bst_ext
       - git clone https://gitlab.com/freedesktop-sdk/freedesktop-sdk.git
    -  - git -C freedesktop-sdk checkout ${fd_sdk_ref}
    +  - git -C freedesktop-sdk checkout ${FD_SDK_REF}
       only:
       - schedules
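    The rename to upper-case follows the usual convention for
    environment-style variables in CI configuration, and the FD_SDK_REF
    comment now records the git-describe form of the pinned commit
    (18.08.11-35-g88d7c22c), which identifies the ref more precisely than
    the old bare version number.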

  • buildstream/_artifactcache/cascache.py

    @@ -1048,10 +1048,29 @@ class CASCache(ArtifactCache):
                     missing_blobs[d.hash] = d

             # Upload any blobs missing on the server
    -        for blob_digest in missing_blobs.values():
    -            with open(self.objpath(blob_digest), 'rb') as f:
    -                assert os.fstat(f.fileno()).st_size == blob_digest.size_bytes
    -                self._send_blob(remote, blob_digest, f, u_uid=u_uid)
    +        self._send_blobs(remote, missing_blobs.values(), u_uid)
    +
    +    def _send_blobs(self, remote, digests, u_uid=uuid.uuid4()):
    +        batch = _CASBatchUpdate(remote)
    +
    +        for digest in digests:
    +            with open(self.objpath(digest), 'rb') as f:
    +                assert os.fstat(f.fileno()).st_size == digest.size_bytes
    +
    +                if (digest.size_bytes >= remote.max_batch_total_size_bytes or
    +                        not remote.batch_update_supported):
    +                    # Too large for batch request, upload in independent request.
    +                    self._send_blob(remote, digest, f, u_uid=u_uid)
    +                else:
    +                    if not batch.add(digest, f):
    +                        # Not enough space left in batch request.
    +                        # Complete pending batch first.
    +                        batch.send()
    +                        batch = _CASBatchUpdate(remote)
    +                        batch.add(digest, f)
    +
    +        # Send final batch
    +        batch.send()


     # Represents a single remote CAS cache.

    @@ -1126,6 +1145,17 @@ class _CASRemote():
                     if e.code() != grpc.StatusCode.UNIMPLEMENTED:
                         raise

    +            # Check whether the server supports BatchUpdateBlobs()
    +            self.batch_update_supported = False
    +            try:
    +                request = remote_execution_pb2.BatchUpdateBlobsRequest()
    +                response = self.cas.BatchUpdateBlobs(request)
    +                self.batch_update_supported = True
    +            except grpc.RpcError as e:
    +                if (e.code() != grpc.StatusCode.UNIMPLEMENTED and
    +                        e.code() != grpc.StatusCode.PERMISSION_DENIED):
    +                    raise
    +
                 self._initialized = True

    @@ -1173,6 +1203,46 @@ class _CASBatchRead():
                 yield (response.digest, response.data)


    +# Represents a batch of blobs queued for upload.
    +#
    +class _CASBatchUpdate():
    +    def __init__(self, remote):
    +        self._remote = remote
    +        self._max_total_size_bytes = remote.max_batch_total_size_bytes
    +        self._request = remote_execution_pb2.BatchUpdateBlobsRequest()
    +        self._size = 0
    +        self._sent = False
    +
    +    def add(self, digest, stream):
    +        assert not self._sent
    +
    +        new_batch_size = self._size + digest.size_bytes
    +        if new_batch_size > self._max_total_size_bytes:
    +            # Not enough space left in current batch
    +            return False
    +
    +        blob_request = self._request.requests.add()
    +        blob_request.digest.hash = digest.hash
    +        blob_request.digest.size_bytes = digest.size_bytes
    +        blob_request.data = stream.read(digest.size_bytes)
    +        self._size = new_batch_size
    +        return True
    +
    +    def send(self):
    +        assert not self._sent
    +        self._sent = True
    +
    +        if len(self._request.requests) == 0:
    +            return
    +
    +        batch_response = self._remote.cas.BatchUpdateBlobs(self._request)
    +
    +        for response in batch_response.responses:
    +            if response.status.code != grpc.StatusCode.OK.value[0]:
    +                raise ArtifactError("Failed to upload blob {}: {}".format(
    +                    response.digest.hash, response.status.code))
    +
    +
     def _grouper(iterable, n):
         while True:
             try:
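    The new upload path is a fill-and-flush batching scheme: blobs are
    accumulated into a _CASBatchUpdate until the next blob would push the
    batch past the server's max_batch_total_size_bytes, at which point the
    pending batch is sent and a fresh one started; blobs at or above the
    limit bypass batching entirely and use the existing streaming
    _send_blob() path. The capability probe in _CASRemote treats both
    UNIMPLEMENTED and PERMISSION_DENIED responses to an empty
    BatchUpdateBlobsRequest as "batching unavailable", so such remotes fall
    back to per-blob streaming. One Python subtlety: the u_uid=uuid.uuid4()
    default is evaluated once at definition time, so callers that need a
    per-call UUID must pass one explicitly, as the caller above does. A
    minimal self-contained sketch of the grouping logic, with a
    hypothetical Blob tuple and print() stand-ins for the two upload RPCs:

        from typing import Iterable, List, NamedTuple

        class Blob(NamedTuple):
            name: str
            size: int

        MAX_BATCH_BYTES = 10  # stand-in for remote.max_batch_total_size_bytes

        def send_batch(batch: List[Blob]) -> None:
            print("batch upload:", [b.name for b in batch])

        def send_blobs(blobs: Iterable[Blob]) -> None:
            batch: List[Blob] = []
            batch_size = 0
            for blob in blobs:
                if blob.size >= MAX_BATCH_BYTES:
                    # Too large for any batch: individual streaming
                    # upload, as _send_blob() does above.
                    print("single upload:", blob.name)
                    continue
                if batch_size + blob.size > MAX_BATCH_BYTES:
                    # No room left: flush the pending batch and
                    # start a new one (cf. batch.send() above).
                    send_batch(batch)
                    batch, batch_size = [], 0
                batch.append(blob)
                batch_size += blob.size
            if batch:
                send_batch(batch)  # final, possibly partial batch

        send_blobs([Blob("a", 4), Blob("b", 5), Blob("c", 3), Blob("d", 12)])
        # -> batch upload: ['a', 'b'] / single upload: d / batch upload: ['c']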

  • buildstream/_artifactcache/casserver.py

    @@ -68,7 +68,7 @@ def create_server(repo, *, enable_push):
             _ByteStreamServicer(artifactcache, enable_push=enable_push), server)

         remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
    -        _ContentAddressableStorageServicer(artifactcache), server)
    +        _ContentAddressableStorageServicer(artifactcache, enable_push=enable_push), server)

         remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
             _CapabilitiesServicer(), server)

    @@ -222,9 +222,10 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):


     class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
    -    def __init__(self, cas):
    +    def __init__(self, cas, *, enable_push):
             super().__init__()
             self.cas = cas
    +        self.enable_push = enable_push

         def FindMissingBlobs(self, request, context):
             response = remote_execution_pb2.FindMissingBlobsResponse()

    @@ -260,6 +261,46 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):

             return response

    +    def BatchUpdateBlobs(self, request, context):
    +        response = remote_execution_pb2.BatchUpdateBlobsResponse()
    +
    +        if not self.enable_push:
    +            context.set_code(grpc.StatusCode.PERMISSION_DENIED)
    +            return response
    +
    +        batch_size = 0
    +
    +        for blob_request in request.requests:
    +            digest = blob_request.digest
    +
    +            batch_size += digest.size_bytes
    +            if batch_size > _MAX_PAYLOAD_BYTES:
    +                context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
    +                return response
    +
    +            blob_response = response.responses.add()
    +            blob_response.digest.hash = digest.hash
    +            blob_response.digest.size_bytes = digest.size_bytes
    +
    +            if len(blob_request.data) != digest.size_bytes:
    +                blob_response.status.code = grpc.StatusCode.FAILED_PRECONDITION
    +                continue
    +
    +            try:
    +                _clean_up_cache(self.cas, digest.size_bytes)
    +
    +                with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
    +                    out.write(blob_request.data)
    +                    out.flush()
    +                    server_digest = self.cas.add_object(path=out.name)
    +                    if server_digest.hash != digest.hash:
    +                        blob_response.status.code = grpc.StatusCode.FAILED_PRECONDITION
    +
    +            except ArtifactTooLargeException:
    +                blob_response.status.code = grpc.StatusCode.RESOURCE_EXHAUSTED
    +
    +        return response
    +

     class _CapabilitiesServicer(remote_execution_pb2_grpc.CapabilitiesServicer):
         def GetCapabilities(self, request, context):
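    On the server side, whole-request failures are signalled through
    context.set_code() (PERMISSION_DENIED when push is disabled,
    INVALID_ARGUMENT when the batch exceeds _MAX_PAYLOAD_BYTES), while
    per-blob problems are recorded in the individual response statuses, so
    one bad blob does not abort the rest of the batch. A rough sketch of
    that per-blob validation pattern, with hashlib standing in for the CAS
    object store (the names below are illustrative, not BuildStream API):

        import hashlib
        from typing import List, Tuple

        OK, FAILED_PRECONDITION = 0, 9  # google.rpc status code numbers

        def verify_blobs(blobs: List[Tuple[bytes, int, str]]) -> List[int]:
            statuses = []
            for data, size_bytes, digest_hash in blobs:
                if len(data) != size_bytes:
                    # Announced size does not match the payload:
                    # fail this blob, keep processing the others.
                    statuses.append(FAILED_PRECONDITION)
                    continue
                if hashlib.sha256(data).hexdigest() != digest_hash:
                    # Stored content does not match the announced digest.
                    statuses.append(FAILED_PRECONDITION)
                    continue
                statuses.append(OK)
            return statuses

        good = b"hello"
        print(verify_blobs([
            (good, 5, hashlib.sha256(good).hexdigest()),  # -> 0 (OK)
            (good, 4, hashlib.sha256(good).hexdigest()),  # -> 9 (size mismatch)
        ]))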

  • buildstream/_platform/darwin.py

    @@ -41,10 +41,11 @@ class Darwin(Platform):
             return True

         def get_cpu_count(self, cap=None):
    -        if cap < os.cpu_count():
    -            return cap
    +        cpu_count = os.cpu_count()
    +        if cap is None:
    +            return cpu_count
             else:
    -            return os.cpu_count()
    +            return min(cpu_count, cap)

         def set_resource_limits(self, soft_limit=OPEN_MAX, hard_limit=None):
             super().set_resource_limits(soft_limit)
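    This is the get_cpu_count() fix from the push subject: with the default
    cap=None, the old comparison cap < os.cpu_count() raises TypeError on
    Python 3, while integer caps already behaved like min(). The fixed
    logic, as a standalone sketch:

        import os

        def get_cpu_count(cap=None):
            # Handle "no cap" before comparing: on Python 3,
            # `None < int` raises TypeError.
            cpu_count = os.cpu_count()
            if cap is None:
                return cpu_count
            return min(cpu_count, cap)

        print(get_cpu_count())       # all CPUs, e.g. 8
        print(get_cpu_count(cap=4))  # at most 4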

  • buildstream/_platform/platform.py

    @@ -67,7 +67,11 @@ class Platform():
             return cls._instance

         def get_cpu_count(self, cap=None):
    -        return min(len(os.sched_getaffinity(0)), cap)
    +        cpu_count = len(os.sched_getaffinity(0))
    +        if cap is None:
    +            return cpu_count
    +        else:
    +            return min(cpu_count, cap)

         ##################################################################
         #                        Sandbox functions                       #
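    The base implementation counts CPUs with os.sched_getaffinity(0), which
    honours the process's affinity mask (taskset, container cpusets) but is
    Linux-only; that is why the Darwin subclass above overrides it with
    os.cpu_count(). A portable helper would typically combine the two (a
    sketch, not BuildStream code):

        import os

        def available_cpu_count():
            # The affinity mask reflects which CPUs this process may
            # actually run on; it is only available on Linux, so fall
            # back to the raw CPU count elsewhere (e.g. macOS).
            try:
                return len(os.sched_getaffinity(0))
            except AttributeError:
                return os.cpu_count()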
    

  • buildstream/sandbox/_sandboxremote.py

    @@ -177,15 +177,11 @@ class SandboxRemote(Sandbox):
             if not cascache.verify_digest_pushed(self._get_project(), upload_vdir.ref):
                 raise SandboxError("Failed to verify that source has been pushed to the remote artifact cache.")

    -        # Set up environment and working directory
    -        if cwd is None:
    -            cwd = self._get_work_directory()
    -
    -        if cwd is None:
    -            cwd = '/'
    -
    -        if env is None:
    -            env = self._get_environment()
    +        # Fallback to the sandbox default settings for
    +        # the cwd and env.
    +        #
    +        cwd = self._get_work_directory(cwd=cwd)
    +        env = self._get_environment(cwd=cwd, env=env)

             # We want command args as a list of strings
             if isinstance(command, str):


