finn pushed to branch finn/separate-services at BuildGrid / buildgrid
Commits:
- 56268c37 by finn at 2018-09-04T11:01:18Z
- d506116d by finn at 2018-09-04T11:01:27Z
- 99cb0f7a by finn at 2018-09-04T11:01:27Z

6 changed files:
- + buildgrid/server/cas/instance.py
- buildgrid/server/cas/service.py
- + buildgrid/server/cas/storage/remote.py
- buildgrid/utils.py
- tests/cas/test_services.py
- tests/cas/test_storage.py
Changes:
buildgrid/server/cas/instance.py (new file):

+# Copyright (C) 2018 Bloomberg LP
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# <http://www.apache.org/licenses/LICENSE-2.0>
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+"""
+Storage Instances
+=================
+Instances of CAS and ByteStream
+"""
+
+from buildgrid._protos.google.bytestream import bytestream_pb2
+from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 as re_pb2
+
+from .._exceptions import InvalidArgumentError, NotFoundError, OutOfRangeError
+from ...settings import HASH
+
+
+class ContentAddressableStorageInstance:
+
+    def __init__(self, storage):
+        self._storage = storage
+
+    def find_missing_blobs(self, blob_digests):
+        storage = self._storage
+        return re_pb2.FindMissingBlobsResponse(
+            missing_blob_digests=storage.missing_blobs(blob_digests))
+
+    def batch_update_blobs(self, requests):
+        storage = self._storage
+        store = []
+        for request_proto in requests:
+            store.append((request_proto.digest, request_proto.data))
+
+        response = re_pb2.BatchUpdateBlobsResponse()
+        statuses = storage.bulk_update_blobs(store)
+
+        for (digest, _), status in zip(store, statuses):
+            response_proto = response.responses.add()
+            response_proto.digest.CopyFrom(digest)
+            response_proto.status.CopyFrom(status)
+
+        return response
+
+
+class ByteStreamInstance:
+
+    BLOCK_SIZE = 1 * 1024 * 1024  # 1 MB block size
+
+    def __init__(self, storage):
+        self._storage = storage
+
+    def read(self, path, read_offset, read_limit):
+        storage = self._storage
+
+        if len(path) == 3:
+            path = [""] + path
+
+        # Parse/verify resource name.
+        # Read resource names look like "[instance/]blobs/abc123hash/99".
+        digest = re_pb2.Digest(hash=path[2], size_bytes=int(path[3]))
+
+        # Check the given read offset and limit.
+        if read_offset < 0 or read_offset > digest.size_bytes:
+            raise OutOfRangeError("Read offset out of range")
+
+        elif read_limit == 0:
+            bytes_remaining = digest.size_bytes - read_offset
+
+        elif read_limit > 0:
+            bytes_remaining = read_limit
+
+        else:
+            raise InvalidArgumentError("Negative read_limit is invalid")
+
+        # Read the blob from storage and send its contents to the client.
+        result = storage.get_blob(digest)
+        if result is None:
+            raise NotFoundError("Blob not found")
+
+        elif result.seekable():
+            result.seek(read_offset)
+
+        else:
+            result.read(read_offset)
+
+        while bytes_remaining > 0:
+            yield bytestream_pb2.ReadResponse(
+                data=result.read(min(self.BLOCK_SIZE, bytes_remaining)))
+            bytes_remaining -= self.BLOCK_SIZE
+
+    def write(self, requests):
+        storage = self._storage
+
+        first_request = next(requests)
+        path = first_request.resource_name.split("/")
+
+        if path[0] == "uploads":
+            path = [""] + path
+
+        if len(path) < 6 or path[1] != "uploads" or path[3] != "blobs" or not path[5].isdigit():
+            raise InvalidArgumentError("Invalid resource name")
+
+        digest = re_pb2.Digest(hash=path[4], size_bytes=int(path[5]))
+        write_session = storage.begin_write(digest)
+
+        # Start the write session and write the first request's data.
+        write_session.write(first_request.data)
+        hash_ = HASH(first_request.data)
+        bytes_written = len(first_request.data)
+        finished = first_request.finish_write
+
+        # Handle subsequent write requests.
+        while not finished:
+
+            for request in requests:
+                if finished:
+                    raise InvalidArgumentError("Write request sent after write finished")
+
+                elif request.write_offset != bytes_written:
+                    raise InvalidArgumentError("Invalid write offset")
+
+                elif request.resource_name and request.resource_name != first_request.resource_name:
+                    raise InvalidArgumentError("Resource name changed mid-write")
+
+                finished = request.finish_write
+                bytes_written += len(request.data)
+                if bytes_written > digest.size_bytes:
+                    raise InvalidArgumentError("Wrote too much data to blob")
+
+                write_session.write(request.data)
+                hash_.update(request.data)
+
+        # Check that the data matches the provided digest.
+        if bytes_written != digest.size_bytes or not finished:
+            raise NotImplementedError("Cannot close stream before finishing write")
+
+        elif hash_.hexdigest() != digest.hash:
+            raise InvalidArgumentError("Data does not match hash")
+
+        storage.commit_write(digest, write_session)
+        return bytestream_pb2.WriteResponse(committed_size=bytes_written)
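
The new instance classes carry the CAS and ByteStream logic without any gRPC plumbing, so a server wires them into the servicers keyed by instance name. As a rough illustration only (not part of this push; the backend, port and executor size are placeholder choices), the wiring might look like:

# Illustrative sketch: wiring the new instance classes into the gRPC servicers.
# LRUMemoryCache is an existing BuildGrid backend used here purely as an example.
from concurrent import futures

import grpc

from buildgrid.server.cas.instance import ByteStreamInstance, ContentAddressableStorageInstance
from buildgrid.server.cas.service import ByteStreamService, ContentAddressableStorageService
from buildgrid.server.cas.storage.lru_memory_cache import LRUMemoryCache
from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc

storage = LRUMemoryCache(1024 * 1024)

# The services now take a dict of named instances; "" acts as the default instance.
cas_instances = {"": ContentAddressableStorageInstance(storage)}
bs_instances = {"": ByteStreamInstance(storage)}

server = grpc.server(futures.ThreadPoolExecutor(max_workers=4))
remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
    ContentAddressableStorageService(cas_instances), server)
bytestream_pb2_grpc.add_ByteStreamServicer_to_server(ByteStreamService(bs_instances), server)
server.add_insecure_port("localhost:50051")  # placeholder address
server.start()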
buildgrid/server/cas/service.py:

@@ -21,131 +21,128 @@ Implements the Content Addressable Storage API and ByteStream API.
 """


+from itertools import tee
+import logging
+
 import grpc

 from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
 from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 as re_pb2
 from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc as re_pb2_grpc

-from ...settings import HASH
+from .._exceptions import InvalidArgumentError, NotFoundError, OutOfRangeError


 class ContentAddressableStorageService(re_pb2_grpc.ContentAddressableStorageServicer):

-    def __init__(self, storage):
-        self._storage = storage
+    def __init__(self, instances):
+        self.logger = logging.getLogger(__name__)
+        self._instances = instances

     def FindMissingBlobs(self, request, context):
-        # Only one instance for now.
-        storage = self._storage
-        return re_pb2.FindMissingBlobsResponse(
-            missing_blob_digests=storage.missing_blobs(request.blob_digests))
+        try:
+            instance = self._get_instance(request.instance_name)
+            return instance.find_missing_blobs(request.blob_digests)
+
+        except InvalidArgumentError as e:
+            self.logger.error(e)
+            context.set_details(str(e))
+            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
+
+        return re_pb2.FindMissingBlobsResponse()

     def BatchUpdateBlobs(self, request, context):
-        # Only one instance for now.
-        storage = self._storage
-        requests = []
-        for request_proto in request.requests:
-            requests.append((request_proto.digest, request_proto.data))
-        response = re_pb2.BatchUpdateBlobsResponse()
-        for (digest, _), status in zip(requests, storage.bulk_update_blobs(requests)):
-            response_proto = response.responses.add()
-            response_proto.digest.CopyFrom(digest)
-            response_proto.status.CopyFrom(status)
-        return response
+        try:
+            instance = self._get_instance(request.instance_name)
+            return instance.batch_update_blobs(request.requests)

+        except InvalidArgumentError as e:
+            self.logger.error(e)
+            context.set_details(str(e))
+            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)

-class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer):
+        return re_pb2.BatchReadBlobsResponse()

-    BLOCK_SIZE = 1 * 1024 * 1024  # 1 MB block size
+    def _get_instance(self, instance_name):
+        try:
+            return self._instances[instance_name]
+
+        except KeyError:
+            raise InvalidArgumentError("Invalid instance name: {}".format(instance_name))
+
+
+class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer):

-    def __init__(self, storage):
-        self._storage = storage
+    def __init__(self, instances):
+        self.logger = logging.getLogger(__name__)
+        self._instances = instances

     def Read(self, request, context):
-        # Only one instance for now.
-        storage = self._storage
-
-        # Parse/verify resource name.
-        # Read resource names look like "[instance/]blobs/abc123hash/99".
-        path = request.resource_name.split("/")
-        if len(path) == 3:
-            path = [""] + path
-        if len(path) != 4 or path[1] != "blobs" or not path[3].isdigit():
-            context.abort(grpc.StatusCode.NOT_FOUND, "Invalid resource name")
-        # instance_name = path[0]
-        digest = re_pb2.Digest(hash=path[2], size_bytes=int(path[3]))
-
-        # Check the given read offset and limit.
-        if request.read_offset < 0 or request.read_offset > digest.size_bytes:
-            context.abort(grpc.StatusCode.OUT_OF_RANGE, "Read offset out of range")
-        elif request.read_limit == 0:
-            bytes_remaining = digest.size_bytes - request.read_offset
-        elif request.read_limit > 0:
-            bytes_remaining = request.read_limit
-        else:
-            context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Negative read_limit is invalid")
-
-        # Read the blob from storage and send its contents to the client.
-        result = storage.get_blob(digest)
-        if result is None:
-            context.abort(grpc.StatusCode.NOT_FOUND, "Blob not found")
-        elif result.seekable():
-            result.seek(request.read_offset)
-        else:
-            result.read(request.read_offset)
-        while bytes_remaining > 0:
-            yield bytestream_pb2.ReadResponse(
-                data=result.read(min(self.BLOCK_SIZE, bytes_remaining)))
-            bytes_remaining -= self.BLOCK_SIZE
-
-    def Write(self, request_iterator, context):
-        # Only one instance for now.
-        storage = self._storage
-
-        requests = iter(request_iterator)
-        first_request = next(requests)
-        if first_request.write_offset != 0:
-            context.abort(grpc.StatusCode.UNIMPLEMENTED, "Nonzero write offset is unsupported")
-
-        # Parse/verify resource name.
-        # Write resource names look like "[instance/]uploads/SOME-GUID/blobs/abc123hash/99".
-        path = first_request.resource_name.split("/")
-        if path[0] == "uploads":
-            path = [""] + path
-        if len(path) < 6 or path[1] != "uploads" or path[3] != "blobs" or not path[5].isdigit():
-            context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Invalid resource name")
-        # instance_name = path[0]
-        digest = re_pb2.Digest(hash=path[4], size_bytes=int(path[5]))
-
-        # Start the write session and write the first request's data.
-        write_session = storage.begin_write(digest)
-        write_session.write(first_request.data)
-        hash_ = HASH(first_request.data)
-        bytes_written = len(first_request.data)
-        done = first_request.finish_write
-
-        # Handle subsequent write requests.
-        for request in requests:
-            if done:
-                context.abort(grpc.StatusCode.INVALID_ARGUMENT,
-                              "Write request sent after write finished")
-            elif request.write_offset != bytes_written:
-                context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Invalid write offset")
-            elif request.resource_name and request.resource_name != first_request.resource_name:
-                context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Resource name changed mid-write")
-            done = request.finish_write
-            bytes_written += len(request.data)
-            if bytes_written > digest.size_bytes:
-                context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Wrote too much data to blob")
-            write_session.write(request.data)
-            hash_.update(request.data)
-
-        # Check that the data matches the provided digest.
-        if bytes_written != digest.size_bytes or not done:
-            context.abort(grpc.StatusCode.UNIMPLEMENTED,
-                          "Cannot close stream before finishing write")
-        elif hash_.hexdigest() != digest.hash:
-            context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Data does not match hash")
-        storage.commit_write(digest, write_session)
-        return bytestream_pb2.WriteResponse(committed_size=bytes_written)
+        try:
+            path = request.resource_name.split("/")
+            instance_name = path[0]
+            if instance_name == "blobs":
+                # TODO: Make default if no instance_name
+                instance_name = ""
+
+            instance = self._get_instance(instance_name)
+            yield from instance.read(path,
+                                     request.read_offset,
+                                     request.read_limit)
+
+        except InvalidArgumentError as e:
+            self.logger.error(e)
+            context.set_details(str(e))
+            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
+            yield bytestream_pb2.ReadResponse()
+
+        except NotFoundError as e:
+            self.logger.error(e)
+            context.set_details(str(e))
+            context.set_code(grpc.StatusCode.NOT_FOUND)
+            yield bytestream_pb2.ReadResponse()
+
+        except OutOfRangeError as e:
+            self.logger.error(e)
+            context.set_details(str(e))
+            context.set_code(grpc.StatusCode.OUT_OF_RANGE)
+            yield bytestream_pb2.ReadResponse()
+
+    def Write(self, requests, context):
+        try:
+            requests, request_probe = tee(requests, 2)
+            first_request = next(request_probe)
+
+            path = first_request.resource_name.split("/")
+
+            instance_name = path[0]
+            if instance_name == "uploads":
+                # TODO: Make default if no instance_name
+                instance_name = ""
+
+            instance = self._get_instance(instance_name)
+            return instance.write(requests)
+
+        except NotImplementedError as e:
+            self.logger.error(e)
+            context.set_details(str(e))
+            context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+
+        except InvalidArgumentError as e:
+            self.logger.error(e)
+            context.set_details(str(e))
+            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
+
+        except NotFoundError as e:
+            self.logger.error(e)
+            context.set_details(str(e))
+            context.set_code(grpc.StatusCode.NOT_FOUND)
+
+        return bytestream_pb2.WriteResponse()
+
+    def _get_instance(self, instance_name):
+        try:
+            return self._instances[instance_name]
+
+        except KeyError:
+            raise InvalidArgumentError("Invalid instance name: {}".format(instance_name))
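
Both servicers now peel the instance name off the front of the resource name before delegating: reads use "[instance_name/]blobs/{hash}/{size_bytes}" and writes use "[instance_name/]uploads/{uuid}/blobs/{hash}/{size_bytes}". A small standalone sketch of the read-side convention (the helper name is hypothetical, not something this push adds):

# Illustrative helper, not part of this push: how a read resource name maps
# onto (instance_name, hash, size_bytes) under the convention used above.
def parse_read_resource_name(resource_name):
    path = resource_name.split("/")
    if path[0] == "blobs":
        # No instance name given; fall back to the default "" instance.
        path = [""] + path
    instance_name, _, hash_, size_bytes = path[:4]
    return instance_name, hash_, int(size_bytes)


assert parse_read_resource_name("blobs/abc123hash/99") == ("", "abc123hash", 99)
assert parse_read_resource_name("main/blobs/abc123hash/99") == ("main", "abc123hash", 99)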
buildgrid/server/cas/storage/remote.py (new file):

+# Copyright (C) 2018 Bloomberg LP
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# <http://www.apache.org/licenses/LICENSE-2.0>
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+"""
+RemoteStorage
+=============
+
+Forwards storage requests to a remote storage.
+"""
+
+import io
+import logging
+
+import grpc
+
+from buildgrid.utils import create_digest, gen_fetch_blob, gen_write_request_blob
+from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
+from buildgrid._protos.google.rpc.status_pb2 import Status
+from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
+
+from .storage_abc import StorageABC
+
+
+class RemoteStorage(StorageABC):
+
+    def __init__(self, channel, instance_name=""):
+        self.logger = logging.getLogger(__name__)
+        self._instance_name = instance_name
+        self._stub_bs = bytestream_pb2_grpc.ByteStreamStub(channel)
+        self._stub_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(channel)
+
+    def has_blob(self, digest):
+        try:
+            if self.get_blob(digest):
+                return True
+
+        except grpc.RpcError as e:
+            if e.code() == grpc.StatusCode.NOT_FOUND:
+                pass
+            else:
+                raise e
+
+        return False
+
+    def get_blob(self, digest):
+        fetched_data = io.BytesIO()
+        length = 0
+        for data in gen_fetch_blob(self._stub_bs, digest, self._instance_name):
+            length += fetched_data.write(data)
+
+        if length:
+            assert digest.size_bytes == length
+            fetched_data.seek(0)
+            return fetched_data
+
+        else:
+            return None
+
+    def begin_write(self, digest):
+        return io.BytesIO(digest.SerializeToString())
+
+    def commit_write(self, digest, write_session):
+        write_session.seek(0)
+
+        for request in gen_write_request_blob(write_session, digest, self._instance_name):
+            self._stub_bs.Write(request)
+
+    def missing_blobs(self, blobs):
+        request = remote_execution_pb2.FindMissingBlobsRequest(instance_name=self._instance_name)
+
+        for blob in blobs:
+            request_digest = request.blob_digests.add()
+            request_digest.hash = blob.hash
+            request_digest.size_bytes = blob.size_bytes
+
+        response = self._stub_cas.FindMissingBlobs(request)
+
+        return [x for x in response.missing_blob_digests]
+
+    def bulk_update_blobs(self, blobs):
+        request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=self._instance_name)
+
+        for digest, data in blobs:
+            reqs = request.requests.add()
+            reqs.digest.CopyFrom(digest)
+            reqs.data = data
+
+        response = self._stub_cas.BatchUpdateBlobs(request)
+
+        responses = response.responses
+
+        # Check everything was sent back, even if order changed
+        assert ([x.digest for x in request.requests].sort(key=lambda x: x.hash)) == \
+            ([x.digest for x in responses].sort(key=lambda x: x.hash))
+
+        return [x.status for x in responses]
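
RemoteStorage implements the same StorageABC interface as the local backends but proxies every call to a remote CAS over gRPC. A hedged usage sketch (the endpoint and instance name are placeholders, not part of this push):

# Illustrative sketch only: pointing RemoteStorage at a remote CAS endpoint.
import grpc

from buildgrid.settings import HASH
from buildgrid.server.cas.storage.remote import RemoteStorage
from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest

channel = grpc.insecure_channel("cas.example.com:50051")  # placeholder address
storage = RemoteStorage(channel, instance_name="main")    # placeholder instance name

blob = b"abc"
digest = Digest(hash=HASH(blob).hexdigest(), size_bytes=len(blob))

if not storage.has_blob(digest):
    storage.bulk_update_blobs([(digest, blob)])

data = storage.get_blob(digest).read()  # get_blob returns a file-like object, or None if missing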
buildgrid/utils.py:

@@ -14,6 +14,7 @@


 import os
+import uuid

 from buildgrid.settings import HASH
 from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
@@ -27,10 +28,37 @@ def gen_fetch_blob(stub, digest, instance_name=""):
     resource_name = os.path.join(instance_name, 'blobs', digest.hash, str(digest.size_bytes))
     request = bytestream_pb2.ReadRequest(resource_name=resource_name,
                                          read_offset=0)
+
     for response in stub.Read(request):
         yield response.data


+def gen_write_request_blob(digest_bytes, digest, instance_name=""):
+    """ Generates a bytestream write request
+    """
+    resource_name = os.path.join(instance_name, 'uploads', str(uuid.uuid4()),
+                                 'blobs', digest.hash, str(digest.size_bytes))
+
+    offset = 0
+    finished = False
+    remaining = digest.size_bytes
+
+    while not finished:
+        chunk_size = min(remaining, 64 * 1024)
+        remaining -= chunk_size
+        finished = remaining <= 0
+
+        request = bytestream_pb2.WriteRequest()
+        request.resource_name = resource_name
+        request.write_offset = offset
+        request.data = digest_bytes.read(chunk_size)
+        request.finish_write = finished
+
+        yield request
+
+        offset += chunk_size
+
+
 def write_fetch_directory(directory, stub, digest, instance_name=""):
     """ Given a directory digest, fetches files and writes them to a directory
     """
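
gen_write_request_blob turns a file-like object into a stream of ByteStream WriteRequests, chunked at 64 KiB under a fresh "uploads/{uuid}/blobs/..." resource name. A quick sanity-check example of the chunking behaviour (illustrative only, not part of this push):

# Illustrative sketch: chunking a 150 KiB blob produces three WriteRequests.
import io

from buildgrid.settings import HASH
from buildgrid.utils import gen_write_request_blob
from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest

data = b"x" * (150 * 1024)
digest = Digest(hash=HASH(data).hexdigest(), size_bytes=len(data))

requests = list(gen_write_request_blob(io.BytesIO(data), digest))

assert [r.write_offset for r in requests] == [0, 64 * 1024, 128 * 1024]
assert requests[-1].finish_write
assert b"".join(r.data for r in requests) == data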
tests/cas/test_services.py:

@@ -18,17 +18,23 @@
 # pylint: disable=redefined-outer-name

 import io
+from unittest import mock

+import grpc
+from grpc._server import _Context
 import pytest

 from buildgrid._protos.google.bytestream import bytestream_pb2
 from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 as re_pb2
 from buildgrid.server.cas.storage.storage_abc import StorageABC
-from buildgrid.server.cas.service import ByteStreamService
-from buildgrid.server.cas.service import ContentAddressableStorageService
+from buildgrid.server.cas.instance import ByteStreamInstance, ContentAddressableStorageInstance
+from buildgrid.server.cas.service import ByteStreamService, ContentAddressableStorageService
 from buildgrid.settings import HASH


+context = mock.create_autospec(_Context)
+
+
 class SimpleStorage(StorageABC):
     """Storage provider wrapper around a dictionary.

@@ -61,19 +67,6 @@ class SimpleStorage(StorageABC):
         self.data[(digest.hash, digest.size_bytes)] = data


-class MockObject:
-    def __init__(self):
-        self.abort = None
-
-
-class MockException(Exception):
-    pass
-
-
-def raise_mock_exception(*args, **kwargs):
-    raise MockException()
-
-
 test_strings = [b"", b"hij"]
 instances = ["", "test_inst"]

@@ -82,7 +75,9 @@ instances = ["", "test_inst"]
 @pytest.mark.parametrize("instance", instances)
 def test_bytestream_read(data_to_read, instance):
     storage = SimpleStorage([b"abc", b"defg", data_to_read])
-    servicer = ByteStreamService(storage)
+
+    bs_instance = ByteStreamInstance(storage)
+    servicer = ByteStreamService({instance: bs_instance})

     request = bytestream_pb2.ReadRequest()
     if instance != "":
@@ -100,7 +95,8 @@ def test_bytestream_read_many(instance):
     data_to_read = b"testing" * 10000

     storage = SimpleStorage([b"abc", b"defg", data_to_read])
-    servicer = ByteStreamService(storage)
+    bs_instance = ByteStreamInstance(storage)
+    servicer = ByteStreamService({instance: bs_instance})

     request = bytestream_pb2.ReadRequest()
     if instance != "":
@@ -117,7 +113,8 @@ def test_bytestream_read_many(instance):
 @pytest.mark.parametrize("extra_data", ["", "/", "/extra/data"])
 def test_bytestream_write(instance, extra_data):
     storage = SimpleStorage()
-    servicer = ByteStreamService(storage)
+    bs_instance = ByteStreamInstance(storage)
+    servicer = ByteStreamService({instance: bs_instance})

     resource_name = ""
     if instance != "":
@@ -139,7 +136,8 @@ def test_bytestream_write(instance, extra_data):

 def test_bytestream_write_rejects_wrong_hash():
     storage = SimpleStorage()
-    servicer = ByteStreamService(storage)
+    bs_instance = ByteStreamInstance(storage)
+    servicer = ByteStreamService({"": bs_instance})

     data = b'some data'
     wrong_hash = HASH(b'incorrect').hexdigest()
@@ -148,10 +146,8 @@ def test_bytestream_write_rejects_wrong_hash():
         bytestream_pb2.WriteRequest(resource_name=resource_name, data=data, finish_write=True)
     ]

-    context = MockObject()
-    context.abort = raise_mock_exception
-    with pytest.raises(MockException):
-        servicer.Write(requests, context)
+    servicer.Write(requests, context)
+    context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)

     assert len(storage.data) is 0

@@ -159,7 +155,8 @@ def test_bytestream_write_rejects_wrong_hash():
 @pytest.mark.parametrize("instance", instances)
 def test_cas_find_missing_blobs(instance):
     storage = SimpleStorage([b'abc', b'def'])
-    servicer = ContentAddressableStorageService(storage)
+    cas_instance = ContentAddressableStorageInstance(storage)
+    servicer = ContentAddressableStorageService({instance: cas_instance})
     digests = [
         re_pb2.Digest(hash=HASH(b'def').hexdigest(), size_bytes=3),
         re_pb2.Digest(hash=HASH(b'ghij').hexdigest(), size_bytes=4)
@@ -173,7 +170,9 @@ def test_cas_find_missing_blobs(instance):
 @pytest.mark.parametrize("instance", instances)
 def test_cas_batch_update_blobs(instance):
     storage = SimpleStorage()
-    servicer = ContentAddressableStorageService(storage)
+    cas_instance = ContentAddressableStorageInstance(storage)
+    servicer = ContentAddressableStorageService({instance: cas_instance})
+
     update_requests = [
         re_pb2.BatchUpdateBlobsRequest.Request(
             digest=re_pb2.Digest(hash=HASH(b'abc').hexdigest(), size_bytes=3), data=b'abc'),
@@ -181,16 +180,21 @@ def test_cas_batch_update_blobs(instance):
             digest=re_pb2.Digest(hash="invalid digest!", size_bytes=1000),
             data=b'wrong data')
     ]
+
     request = re_pb2.BatchUpdateBlobsRequest(instance_name=instance, requests=update_requests)
     response = servicer.BatchUpdateBlobs(request, None)
     assert len(response.responses) == 2
+
     for blob_response in response.responses:
         if blob_response.digest == update_requests[0].digest:
             assert blob_response.status.code == 0
+
         elif blob_response.digest == update_requests[1].digest:
             assert blob_response.status.code != 0
+
         else:
             raise Exception("Unexpected blob response")
+
     assert len(storage.data) == 1
     assert (update_requests[0].digest.hash, 3) in storage.data
     assert storage.data[(update_requests[0].digest.hash, 3)] == b'abc'
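
The tests drop the hand-rolled MockObject/abort pattern in favour of a module-level context built with mock.create_autospec(_Context), so error paths are asserted through set_code and set_details rather than an expected exception. The pattern in isolation looks roughly like this (the handler below is a made-up stand-in, not BuildGrid code):

# Illustrative sketch: asserting gRPC error handling via an autospecced context.
from unittest import mock

import grpc
from grpc._server import _Context

context = mock.create_autospec(_Context)


def failing_handler(context):
    # Stand-in for a servicer method that reports INVALID_ARGUMENT instead of raising.
    context.set_details("Invalid resource name")
    context.set_code(grpc.StatusCode.INVALID_ARGUMENT)


failing_handler(context)
context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)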
tests/cas/test_storage.py:

@@ -19,18 +19,27 @@

 import tempfile

+from unittest import mock
+
 import boto3
+import grpc
+from grpc._server import _Context
 import pytest
-
 from moto import mock_s3

 from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
+from buildgrid.server.cas import service
+from buildgrid.server.cas.instance import ByteStreamInstance, ContentAddressableStorageInstance
+from buildgrid.server.cas.storage import remote
 from buildgrid.server.cas.storage.lru_memory_cache import LRUMemoryCache
 from buildgrid.server.cas.storage.disk import DiskStorage
 from buildgrid.server.cas.storage.s3 import S3Storage
 from buildgrid.server.cas.storage.with_cache import WithCacheStorage
 from buildgrid.settings import HASH

+
+context = mock.create_autospec(_Context)
+
 abc = b"abc"
 abc_digest = Digest(hash=HASH(abc).hexdigest(), size_bytes=3)
 defg = b"defg"
@@ -45,10 +54,62 @@ def write(storage, digest, blob):
     storage.commit_write(digest, session)


+class MockCASStorage(ByteStreamInstance, ContentAddressableStorageInstance):
+
+    def __init__(self):
+        storage = LRUMemoryCache(256)
+        super().__init__(storage)
+
+
+# Mock a CAS server with LRUStorage to return "calls" made to it
+class MockStubServer:
+
+    def __init__(self):
+        instances = {"": MockCASStorage(), "dna": MockCASStorage()}
+        self._requests = []
+        self._bs_service = service.ByteStreamService(instances)
+        self._cas_service = service.ContentAddressableStorageService(instances)
+
+    def Read(self, request):
+        yield from self._bs_service.Read(request, context)
+
+    def Write(self, request):
+        self._requests.append(request)
+        if request.finish_write:
+            response = self._bs_service.Write(self._requests, context)
+            self._requests = []
+            return response
+
+        return None
+
+    def FindMissingBlobs(self, request):
+        return self._cas_service.FindMissingBlobs(request, context)
+
+    def BatchUpdateBlobs(self, request):
+        return self._cas_service.BatchUpdateBlobs(request, context)
+
+
+# Instances of MockCASStorage
+@pytest.fixture(params=["", "dna"])
+def instance(params):
+    return {params, MockCASStorage()}
+
+
+@pytest.fixture()
+@mock.patch.object(remote, 'bytestream_pb2_grpc')
+@mock.patch.object(remote, 'remote_execution_pb2_grpc')
+def remote_storage(mock_bs_grpc, mock_re_pb2_grpc):
+    mock_server = MockStubServer()
+    storage = remote.RemoteStorage(instance)
+    storage._stub_bs = mock_server
+    storage._stub_cas = mock_server
+    yield storage
+
+
 # General tests for all storage providers


-@pytest.fixture(params=["lru", "disk", "s3", "lru_disk", "disk_s3"])
+@pytest.fixture(params=["lru", "disk", "s3", "lru_disk", "disk_s3", "remote"])
 def any_storage(request):
     if request.param == "lru":
         yield LRUMemoryCache(256)
@@ -70,6 +131,14 @@ def any_storage(request):
             with mock_s3():
                 boto3.resource('s3').create_bucket(Bucket="testing")
                 yield WithCacheStorage(DiskStorage(path), S3Storage("testing"))
+    elif request.param == "remote":
+        with mock.patch.object(remote, 'bytestream_pb2_grpc'):
+            with mock.patch.object(remote, 'remote_execution_pb2_grpc'):
+                mock_server = MockStubServer()
+                storage = remote.RemoteStorage(instance)
+                storage._stub_bs = mock_server
+                storage._stub_cas = mock_server
+                yield storage


 def test_initially_empty(any_storage):