Signature Authorizer
The signature authorizer is a peer-to-peer handshake. During the handshake, peers authenticate each other's identity.
The signature authorizer makes it possible to know who is requesting and who is responding in any communication, and can therefore be used to build robust logic in the subnet, such as rate limiting or permission logic.
See
subnet/protocols/mock_protocol.py
in the template to see how the peer's identity is retrieved to check whether the peer is allowed to call inference.
The template comes with a signature authenticator for RSA and Ed25519 signature schemes.
class TokenRSAAuthorizerBase(AuthorizerBase):
    """
    Implements the authorization protocol for a moderated/permissioned Subnet network.
    Uses RSA

    Each outgoing request/response carries a freshly built, self-signed
    ``AccessToken`` holding the sender's public key, plus a signature over the
    serialized message. The receiving side verifies that signature with the
    public key embedded in the token and, for requests, rejects stale
    timestamps and re-used nonces.

    NOTE(review): neither ``validate_request`` nor ``validate_response`` checks
    the token's own ``signature`` or ``expiration_time`` fields; only the
    message-level signature is verified here.
    """

    def __init__(self, local_private_key: Optional[RSAPrivateKey] = None):
        """
        :param local_private_key: key used to sign tokens, requests and responses;
            when omitted, ``RSAPrivateKey.process_wide()`` is used.
        """
        if local_private_key is None:
            local_private_key = RSAPrivateKey.process_wide()
        self._local_private_key = local_private_key
        self._local_public_key = local_private_key.get_public_key()

        # Not read by any method of this class; kept because subclasses or
        # external code may rely on these attributes existing.
        self._local_access_token = None
        self._refresh_lock = asyncio.Lock()

        # Nonces of recently accepted requests, used for replay detection.
        self._recent_nonces = TimedStorage()

    async def get_token(self) -> AccessToken:
        """
        Build and sign a new access token for the local peer.

        The token has an empty username, the local public key, and an
        expiration time one minute from now (UTC, stored as a string).
        """
        # Uses the built in template ``AccessToken`` format
        token = AccessToken(
            username='',
            public_key=self._local_public_key.to_bytes(),
            expiration_time=str(datetime.now(timezone.utc) + timedelta(minutes=1)),
        )
        token.signature = self._local_private_key.sign(self._token_to_bytes(token))
        return token

    @staticmethod
    def _token_to_bytes(access_token: AccessToken) -> bytes:
        """Return the byte string that is signed to produce the token's signature."""
        return f"{access_token.username} {access_token.public_key} {access_token.expiration_time}".encode()

    @property
    def local_public_key(self) -> RSAPublicKey:
        """Public key matching the private key this authorizer signs with."""
        return self._local_public_key

    async def sign_request(self, request: AuthorizedRequestBase, service_public_key: Optional[RSAPublicKey]) -> None:
        """
        Fill in ``request.auth`` and sign the request in place.

        :param request: outgoing request; its ``auth.signature`` must still be empty.
        :param service_public_key: public key of the intended recipient, or
            ``None`` to leave the request unbound to a particular peer.
        """
        auth = request.auth

        local_access_token = await self.get_token()
        auth.client_access_token.CopyFrom(local_access_token)

        if service_public_key is not None:
            auth.service_public_key = service_public_key.to_bytes()
        auth.time = get_dht_time()
        auth.nonce = secrets.token_bytes(8)

        # The signature must cover the message with an empty signature field,
        # because validation blanks the field before verifying.
        assert auth.signature == b""
        auth.signature = self._local_private_key.sign(request.SerializeToString())

    # Largest accepted gap (in either direction) between the request timestamp
    # and the local DHT time.
    _MAX_CLIENT_SERVICER_TIME_DIFF = timedelta(minutes=1)

    async def validate_request(self, request: AuthorizedRequestBase) -> bool:
        """
        Check an incoming request's signature, addressee, timestamp and nonce.

        Side effect: ``request.auth.signature`` is reset to ``b""`` and is not
        restored afterwards.

        :returns: ``True`` if the request is accepted (its nonce is then
            remembered), ``False`` otherwise.
        """
        auth = request.auth

        client_public_key = RSAPublicKey.from_bytes(auth.client_access_token.public_key)

        # Verify against the serialization with the signature field blanked,
        # mirroring what sign_request signed.
        signature = auth.signature
        auth.signature = b""
        if not client_public_key.verify(request.SerializeToString(), signature):
            logger.debug("Request has invalid signature")
            return False

        # An empty service_public_key means the request is not bound to a peer.
        if auth.service_public_key and auth.service_public_key != self._local_public_key.to_bytes():
            logger.debug("Request is generated for a peer with another public key")
            return False

        max_time_diff = self._MAX_CLIENT_SERVICER_TIME_DIFF.total_seconds()

        # NOTE(review): freeze() presumably keeps the nonce storage from
        # changing while the time and nonce checks run - confirm against
        # TimedStorage.
        with self._recent_nonces.freeze():
            current_time = get_dht_time()
            if abs(auth.time - current_time) > max_time_diff:
                logger.debug("Clocks are not synchronized or a previous request is replayed again")
                return False

            if auth.nonce in self._recent_nonces:
                logger.debug("Previous request is replayed again")
                return False

        # Keep the nonce for three times the allowed clock skew.
        self._recent_nonces.store(auth.nonce, None, current_time + max_time_diff * 3)
        return True

    async def sign_response(self, response: AuthorizedResponseBase, request: AuthorizedRequestBase) -> None:
        """
        Fill in ``response.auth`` and sign the response in place.

        The request's nonce is copied into the response so the caller can match
        the response to the request it sent.

        :param response: outgoing response; its ``auth.signature`` must still be empty.
        :param request: the request being answered.
        """
        auth = response.auth

        local_access_token = await self.get_token()
        auth.service_access_token.CopyFrom(local_access_token)
        auth.nonce = request.auth.nonce

        # Same convention as for requests: sign with the signature field empty.
        assert auth.signature == b""
        auth.signature = self._local_private_key.sign(response.SerializeToString())

    async def validate_response(self, response: AuthorizedResponseBase, request: AuthorizedRequestBase) -> bool:
        """
        Check a response's signature and that it answers ``request``.

        If ``response`` is an async generator, its first item is pulled and
        validated in its place.

        Side effect: the validated message's ``auth.signature`` is reset to
        ``b""`` and is not restored afterwards.

        :returns: ``True`` if the signature verifies with the public key in the
            response's token and the nonce equals the request's nonce.
        """
        if inspect.isasyncgen(response):
            # asyncgenerator block for inference protocol
            response = await anext(response)
        auth = response.auth

        service_public_key = RSAPublicKey.from_bytes(auth.service_access_token.public_key)

        signature = auth.signature
        auth.signature = b""
        if not service_public_key.verify(response.SerializeToString(), signature):
            logger.debug("Response has invalid signature")
            return False

        if auth.nonce != request.auth.nonce:
            logger.debug("Response is generated for another request")
            return False

        return True
Last updated