[Notes] [Git][BuildStream/buildstream][chandan/bst-docker-import] 25 commits: _artifactcache/casserver.py: Implement BatchUpdateBlobs



Title: GitLab

Chandan Singh pushed to branch chandan/bst-docker-import at BuildStream / buildstream

Commits:

14 changed files:

Changes:

  • .gitlab-ci.yml
    ... ... @@ -145,7 +145,8 @@ docs:
    145 145
       stage: test
    
    146 146
       script:
    
    147 147
       - export BST_SOURCE_CACHE="$(pwd)/cache/integration-cache/sources"
    
    148
    -  - pip3 install sphinx
    
    148
    +  # Currently sphinx_rtd_theme does not support Sphinx >1.8, this breaks search functionality
    
    149
    +  - pip3 install sphinx==1.7.9
    
    149 150
       - pip3 install sphinx-click
    
    150 151
       - pip3 install sphinx_rtd_theme
    
    151 152
       - cd dist && ./unpack.sh && cd buildstream
    

  • buildstream/_artifactcache/cascache.py
    ... ... @@ -506,7 +506,7 @@ class CASCache(ArtifactCache):
    506 506
         def set_ref(self, ref, tree):
    
    507 507
             refpath = self._refpath(ref)
    
    508 508
             os.makedirs(os.path.dirname(refpath), exist_ok=True)
    
    509
    -        with utils.save_file_atomic(refpath, 'wb') as f:
    
    509
    +        with utils.save_file_atomic(refpath, 'wb', tempdir=self.tmpdir) as f:
    
    510 510
                 f.write(tree.SerializeToString())
    
    511 511
     
    
    512 512
         # resolve_ref():
    
    ... ... @@ -1048,10 +1048,29 @@ class CASCache(ArtifactCache):
    1048 1048
                     missing_blobs[d.hash] = d
    
    1049 1049
     
    
    1050 1050
             # Upload any blobs missing on the server
    
    1051
    -        for blob_digest in missing_blobs.values():
    
    1052
    -            with open(self.objpath(blob_digest), 'rb') as f:
    
    1053
    -                assert os.fstat(f.fileno()).st_size == blob_digest.size_bytes
    
    1054
    -                self._send_blob(remote, blob_digest, f, u_uid=u_uid)
    
    1051
    +        self._send_blobs(remote, missing_blobs.values(), u_uid)
    
    1052
    +
    
    1053
    +    def _send_blobs(self, remote, digests, u_uid=uuid.uuid4()):
    
    1054
    +        batch = _CASBatchUpdate(remote)
    
    1055
    +
    
    1056
    +        for digest in digests:
    
    1057
    +            with open(self.objpath(digest), 'rb') as f:
    
    1058
    +                assert os.fstat(f.fileno()).st_size == digest.size_bytes
    
    1059
    +
    
    1060
    +                if (digest.size_bytes >= remote.max_batch_total_size_bytes or
    
    1061
    +                        not remote.batch_update_supported):
    
    1062
    +                    # Too large for batch request, upload in independent request.
    
    1063
    +                    self._send_blob(remote, digest, f, u_uid=u_uid)
    
    1064
    +                else:
    
    1065
    +                    if not batch.add(digest, f):
    
    1066
    +                        # Not enough space left in batch request.
    
    1067
    +                        # Complete pending batch first.
    
    1068
    +                        batch.send()
    
    1069
    +                        batch = _CASBatchUpdate(remote)
    
    1070
    +                        batch.add(digest, f)
    
    1071
    +
    
    1072
    +        # Send final batch
    
    1073
    +        batch.send()
    
    1055 1074
     
    
    1056 1075
     
    
    1057 1076
     # Represents a single remote CAS cache.
    
    ... ... @@ -1126,6 +1145,17 @@ class _CASRemote():
    1126 1145
                     if e.code() != grpc.StatusCode.UNIMPLEMENTED:
    
    1127 1146
                         raise
    
    1128 1147
     
    
    1148
    +            # Check whether the server supports BatchUpdateBlobs()
    
    1149
    +            self.batch_update_supported = False
    
    1150
    +            try:
    
    1151
    +                request = remote_execution_pb2.BatchUpdateBlobsRequest()
    
    1152
    +                response = self.cas.BatchUpdateBlobs(request)
    
    1153
    +                self.batch_update_supported = True
    
    1154
    +            except grpc.RpcError as e:
    
    1155
    +                if (e.code() != grpc.StatusCode.UNIMPLEMENTED and
    
    1156
    +                        e.code() != grpc.StatusCode.PERMISSION_DENIED):
    
    1157
    +                    raise
    
    1158
    +
    
    1129 1159
                 self._initialized = True
    
    1130 1160
     
    
    1131 1161
     
    
    ... ... @@ -1173,6 +1203,46 @@ class _CASBatchRead():
    1173 1203
                 yield (response.digest, response.data)
    
    1174 1204
     
    
    1175 1205
     
    
    1206
    +# Represents a batch of blobs queued for upload.
    
    1207
    +#
    
    1208
    +class _CASBatchUpdate():
    
    1209
    +    def __init__(self, remote):
    
    1210
    +        self._remote = remote
    
    1211
    +        self._max_total_size_bytes = remote.max_batch_total_size_bytes
    
    1212
    +        self._request = remote_execution_pb2.BatchUpdateBlobsRequest()
    
    1213
    +        self._size = 0
    
    1214
    +        self._sent = False
    
    1215
    +
    
    1216
    +    def add(self, digest, stream):
    
    1217
    +        assert not self._sent
    
    1218
    +
    
    1219
    +        new_batch_size = self._size + digest.size_bytes
    
    1220
    +        if new_batch_size > self._max_total_size_bytes:
    
    1221
    +            # Not enough space left in current batch
    
    1222
    +            return False
    
    1223
    +
    
    1224
    +        blob_request = self._request.requests.add()
    
    1225
    +        blob_request.digest.hash = digest.hash
    
    1226
    +        blob_request.digest.size_bytes = digest.size_bytes
    
    1227
    +        blob_request.data = stream.read(digest.size_bytes)
    
    1228
    +        self._size = new_batch_size
    
    1229
    +        return True
    
    1230
    +
    
    1231
    +    def send(self):
    
    1232
    +        assert not self._sent
    
    1233
    +        self._sent = True
    
    1234
    +
    
    1235
    +        if len(self._request.requests) == 0:
    
    1236
    +            return
    
    1237
    +
    
    1238
    +        batch_response = self._remote.cas.BatchUpdateBlobs(self._request)
    
    1239
    +
    
    1240
    +        for response in batch_response.responses:
    
    1241
    +            if response.status.code != grpc.StatusCode.OK.value[0]:
    
    1242
    +                raise ArtifactError("Failed to upload blob {}: {}".format(
    
    1243
    +                    response.digest.hash, response.status.code))
    
    1244
    +
    
    1245
    +
    
    1176 1246
     def _grouper(iterable, n):
    
    1177 1247
         while True:
    
    1178 1248
             try:
    

  • buildstream/_artifactcache/casserver.py
    ... ... @@ -68,7 +68,7 @@ def create_server(repo, *, enable_push):
    68 68
             _ByteStreamServicer(artifactcache, enable_push=enable_push), server)
    
    69 69
     
    
    70 70
         remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
    
    71
    -        _ContentAddressableStorageServicer(artifactcache), server)
    
    71
    +        _ContentAddressableStorageServicer(artifactcache, enable_push=enable_push), server)
    
    72 72
     
    
    73 73
         remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
    
    74 74
             _CapabilitiesServicer(), server)
    
    ... ... @@ -222,9 +222,10 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
    222 222
     
    
    223 223
     
    
    224 224
     class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
    
    225
    -    def __init__(self, cas):
    
    225
    +    def __init__(self, cas, *, enable_push):
    
    226 226
             super().__init__()
    
    227 227
             self.cas = cas
    
    228
    +        self.enable_push = enable_push
    
    228 229
     
    
    229 230
         def FindMissingBlobs(self, request, context):
    
    230 231
             response = remote_execution_pb2.FindMissingBlobsResponse()
    
    ... ... @@ -260,6 +261,46 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres
    260 261
     
    
    261 262
             return response
    
    262 263
     
    
    264
    +    def BatchUpdateBlobs(self, request, context):
    
    265
    +        response = remote_execution_pb2.BatchUpdateBlobsResponse()
    
    266
    +
    
    267
    +        if not self.enable_push:
    
    268
    +            context.set_code(grpc.StatusCode.PERMISSION_DENIED)
    
    269
    +            return response
    
    270
    +
    
    271
    +        batch_size = 0
    
    272
    +
    
    273
    +        for blob_request in request.requests:
    
    274
    +            digest = blob_request.digest
    
    275
    +
    
    276
    +            batch_size += digest.size_bytes
    
    277
    +            if batch_size > _MAX_PAYLOAD_BYTES:
    
    278
    +                context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
    
    279
    +                return response
    
    280
    +
    
    281
    +            blob_response = response.responses.add()
    
    282
    +            blob_response.digest.hash = digest.hash
    
    283
    +            blob_response.digest.size_bytes = digest.size_bytes
    
    284
    +
    
    285
    +            if len(blob_request.data) != digest.size_bytes:
    
    286
    +                blob_response.status.code = grpc.StatusCode.FAILED_PRECONDITION
    
    287
    +                continue
    
    288
    +
    
    289
    +            try:
    
    290
    +                _clean_up_cache(self.cas, digest.size_bytes)
    
    291
    +
    
    292
    +                with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
    
    293
    +                    out.write(blob_request.data)
    
    294
    +                    out.flush()
    
    295
    +                    server_digest = self.cas.add_object(path=out.name)
    
    296
    +                    if server_digest.hash != digest.hash:
    
    297
    +                        blob_response.status.code = grpc.StatusCode.FAILED_PRECONDITION
    
    298
    +
    
    299
    +            except ArtifactTooLargeException:
    
    300
    +                blob_response.status.code = grpc.StatusCode.RESOURCE_EXHAUSTED
    
    301
    +
    
    302
    +        return response
    
    303
    +
    
    263 304
     
    
    264 305
     class _CapabilitiesServicer(remote_execution_pb2_grpc.CapabilitiesServicer):
    
    265 306
         def GetCapabilities(self, request, context):
    

  • buildstream/_platform/linux.py
    ... ... @@ -55,6 +55,10 @@ class Linux(Platform):
    55 55
                 return SandboxBwrap(*args, **kwargs)
    
    56 56
     
    
    57 57
         def check_sandbox_config(self, config):
    
    58
    +        if not self._local_sandbox_available():
    
    59
    +            # Accept all sandbox configs as it's irrelevant with the dummy sandbox (no Sandbox.run).
    
    60
    +            return True
    
    61
    +
    
    58 62
             if self._user_ns_available:
    
    59 63
                 # User namespace support allows arbitrary build UID/GID settings.
    
    60 64
                 return True
    

  • buildstream/_scheduler/jobs/job.py
    ... ... @@ -119,6 +119,8 @@ class Job():
    119 119
             self._result = None                    # Return value of child action in the parent
    
    120 120
             self._tries = 0                        # Try count, for retryable jobs
    
    121 121
             self._skipped_flag = False             # Indicate whether the job was skipped.
    
    122
    +        self._terminated = False               # Whether this job has been explicitly terminated
    
    123
    +
    
    122 124
             # If False, a retry will not be attempted regardless of whether _tries is less than _max_retries.
    
    123 125
             #
    
    124 126
             self._retry_flag = True
    
    ... ... @@ -190,6 +192,8 @@ class Job():
    190 192
             # Terminate the process using multiprocessing API pathway
    
    191 193
             self._process.terminate()
    
    192 194
     
    
    195
    +        self._terminated = True
    
    196
    +
    
    193 197
         # terminate_wait()
    
    194 198
         #
    
    195 199
         # Wait for terminated jobs to complete
    
    ... ... @@ -273,18 +277,22 @@ class Job():
    273 277
         # running the integration commands).
    
    274 278
         #
    
    275 279
         # Args:
    
    276
    -    #     (int): The plugin identifier for this task
    
    280
    +    #     task_id (int): The plugin identifier for this task
    
    277 281
         #
    
    278 282
         def set_task_id(self, task_id):
    
    279 283
             self._task_id = task_id
    
    280 284
     
    
    281 285
         # skipped
    
    282 286
         #
    
    287
    +    # This will evaluate to True if the job was skipped
    
    288
    +    # during processing, or if it was forcefully terminated.
    
    289
    +    #
    
    283 290
         # Returns:
    
    284
    -    #    bool: True if the job was skipped while processing.
    
    291
    +    #    (bool): Whether the job should appear as skipped
    
    292
    +    #
    
    285 293
         @property
    
    286 294
         def skipped(self):
    
    287
    -        return self._skipped_flag
    
    295
    +        return self._skipped_flag or self._terminated
    
    288 296
     
    
    289 297
         #######################################################
    
    290 298
         #                  Abstract Methods                   #
    

  • buildstream/_scheduler/queues/queue.py
    ... ... @@ -326,16 +326,20 @@ class Queue():
    326 326
                               detail=traceback.format_exc())
    
    327 327
                 self.failed_elements.append(element)
    
    328 328
             else:
    
    329
    -
    
    330
    -            # No exception occured, handle the success/failure state in the normal way
    
    331 329
                 #
    
    330
    +            # No exception occurred in post processing
    
    331
    +            #
    
    332
    +
    
    333
    +            # All jobs get placed on the done queue for later processing.
    
    332 334
                 self._done_queue.append(job)
    
    333 335
     
    
    334
    -            if success:
    
    335
    -                if not job.skipped:
    
    336
    -                    self.processed_elements.append(element)
    
    337
    -                else:
    
    338
    -                    self.skipped_elements.append(element)
    
    336
    +            # A Job can be skipped whether or not it has failed,
    
    337
    +            # we want to only bookkeep them as processed or failed
    
    338
    +            # if they are not skipped.
    
    339
    +            if job.skipped:
    
    340
    +                self.skipped_elements.append(element)
    
    341
    +            elif success:
    
    342
    +                self.processed_elements.append(element)
    
    339 343
                 else:
    
    340 344
                     self.failed_elements.append(element)
    
    341 345
     
    

  • buildstream/_scheduler/scheduler.py
    ... ... @@ -387,6 +387,15 @@ class Scheduler():
    387 387
         # A loop registered event callback for keyboard interrupts
    
    388 388
         #
    
    389 389
         def _interrupt_event(self):
    
    390
    +
    
    391
    +        # FIXME: This should not be needed, but for some reason we receive an
    
    392
    +        #        additional SIGINT event when the user hits ^C a second time
    
    393
    +        #        to inform us that they really intend to terminate; even though
    
    394
    +        #        we have disconnected our handlers at this time.
    
    395
    +        #
    
    396
    +        if self.terminated:
    
    397
    +            return
    
    398
    +
    
    390 399
             # Leave this to the frontend to decide, if no
    
    391 400
             # interrrupt callback was specified, then just terminate.
    
    392 401
             if self._interrupt_callback:
    

  • buildstream/element.py
    ... ... @@ -1379,10 +1379,10 @@ class Element(Plugin):
    1379 1379
                 if not vdirectory.is_empty():
    
    1380 1380
                     raise ElementError("Staging directory '{}' is not empty".format(vdirectory))
    
    1381 1381
     
    
    1382
    -            # While mkdtemp is advertised as using the TMP environment variable, it
    
    1383
    -            # doesn't, so this explicit extraction is necesasry.
    
    1384
    -            tmp_prefix = os.environ.get("TMP", None)
    
    1385
    -            temp_staging_directory = tempfile.mkdtemp(prefix=tmp_prefix)
    
    1382
    +            # It's advantageous to have this temporary directory on
    
    1383
    +            # the same filing system as the rest of our cache.
    
    1384
    +            temp_staging_location = os.path.join(self._get_context().artifactdir, "staging_temp")
    
    1385
    +            temp_staging_directory = tempfile.mkdtemp(prefix=temp_staging_location)
    
    1386 1386
     
    
    1387 1387
                 try:
    
    1388 1388
                     workspace = self._get_workspace()
    

  • buildstream/plugins/sources/git.py
    ... ... @@ -184,10 +184,18 @@ class GitMirror(SourceFetcher):
    184 184
                              cwd=self.mirror)
    
    185 185
     
    
    186 186
         def fetch(self, alias_override=None):
    
    187
    -        self.ensure(alias_override)
    
    188
    -        if not self.has_ref():
    
    189
    -            self._fetch(alias_override)
    
    190
    -        self.assert_ref()
    
    187
    +        # Resolve the URL for the message
    
    188
    +        resolved_url = self.source.translate_url(self.url,
    
    189
    +                                                 alias_override=alias_override,
    
    190
    +                                                 primary=self.primary)
    
    191
    +
    
    192
    +        with self.source.timed_activity("Fetching from {}"
    
    193
    +                                        .format(resolved_url),
    
    194
    +                                        silent_nested=True):
    
    195
    +            self.ensure(alias_override)
    
    196
    +            if not self.has_ref():
    
    197
    +                self._fetch(alias_override)
    
    198
    +            self.assert_ref()
    
    191 199
     
    
    192 200
         def has_ref(self):
    
    193 201
             if not self.ref:
    

  • buildstream/sandbox/_sandboxremote.py
    ... ... @@ -177,15 +177,11 @@ class SandboxRemote(Sandbox):
    177 177
             if not cascache.verify_digest_pushed(self._get_project(), upload_vdir.ref):
    
    178 178
                 raise SandboxError("Failed to verify that source has been pushed to the remote artifact cache.")
    
    179 179
     
    
    180
    -        # Set up environment and working directory
    
    181
    -        if cwd is None:
    
    182
    -            cwd = self._get_work_directory()
    
    183
    -
    
    184
    -        if cwd is None:
    
    185
    -            cwd = '/'
    
    186
    -
    
    187
    -        if env is None:
    
    188
    -            env = self._get_environment()
    
    180
    +        # Fallback to the sandbox default settings for
    
    181
    +        # the cwd and env.
    
    182
    +        #
    
    183
    +        cwd = self._get_work_directory(cwd=cwd)
    
    184
    +        env = self._get_environment(cwd=cwd, env=env)
    
    189 185
     
    
    190 186
             # We want command args as a list of strings
    
    191 187
             if isinstance(command, str):
    

  • buildstream/source.py
    ... ... @@ -965,28 +965,48 @@ class Source(Plugin):
    965 965
         # Tries to call fetch for every mirror, stopping once it succeeds
    
    966 966
         def __do_fetch(self, **kwargs):
    
    967 967
             project = self._get_project()
    
    968
    -        source_fetchers = self.get_source_fetchers()
    
    968
    +        context = self._get_context()
    
    969
    +
    
    970
    +        # Silence the STATUS messages which might happen as a result
    
    971
    +        # of checking the source fetchers.
    
    972
    +        with context.silence():
    
    973
    +            source_fetchers = self.get_source_fetchers()
    
    969 974
     
    
    970 975
             # Use the source fetchers if they are provided
    
    971 976
             #
    
    972 977
             if source_fetchers:
    
    973
    -            for fetcher in source_fetchers:
    
    974
    -                alias = fetcher._get_alias()
    
    975
    -                for uri in project.get_alias_uris(alias, first_pass=self.__first_pass):
    
    976
    -                    try:
    
    977
    -                        fetcher.fetch(uri)
    
    978
    -                    # FIXME: Need to consider temporary vs. permanent failures,
    
    979
    -                    #        and how this works with retries.
    
    980
    -                    except BstError as e:
    
    981
    -                        last_error = e
    
    982
    -                        continue
    
    983
    -
    
    984
    -                    # No error, we're done with this fetcher
    
    985
    -                    break
    
    986 978
     
    
    987
    -                else:
    
    988
    -                    # No break occurred, raise the last detected error
    
    989
    -                    raise last_error
    
    979
    +            # Use a contorted loop here, this is to allow us to
    
    980
    +            # silence the messages which can result from consuming
    
    981
    +            # the items of source_fetchers, if it happens to be a generator.
    
    982
    +            #
    
    983
    +            source_fetchers = iter(source_fetchers)
    
    984
    +            try:
    
    985
    +
    
    986
    +                while True:
    
    987
    +
    
    988
    +                    with context.silence():
    
    989
    +                        fetcher = next(source_fetchers)
    
    990
    +
    
    991
    +                    alias = fetcher._get_alias()
    
    992
    +                    for uri in project.get_alias_uris(alias, first_pass=self.__first_pass):
    
    993
    +                        try:
    
    994
    +                            fetcher.fetch(uri)
    
    995
    +                        # FIXME: Need to consider temporary vs. permanent failures,
    
    996
    +                        #        and how this works with retries.
    
    997
    +                        except BstError as e:
    
    998
    +                            last_error = e
    
    999
    +                            continue
    
    1000
    +
    
    1001
    +                        # No error, we're done with this fetcher
    
    1002
    +                        break
    
    1003
    +
    
    1004
    +                    else:
    
    1005
    +                        # No break occurred, raise the last detected error
    
    1006
    +                        raise last_error
    
    1007
    +
    
    1008
    +            except StopIteration:
    
    1009
    +                pass
    
    990 1010
     
    
    991 1011
             # Default codepath is to reinstantiate the Source
    
    992 1012
             #
    

  • buildstream/utils.py
    ... ... @@ -502,7 +502,7 @@ def get_bst_version():
    502 502
     
    
    503 503
     @contextmanager
    
    504 504
     def save_file_atomic(filename, mode='w', *, buffering=-1, encoding=None,
    
    505
    -                     errors=None, newline=None, closefd=True, opener=None):
    
    505
    +                     errors=None, newline=None, closefd=True, opener=None, tempdir=None):
    
    506 506
         """Save a file with a temporary name and rename it into place when ready.
    
    507 507
     
    
    508 508
         This is a context manager which is meant for saving data to files.
    
    ... ... @@ -529,8 +529,9 @@ def save_file_atomic(filename, mode='w', *, buffering=-1, encoding=None,
    529 529
         # https://bugs.python.org/issue8604
    
    530 530
     
    
    531 531
         assert os.path.isabs(filename), "The utils.save_file_atomic() parameter ``filename`` must be an absolute path"
    
    532
    -    dirname = os.path.dirname(filename)
    
    533
    -    fd, tempname = tempfile.mkstemp(dir=dirname)
    
    532
    +    if tempdir is None:
    
    533
    +        tempdir = os.path.dirname(filename)
    
    534
    +    fd, tempname = tempfile.mkstemp(dir=tempdir)
    
    534 535
         os.close(fd)
    
    535 536
     
    
    536 537
         f = open(tempname, mode=mode, buffering=buffering, encoding=encoding,
    
    ... ... @@ -562,6 +563,9 @@ def save_file_atomic(filename, mode='w', *, buffering=-1, encoding=None,
    562 563
     #
    
    563 564
     # Get the disk usage of a given directory in bytes.
    
    564 565
     #
    
    566
    +# This function assumes that files do not inadvertently
    
    567
    +# disappear while this function is running.
    
    568
    +#
    
    565 569
     # Arguments:
    
    566 570
     #     (str) The path whose size to check.
    
    567 571
     #
    

  • contrib/bst-docker-import
    1
    +#!/bin/bash
    
    2
    +#
    
    3
    +#  Copyright 2018 Bloomberg Finance LP
    
    4
    +#
    
    5
    +#  This program is free software; you can redistribute it and/or
    
    6
    +#  modify it under the terms of the GNU Lesser General Public
    
    7
    +#  License as published by the Free Software Foundation; either
    
    8
    +#  version 2 of the License, or (at your option) any later version.
    
    9
    +#
    
    10
    +#  This library is distributed in the hope that it will be useful,
    
    11
    +#  but WITHOUT ANY WARRANTY; without even the implied warranty of
    
    12
    +#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
    
    13
    +#  Lesser General Public License for more details.
    
    14
    +#
    
    15
    +#  You should have received a copy of the GNU Lesser General Public
    
    16
    +#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
    
    17
    +#
    
    18
    +#  Authors:
    
    19
    +#        Chandan Singh <csingh43 bloomberg net>
    
    20
    +
    
    21
    +# This is a helper script to generate Docker images using checkouts of
    
    22
    +# BuildStream elements.
    
    23
    +
    
    24
    +usage() {
    
    25
    +    cat <<EOF
    
    26
    +
    
    27
    +USAGE: $(basename "$0") [-c BST_CMD] [-m MESSAGE] [-t TAG] [-h] ELEMENT
    
    28
    +
    
    29
    +Create a Docker image from bst checkout of an element.
    
    30
    +
    
    31
    +OPTIONS:
    
    32
    +    -c BST_CMD    Path to BuildStream command (default: bst).
    
    33
    +    -m MESSAGE    Commit message for the imported image.
    
    34
    +    -t TAG        Tag of the imported image.
    
    35
    +    -h            Print this help text and exit.
    
    36
    +
    
    37
    +EXAMPLES:
    
    38
    +
    
    39
    +    # Import hello.bst as a Docker image with tag "bst-hello" and message "hello"
    
    40
    +    $(basename "$0") -m hello -t bst-hello hello.bst
    
    41
    +
    
    42
    +    # Import hello.bst as a Docker image with tag "bst-hello" using bst-here
    
    43
    +    $(basename "$0") -c bst-here -t bst-hello hello.bst
    
    44
    +
    
    45
    +EOF
    
    46
    +    exit "$1"
    
    47
    +}
    
    48
    +
    
    49
    +die() {
    
    50
    +    echo "FATAL: $1" >&2
    
    51
    +    exit 1
    
    52
    +}
    
    53
    +
    
    54
    +bst_cmd=bst
    
    55
    +docker_import_cmd="docker import"
    
    56
    +docker_image_tag=
    
    57
    +
    
    58
    +while getopts c:m:t:h arg
    
    59
    +do
    
    60
    +    case $arg in
    
    61
    +    c)
    
    62
    +        bst_cmd="$OPTARG"
    
    63
    +        ;;
    
    64
    +    m)
    
    65
    +        docker_import_cmd="$docker_import_cmd -m $OPTARG"
    
    66
    +        ;;
    
    67
    +    t)
    
    68
    +        docker_image_tag="$OPTARG"
    
    69
    +        ;;
    
    70
    +    h)
    
    71
    +        usage 0
    
    72
    +        ;;
    
    73
    +    \?)
    
    74
    +        usage 1
    
    75
    +    esac
    
    76
    +done
    
    77
    +
    
    78
    +shift $((OPTIND-1))
    
    79
    +if [[ "$#" != 1 ]]; then
    
    80
    +    echo "$0: No element specified" >&2
    
    81
    +    usage 1
    
    82
    +fi
    
    83
    +element="$1"
    
    84
    +
    
    85
    +# Dump to a temporary file in the current directory.
    
    86
    +# NOTE: We use current directory to try to ensure compatibility with scripts
    
    87
    +# like bst-here, assuming that the current working directory is mounted
    
    88
    +# inside the container.
    
    89
    +
    
    90
    +checkout_tar="bst-checkout-$(basename "$element")-$RANDOM.tar"
    
    91
    +
    
    92
    +echo "INFO: Checking out $element ..." >&2
    
    93
    +$bst_cmd checkout --tar "$element" "$checkout_tar" || die "Failed to checkout $element"
    
    94
    +echo "INFO: Successfully checked out $element" >&2
    
    95
    +
    
    96
    +echo "INFO: Importing Docker image ..."
    
    97
    +$docker_import_cmd "$checkout_tar" "$docker_image_tag" || die "Failed to import Docker image from tarball"
    
    98
    +echo "INFO: Successfully import Docker image $docker_image_tag"
    
    99
    +
    
    100
    +echo "INFO: Cleaning up ..."
    
    101
    +rm "$checkout_tar" || die "Failed to remove $checkout_tar"
    
    102
    +echo "INFO: Clean up finished"

  • tests/frontend/mirror.py
    ... ... @@ -139,6 +139,82 @@ def test_mirror_fetch(cli, tmpdir, datafiles, kind):
    139 139
         result.assert_success()
    
    140 140
     
    
    141 141
     
    
    142
    +@pytest.mark.datafiles(DATA_DIR)
    
    143
    +@pytest.mark.parametrize("ref_storage", [("inline"), ("project.refs")])
    
    144
    +@pytest.mark.parametrize("mirror", [("no-mirror"), ("mirror"), ("unrelated-mirror")])
    
    145
    +def test_mirror_fetch_ref_storage(cli, tmpdir, datafiles, ref_storage, mirror):
    
    146
    +    bin_files_path = os.path.join(str(datafiles), 'files', 'bin-files', 'usr')
    
    147
    +    dev_files_path = os.path.join(str(datafiles), 'files', 'dev-files', 'usr')
    
    148
    +    upstream_repodir = os.path.join(str(tmpdir), 'upstream')
    
    149
    +    mirror_repodir = os.path.join(str(tmpdir), 'mirror')
    
    150
    +    project_dir = os.path.join(str(tmpdir), 'project')
    
    151
    +    os.makedirs(project_dir)
    
    152
    +    element_dir = os.path.join(project_dir, 'elements')
    
    153
    +
    
    154
    +    # Create repo objects of the upstream and mirror
    
    155
    +    upstream_repo = create_repo('tar', upstream_repodir)
    
    156
    +    upstream_ref = upstream_repo.create(bin_files_path)
    
    157
    +    mirror_repo = upstream_repo.copy(mirror_repodir)
    
    158
    +    mirror_ref = upstream_ref
    
    159
    +    upstream_ref = upstream_repo.create(dev_files_path)
    
    160
    +
    
    161
    +    element = {
    
    162
    +        'kind': 'import',
    
    163
    +        'sources': [
    
    164
    +            upstream_repo.source_config(ref=upstream_ref if ref_storage == 'inline' else None)
    
    165
    +        ]
    
    166
    +    }
    
    167
    +    element_name = 'test.bst'
    
    168
    +    element_path = os.path.join(element_dir, element_name)
    
    169
    +    full_repo = element['sources'][0]['url']
    
    170
    +    upstream_map, repo_name = os.path.split(full_repo)
    
    171
    +    alias = 'foo'
    
    172
    +    aliased_repo = alias + ':' + repo_name
    
    173
    +    element['sources'][0]['url'] = aliased_repo
    
    174
    +    full_mirror = mirror_repo.source_config()['url']
    
    175
    +    mirror_map, _ = os.path.split(full_mirror)
    
    176
    +    os.makedirs(element_dir)
    
    177
    +    _yaml.dump(element, element_path)
    
    178
    +
    
    179
    +    if ref_storage == 'project.refs':
    
    180
    +        # Manually set project.refs to avoid caching the repo prematurely
    
    181
    +        project_refs = {'projects': {
    
    182
    +            'test': {
    
    183
    +                element_name: [
    
    184
    +                    {'ref': upstream_ref}
    
    185
    +                ]
    
    186
    +            }
    
    187
    +        }}
    
    188
    +        project_refs_path = os.path.join(project_dir, 'project.refs')
    
    189
    +        _yaml.dump(project_refs, project_refs_path)
    
    190
    +
    
    191
    +    project = {
    
    192
    +        'name': 'test',
    
    193
    +        'element-path': 'elements',
    
    194
    +        'aliases': {
    
    195
    +            alias: upstream_map + "/"
    
    196
    +        },
    
    197
    +        'ref-storage': ref_storage
    
    198
    +    }
    
    199
    +    if mirror != 'no-mirror':
    
    200
    +        mirror_data = [{
    
    201
    +            'name': 'middle-earth',
    
    202
    +            'aliases': {alias: [mirror_map + '/']}
    
    203
    +        }]
    
    204
    +        if mirror == 'unrelated-mirror':
    
    205
    +            mirror_data.insert(0, {
    
    206
    +                'name': 'narnia',
    
    207
    +                'aliases': {'frob': ['http://www.example.com/repo']}
    
    208
    +            })
    
    209
    +        project['mirrors'] = mirror_data
    
    210
    +
    
    211
    +    project_file = os.path.join(project_dir, 'project.conf')
    
    212
    +    _yaml.dump(project, project_file)
    
    213
    +
    
    214
    +    result = cli.run(project=project_dir, args=['fetch', element_name])
    
    215
    +    result.assert_success()
    
    216
    +
    
    217
    +
    
    142 218
     @pytest.mark.datafiles(DATA_DIR)
    
    143 219
     @pytest.mark.parametrize("kind", [(kind) for kind in ALL_REPO_KINDS])
    
    144 220
     def test_mirror_fetch_upstream_absent(cli, tmpdir, datafiles, kind):
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]