PoS

Proof-of-Stake (PoS) Authenticator

For security-critical operations — especially those involving subnet membership and DHT participation — Hypertensor recommends using the Proof-of-Stake (PoS) Authenticator.

This authenticator ensures that only nodes with a valid on-chain stake can:

  • Join the DHT

  • Update routing tables

  • Register heartbeat or protocol records

  • Participate in critical peer-to-peer interactions


How It Works

The PoS authenticator combines cryptographic identity verification with on-chain staking validation.

When a node attempts to make a DHT request, the authenticator:

  1. Verifies the Peer ID Signature

    • Supports both RSA and Ed25519 schemes.

    • Ensures the peer controls the corresponding private key.

  2. Checks On-Chain Stake

    • Queries the Hypertensor blockchain to verify that the peer ID is staked in the target subnet_id.

    • Prevents unauthenticated or free-rider nodes from participating in the network.

  3. Caches the Result

    • After a successful stake verification, the result is cached for 5 minutes to reduce load on the chain.

    • This cooldown ensures a balance between security and performance.


When to Use It

The PoS Authenticator is ideal for:

  • Joining the DHT

  • Updating DHT routing tables

  • Publishing or modifying DHT records

  • Authenticating peers in sensitive protocols (e.g., scoring, validation, consensus)

⚠️ Nodes without a valid stake will be rejected from performing any critical or trusted actions in the network.


Built-In Support

The Hypertensor subnet template includes ready-to-use PoS authenticators for:

  • Ed25519

  • RSA

You can select the appropriate one depending on your key infrastructure.

class Ed25519ProofOfStakeAuthorizer(AuthorizerBase):
    """
    Implements a proof-of-stake authorization protocol using Ed25519 keys
    Checks the Hypertensor network for nodes `peer_id` is staked.
    The ``peer_id`` is retrieved using the Ed25519 public key
    """

    def __init__(
        self,
        local_private_key: Ed25519PrivateKey,
        subnet_id: int,
        hypertensor: Hypertensor
    ):
        super().__init__()
        self._local_private_key = local_private_key
        self._local_public_key = local_private_key.get_public_key()
        self.subnet_id = subnet_id
        self.hypertensor = hypertensor
        self.peer_id_to_last_successful_pos: Dict[PeerID, float] = {}
        self.pos_success_cooldown = 300
        self.peer_id_to_last_failed_pos: Dict[PeerID, float] = {}
        self.pos_fail_cooldown: float = 300  # 5 minutes default cooldown

    async def get_token(self) -> AccessToken:
        token = AccessToken(
            username="",
            public_key=self._local_public_key.to_bytes(),
            expiration_time=str(datetime.now(timezone.utc) + timedelta(minutes=1)),
        )
        token.signature = self._local_private_key.sign(self._token_to_bytes(token))
        return token

    @staticmethod
    def _token_to_bytes(access_token: AccessToken) -> bytes:
        return f"{access_token.username} {access_token.public_key} {access_token.expiration_time}".encode()

    async def sign_request(
        self, request: AuthorizedRequestBase, service_public_key: Optional[Ed25519PublicKey]
    ) -> None:
        auth = request.auth

        local_access_token = await self.get_token()
        auth.client_access_token.CopyFrom(local_access_token)

        if service_public_key is not None:
            auth.service_public_key = service_public_key.to_bytes()
        auth.time = get_dht_time()
        auth.nonce = secrets.token_bytes(8)

        assert auth.signature == b""
        auth.signature = self._local_private_key.sign(request.SerializeToString())

    _MAX_CLIENT_SERVICER_TIME_DIFF = timedelta(minutes=1)

    async def validate_request(self, request: AuthorizedRequestBase) -> bool:
        auth = request.auth

        # Get public key of signer
        try:
            client_public_key = Ed25519PublicKey.from_bytes(auth.client_access_token.public_key)
        except Exception as e:
            logger.debug(f"Failed to get Ed25519 public key from bytes, Err: {e}", exc_info=True)
            return False

        signature = auth.signature
        auth.signature = b""
        # Verify signature of the request from signer
        if not client_public_key.verify(request.SerializeToString(), signature):
            logger.debug("Request has invalid signature")
            return False

        if auth.service_public_key and auth.service_public_key != self._local_public_key.to_bytes():
            logger.debug("Request is generated for a peer with another public key")
            return False

        # Verify proof of stake
        try:
            proof_of_stake = self.proof_of_stake(client_public_key)
            return proof_of_stake
        except Exception as e:
            logger.debug("Proof of stake failed, validate request", e, exc_info=True)

        return False

    async def sign_response(self, response: AuthorizedResponseBase, request: AuthorizedRequestBase) -> None:
        auth = response.auth

        local_access_token = await self.get_token()
        auth.service_access_token.CopyFrom(local_access_token)

        # auth.service_access_token.CopyFrom(self._local_public_key)
        auth.nonce = request.auth.nonce

        assert auth.signature == b""
        auth.signature = self._local_private_key.sign(response.SerializeToString())

    async def validate_response(self, response: AuthorizedResponseBase, request: AuthorizedRequestBase) -> bool:
        auth = response.auth

        service_public_key = Ed25519PublicKey.from_bytes(auth.service_access_token.public_key)
        signature = auth.signature
        auth.signature = b""
        if not service_public_key.verify(response.SerializeToString(), signature):
            logger.debug("Response has invalid signature")
            return False

        if auth.nonce != request.auth.nonce:
            logger.debug("Response is generated for another request")
            return False

        try:
            proof_of_stake = self.proof_of_stake(service_public_key)
            return proof_of_stake
        except Exception as e:
            logger.debug("Proof of stake failed, validate response", e, exc_info=True)

        return False

    def get_peer_id_last_success(self, peer_id: PeerID) -> float:
        return self.peer_id_to_last_successful_pos.get(peer_id, 0)

    def update_peer_id_success(self, peer_id: PeerID):
        self.peer_id_to_last_successful_pos[peer_id] = get_dht_time()
        self.peer_id_to_last_failed_pos.pop(peer_id, None)

    def get_peer_id_last_fail(self, peer_id: PeerID) -> float:
        return self.peer_id_to_last_failed_pos.get(peer_id, 0)

    def update_peer_id_fail(self, peer_id: PeerID):
        self.peer_id_to_last_failed_pos[peer_id] = get_dht_time()
        self.peer_id_to_last_successful_pos.pop(peer_id, None)

    def proof_of_stake(self, public_key: Ed25519PublicKey) -> bool:
        peer_id: Optional[PeerID] = get_ed25519_peer_id(public_key)
        if peer_id is None:
            return False

        now = get_dht_time()

        # Recently failed — reject immediately
        last_fail = self.get_peer_id_last_fail(peer_id)
        if last_fail and now - last_fail < self.pos_fail_cooldown:
            return False

        # Recent success — no need to check again
        last_success = self.get_peer_id_last_success(peer_id)
        if last_success and now - last_success < self.pos_success_cooldown:
            return True

        # On-chain proof of stake check
        peer_id_vec = self.to_vec_u8(peer_id.to_base58())
        proof_of_stake = self.is_staked(peer_id_vec)

        if proof_of_stake:
            self.update_peer_id_success(peer_id)
            return True
        else:
            self.update_peer_id_fail(peer_id)
            return False

    def to_vec_u8(self, string):
        """Get peer_id in Vec<u8> for blockchain"""
        return [ord(char) for char in string]

    def is_staked(self, peer_id_vector) -> bool:
        """
        Each subnet node must be staked
        """
        is_staked = self.is_subnet_node_staked(peer_id_vector)

        return is_staked

    def is_subnet_node_staked(self, peer_id_vector) -> bool:
        """
        Uses the Hypertensor `proof_of_stake` RPC method that checks
        for a subnet nodes peer_id and bootstrap_peer_id being staked
        """
        result = self.hypertensor.proof_of_stake(self.subnet_id, peer_id_vector)

        if "result" not in result:
            return False

        # must be True or False
        if result["result"] is not True and result["result"] is not False:
            return False

        return result["result"]

    @property
    def local_public_key(self) -> Ed25519PublicKey:
        return self._local_public_key

Last updated