Predicate Validators

PredicateValidator

A general-purpose DHT validator that delegates all validation logic to a custom predicate.

This is a minimal validator that can enforce any condition on the entire DHTRecord. Useful for filtering keys, expiration time, value content, or any combination thereof.

The predicate validator can enforce a maximum expiration time for all records, functioning similarly to pruning mechanisms in blockchains. Specifically, by default in the mock predicate validator, each peer heartbeat has a default maximum expiration of one epoch. This ensures that heartbeat records expire after one epoch period, requiring peers to submit fresh heartbeats in each subsequent epoch to maintain their active status.

class PredicateValidator(RecordValidatorBase):
    """
    A general-purpose DHT validator that delegates all validation logic to a custom callable.

    This is a minimal validator that can enforce any condition on the entire DHTRecord.
    Useful for filtering keys, expiration time, value content, or any combination thereof.

    This can be used to ensure keys match a specific format, or nodes are doing something within a certain period
    of time in relation to the blockchain, i.e., ensuring a commit-reveal schema where the commit is submitted by the
    first half of the epoch and the reveal is done on the second half of the epoch.

    Attributes:
        record_predicate (Callable[[DHTRecord], bool]): A user-defined function that receives a record and returns True if valid.
    """

    def __init__(
        self,
        record_predicate: Callable[[DHTRecord], bool] = lambda r: True,
    ):
        self.record_predicate = record_predicate

    def validate(self, record: DHTRecord, type: DHTRequestType) -> bool:
        return self.record_predicate(record)

    def sign_value(self, record: DHTRecord) -> bytes:
        return record.value

    def strip_value(self, record: DHTRecord) -> bytes:
        return record.value

    def merge_with(self, other: RecordValidatorBase) -> bool:
        if not isinstance(other, PredicateValidator):
            return False

        # Ignore another KeyValidator instance (it doesn't make sense to have several
        # instances of this class) and report a successful merge
        return True

Hypertensor Predicate Validator

Similar to the Predicate Validator, but instead the callable takes in the current epoch data as:

@dataclass
class EpochData:
  block: int
  epoch: int
  block_per_epoch: int
  seconds_per_epoch: int
  percent_complete: float
  blocks_elapsed: int
  blocks_remaining: int
  seconds_elapsed: int
  seconds_remaining: int

  @staticmethod
  def zero(current_block: int, epoch_length: int) -> "EpochData":
    return EpochData(
      block=current_block,
      epoch=0,
      block_per_epoch=epoch_length,
      seconds_per_epoch=epoch_length * BLOCK_SECS,
      percent_complete=0.0,
      blocks_elapsed=0,
      blocks_remaining=epoch_length,
      seconds_elapsed=0,
      seconds_remaining=epoch_length * BLOCK_SECS
    )

This is useful for having conditions based on time, such as for commit-reveal schemes that should be synced with the Hypertensor blockchain clock.

class HypertensorSlotPredicateValidator(RecordValidatorBase):
    def __init__(
        self,
        hypertensor: Hypertensor,
        subnet_id: int,
        record_predicate: Callable[[DHTRecord, DHTRecordRequestType], bool] = lambda r: True,
        slot: Optional[int] = None
    ):
        self.record_predicate = record_predicate
        self.hypertensor = hypertensor
        self.subnet_id = subnet_id
        self.slot: int | None = slot

    def validate(self, record: DHTRecord, type: DHTRecordRequestType) -> bool:
        return self.record_predicate(record, type, self._epoch_data())

    def sign_value(self, record: DHTRecord) -> bytes:
        return record.value

    def strip_value(self, record: DHTRecord) -> bytes:
        return record.value

    def _epoch_data(self):
        # Get epoch data from the blockchain and calculate the remaining
        if self.slot is None:
            subnet_info = self.hypertensor.get_formatted_subnet_info(
                self.subnet_id
            )
            self.slot = subnet_info.slot_index
        return self.hypertensor.get_subnet_epoch_data(self.slot)

    def merge_with(self, other: RecordValidatorBase) -> bool:
        if not isinstance(other, HypertensorSlotPredicateValidator):
            return False

        # Ignore another KeyValidator instance (it doesn't make sense to have several
        # instances of this class) and report successful merge
        return True

Usage

In the following example, the Hypertensor Predicate Validator is used to ensure only specific keys can be stored, when keys can be stored (based on epoch progression, i.e., the percentage completion of an epoch), and how long they can be stored for (maximum expiration).

Heartbeat

Nodes make periodic updates to the database to show they are still running the node with a maximum expiration time, and can only be stored under the 2 roles, "hoster" and "validator".

Commit-Reveal Scheme

Each commit-reveal key is specified on the current epoch. A node cannot copy other nodes after the commit phase, and nodes cannot submit data on previous or future epochs.

Note

To use this in production, a Pydantic validator should be used to ensure peers are storing the correct values.

If a node attempts to store data that doesn't abide by these conditions, it will not be stored.

def get_mock_commit_key(epoch: int) -> str:
    return f"commit_epoch_{epoch}"

def get_mock_reveal_key(epoch: int) -> str:
    return f"reveal_epoch_{epoch}"

def get_mock_consensus_key(epoch: int) -> str:
    return f"consensus_epoch_{epoch}"

# Created At validations
# 0-15%
CONSENSUS_STORE_DEADLINE = 0.15

# node commit-reveal epoch percentage elapsed deadlines
COMMIT_DEADLINE = 0.5
REVEAL_DEADLINE = 0.6

# Expiration validations
MAX_HEART_BEAT_TIME = BLOCK_SECS * EPOCH_LENGTH
MAX_CONSENSUS_TIME = BLOCK_SECS * EPOCH_LENGTH
MAX_COMMIT_TIME = BLOCK_SECS * EPOCH_LENGTH
MAX_REVEAL_TIME = BLOCK_SECS * EPOCH_LENGTH

def mock_hypertensor_consensus_predicate() -> Callable[[DHTRecord, DHTRecordRequestType], bool]:
    def predicate(record: DHTRecord, type: DHTRecordRequestType, epoch_data: EpochData) -> bool:
        try:
            # Enable GET data at any time
            if type is DHTRecordRequestType.GET:
                return True

            current_epoch = epoch_data.epoch
            percent_complete = epoch_data.percent_complete

            # Ensure the keys are valid for the current allowable keys or epoch allowable keys
            valid_keys = {
                # Heartbeat
                DHTID.generate(source="role_name").to_bytes(): "role_name",
                # ⸺ 0-15%
                DHTID.generate(source=f"consensus_epoch_{current_epoch}").to_bytes(): "consensus",
                # ⸺ 15-50%
                DHTID.generate(source=f"commit_epoch_{current_epoch}").to_bytes(): "commit",
                # ⸺ 50-60%
                DHTID.generate(source=f"reveal_epoch_{current_epoch}").to_bytes(): "reveal",
            }

            key_type = valid_keys.get(record.key, None)

            if key_type is None:
                return False

            dht_time = get_dht_time()

            # ⸺ 0-100% (any time)
            if key_type == "role_name":
                max_expiration = dht_time + MAX_HEART_BEAT_TIME
                if record.expiration_time > max_expiration:
                    return False
                # TODO: validate proof-of-stake on each heartbeat
                return True

            # ⸺ 0-15%
            elif key_type == "consensus":
                # Must be submitted before the deadline
                if percent_complete > CONSENSUS_STORE_DEADLINE:
                    return False

                max_expiration = dht_time + MAX_CONSENSUS_TIME
                if record.expiration_time > max_expiration:
                    return False

                return True

            # ⸺ 15-50%
            elif key_type == "commit":
                max_expiration = dht_time + MAX_COMMIT_TIME
                if record.expiration_time > max_expiration:
                    return False
                if percent_complete <= CONSENSUS_STORE_DEADLINE or percent_complete > COMMIT_DEADLINE:
                    return False

                return True

            # ⸺ 50-60%
            elif key_type == "reveal":
                max_expiration = dht_time + MAX_REVEAL_TIME
                if record.expiration_time > max_expiration:
                    return False
                if percent_complete <= COMMIT_DEADLINE or percent_complete > REVEAL_DEADLINE:
                    return False
                return True

            # Key doesn't match any known schema
            # Don't store data
            return False
        except Exception as e:
            print(f"Predicate Err: {e}")
            return False

    return predicate

Last updated