Predicate Validators

PredicateValidator

A general-purpose DHT validator that delegates all validation logic to a custom predicate.

This is a minimal validator that can enforce any condition on the entire DHTRecord. Useful for filtering keys, expiration time, value content, or any combination thereof.

class PredicateValidator(RecordValidatorBase):
    """
    A general-purpose DHT validator that delegates all validation logic to a custom callable.

    This is a minimal validator that can enforce any condition on the entire DHTRecord.
    Useful for filtering keys, expiration time, value content, or any combination thereof.

    This can be used to ensure keys match a specific format, or that nodes act within a certain window of time
    relative to the blockchain, e.g., enforcing a commit-reveal scheme where the commit is submitted during the
    first half of the epoch and the reveal during the second half.

    Attributes:
        record_predicate (Callable[[DHTRecord], bool]): A user-defined function that receives a record and returns True if valid.
    """

    def __init__(
        self,
        record_predicate: Callable[[DHTRecord], bool] = lambda r: True,
    ):
        self.record_predicate = record_predicate

    def validate(self, record: DHTRecord, type: DHTRequestType) -> bool:
        return self.record_predicate(record)

    def sign_value(self, record: DHTRecord) -> bytes:
        return record.value

    def strip_value(self, record: DHTRecord) -> bytes:
        return record.value

    def merge_with(self, other: RecordValidatorBase) -> bool:
        if not isinstance(other, PredicateValidator):
            return False

        # Ignore another PredicateValidator instance (it doesn't make sense to have several
        # instances of this class) and report a successful merge
        return True
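
As a rough illustration of how a predicate might be wired in, the sketch below restricts records to an allowlist of keys and caps their expiration time. The `DHTRecord` and `PredicateValidator` definitions here are trimmed stand-ins so the snippet runs on its own, and `ALLOWED_KEYS`, `MAX_TTL`, and `make_predicate` are hypothetical names. In a real deployment the keys would be hashed DHT IDs rather than raw strings.

```python
from dataclasses import dataclass
from typing import Callable


# Stand-in for the library's DHTRecord (assumption: only these fields are needed here).
@dataclass
class DHTRecord:
    key: bytes
    subkey: bytes
    value: bytes
    expiration_time: float


class PredicateValidator:
    """Trimmed copy of the validator above, without the base class."""

    def __init__(self, record_predicate: Callable[[DHTRecord], bool] = lambda r: True):
        self.record_predicate = record_predicate

    def validate(self, record: DHTRecord, type=None) -> bool:
        return self.record_predicate(record)


ALLOWED_KEYS = {b"heartbeat"}  # hypothetical allowlist
MAX_TTL = 60.0  # hypothetical cap on record lifetime, in seconds


def make_predicate(now: float) -> Callable[[DHTRecord], bool]:
    """Build a predicate that checks the key allowlist and the expiration cap."""

    def predicate(record: DHTRecord) -> bool:
        return record.key in ALLOWED_KEYS and record.expiration_time <= now + MAX_TTL

    return predicate


validator = PredicateValidator(make_predicate(now=1000.0))

accepted = validator.validate(DHTRecord(b"heartbeat", b"node1", b"alive", 1030.0))
rejected_ttl = validator.validate(DHTRecord(b"heartbeat", b"node1", b"alive", 5000.0))
rejected_key = validator.validate(DHTRecord(b"other", b"node1", b"alive", 1030.0))
```

Passing `now` in explicitly keeps the sketch deterministic. A real predicate would more likely read the DHT clock at validation time.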

Hypertensor Predicate Validator

Similar to the Predicate Validator, except that the callable also receives the request type and the current epoch data, which has the following shape:

@dataclass
class EpochData:
    block: int
    epoch: int
    block_per_epoch: int
    seconds_per_epoch: int
    percent_complete: float
    blocks_elapsed: int
    blocks_remaining: int
    seconds_elapsed: int
    seconds_remaining: int

This is useful for having conditions based on time, such as for commit-reveal schemes that should be synced with the Hypertensor blockchain clock.
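
To make the relationship between these fields concrete, here is a minimal sketch that derives them from a block number. The arithmetic is an assumption for illustration only (epochs of fixed length starting at block 0, with `percent_complete` as a fraction between 0 and 1). The values returned by `get_epoch_progress()` may be computed differently, and `epoch_data_from_block` and its parameters are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class EpochData:
    block: int
    epoch: int
    block_per_epoch: int
    seconds_per_epoch: int
    percent_complete: float
    blocks_elapsed: int
    blocks_remaining: int
    seconds_elapsed: int
    seconds_remaining: int


def epoch_data_from_block(block: int, blocks_per_epoch: int, block_secs: int) -> EpochData:
    """Derive epoch progress from a block number (assumed fixed-length epochs)."""
    blocks_elapsed = block % blocks_per_epoch
    blocks_remaining = blocks_per_epoch - blocks_elapsed
    return EpochData(
        block=block,
        epoch=block // blocks_per_epoch,
        block_per_epoch=blocks_per_epoch,
        seconds_per_epoch=blocks_per_epoch * block_secs,
        percent_complete=blocks_elapsed / blocks_per_epoch,
        blocks_elapsed=blocks_elapsed,
        blocks_remaining=blocks_remaining,
        seconds_elapsed=blocks_elapsed * block_secs,
        seconds_remaining=blocks_remaining * block_secs,
    )


# Block 250 with 100-block epochs and 6-second blocks: halfway through epoch 2.
data = epoch_data_from_block(block=250, blocks_per_epoch=100, block_secs=6)
```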

class HypertensorPredicateValidator(RecordValidatorBase):
    """
    A general-purpose DHT validator that delegates all validation logic to a custom callable
    that also receives the request type and the current epoch data.

    This is a minimal validator that can enforce any condition on the entire DHTRecord.
    Useful for filtering keys, expiration time, value content, or any combination thereof.

    This can be used to ensure keys match a specific format, or that nodes act within a certain window of time
    relative to the blockchain, e.g., enforcing a commit-reveal scheme where the commit is submitted during the
    first half of the epoch and the reveal during the second half.

    Attributes:
        hypertensor (Hypertensor): Interface used to query the blockchain for the current epoch progress.
        record_predicate (Callable[[DHTRecord, DHTRequestType, EpochData], bool]): A user-defined function that receives
            the record, the request type, and the current epoch data, and returns True if the record is valid.
    """

    def __init__(
        self,
        hypertensor: Hypertensor,
        record_predicate: Callable[[DHTRecord, DHTRequestType, EpochData], bool] = lambda record, type, epoch_data: True,
    ):
        self.record_predicate = record_predicate
        self.hypertensor = hypertensor

    def validate(self, record: DHTRecord, type: DHTRequestType) -> bool:
        return self.record_predicate(record, type, self._epoch_data())

    def sign_value(self, record: DHTRecord) -> bytes:
        return record.value

    def strip_value(self, record: DHTRecord) -> bytes:
        return record.value

    def _epoch_data(self):
        # Get the current epoch data (progress, elapsed, and remaining time) from the blockchain
        return self.hypertensor.get_epoch_progress()

    def merge_with(self, other: RecordValidatorBase) -> bool:
        if not isinstance(other, HypertensorPredicateValidator):
            return False

        # Ignore another HypertensorPredicateValidator instance (it doesn't make sense to have several
        # instances of this class) and report a successful merge
        return True
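
The three-argument call in `validate` can be exercised without a live chain connection. The sketch below is a simplified stand-in, not the library's implementation: `FakeHypertensor`, `RequestType`, `Record`, `EpochAwareValidator`, and the trimmed `EpochData` are hypothetical names introduced so the example runs on its own. Only `get_epoch_progress()` is taken from the class above.

```python
from dataclasses import dataclass
from enum import Enum, auto


class RequestType(Enum):
    """Stand-in for the DHT request type enum (assumption)."""
    GET = auto()
    POST = auto()


@dataclass
class Record:
    """Stand-in for DHTRecord, reduced to the fields used here."""
    key: bytes
    value: bytes
    expiration_time: float


@dataclass
class EpochData:
    """Trimmed to the two fields this example reads."""
    epoch: int
    percent_complete: float


class FakeHypertensor:
    """Returns fixed epoch data instead of querying the chain."""

    def __init__(self, epoch_data: EpochData):
        self._epoch_data = epoch_data

    def get_epoch_progress(self) -> EpochData:
        return self._epoch_data


class EpochAwareValidator:
    """Mirrors the validate() flow above: record, type, then epoch data."""

    def __init__(self, hypertensor, record_predicate):
        self.hypertensor = hypertensor
        self.record_predicate = record_predicate

    def validate(self, record, type) -> bool:
        return self.record_predicate(record, type, self.hypertensor.get_epoch_progress())


def first_half_only(record: Record, type: RequestType, epoch_data: EpochData) -> bool:
    # Reads are always allowed; writes only during the first half of the epoch.
    if type is RequestType.GET:
        return True
    return epoch_data.percent_complete <= 0.5


record = Record(key=b"commit", value=b"data", expiration_time=0.0)
early = EpochAwareValidator(FakeHypertensor(EpochData(epoch=7, percent_complete=0.25)), first_half_only)
late = EpochAwareValidator(FakeHypertensor(EpochData(epoch=7, percent_complete=0.75)), first_half_only)

early_write = early.validate(record, RequestType.POST)
late_write = late.validate(record, RequestType.POST)
late_read = late.validate(record, RequestType.GET)
```

Injecting a fake chain object like this is one way to unit test a predicate's time windows before running it against the network.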

Usage

In the following example, the Hypertensor Predicate Validator restricts which keys can be stored, when they can be stored (based on epoch progression), and how long they can be stored for (maximum expiration).

Heartbeat

Nodes periodically update the database to show they are still running. Each heartbeat has a maximum expiration time and can only be stored under one of two roles, "hoster" or "validator".

Commit-Reveal Scheme

Each commit-reveal key is scoped to the current epoch. A node cannot copy other nodes' values after the commit phase, and nodes cannot submit data for previous or future epochs.
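
The validator only controls when commit and reveal records can be written. How the commitment itself is built is left to the subnet. One common construction, shown here purely as an assumption and not as this project's method, is a salted hash: the node stores the hash during the commit window and publishes the value and salt during the reveal window. `make_commit` and `verify_reveal` are hypothetical helper names.

```python
import hashlib
import hmac
import secrets


def make_commit(value: bytes, salt: bytes) -> bytes:
    """Commitment stored during the commit window: SHA-256 over salt + value."""
    return hashlib.sha256(salt + value).digest()


def verify_reveal(commit: bytes, value: bytes, salt: bytes) -> bool:
    """Check a revealed (value, salt) pair against the earlier commitment."""
    return hmac.compare_digest(commit, make_commit(value, salt))


salt = secrets.token_bytes(16)  # kept private until the reveal window
commit = make_commit(b"score=0.93", salt)

honest_reveal = verify_reveal(commit, b"score=0.93", salt)
copied_reveal = verify_reveal(commit, b"score=0.99", salt)
```

Because the salt is random and secret until the reveal, other nodes cannot derive the value from the commitment or reuse it for a different value.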

Note

To use this in production, a Pydantic validator should be used to ensure peers are storing the correct values.

If a node attempts to store data that doesn't abide by these conditions, it will not be stored.

def get_mock_commit_key(epoch: int) -> str:
    return f"commit_epoch_{epoch}"

def get_mock_reveal_key(epoch: int) -> str:
    return f"reveal_epoch_{epoch}"

def get_mock_consensus_key(epoch: int) -> str:
    return f"consensus_epoch_{epoch}"

# Created At validations
# 0-15%
CONSENSUS_STORE_DEADLINE = 0.15

# node commit-reveal epoch percentage elapsed deadlines
COMMIT_DEADLINE = 0.5
REVEAL_DEADLINE = 0.6

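# Expiration validations
# BLOCK_SECS and EPOCH_LENGTH are assumed to be chain constants imported from the subnet's configuration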
MAX_HEART_BEAT_TIME = BLOCK_SECS * EPOCH_LENGTH
MAX_CONSENSUS_TIME = BLOCK_SECS * EPOCH_LENGTH
MAX_COMMIT_TIME = BLOCK_SECS * EPOCH_LENGTH
MAX_REVEAL_TIME = BLOCK_SECS * EPOCH_LENGTH

def mock_hypertensor_consensus_predicate() -> Callable[[DHTRecord, DHTRecordRequestType, EpochData], bool]:
    def predicate(record: DHTRecord, type: DHTRecordRequestType, epoch_data: EpochData) -> bool:
        try:
            # Enable GET data at any time
            if type is DHTRecordRequestType.GET:
                return True

            current_epoch = epoch_data.epoch
            percent_complete = epoch_data.percent_complete

            # Only accept the heartbeat key or a key scoped to the current epoch
            valid_keys = {
                # Heartbeat
                DHTID.generate(source="role_name").to_bytes(): "role_name",
                # ⸺ 0-15%
                DHTID.generate(source=f"consensus_epoch_{current_epoch}").to_bytes(): "consensus",
                # ⸺ 15-50%
                DHTID.generate(source=f"commit_epoch_{current_epoch}").to_bytes(): "commit",
                # ⸺ 50-60%
                DHTID.generate(source=f"reveal_epoch_{current_epoch}").to_bytes(): "reveal",
            }

            key_type = valid_keys.get(record.key, None)

            if key_type is None:
                return False

            dht_time = get_dht_time()

            # ⸺ 0-100% (any time)
            if key_type == "role_name":
                max_expiration = dht_time + MAX_HEART_BEAT_TIME
                if record.expiration_time > max_expiration:
                    return False
                # TODO: validate proof-of-stake on each heartbeat
                return True

            # ⸺ 0-15%
            elif key_type == "consensus":
                # Must be submitted before deadline
                if percent_complete > CONSENSUS_STORE_DEADLINE:
                    return False

                max_expiration = dht_time + MAX_CONSENSUS_TIME
                if record.expiration_time > max_expiration:
                    return False

                return True

            # ⸺ 15-50%
            elif key_type == "commit":
                max_expiration = dht_time + MAX_COMMIT_TIME
                if record.expiration_time > max_expiration:
                    return False
                if percent_complete <= CONSENSUS_STORE_DEADLINE or percent_complete > COMMIT_DEADLINE:
                    return False

                return True

            # ⸺ 50-60%
            elif key_type == "reveal":
                max_expiration = dht_time + MAX_REVEAL_TIME
                if record.expiration_time > max_expiration:
                    return False
                if percent_complete <= COMMIT_DEADLINE or percent_complete > REVEAL_DEADLINE:
                    return False
                return True

            return False  # Key doesn't match any known schema
        except Exception as e:
            print(f"Predicate Err: {e}")
            return False

    return predicate
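
The time windows enforced by the predicate above can be summarised as a small lookup. This sketch mirrors the same boundary comparisons using the deadline constants from the example. `writable_phase` is a hypothetical helper, and it leaves out heartbeats, which the predicate accepts at any point in the epoch.

```python
from typing import Optional

CONSENSUS_STORE_DEADLINE = 0.15
COMMIT_DEADLINE = 0.5
REVEAL_DEADLINE = 0.6


def writable_phase(percent_complete: float) -> Optional[str]:
    """Return which epoch-scoped key type may be written at this point in the epoch."""
    if percent_complete <= CONSENSUS_STORE_DEADLINE:
        return "consensus"  # 0-15%
    if percent_complete <= COMMIT_DEADLINE:
        return "commit"  # 15-50%
    if percent_complete <= REVEAL_DEADLINE:
        return "reveal"  # 50-60%
    return None  # after 60%, no epoch-scoped key is writable


phases = [writable_phase(p) for p in (0.0, 0.15, 0.3, 0.5, 0.55, 0.6, 0.9)]
```

Each upper bound is inclusive, so a record arriving exactly at 15% still counts as a consensus write, and one arriving exactly at 50% still counts as a commit.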
