How To Use

Authorizers can be used in protocols by wrapping the P2P get_stub method so that all communication between nodes is authenticated by the specified authorizer (see the protocol example for how this is implemented).
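
To make the sign-then-validate idea concrete, here is a minimal sketch of a toy authorizer. It is not the library's implementation: SharedSecretAuthorizer, the dict-shaped requests, and the "payload" and "signature" keys are all hypothetical, and an HMAC over a shared secret stands in for whatever key-based scheme a real authorizer uses. Only the method names sign_request and validate_request are taken from the wrapper on this page.

```python
import asyncio
import hashlib
import hmac


class SharedSecretAuthorizer:
    """Toy authorizer (hypothetical): signs dict messages with an HMAC over a shared secret."""

    def __init__(self, secret: bytes):
        self._secret = secret

    def _mac(self, payload: str) -> str:
        return hmac.new(self._secret, payload.encode(), hashlib.sha256).hexdigest()

    async def sign_request(self, request: dict, service_public_key=None) -> None:
        # Attach a signature so the receiving node can check who sent the request.
        request["signature"] = self._mac(request["payload"])

    async def validate_request(self, request: dict) -> bool:
        # Recompute the signature and compare in constant time.
        expected = self._mac(request["payload"])
        return hmac.compare_digest(request.get("signature", ""), expected)


async def demo():
    client = SharedSecretAuthorizer(b"shared-secret")
    server = SharedSecretAuthorizer(b"shared-secret")
    intruder = SharedSecretAuthorizer(b"wrong-secret")

    good = {"payload": "hello"}
    await client.sign_request(good)

    bad = {"payload": "hello"}
    await intruder.sign_request(bad)

    return await server.validate_request(good), await server.validate_request(bad)


accepted, rejected = asyncio.run(demo())
```

In this sketch the server accepts the request signed with the matching secret and rejects the one signed by the intruder, which is the behavior the wrapper relies on when it calls validate_request.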

Authentication Wrapper

The authentication wrapper is designed for both single-response and multi-response (async generator) methods.

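import functools
import inspect
from typing import Optional

# AuthRole, AuthorizerBase, and RSAPublicKey are assumed to be imported
# from the project's own auth and crypto modules.

class AuthRPCWrapperStreamer: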
    def __init__(
        self,
        stub,
        role: AuthRole,
        authorizer: Optional[AuthorizerBase],
        service_public_key: Optional[RSAPublicKey] = None
    ):
        self._stub = stub
        self._role = role
        self._authorizer = authorizer
        self._service_public_key = service_public_key

    def __getattribute__(self, name: str):
        if not name.startswith("rpc_"):
            return object.__getattribute__(self, name)

        stub = object.__getattribute__(self, "_stub")
        method = getattr(stub, name)
        role = object.__getattribute__(self, "_role")
        authorizer = object.__getattribute__(self, "_authorizer")
        service_public_key = object.__getattribute__(self, "_service_public_key")

        if inspect.isasyncgenfunction(method):
            @functools.wraps(method)
            async def wrapped_stream_rpc(request, *args, **kwargs):
                if authorizer:
                    if role == AuthRole.CLIENT:
                        await authorizer.sign_request(request, service_public_key)
                    elif role == AuthRole.SERVICER:
                        if not await authorizer.validate_request(request):
                            return

                async for response in method(request, *args, **kwargs):
                    if authorizer:
                        if role == AuthRole.SERVICER:
                            await authorizer.sign_response(response, request)
                        elif role == AuthRole.CLIENT:
                            # Skip any streamed response that fails validation.
                            if not await authorizer.validate_response(response, request):
                                continue

                    yield response

            return wrapped_stream_rpc
        else:
            @functools.wraps(method)
            async def wrapped_unary_rpc(request, *args, **kwargs):
                if authorizer:
                    if role == AuthRole.CLIENT:
                        await authorizer.sign_request(request, service_public_key)
                    elif role == AuthRole.SERVICER:
                        if not await authorizer.validate_request(request):
                            return None

                response = await method(request, *args, **kwargs)

                if authorizer:
                    if role == AuthRole.SERVICER:
                        await authorizer.sign_response(response, request)
                    elif role == AuthRole.CLIENT:
                        if not await authorizer.validate_response(response, request):
                            return None

                return response

            return wrapped_unary_rpc
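
The wrapper above chooses between the streaming and unary paths with inspect.isasyncgenfunction. The following minimal sketch isolates that dispatch so it can be run on its own. EchoStub, wrap, and the validate callback are hypothetical names introduced for illustration; validate is a plain function standing in for an authorizer's validate_response.

```python
import asyncio
import functools
import inspect


class EchoStub:
    """Hypothetical stub with one unary and one streaming RPC (illustration only)."""

    async def rpc_echo(self, request):
        return f"echo:{request}"

    async def rpc_count(self, request):
        for i in range(request):
            yield i


def wrap(method, validate):
    """Wrap `method` so every response passes through `validate`."""
    if inspect.isasyncgenfunction(method):
        @functools.wraps(method)
        async def wrapped_stream(request):
            async for response in method(request):
                if validate(response):
                    yield response  # responses that fail validation are skipped
        return wrapped_stream

    @functools.wraps(method)
    async def wrapped_unary(request):
        response = await method(request)
        return response if validate(response) else None
    return wrapped_unary


async def demo():
    stub = EchoStub()
    unary = wrap(stub.rpc_echo, validate=lambda r: True)
    stream = wrap(stub.rpc_count, validate=lambda r: r % 2 == 0)
    single = await unary("hi")                 # awaited once, returns one value
    many = [item async for item in stream(4)]  # consumed with `async for`
    return single, many


result = asyncio.run(demo())
```

Note the difference in how callers consume the two kinds of methods: a unary RPC is awaited and returns None when validation fails, while a streaming RPC is iterated and silently drops invalid items.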

Implementation

Each protocol should define two functions: an instance method get_stub and a class method get_server_stub. The class method can be called from outside the class, for example by other protocols, to gather the RPC methods. Both functions return the RPC methods of the requested node.

Each protocol's child class calls add_p2p_handlers(), which adds the node's RPC methods for that protocol. When get_stub is called on a node, it gathers all of the protocol's methods whose names start with rpc_ that the node registered. When the stub returned by get_stub is wrapped in an authentication wrapper, both the request and the response are validated according to the authentication logic of the specified authorizer.
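
The rpc_ naming convention can be illustrated with a short sketch. This is not how add_p2p_handlers() or get_stub is implemented; ExampleProtocol and collect_rpc_names are hypothetical names used only to show how methods can be selected by prefix.

```python
import inspect


class ExampleProtocol:
    """Hypothetical protocol class; the names here are illustrative only."""

    async def rpc_info(self, request):
        return "info"

    async def rpc_stream(self, request):
        yield "chunk"

    def helper(self):
        return "not an RPC"


def collect_rpc_names(cls):
    """Return the sorted names of functions on `cls` whose names start with 'rpc_'."""
    return sorted(
        name
        for name, _member in inspect.getmembers(cls, predicate=inspect.isfunction)
        if name.startswith("rpc_")
    )


rpc_names = collect_rpc_names(ExampleProtocol)
```

Here helper is ignored because it lacks the rpc_ prefix, which mirrors how the authentication wrapper only intercepts attribute names that start with rpc_.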

def get_stub(self, p2p: P2P, peer: PeerID) -> AuthRPCWrapperStreamer:
    """
    Get a stub that sends requests to a given peer.

    It is important to wrap the stub with an authentication wrapper here; see AuthRPCWrapperStreamer.
    """
    stub = super().get_stub(p2p, peer)
    return AuthRPCWrapperStreamer(stub, AuthRole.CLIENT, self.authorizer, service_public_key=None)

@classmethod
def get_server_stub(
    cls,
    p2p: P2P,
    peer: PeerID,
    authorizer: Optional[AuthorizerBase] = None
) -> "InferenceProtocolStub":  # type: ignore # noqa: F821
    """
    Get a stub that sends requests to a given peer.

    This function can be used to get the RPC methods from this protocol outside of this class.

    This is useful for client-side requests.
    """

    stub = super().get_stub(p2p, peer)
    return AuthRPCWrapperStreamer(stub, AuthRole.CLIENT, authorizer, service_public_key=None)
