@@ -27,6 +27,7 @@ import uuid
 import time
 import errno
 import ctypes
+import threading
 
 import click
 import grpc
@@ -62,7 +63,7 @@ class ArtifactTooLargeException(Exception):
 #     repo (str): Path to CAS repository
 #     enable_push (bool): Whether to allow blob uploads and artifact updates
 #
-def create_server(repo, *, enable_push):
+def create_server(repo, max_head_size, min_head_size, *, enable_push):
     context = Context()
     context.artifactdir = os.path.abspath(repo)
 
@@ -72,11 +73,13 @@ def create_server(repo, *, enable_push):
     max_workers = (os.cpu_count() or 1) * 5
     server = grpc.server(futures.ThreadPoolExecutor(max_workers))
 
+    cache_cleaner = _CacheCleaner(artifactcache, max_head_size, min_head_size)
+
     bytestream_pb2_grpc.add_ByteStreamServicer_to_server(
-        _ByteStreamServicer(artifactcache, enable_push=enable_push), server)
+        _ByteStreamServicer(artifactcache, cache_cleaner, enable_push=enable_push), server)
 
     remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
-        _ContentAddressableStorageServicer(artifactcache, enable_push=enable_push), server)
+        _ContentAddressableStorageServicer(artifactcache, cache_cleaner, enable_push=enable_push), server)
 
     remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
         _CapabilitiesServicer(), server)
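Note: a single _CacheCleaner instance is created above and shared by both servicers, so concurrent uploads coordinate their cleanup through one lock. As a rough sketch of the new signature (the repository path, sizes and port are made up, and this assumes create_server() returns the grpc.Server it configures, as its use in server_main() below suggests):

```python
# Illustrative only: drive create_server() directly with explicit head-room sizes.
# Note the argument order: maximum head room first, then minimum.
server = create_server('/srv/cas-repo',   # hypothetical repository path
                       int(10e9),         # max_head_size: head room restored once cleanup runs
                       int(2e9),          # min_head_size: head room below which cleanup starts
                       enable_push=True)
server.add_insecure_port('localhost:50051')
server.start()                             # server_main() below additionally handles TLS
```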
@@ -94,9 +97,16 @@ def create_server(repo, *, enable_push):
 @click.option('--client-certs', help="Public client certificates for TLS (PEM-encoded)")
 @click.option('--enable-push', default=False, is_flag=True,
               help="Allow clients to upload blobs and update artifact cache")
+@click.option('--head-room-min', type=click.INT,
+              help="Disk head room minimum in bytes",
+              default=2e9)
+@click.option('--head-room-max', type=click.INT,
+              help="Disk head room maximum in bytes",
+              default=10e9)
 @click.argument('repo')
-def server_main(repo, port, server_key, server_cert, client_certs, enable_push):
-    server = create_server(repo, enable_push=enable_push)
+def server_main(repo, port, server_key, server_cert, client_certs, enable_push,
+                head_room_min, head_room_max):
+    server = create_server(repo, head_room_max, head_room_min, enable_push=enable_push)
 
     use_tls = bool(server_key)
 
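Note: with the two options added above, the disk head room becomes tunable from the command line. A hypothetical invocation (the executable name is not shown in this diff, and the non-head-room options are taken from the surrounding code, so both may differ in practice):

```sh
# Hypothetical command name; the values are the option defaults written out in bytes.
bst-artifact-server --port 11001 --enable-push \
    --head-room-min 2000000000 \
    --head-room-max 10000000000 \
    /srv/cas-repo
```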
@@ -168,11 +178,12 @@ class _FallocateCall:
 
 
 class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
-    def __init__(self, cas, *, enable_push):
+    def __init__(self, cas, cache_cleaner, *, enable_push):
         super().__init__()
         self.cas = cas
         self.enable_push = enable_push
         self.fallocate = _FallocateCall()
+        self.cache_cleaner = cache_cleaner
 
     def Read(self, request, context):
         resource_name = request.resource_name
@@ -234,7 +245,7 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
                     if client_digest.size_bytes == 0:
                         break
                     try:
-                        _clean_up_cache(self.cas, client_digest.size_bytes)
+                        self.cache_cleaner.clean_up(client_digest.size_bytes)
                     except ArtifactTooLargeException as e:
                         context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
                         context.set_details(str(e))
@@ -280,10 +291,11 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
 
 
 class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
-    def __init__(self, cas, *, enable_push):
+    def __init__(self, cas, cache_cleaner, *, enable_push):
         super().__init__()
         self.cas = cas
         self.enable_push = enable_push
+        self.cache_cleaner = cache_cleaner
 
     def FindMissingBlobs(self, request, context):
         response = remote_execution_pb2.FindMissingBlobsResponse()
|
... |
... |
@@ -352,7 +364,7 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres |
352
|
364
|
continue
|
353
|
365
|
|
354
|
366
|
try:
|
355
|
|
- _clean_up_cache(self.cas, digest.size_bytes)
|
|
367
|
+ self.cache_cleaner.clean_up(digest.size_bytes)
|
356
|
368
|
|
357
|
369
|
with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
|
358
|
370
|
out.write(blob_request.data)
|
@@ -473,63 +485,80 @@ def _digest_from_upload_resource_name(resource_name):
     return None
 
 
-# _clean_up_cache()
-#
-# Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
-# is enough space for the incoming artifact
-#
-# Args:
-#   cas: CASCache object
-#   object_size: The size of the object being received in bytes
-#
-# Returns:
-#   int: The total bytes removed on the filesystem
-#
-def _clean_up_cache(cas, object_size):
-    # Determine the available disk space, in bytes, of the file system
-    # which mounts the repo
-    stats = os.statvfs(cas.casdir)
-    buffer_ = int(2e9)  # Add a 2 GB buffer
-    free_disk_space = (stats.f_bavail * stats.f_bsize) - buffer_
-    total_disk_space = (stats.f_blocks * stats.f_bsize) - buffer_
-
-    if object_size > total_disk_space:
-        raise ArtifactTooLargeException("Artifact of size: {} is too large for "
-                                        "the filesystem which mounts the remote "
-                                        "cache".format(object_size))
-
-    if object_size <= free_disk_space:
-        # No need to clean up
-        return 0
-
-    # obtain a list of LRP artifacts
-    LRP_artifacts = cas.list_artifacts()
-
-    keep_after = time.time() - _OBJECT_MIN_AGE
-
-    removed_size = 0  # in bytes
-    if object_size - removed_size > free_disk_space:
-        # First we try to see if some unreferenced objects became old
-        # enough to be removed.
-        removed_size += cas.prune(keep_after=keep_after)
-
-    while object_size - removed_size > free_disk_space:
-        try:
-            to_remove = LRP_artifacts.pop(0)  # The first element in the list is the LRP artifact
-        except IndexError:
-            # This exception is caught if there are no more artifacts in the list
-            # LRP_artifacts. This means the the artifact is too large for the filesystem
-            # so we abort the process
-            raise ArtifactTooLargeException("Artifact of size {} is too large for "
-                                            "the filesystem which mounts the remote "
-                                            "cache".format(object_size))
+class _CacheCleaner:
 
-        cas.remove(to_remove, defer_prune=True)
-        removed_size += cas.prune(keep_after=keep_after)
+    __cleanup_cache_lock = threading.Lock()
 
-    if removed_size > 0:
-        logging.info("Successfully removed {} bytes from the cache".format(removed_size))
-    else:
-        logging.info("No artifacts were removed from the cache.")
+    def __init__(self, cas, max_head_size, min_head_size = int(2e9)):
+        self.__cas = cas
+        self.__max_head_size = max_head_size
+        self.__min_head_size = min_head_size
+
+    def __has_space(self, object_size):
+        stats = os.statvfs(self.__cas.casdir)
+        free_disk_space = (stats.f_bavail * stats.f_bsize) - self.__min_head_size
+        total_disk_space = (stats.f_blocks * stats.f_bsize) - self.__min_head_size
+
+        if object_size > total_disk_space:
+            raise ArtifactTooLargeException("Artifact of size: {} is too large for "
+                                            "the filesystem which mounts the remote "
+                                            "cache".format(object_size))
 
-    return removed_size
+        return object_size <= free_disk_space
+
+    # _clean_up_cache()
+    #
+    # Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
+    # is enough space for the incoming artifact
+    #
+    # Args:
+    #   object_size: The size of the object being received in bytes
+    #
+    # Returns:
+    #   int: The total bytes removed on the filesystem
+    #
+    def clean_up(self, object_size):
+        if self.__has_space(object_size):
+            return 0
+
+        with _CacheCleaner.__cleanup_cache_lock:
+            if self.__has_space(object_size):
+                # Another thread has done the cleanup for us
+                return 0
+
+            stats = os.statvfs(self.__cas.casdir)
+            target_disk_space = (stats.f_bavail * stats.f_bsize) - self.__max_head_size
+
+            # obtain a list of LRP artifacts
+            LRP_objects = self.__cas.list_objects()
+
+            removed_size = 0  # in bytes
+
+            last_mtime = 0
+
+            while object_size - removed_size > target_disk_space:
+                try:
+                    last_mtime, to_remove = LRP_objects.pop(0)  # The first element in the list is the LRP artifact
+                except IndexError:
+                    # This exception is caught if there are no more artifacts in the list
+                    # LRP_artifacts. This means the the artifact is too large for the filesystem
+                    # so we abort the process
+                    raise ArtifactTooLargeException("Artifact of size {} is too large for "
+                                                    "the filesystem which mounts the remote "
+                                                    "cache".format(object_size))
+
+                try:
+                    size = os.stat(to_remove).st_size
+                    os.unlink(to_remove)
+                    removed_size += size
+                except FileNotFoundError:
+                    pass
+
+            self.__cas.clean_up_refs_until(last_mtime)
+
+            if removed_size > 0:
+                logging.info("Successfully removed {} bytes from the cache".format(removed_size))
+            else:
+                logging.info("No artifacts were removed from the cache.")
+
+            return removed_size
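Note: the lock plus the second __has_space() check above is a standard double-checked pattern: only one gRPC worker thread walks and prunes the store at a time, and threads that were merely queued behind it return without repeating the work. The cleaner also relies on two CASCache methods that are not part of this diff: list_objects(), which (judging by how its elements are unpacked) is assumed to return (mtime, path) pairs sorted oldest first, and clean_up_refs_until(mtime), assumed to drop refs pointing at objects older than the last one removed. A minimal sketch of that assumed contract, for orientation only:

```python
import glob
import os


# Hypothetical sketch of the CASCache surface _CacheCleaner relies on.
# Neither method appears in this diff; their shape is inferred from how
# clean_up() calls them and may differ from the real implementation.
class CASCacheSketch:
    def __init__(self, casdir):
        self.casdir = casdir

    # Assumed: every object file under objects/, as (mtime, path) tuples,
    # sorted so the least recently used object comes first.
    def list_objects(self):
        pattern = os.path.join(self.casdir, 'objects', '*', '*')
        return sorted((os.path.getmtime(p), p) for p in glob.glob(pattern))

    # Assumed: remove refs last touched before `time`, so no ref is left
    # pointing at an object the cleaner has already unlinked.
    def clean_up_refs_until(self, time):
        for ref in glob.glob(os.path.join(self.casdir, 'refs', 'heads', '*')):
            if os.path.getmtime(ref) < time:
                os.unlink(ref)
```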