... |
... |
@@ -16,8 +16,10 @@ |
16
|
16
|
|
17
|
17
|
|
18
|
18
|
from collections import namedtuple
|
|
19
|
+from datetime import datetime
|
19
|
20
|
from unittest import mock
|
20
|
21
|
import os
|
|
22
|
+import time
|
21
|
23
|
|
22
|
24
|
import grpc
|
23
|
25
|
from grpc._server import _Context
|
... |
... |
@@ -167,3 +169,60 @@ def test_jwt_authorization(token, secret, algorithm, validity): |
167
|
169
|
else:
|
168
|
170
|
context.abort.assert_called_once_with(grpc.StatusCode.UNAUTHENTICATED, mock.ANY)
|
169
|
171
|
context.set_code.assert_not_called()
|
|
172
|
+
|
|
173
|
+ # Token should have been cached now, let's test authorization again:
|
|
174
|
+ context = mock.create_autospec(_Context, spec_set=True)
|
|
175
|
+
|
|
176
|
+ handler = interceptor.intercept_service(continuator, call_details)
|
|
177
|
+ handler.unary_unary(None, context)
|
|
178
|
+
|
|
179
|
+ if validity:
|
|
180
|
+ context.set_code.assert_called_once_with(grpc.StatusCode.OK)
|
|
181
|
+ context.abort.assert_not_called()
|
|
182
|
+
|
|
183
|
+ else:
|
|
184
|
+ context.abort.assert_called_once_with(grpc.StatusCode.UNAUTHENTICATED, mock.ANY)
|
|
185
|
+ context.set_code.assert_not_called()
|
|
186
|
+
|
|
187
|
+
|
|
188
|
@pytest.mark.skipif(not HAVE_JWT, reason="No pyjwt")
def test_jwt_authorization_expiry():
    """Check that a JWT accepted by the interceptor is rejected once expired.

    A token valid for roughly two seconds is generated and presented three
    times to the same interceptor instance:

    1. immediately, expecting the call to go through (``StatusCode.OK``);
    2. immediately again, expecting the same outcome (the interceptor is
       expected to have cached the token by then);
    3. after sleeping past the expiry time, expecting the call to be aborted
       with ``StatusCode.UNAUTHENTICATED``.
    """
    secret, algorithm = 'your-256-bit-secret', AuthMetadataAlgorithm.JWT_HS256

    # JWT 'iat'/'exp' claims are seconds since the Unix epoch. time.time()
    # gives exactly that, whereas datetime.utcnow().timestamp() interprets
    # the naive UTC value as *local* time and skews the claims by the local
    # UTC offset, making the test pass only on machines running in UTC.
    now = int(time.time())
    payload = {'sub': 'BuildGrid Expiry Test', 'iat': now, 'exp': now + 2}

    token = jwt.encode(payload, secret, algorithm=algorithm.value.upper())
    if isinstance(token, bytes):
        # PyJWT < 2.0 returns bytes, PyJWT >= 2.0 already returns str.
        token = token.decode()

    interceptor = AuthMetadataServerInterceptor(
        method=AuthMetadataMethod.JWT, secret=secret, algorithm=algorithm)

    continuator = _unary_unary_rpc_terminator
    call_details = _mock_call_details(token)

    def authorize():
        """Run one intercepted unary-unary call and return its mock context."""
        context = mock.create_autospec(_Context, spec_set=True)
        handler = interceptor.intercept_service(continuator, call_details)
        handler.unary_unary(None, context)
        return context

    # First, test generated token validation:
    context = authorize()

    context.set_code.assert_called_once_with(grpc.StatusCode.OK)
    context.abort.assert_not_called()

    # Second, ensure cached token validation:
    context = authorize()

    context.set_code.assert_called_once_with(grpc.StatusCode.OK)
    context.abort.assert_not_called()

    # Then wait for the token to expire:
    time.sleep(3)

    # Finally, test for cached-token invalidation:
    context = authorize()

    context.abort.assert_called_once_with(grpc.StatusCode.UNAUTHENTICATED, mock.ANY)
    context.set_code.assert_not_called()
|