Predicate Validators
PredicateValidator
A general-purpose DHT validator that delegates all validation logic to a custom predicate.
This is a minimal validator that can enforce any condition on the entire DHTRecord. Useful for filtering keys, expiration time, value content, or any combination thereof.
class PredicateValidator(RecordValidatorBase):
"""
A general-purpose DHT validator that delegates all validation logic to a custom callable.
This is a minimal validator that can enforce any condition on the entire DHTRecord.
Useful for filtering keys, expiration time, value content, or any combination thereof.
This can be used to ensure keys match a specific format, or nodes are doing something within a certain period
of time in relation to the blockchain, i.e., ensuring a commit-reveal schema where the commit is submitted by the
first half of the epoch and the reveal is done on the second half of the epoch.
Attributes:
record_predicate (Callable[[DHTRecord], bool]): A user-defined function that receives a record and returns True if valid.
"""
def __init__(
self,
record_predicate: Callable[[DHTRecord], bool] = lambda r: True,
):
self.record_predicate = record_predicate
def validate(self, record: DHTRecord, type: DHTRequestType) -> bool:
return self.record_predicate(record)
def sign_value(self, record: DHTRecord) -> bytes:
return record.value
def strip_value(self, record: DHTRecord) -> bytes:
return record.value
def merge_with(self, other: RecordValidatorBase) -> bool:
if not isinstance(other, PredicateValidator):
return False
# Ignore another KeyValidator instance (it doesn't make sense to have several
# instances of this class) and report successful merge
return True
Hypertensor Predicate Validator
Similar to the Predicate Validator, but instead the callable takes in the current epoch data as:
@dataclass
class EpochData:
block: int
epoch: int
block_per_epoch: int
seconds_per_epoch: int
percent_complete: float
blocks_elapsed: int
blocks_remaining: int
seconds_elapsed: int
seconds_remaining: int
This is useful for having conditions based on time, such as for commit-reveal schemes that should be synced with the Hypertensor blockchain clock.
class HypertensorPredicateValidator(RecordValidatorBase):
"""
A general-purpose DHT validator that delegates all validation logic to a custom callable
that takes in an epoch data function.
This is a minimal validator that can enforce any condition on the entire DHTRecord.
Useful for filtering keys, expiration time, value content, or any combination thereof.
This can be used to ensure keys match a specific format, or nodes are doing something within a certain period
of time in relation to the blockchain, i.e., ensuring a commit-reveal schema where the commit is submitted by the
first half of the epoch and the reveal is done on the second half of the epoch.
Attributes:
record_predicate (Callable[[DHTRecord], bool]): A user-defined function that receives a record and returns True if valid.
"""
def __init__(
self,
hypertensor: Hypertensor,
record_predicate: Callable[[DHTRecord, DHTRequestType], bool] = lambda r: True
):
self.record_predicate = record_predicate
self.hypertensor = hypertensor
def validate(self, record: DHTRecord, type: DHTRequestType) -> bool:
return self.record_predicate(record, type, self._epoch_data())
def sign_value(self, record: DHTRecord) -> bytes:
return record.value
def strip_value(self, record: DHTRecord) -> bytes:
return record.value
def _epoch_data(self):
# Get epoch data from the blockchain and calculate the remaining
return self.hypertensor.get_epoch_progress()
def merge_with(self, other: RecordValidatorBase) -> bool:
if not isinstance(other, HypertensorPredicateValidator):
return False
# Ignore another KeyValidator instance (it doesn't make sense to have several
# instances of this class) and report a successful merge
return True
Usage
In the following example, the Hypertensor Predicate Validator is used to ensure only specific keys can be stored, when the keys can be stored (based on epoch progression), and how long they can be stored for (maximum expiration).
Heartbeat
Nodes make periodic updates to the database to show they are still running the node with a maximum expiration time, and can only be stored under the 2 roles, "hoster" and "validator".
Commit-Reveal Scheme
Each commit-reveal key is specified on the current epoch. A node cannot copy other nodes after the commit phase, and nodes cannot submit data on previous or future epochs.
Note
To use this in production, a Pydantic validator should be used to ensure peers are storing the correct values.
If a node attempts to store data that doesn't abide by these conditions, it will not be stored.
def get_mock_commit_key(epoch: int) -> str:
return f"commit_epoch_{epoch}"
def get_mock_reveal_key(epoch: int) -> str:
return f"reveal_epoch_{epoch}"
def get_mock_consensus_key(epoch: int) -> str:
return f"consensus_epoch_{epoch}"
# Created At validations
# 0-15%
CONSENSUS_STORE_DEADLINE = 0.15
# node commit-reveal epoch percentage elapsed deadlines
COMMIT_DEADLINE = 0.5
REVEAL_DEADLINE = 0.6
# Expiration validations
MAX_HEART_BEAT_TIME = BLOCK_SECS * EPOCH_LENGTH
MAX_CONSENSUS_TIME = BLOCK_SECS * EPOCH_LENGTH
MAX_COMMIT_TIME = BLOCK_SECS * EPOCH_LENGTH
MAX_REVEAL_TIME = BLOCK_SECS * EPOCH_LENGTH
def mock_hypertensor_consensus_predicate() -> Callable[[DHTRecord, DHTRecordRequestType], bool]:
def predicate(record: DHTRecord, type: DHTRecordRequestType, epoch_data: EpochData) -> bool:
try:
# Enable GET data at any time
if type is DHTRecordRequestType.GET:
return True
current_epoch = epoch_data.epoch
percent_complete = epoch_data.percent_complete
# Ensure the keys are valid for the current allowable keys or epoch allowable keys
valid_keys = {
# Heartbeat
DHTID.generate(source="role_name").to_bytes(): "role_name",
# ⸺ 0-15%
DHTID.generate(source=f"consensus_epoch_{current_epoch}").to_bytes(): "consensus",
# ⸺ 15-50%
DHTID.generate(source=f"commit_epoch_{current_epoch}").to_bytes(): "commit",
# ⸺ 50-60%
DHTID.generate(source=f"reveal_epoch_{current_epoch}").to_bytes(): "reveal",
}
key_type = valid_keys.get(record.key, None)
if key_type is None:
return False
dht_time = get_dht_time()
# ⸺ 0-100% (any time)
if key_type == "role_name":
max_expiration = dht_time + MAX_HEART_BEAT_TIME
if record.expiration_time > max_expiration:
return False
# TODO: validate proof-of-stake on each heartbeat
return True
# ⸺ 0-15%
elif key_type == "consensus":
# Must be submitted before deadline
if percent_complete > CONSENSUS_STORE_DEADLINE:
return False
max_expiration = dht_time + MAX_CONSENSUS_TIME
if record.expiration_time > max_expiration:
return False
return True
# ⸺ 15-50%
elif key_type == "commit":
max_expiration = dht_time + MAX_COMMIT_TIME
if record.expiration_time > max_expiration:
return False
if percent_complete <= CONSENSUS_STORE_DEADLINE or percent_complete > COMMIT_DEADLINE:
return False
return True
# ⸺ 50-60%
elif key_type == "reveal":
max_expiration = dht_time + MAX_REVEAL_TIME
if record.expiration_time > max_expiration:
return False
if percent_complete <= COMMIT_DEADLINE or percent_complete > REVEAL_DEADLINE:
return False
return True
return False # Key doesn't match any known schema
except Exception as e:
print(f"Predicate Err: {e}")
return False
return predicate
Last updated