# Signature Authorizer

The signature authorizer is a key-interoperable peer-to-peer handshake. During the handshake, peers authenticate each other's identity.

The signature authorizer makes it possible to know who is requesting and who is responding in any communication, and can therefore be used to build robust logic in the subnet, such as rate limiting or permission-based logic.

> See `subnet/protocols/mock_protocol.py` in the template to see how the peer's identity is retrieved to check whether the peer is allowed to call inference.

The template comes with a signature authorizer that is interoperable between the RSA and Ed25519 signature schemes.

{% hint style="info" %}
The Subnet Template comes with RSA and Ed25519; the subnet template can also utilize Secp256k1 and ECDSA.
{% endhint %}

```python
class SignatureAuthorizer(AuthorizerBase):
    """Sign outgoing requests/responses with the local key and verify incoming ones.

    The authorizer accepts either an RSA or an Ed25519 private key. Incoming
    requests are checked for a valid signature, for the intended recipient,
    for clock skew, and for nonce reuse. Incoming responses are checked for a
    valid signature and for a nonce matching the request they answer.
    """

    # Largest tolerated gap (in either direction) between the timestamp carried
    # by a request and the local DHT time.
    _MAX_CLIENT_SERVICER_TIME_DIFF = timedelta(minutes=1)

    def __init__(self, local_private_key: Ed25519PrivateKey | RSAPrivateKey):
        """Store the local key pair and initialise the nonce storage.

        Args:
            local_private_key: Key used to sign tokens, requests and responses.
                Any key that is not an ``RSAPrivateKey`` is recorded as Ed25519.
        """
        self._key_type = KeyType.RSA if isinstance(local_private_key, RSAPrivateKey) else KeyType.Ed25519
        self._local_private_key = local_private_key
        self._local_public_key = local_private_key.get_public_key()

        # NOTE(review): neither attribute is read or written by the methods of
        # this class; they are kept in case subclasses or callers use them.
        self._local_access_token = None
        self._refresh_lock = asyncio.Lock()

        # Nonces of recently accepted requests, used to reject replays.
        self._recent_nonces = TimedStorage()

    async def get_token(self) -> AccessToken:
        """Build and sign a fresh access token for the local peer.

        A new token is created on every call. Its expiration time is set one
        minute after the current UTC time and its username is left empty.

        Returns:
            The signed ``AccessToken`` (built-in template format).
        """
        token = AccessToken(
            username='',
            public_key=self._local_public_key.to_bytes(),
            expiration_time=str(datetime.now(timezone.utc) + timedelta(minutes=1)),
        )
        token.signature = self._local_private_key.sign(self._token_to_bytes(token))
        return token

    @staticmethod
    def _token_to_bytes(access_token: AccessToken) -> bytes:
        """Return the byte string that is signed for ``access_token``.

        The username, public key and expiration time are formatted into one
        space-separated string and encoded.
        """
        return f"{access_token.username} {access_token.public_key} {access_token.expiration_time}".encode()

    @property
    def local_public_key(self) -> Ed25519PublicKey | RSAPublicKey:
        """Public key matching the private key given at construction."""
        return self._local_public_key

    async def sign_request(self, request: AuthorizedRequestBase, service_public_key: Optional[Ed25519PublicKey | RSAPublicKey]) -> None:
        """Fill in ``request.auth`` and sign the request in place.

        Args:
            request: Outgoing request; its ``auth.signature`` must still be empty.
            service_public_key: Public key of the intended recipient. When it is
                ``None`` the request is not bound to a particular recipient.
        """
        auth = request.auth

        local_access_token = await self.get_token()
        auth.client_access_token.CopyFrom(local_access_token)

        if service_public_key is not None:
            auth.service_public_key = service_public_key.to_bytes()
        auth.time = get_dht_time()

        # Random per-request value; the receiver uses it to detect replays.
        auth.nonce = secrets.token_bytes(8)

        # The signature covers the serialized request with an empty signature
        # field, so the request must not have been signed already.
        assert auth.signature == b""
        auth.signature = self._local_private_key.sign(request.SerializeToString())

    async def do_validate_request(self, request: AuthorizedRequestBase) -> Tuple[RSAPublicKey | Ed25519PublicKey, float, bytes, bool]:
        """Run all request checks without recording the nonce.

        The request is left unmodified: its signature is blanked only for the
        duration of the verification and then restored.

        Returns:
            public key, current time, nonce, verified. The current time is
            ``0.0`` when the request is rejected before the clock is read.
        """
        auth = request.auth

        client_public_key = load_public_key_from_bytes(auth.client_access_token.public_key)

        # The sender signed the request while its signature field was empty, so
        # verification has to serialize it in that same state. Restore the
        # field afterwards so validation does not mutate the caller's message.
        signature = auth.signature
        auth.signature = b""
        try:
            signature_ok = client_public_key.verify(request.SerializeToString(), signature)
        finally:
            auth.signature = signature
        if not signature_ok:
            logger.debug("Request has invalid signature")
            return client_public_key, 0.0, auth.nonce, False

        # An empty service_public_key means the request is not bound to a recipient.
        if auth.service_public_key and auth.service_public_key != self._local_public_key.to_bytes():
            logger.debug("Request is generated for a peer with another public key")
            return client_public_key, 0.0, auth.nonce, False

        with self._recent_nonces.freeze():
            current_time = get_dht_time()
            if abs(auth.time - current_time) > self._MAX_CLIENT_SERVICER_TIME_DIFF.total_seconds():
                logger.debug("Clocks are not synchronized or a previous request is replayed again")
                return client_public_key, current_time, auth.nonce, False
            if auth.nonce in self._recent_nonces:
                logger.debug("Previous request is replayed again")
                return client_public_key, current_time, auth.nonce, False

        return client_public_key, current_time, auth.nonce, True

    async def validate_request(self, request: AuthorizedRequestBase) -> bool:
        """Validate ``request`` and, on success, remember its nonce.

        Returns:
            ``True`` if the request passed every check, ``False`` otherwise.
        """
        _, current_time, nonce, valid = await self.do_validate_request(request)
        if not valid:
            return False

        # Keep the nonce for three times the tolerated clock difference.
        self._recent_nonces.store(
            nonce, None, current_time + self._MAX_CLIENT_SERVICER_TIME_DIFF.total_seconds() * 3
        )

        return True

    async def sign_response(self, response: AuthorizedResponseBase, request: AuthorizedRequestBase) -> None:
        """Fill in ``response.auth`` and sign the response in place.

        Args:
            response: Outgoing response; its ``auth.signature`` must still be empty.
            request: The request being answered; its nonce is copied into the
                response so the requester can match the two.
        """
        auth = response.auth

        local_access_token = await self.get_token()
        auth.service_access_token.CopyFrom(local_access_token)
        auth.nonce = request.auth.nonce

        # The signature covers the serialized response with an empty signature
        # field, so the response must not have been signed already.
        assert auth.signature == b""
        auth.signature = self._local_private_key.sign(response.SerializeToString())

    async def do_validate_response(self, response: AuthorizedResponseBase, request: AuthorizedRequestBase) -> Tuple[RSAPublicKey | Ed25519PublicKey, bool]:
        """Check the signature of ``response`` and that it answers ``request``.

        The response is left unmodified: its signature is blanked only for the
        duration of the verification and then restored.

        Returns:
            public key of the responding peer, verified
        """
        auth = response.auth

        service_public_key = load_public_key_from_bytes(auth.service_access_token.public_key)

        # Verify against the response as it was when signed (empty signature
        # field), then restore the field so the caller's message is unchanged.
        signature = auth.signature
        auth.signature = b""
        try:
            signature_ok = service_public_key.verify(response.SerializeToString(), signature)
        finally:
            auth.signature = signature
        if not signature_ok:
            logger.debug("Response has invalid signature")
            return service_public_key, False

        if auth.nonce != request.auth.nonce:
            logger.debug("Response is generated for another request")
            return service_public_key, False

        return service_public_key, True

    async def validate_response(self, response: AuthorizedResponseBase, request: AuthorizedRequestBase) -> bool:
        """Return ``True`` if ``response`` is a validly signed answer to ``request``."""
        _, valid = await self.do_validate_response(response, request)
        return valid

```


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://docs.hypertensor.org/subnet-template/authorizers/signature-authorizer.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
