Raoul Hidalgo Charman pushed to branch raoul/802-refactor-artifactcache at BuildStream / buildstream
Commits:

- f112bfae by Raoul Hidalgo Charman at 2018-12-18T12:35:04Z
- 5d05a079 by Raoul Hidalgo Charman at 2018-12-18T14:01:24Z
3 changed files:
Changes:
@@ -656,6 +656,11 @@ class ArtifactCache():
                     remote.request_blob(blob_digest)
                 for blob_file in remote.get_blobs():
                     self.cas.add_object(path=blob_file.name, link_directly=True)
+
+                # request the final CAS batch
+                for blob_file in remote.get_blobs(request_batch=True):
+                    self.cas.add_object(path=blob_file.name, link_directly=True)
+
                 self.cas.set_ref(ref, root_digest)
             except BlobNotFound:
                 element.info("Remote ({}) is missing blobs for {}".format(
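
The hunk above completes the batched download flow in ArtifactCache.pull(): queue every
missing blob, drain finished downloads, then flush the final partially-filled batch. A
minimal sketch of that calling pattern, where `missing_digests` is a hypothetical stand-in
for the digest walk the real code performs:

    # Queue each missing blob; small blobs accumulate into a CAS batch,
    # large ones are fetched immediately over the bytestream API.
    for digest in missing_digests:
        remote.request_blob(digest)

    # Drain whatever downloads have completed so far.
    for blob_file in remote.get_blobs():
        cas.add_object(path=blob_file.name, link_directly=True)

    # Flush the last, possibly partial, batch.
    for blob_file in remote.get_blobs(request_batch=True):
        cas.add_object(path=blob_file.name, link_directly=True)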

@@ -689,15 +694,36 @@ class ArtifactCache():
     #
     # Args:
     #     project (Project): The current project
-    #     digest (Digest): The digest of the tree
+    #     tree_digest (Digest): The digest of the tree
     #
-    def pull_tree(self, project, digest):
+    def pull_tree(self, project, tree_digest):
         for remote in self._remotes[project]:
-            digest = self.cas.pull_tree(remote, digest)
-
-            if digest:
-                # no need to pull from additional remotes
-                return digest
+            try:
+                # get tree
+                tree = remote.get_tree_blob(tree_digest)
+
+                # request files
+                tree.children.extend([tree.root])
+                for directory in tree.children:
+                    for filenode in directory.files:
+                        if self.cas.check_blob(filenode.digest):
+                            continue
+                        remote.request_blob(filenode.digest)
+                for blob_file in remote.get_blobs():
+                    self.cas.add_object(path=blob_file.name)
+
+                # Get the last batch
+                for blob in remote.get_blobs(request_batch=True):
+                    self.cas.add_object(path=blob.name)
+
+                # add the directory objects to CAS
+                for directory in tree.children:
+                    self.cas.add_object(buffer=directory.SerializeToString())
+
+            except BlobNotFound:
+                continue
+            else:
+                return tree_digest
 
         return None
 
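
In the new pull_tree() the Tree proto carries the root Directory plus every child, so
appending tree.root to tree.children lets a single loop cover all directories; the
directory blobs themselves are only added to CAS after all file blobs have landed, which
avoids dangling references. A small self-contained sketch of that walk, assuming the
REAPI protobuf bindings shipped with BuildStream:

    from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2

    def iter_tree_file_digests(tree):
        # The root directory is walked like any other child directory
        directories = list(tree.children) + [tree.root]
        for directory in directories:
            for filenode in directory.files:
                yield filenode.digest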

@@ -183,29 +183,6 @@ class CASCache():
 
         return modified, removed, added
 
-    # pull_tree():
-    #
-    # Pull a single Tree rather than a ref.
-    # Does not update local refs.
-    #
-    # Args:
-    #     remote (CASRemote): The remote to pull from
-    #     digest (Digest): The digest of the tree
-    #
-    def pull_tree(self, remote, digest):
-        try:
-            remote.init()
-
-            digest = self._fetch_tree(remote, digest)
-
-            return digest
-
-        except grpc.RpcError as e:
-            if e.code() != grpc.StatusCode.NOT_FOUND:
-                raise
-
-            return None
-
     # link_ref():
     #
     # Add an alias for an existing ref.

@@ -771,29 +748,6 @@ class CASCache():
 
         return objpath
 
-    def _fetch_tree(self, remote, digest):
-        # download but do not store the Tree object
-        with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
-            remote._fetch_blob(digest, out)
-
-            tree = remote_execution_pb2.Tree()
-
-            with open(out.name, 'rb') as f:
-                tree.ParseFromString(f.read())
-
-            tree.children.extend([tree.root])
-            for directory in tree.children:
-                for filenode in directory.files:
-                    self._ensure_blob(remote, filenode.digest)
-
-                # place directory blob only in final location when we've downloaded
-                # all referenced blobs to avoid dangling references in the repository
-                dirbuffer = directory.SerializeToString()
-                dirdigest = self.add_object(buffer=dirbuffer)
-                assert dirdigest.size_bytes == len(dirbuffer)
-
-        return dirdigest
-
     def _send_directory(self, remote, digest, u_uid=uuid.uuid4()):
         required_blobs = self._required_blobs(digest)
 

@@ -95,6 +95,9 @@ class CASRemote():
 
         self.__tmp_downloads = []  # files in the tmpdir waiting to be added to local caches
 
+        self.__batch_read = None
+        self.__batch_update = None
+
     def init(self):
         if not self._initialized:
             url = urlparse(self.spec.url)

@@ -152,6 +155,7 @@ class CASRemote():
                 request = remote_execution_pb2.BatchReadBlobsRequest()
                 response = self.cas.BatchReadBlobs(request)
                 self.batch_read_supported = True
+                self.__batch_read = _CASBatchRead(self)
             except grpc.RpcError as e:
                 if e.code() != grpc.StatusCode.UNIMPLEMENTED:
                     raise

@@ -162,6 +166,7 @@ class CASRemote():
                 request = remote_execution_pb2.BatchUpdateBlobsRequest()
                 response = self.cas.BatchUpdateBlobs(request)
                 self.batch_update_supported = True
+                self.__batch_update = _CASBatchUpdate(self)
             except grpc.RpcError as e:
                 if (e.code() != grpc.StatusCode.UNIMPLEMENTED and
                         e.code() != grpc.StatusCode.PERMISSION_DENIED):
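
Both probes follow the same pattern: send an empty batch request and treat UNIMPLEMENTED
(or PERMISSION_DENIED, for updates) as "fall back to bytestream". A minimal sketch of the
read-side probe in isolation, assuming an open gRPC channel and the standard REAPI stubs:

    import grpc
    from buildstream._protos.build.bazel.remote.execution.v2 import (
        remote_execution_pb2, remote_execution_pb2_grpc)

    def probe_batch_read(channel):
        # An empty request is a cheap way to discover whether the
        # server implements BatchReadBlobs at all.
        cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(channel)
        try:
            cas.BatchReadBlobs(remote_execution_pb2.BatchReadBlobsRequest())
            return True
        except grpc.RpcError as e:
            if e.code() != grpc.StatusCode.UNIMPLEMENTED:
                raise
            return False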

@@ -276,6 +281,17 @@ class CASRemote():
         else:
             return None
 
+    def get_tree_blob(self, tree_digest):
+        self.init()
+        f = tempfile.NamedTemporaryFile(dir=self.tmpdir)
+        self._fetch_blob(tree_digest, f)
+
+        tree = remote_execution_pb2.Tree()
+        with open(f.name, 'rb') as tmp:
+            tree.ParseFromString(tmp.read())
+
+        return tree
+
     # yield_directory_digests():
     #
     # Iterate over blob digests starting from a root digest

@@ -296,20 +312,30 @@ class CASRemote():
         # Fetch artifact, excluded_subdirs determined in pullqueue
         yield from self._yield_directory_digests(root_digest, excluded_subdirs=excluded_subdirs)
 
+    def yield_tree_digests(self, tree_digest):
+        tree = self.get_tree_blob(tree_digest)
+        tree.children.extend([tree.root])
+        for directory in tree.children:
+            for filenode in directory.files:
+                yield filenode.digest
+
     # request_blob():
     #
-    # Request blob and returns path to tmpdir location
+    # Request a blob, triggering a download via either bytestream or the
+    # CAS BatchReadBlobs call, depending on its size.
     #
     # Args:
     #     digest (Digest): digest of the requested blob
-    #     path (str): tmpdir locations of downloaded blobs
     #
     def request_blob(self, digest):
-        # TODO expand for adding to batches some other logic
-        f = tempfile.NamedTemporaryFile(dir=self.tmpdir)
-        self._fetch_blob(digest, f)
-        self.__tmp_downloads.append(f)
-        return f.name
+        if (not self.batch_read_supported or
+                digest.size_bytes > self.max_batch_total_size_bytes):
+            f = tempfile.NamedTemporaryFile(dir=self.tmpdir)
+            self._fetch_blob(digest, f)
+            self.__tmp_downloads.append(f)
+        elif self.__batch_read.add(digest) is False:
+            self._download_batch()
+            self.__batch_read.add(digest)
 
     # get_blobs():
     #

@@ -318,7 +344,12 @@ class CASRemote():
     #
     # Returns:
     #     iterator over NamedTemporaryFile
-    def get_blobs(self):
+    def get_blobs(self, request_batch=False):
+        # Send read batch request and download
+        if (request_batch is True and
+                self.batch_read_supported is True):
+            self._download_batch()
+
         while self.__tmp_downloads:
             yield self.__tmp_downloads.pop()
 

@@ -349,18 +380,18 @@ class CASRemote():
     #     excluded_subdirs (list): The optional list of subdirs to not fetch
     #
     def _yield_directory_digests(self, dir_digest, *, excluded_subdirs=[]):
+        # get directory blob
+        f = tempfile.NamedTemporaryFile(dir=self.tmpdir)
+        self._fetch_blob(dir_digest, f)
 
-        objpath = self.request_blob(dir_digest)
-
+        # need to read in the directory structure to iterate over it
         directory = remote_execution_pb2.Directory()
-
-        with open(objpath, 'rb') as f:
-            directory.ParseFromString(f.read())
+        with open(f.name, 'rb') as tmp:
+            directory.ParseFromString(tmp.read())
 
         yield dir_digest
         for filenode in directory.files:
             yield filenode.digest
-
         for dirnode in directory.directories:
             if dirnode.name not in excluded_subdirs:
                 yield from self._yield_directory_digests(dirnode.digest)
393 | 424 |
|
394 | 425 |
assert response.committed_size == digest.size_bytes
|
395 | 426 |
|
427 |
+ def _download_batch(self):
|
|
428 |
+ for _, data in self.__batch_read.send():
|
|
429 |
+ f = tempfile.NamedTemporaryFile(dir=self.tmpdir)
|
|
430 |
+ f.write(data)
|
|
431 |
+ f.flush()
|
|
432 |
+ self.__tmp_downloads.append(f)
|
|
433 |
+ |
|
434 |
+ self.__batch_read = _CASBatchRead(self)
|
|
435 |
+ |
|
396 | 436 |
|
397 | 437 |
# Represents a batch of blobs queued for fetching.
|
398 | 438 |
#
|
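
_download_batch() leans on a small contract from _CASBatchRead: add() returns False once
the next digest would overflow max_batch_total_size_bytes, and send() performs one
BatchReadBlobs round trip and yields (digest, data) pairs. A minimal sketch of a class
satisfying that contract (not the actual implementation in this branch):

    from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2

    class BatchReadSketch():
        # Queues digests until the server's batch size limit would be exceeded.
        def __init__(self, remote):
            self._remote = remote
            self._request = remote_execution_pb2.BatchReadBlobsRequest()
            self._size = 0

        def add(self, digest):
            # Refuse a digest that would overflow the batch; the caller
            # then flushes with send() and retries the add.
            if self._size + digest.size_bytes > self._remote.max_batch_total_size_bytes:
                return False
            self._request.digests.add().CopyFrom(digest)
            self._size += digest.size_bytes
            return True

        def send(self):
            # One BatchReadBlobs round trip for everything queued so far.
            if not self._request.digests:
                return
            response = self._remote.cas.BatchReadBlobs(self._request)
            for blob_response in response.responses:
                yield blob_response.digest, blob_response.data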