How To Use
Authorizers are used in a protocol by wrapping the stub returned by the P2P get_stub method, so that all communication between nodes is authenticated by the specified authorizer (see the protocol example for how this is implemented).
Authentication Wrapper
The authentication wrapper is designed for both single-response and multi-response (async generator) methods.
import functools
import inspect
from typing import Optional

# AuthRole, AuthorizerBase and RSAPublicKey come from the project's auth and
# crypto utilities (import paths are not shown here).


class AuthRPCWrapperStreamer:
    def __init__(
        self,
        stub,
        role: AuthRole,
        authorizer: Optional[AuthorizerBase],
        service_public_key: Optional[RSAPublicKey] = None
    ):
        self._stub = stub
        self._role = role
        self._authorizer = authorizer
        self._service_public_key = service_public_key

    def __getattribute__(self, name: str):
        # Only rpc_* attributes are intercepted; everything else resolves normally.
        if not name.startswith("rpc_"):
            return object.__getattribute__(self, name)

        stub = object.__getattribute__(self, "_stub")
        method = getattr(stub, name)
        role = object.__getattribute__(self, "_role")
        authorizer = object.__getattribute__(self, "_authorizer")
        service_public_key = object.__getattribute__(self, "_service_public_key")

        if inspect.isasyncgenfunction(method):
            # Multi-response (streaming) RPC: sign or validate every response.
            @functools.wraps(method)
            async def wrapped_stream_rpc(request, *args, **kwargs):
                if authorizer:
                    if role == AuthRole.CLIENT:
                        await authorizer.sign_request(request, service_public_key)
                    elif role == AuthRole.SERVICER:
                        if not await authorizer.validate_request(request):
                            return  # rejected request: the stream yields nothing

                async for response in method(request, *args, **kwargs):
                    if authorizer:
                        if role == AuthRole.SERVICER:
                            await authorizer.sign_response(response, request)
                        elif role == AuthRole.CLIENT:
                            if not await authorizer.validate_response(response, request):
                                continue  # skip responses that fail validation
                    yield response

            return wrapped_stream_rpc
        else:
            # Single-response (unary) RPC.
            @functools.wraps(method)
            async def wrapped_unary_rpc(request, *args, **kwargs):
                if authorizer:
                    if role == AuthRole.CLIENT:
                        await authorizer.sign_request(request, service_public_key)
                    elif role == AuthRole.SERVICER:
                        if not await authorizer.validate_request(request):
                            return None

                response = await method(request, *args, **kwargs)

                if authorizer:
                    if role == AuthRole.SERVICER:
                        await authorizer.sign_response(response, request)
                    elif role == AuthRole.CLIENT:
                        if not await authorizer.validate_response(response, request):
                            return None
                return response

            return wrapped_unary_rpc
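To make the two code paths concrete, here is a minimal, self-contained sketch of the same sign/validate flow. It does not use the real classes: AuthRole is re-declared as a stand-in enum, TokenAuthorizer is a hypothetical toy that compares a shared token instead of checking signatures, and wrap_rpc is a compact stand-in for the per-method logic of AuthRPCWrapperStreamer. The servicer-side wrapper is called directly in place of a network hop.

```python
import asyncio
import functools
import inspect
from enum import Enum
from types import SimpleNamespace


class AuthRole(Enum):  # stand-in for the project's AuthRole (assumption)
    CLIENT = 0
    SERVICER = 1


class TokenAuthorizer:
    """Hypothetical toy authorizer: a shared token instead of real signatures."""

    def __init__(self, token):
        self.token = token

    async def sign_request(self, request, service_public_key):
        request.auth = self.token

    async def validate_request(self, request):
        return getattr(request, "auth", None) == self.token

    async def sign_response(self, response, request):
        response.auth = self.token

    async def validate_response(self, response, request):
        return getattr(response, "auth", None) == self.token


def wrap_rpc(method, role, authorizer, service_public_key=None):
    """Compact stand-in for the per-method logic of the wrapper above."""

    async def before(request):
        # Client signs the request; servicer validates it.
        if authorizer is None:
            return True
        if role == AuthRole.CLIENT:
            await authorizer.sign_request(request, service_public_key)
            return True
        return await authorizer.validate_request(request)

    async def after(response, request):
        # Servicer signs the response; client validates it.
        if authorizer is None:
            return True
        if role == AuthRole.SERVICER:
            await authorizer.sign_response(response, request)
            return True
        return await authorizer.validate_response(response, request)

    if inspect.isasyncgenfunction(method):
        @functools.wraps(method)
        async def wrapped_stream(request):
            if not await before(request):
                return
            async for response in method(request):
                if await after(response, request):
                    yield response
        return wrapped_stream

    @functools.wraps(method)
    async def wrapped_unary(request):
        if not await before(request):
            return None
        response = await method(request)
        return response if await after(response, request) else None
    return wrapped_unary


async def rpc_echo(request):  # toy unary handler
    return SimpleNamespace(text=request.text)


async def rpc_count(request):  # toy streaming handler
    for i in range(request.n):
        yield SimpleNamespace(value=i)


async def demo():
    server = TokenAuthorizer("secret")
    serve_echo = wrap_rpc(rpc_echo, AuthRole.SERVICER, server)
    serve_count = wrap_rpc(rpc_count, AuthRole.SERVICER, server)
    results = {}
    for label, token in (("good", "secret"), ("bad", "wrong")):
        client = TokenAuthorizer(token)
        call_echo = wrap_rpc(serve_echo, AuthRole.CLIENT, client)
        call_count = wrap_rpc(serve_count, AuthRole.CLIENT, client)
        reply = await call_echo(SimpleNamespace(text="hi"))
        items = [r.value async for r in call_count(SimpleNamespace(n=3))]
        results[label] = (reply.text if reply else None, items)
    return results


results = asyncio.run(demo())
```

In this sketch a client with the matching token gets the unary reply and all three streamed items, while a client with the wrong token gets None from the unary call and an empty stream, which mirrors the `return None` and bare `return` branches of the wrapper.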
Implementation
Each protocol should define two functions: a get_stub method and a get_server_stub class method. The class method can be called from outside the class, by other protocols, to obtain the protocol's RPC methods. Both functions return the RPC methods of the requested node.
Each protocol's child class calls add_p2p_handlers(), which registers the protocol's RPC methods for the node.
When get_stub is called for a node, it gathers all of the protocol's methods whose names start with rpc_ that the node registered. When the stub returned by get_stub is wrapped in an authorizer, both the request and the response are validated according to that authorizer's authentication logic.
def get_stub(self, p2p: P2P, peer: PeerID) -> AuthRPCWrapperStreamer:
    """
    Get a stub that sends requests to a given peer.

    It's important here to wrap the stub with an authentication wrapper, see AuthRPCWrapper
    """
    stub = super().get_stub(p2p, peer)
    return AuthRPCWrapperStreamer(stub, AuthRole.CLIENT, self.authorizer, service_public_key=None)

@classmethod
def get_server_stub(
    cls,
    p2p: P2P,
    peer: PeerID,
    authorizer: Optional[AuthorizerBase] = None
) -> "InferenceProtocolStub":  # type: ignore # noqa: F821
    """
    Get a stub that sends requests to a given peer.

    This function can be used to get the RPC methods from this protocol outside of this class.

    This is useful for client-side requests.
    """
    stub = super().get_stub(p2p, peer)
    return AuthRPCWrapperStreamer(stub, AuthRole.CLIENT, authorizer, service_public_key=None)