Signature Authorizer

Signature authorization is used for peer-to-peer communication.

Protocols can rely on authentication of peer IDs and build an association between peer IDs and permissions, with the Peer ID serving the same function as the "username" in traditional authorization frameworks, and the peer’s private key serving as the "password". Your protocols can then reject requests from untrusted peers.

The template comes with a signature authenticator for RSA and Ed25519 signature schemes.

The Mesh Template comes with RSA and Ed25519; the mesh template can also utilize Secp256k1 and ECDSA.

class TokenEd25519AuthorizerBase(AuthorizerBase):
    """
    Implements the authorization protocol for a moderated/permissioned Subnet network.

    Uses Ed25519
    """

    def __init__(self, local_private_key: Optional[Ed25519PrivateKey] = None):
        if local_private_key is None:
            local_private_key = Ed25519PrivateKey.process_wide()
        self._local_private_key = local_private_key
        self._local_public_key = local_private_key.get_public_key()

        self._local_access_token = None
        self._refresh_lock = asyncio.Lock()

        self._recent_nonces = TimedStorage()

    @abstractmethod
    async def get_token(self) -> AccessToken: ...

    @abstractmethod
    def is_token_valid(self, access_token: AccessToken) -> bool: ...

    @abstractmethod
    def does_token_need_refreshing(self, access_token: AccessToken) -> bool: ...

    async def refresh_token_if_needed(self) -> None:
        if self._local_access_token is None or self.does_token_need_refreshing(self._local_access_token):
            async with self._refresh_lock:
                if self._local_access_token is None or self.does_token_need_refreshing(self._local_access_token):
                    self._local_access_token = await self.get_token()
                    assert self.is_token_valid(self._local_access_token)

    @property
    def local_public_key(self) -> RSAPublicKey:
        return self._local_public_key

    async def sign_request(self, request: AuthorizedRequestBase, service_public_key: Optional[RSAPublicKey]) -> None:
        await self.refresh_token_if_needed()
        auth = request.auth

        auth.client_access_token.CopyFrom(self._local_access_token)

        if service_public_key is not None:
            auth.service_public_key = service_public_key.to_bytes()
        auth.time = get_dht_time()
        auth.nonce = secrets.token_bytes(8)

        assert auth.signature == b""
        auth.signature = self._local_private_key.sign(request.SerializeToString())

    _MAX_CLIENT_SERVICER_TIME_DIFF = timedelta(minutes=1)

    async def validate_request(self, request: AuthorizedRequestBase) -> bool:
        await self.refresh_token_if_needed()
        auth = request.auth

        if not self.is_token_valid(auth.client_access_token):
            logger.debug("Client failed to prove that it (still) has access to the network")
            return False

        client_public_key = RSAPublicKey.from_bytes(auth.client_access_token.public_key)
        signature = auth.signature
        auth.signature = b""
        if not client_public_key.verify(request.SerializeToString(), signature):
            logger.debug("Request has invalid signature")
            return False

        if auth.service_public_key and auth.service_public_key != self._local_public_key.to_bytes():
            logger.debug("Request is generated for a peer with another public key")
            return False

        with self._recent_nonces.freeze():
            current_time = get_dht_time()
            if abs(auth.time - current_time) > self._MAX_CLIENT_SERVICER_TIME_DIFF.total_seconds():
                logger.debug("Clocks are not synchronized or a previous request is replayed again")
                return False
            if auth.nonce in self._recent_nonces:
                logger.debug("Previous request is replayed again")
                return False

        self._recent_nonces.store(
            auth.nonce, None, current_time + self._MAX_CLIENT_SERVICER_TIME_DIFF.total_seconds() * 3
        )
        return True

    async def sign_response(self, response: AuthorizedResponseBase, request: AuthorizedRequestBase) -> None:
        await self.refresh_token_if_needed()
        auth = response.auth

        auth.service_access_token.CopyFrom(self._local_access_token)
        auth.nonce = request.auth.nonce

        assert auth.signature == b""
        auth.signature = self._local_private_key.sign(response.SerializeToString())

    async def validate_response(self, response: AuthorizedResponseBase, request: AuthorizedRequestBase) -> bool:
        await self.refresh_token_if_needed()
        auth = response.auth

        if not self.is_token_valid(auth.service_access_token):
            logger.debug("Service failed to prove that it (still) has access to the network")
            return False

        service_public_key = RSAPublicKey.from_bytes(auth.service_access_token.public_key)
        signature = auth.signature
        auth.signature = b""
        if not service_public_key.verify(response.SerializeToString(), signature):
            logger.debug("Response has invalid signature")
            return False

        if auth.nonce != request.auth.nonce:
            logger.debug("Response is generated for another request")
            return False

        return True

Last updated