Predicate Validator

A general-purpose DHT validator that delegates all validation logic to a custom predicate.

This is a minimal validator that can enforce any condition on the entire DHTRecord, including time-based conditions. It is useful for filtering keys, capping expiration times, inspecting value content, querying the blockchain, or any combination thereof.

The predicate validator can enforce a maximum expiration time for all records, functioning similarly to pruning mechanisms in blockchains. In the mock predicate validator, for example, each peer heartbeat defaults to a maximum expiration of one epoch. This ensures that heartbeat records expire after one epoch period, so peers must submit fresh heartbeats in each subsequent epoch to maintain their active status.
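A minimal sketch of such an expiration-capping predicate, assuming only that records carry an absolute `expiration_time` (the `Record` dataclass and `MAX_EXPIRATION` value below are hypothetical stand-ins for `DHTRecord` and an epoch length):

```python
import time
from dataclasses import dataclass

@dataclass
class Record:
    # Hypothetical stand-in for DHTRecord, for illustration only
    key: bytes
    value: bytes
    expiration_time: float  # absolute unix time at which the record expires

MAX_EXPIRATION = 60.0  # assumed epoch length in seconds (illustrative)

def max_expiration_predicate(record: Record) -> bool:
    """Reject any record whose expiration lies more than one epoch in the future."""
    return record.expiration_time <= time.time() + MAX_EXPIRATION
```

A record that tries to outlive the cap is simply rejected at store time, which bounds how long any entry can persist in the DHT.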

Hypertensor Predicate Validator

Similar to the Predicate Validator, but intended for predicates that evaluate each record against the current epoch data fetched from the Hypertensor blockchain.

This is useful for time-based conditions, such as commit-reveal schemes that must stay synced with the Hypertensor blockchain clock.

class HypertensorPredicateValidator(RecordValidatorBase):
    def __init__(self, record_predicate: Callable[[DHTRecord, DHTRecordRequestType], bool]):
        self.record_predicate = record_predicate

    @classmethod
    def from_predicate_class(
        cls,
        predicate_cls: type,
        *args,
        **kwargs
    ) -> "HypertensorPredicateValidator":
        """
        Example:
            HypertensorPredicateValidator.from_predicate_class(
                HypertensorConsensusPredicate, hypertensor, subnet_id
            )
        """
        predicate = predicate_cls(*args, **kwargs)
        return cls(record_predicate=predicate)

    def validate(self, record: DHTRecord, type: DHTRecordRequestType) -> bool:
        return self.record_predicate(record, type)

    def sign_value(self, record: DHTRecord) -> bytes:
        return record.value

    def strip_value(self, record: DHTRecord) -> bytes:
        return record.value

    def merge_with(self, other: RecordValidatorBase) -> bool:
        if not isinstance(other, HypertensorPredicateValidator):
            return False

        # Ignore another HypertensorPredicateValidator instance (it doesn't make sense
        # to have several instances of this class) and report a successful merge
        return True

    @property
    def priority(self) -> int:
        # Priority is lower than that of SignatureValidator
        return 9
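The `from_predicate_class` helper exists so a predicate can be a stateful object (carrying clients, config, counters) rather than a bare function. A minimal sketch of such a callable predicate class (the `MaxValueLenPredicate` class and `SimpleRecord` type are hypothetical, for illustration):

```python
from dataclasses import dataclass

@dataclass
class SimpleRecord:
    # Hypothetical stand-in for DHTRecord
    key: bytes
    value: bytes

class MaxValueLenPredicate:
    """A callable predicate object that carries its own configuration."""

    def __init__(self, max_len: int):
        self.max_len = max_len

    def __call__(self, record: SimpleRecord, request_type: str) -> bool:
        # Reject values larger than the configured limit
        return len(record.value) <= self.max_len

predicate = MaxValueLenPredicate(max_len=4)
```

Because the instance is callable, it can be passed anywhere a plain `record_predicate` function is expected, which is exactly what `from_predicate_class` constructs.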

Usage

In the following example, the Hypertensor Predicate Validator implements a commit-reveal schema together with general conditions: which keys can be stored, when they can be stored (based on epoch progression, i.e., the percentage of the epoch that has elapsed), and how long they can be stored for (maximum expiration).

Heartbeat

Nodes periodically write to the DHT to show they are still running. Heartbeats have a maximum expiration time and can only be stored under the "node" key, up to 100 stores per peer per epoch.

Commit-Reveal Scheme

Each commit-reveal key is scoped to the current epoch. A node cannot copy other nodes' commits after the commit phase, cannot submit data for previous or future epochs, and can only commit and reveal during the corresponding phases of an epoch.
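The core of a commit-reveal scheme can be sketched with a salted hash: during the commit phase a node publishes only the digest, and during the reveal phase it publishes the value and salt so anyone can verify the pair against the earlier commitment (function names below are illustrative, not part of the validator API):

```python
import hashlib

def make_commit(value: bytes, salt: bytes) -> bytes:
    # Commit phase: publish only the digest, keeping the value hidden
    return hashlib.sha256(salt + value).digest()

def verify_reveal(commitment: bytes, value: bytes, salt: bytes) -> bool:
    # Reveal phase: anyone can check the revealed value against the commitment
    return hashlib.sha256(salt + value).digest() == commitment
```

Salting prevents other nodes from brute-forcing small value spaces during the commit phase; the epoch-scoped keys and phase deadlines below prevent replaying or copying commitments.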

Note

To use this in production, a Pydantic validator (or similar schema validation) should be used to ensure peers are storing correctly structured values.

If a node attempts to store data that doesn't abide by these conditions, it will not be stored.
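The deadline checks in the validator below partition each epoch into phase windows by percent complete. That partition can be sketched as a standalone helper (`phase_for` is hypothetical; the deadline constants mirror the ones defined below):

```python
CONSENSUS_STORE_DEADLINE = 0.15  # consensus: first 15% of the epoch
COMMIT_DEADLINE = 0.5            # commit: (15%, 50%]
REVEAL_DEADLINE = 0.6            # reveal: (50%, 60%]

def phase_for(percent_complete: float) -> str:
    """Map epoch progress to the phase whose stores are currently accepted."""
    if percent_complete <= CONSENSUS_STORE_DEADLINE:
        return "consensus"
    if percent_complete <= COMMIT_DEADLINE:
        return "commit"
    if percent_complete <= REVEAL_DEADLINE:
        return "reveal"
    return "idle"  # no phase-gated key may be stored this late in the epoch
```

The validator expresses the same windows as rejection conditions per key type rather than as a lookup, but the boundaries are identical.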

def get_mock_commit_key(epoch: int) -> str:
    return f"commit_epoch_{epoch}"

def get_mock_reveal_key(epoch: int) -> str:
    return f"reveal_epoch_{epoch}"

def get_mock_consensus_key(epoch: int) -> str:
    return f"consensus_epoch_{epoch}"

"""
Store something by the 15% progress of the epoch
"""
CONSENSUS_STORE_DEADLINE = 0.15

# peer commit-reveal epoch percentage elapsed deadlines
COMMIT_DEADLINE = 0.5
REVEAL_DEADLINE = 0.6

"""
Expiration validations

Add expirations for each key stored
"""
MAX_HEART_BEAT_TIME = BLOCK_SECS * EPOCH_LENGTH * 1.1   # Max 1.1 epochs
MAX_CONSENSUS_TIME = BLOCK_SECS * EPOCH_LENGTH * 2      # Max 2 epochs
MAX_COMMIT_TIME = BLOCK_SECS * EPOCH_LENGTH * 2         # Max 2 epochs
MAX_REVEAL_TIME = BLOCK_SECS * EPOCH_LENGTH * 2         # Max 2 epochs

class MockHypertensorCommitReveal:

    MAX_EPOCH_HISTORY = 5  # How many epochs of peer store history to keep before cleanup

    def __init__(self, hypertensor: Hypertensor, subnet_id: int):
        self.hypertensor = hypertensor
        self.subnet_id = subnet_id

        # Store any data required for logic
        self.slot: int | None = None
        self._epoch_data: Optional[EpochData] = None
        self._peer_store_tracker = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))

        # Stores per peer per epoch
        self.per_peer_epoch_limits = {
            "node": 100,
            "consensus": 1,
            "commit": 1,
            "reveal": 1,
        }

        # Store `self.slot`
        self._ensure_slot()

    # Create any functions required for the logic
    def _ensure_slot(self):
        if self.slot is None:
            subnet_info = self.hypertensor.get_formatted_subnet_info(self.subnet_id)
            self.slot = subnet_info.slot_index
        return self.slot

    def epoch_data(self) -> EpochData:
        return self.hypertensor.get_subnet_epoch_data(self._ensure_slot())

    def _has_exceeded_store_limit(self, peer_id: str, key_type: str, epoch: int) -> bool:
        """Check if the peer has already hit their per-epoch limit for this key type."""
        limit = self.per_peer_epoch_limits.get(key_type, 1)
        count = self._peer_store_tracker[epoch][key_type][peer_id]
        logger.debug(f"Current {key_type} store count for {peer_id}: {count}")
        if count >= limit:
            logger.debug(
                f"Peer {peer_id} exceeded store limit for {key_type} (epoch {epoch}, "
                f"count={count}, limit={limit})"
            )
            return True
        return False

    def _record_peer_store(self, peer_id: str, key_type: str, epoch: int):
        """Increment peer store counter after a successful PUT."""
        self._peer_store_tracker[epoch][key_type][peer_id] += 1
        new_count = self._peer_store_tracker[epoch][key_type][peer_id]
        logger.debug(
            f"Recorded store for {peer_id} → {key_type} @ epoch {epoch} "
            f"(new count={new_count})"
        )

    def _cleanup_old_epochs(self, current_epoch: int):
        """Remove records older than MAX_EPOCH_HISTORY epochs."""
        old_epochs = [
            e for e in self._peer_store_tracker.keys()
            if e < current_epoch - self.MAX_EPOCH_HISTORY
        ]
        for e in old_epochs:
            del self._peer_store_tracker[e]
            logger.debug(f"Cleaned up tracking data for old epoch {e}")

    def _get_key_type(self, record: DHTRecord, current_epoch: int) -> Optional[str]:
        """
        Create schemas here.
        You can use libraries like Pydantic to define schemas for keys, subkeys, and values.

        TODO: Persist this data in a local database for persistence across validator node restarts
        """
        valid_keys = {
            # Heartbeat
            DHTID.generate(source="node").to_bytes(): "node",
            # ⸺ 0-15%
            DHTID.generate(source=f"consensus_epoch_{current_epoch}").to_bytes(): "consensus",
            # ⸺ 15-50%
            DHTID.generate(source=f"commit_epoch_{current_epoch}").to_bytes(): "commit",
            # ⸺ 50-60%
            DHTID.generate(source=f"reveal_epoch_{current_epoch}").to_bytes(): "reveal",
        }

        return valid_keys.get(record.key, None)

    def __call__(self, record: DHTRecord, type: DHTRecordRequestType) -> bool:
        """
        Callable interface
        """
        try:
            # Get caller peer ID
            # This also ensures the record has a public key as a subkey
            # NOTE: This requires `SignatureValidator` to be registered with a higher priority, so signatures are verified first
            caller_peer_id = extract_peer_id_from_record_validator(record.subkey)
            if caller_peer_id is None:
                return False

            logger.info(f"caller_peer_id: {caller_peer_id}")

            if type is DHTRecordRequestType.GET:
                logger.debug(f"{caller_peer_id} requested GET")
                return True

            # Get `EpochData`
            epoch_data = self.epoch_data()
            current_epoch = epoch_data.epoch
            percent_complete = epoch_data.percent_complete # Get progress of epoch for commit-reveal phases

            logger.debug(f"{caller_peer_id} is storing data at slot {self.slot}, epoch={current_epoch}")

            # Clean up old keys
            self._cleanup_old_epochs(current_epoch)

            # Get valid key type
            key_type = self._get_key_type(record, current_epoch)
            logger.info(f"key_type: {key_type}")
            if key_type is None:
                return False

            # Verify peer store limit condition
            if self._has_exceeded_store_limit(caller_peer_id, key_type, current_epoch):
                return False

            dht_time = get_dht_time()

            """
            Logic here can be extended to account for any conditions, such as requiring only specific
            on-chain node classifications to allow to store data into the DHT outside of the "node"
            heartbeat, etc.
            """

            # ───── DEADLINES AND EXPIRATIONS ─────
            if key_type == "node":
                max_expiration = dht_time + MAX_HEART_BEAT_TIME
                if record.expiration_time > max_expiration:
                    return False

            elif key_type == "consensus":
                if percent_complete > CONSENSUS_STORE_DEADLINE:
                    return False
                if record.expiration_time > dht_time + MAX_CONSENSUS_TIME:
                    return False

            elif key_type == "commit":
                if percent_complete <= CONSENSUS_STORE_DEADLINE or percent_complete > COMMIT_DEADLINE:
                    return False
                if record.expiration_time > dht_time + MAX_COMMIT_TIME:
                    return False

            elif key_type == "reveal":
                if percent_complete <= COMMIT_DEADLINE or percent_complete > REVEAL_DEADLINE:
                    return False
                if record.expiration_time > dht_time + MAX_REVEAL_TIME:
                    return False

            self._record_peer_store(caller_peer_id, key_type, current_epoch)

            return True
        except Exception as e:
            logger.warning(f"MockHypertensorCommitReveal error: {e}")
            return False
