... |
... |
@@ -254,30 +254,9 @@ class CASCache(ArtifactCache): |
254
|
254
|
|
255
|
255
|
self.set_ref(newref, tree)
|
256
|
256
|
|
257
|
|
- def push(self, element, keys):
|
258
|
|
- keys = list(keys)
|
259
|
|
- refs = [self.get_artifact_fullname(element, key) for key in keys]
|
260
|
|
-
|
261
|
|
- project = element._get_project()
|
262
|
|
- self.push_refs(refs, project)
|
263
|
|
-
|
264
|
|
- def push_refs(self, refs, project):
|
265
|
|
-
|
266
|
|
- push_remotes = [r for r in self._remotes[project] if r.spec.push]
|
267
|
|
-
|
268
|
|
- pushed = False
|
269
|
|
-
|
270
|
|
- for remote in push_remotes:
|
271
|
|
- remote.init()
|
272
|
|
- element.info("Pushing {} -> {}".format(element._get_brief_display_key(), remote.spec.url))
|
273
|
|
- if _push_refs_to_remote(refs, remote):
|
274
|
|
- pushed = True
|
275
|
|
-
|
276
|
|
- #TODO: Does pushed get used?
|
277
|
|
- return pushed
|
278
|
|
-
|
279
|
|
- def _push_refs_to_remote(self, refs, remote):
|
|
257
|
+ def _push_refs_to_remote(self, refs, remote, may_have_dependencies):
|
280
|
258
|
skipped_remote = True
|
|
259
|
+
|
281
|
260
|
try:
|
282
|
261
|
for ref in refs:
|
283
|
262
|
tree = self.resolve_ref(ref)
|
... |
... |
@@ -348,19 +327,51 @@ class CASCache(ArtifactCache): |
348
|
327
|
request.digest.size_bytes = tree.size_bytes
|
349
|
328
|
remote.ref_storage.UpdateReference(request)
|
350
|
329
|
|
351
|
|
- pushed = True
|
352
|
|
-
|
353
|
330
|
except grpc.RpcError as e:
|
354
|
331
|
if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
|
355
|
332
|
raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
|
356
|
333
|
|
357
|
|
- if skipped_remote:
|
358
|
|
- self.context.message(Message(
|
359
|
|
- None,
|
360
|
|
- MessageType.SKIPPED,
|
361
|
|
- "Remote ({}) already has {} cached".format(
|
362
|
|
- remote.spec.url, element._get_brief_display_key())
|
363
|
|
- ))
|
|
334
|
+ return not skipped_remote
|
|
335
|
+
|
|
336
|
def push(self, element, keys):
    """Push the artifact of ``element`` under every key in ``keys``.

    Each key is turned into a full artifact ref with
    ``get_artifact_fullname`` and the resulting list is handed to
    ``push_refs`` together with the element's project.

    Args:
        element: Element whose artifact is being pushed; its project is
            obtained through ``element._get_project()``.
        keys: Iterable of cache keys.  It is materialised into a list
            first, so a one-shot iterator is accepted.

    Returns:
        The value returned by ``push_refs`` for the resolved refs.
    """
    keys = list(keys)
    refs = [self.get_artifact_fullname(element, key) for key in keys]

    project = element._get_project()
    return self.push_refs(element, refs, project)
|
|
342
|
+
|
|
343
|
def push_refs(self, element, refs, project, may_have_dependencies=True):
    """Push ``refs`` to every remote of ``project`` that accepts pushes.

    Each push-enabled remote is initialised and then offered the refs
    through ``_push_refs_to_remote``.  When that call reports that nothing
    was pushed to a remote, a SKIPPED message naming the remote and the
    element's brief display key is sent to the context.

    Args:
        element: Element the refs belong to; only used to build the
            SKIPPED message.
        refs: List of artifact refs to push.
        project: Key into ``self._remotes`` selecting the candidate remotes.
        may_have_dependencies: Forwarded unchanged to
            ``_push_refs_to_remote``.

    Returns:
        True if at least one remote reported a push, False otherwise
        (including when the project has no push-enabled remote).
    """
    push_remotes = [r for r in self._remotes[project] if r.spec.push]

    pushed = False

    for remote in push_remotes:
        remote.init()
        if self._push_refs_to_remote(refs, remote, may_have_dependencies):
            pushed = True
        else:
            # Nothing was uploaded to this remote, so report it as skipped.
            self.context.message(Message(
                None,
                MessageType.SKIPPED,
                "Remote ({}) already has {} cached".format(
                    remote.spec.url, element._get_brief_display_key())
            ))

    return pushed
|
|
362
|
+
|
|
363
|
def verify_key_pushed(self, key, project):
    """Check whether ``key`` is present on any push remote of ``project``.

    Every push-enabled remote is initialised and queried through
    ``_verify_ref_on_remote``; the loop does not stop at the first hit, so
    all push remotes are always contacted.

    Args:
        key: Ref to look for; passed to ``_verify_ref_on_remote`` as is.
        project: Key into ``self._remotes`` selecting the candidate remotes.

    Returns:
        True if at least one push remote reports the ref, False otherwise
        (including when the project has no push-enabled remote).
    """
    push_remotes = [r for r in self._remotes[project] if r.spec.push]

    pushed = False

    for remote in push_remotes:
        remote.init()

        if self._verify_ref_on_remote(key, remote):
            pushed = True

    return pushed
|
365
|
376
|
|
366
|
377
|
################################################
|
... |
... |
@@ -736,26 +747,27 @@ class CASCache(ArtifactCache): |
736
|
747
|
#
|
737
|
748
|
q.put(str(e))
|
738
|
749
|
|
739
|
|
def _required_blobs(self, tree, may_have_dependencies=True):
    """Yield the digests needed for ``tree``.

    The digest of ``tree`` itself is always yielded first.  When
    ``may_have_dependencies`` is true, the object stored at
    ``self.objpath(tree)`` is parsed as a ``Directory`` message and the
    digests of its files, then of its sub-directories (recursively), are
    yielded as well.

    Args:
        tree: Object exposing ``hash`` and ``size_bytes``.
        may_have_dependencies: When False, ``tree`` is treated as a single
            blob and its stored object is never opened or parsed.

    Yields:
        ``remote_execution_pb2.Digest`` objects, each a fresh copy.
    """
    d = remote_execution_pb2.Digest()
    d.hash = tree.hash
    d.size_bytes = tree.size_bytes
    yield d

    if not may_have_dependencies:
        return

    # Parse the directory, then recursively add the blobs it references.
    directory = remote_execution_pb2.Directory()

    with open(self.objpath(tree), 'rb') as f:
        directory.ParseFromString(f.read())

    for filenode in directory.files:
        d = remote_execution_pb2.Digest()
        d.hash = filenode.digest.hash
        d.size_bytes = filenode.digest.size_bytes
        yield d

    for dirnode in directory.directories:
        # Sub-directories are always walked in full (default argument).
        yield from self._required_blobs(dirnode.digest)
|
759
|
771
|
|
760
|
772
|
def _fetch_blob(self, remote, digest, out):
|
761
|
773
|
resource_name = os.path.join(digest.hash, str(digest.size_bytes))
|