Proof-of-Stake (PoS) Authenticator
For security-critical operations — especially those involving subnet membership and DHT participation — Hypertensor recommends using the Proof-of-Stake (PoS) Authenticator.
This authenticator ensures that only nodes with a valid on-chain stake can:
Join the DHT
Update routing tables
Register heartbeat or protocol records
Participate in critical peer-to-peer interactions
How It Works
The PoS authenticator combines cryptographic identity verification with on-chain staking validation. It utilizes the Signature Authorizer for cryptographic handshakes.
When a node attempts to make a DHT request, the authenticator:
Verifies the Peer ID Signature
The Mesh Template comes with the option to use RSA or Ed25519 schemes.
Ensures the peer controls the corresponding private key.
Checks On-Chain Stake
Queries the Hypertensor blockchain to verify that the peer ID is staked in the target subnet_id.
Prevents unauthenticated or free-rider nodes from participating in the network.
Caches the Result
After a successful stake verification, the result is cached for 5 minutes to reduce load on the chain.
This cache window trades a short delay in noticing stake changes for fewer chain queries, balancing security and performance.
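The caching step can be sketched as a small time-to-live cache in front of the chain query. This is a minimal illustration, not the template's implementation: `StakeCache`, `query_chain`, and the injectable `clock` are hypothetical names, and only the 5-minute lifetime comes from the description above. The sketch caches successful lookups only, which is one reading of "after a successful stake verification".

```python
import time
from typing import Callable, Dict, Tuple

CACHE_TTL_SECONDS = 5 * 60  # 5-minute lifetime described above


class StakeCache:
    """Hypothetical TTL cache in front of an on-chain stake lookup."""

    def __init__(
        self,
        query_chain: Callable[[str, int], bool],
        ttl: float = CACHE_TTL_SECONDS,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        # Assumed signature: query_chain(peer_id, subnet_id) -> True if staked.
        self._query_chain = query_chain
        self._ttl = ttl
        self._clock = clock
        self._expires_at: Dict[Tuple[str, int], float] = {}

    def is_staked(self, peer_id: str, subnet_id: int) -> bool:
        key = (peer_id, subnet_id)
        now = self._clock()
        expiry = self._expires_at.get(key)
        if expiry is not None and now < expiry:
            return True  # cached success, no chain query needed
        # Entry is missing or expired: drop it and ask the chain again.
        self._expires_at.pop(key, None)
        if not self._query_chain(peer_id, subnet_id):
            return False  # failures are not cached in this sketch
        self._expires_at[key] = now + self._ttl
        return True
```

One consequence of this design is that a peer who removes its stake may still be accepted until its cache entry expires, so the lifetime bounds how stale a positive result can be.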
When to Use It
The PoS Authenticator is ideal for:
Joining the DHT
Updating DHT routing tables
Publishing or modifying DHT records
Authenticating peers in sensitive protocols (e.g., scoring, validation, consensus)
⚠️ Nodes without a valid stake are rejected when they attempt any critical or trusted action in the network.
Built-In Support
The Hypertensor subnet template includes ready-to-use PoS authenticators for:
Ed25519
RSA
You can select the appropriate one depending on your key infrastructure.
class ProofOfStakeAuthorizer(AuthorizerBase):
    def __init__(
        self,
        signature_authorizer: SignatureAuthorizer,
        pos: ProofOfStake
    ):
        super().__init__()
        self.signature_authorizer = signature_authorizer
        self.pos = pos

    async def sign_request(
        self,
        request: AuthorizedRequestBase,
        service_public_key: Optional[Ed25519PublicKey | RSAPublicKey]
    ) -> None:
        await self.signature_authorizer.sign_request(request, service_public_key)

    async def validate_request(self, request: AuthorizedRequestBase) -> bool:
        client_public_key, current_time, nonce, valid = await self.signature_authorizer.do_validate_request(request)
        if not valid:
            return False

        # Verify proof of stake
        try:
            proof_of_stake = self.pos.proof_of_stake(client_public_key)
            if not proof_of_stake:
                return False
        except Exception as e:
            logger.debug(f"Proof of stake failed, validate_request={e}", exc_info=True)
            return False

        self.signature_authorizer._recent_nonces.store(
            nonce, None, current_time + self.signature_authorizer._MAX_CLIENT_SERVICER_TIME_DIFF.total_seconds() * 3
        )
        return True

    async def sign_response(self, response: AuthorizedResponseBase, request: AuthorizedRequestBase) -> None:
        await self.signature_authorizer.sign_response(response, request)

    async def validate_response(self, response: AuthorizedResponseBase, request: AuthorizedRequestBase) -> bool:
        service_public_key, valid = await self.signature_authorizer.do_validate_response(response, request)
        if not valid:
            return False

        # Verify proof of stake
        try:
            proof_of_stake = self.pos.proof_of_stake(service_public_key)
            if not proof_of_stake:
                return False
        except Exception as e:
            logger.debug(f"Proof of stake failed, validate_response={e}", exc_info=True)
            return False

        return True
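The fail-closed behavior of validate_request can be exercised without a chain connection by substituting stand-ins. The sketch below is an illustrative harness under stated assumptions: `FakeSignatureAuthorizer`, `FakeProofOfStake`, `check`, and `run` are hypothetical and only mirror the call shapes visible in the class above (`do_validate_request` returning a 4-tuple, `proof_of_stake` taking a public key). It is a reduced model and omits the nonce bookkeeping.

```python
import asyncio
from typing import Any, Tuple


class FakeSignatureAuthorizer:
    """Hypothetical stand-in that returns a fixed signature-check outcome."""

    def __init__(self, valid: bool) -> None:
        self.valid = valid

    async def do_validate_request(self, request: Any) -> Tuple[str, float, bytes, bool]:
        # Same tuple shape as in the class above: (public_key, time, nonce, valid).
        return "peer-public-key", 0.0, b"nonce", self.valid


class FakeProofOfStake:
    """Hypothetical stand-in for the on-chain stake lookup."""

    def __init__(self, staked: bool, fail: bool = False) -> None:
        self.staked = staked
        self.fail = fail

    def proof_of_stake(self, public_key: Any) -> bool:
        if self.fail:
            raise ConnectionError("chain unreachable")
        return self.staked


async def check(sig: FakeSignatureAuthorizer, pos: FakeProofOfStake) -> bool:
    """Reduced model of validate_request: signature first, then stake."""
    public_key, _time, _nonce, valid = await sig.do_validate_request(request=None)
    if not valid:
        return False
    try:
        return bool(pos.proof_of_stake(public_key))
    except Exception:
        return False  # any chain error rejects the request (fail closed)


def run(sig_valid: bool, staked: bool, fail: bool = False) -> bool:
    """Convenience wrapper that drives the coroutine to completion."""
    return asyncio.run(check(FakeSignatureAuthorizer(sig_valid), FakeProofOfStake(staked, fail)))
```

In this model a request passes only when the signature is valid and the stake lookup succeeds; a bad signature, a missing stake, and a chain error all lead to rejection, matching the three `return False` paths in the class.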