Predicate Validator
A general-purpose DHT validator that delegates all validation logic to a custom predicate.
This is a minimal validator that can enforce any condition on the entire DHTRecord, time conditions, etc. Useful for filtering keys, expiration time, value content, accessing the blockchain, or any combination thereof.
The predicate validator can enforce a maximum expiration time for all records, functioning similarly to pruning mechanisms in blockchains. Specifically, by default in the mock predicate validator, each peer heartbeat has a default maximum expiration of one epoch. This ensures that heartbeat records expire after one epoch period, requiring peers to submit fresh heartbeats in each subsequent epoch to maintain their active status.
Hypertensor Predicate Validator
Similar to the Predicate Validator, but instead the callable takes in the current epoch data as:
This is useful for having conditions based on time, such as for commit-reveal schemes that should be synced with the Hypertensor blockchain clock.
class HypertensorPredicateValidator(RecordValidatorBase):
def __init__(self, record_predicate: Callable[[DHTRecord, DHTRecordRequestType], bool]):
self.record_predicate = record_predicate
@classmethod
def from_predicate_class(
cls,
predicate_cls: type,
*args,
**kwargs
) -> "HypertensorPredicateValidator":
"""
Example:
HypertensorPredicateValidator.from_predicate_class(
HypertensorConsensusPredicate, hypertensor, subnet_id
)
"""
predicate = predicate_cls(*args, **kwargs)
return cls(record_predicate=predicate)
def validate(self, record: DHTRecord, type: DHTRecordRequestType) -> bool:
return self.record_predicate(record, type)
def sign_value(self, record: DHTRecord) -> bytes:
return record.value
def strip_value(self, record: DHTRecord) -> bytes:
return record.value
def merge_with(self, other: RecordValidatorBase) -> bool:
if not isinstance(other, HypertensorPredicateValidator):
return False
# Ignore another KeyValidator instance (it doesn't make sense to have several
# instances of this class) and report successful merge
return True
@property
def priority(self) -> int:
# Priority is less than SignatureValidator
return 9Usage
In the following example, the Hypertensor Predicate Validator is used for a commit-reveal schema and general conditions to ensure only specific keys can be stored, when keys can be stored (based on epoch progression, i.e., the percentage completion of an epoch), and how long they can be stored for (maximum expiration).
Heartbeat
Nodes make periodic updates to the database to show they are still running the node with a maximum expiration time, and can only be stored under the "node" key up to 100 stores per epoch.
Commit-Reveal Scheme
Each commit-reveal key is specified on the current epoch. A node cannot copy other nodes after the commit phase, nodes cannot submit data on previous or future epochs, and nodes can only commit and reveal in the specific phases of an epoch.
Note
To use this in production, a Pydantic validator should be used to ensure peers are storing the correct values.
If a node attempts to store data that doesn't abide by these conditions, it will not be stored.
def get_mock_commit_key(epoch: int) -> str:
return f"commit_epoch_{epoch}"
def get_mock_reveal_key(epoch: int) -> str:
return f"reveal_epoch_{epoch}"
def get_mock_consensus_key(epoch: int) -> str:
return f"consensus_epoch_{epoch}"
"""
Store something by the 15% progress of the epoch
"""
CONSENSUS_STORE_DEADLINE = 0.15
# peer commit-reveal epoch percentage elapsed deadlines
COMMIT_DEADLINE = 0.5
REVEAL_DEADLINE = 0.6
"""
Expiration validations
Add expirations for each key stored
"""
MAX_HEART_BEAT_TIME = BLOCK_SECS * EPOCH_LENGTH * 1.1 # Max 1.1 epochs
MAX_CONSENSUS_TIME = BLOCK_SECS * EPOCH_LENGTH * 2 # Max 2 epochs
MAX_COMMIT_TIME = BLOCK_SECS * EPOCH_LENGTH * 2 # Max 2 epochs
MAX_REVEAL_TIME = BLOCK_SECS * EPOCH_LENGTH * 2 # Max 2 epochs
class MockHypertensorCommitReveal:
MAX_EPOCH_HISTORY = 5 # How many epochs to store peer epoch history before clean up
def __init__(self, hypertensor: Hypertensor, subnet_id: int):
self.hypertensor = hypertensor
self.subnet_id = subnet_id
# Store any data required for logic
self.slot: int | None = None
self._epoch_data: Optional[EpochData] = None
self._peer_store_tracker = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
# Stores per peer per epoch
self.per_peer_epoch_limits = {
"node": 100,
"consensus": 1,
"commit": 1,
"reveal": 1,
}
# Store `self.slot`
self._ensure_slot()
# Create any functions required for the logic
def _ensure_slot(self):
if self.slot is None:
subnet_info = self.hypertensor.get_formatted_subnet_info(self.subnet_id)
self.slot = subnet_info.slot_index
return self.slot
def epoch_data(self) -> EpochData:
return self.hypertensor.get_subnet_epoch_data(self._ensure_slot())
def _has_exceeded_store_limit(self, peer_id: str, key_type: str, epoch: int) -> bool:
"""Check if the peer has already hit their per-epoch limit for this key type."""
limit = self.per_peer_epoch_limits.get(key_type, 1)
count = self._peer_store_tracker[epoch][key_type][peer_id]
logger.info(f"Current node key count {count}")
if count >= limit:
logger.debug(
f"Peer {peer_id} exceeded store limit for {key_type} (epoch {epoch}, "
f"count={count}, limit={limit})"
)
return True
return False
def _record_peer_store(self, peer_id: str, key_type: str, epoch: int):
"""Increment peer store counter after a successful PUT."""
self._peer_store_tracker[epoch][key_type][peer_id] += 1
new_count = self._peer_store_tracker[epoch][key_type][peer_id]
logger.debug(
f"Recorded store for {peer_id} → {key_type} @ epoch {epoch} "
f"(new count={new_count})"
)
def _cleanup_old_epochs(self, current_epoch: int):
"""Remove records older than MAX_EPOCH_HISTORY epochs."""
old_epochs = [
e for e in self._peer_store_tracker.keys()
if e < current_epoch - self.MAX_EPOCH_HISTORY
]
for e in old_epochs:
del self._peer_store_tracker[e]
logger.debug(f"Cleaned up tracking data for old epoch {e}")
def _get_key_type(self, record: DHTRecord, current_epoch: int) -> Optional[int]:
"""
Create schemas here
You can use libraries like Pydantic to define schemas for keys, subkeys, and values
TODO: Persist this data in a local database for persistance on validator node restarts
"""
valid_keys = {
# Heartbeat
DHTID.generate(source="node").to_bytes(): "node",
# ⸺ 0-15%
DHTID.generate(source=f"consensus_epoch_{current_epoch}").to_bytes(): "consensus",
# ⸺ 15-50%
DHTID.generate(source=f"commit_epoch_{current_epoch}").to_bytes(): "commit",
# ⸺ 50-60%
DHTID.generate(source=f"reveal_epoch_{current_epoch}").to_bytes(): "reveal",
}
return valid_keys.get(record.key, None)
def __call__(self, record: DHTRecord, type: DHTRecordRequestType) -> bool:
"""
Callable interface
"""
try:
# Get caller peer ID
# This also ensures the record has a public key as a subkey
# NOTE: To use this `SignatureValidator` must be implemented with priority
caller_peer_id = extract_peer_id_from_record_validator(record.subkey)
if caller_peer_id is None:
return False
logger.info(f"caller_peer_id: {caller_peer_id}")
if type is DHTRecordRequestType.GET:
logger.debug(f"{caller_peer_id} requested GET")
return True
# Get `EpochData`
epoch_data = self.epoch_data()
current_epoch = epoch_data.epoch
percent_complete = epoch_data.percent_complete # Get progress of epoch for commit-reveal phases
logger.debug(f"{caller_peer_id} is storing data at slot {self.slot}, epoch={current_epoch}")
# Clean up old keys
self._cleanup_old_epochs(current_epoch)
# Get valid key type
key_type = self._get_key_type(record, current_epoch)
logger.info(f"key_type: {key_type}")
if key_type is None:
return False
# Verify peer store limit condition
if self._has_exceeded_store_limit(caller_peer_id, key_type, current_epoch):
return False
dht_time = get_dht_time()
"""
Logic here can be extended to account for any conditions, such as requiring only specific
on-chain node classifications to allow to store data into the DHT outside of the "node"
heartbeat, etc.
"""
# ───── DEADLINES AND EXPIRATIONS ─────
if key_type == "node":
max_expiration = dht_time + MAX_HEART_BEAT_TIME
if record.expiration_time > max_expiration:
return False
elif key_type == "consensus":
if percent_complete > CONSENSUS_STORE_DEADLINE:
return False
if record.expiration_time > dht_time + MAX_CONSENSUS_TIME:
return False
elif key_type == "commit":
if percent_complete <= CONSENSUS_STORE_DEADLINE or percent_complete > COMMIT_DEADLINE:
return False
if record.expiration_time > dht_time + MAX_COMMIT_TIME:
return False
elif key_type == "reveal":
if percent_complete <= COMMIT_DEADLINE or percent_complete > REVEAL_DEADLINE:
return False
if record.expiration_time > dht_time + MAX_REVEAL_TIME:
return False
self._record_peer_store(caller_peer_id, key_type, current_epoch)
return True
except Exception as e:
logger.warning(f"MockHypertensorCommitReveal error: {e}")
return False
Last updated