How It Works
Authentication Wrapper
class AuthRPCWrapperStreamer:
    """Proxy that adds authorization steps around a stub's ``rpc_*`` methods.

    Attribute names starting with ``rpc_`` are looked up on the wrapped stub
    and returned as an async wrapper. All other attribute names resolve on
    the wrapper itself.

    Depending on ``role`` the wrapper:

    * ``AuthRole.CLIENT``: calls ``authorizer.sign_request`` before the RPC
      and ``authorizer.validate_response`` on each response.
    * ``AuthRole.SERVICER``: calls ``authorizer.validate_request`` before the
      RPC and ``authorizer.sign_response`` on each response.

    When ``authorizer`` is falsy the RPC is passed through unchanged.
    """

    def __init__(
        self,
        stub,
        role: AuthRole,
        authorizer: Optional[AuthorizerBase],
        service_public_key: Optional[RSAPublicKey] = None,
    ):
        """Store the wrapped stub and the authorization settings.

        Args:
            stub: Object whose ``rpc_*`` attributes are the wrapped RPCs.
            role: Which side of the RPC this wrapper acts as.
            authorizer: Object doing the signing and validation, or ``None``
                to disable authorization.
            service_public_key: Passed to ``authorizer.sign_request`` when
                acting as a client.
        """
        self._stub = stub
        self._role = role
        self._authorizer = authorizer
        self._service_public_key = service_public_key

    def __getattribute__(self, name: str):
        """Return own attributes as-is and ``rpc_*`` attributes wrapped.

        Unary wrappers return ``None`` when request or response validation
        fails. Streaming wrappers yield nothing when request validation
        fails, and silently skip any response that fails validation.
        """
        if not name.startswith("rpc_"):
            return object.__getattribute__(self, name)

        # Read the state once through object.__getattribute__ so that the
        # wrappers below close over plain locals and never re-enter this hook.
        method = getattr(object.__getattribute__(self, "_stub"), name)
        role = object.__getattribute__(self, "_role")
        authorizer = object.__getattribute__(self, "_authorizer")
        service_public_key = object.__getattribute__(self, "_service_public_key")

        async def request_allowed(request) -> bool:
            """Sign (client) or validate (servicer) the outgoing/incoming request."""
            if not authorizer:
                return True
            if role == AuthRole.CLIENT:
                await authorizer.sign_request(request, service_public_key)
            elif role == AuthRole.SERVICER:
                return bool(await authorizer.validate_request(request))
            return True

        async def response_allowed(response, request) -> bool:
            """Sign (servicer) or validate (client) a single response."""
            if not authorizer:
                return True
            if role == AuthRole.SERVICER:
                await authorizer.sign_response(response, request)
            elif role == AuthRole.CLIENT:
                return bool(await authorizer.validate_response(response, request))
            return True

        if inspect.isasyncgenfunction(method):

            @functools.wraps(method)
            async def wrapped_stream_rpc(request, *args, **kwargs):
                if not await request_allowed(request):
                    return
                async for response in method(request, *args, **kwargs):
                    # A response that fails validation is dropped, but the
                    # stream itself keeps going.
                    if await response_allowed(response, request):
                        yield response

            return wrapped_stream_rpc

        @functools.wraps(method)
        async def wrapped_unary_rpc(request, *args, **kwargs):
            if not await request_allowed(request):
                return None
            response = await method(request, *args, **kwargs)
            if not await response_allowed(response, request):
                return None
            return response

        return wrapped_unary_rpc
Implementation
Last updated